[实战系列]SelectDB Cloud Kafka Connect 最佳实践张家锋

概述
企业正在经历其数据资产的爆炸式增长,这些数据包括批式或流式传输的结构化、半结构化以及非结构化数据,随着海量数据批量导入的场景的增多,企业对于 Data Pipeline 的需求也愈加复杂。新一代云原生实时数仓 SelectDB Cloud 作为一款运行于多云之上的云原生实时数据仓库,致力于通过开箱即用的能力为客户带来简单快速的数仓体验。在生态方面,SelectDB Cloud 提供了丰富的数据连接器插件(Connector)来连接各种来自周边大数据工具的数据源,内置 Kafka、Flink、Spark、DataX 等常见的 Connector。基于此,企业开发者能够更加便捷的将数据移动到 SelectDB Cloud 上,并利用 SelectDB Cloud 从数据资产中获取更高的价值。

SelectDB Cloud 基于 Apache Doris 研发的新一代云原生实时数仓 SelectDB,运行于多家云上,为客户提供极简运维和极致性价比的数仓服务。

Kafka Connect For SelectDB Cloud

Kafka Connect 是一款可扩展、可靠的在 Apache Kafka 和其他系统之间进行数据传输的工具。 可以定义 Connectors 来将大量数据迁入迁出Kafka。
SelectDB提供了 Sink Connector 插件,可以将Kafka Topic中的JSON数据保存到SelectDB数据库中。

架构

image.png

在业务场景中,通常会通过Debezium Connector将数据库的变更数据实时写入Kafka,或者调用API往Kafka中推送JSON格式数据,使用SelectDB Connector即可将这些数据同步到SelectDB数据库中。
工作原理
Kafka Connector 通过以下过程订阅 Kafka topic 的数据,并将数据 sink 到 SelectDB 中。

image.png

SelectDB Connector 通过内部的 task 一对一或一对多的消费对应 topic partition 的数据。当达到阈值(时间或内存或消息数量)时,connector 会将该批次的 records 生成一个临时文件,并上传至 SelectDB 的对象储存中。
当临时文件数达到 50 个或 connector 向 Kafka 集群预提交已成功消费的 offset 时(默认 10s ),将对象存储中临时文件的通过 Copy-Into 的操作,导入至对应的 table 中。
Exactly-Once
Exactly-Once 语义是指即使在机器或应用出现故障的情况下,也不会重复处理数据或者丢失数据。Selectdb-kafka-connector 通过 Kafka 集群与 SelectDB 实现 Exactly_Once,具体原理如下:

kafka-connector 在初始化时会主动向 Selectdb 获取当前所在 partition 已提交的 last_offset。

从 kafka 消费数据,只有当前 record 的 offset 大于从 Selectdb 获取的 last_offset 后,才能被正常消费。当消费的 record 达到阈值,会生成一个以 last_offset 命名的临时文件,并将该文件上传至对象存储中。

image.png

在 kafka 调用执行 preCommit 时,会将对象存储中的数据由 copy-into 操作导入至 SelectDB 中,此时 SelectDB会记录已提交成功的 last_offset。
若此时 Kafka-connector 执行 copy-into 失败,则会从 Kafka 中获取当前 partition 上一次执行成功的 offset,继续消费,从而保证数据不丢不重。


image.png

成功执行 copy-into 后,向 kafka 提交记录当前 partition 已成功消费的 offset。
若此时 kafka-connector 意外挂掉,重启该 task 或其他 task 在 kafka 的分区自平衡机制下继续消费该 partition。通过初始化阶段可获取到 SelectDB 中已提交成功的 last_offset,继续消费,直至下一个 preCommit 阶段再向 kafka 提交成功消费的 offset。

image.png

使用场景
环境准备

下载并解压

wget https://archive.apache.org/dist/kafka/2.4.0/kafka_2.12-2.4.0.tgz
tar -zxvf kafka_2.12-2.4.0.tgz
 
bin/zookeeper-server-start.sh -daemon  config/zookeeper.properties 
bin/kafka-server-start.sh -daemon  config/server.properties

快速同步JSON数据
在业务场景中,Kafka中会存放业务写入的数据流,通常格式为JSON(对象/数组),使用SelectDB Sink Connector可以快速的同步数据到SelectDB数据库中。

配置SelectDB Sink

name=selectdb-sink
connector.class=com.selectdb.kafka.connector.SelectdbSinkConnector
topics=test_topic
selectdb.topic2table.map=test_topic:test_tbl
buffer.count.records=10000
buffer.flush.time=60
buffer.size.bytes=5000000
selectdb.url=xxx.cn-beijing.privatelink.aliyuncs.com
selectdb.http.port=47057
selectdb.query.port=30523
selectdb.user=admin
selectdb.password=password
selectdb.database=test_db
selectdb.cluster=cluster_name
#配置convert
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
 
#配置死信队列,可选
errors.tolerance=all
errors.deadletterqueue.topic.name=test_error
errors.deadletterqueue.context.headers.enable = true
errors.deadletterqueue.topic.replication.factor=1

启动Kafka Connect

bin/connect-standalone.sh -daemon config/connect-standalone.properties config/selectdb-sink.properties

使用Debezium数据同步MySQL数据到SelectDB
在很多业务场景中,经常需要从业务数据库中实时同步数据,在这个时候就需要使用数据库的变更数据捕获(Change Data Capture,简称 CDC)机制。
而Debezium是基于Kafka Connect的CDC工具,可以对接 MySQL、PostgreSQL、SQL Server、Oracle、MongoDB 等多种数据库,把数据库的数据持续以统一的格式发送到 Kafka 的Topic中,以供下游Sink端进行实时消费。
这里以MySQL为例

下载Debezium

wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.5.4.Final/debezium-connector-mysql-1.5.4.Final-plugin.tar.gz
 
tar -zxvf debezium-connector-mysql-1.5.4.Final-plugin.tar.gz

配置Debezium Source

name=mysql-source
connector.class=io.debezium.connector.mysql.MySqlConnector
database.hostname=127.0.0.1
database.port=3306
database.user=root
database.password=123456
database.server.id=1
# kafka中的该client的唯一标识
database.server.name=test
#需要同步的数据库,默认是同步所有数据库
database.include.list=db
database.history.kafka.bootstrap.servers=localhost:9092
#用于存储数据库表结构变化的 Kafka topic
database.history.kafka.topic=dbhistory
transforms=unwrap
#参考 https://debezium.io/documentation/reference/stable/transformations/event-flattening.html
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
#记录删除事件
transforms.unwrap.delete.handling.mode=rewrite

配置好之后,Kafka中默认的Topic名称格式是 SERVER_NAME.DATABASE_NAME.TABLE_NAME
注:其他Debezium配置可参考 https://debezium.io/documentation/reference/1.5/connectors/mysql.html#mysql-connector-properties

配置SelectDB Sink

name=selectdb-sink
connector.class=com.selectdb.kafka.connector.SelectdbSinkConnector
topics=test.db.table
selectdb.topic2table.map=test.db.table:test_tbl
buffer.count.records=10000
buffer.flush.time=60
buffer.size.bytes=5000000
selectdb.url=xxx.cn-beijing.privatelink.aliyuncs.com
selectdb.http.port=57338
selectdb.query.port=15392
selectdb.user=admin
selectdb.password=password
selectdb.database=test
selectdb.cluster=cluster_name
#配置convert
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
 
#配置死信队列,可选
errors.tolerance=all
errors.deadletterqueue.topic.name=test_error
errors.deadletterqueue.context.headers.enable = true
errors.deadletterqueue.topic.replication.factor=1

同步到SelectDB时,需要先提前创建好库表。

启动Kafka Connect

bin/connect-standalone.sh -daemon config/connect-standalone.properties config/mysql-source.properties config/selectdb-sink.properties

启动后,可以观察日志 logs/connect.log 是否启动成功。
使用效果
在调研的使用场景中,使用 kafka 同步上游 JSON 数据。这里数据维持以每秒 10w 条的超高频导入,在 8c16g 的机器上,仅部署单节点 kafka 集群,同时在 topic 中配置 20 个 partition,以 distributed 模式启动 connector。在实际处理过程中,topic 中的总体消息平均积压在 120w 条左右,单个 partition 积压 6w 条消息,表现相当优秀。

image.png

总结
整体来看,Kafka-SelectDB Connector 打通了从 kafka 直接导入数据至 SelectDB 的数据链路,降低了通过 Flink 作为中间数据同步组件的链路复杂度;通过 Exactly once 实现数据的一次性精确导入,确保了数据的准确性;通过以 Kafka 集群作为载体,在超高频的数据导入场景中,性能表现非常优秀。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,651评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,468评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,931评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,218评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,234评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,198评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,084评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,926评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,341评论 1 311
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,563评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,731评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,430评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,036评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,676评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,829评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,743评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,629评论 2 354

推荐阅读更多精彩内容