17. Apache Kafka

1. Kafka重要概念和技术架构:

1.实时数据分析中的Kakfa

在实时数据分析应用,Kafka的位置非常重要。首先通过Flume将Nginx服务器的日志,直接sink到Kakfa。然后通过Storm等实时计算框架,将Kafka数据计算并写入到HBase/Redis中,最后通过web客户端直接访问HBase/Redis来展示数据。

1.Kafka消息存储:Kafka的消息存储在Broker节点的磁盘上,并且是顺序写。
2.Kafka消息的消费特性:消费者消费Kafka上的消息,不会对消息进行删除操作,消息可以被不同的消费者重复消费。在Kafka Broker上保存的时间,由参数log.retention.hours等确定

2.Kafka的重要概念:

1.Broker:Kafka集群中的每一个节点服务器,集群是由一个或者多个Broker组成的。
2.Producer:消息的生产者
3.Consumer:消息的消费者
4.Topic:保存消息的逻辑单元,类似database中的table。消息按照不同类别发布到Kafka集群上,每个类别称之为一个topic。
5.Partition:Topic内的消息,在物理上按照分区(Partition)存储。
6.Consumer Group:让某几个consumer共同消费一个topic中的消息,这几个consumer不会出现重复消费消息的情况。

3.Kafka架构图:

Kafka架构图.png

Kafka集群只是一个或者多个Broker节点,节点间通信通过Zookeeper完成。

2. Kafaka集群安装

一定按照官方的文档来安装和部署。

1.安装计划部署:

计划在三台主机上安装部署Kafka集群:192.168.8.128,192.168.8.129,192.168.8.130。我选用的Kafka安装版本:kafka_2.10-0.10.1.0.tgz,在官网下载安装文件,下载后,解压压缩包:

$ tar zxvf kafka_2.10-0.10.1.0.tgz -C /opt/modules/

2. 修改配置文件:

切换到KAFKA_HOME目录,修改配置文件config/server.properties文件。

$ cd /opt/modules/kafka_2.10-0.10.1.0/
$ vi config/server.properties

sever.properties中有很多配置项,一般地,以下需要修改来完成kafka集群配置:

#节点唯一标识号,kafka集群中的每个broker,该项都不能重复
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

#Kafka消息的存放路径,注意不是日志而是Kafka的消息
# A comma seperated list of directories under which to store log files
log.dirs=/opt/modules/kafka_2.10-0.10.1.0/kafka-logs

#Kafka集群保存消息的时间,单位是时间。也就是Kafka消息在集群上保存1周后删掉。
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168

#Zookeeper服务器的地址,逗号分隔多个zookeeper主机
zookeeper.connect=hadoop-senior01.pmpa.com:2181,hadoop-senior02.pmpa.com:2181,hadoop-senior03.pmpa.com:2181

3.拷贝其他主机,修改broker.id

1.拷贝主机:

$ scp -r kafka_2.10-0.10.1.0/ natty@hadoop-senior03.pmpa.com:/opt/modules/

2.在config/server.properties配置文件中修改broker.id项
每一个broker的server.properties需要修改。

4.启动Kafka

启动kakfa,使用脚本bin/kafka-server-start.sh脚本,参数是config/server.properties文件。

$ nohup bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &

可以写一个shell来批量启停kafka集群。在每一个broker上执行启动和停止服务器的脚本。kafkaServer_batch.sh:

#!/bin/bash
KAFKA_HOME=/opt/modules/kafka_2.10-0.10.1.0
if [ $# -ne 1 ]
then
   echo "Usage:kafkaServer_batch.sh [start|stop]";
   exit -1;
fi
action=$1
if [ $action = "start" ]
#start add parameters!!!!
then
   batchscript="kafka-server-start.sh ${KAFKA_HOME}/config/server.properties";
elif [ $action = "stop" ]
then
   batchscript=kafka-server-stop.sh;
else
   echo "bad parameter!!";
   exit -1;
fi

echo $batchscript

for i in 192.168.8.128 192.168.8.129 192.168.8.130
do
  echo "${action} node ${i} Begin:"
  echo "source /etc/profile && nohup ${KAFKA_HOME}/bin/${batchscript} >/dev/null 2>&1 &"
  ssh ${i} "source /etc/profile && nohup ${KAFKA_HOME}/bin/${batchscript} >/dev/null 2>&1 &" &
  echo "${action} node ${i} End:"
done

启动后,jps查看进程,可以看到Kafka进程:

2921 QuorumPeerMain
3025 Kafka
3308 Jps

3. Kafka使用测试(ConsoleProducer、ConsoleConsumer)

Kafka集群安装配置好了之后,我们用自带的命令行生产、消费者来测试下kafka集群的使用。开启ConsolePrducer和ConsoleConsumer,使用如下2个脚本:
$KAFKA_HOME/bin/kafka-console-consumer.sh
$KAFKA_HOME/bin/kafka-console-producer.sh

不带任何参数运行这2个脚本,可以查看帮助信息(其他这些shell脚本也可以这样查看帮助)。注意,必须的参数前有个REQUIRED标志。


帮助信息.png

先创建一个测试的topic:

$bin/kafka-topics.sh --create --topic test_natty --partitions 3  --replication-factor 1 --zookeeper vm-master:2181

启动一个命令行的producer:

$bin/kafka-console-producer.sh --broker-list vm-master:9092 --topic test_natty

启动一个命令行的consumer:

$bin/kafka-console-consumer.sh --topic test_natty --zookeeper vm-master:2181

这样就开启了2个空白的窗口,在producer的窗口输入了消息后,就可以在consumer窗口看到刚才发送的消息:如下图:
producer输入:


producer.png

consumer输出:


consumer.png

4. Flume sink数据到Kafka

1.首先增加一个flume agent配置

新增一个配置文件,kafka-sink.properties, 该配置文件实现如下flume配置:
Source: netcat (监听某个服务器44444端口的数据,nc命令)
Channel: Memory
Sink: Kafka Topic(使用之前创建的测试topic:test_natty )
在使用时候,需要注意使用的flume的版本,在确定了flume的版本后,到官网找到 Kafka Sink部分的配置信息,按照例子来填写Kafka Sink就可以了。之前在测试时候,使用了一个flume 1.8 和 kafka 0.8版本的组合,发现了问题。最终的问题的原因是Kafka版本过低,需要到0.10版本才能解决该问题,所以在测试这个case时,还需要注意使用的flume、kafka的版本。下面的例子使用的flume 1.6版本
文件 kafka-sink.properties 的详细配置文件如下:

a1.sources = s1
a1.channels = c1
a1.sinks = k1

# define the netcat source:
a1.sources.s1.type = netcat
a1.sources.s1.bind = localhost
a1.sources.s1.port = 44444


# define the memory channel:
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100


#define the kafka sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = test_natty
a1.sinks.k1.brokerList = 10.198.193.189:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20

# Bind the source and sink to the channel
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

启动flume agent:

$bin/flume-ng agent -n a1 -c -conf -f conf/kafka-sink.properties -Dflume.root.logger=INFO,console

启动了flume之后,查看端口监听的情况,可以看到44444端口已经被监听:

$netstat --help
$netstat -nltp

2.执行结果测试

flume启动后,会有一个Application进程在监听localhost:44444端口,如果有错误想杀掉flume进程,直接查询flume的PID后kill即可。

$ps -aux | grep flume
$kill -9 XXX

下面,我们尝试通过telnet 向localhost:44444发送数据, 再开启一个Console-consumer来监控flume是否已经开始往topic test_natty中sink了数据。
开启telnet:

$telnet localhost 44444
telnet往44444端口发送数据.png

Console-consumer消费topic数据:


Console-Consumer消费数据.png

5. Storm与Kafka集成:

下面我开发了一个简单的例子,来展示storm 普通Topology 和Trident读取Kafka数据源的方法,后续的bolt的开发就根据业务来定制化。测试方法:在第4部分,配置了一个nc cluster host --> Kafka的配置。 我们就使用这个例子来进行测试,后续Storm的处理也非常简单(直接打印)。最后,测试效果是,通过nt 命令发送一些数据,storm会将nc命令输入的一些string,打印在console上,这样完成测试。

1.Kafka与普通的Topology集成:

1.需要引入maven的storm-kafka依赖包。Storm的spout读取kafka数据源。

2. Kafka与Trident集成:

3. 使用KafkaBolt将数据存入Kafka:

一般情况,将数据存入Kafka的Case很少,如果需要的话,使用KafkaBolt来实现。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,099评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,828评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,540评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,848评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,971评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,132评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,193评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,934评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,376评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,687评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,846评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,537评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,175评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,887评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,134评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,674评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,741评论 2 351

推荐阅读更多精彩内容