canal:
1.配置mysql
canal的原理是基于mysql binlog技术,所以这里一定需要开启mysql的binlog写入功能,建议配置binlog模式为row.
配置如下
[mysqld]
log-bin=mysql-bin #添加这一行就ok
binlog-format=ROW #选择row模式
server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复
canal的原理是模拟自己为mysql slave,所以这里一定需要设置mysql slave的相关权限.
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES;
重启mysql服务即可
2.配置canal server
新建canal目录,下载canal,解压。
wget https://github.com/alibaba/canal/releases/download/canal-1.1.2/canal.deployer-1.1.2.tar.gz
进入到canal/conf目录,新建目录brand,将example目录中的instance.properties拷贝到brand中。
进入brand中的instance.properties,修改配置如下:
canal.instance.dbUsername=canal#给canal配置的账号
canal.instance.dbPassword=canal #给canal配置的密码
canal.instance.connectionCharset = UTF-8#连接数据库用的字符集
canal.instance.defaultDatabaseName =test#默认的数据库名
canal.instance.enableDruid=false#是否使用druid
canal.instance.filter.regex=.\.. #mysql 数据解析关注的表,Perl正则表达式
修改canal/canal.properties中的配置
canal.destinations = brand
多个用,分隔。配置之后,启动canal会conf/brand目录里面的instance.properties。
启动canal server。
如果启动报错,确认一下
<1>mysqld中的server_id和canal有没有冲突
<2>canal.properties中的canal.instance.parser.parallelThreadSize没有有打开,这里我配置的值为256
3.配置elasticsearch
canal adapter 的 Elastic Search 版本支持6.x.x以上, 如需其它版本的es可替换依赖重新编译client-adapter.elasticsearch模块
下载es并解压
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.5.4.tar.gz
es不能在root账户下执行,创建一个账户,给它分配权限。切换到该账户中,启动es,如果报错max virtual memory areas vm.maxmapcount [65530] is too low,则执行命令
sudo sysctl -w vm.max_map_count=262144
4.配置canal adapter
本文使用的adapter版本有明显bug,官网已经更新了,请留意官网更新,下文仅作参考。
新建canal_adapter目录,下载并解压。
解压,进入conf/application.yml作如下配置
srcDataSources:
defaultDS:
url://数据库的地址username:password:
canalAdapters:
instance: brand
groups:
groupId: g1
outerAdapters:
name: es #这是canal_adapters的内置实现
hosts: 127.0.0.1:9300 # es 集群地址, 逗号分隔
properties:
cluster.name: elasticsearch # es cluster name
adapter将会自动加载 conf/es 下的所有.yml结尾的配置文件。新增brand.yml,内容仿照下面的配置
dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的srcDataSources中的值
destination: example # cannal的instance或者MQ的topic
esMapping:
_index: mytest_user # es 的索引名称
_type: _doc # es 的doc名称
_id: _id # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配
pk: id # 如果不需要_id, 则需要指定一个属性为主键属性
sql映射
sql: "select a.id as _id, a.name as _name, a.role_id as _role_id, b.role_name as _role_name,
a.c_timeas_c_time, c.labelsas_labels from user a left join role b on b.id=a.role_id left join (select user_id, group_concat(label order by id desc separator';')aslabels from label group by user_id) c on c.user_id=a.id"
objFields:
_labels: array:; # 数组或者对象属性, array:; 代表以;字段里面是以;分隔的
_obj: object # json对象
etlCondition: "where a.c_time>='{}'" # etl 的条件参数
commitBatch: 3000 # 提交批大小
启动canal adapter。在es中新增相关的index和type,就可以实现了
参考文档:canal官方文档