Background: the company's order data keeps growing and, because of our overseas business, it is spread across multiple databases. Business queries have to traverse several libraries and are slow, so we need a data warehouse that offers unified queries with controlled latency.
The architecture diagram is as follows:
Note: this is a test environment with limited resources, so no canal-server cluster was set up. To improve throughput, a Kafka layer was added between canal-server and canal-adapter; it is not shown in the diagram.
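In short, the data flows: MySQL binlog → canal-server → Kafka → canal-adapter → Elasticsearch, with canal-admin providing the management UI.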
Downloads:
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.adapter-1.1.4.tar.gz
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
wget https://mirror.bit.edu.cn/apache/kafka/2.5.0/kafka_2.12-2.5.0.tgz
wget http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz
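Elasticsearch 6.2.4 is also used below (step 1.3); its official download:
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.2.4.tar.gz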
Install a Java environment beforehand; JDK 1.8 is recommended.
Step 1:
Install Kafka, ZooKeeper, and ES first. Single-node installs are straightforward; if you need a cluster, refer to the official documentation.
1.1 ZooKeeper
Unpack:
# tar -zxvf zookeeper-3.4.14.tar.gz && cd zookeeper-3.4.14
# cp conf/zoo_sample.cfg conf/zoo.cfg
Set the data and log directories (create them first; appended keys take precedence over the sample defaults):
# mkdir -p /mnt/zookeeper/data /mnt/zookeeper/log
# cat >> conf/zoo.cfg << EOF
dataDir=/mnt/zookeeper/data
dataLogDir=/mnt/zookeeper/log
EOF
Start ZooKeeper:
# bin/zkServer.sh start
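A quick sanity check, assuming the default client port 2181:
# bin/zkServer.sh status          # should report Mode: standalone
# echo ruok | nc localhost 2181   # should answer imok (if the 4lw command is whitelisted)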
1.2 Kafka
Unpack:
# tar -zxvf kafka_2.12-2.5.0.tgz && cd kafka_2.12-2.5.0
Point Kafka at the ZooKeeper address and set its listener port and log path (create the log path first; appended keys take precedence):
# mkdir -p /mnt/kafka-logs
# cat >> config/server.properties << EOF
zookeeper.connect=localhost:2181
listeners=PLAINTEXT://:9092
log.dirs=/mnt/kafka-logs
EOF
Start:
# ./bin/kafka-server-start.sh -daemon config/server.properties
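To confirm the broker is up, create and list a throwaway topic (the topic name here is arbitrary):
# ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic smoke-test
# ./bin/kafka-topics.sh --list --bootstrap-server localhost:9092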
1.3 ES
Create an ES user (Elasticsearch refuses to run as root):
# useradd es && passwd es && su - es
Unpack:
# tar -zxvf elasticsearch-6.2.4.tar.gz && cd elasticsearch-6.2.4
Edit the config file (note the directory is config/, and the data/log paths must exist and be owned by the es user; create them as root first):
# mkdir -p /data/es/data /data/es/logs && chown -R es:es /data/es
# cat >> config/elasticsearch.yml << EOF
cluster.name: test        # cluster name; optional for a single node
network.host: 0.0.0.0
http.port: 9200
path.data: /data/es/data
path.logs: /data/es/logs
# allow the head plugin to access ES
http.cors.enabled: true
http.cors.allow-origin: "*"
EOF
Start (as the es user):
./bin/elasticsearch -d
Verify: curl -X GET http://localhost:9200
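If startup fails on bootstrap checks (common once network.host is 0.0.0.0), the usual fixes, run as root, are raising vm.max_map_count and the open-file limit:
# sysctl -w vm.max_map_count=262144
# echo 'es soft nofile 65536' >> /etc/security/limits.conf
# echo 'es hard nofile 65536' >> /etc/security/limits.conf
A healthy node also answers:
curl -X GET http://localhost:9200/_cluster/health?pretty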
1.4 Install elasticsearch-head to browse ES data
wget https://github.com/mobz/elasticsearch-head/archive/master.zip
unzip master.zip && cd elasticsearch-head-master
Edit the config file: in Gruntfile.js, under connect.server.options, add hostname: '*' so head listens on all interfaces (the default port is 9100).
Install Node.js:
# curl -sL https://rpm.nodesource.com/setup_8.x | bash -
# yum install -y nodejs
# npm install -g grunt-cli && npm install
Start:
grunt server
Open http://ip:9100 in a browser (9100 is head's default port; adjust if you changed it in Gruntfile.js).
1.5 Install canal-server (it poses as a MySQL slave) and the canal-admin web UI
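Before canal can replicate, the source MySQL needs row-format binlog and an account with replication privileges (this follows the canal QuickStart; the canal/canal credentials are examples):
# my.cnf
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1

-- in the mysql client
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;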
Unpack: mkdir -p /data/canal-server && tar -zxvf canal.deployer-1.1.4.tar.gz -C /data/canal-server
Unpack: mkdir -p /data/canal-web && tar -zxvf canal.admin-1.1.4.tar.gz -C /data/canal-web
Configure canal-web (canal-admin) first:
cd /data/canal-web && cat > conf/application.yml << 'EOF'
server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  address: 127.0.0.1:3306
  database: canal_manager
  username: admin
  password: admin
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  adminPasswd: 123456
EOF
Create the metadata database and initialize it (the schema ships in conf/):
mysql -uadmin -padmin < conf/canal_manager.sql
Start:
./bin/startup.sh
Then open http://127.0.0.1:8089 (the default web login is admin/123456).
Configure canal-server:
cd /data/canal-server and edit conf/canal.properties, setting the keys below:
# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9
canal.admin.register.auto = true
canal.admin.register.cluster =
## Kafka settings
canal.serverMode = kafka
canal.mq.servers = 127.0.0.1:9092
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
Start: ./bin/startup.sh
You can now configure instances through canal-web at http://127.0.0.1:8089.
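A minimal instance configuration (entered via the web UI, or directly in conf/example/instance.properties) needs the source address, the replication account, and the Kafka topic. The keys are standard canal 1.1.4 options; the values are examples:
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset=UTF-8
canal.instance.filter.regex=test\\..*
canal.mq.topic=example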
1.6 Install canal-adapter
Unpack: mkdir -p /data/canal-adapter && tar -zxvf canal.adapter-1.1.4.tar.gz -C /data/canal-adapter && cd /data/canal-adapter
Edit the config file (conf/application.yml); since canal-server publishes to Kafka, the adapter consumes from Kafka too:
canal.conf:
  mode: kafka                  # match canal.serverMode on the server side
  mqServers: 127.0.0.1:9092
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true
      username: admin
      password: admin
  canalAdapters:
  - instance: example # canal instance name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es
        hosts: 127.0.0.1:9200 # use 127.0.0.1:9300 for transport mode
        properties:
          mode: rest # or transport
          cluster.name: test
Define the table-to-index mapping (adapter mapping files live under conf/es/):
cat > conf/es/canal.yml << EOF
dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
  _index: test
  _type: _doc
  _id: _id
  pk: id
  upsert: true
  sql: "select t.id as _id, t.name, t.address from test t"
  etlCondition: "where t.id>={}"
  commitBatch: 3000
EOF
Start:
./bin/startup.sh
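canal-adapter also exposes a small REST API (port 8081 by default) that is handy for an initial full load; assuming the defaults, you can trigger an ETL of this mapping and check the row count:
curl -X POST http://127.0.0.1:8081/etl/es/canal.yml
curl http://127.0.0.1:8081/count/es/canal.yml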
Create the index through es-head (or with curl, shown after the mapping) with this mapping:
{
  "mappings": {
    "_doc": {
      "properties": {
        "name":    { "type": "text" },
        "address": { "type": "text" }
      }
    }
  }
}
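The curl equivalent, assuming the mapping above is saved as test-mapping.json:
curl -X PUT http://127.0.0.1:9200/test -H 'Content-Type: application/json' -d @test-mapping.json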
Test: write to the source table and confirm the change shows up in ES.
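A minimal end-to-end check, assuming a test.test table with id, name, and address columns (matching the sql in the mapping):
-- in the mysql client
INSERT INTO test.test (id, name, address) VALUES (1, 'foo', 'bar');
# then, after a moment:
curl 'http://127.0.0.1:9200/test/_search?pretty'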