【Canal源码分析】Sink及Store工作过程

一、序列图

parser-sink-store.png

二、源码分析

2.1 Sink

Sink阶段所做的事情,就是根据一定的规则,对binlog数据进行一定的过滤。我们之前跟踪过parser过程的代码,发现在parser完成后,会把数据放到一个环形队列TransactionBuffer中,也就是这个方法:

transactionBuffer.add(entry);

我们具体看下add这个方法。

public void add(CanalEntry.Entry entry) throws InterruptedException {
    switch (entry.getEntryType()) {
        case TRANSACTIONBEGIN:
            flush();// 刷新上一次的数据
            put(entry);
            break;
        case TRANSACTIONEND:
            put(entry);
            flush();
            break;
        case ROWDATA:
            put(entry);
            // 针对非DML的数据,直接输出,不进行buffer控制
            EventType eventType = entry.getHeader().getEventType();
            if (eventType != null && !isDml(eventType)) {
                flush();
            }
            break;
        default:
            break;
    }
}

判断一下事件的类型,如果是事务开头,那么直接刷新之前的数据,然后把当前事件加到队列中;如果是事务的结束,那么先把当前事务放到队列后,刷新到下一个阶段;如果是普通的事件,直接放到队列中,如果事务头类型不为空,且不是DML类型,那么直接刷新队列中数据到下一个阶段。

我们需要理清楚这块的逻辑,什么时候flush,什么时候put,针对不同的事件,采取的策略不一样。

这里我们分析下flush和put两个步骤。

2.1.1 flush队列

这块其实还没有涉及到sink阶段,还在维护一个事件环形队列。这个环形队列,维护了两个指针,一个是flush的指针,一个是put的指针,flush的指针永远是滞后于put指针的。

private void flush() throws InterruptedException {
    long start = this.flushSequence.get() + 1;
    long end = this.putSequence.get();

    if (start <= end) {
        List<CanalEntry.Entry> transaction = new ArrayList<CanalEntry.Entry>();
        for (long next = start; next <= end; next++) {
            transaction.add(this.entries[getIndex(next)]);
        }

        flushCallback.flush(transaction);
        flushSequence.set(end);// flush成功后,更新flush位置
    }
}

start就是flush的指针,end就是put的指针,flush的动作就是把当前flush到put中间的数据,全部刷新到下一个阶段。具体传递到下一个阶段的代码在flushCallback.flush方法中。这块我们下文再分析。

2.1.2 put

private void put(CanalEntry.Entry data) throws InterruptedException {
    // 首先检查是否有空位
    if (checkFreeSlotAt(putSequence.get() + 1)) {
        long current = putSequence.get();
        long next = current + 1;

        // 先写数据,再更新对应的cursor,并发度高的情况,putSequence会被get请求可见,拿出了ringbuffer中的老的Entry值
        entries[getIndex(next)] = data;
        putSequence.set(next);
    } else {
        flush();// buffer区满了,刷新一下
        put(data);// 继续加一下新数据
    }
}

这块的注释都比较清晰了,就不赘述了。

2.1.3 flush到sink

具体的代码在AbstractEventParser中,定义transactionBuffer的地方。

public void flush(List<CanalEntry.Entry> transaction) throws InterruptedException {
    boolean successed = consumeTheEventAndProfilingIfNecessary(transaction);
    if (!running) {
        return;
    }

    if (!successed) {
        throw new CanalParseException("consume failed!");
    }

    LogPosition position = buildLastTransactionPosition(transaction);
    if (position != null) { // 可能position为空
        logPositionManager.persistLogPosition(AbstractEventParser.this.destination, position);
    }
}

主要的处理在consumeTheEventAndProfilingIfNecessary里面。这里面调用了eventSink.sink()方法。

2.1.4 sink

这里面进行了binlog数据的过滤。首先判断是否需要过滤事务头和尾,如果需要过滤的话,直接过滤掉,默认不过滤。

遍历传到这个阶段的binlog列表,根据正则表达式判断,是否需要进行过滤,一般来说是根据表名、库名等进行过滤。这边的过滤类主要是AviaterRegexFilter,根据库名.表名和表达式进行过滤。如果需要进行过滤,那么直接把这个事件过滤。否则,加到binlog列表中,进行二次过滤。第二次过滤的主要内容是HEARTBEAT类型的事件,主要的代码在这里:

protected boolean doSink(List<Event> events) {
    for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) {
        events = handler.before(events);//处理heartbeat事件
    }

    int fullTimes = 0;
    do {
        if (eventStore.tryPut(events)) {
            for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) {
                events = handler.after(events);
            }
            return true;
        } else {
            applyWait(++fullTimes);
        }

        for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) {
            events = handler.retry(events);
        }

    } while (running && !Thread.interrupted());
    return false;
}

这里的CanalEventDownStreamHandler其实只有HeartBeatEntryEventHandler,也就是在before方法中把heartbeat事件从events去掉。这个心跳事件其实是parser过程生成的,我们之前有提到过。after目前是空的方法。

去掉之后,剩余的事件列表就会被调用tryPut()方法,送到下一步骤store中。

这里还有个applyWait方法,防止无限等待。

private void applyWait(int fullTimes) {
    int newFullTimes = fullTimes > maxFullTimes ? maxFullTimes : fullTimes;
    if (fullTimes <= 3) { // 3次以内
        Thread.yield();
    } else { // 超过3次,最多只sleep 10ms
        LockSupport.parkNanos(1000 * 1000L * newFullTimes);
    }

}

2.2 Store

目前只有基于内存模式的Store,这个阶段是真正Server中的落盘阶段。数据经历了mysql master到parser,再到sink,最后终于到了这里。

public boolean tryPut(List<Event> data) throws CanalStoreException {
    if (data == null || data.isEmpty()) {
        return true;
    }

    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (!checkFreeSlotAt(putSequence.get() + data.size())) {
            return false;
        } else {
            doPut(data);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

在进行数据put的时候,加了一把锁。首先计算下是否还有剩余的空间进行数据处理,这里的计算,不光是计算了队列的剩余长度,还计算了剩余空间。队列的长度默认是16*1024,如果空间不足,直接拒绝,返回false,等待空间空余出来后,再进行put操作。否则,直接doPut()。

/**
 * 执行具体的put操作
 */
private void doPut(List<Event> data) {
    long current = putSequence.get();
    long end = current + data.size();

    // 先写数据,再更新对应的cursor,并发度高的情况,putSequence会被get请求可见,拿出了ringbuffer中的老的Entry值
    for (long next = current + 1; next <= end; next++) {
        entries[getIndex(next)] = data.get((int) (next - current - 1));
    }

    putSequence.set(end);

    // 记录一下gets memsize信息,方便快速检索
    if (batchMode.isMemSize()) {
        long size = 0;
        for (Event event : data) {
            size += calculateSize(event);
        }

        putMemSize.getAndAdd(size);
    }

    // tell other threads that store is not empty
    notEmpty.signal();
}

这里主要对put一些指针,还有空间做了重新的计算。放到队列中之后,通知其他等待notEmpty的线程,来进行数据的获取,这时候,client可以进行数据获取了。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,588评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,456评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,146评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,387评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,481评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,510评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,522评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,296评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,745评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,039评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,202评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,901评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,538评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,165评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,415评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,081评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,085评论 2 352

推荐阅读更多精彩内容