了解生产者如何向kafka集群中发送消息的,消费者如何从集群中消费消息?zookeeper到底扮演者着什么角色?它跟生产者和消费者之间有没有关系?
安装kafka
在安装kafka之前,需要安装zookeeper,zookeeper是用来管理kafka集群的,安装完zk之后并启动。
安装kafka
wget https://archive.apache.org/dist/kafka/0.11.0.0/kafka_2.11-0.11.0.0.tgz
tar -xf kafka_2.11-0.11.0.0.tgz
cd kafka_2.11-0.11.0.0
为了学习的方便,我们在同一台机器上搭建伪集群。新建三个配置文件,修改如下:
# server1.properties
broker.id=0
port=9092
host.name=127.0.0.1
log.dirs=/Users/Liang/dataLog/kafka/k1
# server2.properties
broker.id=0
port=9093
host.name=127.0.0.1
log.dirs=/Users/Liang/dataLog/kafka/k2
# server3.properties
broker.id=0
port=9094
host.name=127.0.0.1
log.dirs=/Users/Liang/dataLog/kafka/k3
分别启动kafka
bin/kafka-server-start.sh -daemon config/server1.properties
bin/kafka-server-start.sh -daemon config/server2.properties
bin/kafka-server-start.sh -daemon config/server3.properties
这时候三台kafka向zk去注册自己的相关信息(节点),我们连上zk服务上观察一下:
[zk: localhost:2181(CONNECTED) 24] ls /
[cluster, controller_epoch, controller, brokers, zookeeper, admin, isr_change_notification, consumers, latest_producer_id_block, config]
[zk: localhost:2181(CONNECTED) 27] ls /brokers
[ids, topics, seqid]
[zk: localhost:2181(CONNECTED) 28] ls /brokers/ids
[0, 1, 2]
[zk: localhost:2181(CONNECTED) 53] get /brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://127.0.0.1:9092"],"jmx_port":-1,"host":"127.0.0.1","timestamp":"1576225146987","port":9092,"version":4}
cZxid = 0x76
ctime = Fri Dec 13 16:19:06 CST 2019
mZxid = 0x76
mtime = Fri Dec 13 16:19:06 CST 2019
pZxid = 0x76
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x16efd41071f000b
dataLength = 188
numChildren = 0
新建topic
$ bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --topic first --partitions 2 --replication-factor 2
我们发现zk下多了一个first的topic
[zk: localhost:2181(CONNECTED) 38] ls /brokers/topics
[first]
当然我们也可以用kafka提供的命令查看
$ bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181
first
仔细观察一下日志,可以发现topic为first对应的两个分区分布在三台kafka机器上
查看topic信息
$ bin/kafka-topics.sh --describe --topic first --zookeeper 127.0.0.1:2181
Topic:first PartitionCount:2 ReplicationFactor:2 Configs:
Topic: first Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1,0
Topic: first Partition: 1 Leader: 2 Replicas: 2,1 Isr: 2,1
生产者
向kafka集群中发送消息,需要指定kafka机器的ip(事实上,访问任何一台机器都可以获取到集群的其他机器信息,发送消息是往leader中发送的)
$ bin/kafka-console-producer.sh --topic first --broker-list 127.0.0.1:9092
>hello
消费者
$ bin/kafka-console-consumer.sh --topic first --zookeeper 127.0.0.1:2181
hello
如果消费者不在线,当我们重新打开consumer的时候,我们可以指定从头开始消费
$ bin/kafka-console-consumer.sh --topic first --zookeeper 127.0.0.1:2181 --from-beginning
hello
参考
Kafka 客户端是如何找到 leader 分区的
zookeeper与Kafka的关系
再过半小时,你就能明白kafka的工作原理了