Kafka核心API——Connect API

Kafka Connect基本概念介绍

Kafka Connect是一个用于将数据流输入和输出Kafka的框架。Confluent平台附带了几个内置connector,可以使用这些connector进行关系数据库或HDFS等常用系统到Kafka的数据传输,也是用来构建ETL的一种方案。

Kafka Connect基本概念:

  • Kafka Connect实际上是Kafka流式计算的一部分
  • Kafka Connect主要用来与其他中间件建立流式通道
  • Kafka Connect支持流式和批处理集成

Kafka Connect的架构如下图所示:


image.png

Kafka Connect关键词:

  • Connectors:通过管理task来协调数据流的高级抽象
  • Tasks:如何将数据复制到Kafka或从Kafka复制数据的实现
  • Workers:执行Connector和Task的运行进程
  • Converters: 用于在Connect和外部系统发送或接收数据之间转换数据的代码
  • Transforms:更改由连接器生成或发送到连接器的每个消息的简单逻辑

Connectors

Kafka Connect中的connector定义了数据应该从哪里复制到哪里。connector实例是一种逻辑作业,负责管理Kafka与另一个系统之间的数据复制。

我们在大多数情况下都是使用一些平台提供的现成的connector。但是,也可以从头编写一个新的connector插件。在高层次上,希望编写新连接器插件的开发人员遵循以下工作流:


image.png

Task

Task是Connect数据模型中的主要处理数据的角色,也就是真正干活的。每个connector实例协调一组实际复制数据的task。通过允许connector将单个作业分解为多个task,Kafka Connect提供了内置的对并行性和可伸缩数据复制的支持,只需很少的配置。

这些任务没有存储任何状态。任务状态存储在Kafka中的特殊主题config.storage.topicstatus.storage.topic中。因此,可以在任何时候启动、停止或重新启动任务,以提供弹性的、可伸缩的数据管道。

image.png

Workers

Workers是负责管理和执行connector和task的,Workers有两种模式,Standalone(单机)和Distributed(分布式)。

Standalone Workers:

Standalone模式是最简单的模式,用单一进程负责执行所有connector和task

Distributed Workers:

Distributed模式为Kafka Connect提供了可扩展性和自动容错能力。在分布式模式下,你可以使用相同的组启动许多worker进程。它们自动协调以跨所有可用的worker调度connector和task的执行。

如果你添加一个worker、关闭一个worker或某个worker意外失败,那么其余的worker将检测到这一点,并自动协调,在可用的worker集重新分发connector和task。

image.png

Task Rebalance

当connector首次提交到集群时,workers会重新平衡集群中的所有connector及其tasks,以便每个worker的工作量大致相同。当connector增加或减少它们所需的task数量,或者更改connector的配置时,也会使用相同的重新平衡过程。

当一个worker失败时,task在活动的worker之间重新平衡。当一个task失败时,不会触发再平衡,因为task失败被认为是一个例外情况。因此,失败的task不会被框架自动重新启动,应该通过REST API重新启动。

image.png

Converters

在向Kafka写入或从Kafka读取数据时,Converter是使Kafka Connect支持特定数据格式所必需的。task使用Converters将数据格式从字节转换为连接内部数据格式,反之亦然。并且Converter与Connector本身是解耦的,以便在Connector之间自然地重用Converter。

默认提供以下Converters:

  • AvroConverter(建议):与Schema Registry一起使用
  • JsonConverter:适合结构数据
  • StringConverter:简单的字符串格式
  • ByteArrayConverter:提供不进行转换的“传递”选项

AvroConverter处理数据的流程图:


image.png

Transforms

Connector可以配置Transforms,以便对单个消息进行简单且轻量的修改。这对于小数据的调整和事件路由十分方便,且可以在connector配置中将多个Transforms连接在一起。然而,应用于多个消息的更复杂的Transforms最好使用KSQL和Kafka Stream来实现。

Transforms是一个简单的函数,输入一条记录,并输出一条修改过的记录。Kafka Connect提供许多Transforms,它们都执行简单但有用的修改。可以使用自己的逻辑定制实现转换接口,将它们打包为Kafka Connect插件,将它们与connector一起使用。

当Transforms与Source Connector一起使用时,Kafka Connect通过第一个Transforms传递connector生成的每条源记录,第一个Transforms对其进行修改并输出一个新的源记录。将更新后的源记录传递到链中的下一个Transforms,该Transforms再生成一个新的修改后的源记录。最后更新的源记录会被转换为二进制格式写入到Kafka。Transforms也可以与Sink Connector一起使用。

以下为Confluent平台提供的Transforms:


Kakfa Connect环境准备

前面已经铺垫了Kakfa Connect的基本概念,接下来用一个简单的例子演示一下Kakfa Connect的使用方式,以便对其作用有一个直观的了解。

在演示Kakfa Connect的使用之前我们需要先做一些准备,因为依赖一些额外的集成。例如在本文中使用MySQL作为数据源的输入和输出,所以首先得在MySQL中创建两张表(作为Data Source和Data Sink)。建表SQL如下:

CREATE TABLE `users_input` (
  `uid` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(20) NOT NULL,
  `age` int(11) NOT NULL,
  PRIMARY KEY (`uid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

CREATE TABLE `users_output` (
  `uid` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(20) NOT NULL,
  `age` int(11) NOT NULL,
  PRIMARY KEY (`uid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

接下来就是考虑怎么实现Kafka Connect了,前面有提到过Kafka Connect中的connector定义了数据应该从哪里复制到哪里。connector实例是一种逻辑作业,负责管理Kafka与另一个系统之间的数据复制。

因此,如果要自己实现一个Connect的话还是稍微有些复杂的,好在Confluent平台有些现成的Connect。例如Confluent平台就有JDBC的Connect,下载地址如下:

我们需要到Kafka Server上进行相应的配置才能使用该Connect,所以复制下载链接到服务器上使用wget命令进行下载:

[root@txy-server2 ~]# cd /usr/local/src
[root@txy-server2 /usr/local/src]# wget https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-jdbc/versions/5.5.0/confluentinc-kafka-connect-jdbc-5.5.0.zip

除此之外,由于要连接MySQL,所以还得去maven仓库上复制mysql-connector驱动包的下载链接,然后使用同样命令进行下载:

[root@txy-server2 /usr/local/src]# wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.20/mysql-connector-java-8.0.20.jar

解压下载好的Connect压缩包,创建一个存放目录,将解压后的文件移到到该目录下,并将MySQL驱动包移动到kafka-connect-jdbclib目录下:

[root@txy-server2 /usr/local/src]# unzip confluentinc-kafka-connect-jdbc-5.5.0.zip
[root@txy-server2 /usr/local/src]# mkdir -p /opt/kafka/plugins
[root@txy-server2 /usr/local/src]# mv confluentinc-kafka-connect-jdbc-5.5.0 /opt/kafka/plugins/kafka-connect-jdbc
[root@txy-server2 /usr/local/src]# mv mysql-connector-java-8.0.20.jar /opt/kafka/plugins/kafka-connect-jdbc/lib/

Connect包准备好后,编辑connect-distributed.properties配置文件,修改如下配置项:

[root@txy-server2 ~]# vim /usr/local/kafka/config/connect-distributed.properties
# Broker Server的访问ip和端口号
bootstrap.servers=172.21.0.10:9092
# 指定集群id
group.id=connect-cluster
# 指定rest服务的端口号
rest.port=8083
# 指定Connect插件包的存放路径
plugin.path=/opt/kafka/plugins

由于rest服务监听了8083端口号,如果你的服务器开启了防火墙就需要使用以下命令开放8083端口,否则外部无法访问:

[root@txy-server2 ~]# firewall-cmd --zone=public --add-port=8083/tcp --permanent
[root@txy-server2 ~]# firewall-cmd --reload

完成前面的步骤后,我们就可以启动Kafka Connect了。有两种启动方式,分别是:前台启动和后台启动,前者用于开发调试,后者则通常用于正式环境。具体命令如下:

# 前台启动
[root@txy-server2 ~]# connect-distributed.sh /usr/local/kafka/config/connect-distributed.properties

# 后台启动
[root@txy-server2 ~]# connect-distributed.sh -daemon /usr/local/kafka/config/connect-distributed.properties

启动成功后,使用浏览器访问http://{ip}:8083/connector-plugins,正常情况下会返回这样一段JSON数据:

image.png

到此为止,我们就已经完成Kafka Connect的环境准备了,接下来演示一下Source Connector与Sink Connector如何与MySQL做集成。


Kafka Connect Source和MySQL集成

首先我们要知道rest服务提供了一些API去操作connector,如下表:

image.png

使用浏览器访问http://{ip}:8083/connectors,可以查看所有的connector,此时返回的是一个空数组,说明没有任何的connector

image.png

此时我们可以使用POST方式请求/connectors接口来新增一个connector,这里以curl命令为例,调用示例如下:

curl -X POST -H 'Content-Type: application/json' -i 'http://{ip}:8083/connectors' \
--data \
'{"name":"test-upload-mysql","config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url":"jdbc:mysql://{ip}:3306/kafka_store?user=root&password=123456a.",
"table.whitelist":"users_input",
"incrementing.column.name": "uid",
"mode":"incrementing",
"topic.prefix": "test-mysql-"}}'

参数说明:

  • name:指定新增的connector的名称
  • config:指定该connector的配置信息
  • connector.class:指定使用哪个Connector类
  • connection.url:指定MySQL的连接url
  • table.whitelist:指定需要加载哪些数据表
  • incrementing.column.name:指定表中自增列的名称
  • mode:指定connector的模式,这里为增量模式
  • topic.prefix:Kafka会创建一个Topic,该配置项就是用于指定Topic名称的前缀,后缀为数据表的名称。例如在本例中将生成的Topic名称为:test-mysql-users_input

调用成功后,会返回如下响应数据:

HTTP/1.1 201 Created
Date: Mon, 25 May 2020 13:48:16 GMT
Location: http://{ip}:8083/connectors/test-upload-mysql
Content-Type: application/json
Content-Length: 368
Server: Jetty(9.4.24.v20191120)

{"name":"test-upload-mysql","config":{"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector","connection.url":"jdbc:mysql://{ip}:3306/kafka_store?user=root&password=123456a.","table.whitelist":"users_input","incrementing.column.name":"uid","mode":"incrementing","topic.prefix":"test-mysql-","name":"test-upload-mysql"},"tasks":[],"type":"source"}

然后刷新浏览器页面,可以看到test-upload-mysql这个connector已经能被列出来了:

image.png

新增connector完成后,我们尝试往数据表里添加一些数据,具体的sql如下:

insert into users_input(`name`, `age`) values('小明', 15);
insert into users_input(`name`, `age`) values('小白', 13);
insert into users_input(`name`, `age`) values('小李', 17);

接着使用kafka-console-consumer.sh脚本命令去拉取test-mysql-users_input中的数据:

[root@txy-server2 ~]# kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test-mysql-users_input --from-beginning

拉取出来的数据是JSON结构的,其中的payload就是数据表中的数据,如下:

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"uid"},{"type":"string","optional":false,"field":"name"},{"type":"int32","optional":false,"field":"age"}],"optional":false,"name":"users_input"},"payload":{"uid":1,"name":"小明","age":15}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"uid"},{"type":"string","optional":false,"field":"name"},{"type":"int32","optional":false,"field":"age"}],"optional":false,"name":"users_input"},"payload":{"uid":2,"name":"小白","age":13}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"uid"},{"type":"string","optional":false,"field":"name"},{"type":"int32","optional":false,"field":"age"}],"optional":false,"name":"users_input"},"payload":{"uid":3,"name":"小李","age":17}}

能拉取到这样的数据就代表已经成功将MySQL数据表中的数据传输到Kafka Connect Source里了,也就是完成输入端的工作了。


Kafka Connect Sink和MySQL集成

现在我们已经能够通过Kafka Connect将MySQL中的数据写入到Kafka中了,接下来就是完成输出端的工作,将Kafka里的数据输出到MySQL中。

首先,我们需要调用Rest API新增一个Sink类型的connector。具体请求如下:

curl -X POST -H 'Content-Type: application/json' -i 'http://{ip}:8083/connectors' \
--data \
'{"name":"test-download-mysql","config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:mysql://{ip}:3306/kafka_store?user=root&password=123456a.",
"topics":"test-mysql-users_input",
"auto.create":"false",
"insert.mode": "upsert",
"pk.mode":"record_value",
"pk.fields":"uid",
"table.name.format": "users_output"}}'

参数说明:

  • name:指定新增的connector的名称
  • config:指定该connector的配置信息
  • connector.class:指定使用哪个Connector类
  • connection.url:指定MySQL的连接url
  • topics:指定从哪个Topic中读取数据
  • auto.create:是否自动创建数据表
  • insert.mode:指定写入模式,upsert表示可以更新及写入
  • pk.mode:指定主键模式,record_value表示从消息的value中获取数据
  • pk.fields:指定主键字段的名称
  • table.name.format:指定将数据输出到哪张数据表上

调用成功后,会返回如下响应数据:

HTTP/1.1 201 Created
Date: Mon, 25 May 2020 14:37:41 GMT
Location: http://49.232.153.84:8083/connectors/test-download-mysql
Content-Type: application/json
Content-Length: 409
Server: Jetty(9.4.24.v20191120)

{"name":"test-download-mysql","config":{"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector","connection.url":"jdbc:mysql://47.106.206.51:3306/kafka_store?user=root&password=Zero-One1.","topics":"test-mysql-users_input","auto.create":"false","insert.mode":"upsert","pk.mode":"record_value","pk.fields":"uid","table.name.format":"users_output","name":"test-download-mysql"},"tasks":[],"type":"sink"}

刷新浏览器页面,此时就有两个connector了:

image.png

该Sink类型的connector创建完成后,就会读取Kafka里对应Topic的数据,并输出到指定的数据表中。如下:

image.png


小结

回顾一下本文中的示例,可以直观的看到Kafka Connect实际上就做了两件事情:使用Source Connector从数据源(MySQL)中读取数据写入到Kafka Topic中,然后再通过Sink Connector读取Kafka Topic中的数据输出到另一端(MySQL)。

虽然本例中的Source端和Sink端都是MySQL,但是不要被此局限了,因为Source端和Sink端可以是不一样的,这也是Kafka Connect的作用所在。它就像一个倒卖数据的中间商,将Source端的数据读取出来写到自己的Topic,这就像进货一样,然后再将数据输出给Sink端。至此,就完成了一个端到端的数据同步,其实会发现与ETL过程十分类似,这也是为啥Kafka Connect可以作为实现ETL方案的原因。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,332评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,508评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,812评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,607评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,728评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,919评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,071评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,802评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,256评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,576评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,712评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,389评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,032评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,798评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,026评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,473评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,606评论 2 350