开源组件Kafka Connect推荐

Kafka Connect简介

Kafka Connect是Kafka的开源组件Confluent提供的功能,用于实现Kafka与外部系统的连接。Kafka Connect同时支持分布式模式和单机模式,另外提供了一套完整的REST接口,用于查看和管理Kafka Connectors,还具有offset自动管理,可扩展等优点。

使用Kafka Connect连接Kafka和Elasticsearch

1 测试环境准备

ES服务、Kafka服务、kafka topic:kafka_es_test

2 Kafka Connect 安装

下载地址:https://www.confluent.io/download/

3 Worker配置

1) 配置参考 ,参考如下 :

[通用配置]

[Standalone Woker配置]

[Distributed Worker配置]

Kafka Connect默认使用AvroConverter,使用该AvroConverter时必须先启动Schema Registry服务

2) 实际操作  

修改./schema-registry/connect-avro-standalone.properties,配置bootstrap.servers=kafka服务地址

4 Elasticsearch Connector配置

1) 配置参考  

[Connectors通用配置]

[Elasticsearch Configuration Options]

2) 实际操作  

对./kafka-connect-elasticsearch/quickstart-elasticsearch.properties做如下修改:

name=elasticsearch-sink

connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector

tasks.max=1

topics=kafka_es_test

key.ignore=true

connection.url=http://ES服务地址

type.name=kafka-connect

注意: 其中topics、kafka中topic名称、Elasticsearch的索引名三者通常一致,也可以通过topic.index.map来设置从topic名到Elasticsearch索引名的映射

5 启动connector

5.1  启动Schema Registry服务

该服务需要指定一个zookeeper地址或Kafka地址,以存储schema数据。具体操作如下: 

1) 启动Zookeeper

./bin/zookeeper-server-start-daemon etc/kafka/zookeeper.properties

2) 启动kafka

./bin/kafka-server-start-daemon etc/kafka/server.properties

3) 启动schema Registry

./bin/schema-registry-start-daemon etc/schema-registry/schema-registry.properties

4) 使用netstat -natpl 查看各服务端口是否正常启动  :zookeeper 、kafka 、schema registry各自的地址

5.2 启动Connector

./bin/connect-standalone -daemon  etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties

以上启动各服务均可在logs目录下找到对应日志

6 启动Kafka Producer

1) 启动Producer

./bin/kafka-avro-console-producer --broker-list XXXX:9092 --topic kafka_es_test --property value.schema='{"type":"record","name":"person","fields":[{"name":"nickname","type":"string"}]}'

2) 输入如下数据

{"nickname":"michel"}{"nickname":"mushao"}

7 Kibana验证结果

查看索引 ,GET_cat/indices结果可以看到名为kafka_es_test的索引被成功创建

Confluent CLI

CLI目前只是适用于开发阶段,不能用于生产环境。  

它可以一键启动包括zookeeper,kafka,schema registry, kafka rest, connect等在内的多个服务;使用CLI启动默认配置为启动Distributed的Connector,需要通过环境变量来修改配置  

使用Confluent CLI

confluent CLI提供了丰富的命令,包括服务启动,服务停止,状态查询,日志查看等

1) 启动./bin/confluent start

2) 检查confluent运行状态./bin/confluent status

3) 问题定位,如果第二步出现问题,可以使用log命令查看,如connect未启动成功则./bin/confluent log connect

4) 加载Elasticsearch Connector

a) 查看connector

./bin/confluentlistconnectors

b) 加载Elasticsearch connector

./bin/confluentloadelasticsearch-sink

结果,输出quickstart-elasticsearch.properties配置文件中包含的信息

{"name":"elasticsearch-sink","config": {"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","tasks.max":"1","topics":"kafka_es_test","key.ignore":"true","connection.url":"http://192.168.0.8:9200","type.name":"kafka-connect","name":"elasticsearch-sink"    },"tasks": [],"type":null}

5) 使用producer生产数据,并使用kibana验证是否写入成功

Kafka Connect Rest API

Kafka Connect提供了一套完成的管理Connector的接口,详情参考[Kafka Connect REST Interface]。该接口可以实现对Connector的创建,销毁,修改,查询等操作

1) GET connectors 获取运行中的connector列表

2) POST connectors 使用指定的名称和配置创建connector

3) GET connectors/(string:name) 获取connector的详细信息

4) GET connectors/(string:name)/config 获取connector的配置

5) PUT connectors/(string:name)/config 设置connector的配置

6) GET connectors/(string:name)/status 获取connector状态

7) POST connectors/(stirng:name)/restart 重启connector

8) PUT connectors/(string:name)/pause 暂停connector

9) PUT connectors/(string:name)/resume 恢复connector

10)DELETEconnectors/(string:name)/ 删除connector

11)GETconnectors/(string:name)/tasks 获取connectors任务列表

12)GET/connectors/(string:name)/tasks/(int: taskid)/status获取任务状态

13) POST /connectors/(string:name)/tasks/(int: taskid)/restart 重启任务

14)GET/connector-plugins/ 获取已安装插件列表

15) PUT /connector-plugins/(string:name)/config/validate验证配置

原文链接

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,222评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,455评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,720评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,568评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,696评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,879评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,028评论 3 409
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,773评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,220评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,550评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,697评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,360评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,002评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,782评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,010评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,433评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,587评论 2 350

推荐阅读更多精彩内容

  • 1.背景 最近研究了数据库实时转移的方案,目的是要把数据库中某些表的指定数据实时的转移到别的数据库中。 这与平时了...
    七分熟pizza阅读 9,940评论 1 7
  • 喧嚣像是众多羽毛的海啸, 它很轻,無孔不入 它铺满了城市和乡村的每一个酒馆。 每一天都喧囂着空洞的演講, 那空洞是...
    田泊阅读 319评论 2 1
  • 张占丽 焦点解决网络初级第14期 平顶山坚持分享第74天 2019年5月11日 身为父母,怎样爱孩子...
    张占丽阅读 81评论 0 0
  • 春芳艳,暖风掀湖裙,一派袅娜引人醉。葬花情,伴我潇湘空自流。一杯浊酒,两行清泪,三点天思血。白发恨不尽,挥霍芳华,...
    逸之阅读 561评论 2 1
  • 今天听了一本书,刻意练习 有一个说话叫一件事坚持一万小时就能成功 其实一万小时不一定能成功,成功也不一定需要一...
    葩泫露阅读 91评论 0 0