CDC(change feed caputre)实现流程:
1.changefeed数据变动监控:使用poller轮询器,调用存储引擎层的监控sst memtable中的变化然后把变化的kv值和时间戳写入缓存,每一次轮询中,当前的时间作为high-water mark,
指定表的spans被range boundary切分开,在上一次和最新的high-water mark区间sst(使用slurpSST函数,调用存储引擎层的迭代器监控内存表)每一个改变的kv
被作为输出请求拿出来并传到buffer,最终所有改变写入缓存后,resolved时间戳被写入buffer.
2.changeaggregator把kvs从buffer中取出来进行encode,同时处理表的lease(调用分布式sql的lease管理器),然后把span-level的数据转序列化
换成数据行row通过缓存emit到sink中;
转换成行数据时从poller写入的buffer中get数据,使用encoder生成的数据,包括rows,值的时间戳,以及表描述
3.changefrontier,追踪上边提到span,追踪他们当前的resolved的时间戳水印,负责对这些span做checkpoint,之后把changefeed-level的时间戳编码并写入缓存
,最终emit到sink里;
同样的也会从poller写入的buffer中get数据,但是这个数据是没有rows,只有resolved的时间戳数据,也会经过encoder加工.
4.这些row数据包括kv,mvcc timestamp,表的描述(avro需要)以及删除标志位,还有resolved的时间戳,如果前边这些值都为空,那就是checkpoint的resolved的时间戳
5.sink接口会按照顺序异步接受数据和时间戳最终flush到目标的sink,在同一个goroutine保证在一个span内顺序是安全的,多goroutine是非安全.在一个span中,
上次的resolved timestamp到最新的一个resolved timestamp会有排序,并去掉多余的输出最终的结果.
flush的动作是使用sarama的producer调用sarama的client来自动刷到kafka中.(github.com/Shopify/sarama)是kafka的go的实现
目前sink支持json和avro两种格式的数据行.支持输出到kafka,网络存储,本地文件,小强自己本身,后几种是experimental选项.
6. 3,4这两步骤,最终是调用小强的job调度来调度的,都有内存和硬盘监视器来检查资源使用情况,目前有一个问题是在关闭3,4时会有资源竞争导致panic
目前的限制:
1.支持单column family,目前还不支持操作多个column family的kv值.
2.表drop和truncate之后job需要会被废弃,要重新设置
3.不支持库级别及使用通配符匹配多个表
additional :
resolved timestamp的体现:
建立job是指定resolved参数可以定期的输出时间戳即使期间是没有数据变化的,可以用来监控延迟