ElasticSearch 导入mysql数据

做一个项目需要实现mysql的数据导入到 es 中去,参考来诸多方案,之前是想着使用,ingest node 来进行 pipeline 的数据处理。但是ingest 不支持 jdbc。无法实现。考虑到数据不需要进行强同步,且数据单向的流动。
所以综上使用 Logstash 这种经典方案来进行数据的导入和清洗。

Pipeline 配置

这里直接给出来 Pipeline 的配置,input里面使用了 dbc 的插件,来配置连接信息的和账号。大体的配置如下:

input {
    jdbc {
      jdbc_driver_library => "/home/qspace/logstash-7.3.1/plugin/mysql-connector-java-8.0.17.jar"
      jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
      jdbc_connection_string => "jdbc:mysql://172.172.66.0:3306/db_sgk"
      # 你的账户密码
      jdbc_user => "root"
      jdbc_password => "password"

      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"

      #jdbc_fetch_size => "50000"

      jdbc_default_timezone => "Asia/Shanghai"
      statement => "select * from test order by <some>"
      schedule => "* * * * *"
    }
}

output {
    elasticsearch {
        hosts => "127.0.0.1:9200"
        index => "test"
        #document_type => "opinion_type"
        document_id => "%{id}"
    }
}

重点

这里对配置文件的几个重点来进行说明,

  1. 查询分页
    因为导入表的数据量是非常的大,所以在查询时候需要进行分页。这里有两种方式jdbc_page_sizejdbc_fetch_size,前者是分多次查询使用 offset来进行,后者是一次查询之后分批次来导入。
    所以 page 的话用于表较大的导入场景,fetch 用于表适中情况下的快速导入。
    另外在之前使用的时候,出现导入数据不全的情况,后面差了相关资料,如果使用分页的话,需要使用 order 来进行排序。
  1. document_id 这个是ES里面的唯一主键,来进行数据的更新。
    为了实现唯一主键有时候hash 比拼字变量拼接优雅。所以这里就使用 filter 段的fingerpoint 的方式来进行唯一id 的表示。可复用配置如下,这样可以来得到一个hid 的值来进行唯一主键的表示

    fingerprint {
        concatenate_sources => "true"
        source => ["f1", "f2"]
        method => "MD5"
        target => "hid"
    }

    Fingerprint filter pluginedit

参考

留下点什么吧