使用logstash同步MySQL数据到elasticsearch

需求:将MySQL的数据同步到es中,通过es提供搜索和数据分析服务
网上有很多教程,但是自己在使用过程中还是遇到了很多坑,为了避免遇到同样的问题
下面将自己学习过程和资料整理下来,方便自己查阅,也希望对有同样需求的同学提供帮助

上一篇文章简单介绍了logstash的安装步骤,这篇文章主要介绍logstash同步mysql数据到es的配置文件的编写

一、logstash 配置文件介绍

logstash 设计了自己的 DSL(Domain-Specific Language 领域特定语言) 包括有区域,注释,数据类型(布尔值,字符串,数值,数组,哈希),条件判断,字段引用等。

logstash{} 来定义区域,区域内可以包括插件区域定义,你可以在一个区域内定义多个插件,插件区域内则可以定义键值对设置,示例如下:

input {}
filter {}
output {}

logstash input插件
logstash filter插件
logstash output插件

二、同步MySQL数据到ES的配置文件logstash-mysql.conf

1. 创建logstash-mysql.conf

首先创建logstash的配置文件,直接从logstash-6.7.0/config/logstash-sample.conf中复制一份配置文件logstash-mysql.conf, 使用如下命令:

# 复制配置文件,并重命名
cp config/logstash-sample.conf mysql/logstash-mysql.conf
# 编辑配置文件
vim mysql/logstash-mysql.conf

修改内容如下:

input {
  # 用于接收控制台输入数据,进行测试
  stdin { }
  jdbc {
      # 数据库  数据库名称为elk,表名为book_table
      jdbc_connection_string => "jdbc:mysql://localhost:3306/elk?characterEncoding=UTF-8"
      # 数据库用户名
      jdbc_user => "root"
      # 数据库密码
      jdbc_password => "root"
      # jar包的位置
      jdbc_driver_library => "mysql/mysql-connector-java-5.1.40.jar"
      # mysql的Driver
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      # 开启分页
      jdbc_paging_enabled => "true"
      # 每页条数
      jdbc_page_size => "100"
      # 需要执行的mysql文件的位置,会执行文件中的sql语句
      #statement_filepath => "mysql/test.sql"
      statement => "select * from book_table where id > :sql_last_value"
      # 调度时间,默认为1分钟执行一次
      schedule => "* * * * *"
      #是否记录上次执行结果, 如果为true,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
      record_last_run => true
 
      #是否需要记录某个column 的值,如果 record_last_run 为true,可以自定义我们需要 track 的 column 名称,此时该参数就要为 true. 否则默认 track 的是 timestamp 的值.
      use_column_value => true
 
      #如果 use_column_value 为true,需配置此参数. track 的数据库 column 名,该 column 必须是递增的.比如:ID.
      tracking_column => id
 
      #指定文件,来记录上次执行到的 tracking_column 字段的值
      #比如上次数据库有 10000 条记录,查询完后该文件中就会有数字 10000 这样的记录,下次执行 SQL 查询可以从 10001 条处开始.
      #我们只需要在 SQL 语句中 WHERE MY_ID > :sql_last_value 即可. 其中 :sql_last_value 取得就是该文件中的值(10000).
      last_run_metadata_path => "config/metadata"
 
      #是否清除 last_run_metadata_path 的记录,如果为true那么每次都相当于从头开始查询所有的数据库记录
      clean_run => false
 
      #是否将 column 名称转小写
      #lowercase_column_names => false
      # 标识输入的类型,自定义字符串,可以用于输出类型选择
      type => "book_table"
    }
}
filter {
 # 默认使用的是Unix时间,我们需要将logstash同步数据到ES时间+8小时
#  数据库中的时间字段也可以这样设置
   ruby {
      code => "event.set('@timestamp',event.get('@timestamp').time.localtime + 8*60*60)"
   }
}

output {
 if [type] == "book_table" {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    user => "elastic"
        password => "changeme"
    #按分钟
    #index => "mysql-%{+YYYY.MM.dd.HH.mm}"
    #按小时
    # 索引名字,必须小写
    index => "mysql-%{+YYYY.MM.dd.HH}"
    #index => "logstash-%{[fields][document_type]}-%{+YYYY.MM.dd}"
        #index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
    # 数据唯一索引(建议使用数据库主键 KeyID)
    document_id => "%{id}"
  }
  # 以json格式输出到控制台
  stdout {
        codec => json_lines
  }
 }
    
}

注意点:
1、 logstash默认使用的是Unix时间,我们需要将logstash同步数据到ES时间+8小时
2、数据库中的时间字段也可以这样设置

 filter {
   ruby {
     code => "event.set('@timestamp',event.get('@timestamp').time.localtime + 8*60*60)"
    }
  }

3、logstash里面不支持驼峰字段
4、因为logstash里面使用了type关键字,所有数据库中的字段不能命名为type,如果有type需要使用别名替换

2. 配置说明

从上面的配置中可以看出,需要创建一个mysql目录,并且下载mysql-connector-java-5.1.40.jar用于连接mysql数据库,可以直接从maven仓库:https://mvnrepository.com/中搜索mysql-connector-java下载jar包

在logstash-6.7.0目录下创建mysql目录

# 创建mysql目录
mkdir mysql
# 进入mysql 目录
cd mysql
# 下载mysql-connector-java.jar
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.40/mysql-connector-java-5.1.40.jar

3. 运行logstash

bin/logstash -f mysql/logstash-mysql.conf

参考文档:
Logstash 简易教程
https://blog.csdn.net/diaobatian/article/details/98624709

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,377评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,390评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,967评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,344评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,441评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,492评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,497评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,274评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,732评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,008评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,184评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,837评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,520评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,156评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,407评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,056评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,074评论 2 352