[Elasticsearch in Practice] 1. Importing MySQL data into ES with Logstash

ES installation

[elasticsearch] 2. Kibana installation

Logstash installation

[elasticsearch] 3. Logstash installation

Download the MySQL Connector/J JAR

MySQL Connector/J JAR package
I'm using mysql-connector-java-5.1.38.jar here.

Install the required plugins

Note: recent Logstash distributions already bundle both plugins (in 7.x the JDBC input ships as part of logstash-integration-jdbc, as the stack trace below shows), so these installs may be no-ops.

# install the JDBC input plugin
bin/logstash-plugin install logstash-input-jdbc
# install the Elasticsearch output plugin
bin/logstash-plugin install logstash-output-elasticsearch

Write a logstash-mysql-es.conf file to drive the data transfer

input {
  jdbc {
    # MySQL JDBC connection settings
    jdbc_connection_string => "jdbc:mysql://127.0.0.1/database_name?useUnicode=true&characterEncoding=utf8&useSSL=true"
    jdbc_user => "xxx"
    jdbc_password => "xxx"

    # Path to the MySQL JDBC driver JAR; download from https://dev.mysql.com/downloads/connector/j/
    jdbc_driver_library => "../config/jar/mysql-connector-java-5.1.38.jar"
    # the name of the driver class for mysql
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_paging_enabled => true
    jdbc_page_size => "50000"

    jdbc_default_timezone => "Asia/Shanghai"

    # SQL to run; you can write the statement inline as below,
    # or point statement_filepath at a .sql file instead
    statement => "select * from exp_store where create_time >= :sql_last_value"
    # statement_filepath => "./config/jdbc.sql"

    # crontab-like schedule; "* * * * *" runs the sync once per minute
    # (fields: minute hour day-of-month month day-of-week)
    schedule => "* * * * *"
    #type => "jdbc"

    # Whether to record the last run; if true, the last value of the
    # tracked column is saved to the file given by last_run_metadata_path
    #record_last_run => true

    # Whether to track a specific column's value; if true, set the column
    # to track via tracking_column below. If false, the timestamp of the
    # last run is tracked instead.
    use_column_value => true

    # Required when use_column_value is true: the database column to
    # track. It must be monotonically increasing -- typically the MySQL
    # primary key or a create/update timestamp.
    tracking_column => "create_time"

    tracking_column_type => "timestamp"

    last_run_metadata_path => "./logstash_ineyes_exp_store_last_id"

    # Whether to discard the last_run_metadata_path record; if true, every
    # run starts from scratch and re-reads all database rows
    clean_run => false

    # Whether to lowercase column names
    lowercase_column_names => false
  }
}

output {
  elasticsearch {
    hosts => "127.0.0.1:9200"
    index => "ineyes_exp_store"
    document_id => "%{id}"
    template_overwrite => true
  }

  # Debug output; comment this out for production runs
  stdout {
      codec => json_lines
  }
}
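The key to incremental sync is the :sql_last_value placeholder: before each scheduled run, the jdbc input reads the last tracked create_time from last_run_metadata_path and substitutes it into the statement. A rough illustration of that substitution (the plugin does this internally; sed here is only a demonstration, and the sample value matches the query shown in the logs further down):

```shell
# Simulate the :sql_last_value substitution performed by the jdbc input
last_value='2020-01-16 15:00:34'   # value persisted in last_run_metadata_path
statement='select * from exp_store where create_time >= :sql_last_value'
echo "$statement" | sed "s/:sql_last_value/'$last_value'/"
# -> select * from exp_store where create_time >= '2020-01-16 15:00:34'
```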

Start the data transfer

# start the data transfer in the background
nohup bin/logstash -f config/transfer-config/logstash-mysql-es.conf > logs/logstash.out &
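Once the process is up, you can sanity-check it through Logstash's monitoring API (a sketch; this requires the process started above -- the API listens on port 9600 by default, though the startup log below shows 9601 because 9600 was already taken, so adjust the port to match your log):

```shell
# Query the Logstash monitoring API for per-pipeline event counts
curl -s "http://127.0.0.1:9601/_node/stats/pipelines?pretty"
```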

Startup problems

block in load_driver_jars

Sending Logstash logs to /Users/cutie/workspace/elasticsearch/logstash-7.6.0/logs which is now configured via log4j2.properties
[2020-03-21T17:07:30,361][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2020-03-21T17:07:30,466][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"7.6.0"}
[2020-03-21T17:07:32,347][INFO ][org.reflections.Reflections] Reflections took 33 ms to scan 1 urls, producing 20 keys and 40 values 
[2020-03-21T17:07:33,115][INFO ][logstash.outputs.elasticsearch][main] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://127.0.0.1:9200/]}}
[2020-03-21T17:07:33,281][WARN ][logstash.outputs.elasticsearch][main] Restored connection to ES instance {:url=>"http://127.0.0.1:9200/"}
[2020-03-21T17:07:33,326][INFO ][logstash.outputs.elasticsearch][main] ES Output version determined {:es_version=>7}
[2020-03-21T17:07:33,330][WARN ][logstash.outputs.elasticsearch][main] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>7}
[2020-03-21T17:07:33,527][INFO ][logstash.outputs.elasticsearch][main] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//127.0.0.1:9200"]}
[2020-03-21T17:07:33,571][INFO ][logstash.outputs.elasticsearch][main] Using default mapping template
[2020-03-21T17:07:33,604][WARN ][org.logstash.instrument.metrics.gauge.LazyDelegatingGauge][main] A gauge metric of an unknown type (org.jruby.specialized.RubyArrayOneObject) has been create for key: cluster_uuids. This may result in invalid serialization.  It is recommended to log an issue to the responsible developer/development team.
[2020-03-21T17:07:33,609][INFO ][logstash.javapipeline    ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>12, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>1500, "pipeline.sources"=>["/Users/cutie/workspace/elasticsearch/logstash-7.6.0/config/transfer-config/logstash-mysql-es.conf"], :thread=>"#<Thread:0x4f7fd90b run>"}
[2020-03-21T17:07:33,631][INFO ][logstash.outputs.elasticsearch][main] Attempting to install template {:manage_template=>{"index_patterns"=>"logstash-*", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s", "number_of_shards"=>1}, "mappings"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}
[2020-03-21T17:07:33,649][INFO ][logstash.outputs.elasticsearch][main] Installing elasticsearch template to _template/logstash
[2020-03-21T17:07:34,622][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
[2020-03-21T17:07:34,683][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2020-03-21T17:07:34,976][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9601}
/Users/cutie/workspace/elasticsearch/logstash-7.6.0/vendor/bundle/jruby/2.5.0/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/cronline.rb:77: warning: constant ::Fixnum is deprecated
{ 2054 rufus-scheduler intercepted an error:
  2054   job:
  2054     Rufus::Scheduler::CronJob "* * * * *" {}
  2054   error:
  2054     2054
  2054     LogStash::PluginLoadingError
  2054     unable to load /Users/cutie/workspace/elasticsearch/logstash-7.6.0/config/transfer-config/logstash-mysql-es.conf from :jdbc_driver_library, no such file to load -- /Users/cutie/workspace/elasticsearch/logstash-7.6.0/config/transfer-config/logstash-mysql-es.conf
  2054       /Users/cutie/workspace/elasticsearch/logstash-7.6.0/vendor/bundle/jruby/2.5.0/gems/logstash-integration-jdbc-5.0.0/lib/logstash/plugin_mixins/jdbc/jdbc.rb:151:in `block in load_driver_jars'
  2054       org/jruby/RubyArray.java:1814:in `each'
  2054       /Users/cutie/workspace/elasticsearch/logstash-7.6.0/vendor/bundle/jruby/2.5.0/gems/logstash-integration-jdbc-5.0.0/lib/logstash/plugin_mixins/jdbc/jdbc.rb:144:in `load_driver_jars'
  2054       /Users/cutie/workspace/elasticsearch/logstash-7.6.0/vendor/bundle/jruby/2.5.0/gems/logstash-integration-jdbc-5.0.0/lib/logstash/plugin_mixins/jdbc/jdbc.rb:166:in `open_jdbc_connection'
  2054       /Users/cutie/workspace/elasticsearch/logstash-7.6.0/vendor/bundle/jruby/2.5.0/gems/logstash-integration-jdbc-5.0.0/lib/logstash/plugin_mixins/jdbc/jdbc.rb:242:in `execute_statement'
  2054       /Users/cutie/workspace/elasticsearch/logstash-7.6.0/vendor/bundle/jruby/2.5.0/gems/logstash-integration-jdbc-5.0.0/lib/logstash/inputs/jdbc.rb:309:in `execute_query'
  2054       /Users/cutie/workspace/elasticsearch/logstash-7.6.0/vendor/bundle/jruby/2.5.0/gems/logstash-integration-jdbc-5.0.0/lib/logstash/inputs/jdbc.rb:276:in `block in run'
  2054       /Users/cutie/workspace/elasticsearch/logstash-7.6.0/vendor/bundle/jruby/2.5.0/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:234:in `do_call'
  2054       /Users/cutie/workspace/elasticsearch/logstash-7.6.0/vendor/bundle/jruby/2.5.0/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:258:in `do_trigger'
  2054       /Users/cutie/workspace/elasticsearch/logstash-7.6.0/vendor/bundle/jruby/2.5.0/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:300:in `block in start_work_thread'
  2054       /Users/cutie/workspace/elasticsearch/logstash-7.6.0/vendor/bundle/jruby/2.5.0/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:299:in `block in start_work_thread'
  2054       org/jruby/RubyKernel.java:1446:in `loop'
  2054       /Users/cutie/workspace/elasticsearch/logstash-7.6.0/vendor/bundle/jruby/2.5.0/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:289:in `block in start_work_thread'
  2054   tz:
  2054     ENV['TZ']: 
  2054     Time.now: 2020-03-21 17:08:00 +0800
  2054   scheduler:
  2054     object_id: 2052
  2054     opts:
  2054       {:max_work_threads=>1}
  2054       frequency: 0.3
  2054       scheduler_lock: #<Rufus::Scheduler::NullLock:0x403e43b8>
  2054       trigger_lock: #<Rufus::Scheduler::NullLock:0x7eba4eef>
  2054     uptime: 26.022160999999997 (26s23)
  2054     down?: false
  2054     threads: 2
  2054       thread: #<Thread:0x67e9c5b3>
  2054       thread_key: rufus_scheduler_2052
  2054       work_threads: 1
  2054         active: 1
  2054         vacant: 0
  2054         max_work_threads: 1
  2054       mutexes: {}
  2054     jobs: 1
  2054       at_jobs: 0
  2054       in_jobs: 0
  2054       every_jobs: 0
  2054       interval_jobs: 0
  2054       cron_jobs: 1
  2054     running_jobs: 1
  2054     work_queue: 0
} 

This error means Logstash could not load the JDBC driver from the path given in jdbc_driver_library. Check the configuration file -- in particular that the driver JAR path actually resolves (relative paths are typically resolved against the directory Logstash is started from) -- and that the database connection settings are correct.
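A quick way to rule out a bad driver path (the path below is deliberately nonexistent -- substitute your own):

```shell
# Verify that the configured jdbc_driver_library path actually exists.
# An absolute path is safer than a relative one, since relative paths
# depend on the directory Logstash is started from.
check_jar() {
  if [ -f "$1" ]; then
    echo "driver found: $1"
  else
    echo "driver missing: $1"
  fi
}
check_jar /nonexistent/mysql-connector-java-5.1.38.jar
```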

Successful startup

Once started, Logstash runs the configured SQL on schedule and pulls the matching rows for the downstream output:

[2020-03-21T23:13:00,350][INFO ][logstash.inputs.jdbc     ][main] (0.006952s) SELECT version()
[2020-03-21T23:13:00,373][INFO ][logstash.inputs.jdbc     ][main] (0.019545s) SELECT version()
[2020-03-21T23:13:00,391][INFO ][logstash.inputs.jdbc     ][main] (0.013471s) SELECT count(*) AS `count` FROM (select * from exp_store where create_time >= '2020-01-16 15:00:34') AS `t1` LIMIT 1
[2020-03-21T23:13:00,417][INFO ][logstash.inputs.jdbc     ][main] (0.023679s) SELECT * FROM (select * from exp_store where create_time >= '2020-01-16 15:00:34') AS `t1` LIMIT 50000 OFFSET 0
{"time_out":null,"create_time":"2020-01-16T07:00:34.000Z","retention_pay_amount":0,"exp_mail_id":2114,"express_name":null,"receiver_city":null,"receiver_address":null,"exp_no":null,"retention_status":0,"paid_exp_delay_reward":false,"express_id":null,"receiver_province":null,"courier_id":null,"update_time":"2020-01-16T07:00:34.000Z","arriver_time":"2020-01-16T07:00:34.000Z","push_type":null,"exp_locker_no":"C46E7B1A1D1D","exp_store_success_reason":null,"quit_store_exp_reason":null,"exp_type":"EXP_LOCKER","receiver_phone":null,"exp_agent_company_id":-1,"reverse_receiver_phone":null,"exp_locker_detail_id":150,"exp_locker_manager_id":12,"paid_exp_delivery_pay":false,"exp_locker_detail_money":0,"express_status":9,"take_delivery_code":null,"waybill_no":null,"push_flag":null,"exp_locker_id":5,"shop_id":null,"id":2106,"receiver_district":null,"leave_time":null,"delivery_pay_amount":0,"@version":"1","receiver_name":null,"is_secret":false,"@timestamp":"2020-03-21T15:13:00.419Z"}
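To spot-check the synced data, you can query the index directly (a sketch; requires the running cluster, and the index name and document id come from the run above):

```shell
# How many documents have been indexed so far?
curl -s "http://127.0.0.1:9200/ineyes_exp_store/_count?pretty"
# Fetch one document by the MySQL primary key used as document_id
curl -s "http://127.0.0.1:9200/ineyes_exp_store/_doc/2106?pretty"
```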

Running Logstash as a service

https://www.jianshu.com/p/54cdddf89989
How to shut down Logstash gracefully: https://www.elastic.co/guide/en/logstash/current/shutdown.html

Does adding a database column affect the sync?

When adding a new column to the database, first declare the new field's mapping in ES, then add the column in MySQL and let the sync run; existing documents are unaffected.
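For example, before adding a hypothetical remark column in MySQL, you might declare its mapping first (a sketch; the field name and type are illustrative, and this requires the running cluster):

```shell
# Declare the mapping for the new field in the existing index
# (ES 7.x typeless mapping API)
curl -X PUT "http://127.0.0.1:9200/ineyes_exp_store/_mapping" \
  -H 'Content-Type: application/json' \
  -d '{"properties": {"remark": {"type": "keyword"}}}'
```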
