(1) Write the Logstash pipeline file abc.conf.
input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://ip:3306/db?serverTimezone=Asia/Shanghai"
    # MySQL username
    jdbc_user => "root"
    # MySQL password
    jdbc_password => "root"
    jdbc_driver_library => "/resource/mysql-connector-java-8.0.18.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    # JDBC time zone
    jdbc_default_timezone => "Asia/Shanghai"
    # Track a column value instead of the time of the last run
    use_column_value => true
    # Column to track; it must be the column compared against :sql_last_value below
    tracking_column => "last_update_time"
    # Type of the tracking column
    tracking_column_type => "timestamp"
    # Enable paged queries
    jdbc_paging_enabled => true
    # Page size
    jdbc_page_size => "100"
    # Each jdbc input needs its own tracking file; this path is an example, any writable location works
    last_run_metadata_path => "/usr/share/logstash/data/.dish_last_run"
    # Incremental query by last_update_time; ascending order so the tracked value ends on the newest row
    statement => "SELECT dish_id as id,dish_name as name,code,price,shop_id,store_id,1 as type,last_update_time FROM t_dish where last_update_time > :sql_last_value order by last_update_time asc"
    # Run every 5 seconds
    schedule => "0/5 * * * * *"
  }
}
input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://ip:3306/db?serverTimezone=Asia/Shanghai"
    jdbc_user => "root"
    jdbc_password => "root"
    jdbc_driver_library => "/resource/mysql-connector-java-8.0.18.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    # JDBC time zone
    jdbc_default_timezone => "Asia/Shanghai"
    # Track a column value instead of the time of the last run
    use_column_value => true
    # Column to track
    tracking_column => "last_update_time"
    # Type of the tracking column
    tracking_column_type => "timestamp"
    jdbc_paging_enabled => true
    jdbc_page_size => "50000"
    # Separate tracking file for this input (see the note above); the path is an example
    last_run_metadata_path => "/usr/share/logstash/data/.setmeal_last_run"
    # Incremental sync by last update time
    statement => "SELECT setmeal_id as id,setmeal_name as name,code,shop_id,store_id,2 as type,last_update_time FROM t_setmeal where last_update_time > :sql_last_value order by last_update_time asc"
    schedule => "0/5 * * * * *"
  }
}
filter {
  ruby {
    # Ruby's Time#localtime converts the time in place, shifting @timestamp to the local zone
    code => "event.timestamp.time.localtime"
  }
}
output {
  stdout {
    codec => json_lines
  }
  elasticsearch {
    hosts => "ip:9200"
    index => "dish"
    document_id => "%{id}"
  }
}
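The jdbc input persists :sql_last_value between runs in the file named by last_run_metadata_path (by default ~/.logstash_jdbc_last_run, which is why the two inputs above are given separate files). A quick way to inspect or reset the tracked value, assuming the example paths used above:

# Show the persisted tracking value for the dish input
cat /usr/share/logstash/data/.dish_last_run
# Delete it to force a full re-import on the next scheduled run
rm /usr/share/logstash/data/.dish_last_run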
(2) Place abc.conf in the /root/soft/base/data/logstash/pipeline/ directory.
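Assuming Logstash runs in a container with /root/soft/base/data/logstash/pipeline/ mounted as its pipeline directory, copying the file onto the host is enough, for example:

cp abc.conf /root/soft/base/data/logstash/pipeline/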
(3) Restart Logstash and check whether the data has been synced into the index.
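A minimal sketch of the restart and verification, assuming Logstash runs in a Docker container named logstash (the container name is an assumption):

docker restart logstash
# Watch the logs; each scheduled run prints the executed SELECT statement
docker logs -f logstash
# Query the index to confirm documents were synced
curl "http://ip:9200/dish/_search?pretty"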