otter相关

otter安装笔记

一、环境准备:

1、下载初始化otter库sql,地址:https://raw.github.com/alibaba/otter/master/manager/deployer/src/main/resources/sql/otter-manager-schema.sql,执行即可

2、下载manage和node,地址:https://github.com/alibaba/otter/releases/

3、manger和node可以分不同机器安装,可以放一台机器安装,所在机器必须安装jdk,配置好环境变量path,classpath都必须配置好

4、依赖zk,需要zk环境,单台,集群都可以

二、manger配置

1、mkdir ~/manager

2、tar zxvf manager.deployer-4.2.17.tar.gz

3、 vi ~/manager/conf/otter.properties

         ##修改为正确访问ip,生成URL使用,node的配置需要用到
         otter.domainName= 内网ip
        ##为node连接manager的端口, node的配置需要用到
         otter.communication.manager.port= 1099
         ##manage页面的访问端口
         otter.port =8080
         ##修改为正确数据库信息
         otter.database.driver.class.name = com.mysql.jdbc.Driver
         otter.database.driver.url = jdbc:mysql://127.0.01:3306/ottermanager
         otter.database.driver.username = root
         otter.database.driver.password = hello
       
         ##配置zookeeper集群机器,配置一个即可
         otter.zookeeper.cluster.default= 127.0.0.1:2181

其他配置默认即可

4、启动 ~/manager/bin/startup.sh,看日志tail -f ~/manager/logs/manager.log

5、访问otter.domainName:otter.port看看控制台,默认登录是admin/admin,sql中写死的,可以执行sql时修改

6、登录进去先配置zk集群

image.png

集群名字随便取
ZooKeeper集群:如果是单台,就配置一个也可以,多个以;号结束,单个也要分号结束

7、再配置node


image.png

机器名称:可以随意定义,方便自己记忆即可
机器ip:对应node节点将要部署的机器ip,如果有多ip时,可选择其中一个ip进行暴露. (此ip是整个集群通讯的入口,实际情况千万别使用127.0.0.1,否则多个机器的node节点会无法识别)
机器端口:对应node节点将要部署时启动的数据通讯端口,建议值:2088
下载端口:对应node节点将要部署时启动的数据下载端口,建议值:9090
外部ip :对应node节点将要部署的机器ip,存在的一个外部ip,允许通讯的时候走公网处理。
zookeeper集群:为提升通讯效率,不同机房的机器可选择就近的zookeeper集群.
node这种设计,是为解决单机部署多实例而设计的,允许单机多node指定不同的端口

注意点:
otter.domainName配置的地址,页面访问都会使用这个地址,如果配置了内网,或者ip外网不能访问,可以在manager.deployer-4.2.15\webapp\WEB-INF\common\uris.xml 配置文件中,修改一行 <serverURI>http://192.168.99.1:${otter.port}/</serverURI>,即可解决

三、node配置
1、 mkdir ~ /node
2、tar zxvf node.deployer-4.2.15.tar.gz
3、vi ~/otter/conf/otter.properties
# node的安装目录
otter.nodeHome = ${user.dir}/node
#manager的服务地址,manage中otter.properties配置的otter.domainName:otter.port
otter.manager.address = 127.0.0.1:1099
4、配置nid


image.png

通过manage配置node操作后,获取到了node节点对应的唯一标示,称之为node id,简称nid,比如我添加的机器对应序号为1
执行echo 1 >~/node/conf/nid , 保存到conf目录下的nid文件;一个node一个序号一个文件
执行好如图:


image.png

5、启动node,/node/bin/startup.sh
6、查看node是否启动
image.png

注意:因为node是注册到zk可能会慢,需要等待一会才能变成已启动

四、配置同步任务
1、环境准备
搭建一个数据库同步任务,源数据库必须开启binlog,并且binlog_format为ROW,设置server_id,即在mysql的配置文件新增/修改以下配置
log-bin=mysql-bin
binlog-format=ROW
server_id=12314123 //保证同步的源库/目标库 id不一样即可
2、添加canal


image.png

注意:如果zk修改了名称,这里需要把zookeeper集群下拉框重新选择编辑下,否则会出现同步异常相关的错误
3、添加数据源


image.png

一般我按库名新增,也可以从实例级别配置即可
4、配置表

image.png

table示例说明
单表配置:alibaba.product
分表配置:alibaba[1-64].product , alibaba.product[01-32]
正则配置:(.).(.)
schema name和table name都设置成.*表示全库同步
5、配置channel
image.png

6、配置pipeline
image.png

image.png

7、添加映射关系
image.png

如果不是自定义,就不需要填写EventProcessor
8、启动channel即可
启动后,点击channel看pipeline是否工作中,并且最后位点时间有时间,无时间显示代表未成功


image.png

扩展功能-自定义EventProcessor

因为公司是多马甲,而且主键id没有设置步长和起始id,导致无法用自带的otter字段映射进行主键同步,所以用了otter的扩展功能,根据主键id和马甲字段projectName进行联合主键同步,此功能不支持新增唯一判断,需要在目标表手动建立联合唯一索引

1、新建maven project
2、添加pom
    <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>shared.etl</artifactId>
            <version>4.2.15</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.alibaba.otter/node.extend -->
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>node.extend</artifactId>
            <version>4.2.15</version>
        </dependency>

3、自己抽象一个父类

import java.sql.Types;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import com.alibaba.otter.node.extend.processor.AbstractEventProcessor;
import com.alibaba.otter.shared.etl.model.EventColumn;
import com.alibaba.otter.shared.etl.model.EventData;
import com.alibaba.otter.shared.etl.model.EventType;

/**
 * 通用otter同步类,必须存在马甲字段,否则同步失败
 * 
 * @author weihui
 *
 */
public class OtterEventProcessor extends AbstractEventProcessor {

    public OtterEventProcessor() {

    }

    public OtterEventProcessor(String[] replaceColumnArr) {
        if (replaceColumnArr != null) {
            Set<String> set = new HashSet<String>();
            for (String columnName : replaceColumnArr) {
                set.add(columnName);
            }
            this.replaceColumnList = set;
        }
    }

    private EventColumn key;// 源表主键ID

    private Map<String, EventColumn> map = new HashMap<String, EventColumn>();// 所有列

    private String typeName = "project_name";// 马甲名,默认project_name

    private Set<String> replaceColumnList;// 自定义同步字段

    private void init(EventData eventData) {
        key = eventData.getKeys().get(0);

        for (EventColumn column : eventData.getColumns()) {
            map.put(column.getColumnName(), column);
        }
    }

    @Override
    public boolean process(EventData eventData) {
        System.out.println("db=" + eventData.getSchemaName() + ",table="
                + eventData.getTableName() + ",event="
                + eventData.getEventType());
        if (eventData.getEventType() == EventType.INSERT
                || eventData.getEventType() == EventType.UPDATE
                || eventData.getEventType() == EventType.DELETE) {

            init(eventData);

            replaceKey(eventData);

            replaceCols(eventData);
            return true;
        } else {
            return false;
        }
    }

    private void replaceKey(EventData eventData) {
        EventColumn projectNameColumn = new EventColumn();
        String typeValue = map.get(typeName).getColumnValue();
        System.out.println(key.getColumnName() + "=" + key.getColumnValue()
                + "," + typeName + "=" + typeValue);
        projectNameColumn.setColumnValue(typeValue);
        projectNameColumn.setColumnType(Types.VARCHAR);
        projectNameColumn.setColumnName(typeName);
        projectNameColumn.setKey(true);

        List<EventColumn> keys = new ArrayList<EventColumn>();
        keys.add(key);
        keys.add(projectNameColumn);
        eventData.setKeys(keys);
        eventData.setOldKeys(keys);//必须设置oldkeys,否则修改不了
    }

    private void replaceCols(EventData eventData) {
        if (replaceColumnList == null || replaceColumnList.isEmpty()) {// 没有自定义同步字段,默认同步所有字段
            replaceColumnList = map.keySet();
        }
        List<EventColumn> cols = new ArrayList<EventColumn>();
        for (String columnName : replaceColumnList) {
            EventColumn column = map.get(columnName);
            if (typeName.equals(column.getColumnName())) {// 排除马甲字段,因为变成联合主键已经会替换此字段
                continue;
            }
            if (column != null) {
                cols.add(column);
            }
        }
        eventData.setColumns(cols);
    }
}

4、具体使用

  • 如果是同步所有源表列,在自定义的Event Processor选择SOURCE写上如下代码即可
public class ChannelInfoProcessor extends OtterEventProcessor {
}

  • 如果是同步源表指定列,在自定义的Event Processor选择SOURCE写上如下代码即可

public class AssetRepaymentProcessor extends OtterEventProcessor {
    public AssetRepaymentProcessor() {
        super(new String[] { "user_id", "asset_order_id", "repayment_amount",
                "repaymented_amount", "repayment_principal",
                "repayment_interest", "plan_late_fee", "true_late_fee",
                "late_fee_apr", "credit_repayment_time", "period",
                "repayment_time", "repayment_real_time", "late_fee_start_time",
                "interest_update_time", "late_day", "created_at", "updated_at",
                "auto_debit_fail_times", "renewal_count", "status",
                "collection", "repayment_no", "grant_time",
                "first_repayment_time" });

    }
}

必须先把OtterEventProcessor 这个类可以打包一个jar,放入node的lib目录下,如果是相同的表同步可以用同一个类名,如果是不同表一定要新的类名,否则会同步出问题,因为字段不一样; 当然也可以直接写源码,不需要打jar包,这是封装的写法

自带监控报警功能

目前支持邮件发送
1、添加监控,发送人key在系统管理-系统参数中配置


image.png

2、配置接收邮箱


image.png

3、manage中的otter.properties配置发送邮箱,邮箱必须要开启smtp服务,然后配置授权码,有的邮箱是直接登录密码(比如QQ企业邮箱)
image.png

manage配置完,要重启,然后看看日志
image.png

注意:可以会报错,邮件发布出去504等报错

日常积累

1.node的logs目录下1,2,3目录,代表pipeline的序号,哪里pipeline报错,就在哪个目录下查看日志


image.png

image.png

2.一台机器可以配置多个node,直接启动即可,前提是同一台机器的端口配置不一样,从manage的node配置不同端口即可,不同机器可以配置同一个端口


image.png

假设同一台多node启动报错,可能是端口占用报错,需要修改上面的端口,找个不存在的端口使用

3.老版本可以直接升级新版本,注意manage的端口,node会配置这个端口,假设manage挂了,node没挂,同步的时候,页面统计的数字没法增长,可以看node.log有报错信息,dubbo连manage失败,但是不影响数据同步,启动manage后就好了

4.pipeline优化


image.png

image.png

5.升级node,manage可以直接升级

6.pipeline切换node,有可能会出现同步进度中的node还是老node,假设出现这种问题可以先停掉老node,在启动channel

7.新增node保存后可能会没有数据,可以看看是否有其他node有问题,目前我这边是把老node都停掉,再新增node就没问题

  1. node的同步进度,存在zk中
    [zk: localhost:2181(CONNECTED) 23] get /otter/canal/destinations/crm_canal/2/cursor
    {"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"
    10.1.1.17","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000006","position":391302782,"serverId":160623,"timestamp":1545880850000}}

相关报错积累

image.png

配置了jdk,但没有配置classpath

2018-07-18 09:58:13.392 [New I/O server worker #1-4] WARN  c.a.d.common.threadpool.support.AbortPolicyWithReport -  [DUBBO] Thread pool is EXHAUSTED! Thread Name: DubboServerHandler-127.0.0.1:2088, Pool Size: 50 (active: 50, core: 50, max: 50, largest: 50), Task: 113 (completed: 63), Executor status:(isShutdown:false, isTerminated:false, isTerminating:false), in dubbo://127.0.0.1:2088!, dubbo version: 2.5.3, current host: 127.0.0.1

线程池报错,听说是15版本的bug,14版本没问题

pid:2 nid:2 exception:setl:com.google.common.collect.ComputationException: java.lang.ArrayIndexOutOfBoundsException: 0
at com.google.common.collect.MapMaker$ComputingMapAdapter.get(MapMaker.java:889)
at com.alibaba.otter.canal.common.zookeeper.ZkClientx.getZkClient(ZkClientx.java:34)
at com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager.getZkclientx(CanalInstanceWithManager.java:401)
at com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager.initMetaManager(CanalInstanceWithManager.java:121)
at com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager.(CanalInstanceWithManager.java:76)
at com.alibaba.otter.node.etl.select.selector.canal.CanalEmbedSelector$1$1.(CanalEmbedSelector.java:139)
at com.alibaba.otter.node.etl.select.selector.canal.CanalEmbedSelector$1.generate(CanalEmbedSelector.java:139)
at com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded$1.apply(CanalServerWithEmbedded.java:68)
at com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded$1.apply(CanalServerWithEmbedded.java:65)
at com.google.common.collect.ComputingConcurrentHashMap$ComputingValueReference.compute(ComputingConcurrentHashMap.java:356)
at com.google.common.collect.ComputingConcurrentHashMap$ComputingSegment.compute(ComputingConcurrentHashMap.java:182)
at com.google.common.collect.ComputingConcurrentHashMap$ComputingSegment.getOrCompute(ComputingConcurrentHashMap.java:151)
at com.google.common.collect.ComputingConcurrentHashMap.getOrCompute(ComputingConcurrentHashMap.java:67)
at com.google.common.collect.MapMaker$ComputingMapAdapter.get(MapMaker.java:885)
at com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded.start(CanalServerWithEmbedded.java:98)
at com.alibaba.otter.node.etl.select.selector.canal.CanalEmbedSelector.start(CanalEmbedSelector.java:206)
at com.alibaba.otter.node.etl.select.SelectTask.startup(SelectTask.java:170)
at com.alibaba.otter.node.etl.select.SelectTask.run(SelectTask.java:126)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 0
at java.util.Arrays$ArrayList.get(Arrays.java:3841)
at com.alibaba.otter.canal.common.zookeeper.ZooKeeperx.connect(ZooKeeperx.java:68)

zk名称被修改,需要在canal和node等重新选择一遍(即使一模一样),在保存下即可

pid:5 nid:1 exception:canal:有零花(ulinghua_cs_online):java.io.IOException: Received error packet: errno = 1236, sqlstate = HY000 errmsg = Could not find first log file name in binary log index file
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher.fetch(DirectLogFetcher.java:94)
at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.dump(MysqlConnection.java:137)
at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:220)
at java.lang.Thread.run(Thread.java:745)

可能是之前同步任务没成功,我们用的rds,默认只保留18个小时,然后后面在启动的时候,同步进度已经过了18个小时,导致一直报这个错误,解放方案:
停掉同步任务
进入对应 Pipeline ,删除同步进度,重新启动即可,但是会丢失之前的数据,需要人工同步丢失的数据


image.png
pid:8 nid:1 exception:canal:微现金(vxianjin_online):com.alibaba.otter.canal.parse.exception.CanalParseException: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: com.google.common.util.concurrent.UncheckedExecutionException: java.io.IOException: should execute connector.connect() first
Caused by: com.google.common.util.concurrent.UncheckedExecutionException: java.io.IOException: should execute connector.connect() first
at com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4832)
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.TableMetaCache.getTableMeta(TableMetaCache.java:160)
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert.getTableMeta(LogEventConvert.java:759)
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert.parseRowsEvent(LogEventConvert.java:428)
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert.parse(LogEventConvert.java:114)
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert.parse(LogEventConvert.java:66)
at com.alibaba.otter.canal.parse.inbound.AbstractEventParser.parseAndProfilingIfNecessary(AbstractEventParser.java:337)
at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3$1.sink(AbstractEventParser.java:184)
at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.dump(MysqlConnection.java:145)
at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:220)
at java.lang.Thread.run(Thread.java:745)

排查下canel的用户名密码是否正确、数据源是否连接失败、白名单配置等等,实在都没问题就重启node解决

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,864评论 6 494
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,175评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,401评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,170评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,276评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,364评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,401评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,179评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,604评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,902评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,070评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,751评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,380评论 3 319
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,077评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,312评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,924评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,957评论 2 351

推荐阅读更多精彩内容