有个同事在搭ES+Logstash来同步mysql的数据,问我,我也不懂,就网上学习一下,顺便弄了一下,在这分享一下流程,有啥不对或问题的,欢迎各位斧正。
Elasticsearch:是实时分布式全文搜索分析引擎,提供搜集、分析、存储数据三大功能,速度快;是一套开放REST和JAVA API等结构提供高效搜索功能,可扩展的分布式系统。它构建于Apache Lucene搜索引擎库之上。在这Elasticsearch作为后台数据的存储。
Logstash:是动态数据收集管道,拥有可扩展的插件生态系统,能够与 Elasticsearch 产生强大的协同作用。在这Logstash在其过程中担任搬运工的角色。
![https://upload-images.jianshu.io/upload_images/6791900-23e17655467a2af1.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/864/format/webp](https://upload-images.jianshu.io/upload_images/6791900-23e17655467a2af1.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/864/format/webp)
Logstash工作三个阶段:1,input数据输入端,可以接收来自任何地方的源数据。
2,Filter数据中转层,主要进行格式处理,数据类型转换、数据过滤、字段添加,修改等。
3,output是logstash工作的最后一个阶段,负责将数据输出到指定位置,兼容大多数应用。
软件安装:
1,环境--1.8以上的JDK。
2,下载Elasticsearch和Logstash的软件压缩包,https://www.elastic.co/,这选择6.6.1版的Linux 64位压缩包解压到自己建立的目录下面,此处 放在目录/home/el/,然后执行就可以运行了。(<ins>注意选着的elasticsearch和logstash版本保持一致,不然会出问题</ins>)
启动和配置:
一,Elasticsearch
启动 ---进入到/home/el/elasticsearch-6.6.1/目录下,执行启动命令 bin/elasticsearch(<ins>*启动时候不能以root用户启动,否则会报错*</ins>),
启动成功时,可以看到IP的绑定(<ins>*10.112.76.31:9300 , 10.112.76.31:9200*</ins>)。
浏览器访问elasticsearch ---进入到 /home/el/elasticsearch-6.6.1/config/elasticsearch.yml,修改属性network.host: 0.0.0.0和http.port: 9200,
修改端口,是因为ES 每次启动时,端口可能会变(端口被占用,ES 自己会改端口),还不能正常启动 ES,并 提示端口被占用,就查看一下什么程序占用 9200 端口,kill 掉,重启 ES 就行。最后通过在浏览器通过地址 http://10.112.76.31:9200或http://localhost:9200访问elasticsearch。
二,Logstash
配置:
1,安装插件,由于这里是从mysql同步数据到elasticsearch,所以需要安装jdbc的入插件和elasticsearch的出插件:logstash-input- jdbc、logstash-output-elasticsearch(<ins>*Logstash5版本后自带input和out 插件,这边选用的是6.6.1,所以不用安装*</ins>)
2,下载mysql连接库,logstash是ruby开发的,所以这里要下载mysql的连接库jar包,从[官网](https://dev.mysql.com/downloads/connector/j/)下载,我这里下载的是:mysql-connector- java-5.1.46.jar,将下载好的mysql-connector-java-5.1.46.jar,放至/home/el/elasticsearch-6.6.1/config/目录下。
3,创建配置文件,在/home/el/elasticsearch-6.6.1/config/目录下,创建配置文件(mysql-to-es.conf):
<pre>input {
jdbc {
# mysql 数据库链接,shop为数据库名
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/shop"</pre>
<pre> # 用户名和密码
jdbc_user => "root"
jdbc_password => "root"</pre>
<pre> # jdbc连接mysql驱动的文件目录
jdbc_driver_library => "./config/mysql-connector-java-5.1.46.jar"</pre>
<pre> # 驱动类名
jdbc_driver_class => "com.mysql.jdbc.Driver"</pre>
<pre> jdbc_paging_enabled => "true"
jdbc_page_size => "50000"</pre>
<pre> # sql 文件路径, 也可以直接写SQL语句在此处,如下:
# statement => "select * from t_order where update_time >= :sql_last_value;"
statement_filepath => "./config/jdbc.sql"</pre>
<pre> # 同步的频率 各字段含义(由左至右)分、时、天、月、年,全部为默认含义为每分钟都更新
schedule => " * * * *"</pre>
<pre> # 是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
#record_last_run => true</pre>
<pre> # 是否需要记录某个column 的值,如果record_last_run为真,可以自定义我们需要 track 的 column 名称,
# 该参数就要为 true. 否则默认 track 的是 timestamp 的值.
use_column_value => true</pre>
<pre> # 如果 use_column_value 为真,需配置此参数. track 的数据库 column 名,该 column 必须是递增的. 一般是mysql主键
tracking_column => "update_time"</pre>
<pre> tracking_column_type => "timestamp"</pre>
<pre> last_run_metadata_path => "./logstash_capital_bill_last_id"</pre>
<pre> # 是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
clean_run => false</pre>
<tt class="wikimodel-verbatim">#是否将 字段(column) 名称转小写 lowercase_column_names => false }</tt> }
<pre>output {
elasticsearch {</pre>
<pre> #es地址与端口</pre>
<pre> #hosts => ["192.168.100.101:9200","192.168.100.102:9200","192.168.100.103:9200"]</pre>
<pre> hosts => "10.112.76.31:9200"</pre>
<pre> #es索引(自己定义)</pre>
<pre> index => "mysql_order"</pre>
<pre> #表示按照查询出的字段key_id(一般用主键,防止重复)进行数据的同步</pre>
<pre> document_id => "%{key_id}"</pre>
<pre> template_overwrite => true</pre>
<pre>}
</pre>
4,启动,bin/logstash -f config/mysql-to-es.conf
最后,方便查看,Elasticsearch 可以安装head插件,通过http://...:9100,可以查看Elasticsearch里的数据。
扩展--日志收集平台elk与上述过程基本一致,不同之处:
1,Logstash的input不是连接mysql,而是去收集控制台或日志文件;
2,需要安装kibana软件,该软件与Elasticsearch对接,用户通过kibana查看Elasticsearch的日志