基于Splunk Connector for Kafka 实现Splunk与Kafka数据集成

Kafka是Apache的基于订阅发布模式的分布式消息队列,主要应用于数据管道和服务之间的流式数据,很多公司和组织都在利用Apache Kafka提供的能力构建下一代流式应用。本文介绍如何使用开源的Splunk Connector for Kafka消费Kafka数据并转发至Splunk,同时可以自由指定写入数据的sourcetype,便于后续数据解析处理。
Splunk Connect for Kafka是一种侵入式的客户端,基于Kafka Connect框架开发,用于将数据从Kafka的topic取出,并通过Splunk HTTP Event Collector(HEC)发送到Splunk

0x01 准备工作

0x02 配置Splunk

HTTP Event Collector(HEC)

配置Splunk接收从Kafka发送过来的消息需要配置HEC,通常有两种方式配置接收,即通过HEC接收后由Heavy Forwarder转发或者直接发送到Indexer节点,本文采用第二种方式,将消息直接发送到Indexer。
配置HEC,登录Splunk Web 页面,Setting > Data Inputs > HTTP Event Collector 点击Global Settings,确认All Tokens 为enabled状态,然后点击Save。这里要注意下面的HTTP Port Number,在后面的配置中会用到它。


image.png

然后点击New Token来创建一个新的HEC 令牌。
有关Splunk Acknowledgement 先不在本文讨论,感兴趣的同学请自行google


image.png

下一步会提示设置sourcetype,context和index等内容,按需配置即可。
对于大规模分布式的Splunk环境,生成token的方式请参考官方文档

Connector安装和配置

首先解压从GitHub上下载的Connector包,并将它复制到所有Kafka Connect节点上(默认情况下就是Kafka的安装节点)
修改配置文件,路径默认在Kafka安装目录下的config目录config/connect-distributed.properties

#These settings may already be configured if you have deployed a connector in your Kafka Connect Environment
bootstrap.servers=<BOOTSTRAP_SERVERS>
plugin.path=<PLUGIN_PATH>
#Required
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.flush.interval.ms=10000
#Recommended
group.id=kafka-connect-splunk-hec-sink

注意:

  • bootstrap.servers 填写所有Kafka Broker的IP:端口,用逗号隔开
  • plugin.path 填写Splunk connector for kafka JAR包位置,必须保证位置可达
    修改完成后保存文档,并确保所有Kafka Connect节点都已近完成上述修改,然后在每个节点都执行以下命令开启Kafka Connect
./bin/connect-distributed.sh config/connect-distributed.properties

执行以下命令验证插件是否启动

curl http://<KAFKA_CONNECT_HOST>:8083/connector-plugins

返回信息应该会是Connector的名称com.splunk.kafka.connect.SplunkSinkConnector

创建任务

通过上面的步骤我们启动了一个Splunk Connector for Kafka的服务,但是并没有指定要订阅哪个主题以及发送的目的端,接下来我们需要通过RESTful API创建任务。
在Kafka Connect节点上执行:

curl localhost:8083/connectors -X POST -H "Content-Type: application/json" -d'{
  "name": "splunk-prod-financial",
    "config": {
     "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
     "tasks.max": "3",
     "topics": "NIP_SYSLOG_1515UDP2",
     "splunk.indexes": "main",
     "splunk.hec.uri":"https://192.168.134.56:8088",
     "splunk.hec.token": "4b6c844b-125c-4133-b23f-2f4de21b634a",
     "splunk.hec.ack.enabled" : "true",
     "splunk.hec.raw" : "false",
     "splunk.hec.json.event.enrichment" : "org=fin,bu=south-east-us",
     "splunk.hec.ssl.validate.certs": "false",
     "splunk.hec.track.data" : "true"
    }
}'

这里需要注意:

  • name: Connector的名字,这个名字也将作为Kafka消费者的组名
  • topics: Splunk消费的目标主题名称,用逗号隔开
  • Splunk.hec.uri: Splunk HEC URI,此处填写开启HEC的节点的URI,包含IP和端口,这里的端口是上面在配置HEC的时候设置的,比如8088
  • Splunk.hec.token: Splunk HTTP Event Collector token
  • Splunk.hec.ack.enabled: 可以设置为true或者false,设置为true时,Splunk Kafka Connector在查询Kafka offsets之前会先通过POST发送ACK请求,这个机制用来保障数据不丢失
  • Splunk.hec.json.event.enrichment: 只对event HEC endpoint有效,主要是用来在元数据中增加字段
  • splunk.hec.ssl.validate.certs: 开启或关闭SSL证书校验,如果你的Splunk使用自签发证书,这里可以考虑关闭SSL证书校验,否则会报证书错误
    完整的参数列表可以参考GitHub的说明:
    https://github.com/splunk/kafka-connect-splunk

验证

要验证配置是否正确,需要往Kafka指定的主题里面写入数据,然后在Splunk上查看是否可以收到对应数据内容。写入数据有很多种方式,可以使用Splunk提供的数据生成工具Kafka data-gen-app或者kafka-console-producer,再或者使用kafka自带的客户端直接写消息进去,比如:
在kafka节点上执行:

bin/kafka-console-producer.sh --broker-list 192.168.134.57:9092 --topic NIP_SYSLOG_1515UDP2
image.png

然后在Splunk 搜索:


image.png

其他常用API

通过调用任务创建API,Splunk Connector开始订阅指定主题,此时我们可以通过另外几个API查询任务状态

描述 命令
列出有效connectors curl http://localhost:8083/connectors
获取指定Connector信息 curl http://localhost:8083/connectors/kafka-connect-splunk
获取指定Connector配置 curl http://localhost:8083/connectors/kafka-connect-splunk/config
删除指定Connector curl http://localhost:8083/connectors/kafka-connect-splunk -X DELETE
获取指定connector任务信息 curl http://localhost:8083/connectors/kafka-connect-splunk/tasks

参考链接

Install and Use Splunk Connect for Kafka
Splunk Connect for Kafka – Connecting Apache Kafka with Splunk
Github Kafa-connect-splunk

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,658评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,482评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,213评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,395评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,487评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,523评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,525评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,300评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,753评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,048评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,223评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,905评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,541评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,168评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,417评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,094评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,088评论 2 352

推荐阅读更多精彩内容