简介
部分引用来自博客:https://juejin.im/post/5a67f7e7f265da3e3c6c4f8b
Apache Kafka是一个分布式的发布-订阅系统和一个强大的队列。
特点:
1、高吞吐(单机每秒10W条消息传输)
2、消息持久化(磁盘)
3、分布式(支持 Server 间的消息分区及分布式消费,同时保证每个 partition 内的消息顺序传输。这样易于向外扩展,所有的producer、broker 和 consumer 都会有多个,均为分布式的)
4、消费消息采用pull模式(由consumer保存offset)
5、支持online和offline场景
基本概念:
1、 Broker
Kafka 集群中的一台或多台服务器统称为 Broker,可以简单理解为一个kafka节点。
*Kafka集群通常使用多个Broker来实现集群的负载均衡
*Kafka brokers 是无状态的,它们使用 ZooKeeper 来保持集群信息
2、ZooKeeper
是一个分布式配置和同步服务,管理和协调Kafka broker
3、Topic
每条发布到 Kafka 的消息都有一个类别,这个类别被称为 Topic 。(物理上不同Topic 的消息分开存储。逻辑上一个 Topic 的消息虽然保存于一个或多个broker上,但用户只需指定消息的 Topic 即可生产或消费数据而不必关心数据存于何处)
4、 Partition
Topic物理上的分组
,一个 Topic 可以分为多个 Partition ,每个 Partition 是一个有序的队列。Partition 中的每条消息都会被分配一个有序的 id(offset)。但整个Topic消息不一定有序。
5、Segment
包含消息内容的指定大小的文件
*由 index 文件和 log 文件组成;
*一个 Partition 由多个 Segment 文件组成
6、Offset
Segment 文件中消息的索引值, 从 0 开始计数
7、Replica (N)
消息的冗余备份
*每个 Partition 都会有 N 个完全相同的冗余备份, 这些备份会被尽量分散存储在不同的机器上。
8.、Producer
消息和数据的生产者,可以理解为往 Kafka 发消息的客户端
9、 Consumer
消息和数据的消费者,可以理解为从 Kafka 取消息的客户端
10、Consumers
由于 Kafka brokers 是无状态的, 因此需要Consumer来维护根据partition offset
已经消费的消息数量信息。
*如果 consumer 确认了一个指定消息的offset
,那也就意味着 consumer 已经消费了该offset
之前的所有消息。
*Consumer可以向Broker异步发起一个拉取消息的请求来缓存待消费的消息。consumers 也可以通过提供一个指定的offset
值来回溯或跳过Partition中的消息。
*Consumer 消费消息的offset
值是保存在ZooKeeper中的。
11、Consumer Group
每个 Consumer 属于一个特定的 Consumer Group(可为每个 Consumer 指定Group Name,若不指定 Group Name 则属于默认的 Group
)。
这是 Kafka 用来实现一个 Topic 消息的广播(发给所有的 Consumer )和单播(发给任意一个 Consumer )的手段。一个 Topic 可以有多个 Consumer Group。Topic 的消息会复制(不是真的复制,是概念上的)到所有的 Consumer Group,但每个 Consumer Group 只会把消息发给该 Consumer Group 中的一个 Consumer。如果要实现广播,只要每个 Consumer 有一个独立的 Consumer Group 就可以了。如果要实现单播只要所有的 Consumer 在同一个 Consumer Group 。用 Consumer Group 还可以将 Consumer 进行自由的分组而不需要多次发送消息到不同的 Topic 。
本旨在探究kafka的部署及使用,下面介绍以docker方式安装部署:
单机部署
第一步:下载镜像。
// 选一个下载
docker pull wurstmeister/kafka
docker pull zookeeper
第二步:创建通信网络。由于要涉及到zookeeper和kafka之间的通信,运用docker内部容器通信机制先新建一个网络。
[root@sz-ben-dev-01 ~]# docker network create hbl_test (新建网络)
dcb683a23044e902b251e01f493c814f940bd5bb592025c9eb4b78902f45091f
[root@sz-ben-dev-01 ~]# docker network ls (查看网络)
NETWORK ID NAME DRIVER SCOPE
dcb683a23044 hbl_test bridge local
[root@sz-ben-dev-01 ~]# docker network inspect hbl_test (查看网络详细信息)
[
{
"Name": "hbl_test",
"Id": "dcb683a23044e902b251e01f493c814f940bd5bb592025c9eb4b78902f45091f",
"Created": "2019-11-07T11:30:01.007966557+08:00",
"Scope": "local",
"Driver": "bridge",
"EnableIPv6": false,
"IPAM": {
"Driver": "default",
"Options": {},
"Config": [
{
"Subnet": "172.19.0.0/16",
"Gateway": "172.19.0.1"
}
]
},
"Internal": false,
"Attachable": false,
"Containers": {}, (连接的容器为空)
"Options": {},
"Labels": {}
}
]
第三步:创建zookeeper和kafka容器. (kafka容器启动有坑,接着往下看)
docker run --net=hbl_test --name hbl_zookeeper -p 21810:2181 -d docker.io/zookeeper
docker run --net=hbl_test --name hbl_kafka -p 9093:9092 \
--link hbl_zookeeper \
-e KAFKA_ZOOKEEPER_CONNECT=172.19.0.2:2181 \
-e KAFKA_ADVERTISED_HOST_NAME=172.28.2.104 \
-e KAFKA_ADVERTISED_PORT=9092 \
-d wurstmeister/kafka
KAFKA_ADVERTISED_HOST_NAME参数需要设置为宿主机地址172.28.2.104。
KAFKA_ZOOKEEPER_CONNECT参数设置hbl-zookeeper容器内部地址和端口(同一宿主机内的容器互相访问要用容器内地址,查看指令为docker inspect hbl_zookeeper,在Networks字段可以看到容器内ip地址)。
"Networks": {
"hbl_test": {
......
"IPAddress": "172.19.0.2",
......
}
}
这时在去查看网络hbl_test,zookeeper和kafka容器都已经加入hbl_test网络了。
[root@sz-ben-dev-01 ~]# docker network inspect hbl_test
[
{
"Name": "hbl_test",
......
"Containers": {
"1ea442cae355872506cfa6d1aab71149da8dfa0fd2d1b05c126f32112cbe957a": {
"Name": "hbl_zookeeper",
"EndpointID": "414a64059817d85133f5bcc32f4789aecfccdda5a5f987828fd59b86a3d8dcd6",
"MacAddress": "02:42:ac:13:00:02",
"IPv4Address": "172.19.0.2/16",
"IPv6Address": ""
},
"761485f630e9fd09e4737c511744da74018612e9a07f672927b123d3fc46b908": {
"Name": "hbl_kafka",
"EndpointID": "24bd176e9c828b577f8f269f081fcb83508e66764f6a46e08d3f59b36e6e1528",
"MacAddress": "02:42:ac:13:00:03",
"IPv4Address": "172.19.0.3/16",
"IPv6Address": ""
}
},
......
}
]
测试发送消息和接收消息
进入容器查看消费者信息或发布生产者消息
// 进入容器
docker exec -it hbl_kafka bash
// 进入kafka所在目录
cd opt/kafka_2.12-2.3.0/bin/
// 启动消息发送方
./kafka-console-producer.sh --broker-list localhost:9092 --topic mykafka
// 再开一个终端进入容器,启动消息接收方
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mykafka --from-beginning
中途踩坑
1、启动时kafka容器docker logs -f hbl_kafka查看日志报错
WARN [Controller id=1001, targetBrokerId=1001] Connection to node 1001 (/172.28.2.104:9095) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
解决:防火墙对应端口没开
// 开启9093端口
firewall-cmd --add-port=9093/tcp --permanent --zone=public
// 重载配置
firewall-cmd --reload
// 查看端口
firewall-cmd --list-port
2、进入生产者发送消息报错
WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 105 : {mykafka=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
解决:在一篇博客上看到说后来发现最新版本0.10.x broker配置弃用了advertised.host.name
和 advertised.port
这两个个配置项,就配置advertised.listeners就可以了。
advertised.listeners=PLAINTEXT://59.64.11.22:9092
继续
运行指令docker stop hbl_kafka停掉之前的kafka,重新启动一个kafka容器:
docker run --net=hbl_test --name hbl_kafka2 -p 9094:9092 \
--link hbl_zookeeper \
-e KAFKA_BROKER_ID=4 \
-e KAFKA_ZOOKEEPER_CONNECT=172.19.0.2:2181 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.28.2.104:9094 \
-d wurstmeister/kafka
从第一个坑里还能看到如果我们不指定broker id,会随机分配一个,所以还是指定一个吧。(小伙伴们这里要开启防火墙9094端口哟)
进入kafka容器发送消息
bash-4.4# ./kafka-console-producer.sh --broker-list localhost:9092 --topic test1
>hh
>kk
>nihao
>ss
>yapi
>
接收消息
bash-4.4# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1 --from-beginning
hh
kk
nihao
ss
yapi
上述做法是单个容器部署的,其实已经有点麻烦且容易出错了,而作为分布式消息队列,生产环境上肯定是集群部署,所以推荐用下面的docker compose容器编排工具来部署。
首先编写
docker-compose.yml
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2182:2181"
networks:
- hbl_net
kafka:
image: wurstmeister/kafka
depends_on:
- zookeeper
links:
- zookeeper
networks:
- hbl_net
ports:
- "9095:9092"
environment:
KAFKA_BROKER_ID: 5
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://172.28.2.104:9095 #宿主机监听端口
volumes:
- /var/run/docker.sock:/var/run/docker.sock
networks:
hbl_net:
driver: bridge # 生成一个桥接网络,用于容器内部通信,注意实际生成的网络名称会带有docker-compose.yml文件所在文件夹的前缀,比如我的.yml文件放在了hbl文件夹下,所以执行后生成的网络名为hbl_hbl_net
# external: true 如果外部已有网络就用这个配置
执行docker-compose up -d
即可。
集群部署
https://www.cnblogs.com/yingww/p/9188701.html
docker-compose.yml
version: '3'
services:
zoo1:
image: zookeeper
restart: always
container_name: zoo1
ports:
- "21811:2181"
environment:
ZOO_MY_ID: 1
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888 server.4=zoo4:2888:3888:observer
zoo2:
image: zookeeper
restart: always
container_name: zoo2
ports:
- "21812:2181"
environment:
ZOO_MY_ID: 2
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888 server.4=zoo4:2888:3888:observer
zoo3:
image: zookeeper
restart: always
container_name: zoo3
ports:
- "21813:2181"
environment:
ZOO_MY_ID: 3
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888 server.4=zoo4:2888:3888:observer
zoo4:
image: zookeeper
restart: always
container_name: zoo4
ports:
- "21814:2181"
environment:
ZOO_MY_ID: 4
PEER_TYPE: observer
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888 server.4=zoo4:2888:388:observer
broker1:
image: wurstmeister/kafka
restart: always
container_name: broker1
ports:
- "9192:9092"
depends_on:
- zoo1
- zoo2
- zoo3
- zoo4
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181,zoo4:2181
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://172.28.2.104:9192
volumes:
- /var/run/docker.sock:/var/run/docker.sock
broker2:
image: wurstmeister/kafka
restart: always
container_name: broker2
ports:
- "9292:9092"
depends_on:
- zoo1
- zoo2
- zoo3
- zoo4
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181,zoo4:2181
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://172.28.2.104:9292
volumes:
- /var/run/docker.sock:/var/run/docker.sock
broker3:
image: wurstmeister/kafka
restart: always
container_name: broker3
ports:
- "9392:9092"
depends_on:
- zoo1
- zoo2
- zoo3
- zoo4
environment:
KAFKA_BROKER_ID: 3
KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181,zoo4:2181
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://172.28.2.104:9392
volumes:
- /var/run/docker.sock:/var/run/docker.sock
后续待验证,服务器内存不够
探究:
1、不显示指定消费者组是否会默认分配到同一个默认group?
划重点
1、为什么要配置KAFKA_LISTENERS?
listeners 监听列表,使用0.0.0.0绑定到所有接口
2、为什么要配置KAFKA_ADVERTISED_LISTENERS?
advertised.listeners 发布到zookeeper供客户端使用的监听器,可以简单理解为kafka提供的外部访问地址
3、为什么要挂载 /var/run/docker.sock?
Docker Daemon默认监听的是/var/run/docker.sock这个文件,所以docker客户端只要把请求发往这里,daemon就能收到并且做出响应。把宿主机的/var/run/docker.sock映射到容器内后,在容器内也能够向/var/run/docker.sock发送http请求和Docker Daemon通信。
(参考博文https://blog.csdn.net/boling_cavalry/article/details/92846483)
小Tips
1、查看kafka的配置
位于kafka容器内 opt/kafka_2.12-2.3.0/config/ 路径下,查看server.properties文件部分信息如下:
......
group.initial.rebalance.delay.ms=0
advertised.port=9093
advertised.host.name=172.28.2.104
port=9092
2、查看kafka在zookeeper中的信息
进入zookeeper容器,/apache-zookeeper-3.5.6-bin/bin 路径下执行./zkCli.sh 进入客户端,可以查看当前broker id或topic分区等等信息
[zk: localhost:2181(CONNECTED) 6] ls /brokers/ids
[1011]
[zk: localhost:2181(CONNECTED) 7] ls /brokers/ids
[4]
[zk: localhost:2181(CONNECTED) 8] ls /brokers/topics/mykafka/partitions
[0]
[zk: localhost:2181(CONNECTED) 9] ls /brokers/topics/test1/partitions
[0]
[zk: localhost:2181(CONNECTED) 10]
2、docker-compose安装使用
// 下载
curl -L https://github.com/docker/compose/releases/download/1.12.0/docker-compose-`uname -s`-`uname -m` > /usr/local/bin/docker-compose
// 设置可执行权限
chmod +x /usr/local/bin/docker-compose