kafka链接器connecter

链接器组件

Connectors

通过任务协调数据流处理, 逻辑上的job,一个job包括多个task

Tasks 

实现数据进入或流出kafka ,也是逻辑层面 

Workers 

执行Connectors与Tasks的进程,workers是对逻辑层面的job,拆分成逻辑层面的task后的实际执行

Converters

实现Connect具体用于发送与接受系统数据的代码 (异构数据的传输)

Confluent生态下,将数据转化成connectors能够识别的数据格式,默认提供如下四种格式的数据转化:

AvroConverter : use with Confluent Schema Registry,推荐使用的格式

JsonConverter: great for structured data

StringConverter: simple string format

ByteArrayConverter: provides a "pass-through" option that does no 

conversionTransforms 

处理connector生成与接收消息,消息内容与结构的变化场景,目前支持对消息进行如下转换:

Cast

字段或整个键或值转换为一个特定的类型,如强制一个整数字段宽度较小。

Drop

将一个key或值从记录或集合中设置为null

ExtractField

从当前schema的结构化数据中提取指定的字段,或从非结构化数据中抽取map,其中任意null值不经修改进行传递

ExtractTopic

源于已有topic的键或值,用一个新的topic对其进行替换

Flatten

将嵌套数据结构简化,为每个字段名称拼接一个可配置分隔符得到嵌套数据结构j简化后的字段名称字符串

HoistField

从当前schema的结构化数据中封装指定的字段,或从非结构化数据中封装map

InsertField

基于已有元数据或配置静态属性值插入字段

MaskField

指定一个有效的null值的字段类型来掩码字段

RegexRouter

使用配置的正则表达式和替换字符串更新已有topic

ReplaceField

过滤或重命名字段

SetSchemaMetadata

设置schema名称,版本等元数据

TimestampConverter

不同格式(如Unix epoch,字符串,Connect日期、时间戳类型)之间时间戳转换

TimestampRouter

更新记录的topic字段作为最初的topic函数值及时间戳

ValueToKey

将记录键替换为一个由记录中的字段值的子集组成的新的key

链接器类型

覆盖各类型存储,如Jdbc、Elastic Search、Cassandra、MongoDB、HBase、Syslog、MQTT、Twitter 、S3等

链接器的使用

运行如下命令,通过:http://XXXXX:8483/connector-plugins  查看已经加载的链接器插件信息。

connect-standalone   worker.properties  connector1.properties connector2.properties

FileConnector使用

修改FileStreamSource、FileStreamSink、Broker相关配置,启动broker并创建对应的topic,最后运行链接器。

FileStreamSource

从test.txt中读取并发布到Broker中

Source相关配置${KAFKA_HOME}/config/connect-file-source.properties,针对具体场景修改

name=local-file-source

connector.class=FileStreamSource tasks.max=1

file=test.txt

topic=connect-test

FileStreamSink

从Broker中读取数据并写入到test.sink.txt文件中

Sink相关配置文件${KAFKA_HOME}/config/connect-file-sink.properties (针对具体场景修改)

name=local-file-sink

connector.class=FileStreamSink tasks.max=1

file=test.sink.txt

topics=connect-test

Broker

修改配置文件${KAFKA_HOME}/config/connect-standalone.properties (一次配置多次使用)

bootstrap.servers=localhost:9092

key.converter=org.apache.kafka.connect.json.JsonConverter

value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=true

value.converter.schemas.enable=true

internal.key.converter=org.apache.kafka.connect.json.JsonConverter

internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false

internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets

offset.flush.interval.ms=10000

启动broker并创建对应的topic

zk启动的前提下,kafka-server-start.sh ./config/server.properties &

kafka-topics --create --zookeeper XXX  --topic test  --replication-factor 1 --partitions 1

运行链接器

connect-standalone config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties 

HDFS sink的使用

配置文件修改,设置hdfs的URL,对应kafka中的topic名字

name=hdfs-sink

connector.class=io.confluent.connect.hdfs.HdfsSinkConnector

tasks.max=1

topics=test_hdfs

hdfs.url=hdfs:

flush.size=3

运行命令:connect-standalone config/connect-standalone.properties  

Mysql 中的使用

配置文件修改

name=mysql-jdbc-autoincrement

connector.class=io.confluent.connect.jdbc.JdbcSourceConnector

tasks.max=1

connection.url=jdbc:mysql:

mode=incrementing

incrementing.column.name=HeatMapId

topic.prefix=test-mysql-jdbc-

针对测试kafka链接器中mysql的链接,初始化mysql的库表、账号信息

CREATE DATABASE testkafka;
GRANT ALL PRIVILEGES ON testkafka.* TO 'kafka'@'%' IDENTIFIED BY 'XXXX' WITH GRANT OPTION;
FLUSH PRIVILEGES;

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,142评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,298评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,068评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,081评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,099评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,071评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,990评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,832评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,274评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,488评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,649评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,378评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,979评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,625评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,643评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,545评论 2 352

推荐阅读更多精彩内容