otter安装笔记
一、环境准备:
1、下载初始化otter库sql,地址:https://raw.github.com/alibaba/otter/master/manager/deployer/src/main/resources/sql/otter-manager-schema.sql,执行即可
2、下载manage和node,地址:https://github.com/alibaba/otter/releases/
3、manger和node可以分不同机器安装,可以放一台机器安装,所在机器必须安装jdk,配置好环境变量path,classpath都必须配置好
4、依赖zk,需要zk环境,单台,集群都可以
二、manger配置
1、mkdir ~/manager
2、tar zxvf manager.deployer-4.2.17.tar.gz
3、 vi ~/manager/conf/otter.properties
##修改为正确访问ip,生成URL使用,node的配置需要用到
otter.domainName= 内网ip
##为node连接manager的端口, node的配置需要用到
otter.communication.manager.port= 1099
##manage页面的访问端口
otter.port =8080
##修改为正确数据库信息
otter.database.driver.class.name = com.mysql.jdbc.Driver
otter.database.driver.url = jdbc:mysql://127.0.01:3306/ottermanager
otter.database.driver.username = root
otter.database.driver.password = hello
##配置zookeeper集群机器,配置一个即可
otter.zookeeper.cluster.default= 127.0.0.1:2181
其他配置默认即可
4、启动 ~/manager/bin/startup.sh,看日志tail -f ~/manager/logs/manager.log
5、访问otter.domainName:otter.port看看控制台,默认登录是admin/admin,sql中写死的,可以执行sql时修改
6、登录进去先配置zk集群
集群名字随便取
ZooKeeper集群:如果是单台,就配置一个也可以,多个以;号结束,单个也要分号结束
7、再配置node
机器名称:可以随意定义,方便自己记忆即可
机器ip:对应node节点将要部署的机器ip,如果有多ip时,可选择其中一个ip进行暴露. (此ip是整个集群通讯的入口,实际情况千万别使用127.0.0.1,否则多个机器的node节点会无法识别)
机器端口:对应node节点将要部署时启动的数据通讯端口,建议值:2088
下载端口:对应node节点将要部署时启动的数据下载端口,建议值:9090
外部ip :对应node节点将要部署的机器ip,存在的一个外部ip,允许通讯的时候走公网处理。
zookeeper集群:为提升通讯效率,不同机房的机器可选择就近的zookeeper集群.
node这种设计,是为解决单机部署多实例而设计的,允许单机多node指定不同的端口
注意点:
otter.domainName配置的地址,页面访问都会使用这个地址,如果配置了内网,或者ip外网不能访问,可以在manager.deployer-4.2.15\webapp\WEB-INF\common\uris.xml 配置文件中,修改一行 <serverURI>http://192.168.99.1:${otter.port}/</serverURI>,即可解决
三、node配置
1、 mkdir ~ /node
2、tar zxvf node.deployer-4.2.15.tar.gz
3、vi ~/otter/conf/otter.properties
# node的安装目录
otter.nodeHome = ${user.dir}/node
#manager的服务地址,manage中otter.properties配置的otter.domainName:otter.port
otter.manager.address = 127.0.0.1:1099
4、配置nid
通过manage配置node操作后,获取到了node节点对应的唯一标示,称之为node id,简称nid,比如我添加的机器对应序号为1
执行echo 1 >~/node/conf/nid , 保存到conf目录下的nid文件;一个node一个序号一个文件
执行好如图:
5、启动node,/node/bin/startup.sh
6、查看node是否启动
注意:因为node是注册到zk可能会慢,需要等待一会才能变成已启动
四、配置同步任务
1、环境准备
搭建一个数据库同步任务,源数据库必须开启binlog,并且binlog_format为ROW,设置server_id,即在mysql的配置文件新增/修改以下配置
log-bin=mysql-bin
binlog-format=ROW
server_id=12314123 //保证同步的源库/目标库 id不一样即可
2、添加canal
注意:如果zk修改了名称,这里需要把zookeeper集群下拉框重新选择编辑下,否则会出现同步异常相关的错误
3、添加数据源
一般我按库名新增,也可以从实例级别配置即可
4、配置表
table示例说明
单表配置:alibaba.product
分表配置:alibaba[1-64].product , alibaba.product[01-32]
正则配置:(.).(.)
schema name和table name都设置成.*表示全库同步
5、配置channel
6、配置pipeline
7、添加映射关系
如果不是自定义,就不需要填写EventProcessor
8、启动channel即可
启动后,点击channel看pipeline是否工作中,并且最后位点时间有时间,无时间显示代表未成功
扩展功能-自定义EventProcessor
因为公司是多马甲,而且主键id没有设置步长和起始id,导致无法用自带的otter字段映射进行主键同步,所以用了otter的扩展功能,根据主键id和马甲字段projectName进行联合主键同步,此功能不支持新增唯一判断,需要在目标表手动建立联合唯一索引
1、新建maven project
2、添加pom
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>shared.etl</artifactId>
<version>4.2.15</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba.otter/node.extend -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>node.extend</artifactId>
<version>4.2.15</version>
</dependency>
3、自己抽象一个父类
import java.sql.Types;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.alibaba.otter.node.extend.processor.AbstractEventProcessor;
import com.alibaba.otter.shared.etl.model.EventColumn;
import com.alibaba.otter.shared.etl.model.EventData;
import com.alibaba.otter.shared.etl.model.EventType;
/**
* 通用otter同步类,必须存在马甲字段,否则同步失败
*
* @author weihui
*
*/
public class OtterEventProcessor extends AbstractEventProcessor {
public OtterEventProcessor() {
}
public OtterEventProcessor(String[] replaceColumnArr) {
if (replaceColumnArr != null) {
Set<String> set = new HashSet<String>();
for (String columnName : replaceColumnArr) {
set.add(columnName);
}
this.replaceColumnList = set;
}
}
private EventColumn key;// 源表主键ID
private Map<String, EventColumn> map = new HashMap<String, EventColumn>();// 所有列
private String typeName = "project_name";// 马甲名,默认project_name
private Set<String> replaceColumnList;// 自定义同步字段
private void init(EventData eventData) {
key = eventData.getKeys().get(0);
for (EventColumn column : eventData.getColumns()) {
map.put(column.getColumnName(), column);
}
}
@Override
public boolean process(EventData eventData) {
System.out.println("db=" + eventData.getSchemaName() + ",table="
+ eventData.getTableName() + ",event="
+ eventData.getEventType());
if (eventData.getEventType() == EventType.INSERT
|| eventData.getEventType() == EventType.UPDATE
|| eventData.getEventType() == EventType.DELETE) {
init(eventData);
replaceKey(eventData);
replaceCols(eventData);
return true;
} else {
return false;
}
}
private void replaceKey(EventData eventData) {
EventColumn projectNameColumn = new EventColumn();
String typeValue = map.get(typeName).getColumnValue();
System.out.println(key.getColumnName() + "=" + key.getColumnValue()
+ "," + typeName + "=" + typeValue);
projectNameColumn.setColumnValue(typeValue);
projectNameColumn.setColumnType(Types.VARCHAR);
projectNameColumn.setColumnName(typeName);
projectNameColumn.setKey(true);
List<EventColumn> keys = new ArrayList<EventColumn>();
keys.add(key);
keys.add(projectNameColumn);
eventData.setKeys(keys);
eventData.setOldKeys(keys);//必须设置oldkeys,否则修改不了
}
private void replaceCols(EventData eventData) {
if (replaceColumnList == null || replaceColumnList.isEmpty()) {// 没有自定义同步字段,默认同步所有字段
replaceColumnList = map.keySet();
}
List<EventColumn> cols = new ArrayList<EventColumn>();
for (String columnName : replaceColumnList) {
EventColumn column = map.get(columnName);
if (typeName.equals(column.getColumnName())) {// 排除马甲字段,因为变成联合主键已经会替换此字段
continue;
}
if (column != null) {
cols.add(column);
}
}
eventData.setColumns(cols);
}
}
4、具体使用
- 如果是同步所有源表列,在自定义的Event Processor选择SOURCE写上如下代码即可
public class ChannelInfoProcessor extends OtterEventProcessor {
}
- 如果是同步源表指定列,在自定义的Event Processor选择SOURCE写上如下代码即可
public class AssetRepaymentProcessor extends OtterEventProcessor {
public AssetRepaymentProcessor() {
super(new String[] { "user_id", "asset_order_id", "repayment_amount",
"repaymented_amount", "repayment_principal",
"repayment_interest", "plan_late_fee", "true_late_fee",
"late_fee_apr", "credit_repayment_time", "period",
"repayment_time", "repayment_real_time", "late_fee_start_time",
"interest_update_time", "late_day", "created_at", "updated_at",
"auto_debit_fail_times", "renewal_count", "status",
"collection", "repayment_no", "grant_time",
"first_repayment_time" });
}
}
必须先把OtterEventProcessor 这个类可以打包一个jar,放入node的lib目录下,如果是相同的表同步可以用同一个类名,如果是不同表一定要新的类名,否则会同步出问题,因为字段不一样; 当然也可以直接写源码,不需要打jar包,这是封装的写法
自带监控报警功能
目前支持邮件发送
1、添加监控,发送人key在系统管理-系统参数中配置
2、配置接收邮箱
3、manage中的otter.properties配置发送邮箱,邮箱必须要开启smtp服务,然后配置授权码,有的邮箱是直接登录密码(比如QQ企业邮箱)
manage配置完,要重启,然后看看日志
注意:可以会报错,邮件发布出去504等报错
日常积累
1.node的logs目录下1,2,3目录,代表pipeline的序号,哪里pipeline报错,就在哪个目录下查看日志
2.一台机器可以配置多个node,直接启动即可,前提是同一台机器的端口配置不一样,从manage的node配置不同端口即可,不同机器可以配置同一个端口
假设同一台多node启动报错,可能是端口占用报错,需要修改上面的端口,找个不存在的端口使用
3.老版本可以直接升级新版本,注意manage的端口,node会配置这个端口,假设manage挂了,node没挂,同步的时候,页面统计的数字没法增长,可以看node.log有报错信息,dubbo连manage失败,但是不影响数据同步,启动manage后就好了
4.pipeline优化
5.升级node,manage可以直接升级
6.pipeline切换node,有可能会出现同步进度中的node还是老node,假设出现这种问题可以先停掉老node,在启动channel
7.新增node保存后可能会没有数据,可以看看是否有其他node有问题,目前我这边是把老node都停掉,再新增node就没问题
- node的同步进度,存在zk中
[zk: localhost:2181(CONNECTED) 23] get /otter/canal/destinations/crm_canal/2/cursor
{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"
10.1.1.17","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000006","position":391302782,"serverId":160623,"timestamp":1545880850000}}
相关报错积累
配置了jdk,但没有配置classpath
2018-07-18 09:58:13.392 [New I/O server worker #1-4] WARN c.a.d.common.threadpool.support.AbortPolicyWithReport - [DUBBO] Thread pool is EXHAUSTED! Thread Name: DubboServerHandler-127.0.0.1:2088, Pool Size: 50 (active: 50, core: 50, max: 50, largest: 50), Task: 113 (completed: 63), Executor status:(isShutdown:false, isTerminated:false, isTerminating:false), in dubbo://127.0.0.1:2088!, dubbo version: 2.5.3, current host: 127.0.0.1
线程池报错,听说是15版本的bug,14版本没问题
pid:2 nid:2 exception:setl:com.google.common.collect.ComputationException: java.lang.ArrayIndexOutOfBoundsException: 0
at com.google.common.collect.MapMaker$ComputingMapAdapter.get(MapMaker.java:889)
at com.alibaba.otter.canal.common.zookeeper.ZkClientx.getZkClient(ZkClientx.java:34)
at com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager.getZkclientx(CanalInstanceWithManager.java:401)
at com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager.initMetaManager(CanalInstanceWithManager.java:121)
at com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager.(CanalInstanceWithManager.java:76)
at com.alibaba.otter.node.etl.select.selector.canal.CanalEmbedSelector$1$1.(CanalEmbedSelector.java:139)
at com.alibaba.otter.node.etl.select.selector.canal.CanalEmbedSelector$1.generate(CanalEmbedSelector.java:139)
at com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded$1.apply(CanalServerWithEmbedded.java:68)
at com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded$1.apply(CanalServerWithEmbedded.java:65)
at com.google.common.collect.ComputingConcurrentHashMap$ComputingValueReference.compute(ComputingConcurrentHashMap.java:356)
at com.google.common.collect.ComputingConcurrentHashMap$ComputingSegment.compute(ComputingConcurrentHashMap.java:182)
at com.google.common.collect.ComputingConcurrentHashMap$ComputingSegment.getOrCompute(ComputingConcurrentHashMap.java:151)
at com.google.common.collect.ComputingConcurrentHashMap.getOrCompute(ComputingConcurrentHashMap.java:67)
at com.google.common.collect.MapMaker$ComputingMapAdapter.get(MapMaker.java:885)
at com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded.start(CanalServerWithEmbedded.java:98)
at com.alibaba.otter.node.etl.select.selector.canal.CanalEmbedSelector.start(CanalEmbedSelector.java:206)
at com.alibaba.otter.node.etl.select.SelectTask.startup(SelectTask.java:170)
at com.alibaba.otter.node.etl.select.SelectTask.run(SelectTask.java:126)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 0
at java.util.Arrays$ArrayList.get(Arrays.java:3841)
at com.alibaba.otter.canal.common.zookeeper.ZooKeeperx.connect(ZooKeeperx.java:68)
zk名称被修改,需要在canal和node等重新选择一遍(即使一模一样),在保存下即可
pid:5 nid:1 exception:canal:有零花(ulinghua_cs_online):java.io.IOException: Received error packet: errno = 1236, sqlstate = HY000 errmsg = Could not find first log file name in binary log index file
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher.fetch(DirectLogFetcher.java:94)
at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.dump(MysqlConnection.java:137)
at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:220)
at java.lang.Thread.run(Thread.java:745)
可能是之前同步任务没成功,我们用的rds,默认只保留18个小时,然后后面在启动的时候,同步进度已经过了18个小时,导致一直报这个错误,解放方案:
停掉同步任务
进入对应 Pipeline ,删除同步进度,重新启动即可,但是会丢失之前的数据,需要人工同步丢失的数据
pid:8 nid:1 exception:canal:微现金(vxianjin_online):com.alibaba.otter.canal.parse.exception.CanalParseException: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: com.google.common.util.concurrent.UncheckedExecutionException: java.io.IOException: should execute connector.connect() first
Caused by: com.google.common.util.concurrent.UncheckedExecutionException: java.io.IOException: should execute connector.connect() first
at com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4832)
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.TableMetaCache.getTableMeta(TableMetaCache.java:160)
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert.getTableMeta(LogEventConvert.java:759)
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert.parseRowsEvent(LogEventConvert.java:428)
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert.parse(LogEventConvert.java:114)
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert.parse(LogEventConvert.java:66)
at com.alibaba.otter.canal.parse.inbound.AbstractEventParser.parseAndProfilingIfNecessary(AbstractEventParser.java:337)
at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3$1.sink(AbstractEventParser.java:184)
at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.dump(MysqlConnection.java:145)
at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:220)
at java.lang.Thread.run(Thread.java:745)
排查下canel的用户名密码是否正确、数据源是否连接失败、白名单配置等等,实在都没问题就重启node解决