Kafka是Apache的基于订阅发布模式的分布式消息队列,主要应用于数据管道和服务之间的流式数据,很多公司和组织都在利用Apache Kafka提供的能力构建下一代流式应用。本文介绍如何使用开源的Splunk Connector for Kafka消费Kafka数据并转发至Splunk,同时可以自由指定写入数据的sourcetype,便于后续数据解析处理。
Splunk Connect for Kafka是一种侵入式的客户端,基于Kafka Connect框架开发,用于将数据从Kafka的topic取出,并通过Splunk HTTP Event Collector(HEC)发送到Splunk
0x01 准备工作
- 获取Splunk Connector for Kafka插件
- 已完成Kafka集群(或单点)部署,不熟悉的同学请参考Kafka官方手册
- 已完成Splunk集群(或单点)部署,这个如果不熟悉那就没办法了
0x02 配置Splunk
HTTP Event Collector(HEC)
配置Splunk接收从Kafka发送过来的消息需要配置HEC,通常有两种方式配置接收,即通过HEC接收后由Heavy Forwarder转发或者直接发送到Indexer节点,本文采用第二种方式,将消息直接发送到Indexer。
配置HEC,登录Splunk Web 页面,Setting > Data Inputs > HTTP Event Collector 点击Global Settings,确认All Tokens 为enabled状态,然后点击Save。这里要注意下面的HTTP Port Number,在后面的配置中会用到它。
然后点击New Token来创建一个新的HEC 令牌。
有关Splunk Acknowledgement 先不在本文讨论,感兴趣的同学请自行google
下一步会提示设置sourcetype,context和index等内容,按需配置即可。
对于大规模分布式的Splunk环境,生成token的方式请参考官方文档
Connector安装和配置
首先解压从GitHub上下载的Connector包,并将它复制到所有Kafka Connect节点上(默认情况下就是Kafka的安装节点)
修改配置文件,路径默认在Kafka安装目录下的config目录config/connect-distributed.properties
#These settings may already be configured if you have deployed a connector in your Kafka Connect Environment
bootstrap.servers=<BOOTSTRAP_SERVERS>
plugin.path=<PLUGIN_PATH>
#Required
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.flush.interval.ms=10000
#Recommended
group.id=kafka-connect-splunk-hec-sink
注意:
- bootstrap.servers 填写所有Kafka Broker的IP:端口,用逗号隔开
- plugin.path 填写Splunk connector for kafka JAR包位置,必须保证位置可达
修改完成后保存文档,并确保所有Kafka Connect节点都已近完成上述修改,然后在每个节点都执行以下命令开启Kafka Connect
./bin/connect-distributed.sh config/connect-distributed.properties
执行以下命令验证插件是否启动
curl http://<KAFKA_CONNECT_HOST>:8083/connector-plugins
返回信息应该会是Connector的名称com.splunk.kafka.connect.SplunkSinkConnector
创建任务
通过上面的步骤我们启动了一个Splunk Connector for Kafka的服务,但是并没有指定要订阅哪个主题以及发送的目的端,接下来我们需要通过RESTful API创建任务。
在Kafka Connect节点上执行:
curl localhost:8083/connectors -X POST -H "Content-Type: application/json" -d'{
"name": "splunk-prod-financial",
"config": {
"connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
"tasks.max": "3",
"topics": "NIP_SYSLOG_1515UDP2",
"splunk.indexes": "main",
"splunk.hec.uri":"https://192.168.134.56:8088",
"splunk.hec.token": "4b6c844b-125c-4133-b23f-2f4de21b634a",
"splunk.hec.ack.enabled" : "true",
"splunk.hec.raw" : "false",
"splunk.hec.json.event.enrichment" : "org=fin,bu=south-east-us",
"splunk.hec.ssl.validate.certs": "false",
"splunk.hec.track.data" : "true"
}
}'
这里需要注意:
- name: Connector的名字,这个名字也将作为Kafka消费者的组名
- topics: Splunk消费的目标主题名称,用逗号隔开
- Splunk.hec.uri: Splunk HEC URI,此处填写开启HEC的节点的URI,包含IP和端口,这里的端口是上面在配置HEC的时候设置的,比如8088
- Splunk.hec.token: Splunk HTTP Event Collector token
- Splunk.hec.ack.enabled: 可以设置为true或者false,设置为true时,Splunk Kafka Connector在查询Kafka offsets之前会先通过POST发送ACK请求,这个机制用来保障数据不丢失
- Splunk.hec.json.event.enrichment: 只对event HEC endpoint有效,主要是用来在元数据中增加字段
- splunk.hec.ssl.validate.certs: 开启或关闭SSL证书校验,如果你的Splunk使用自签发证书,这里可以考虑关闭SSL证书校验,否则会报证书错误
完整的参数列表可以参考GitHub的说明:
https://github.com/splunk/kafka-connect-splunk
验证
要验证配置是否正确,需要往Kafka指定的主题里面写入数据,然后在Splunk上查看是否可以收到对应数据内容。写入数据有很多种方式,可以使用Splunk提供的数据生成工具Kafka data-gen-app或者kafka-console-producer,再或者使用kafka自带的客户端直接写消息进去,比如:
在kafka节点上执行:
bin/kafka-console-producer.sh --broker-list 192.168.134.57:9092 --topic NIP_SYSLOG_1515UDP2
然后在Splunk 搜索:
其他常用API
通过调用任务创建API,Splunk Connector开始订阅指定主题,此时我们可以通过另外几个API查询任务状态
描述 | 命令 |
---|---|
列出有效connectors | curl http://localhost:8083/connectors |
获取指定Connector信息 | curl http://localhost:8083/connectors/kafka-connect-splunk |
获取指定Connector配置 | curl http://localhost:8083/connectors/kafka-connect-splunk/config |
删除指定Connector | curl http://localhost:8083/connectors/kafka-connect-splunk -X DELETE |
获取指定connector任务信息 | curl http://localhost:8083/connectors/kafka-connect-splunk/tasks |
参考链接
Install and Use Splunk Connect for Kafka
Splunk Connect for Kafka – Connecting Apache Kafka with Splunk
Github Kafa-connect-splunk