一、安装ElasticSearch(下面统称es,版本6.0.0,环境windows10)
直接上下载地址:https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.0.0.zip
解压后目录如下:
启动es,./bin/elasticsearch.bat
;启动成功如图
默认cluster_name是elasticsearch和端口9200可以修改,需要修改在config/elasticsearch.yml;上图
二、安装logstash
下载地址:https://artifacts.elastic.co/downloads/logstash/logstash-6.0.0.zip
解压目录
先安装logstash-input-jdbc插件
./bin/logstash-plugin.bat install logstash-input-jdbc
在logstash目录下创建config-mysql,见图4
创建配置文件load_data.conf,配置文件随便取名,可以创建sql文件,也可以在conf配置文件中定义,具体下面有说明
先上配置文件内容
input {
stdin {
}
jdbc {
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/jfinal_club?characterEncoding=utf8&useSSL=false"
jdbc_user => "root"
jdbc_password => "root"
jdbc_driver_library => "D:/ELK/6.0.0/logstash-6.0.0/config-mysql/mysql-connector-java-5.1.43.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
statement_filepath => "D:/ELK/6.0.0/logstash-6.0.0/config-mysql/store_list.sql"
schedule => "* * * * *"
use_column_value => false
record_last_run => true
last_run_metadata_path => "D:/ELK/6.0.0/logstash-6.0.0/config-mysql/run/store_list"
type => "sl"
}
jdbc {
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/jfinal_club?characterEncoding=utf8&useSSL=false"
jdbc_user => "root"
jdbc_password => "root"
jdbc_driver_library => "D:/ELK/6.0.0/logstash-6.0.0/config-mysql/mysql-connector-java-5.1.43.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
statement => "select * from store where updated > date_add(:sql_last_value, interval 8 hour)"
schedule => "* * * * *"
use_column_value => false
record_last_run => true
last_run_metadata_path => "D:/ELK/6.0.0/logstash-6.0.0/config-mysql/run/store_s"
type => "st"
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
}
output {
if[type] == "sl"{
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "store_list"
document_type => "jdbc"
document_id => "%{store_id}}"
}
}
if[type] == "st"{
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "store_st"
document_type => "jdbc"
document_id => "%{id}}"
}
}
stdout {
codec => json_lines
}
}
字段解释;具体的见:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html
图6中有个run目录,在这里是用来存放:sql_last_value的时间值的
store_list.sql
先在es中生成index
PUT /store_list
{
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1
},
"mappings": {
"jdbc": {
"properties": {
"@timestamp": {
"type": "date"
},
"@version": {
"type": "keyword"
},
"store_id": {
"type": "long"
},
"store_name": {
"type": "keyword"
},
"uid": {
"type": "text"
},
"telephone": {
"type": "text"
},
"street_id": {
"type": "text"
},
"detail": {
"type": "keyword"
},
"address": {
"type": "keyword"
},
"store_created": {
"type": "date"
},
"store_updated": {
"type": "date"
},
"detail_id": {
"type": "long"
},
"type_name": {
"type": "text"
},
"tag": {
"type": "keyword"
},
"overall_rating": {
"type": "text"
},
"navi_location_lng": {
"type": "double"
},
"navi_location_lat": {
"type": "double"
},
"detail_url": {
"type": "text"
},
"comment_num": {
"type": "integer"
},
"detail_created": {
"type": "date"
},
"detail_updated": {
"type": "date"
},
"location_id": {
"type": "long"
},
"lng": {
"type": "double"
},
"lat": {
"type": "double"
}
}
}
}
}
上面这种方式可以通过es管理工具执行,比如kibana->dev tools
;或者使用curl
的方式也可以
curl -XPUT "http://localhost:9200/store_list" -H 'Content-Type: application/json' -d'
{
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1
},
"mappings": {
"jdbc": {
"properties": {
"@timestamp": {
"type": "date"
},
"@version": {
"type": "keyword"
},
"store_id": {
"type": "long"
},
"store_name": {
"type": "keyword"
},
"uid": {
"type": "text"
},
"telephone": {
"type": "text"
},
"street_id": {
"type": "text"
},
"detail": {
"type": "keyword"
},
"address": {
"type": "keyword"
},
"store_created": {
"type": "date"
},
"store_updated": {
"type": "date"
},
"detail_id": {
"type": "long"
},
"type_name": {
"type": "text"
},
"tag": {
"type": "keyword"
},
"overall_rating": {
"type": "text"
},
"navi_location_lng": {
"type": "double"
},
"navi_location_lat": {
"type": "double"
},
"detail_url": {
"type": "text"
},
"comment_num": {
"type": "integer"
},
"detail_created": {
"type": "date"
},
"detail_updated": {
"type": "date"
},
"location_id": {
"type": "long"
},
"lng": {
"type": "double"
},
"lat": {
"type": "double"
}
}
}
}
}'
然后通过http://localhost:9200/store_list/查看字段生成情况
store_list就是index,相当于数据库的database
然后回到logstash目录下
执行 nohup.exe ./bin/logstash.bat -f config-mysql/load_data.conf &
最好加上& 结尾,后台运行
然后看数据库同步情况
可能有些细节没能写全,如果在集成中遇到什么情况,可以评论指出