前置条件
- Java环境
这个网上一搜一大堆,简单记一下
jdk下载地址。注意这个在网页上下载时候是要同意协议的,所以wget下载下来的可能无法解压,所以手动下了。
解压,我的是解压在了/usr/local/java路径,后面设置环境变量用
$ tar -zxvvf jdk-8u181-linux-x64.tar.gz
编辑 /etc/profile
#java environment
export JAVA_HOME=/usr/local/java/jdk1.8.0_181/
export CLASSPATH=.:${JAVA_HOME}/jre/lib/rt.jar:${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/lib/tools.jar
export PATH=$PATH:${JAVA_HOME}/bin
$ source /etc/profile
$ java -version
- ZooKeeper
$ wget http://mirrors.hust.edu.cn/apache/zookeeper/stable/zookeeper-3.4.12.tar.gz
$ tar -zxvf zookeeper-3.4.12.tar.gz
$ cd zookeeper-3.4.12/conf/
$ cp zoo_sample.cfg zoo.cfg (配置文件,保留一份做备份)
vim zoo.cfg,自己指定地址
dataDir=/data1/data/zookeeper
dataLogDir=/data1/data/zookeeper
启动服务
$ bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /data1/softwares/zookeeper-3.4.12/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
然而上面这个日志不代表真的成功
查看是否启动成功
$ bin/zkServer.sh status
当前文件夹下有一个 zookeeper.out 可以查看错误原因
kafka安装
1. 下载
$ wget http://mirrors.shu.edu.cn/apache/kafka/2.0.0/kafka_2.11-2.0.0.tgz
$ tar -xzf kafka_2.11-2.0.0.tgz
$ cd kafka_2.11-2.0.0
2. 启动服务
kafka需要ZooKeeper服务,如果前置条件里面的ZooKeeper服务没有启动起来的话,kafka文件夹下也提供了一个脚本能快捷的启动一个单节点ZooKeeper实例。
$ bin/zookeeper-server-start.sh config/zookeeper.properties
启动ZooKeeper之后就可以启动kafka服务了
$ bin/kafka-server-start.sh config/server.properties
3. 创建一个topic
简单起见,先创建一个单分区一个副本集的topic => 'test'
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
执行以下指令即可看见我们创建的topic
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
test
也可以将代理配置为在发布不存在的topic时自动创建topic,而不是手动创建。
4. 发送一些消息
Kafka附带一个命令行客户端,它将从文件或标准输入中获取输入,并将其作为消息发送到Kafka集群。默认情况下,每行将作为单独的消息发送。
运行生产者producer,然后在控制台中键入一些消息以发送到服务器。
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
5. 启动消费者
Kafka还有一个命令行终端,它会将消息转储到标准输出。
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
如果你在不同的终端中运行上述每个命令,那么你现在应该能够在生产者终端中键入消息并看到它们出现在消费者终端中。
所有命令行工具都有其他选项;运行不带参数的命令将显示更详细地记录它们的使用信息。
6. 设置多代理集群
到目前为止,我们一直在与一个单broker链接,但这并不好玩。对于Kafka来说,单broker只是一个大小为1的集群,因此我们必须启动更多broker。为了感受它,让我们将我们的集群扩展到三个节点(仍然在我们的本地机器上)
$ cp config/server.properties config/server-1.properties
$ cp config/server.properties config/server-2.properties
编辑配置文件
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/data1/data/kafka/log-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/data1/data/kafka/log-2
broker.id是集群中每个节点唯一且持久的名字。
启动两个节点
$ bin/kafka-server-start.sh config/server-1.properties &
...
$ bin/kafka-server-start.sh config/server-2.properties &
...
如果遇到报错
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c0000000, 1073741824, 0) failed; error='Cannot allocate memory' (errno=12)
这时候一般是内存不足,修改启动脚本
vim bin/kafka-server-start.sh
找到
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
这一行,把它限制小一点即可。例如
现在创建一个复制因子为3的新topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic-2
好的,但现在我们有一个集群,我们怎么知道哪个broker正在做什么?运行“describe topics”命令:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic-2
输出
第一行给出了所有partitions的摘要,每个附加行提供有关一个partition的信息。由于我们只有一个partition用于此topic,因此只有一行。
“leader”是负责响应指定partition的所有读取和写入的请求。每个节点将随机成为partition的leader。
“replicas”是复制此分区日志的节点列表,无论它们是否为leader,或者它们当前是否处于活动状态。
“isr”是“同步”的复制品的集合。这是副本列表的子集,该列表当前处于活跃状态并且已经被领导者捕获。
请注意,在我的示例中,节点1是该topic的唯一partition的leader。
我们可以在我们之前创建的topic上运行相同的命令,以查看它的位置:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
毫无疑问 - 原来的topic没有副本,位于服务器0上,是我们创建它时群集中唯一的服务器。
测试一下容错性
先像步骤4、5一样,两个终端,对新topic起一个生产者,一个消费者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
杀掉主节点
查询状态
主节点已更换
之前的生产者消费者仍可继续运行。
参考资料
官方文档
[在CentOS 7上安装Kafka](https://www.mtyun.com/library/how-to-install-kafka-on-centos7