日志收集之kafka

一、介绍

Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下:

  • 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间复杂度的访问性能
  • 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条以上消息的传输
  • 支持Kafka Server间的消息分区,及分布式消费,同时保证每个Partition内的消息顺序传输
  • 同时支持离线数据处理和实时数据处理
  • Scale out:支持在线水平扩展

架构与原理

kafka的架构和原理想必大家都已经在很多地方看过,今天暂时不讲,下次再开篇详谈,整个kafka的具体工作流和架构如下图:

整体架构

  如上图所示,一个典型的Kafka集群中包含若干Producer(可以是web前端产生的Page View,或者是服务器日志,系统CPU、Memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。

二、安装

在centos上安装kafka,我推荐安装confluent公司的kafka套装,我们可以选择自己想要的组件就行。

2.1 yum安装

  1. sudo rpm --import http://packages.confluent.io/rpm/2.0/archive.key
  2. 添加yum源, confluent.repo

[confluent-2.0]
name=Confluent repository for 2.0.x packages
baseurl=http://packages.confluent.io/rpm/2.0
gpgcheck=1
gpgkey=http://packages.confluent.io/rpm/2.0/archive.keyenabled=1

  1. sudo yum install confluent-platform-2.11.7安装即可,里面包含confluent-kafka-2.11.7confluent-schema-registry等组件。

安装完成后马上快速开始吧。

三、kafka命令行

kafka工具安装后,会有很多自带的工具来测试kafka,下面就举几个例子

3.1 kafka-topics

创建、改变、展示全部和描述topics, 例子:

 [root@localhost ~]#/usr/bin/kafka-topics --zookeeper zk01.example.com:2181 --list
sink1
test
[root@localhost ~]#/usr/bin/kafka-topics --zookeeper zk01.example.com:2181 --create --topic

3.2 kafka-console-consumer

从kafka中读取数据,输出到控制台

[root@localhost ~]#kafka-console-consumer --zookeeper zk01.example.com:2181 --topic test

3.3 kafka-console-producer

从标准输出读取数据然后写入到kafka队列中

[root@localhost ~]#/usr/bin/kafka-console-producer --broker-list kafka02.example.com:9092,kafka03.example.com:9092 --topic test2

3.4 kafka-consumer-offset-checker

检查读写的消息量

[root@localhost ~]#/usr/bin/kafka-consumer-offset-checker --group flume --topic test1 --zookeeper zk01.example.com:2181

四、kafka web UI

利用开源项目KafkaOffsetMonitor或者kafka-manager将kafka情况直观的展示出来。

4.1 运行KafkaOffsetMonitor

java -cp /root/kafka_web/KafkaOffsetMonitor-assembly-0.2.1.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb  --dbName kafka --zk  zk-server1,zk-server2 --port 8080 --refresh 10.seconds --retain 2.days```
- 利用supervisor运行, 在/etc/supervisord.d目录下创建kafka_web.conf文件,内容如下

[program:kafka_web]
command=java -cp /root/kafka_web/KafkaOffsetMonitor-assembly-0.2.1.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --dbName kafka -zk zk-server1,zk-server2 --port 8080 --refresh 10.seconds --retain 2.days
startsecs=0
stopwaitsecs=0
autostart=true
autorestart=true


## 4.2 运行kafka-manager
运行kafka-manager需要sbt编译,但是编译起来太麻烦了,而且还不一定成功,所以我就直接用docker跑了一个。

- 在centos上,在docker的配置```/etc/sysconfig/docker```上增加[daocloud](https://dashboard.daocloud.io/mirror)的加速mirror, 修改docker运行参数:
      other_args=" --registry-mirror=http://7919bcde.m.daocloud.io --insecure-registry=0.0.0.0:5000 -H tcp://0.0.0.0:2375 -H unix:///var/run/docker.sock -api-enable-cors=true"
直接重启docker即可。
- 运行docker命令即可访问
        docker run -p 9000:9000 -e ZK_HOSTS="zk_host:2181" -e APPLICATION_SECRET=kafka-manager  sheepkiller/kafka-manager

# 五 、性能测试
   关于性能测试,找到了Kafka的创始人之一的[Jay Kreps](http://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines)的bechmark。以下描述皆基于该benchmark。(该benchmark基于Kafka0.8.1)
## 5.1 测试环境
  该benchmark用到了六台机器,机器配置如下
>Intel Xeon 2.5 GHz processor with six cores
>Six 7200 RPM SATA drives
>32GB of RAM
>1Gb Ethernet    

这6台机器其中3台用来搭建Kafka broker集群,另外3台用来安装Zookeeper及生成测试数据。6个drive都直接以非RAID方式挂载。实际上kafka对机器的需求与Hadoop的类似。

##5.2 producer吞吐率
  该项测试只测producer的吞吐率,也就是数据只被持久化,没有consumer读数据。
* 1个producer线程,无replication
  在这一测试中,创建了一个包含6个partition且没有replication的topic。然后通过一个线程尽可能快的生成50 million条比较短(payload100字节长)的消息。测试结果是***821,557 records/second***(***78.3MB/second***)。  
  之所以使用短消息,是因为对于消息系统来说这种使用场景更难。因为如果使用MB/second来表征吞吐率,那发送长消息无疑能使得测试结果更好。  
  整个测试中,都是用每秒钟delivery的消息的数量乘以payload的长度来计算MB/second的,没有把消息的元信息算在内,所以实际的网络使用量会比这个大。对于本测试来说,每次还需传输额外的22个字节,包括一个可选的key,消息长度描述,CRC等。另外,还包含一些请求相关的overhead,比如topic,partition,acknowledgement等。这就导致我们比较难判断是否已经达到网卡极限,但是把这些overhead都算在吞吐率里面应该更合理一些。因此,我们已经基本达到了网卡的极限。  
  初步观察此结果会认为它比人们所预期的要高很多,尤其当考虑到Kafka要把数据持久化到磁盘当中。实际上,如果使用随机访问数据系统,比如RDBMS,或者key-velue store,可预期的最高访问频率大概是5000到50000个请求每秒,这和一个好的RPC层所能接受的远程请求量差不多。而该测试中远超于此的原因有两个。
Kafka确保写磁盘的过程是线性磁盘I/O,测试中使用的6块廉价磁盘线性I/O的最大吞吐量是822MB/second,这已经远大于1Gb网卡所能带来的吞吐量了。许多消息系统把数据持久化到磁盘当成是一个开销很大的事情,这是因为他们对磁盘的操作都不是线性I/O。
在每一个阶段,Kafka都尽量使用批量处理。如果想了解批处理在I/O操作中的重要性,可以参考David Patterson的”[Latency Lags Bandwidth](http://www.ll.mit.edu/HPEC/agendas/proc04/invited/patterson_keynote.pdf)“
* 1个producer线程,3个异步replication
  该项测试与上一测试基本一样,唯一的区别是每个partition有3个replica(所以网络传输的和写入磁盘的总的数据量增加了3倍)。每一个broker即要写作为leader的partition,也要读(从leader读数据)写(将数据写到磁盘)作为follower的partition。测试结果为***75.1MB/second***。  
  该项测试中replication是异步的,也就是说broker收到数据并写入本地磁盘后就acknowledge producer,而不必等所有replica都完replication。也就是说,如果leader crash了,可能会丢掉一些最新的还未备份的数据。但这也会让message acknowledgement延迟更少,实时性更好。  
  这项测试说明,replication可以很快。整个集群的写能力可能会由于3倍的replication而只有原来的三分之一,但是对于每一个producer来说吞吐率依然足够好。  
* 1个producer线程,3个同步replication
  该项测试与上一测试的唯一区别是replication是同步的,每条消息只有在被in sync集合里的所有replica都复制过去后才会被置为committed(此时broker会向producer发送acknowledgement)。
  在这种模式下,Kafka可以保证即使leader crash了,也不会有数据丢失。测试结果***40.2MB/second***。Kafka同步复制与异步复制并没有本质的不同。leader会始终track follower replica从而监控它们是否还alive,只有所有in sync集合里的replica都acknowledge的消息才可能被consumer所消费。而对follower的等待影响了吞吐率。可以通过增大batch size来改善这种情况,但为了避免特定的优化而影响测试结果的可比性,本次测试并没有做这种调整。  
* 3个producer,3个异步replication
  该测试相当于把上文中的1个producer,复制到了3台不同的机器上(在1台机器上跑多个实例对吞吐率的增加不会有太大帮忙,因为网卡已经基本饱和了),这3个producer同时发送数据。整个集群的吞吐率为***193,0MB/second***。

###5.3 Producer Throughput Vs. Stored Data
  消息系统的一个潜在的危险是当数据能都存于内存时性能很好,但当数据量太大无法完全存于内存中时(然后很多消息系统都会删除已经被消费的数据,但当消费速度比生产速度慢时,仍会造成数据的堆积),数据会被转移到磁盘,从而使得吞吐率下降,这又反过来造成系统无法及时接收数据。这样就非常糟糕,而实际上很多情景下使用queue的目的就是解决数据消费速度和生产速度不一致的问题。  
  但Kafka不存在这一问题,因为Kafka始终以O(1)的时间复杂度将数据持久化到磁盘,所以其吞吐率不受磁盘上所存储的数据量的影响。为了验证这一特性,做了一个长时间的大数据量的测试。测试表明当磁盘数据量达到1TB时,吞吐率和磁盘数据只有几百MB时没有明显区别,这个variance是由Linux I/O管理造成的,它会把数据缓存起来再批量flush。 
### 5.4 consumer吞吐率
  需要注意的是,replication factor并不会影响consumer的吞吐率测试,因为consumer只会从每个partition的leader读数据,而与replicaiton factor无关。同样,consumer吞吐率也与同步复制还是异步复制无关。  
1个consumer
  该测试从有6个partition,3个replication的topic消费50 million的消息。测试结果为***89.7MB/second***。可以看到,Kafka的consumer是非常高效的。它直接从broker的文件系统里读取文件块。Kafka使用[sendfile API](http://www.ibm.com/developerworks/library/j-zerocopy/)来直接通过操作系统直接传输,而不用把数据拷贝到用户空间。该项测试实际上从log的起始处开始读数据,所以它做了真实的I/O。在生产环境下,consumer可以直接读取producer刚刚写下的数据(它可能还在缓存中)。实际上,如果在生产环境下跑[I/O stat](http://en.wikipedia.org/wiki/Iostat),你可以看到基本上没有物理“读”。也就是说生产环境下consumer的吞吐率会比该项测试中的要高。
3个consumer
  将上面的consumer复制到3台不同的机器上,并且并行运行它们(从同一个topic上消费数据)。测试结果为***249.5MB/second***,正如所预期的那样,consumer的吞吐率几乎线性增涨。  
### 5.5 Producer and Consumer
  上面的测试只是把producer和consumer分开测试,而该项测试同时运行producer和consumer,这更接近使用场景。实际上目前的replication系统中follower就相当于consumer在工作。  
  该项测试,在具有6个partition和3个replica的topic上同时使用1个producer和1个consumer,并且使用异步复制。测试结果为***75.8MB/second***, 可以看到,该项测试结果与单独测试1个producer时的结果几乎一致。所以说consumer非常轻量级。  
### 5.6 消息长度对吞吐率的影响
  上面的所有测试都基于短消息(payload 100字节),而正如上文所说,短消息对Kafka来说是更难处理的使用方式,可以预期,随着消息长度的增大,records/second会减小,但MB/second会有所提高。正如我们所预期的那样,随着消息长度的增加,每秒钟所能发送的消息的数量逐渐减小。但是如果看每秒钟发送的消息的总大小,它会随着消息长度的增加而增加,当消息长度为10字节时,因为要频繁入队,花了太多时间获取锁,CPU成了瓶颈,并不能充分利用带宽。但从100字节开始,我们可以看到带宽的使用逐渐趋于饱和(虽然MB/second还是会随着消息长度的增加而增加,但增加的幅度也越来越小)。  
### 5.7 端到端的Latency
  上文中讨论了吞吐率,那消息传输的latency如何呢?也就是说消息从producer到consumer需要多少时间呢?该项测试创建1个producer和1个consumer并反复计时。结果是,***2 ms (median), 3ms (99th percentile, 14ms (99.9th percentile)***,(这里并没有说明topic有多少个partition,也没有说明有多少个replica,replication是同步还是异步。实际上这会极大影响producer发送的消息被commit的latency,而只有committed的消息才能被consumer所消费,所以它会最终影响端到端的latency)  
### 5.8 重现该benchmark
  如果读者想要在自己的机器上重现本次benchmark测试,可以参考[本次测试的配置和所使用的命令](https://gist.github.com/jkreps/c7ddb4041ef62a900e6c)。  
  实际上Kafka Distribution提供了producer性能测试工具,可通过```bin/kafka-producer-perf-test.sh```脚本来启动。 
  读者也可参考另外一份[Kafka性能测试报告](http://liveramp.com/blog/kafka-0-8-producer-performance-2/)  
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,362评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,330评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,247评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,560评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,580评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,569评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,929评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,587评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,840评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,596评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,678评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,366评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,945评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,929评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,165评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,271评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,403评论 2 342

推荐阅读更多精彩内容

  • 本文转载自http://dataunion.org/?p=9307 背景介绍Kafka简介Kafka是一种分布式的...
    Bottle丶Fish阅读 5,424评论 0 34
  • 背景介绍 Kafka简介 Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下: 以时间复杂度为O...
    高广超阅读 12,815评论 8 167
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,585评论 18 139
  • 转帖:原文地址http://www.infoq.com/cn/articles/depth-interpretat...
    端木轩阅读 2,315评论 0 19
  • 天道酬勤,父亲也走了,但比母亲去世早近一年。家还是那个家,可我的心仍然停留在那个月亮下的港湾下,聆听着雨的诉求与...
    米澜盛若阅读 302评论 2 1