1. Kafka重要概念和技术架构:
1.实时数据分析中的Kakfa
在实时数据分析应用,Kafka的位置非常重要。首先通过Flume将Nginx服务器的日志,直接sink到Kakfa。然后通过Storm等实时计算框架,将Kafka数据计算并写入到HBase/Redis中,最后通过web客户端直接访问HBase/Redis来展示数据。
1.Kafka消息存储:Kafka的消息存储在Broker节点的磁盘上,并且是顺序写。
2.Kafka消息的消费特性:消费者消费Kafka上的消息,不会对消息进行删除操作,消息可以被不同的消费者重复消费。在Kafka Broker上保存的时间,由参数log.retention.hours等确定
2.Kafka的重要概念:
1.Broker:Kafka集群中的每一个节点服务器,集群是由一个或者多个Broker组成的。
2.Producer:消息的生产者
3.Consumer:消息的消费者
4.Topic:保存消息的逻辑单元,类似database中的table。消息按照不同类别发布到Kafka集群上,每个类别称之为一个topic。
5.Partition:Topic内的消息,在物理上按照分区(Partition)存储。
6.Consumer Group:让某几个consumer共同消费一个topic中的消息,这几个consumer不会出现重复消费消息的情况。
3.Kafka架构图:
Kafka集群只是一个或者多个Broker节点,节点间通信通过Zookeeper完成。
2. Kafaka集群安装
一定按照官方的文档来安装和部署。
1.安装计划部署:
计划在三台主机上安装部署Kafka集群:192.168.8.128,192.168.8.129,192.168.8.130。我选用的Kafka安装版本:kafka_2.10-0.10.1.0.tgz,在官网下载安装文件,下载后,解压压缩包:
$ tar zxvf kafka_2.10-0.10.1.0.tgz -C /opt/modules/
2. 修改配置文件:
切换到KAFKA_HOME目录,修改配置文件config/server.properties文件。
$ cd /opt/modules/kafka_2.10-0.10.1.0/
$ vi config/server.properties
sever.properties中有很多配置项,一般地,以下需要修改来完成kafka集群配置:
#节点唯一标识号,kafka集群中的每个broker,该项都不能重复
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
#Kafka消息的存放路径,注意不是日志而是Kafka的消息
# A comma seperated list of directories under which to store log files
log.dirs=/opt/modules/kafka_2.10-0.10.1.0/kafka-logs
#Kafka集群保存消息的时间,单位是时间。也就是Kafka消息在集群上保存1周后删掉。
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
#Zookeeper服务器的地址,逗号分隔多个zookeeper主机
zookeeper.connect=hadoop-senior01.pmpa.com:2181,hadoop-senior02.pmpa.com:2181,hadoop-senior03.pmpa.com:2181
3.拷贝其他主机,修改broker.id
1.拷贝主机:
$ scp -r kafka_2.10-0.10.1.0/ natty@hadoop-senior03.pmpa.com:/opt/modules/
2.在config/server.properties配置文件中修改broker.id项
每一个broker的server.properties需要修改。
4.启动Kafka
启动kakfa,使用脚本bin/kafka-server-start.sh脚本,参数是config/server.properties文件。
$ nohup bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &
可以写一个shell来批量启停kafka集群。在每一个broker上执行启动和停止服务器的脚本。kafkaServer_batch.sh:
#!/bin/bash
KAFKA_HOME=/opt/modules/kafka_2.10-0.10.1.0
if [ $# -ne 1 ]
then
echo "Usage:kafkaServer_batch.sh [start|stop]";
exit -1;
fi
action=$1
if [ $action = "start" ]
#start add parameters!!!!
then
batchscript="kafka-server-start.sh ${KAFKA_HOME}/config/server.properties";
elif [ $action = "stop" ]
then
batchscript=kafka-server-stop.sh;
else
echo "bad parameter!!";
exit -1;
fi
echo $batchscript
for i in 192.168.8.128 192.168.8.129 192.168.8.130
do
echo "${action} node ${i} Begin:"
echo "source /etc/profile && nohup ${KAFKA_HOME}/bin/${batchscript} >/dev/null 2>&1 &"
ssh ${i} "source /etc/profile && nohup ${KAFKA_HOME}/bin/${batchscript} >/dev/null 2>&1 &" &
echo "${action} node ${i} End:"
done
启动后,jps查看进程,可以看到Kafka进程:
2921 QuorumPeerMain
3025 Kafka
3308 Jps
3. Kafka使用测试(ConsoleProducer、ConsoleConsumer)
Kafka集群安装配置好了之后,我们用自带的命令行生产、消费者来测试下kafka集群的使用。开启ConsolePrducer和ConsoleConsumer,使用如下2个脚本:
$KAFKA_HOME/bin/kafka-console-consumer.sh
$KAFKA_HOME/bin/kafka-console-producer.sh
不带任何参数运行这2个脚本,可以查看帮助信息(其他这些shell脚本也可以这样查看帮助)。注意,必须的参数前有个REQUIRED标志。
先创建一个测试的topic:
$bin/kafka-topics.sh --create --topic test_natty --partitions 3 --replication-factor 1 --zookeeper vm-master:2181
启动一个命令行的producer:
$bin/kafka-console-producer.sh --broker-list vm-master:9092 --topic test_natty
启动一个命令行的consumer:
$bin/kafka-console-consumer.sh --topic test_natty --zookeeper vm-master:2181
这样就开启了2个空白的窗口,在producer的窗口输入了消息后,就可以在consumer窗口看到刚才发送的消息:如下图:
producer输入:
consumer输出:
4. Flume sink数据到Kafka
1.首先增加一个flume agent配置
新增一个配置文件,kafka-sink.properties, 该配置文件实现如下flume配置:
Source: netcat (监听某个服务器44444端口的数据,nc命令)
Channel: Memory
Sink: Kafka Topic(使用之前创建的测试topic:test_natty )
在使用时候,需要注意使用的flume的版本,在确定了flume的版本后,到官网找到 Kafka Sink部分的配置信息,按照例子来填写Kafka Sink就可以了。之前在测试时候,使用了一个flume 1.8 和 kafka 0.8版本的组合,发现了问题。最终的问题的原因是Kafka版本过低,需要到0.10版本才能解决该问题,所以在测试这个case时,还需要注意使用的flume、kafka的版本。下面的例子使用的flume 1.6版本
文件 kafka-sink.properties 的详细配置文件如下:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# define the netcat source:
a1.sources.s1.type = netcat
a1.sources.s1.bind = localhost
a1.sources.s1.port = 44444
# define the memory channel:
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#define the kafka sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = test_natty
a1.sinks.k1.brokerList = 10.198.193.189:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
# Bind the source and sink to the channel
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
启动flume agent:
$bin/flume-ng agent -n a1 -c -conf -f conf/kafka-sink.properties -Dflume.root.logger=INFO,console
启动了flume之后,查看端口监听的情况,可以看到44444端口已经被监听:
$netstat --help
$netstat -nltp
2.执行结果测试
flume启动后,会有一个Application进程在监听localhost:44444端口,如果有错误想杀掉flume进程,直接查询flume的PID后kill即可。
$ps -aux | grep flume
$kill -9 XXX
下面,我们尝试通过telnet 向localhost:44444发送数据, 再开启一个Console-consumer来监控flume是否已经开始往topic test_natty中sink了数据。
开启telnet:
$telnet localhost 44444
Console-consumer消费topic数据:
5. Storm与Kafka集成:
下面我开发了一个简单的例子,来展示storm 普通Topology 和Trident读取Kafka数据源的方法,后续的bolt的开发就根据业务来定制化。测试方法:在第4部分,配置了一个nc cluster host --> Kafka的配置。 我们就使用这个例子来进行测试,后续Storm的处理也非常简单(直接打印)。最后,测试效果是,通过nt 命令发送一些数据,storm会将nc命令输入的一些string,打印在console上,这样完成测试。
1.Kafka与普通的Topology集成:
1.需要引入maven的storm-kafka依赖包。Storm的spout读取kafka数据源。
2. Kafka与Trident集成:
3. 使用KafkaBolt将数据存入Kafka:
一般情况,将数据存入Kafka的Case很少,如果需要的话,使用KafkaBolt来实现。