FileBeat+Confluent Kafka Connector推送日志到Elasticsearch

confluent

本文使用的confluent版本是confluent-5.2.1

1. FileBeat

FileBeat是一款轻量型的日志采集器,通常会在每一台运行业务服务的服务器上部署一套,本文使用的版本是6.7.0。官网提供CentOS版本的rpm包,下载并安装的命令如下

curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.7.0-x86_64.rpm
sudo rpm -vi filebeat-6.7.0-x86_64.rpm

安装完之后,有关filebeat的配置文件位于/etc/filebeat/filebeat.yml一个比较典型的配置如下:

#配置输入源,可以收集不同的服务日志,推送到不同的KAFKA TOPIC
filebeat.inputs:
- type: log
  enabled: true
  fields:
    to_kafka_topic: "v1-service-logs-topic"
  paths:
    - /path/to/v1/service.log
    - /path/to/v1/another/*.log
- type: log
  enabled: true
  fields:
    to_kafka_topic: "v2-service-logs-topic"
  paths:
    - /path/to/v2/service.log
    - /path/to/v2/another/*.log
#配置输出源-KAFKA
output.kafka:
  #这里需要注意,配置的kafka地址必须和kafka启动时server.property中配置的listener地址保持一致。
  #即便是localhost,也需要配置成具体的hostname,否则会连不上kafka
  hosts: ["172.22.122.23:9092"]
  #topic: 'e3-service-logs-topic'
  topic: '%{[fields.to_kafka_topic]}'
  partition.round_robin:
    reachable_only: false

  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

配置完毕之后,我们用如下命令启动filebeat

service filebeat start

1. Confluent及Kafka Connector

Kafka Connect是Kafka的开源组件Confluent提供的功能,用于实现Kafka与外部系统的连接。Kafka Connect同时支持分布式模式和单机模式,另外提供了一套完整的REST接口,用于查看和管理Kafka Connectors,还具有offset自动管理,可扩展等优点。

Kafka Connect分为企业版和开源版,企业版在开源版的基础之上提供了监控,负载均衡,副本等功能,实际生产环境中建议使用企业版。(本测试使用开源版)

Kafka Connect workers有两种工作模式,单机模式和分布式模式。在开发和适合使用单机模式的场景下,可以使用standalone模式, 在实际生产环境下由于单个worker的数据压力会比较大,distributed模式对负载均和和扩展性方面会有很大帮助。(本测试使用standalone模式)

更多的Kafka Connect的详细情况可以参考Kafka Connect官网

2. 搭建日志收集系统

Confluent套件包中自带了一个默认的传输连接器配置connect-avro-standalone.properties,因为Filebeat传输过来的数据是JSON格式的,并非Avro格式,因此我们新建一个connector,命名位connect-json-standalone。需要更改connector的ConnectorConverter

connect-avro-standalone.properties

# Sample configuration for a standalone Kafka Connect worker that uses Avro serialization and
# integrates the the Schema Registry. This sample configuration assumes a local installation of
# Confluent Platform with all services running on their default ports.

# Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.
bootstrap.servers=172.22.122.23:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data.
# Every Connect user will need to configure these based on the format they want their data in
# when loaded from or stored into Kafka
#key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter=org.apache.kafka.connect.json.JsonConverter
#key.converter.schema.registry.url=http://localhost:8081
key.converter.schemas.enable=false
#value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter.schema.registry.url=http://localhost:8081
value.converter.schemas.enable=false

# The internal converter used for offsets and config data is configurable and must be specified,
# but most users will always want to use the built-in default. Offset and config data is never
# visible outside of Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# Local storage file for offset data
offset.storage.file.filename=/tmp/connect.offsets

# Confluent Control Center Integration -- uncomment these lines to enable Kafka client interceptors
# that will report audit data that can be displayed and analyzed in Confluent Control Center
# producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
# consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor

# These are provided to inform the user about the presence of the REST host and port configs
# Hostname & Port for the REST API to listen on. If this is set, it will bind to the interface used to listen to requests.
#rest.host.name=
#rest.port=8083

# The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
#rest.advertised.host.name=
#rest.advertised.port=

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
# Replace the relative path below with an absolute path if you are planning to start Kafka Connect from within a
# directory other than the home directory of Confluent Platform.
plugin.path=share/java

另外,我们需要更改elastic-sink的配置,用来配置监听的KAFKA TOPIC, 以及elastic-search的地址。

${CONFLUENT_HOME}/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties
-------------------------------------------------------------------------------------
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
#监听输入的kafka topics
topics=filebeat-topic,anther-want-to-listen-topic
key.ignore=true
#输入到的elastic-search地址
connection.url=http://localhost:9200
type.name=kafka-connect
schema.ignore=true
schemas.enable=false

更改完毕之后,我们需要独立启动4个服务,分别是zookeeper,kafka-server,schema-registry,connect-standalone

// Apache ZooKeeper服务
${CONFLUENT_HOME}/bin/zookeeper-server-start -daemon etc/kafka/zookeeper.properties 
// Apache Kafka服务
${CONFLUENT_HOME}/bin/kafka-server-start -daemon etc/kafka/server.properties
${CONFLUENT_HOME}/bin/schema-registry-start -daemon etc/schema-registry/schema-registry.properties
${CONFLUENT_HOME}/bin/connect-standalone -daemon \
etc/schema-registry/connect-avro-standalone.properties \
etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties

启动完上述4个服务之后,需要加载elasticsearch connector

${CONFLUENT_HOME}/bin/confluent load elasticsearch-sink

大功告成,打开kibana首页,在management标签中查看elastic-search的index,当业务系统有日志打出来时候,index中就会有document数量的变化。

image.png

删除Topic

在使用过程中可能会需要不再收集某个Topic的日志,这一点需要配置Kafka,首先我们需要检查Kafka服务的server.properties配置文件:

#配置kafka服务的server.properties,添加以下配置。支持删除topic
delete.topic.enable=true
#kafka日志存放位置
log.dirs=/tmp/kafka-logs

然后,使用可以使用如下命令删除topic。

kafka-topics --list --zookeeper localhost:2181
kafka-topics --delete --zookeeper localhost:2181 --topic xxx-service-logs-topic

我试验的时候删除了topic之后,kafka的日志文件也自动被删除了,检查下如果没有被自动删除日志文件,那么需要去手动删除。

手动查看Topic中的消息

有时候,配置完毕之后,ES依旧没获取到消息,这时候我们可以手动的进入KAFKA Topic查看消息有没有进Kafka队列,我们可以用如下命令查看:

kafka-console-consumer --bootstrap-server 172.22.122.23:9092 --topic xxx-service-logs-topic --from-beginning
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,539评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,911评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,337评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,723评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,795评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,762评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,742评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,508评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,954评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,247评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,404评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,104评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,736评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,352评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,557评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,371评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,292评论 2 352

推荐阅读更多精彩内容

  • kafka当前的版本是0.10.1新增加的新特性还是很多的,使用0.8.×的同学,可以多关注下了。原文地址:htt...
    wangliang938阅读 1,468评论 1 12
  • 1.背景 最近研究了数据库实时转移的方案,目的是要把数据库中某些表的指定数据实时的转移到别的数据库中。 这与平时了...
    七分熟pizza阅读 9,950评论 1 7
  • 死亡有一万多道门,让人们各自退场离去。治和不治都是没有对错之分的选择。治,可能多一分钟心跳,多一天活着。不治,病人...
    Bernardxiao阅读 206评论 2 1
  • 1.更改配置文件 2.创建新的仓库文件夹 3.更改IntelliJ IDEA的配置 再勾选User setting...
    李姗姗_8ef1阅读 10,369评论 0 0
  • ——写在老婆六十一岁生日之际 (一) 缘分定, 戏话便成真。 闯北走南皆无意, 钟情一见伴终身。 风雨固情深。 (...
    清泉玉阅读 4,293评论 49 59