做一个项目需要实现mysql的数据导入到 es 中去,参考来诸多方案,之前是想着使用,ingest node 来进行 pipeline 的数据处理。但是ingest 不支持 jdbc。无法实现。考虑到数据不需要进行强同步,且数据单向的流动。
所以综上使用 Logstash 这种经典方案来进行数据的导入和清洗。
Pipeline 配置
这里直接给出来 Pipeline 的配置,input里面使用了 dbc 的插件,来配置连接信息的和账号。大体的配置如下:
input {
jdbc {
jdbc_driver_library => "/home/qspace/logstash-7.3.1/plugin/mysql-connector-java-8.0.17.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://172.172.66.0:3306/db_sgk"
# 你的账户密码
jdbc_user => "root"
jdbc_password => "password"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
#jdbc_fetch_size => "50000"
jdbc_default_timezone => "Asia/Shanghai"
statement => "select * from test order by <some>"
schedule => "* * * * *"
}
}
output {
elasticsearch {
hosts => "127.0.0.1:9200"
index => "test"
#document_type => "opinion_type"
document_id => "%{id}"
}
}
重点
这里对配置文件的几个重点来进行说明,
- 查询分页
因为导入表的数据量是非常的大,所以在查询时候需要进行分页。这里有两种方式jdbc_page_size
和jdbc_fetch_size
,前者是分多次查询使用 offset来进行,后者是一次查询之后分批次来导入。
所以 page 的话用于表较大的导入场景,fetch 用于表适中情况下的快速导入。
另外在之前使用的时候,出现导入数据不全的情况,后面差了相关资料,如果使用分页的话,需要使用 order 来进行排序。
- document_id 这个是ES里面的唯一主键,来进行数据的更新。
为了实现唯一主键有时候hash 比拼字变量拼接优雅。所以这里就使用 filter 段的fingerpoint 的方式来进行唯一id 的表示。可复用配置如下,这样可以来得到一个hid 的值来进行唯一主键的表示fingerprint { concatenate_sources => "true" source => ["f1", "f2"] method => "MD5" target => "hid" }