Kafka基础与搭建
一、先行要事
1.Java环境
由于kafka是使用Java语言进行的开发的应用程序,所以需要提前配置JDK环境。
2.Zookeeper安装
kafka的设计是通过Zookeeper进行保存集群的元数据信息和消费者信息。
Zookeeper是kafka的前提依赖。
在kafka的应用程序中包含有Zookeeper轻量应用,可以通过脚本直接运行。不过,也可以通过单独下载进行部署安装Zookeeper应用。
①.Zookeeper单机模式的部署配置如下:
$: tar xzvf zookeeper-3.4.13.tar.gz
$: mv zookeeper-3.4.13 /usr/local/zookeeper
$: vim /usr/local/zookeeper/conf/zoo.cfg
# 修改zoo.cfg文件中以配置
tickTime=2000 #Zookeeper心跳时间【2000】检测,单位为毫秒
dataDir=/var/lib/zookeeper #默认情况下,Zookeeper事务日志和数据快照保存路径
clientPort=2181 #客户端连接服务器的端口
$: /usr/local/zookeeper/zkServer.sh start #启动Zookeeper服务
②.Zookeeper集群模式的部署配置如下:
Tips:Zookeeper集群成为群组(Ensemble).关于群组节点需要注意的是数量问题:
-
群组包含节点数量尽量不要超过7个。
- 由于Zookeeper使用了一致性协议,当节点数量超过7个时,极可能造成整个群组的性能下降。
-
群组包含节点数量尽量为奇数个。
- 由于Zookeeper的quorum(仲裁)的思想,详细见【Zookeeper-quorum】文档。
$: vim /usr/local/zookeeper/conf/zoo.cfg
# 修改zoo.cfg文件中以配置
tickTime=2000 #Zookeeper心跳时间【2000】检测,单位为毫秒
dataDir=/var/lib/zookeeper #默认情况下,Zookeeper事务日志和数据快照保存路径
clientPort=2181 #客户端连接服务器的端口
initLimit=20 #flower最初连接leader时的超时值,单位为tickTime的倍数
syncLimit=5 #flower与leader之间进行Sync操作的超时值,单位为tickTime的倍数
# server.x=[hostname]:peerPort:leaderPort[:observer]
# x为群组中服务器的ID值.它必须是一个整数,不过不一定要从0开始,也不要求是连续的
# peerPort用于事务的发送
# leaderPort用于leader选举
# observe参数可选,为了标记服务器是否进入观察者模式
server.1=hostname1:2888:3888:observer
server.2=hostname2:2888:3888
server.3=hostname3:2888:3888
# 客户端只需要通过clientPort就能连接到群组,
# 而群组节点间的通信则需要同时用到这3个端口peerPort、leaderPort、clientPort。
3.Kafka安装
# 下载Kafka:http://kafka.apache.org/downloads
# 将下载的安装包kafka_2.12-2.1.1.tgz 进行解压操作
$: tar xzvf kafka_2.12-2.1.1.tgz
# 可参考kafka的配置文件参数说明自行配置kafka运行参数(见附表)
# 将kafka放在/usr/local/kafka路径下
mv -r kafka_2.12-2.1.1 /usr/local/kafka
# 启动kafka(前提已启动Zookeeper服务)
/usr/local/kafka/bin/kafka-server-start.sh -demon /usr/local/kafka/config/server.properties
一旦kafka服务启动完成,就可以针对kafka服务进行简单操作:
- 创建名称为test的Topic:
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
- 验证test Topic:
/usr/local/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test
# 终端窗口将会显示:
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic:test Partition:0 Leader:0 Replicas:0 Isr:0
- 向test Topic发送消息:
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
# 终端输入消息,回车发送:
Test Message Producer One
Test Message Producer Two
- 从test Topic读取消息:
/usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
# 终端显示读取的消息:
Test Message Producer One
Test Message Producer Two
Consumer 2 messages
附录:Kafka Broker配置项
配置选项 | 作用 | 默认值 | 备注 |
---|---|---|---|
broker.id | 表示在kafka集群中的唯一表示符 | 0 | 建议将此ID设置与主机名中所包含的唯一性数字相同 |
port | kafka服务的端口号 | 9092 | 注意配置1024以下的端口需要root权限 |
zookeeper.connect | 用于保存broker元数据的Zookeeper服务地址 | localhost:2181 | 其格式为: hostname:port/path具体含义如下 hostname:Zookeeper服务器的IP地址或者主机名 port:Zookeeper服务的端口号 /path:此项为可选Zookeeper路径,作为kafka集群的chroot环境。如果不指定,默认使用根路径/ |
log.dir | 日志存放路径 | — | 它是一组通过逗号隔开的路径本地系统文件路径。如果指定多个路径,broker会通过最少使用原则,把同一分区下的日志片段保存到同一个路径下。特别注意,broker是向拥有最少数目分区的路径进行新增分区,并非最小本地磁盘空间的路径新增分区。 |
num.recovery.threads.per.data.dir | 处理日志片段线程数 | 1 | 这些线程只是在服务启动或者关闭的时候才会使用到,所以可以使用大量的线程来处理以至于达到并行操作的目的。特别是包含有大量分区的服务器,一旦崩溃,在进行恢复并行操作时,可以节省大量时间。 |
auto.create.topics.enable | 自动创建Topic | true | 默认情况下,kafka服务会在以下三种情况下自动创建Topic: ①.当一个生产者开始往topic写入消息时; ②.当一个消费者开始从topic读取消息时; ③.当任意一个客户端向topic发送元数据请求时。 大多时候这些行为都是非预期的。根据kafka的协议,一个topic不先被创建,是无法知道此topic是否已经存在。如果把此项值改为false,可以通过显式的方式创建,不管是手动创建还是通过其他配置的方式。 |