mysql数据同步elasticsearch(es)全文检索容器;其它数据库同理

一、安装ElasticSearch(下面统称es,版本6.0.0,环境windows10)

直接上下载地址:https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.0.0.zip

解压后目录如下:

图1

启动es,./bin/elasticsearch.bat ;启动成功如图


图2

默认cluster_name是elasticsearch和端口9200可以修改,需要修改在config/elasticsearch.yml;上图


图3

二、安装logstash

下载地址:https://artifacts.elastic.co/downloads/logstash/logstash-6.0.0.zip

解压目录


图4

先安装logstash-input-jdbc插件 

./bin/logstash-plugin.bat install logstash-input-jdbc


图5

在logstash目录下创建config-mysql,见图4


图6

创建配置文件load_data.conf,配置文件随便取名,可以创建sql文件,也可以在conf配置文件中定义,具体下面有说明

先上配置文件内容

input {

    stdin {

    }

    jdbc {

      jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/jfinal_club?characterEncoding=utf8&useSSL=false"

      jdbc_user => "root"

      jdbc_password => "root"

      jdbc_driver_library => "D:/ELK/6.0.0/logstash-6.0.0/config-mysql/mysql-connector-java-5.1.43.jar"

      jdbc_driver_class => "com.mysql.jdbc.Driver"

      jdbc_paging_enabled => "true"

      jdbc_page_size => "50000"

      statement_filepath => "D:/ELK/6.0.0/logstash-6.0.0/config-mysql/store_list.sql"

      schedule => "* * * * *"

      use_column_value => false

      record_last_run => true

      last_run_metadata_path => "D:/ELK/6.0.0/logstash-6.0.0/config-mysql/run/store_list"

      type => "sl"

    }

jdbc {

      jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/jfinal_club?characterEncoding=utf8&useSSL=false"

      jdbc_user => "root"

      jdbc_password => "root"

      jdbc_driver_library => "D:/ELK/6.0.0/logstash-6.0.0/config-mysql/mysql-connector-java-5.1.43.jar"

      jdbc_driver_class => "com.mysql.jdbc.Driver"

      jdbc_paging_enabled => "true"

      jdbc_page_size => "50000"

      statement => "select * from store where updated > date_add(:sql_last_value, interval 8 hour)"

      schedule => "* * * * *"

      use_column_value => false

      record_last_run => true

      last_run_metadata_path => "D:/ELK/6.0.0/logstash-6.0.0/config-mysql/run/store_s"

      type => "st"

    }

}

filter {

    json {

        source => "message"

        remove_field => ["message"]

    }

}

output {

    if[type] == "sl"{

        elasticsearch {

hosts => ["127.0.0.1:9200"]

index => "store_list"

document_type => "jdbc"

document_id => "%{store_id}}"

        }

    }

if[type] == "st"{

        elasticsearch {

hosts => ["127.0.0.1:9200"]

index => "store_st"

document_type => "jdbc"

document_id => "%{id}}"

        }

    }

    stdout {

        codec => json_lines

    }

}

字段解释(来自官方文档)

clean_run

值类型是布尔值

默认值是 false

是否应保留之前的运行状态

columns_charset

值类型是散列

默认值是 {}

特定列的字符编码。该选项将覆盖:charset指定列的选项。

例:

输入{  jdbc { ...    columns_charset => { “column0” => “ISO-8859-1” } ... } }             

这只会将具有ISO-8859-1的column0转换为原始编码。

connection_retry_attempts

值类型是数字

默认值是 1

尝试连接到数据库的最大次数

connection_retry_attempts_wait_time

值类型是数字

默认值是 0.5

在连接尝试之间休眠的秒数

jdbc_connection_string

这是一个必需的设置。

值类型是字符串

此设置没有默认值。

JDBC连接字符串

jdbc_default_timezone

值类型是字符串

此设置没有默认值。

时区转换。SQL不允许timestamp字段中的时区数据。此插件会自动将您的SQL时间戳字段转换为Logstash时间戳,相对UTC时间采用ISO8601格式。

使用此设置将手动分配指定的时区偏移量,而不是使用本地计算机的时区设置。例如,您必须使用规范时区,美国/丹佛

jdbc_driver_class

这是一个必需的设置。

值类型是字符串

此设置没有默认值。

如果您使用的是Oracle JDBC,则需要根据https://github.com/logstash-plugins/logstash-input-jdbc/issues/43加载JDBC驱动程序类,例如“org.apache.derby.jdbc.ClientDriver”NB 驱动程序(ojdbc6.jar)正确的jdbc_driver_class是"Java::oracle.jdbc.driver.OracleDriver"

jdbc_driver_library

值类型是字符串

此设置没有默认值。

暂时将JDBC逻辑抽象为混合,以便在其他插件(输入/输出)中重复使用当某人包含此模块时调用此方法将这些方法添加到给定的基础中。JDBC驱动程序库路径到第三方驱动程序库。如果需要多个库,您可以通过逗号分隔它们。

如果未提供,Plugin将在Logstash Java类路径中查找驱动程序类。

jdbc_fetch_size

值类型是数字

此设置没有默认值。

JDBC提取大小。如果未提供,则将使用相应的驱动程序默认值

jdbc_page_size

值类型是数字

默认值是 100000

JDBC页面大小

jdbc_paging_enabled

值类型是布尔值

默认值是 false

JDBC启用分页

这将导致一个sql语句被分解成多个查询。每个查询将使用限制和偏移来共同检索完整的结果集。限制尺寸设置为jdbc_page_size。

请注意,查询之间不保证排序。

jdbc_password

值类型是密码

此设置没有默认值。

JDBC密码

jdbc_password_filepath

值类型是路径

此设置没有默认值。

JDBC密码文件名

jdbc_pool_timeout

值类型是数字

默认值是 5

连接池配置。在引发PoolTimeoutError之前等待获取连接的秒数(默认值为5)

jdbc_user

这是一个必需的设置。

值类型是字符串

此设置没有默认值。

JDBC用户

jdbc_validate_connection

值类型是布尔值

默认值是 false

连接池配置。使用前验证连接。

jdbc_validation_timeout

值类型是数字

默认值是 3600

连接池配置。验证连接的频率(以秒为单位)

last_run_metadata_path

值类型是字符串

默认值是 "/home/ph/.logstash_jdbc_last_run"

使用上次运行时间的文件路径

lowercase_column_names

值类型是布尔值

默认值是 true

是否强制标识符域的缩小

parameters

值类型是散列

默认值是 {}

例如,查询参数的哈希值 { "target_id" => "321" }

record_last_run

值类型是布尔值

默认值是 true

是否在last_run_metadata_path中保存状态

schedule

值类型是字符串

此设置没有默认值。

以Cron格式定期运行语句的时间表,例如:“* * * * *”(每分钟执行一次查询,每分钟执行一次)

没有默认的时间表。如果没有给出任何时间表,那么该语句只运行一次。

sequel_opts

值类型是散列

默认值是 {}

一般/供应商特定的续集配置选项。

可选连接池配置的示例max_connections - 连接池的最大连接数

特定于供应商的选项的示例可以在此文档页面中找到:https//github.com/jeremyevans/sequel/blob/master/doc/opening_databases.rdoc

sql_log_level

值可以是任何的:fatal,error,warn,info,debug

默认值是 "info"

记录SQL查询的日志级别,接受的值是常见的值,致命错误,警告,信息和调试。默认值是info。

statement

值类型是字符串

此设置没有默认值。

如果未定义,即使编解码器未被使用,Logstash也会投诉。要执行的语句

要使用参数,请使用命名参数语法。例如:

“SELECT * FROM MYTABLE WHERE id =:target_id”

在这里,“:target_id”是一个命名参数。您可以使用该parameters设置配置命名参数。

statement_filepath

值类型是路径

此设置没有默认值。

包含要执行的语句的文件的路径

tracking_column

值类型是字符串

此设置没有默认值。

如果跟踪列值而不是时间戳,那么将跟踪其值的列

tracking_column_type

值可以是任何的:numeric,timestamp

默认值是 "numeric"

跟踪列的类型。目前只有“数字”和“时间戳”

use_column_value

值类型是布尔值

默认值是 false

使用增量列值而不是时间戳

add_field

值类型是散列

默认值是 {}

添加一个字段到一个事件

codec

值类型是编解码器

默认值是 "plain"

用于输入数据的编解码器。输入编解码器是在数据进入输入之前解码数据的便捷方法,无需在Logstash管道中使用单独的筛选器。

enable_metric

值类型是布尔值

默认值是 true

在默认情况下,为特定插件实例禁用或启用度量标准日志记录,我们会记录所有可以使用的度量标准,但是您可以禁用特定插件的度量标准收集。

id

值类型是字符串

此设置没有默认值。

添加一个独特ID的插件配置。如果没有指定ID,Logstash会生成一个。强烈建议在您的配置中设置此ID。当你有两个或多个相同类型的插件时,这是特别有用的,例如,如果你有2个jdbc输入。在这种情况下添加命名标识将有助于在使用监视API时监视Logstash。

输入{  jdbc {    id => “my_plugin_id” } } 

tags

值类型是数组

此设置没有默认值。

为您的活动添加任意数量的任意标签。

这可以帮助稍后处理。

type

值类型是字符串

此设置没有默认值。

type为由此输入处理的所有事件添加一个字段。

类型主要用于过滤器激活。

该类型存储为事件本身的一部分,因此您也可以使用该类型在Kibana中搜索它。

如果您尝试在已经有一个事件的事件上设置类型(例如,当您将事件从发货方发送到索引器时),则新输入不会覆盖现有类型。托运人设置的类型即使在发送到另一个Logstash服务器时也会保留该事件的生命。

以上翻译是google翻译提供

具体的见:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html

图6中有个run目录,在这里是用来存放:sql_last_value的时间值的

store_list.sql


图7

先在es中生成index

PUT /store_list

{

  "settings": {

    "number_of_shards": 3,

    "number_of_replicas": 1

  },

  "mappings": {

    "jdbc": {

      "properties": {

        "@timestamp": {

          "type": "date"

        },

        "@version": {

          "type": "keyword"

        },

        "store_id": {

          "type": "long"

        },

        "store_name": {

          "type": "keyword"

        },

        "uid": {

          "type": "text"

        },

        "telephone": {

          "type": "text"

        },

        "street_id": {

          "type": "text"

        },

        "detail": {

          "type": "keyword"

        },

        "address": {

          "type": "keyword"

        },

        "store_created": {

          "type": "date"

        },

        "store_updated": {

          "type": "date"

        },

        "detail_id": {

          "type": "long"

        },

        "type_name": {

          "type": "text"

        },

        "tag": {

          "type": "keyword"

        },

        "overall_rating": {

          "type": "text"

        },

        "navi_location_lng": {

          "type": "double"

        },

        "navi_location_lat": {

          "type": "double"

        },

        "detail_url": {

          "type": "text"

        },

        "comment_num": {

          "type": "integer"

        },

        "detail_created": {

          "type": "date"

        },

        "detail_updated": {

          "type": "date"

        },

        "location_id": {

          "type": "long"

        },

        "lng": {

          "type": "double"

        },

        "lat": {

          "type": "double"

        }

      }

    }

  }

}

上面这种方式可以通过es管理工具执行,比如kibana->dev tools

或者

curl -XPUT "http://localhost:9200/store_list" -H 'Content-Type: application/json' -d'

{

  "settings": {

    "number_of_shards": 3,

    "number_of_replicas": 1

  },

  "mappings": {

    "jdbc": {

      "properties": {

        "@timestamp": {

          "type": "date"

        },

        "@version": {

          "type": "keyword"

        },

        "store_id": {

          "type": "long"

        },

        "store_name": {

          "type": "keyword"

        },

        "uid": {

          "type": "text"

        },

        "telephone": {

          "type": "text"

        },

        "street_id": {

          "type": "text"

        },

        "detail": {

          "type": "keyword"

        },

        "address": {

          "type": "keyword"

        },

        "store_created": {

          "type": "date"

        },

        "store_updated": {

          "type": "date"

        },

        "detail_id": {

          "type": "long"

        },

        "type_name": {

          "type": "text"

        },

        "tag": {

          "type": "keyword"

        },

        "overall_rating": {

          "type": "text"

        },

        "navi_location_lng": {

          "type": "double"

        },

        "navi_location_lat": {

          "type": "double"

        },

        "detail_url": {

          "type": "text"

        },

        "comment_num": {

          "type": "integer"

        },

        "detail_created": {

          "type": "date"

        },

        "detail_updated": {

          "type": "date"

        },

        "location_id": {

          "type": "long"

        },

        "lng": {

          "type": "double"

        },

        "lat": {

          "type": "double"

        }

      }

    }

  }

}'

然后通过http://localhost:9200/store_list/查看字段生成情况

store_list就是index,相当于数据库的database


图8

然后回到logstash目录下

执行 nohup.exe ./bin/logstash.bat -f config-mysql/load_data.conf &

图9

最好加上& 结尾,后台运行

然后看数据库同步情况


图10

可能有些细节没能写全,如果在集成中遇到什么情况,可以评论指出

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,029评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,395评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,570评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,535评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,650评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,850评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,006评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,747评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,207评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,536评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,683评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,342评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,964评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,772评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,004评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,401评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,566评论 2 349

推荐阅读更多精彩内容