kafka链接器connecter

链接器组件

Connectors

通过任务协调数据流处理, 逻辑上的job,一个job包括多个task

Tasks 

实现数据进入或流出kafka ,也是逻辑层面 

Workers 

执行Connectors与Tasks的进程,workers是对逻辑层面的job,拆分成逻辑层面的task后的实际执行

Converters

实现Connect具体用于发送与接受系统数据的代码 (异构数据的传输)

Confluent生态下,将数据转化成connectors能够识别的数据格式,默认提供如下四种格式的数据转化:

AvroConverter : use with Confluent Schema Registry,推荐使用的格式

JsonConverter: great for structured data

StringConverter: simple string format

ByteArrayConverter: provides a "pass-through" option that does no 

conversionTransforms 

处理connector生成与接收消息,消息内容与结构的变化场景,目前支持对消息进行如下转换:

Cast

字段或整个键或值转换为一个特定的类型,如强制一个整数字段宽度较小。

Drop

将一个key或值从记录或集合中设置为null

ExtractField

从当前schema的结构化数据中提取指定的字段,或从非结构化数据中抽取map,其中任意null值不经修改进行传递

ExtractTopic

源于已有topic的键或值,用一个新的topic对其进行替换

Flatten

将嵌套数据结构简化,为每个字段名称拼接一个可配置分隔符得到嵌套数据结构j简化后的字段名称字符串

HoistField

从当前schema的结构化数据中封装指定的字段,或从非结构化数据中封装map

InsertField

基于已有元数据或配置静态属性值插入字段

MaskField

指定一个有效的null值的字段类型来掩码字段

RegexRouter

使用配置的正则表达式和替换字符串更新已有topic

ReplaceField

过滤或重命名字段

SetSchemaMetadata

设置schema名称,版本等元数据

TimestampConverter

不同格式(如Unix epoch,字符串,Connect日期、时间戳类型)之间时间戳转换

TimestampRouter

更新记录的topic字段作为最初的topic函数值及时间戳

ValueToKey

将记录键替换为一个由记录中的字段值的子集组成的新的key

链接器类型

覆盖各类型存储,如Jdbc、Elastic Search、Cassandra、MongoDB、HBase、Syslog、MQTT、Twitter 、S3等

链接器的使用

运行如下命令,通过:http://XXXXX:8483/connector-plugins  查看已经加载的链接器插件信息。

connect-standalone   worker.properties  connector1.properties connector2.properties

FileConnector使用

修改FileStreamSource、FileStreamSink、Broker相关配置,启动broker并创建对应的topic,最后运行链接器。

FileStreamSource

从test.txt中读取并发布到Broker中

Source相关配置${KAFKA_HOME}/config/connect-file-source.properties,针对具体场景修改

name=local-file-source

connector.class=FileStreamSource tasks.max=1

file=test.txt

topic=connect-test

FileStreamSink

从Broker中读取数据并写入到test.sink.txt文件中

Sink相关配置文件${KAFKA_HOME}/config/connect-file-sink.properties (针对具体场景修改)

name=local-file-sink

connector.class=FileStreamSink tasks.max=1

file=test.sink.txt

topics=connect-test

Broker

修改配置文件${KAFKA_HOME}/config/connect-standalone.properties (一次配置多次使用)

bootstrap.servers=localhost:9092

key.converter=org.apache.kafka.connect.json.JsonConverter

value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=true

value.converter.schemas.enable=true

internal.key.converter=org.apache.kafka.connect.json.JsonConverter

internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false

internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets

offset.flush.interval.ms=10000

启动broker并创建对应的topic

zk启动的前提下,kafka-server-start.sh ./config/server.properties &

kafka-topics --create --zookeeper XXX  --topic test  --replication-factor 1 --partitions 1

运行链接器

connect-standalone config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties 

HDFS sink的使用

配置文件修改,设置hdfs的URL,对应kafka中的topic名字

name=hdfs-sink

connector.class=io.confluent.connect.hdfs.HdfsSinkConnector

tasks.max=1

topics=test_hdfs

hdfs.url=hdfs:

flush.size=3

运行命令:connect-standalone config/connect-standalone.properties  

Mysql 中的使用

配置文件修改

name=mysql-jdbc-autoincrement

connector.class=io.confluent.connect.jdbc.JdbcSourceConnector

tasks.max=1

connection.url=jdbc:mysql:

mode=incrementing

incrementing.column.name=HeatMapId

topic.prefix=test-mysql-jdbc-

针对测试kafka链接器中mysql的链接,初始化mysql的库表、账号信息

CREATE DATABASE testkafka;
GRANT ALL PRIVILEGES ON testkafka.* TO 'kafka'@'%' IDENTIFIED BY 'XXXX' WITH GRANT OPTION;
FLUSH PRIVILEGES;

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容