链接器组件
Connectors
通过任务协调数据流处理, 逻辑上的job,一个job包括多个task
Tasks
实现数据进入或流出kafka ,也是逻辑层面
Workers
执行Connectors与Tasks的进程,workers是对逻辑层面的job,拆分成逻辑层面的task后的实际执行
Converters
实现Connect具体用于发送与接受系统数据的代码 (异构数据的传输)
Confluent生态下,将数据转化成connectors能够识别的数据格式,默认提供如下四种格式的数据转化:
AvroConverter
: use with Confluent Schema Registry,推荐使用的格式
JsonConverter
: great for structured data
StringConverter
: simple string format
ByteArrayConverter
: provides a "pass-through" option that does no
conversionTransforms
处理connector生成与接收消息,消息内容与结构的变化场景,目前支持对消息进行如下转换:
字段或整个键或值转换为一个特定的类型,如强制一个整数字段宽度较小。
将一个key或值从记录或集合中设置为null
从当前schema的结构化数据中提取指定的字段,或从非结构化数据中抽取map,其中任意null值不经修改进行传递
源于已有topic的键或值,用一个新的topic对其进行替换
将嵌套数据结构简化,为每个字段名称拼接一个可配置分隔符得到嵌套数据结构j简化后的字段名称字符串
从当前schema的结构化数据中封装指定的字段,或从非结构化数据中封装map
基于已有元数据或配置静态属性值插入字段
指定一个有效的null值的字段类型来掩码字段
使用配置的正则表达式和替换字符串更新已有topic
过滤或重命名字段
设置schema名称,版本等元数据
不同格式(如Unix epoch,字符串,Connect日期、时间戳类型)之间时间戳转换
更新记录的topic字段作为最初的topic函数值及时间戳
将记录键替换为一个由记录中的字段值的子集组成的新的key
链接器类型
覆盖各类型存储,如Jdbc、Elastic Search、Cassandra、MongoDB、HBase、Syslog、MQTT、Twitter 、S3等
链接器的使用
运行如下命令,通过:http://XXXXX:8483/connector-plugins 查看已经加载的链接器插件信息。
connect-standalone worker.properties connector1.properties connector2.properties
FileConnector使用
修改FileStreamSource、FileStreamSink、Broker相关配置,启动broker并创建对应的topic,最后运行链接器。
FileStreamSource
从test.txt中读取并发布到Broker中
Source相关配置${KAFKA_HOME}/config/connect-file-source.properties,针对具体场景修改
name=local-file-source
connector.
class
=FileStreamSource tasks.max=
1
file=test.txt
topic=connect-test
FileStreamSink
从Broker中读取数据并写入到test.sink.txt文件中
Sink相关配置文件${KAFKA_HOME}/config/connect-file-sink.properties (针对具体场景修改)
name=local-file-sink
connector.
class
=FileStreamSink tasks.max=
1
file=test.sink.txt
topics=connect-test
Broker
修改配置文件${KAFKA_HOME}/config/connect-standalone.properties (一次配置多次使用)
bootstrap.servers=localhost:
9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=
true
value.converter.schemas.enable=
true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=
false
internal.value.converter.schemas.enable=
false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=
10000
启动broker并创建对应的topic
zk启动的前提下,kafka-server-start.sh ./config/server.properties &
kafka-topics --create --zookeeper XXX --topic test --replication-factor 1 --partitions 1
运行链接器
connect-standalone config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
HDFS sink的使用
配置文件修改,设置hdfs的URL,对应kafka中的topic名字
name=hdfs-sink
connector.
class
=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=
1
topics=test_hdfs
hdfs.url=hdfs:
flush.size=
3
运行命令:connect-standalone config/connect-standalone.properties
Mysql 中的使用
配置文件修改
name=mysql-jdbc-autoincrement
connector.
class
=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=
1
connection.url=jdbc:mysql:
mode=incrementing
incrementing.column.name=HeatMapId
topic.prefix=test-mysql-jdbc-
针对测试kafka链接器中mysql的链接,初始化mysql的库表、账号信息
CREATE DATABASE testkafka;
GRANT ALL PRIVILEGES ON testkafka.* TO 'kafka'@'%' IDENTIFIED BY 'XXXX' WITH GRANT OPTION;
FLUSH PRIVILEGES;