一、Kafka简介
Apache Kafka最初由LinkedIn开发,并在2011年初开源。Kafka是一个分布式消息队列。★Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接受者称为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)称为broker。无论是kafka集群,还是consumer都依赖于zookeeper集群保存一些meta信息,来保证系统可用性。
Kafka 和传统的消息系统不同在于:
1.Kafka是一个分布式系统,易于向外扩展。
2.它同时为发布和订阅提供高吞吐量。
3.它支持多订阅者,当失败时能自动平衡消费者。
4.消息的持久化。
-
消息传输流程:
Producer即生产者
,向Kafka集群发送消息,在发送消息之前,会对消息进行分类,即Topic,上图展示了两个producer发送了分类为topic1的消息,另外一个发送了topic2的消息。
Topic即主题
,通过对消息指定主题可以将消息分类,消费者可以只关注自己需要的Topic中的消息
Consumer即消费者
,消费者通过与kafka集群建立长连接的方式,不断地从集群中拉取消息,然后可以对这些消息进行处理。
从上图中就可以看出同一个Topic下的消费者和生产者的数量并不是对应的。 -
kafka服务器消息存储策略:
谈到kafka的存储,就不得不提到分区,即partitions
,创建一个topic时,同时可以指定分区数目,分区数越多,其吞吐量也越大,但是需要的资源也越多,同时也会导致更高的不可用性,kafka在接收到生产者发送的消息之后,会根据均衡策略将消息存储到不同的分区中。
在每个分区中,消息以顺序存储,最晚接收的的消息会最后被消费。 -
与生产者的交互:
生产者在向kafka集群发送消息的时候,可以通过指定分区来发送到指定的分区中
也可以通过指定均衡策略来将消息发送到不同的分区中
如果不指定,就会采用默认的随机均衡策略,将消息随机的存储到不同的分区中 -
与消费者的交互
在消费者消费消息时,kafka使用offset来记录当前消费的位置
在kafka的设计中,可以有多个不同的group来同时消费同一个topic下的消息,如图,我们有两个不同的group同时消费,他们的的消费的记录位置offset各不项目,不互相干扰。
对于一个group而言,消费者的数量不应该多余分区的数量,因为在一个group中,每个分区至多只能绑定到一个消费者上,即一个消费者可以消费多个分区,一个分区只能给一个消费者消费
因此,若一个group中的消费者数量大于分区数量的话,多余的消费者将不会收到任何消息。
官网:http://kafka.apache.org/
二、 Kafka下载安装
前置条件:安装scala。
# 下载
[root@localhost ~]# wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz
# 解压
[root@localhost ~]# tar -zxvf kafka_2.12-2.3.0.tgz -C /usr/local
- 配置
# 修改配置文件
[root@localhost ~]# cd /usr/local/kafka_2.12-2.3.0/
# 去掉注释,添加主机名,一定不要写IP地址
listeners=PLAINTEXT://localhost:9092
zookeeper.properties 是zookeeper的配置文件,默认端口号2181,可不做修改
server.properties 是kafka配置文件,将 zookeeper.connect 这行 改为自己的zookeeper地址和端口号
# 指定kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=192.168.77.132:9092
#=============== provider =======================
spring.kafka.producer.retries=0
# 每次批量发送消息的数量
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#=============== consumer =======================
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
三、 启动测试
- 启动
# 启动Zookeeper server(自带)--端口为:2181
[root@localhost kafka_2.12-2.3.0]# ./bin/zookeeper-server-start.sh config/zookeeper.properties &
# 启动Kafka server--端口为:9092
[root@localhost kafka_2.12-2.3.0]# ./bin/kafka-server-start.sh config/server.properties &
# 停止Kafka server
[root@localhost kafka_2.12-2.3.0]# ./bin/kafka-server-stop.sh
# 停止Zookeeper server
[root@localhost kafka_2.12-2.3.0]# ./bin/zookeeper-server-stop.sh
- 单机连通性测试
启动2个客户端,一个用于生产者发送消息,一个用于消费者接受消息。
# 运行producer
[root@localhost kafka_2.12-2.3.0]# ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
# 运行consumer
[root@localhost kafka_2.12-2.3.0]# ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
生产端发送消息:
注意:
producer,指定的Socket(localhost+9092),说明生产者的消息要发往kafka,也即是broker
consumer, 指定的Socket(localhost+2181),说明消费者的消息来自zookeeper(协调转发)
四、Kafka的web 监控平台
为了简化开发者和服务工程师维护Kafka集群的工作,yahoo构建了一个叫做Kafka管理器的基于Web工具,叫做 Kafka Manager。这个管理工具可以很容易地发现分布在集群中的哪些topic分布不均匀,或者是分区在整个集群分布不均匀的的情况。它支持管理多个集群、选择副本、副本重新分配以及创建Topic。同时,这个管理工具也是一个非常好的可以快速浏览这个集群的工具,有如下功能:
1.管理多个kafka集群
2.便捷的检查kafka集群状态(topics,brokers,备份分布情况,分区分布情况)
3.选择你要运行的副本
4.基于当前分区状况进行
5.可以选择topic配置并创建topic(0.8.1.1和0.8.2的配置不同)
6.删除topic(只支持0.8.2以上的版本并且要在broker配置中设置delete.topic.enable=true)
7.Topic list会指明哪些topic被删除(在0.8.2以上版本适用)
8.为已存在的topic增加分区
9.为已存在的topic更新配置
10.在多个topic上批量重分区
11.在多个topic上批量重分区(可选partition broker位置)
kafka-manager 项目地址:https://github.com/yahoo/kafka-manager
- 下载安装
# 下载
[root@localhost ~]# wget https://github.com/yahoo/kafka-manager/archive/2.0.0.2.tar.gz
# 解压
[root@localhost ~]# tar -zxvf 2.0.0.2.tar.gz -C /usr/local
# 安装sbt
[root@localhost kafka-manager-2.0.0.2]# curl https://bintray.com/sbt/rpm/rpm > bintray-sbt-rpm.repo
[root@localhost kafka-manager-2.0.0.2]# mv bintray-sbt-rpm.repo /etc/yum.repos.d/
[root@localhost kafka-manager-2.0.0.2]# yum install sbt
# 验证:检查sbt是否安装成功,查看命令输出
[root@localhost kafka-manager-2.0.0.2]# sbt -version
[info] [launcher] getting org.scala-sbt sbt 1.2.8 (this may take some time)...
# 安装完成后,一般你电脑的主目录/home/your_user_name/下一般都会有.sbt这个文件夹
[root@localhost ~]# ls -a
.sbt
# 在根目录下新建文件repositories,并编辑如下内容
# 修改仓库地址:(sbt 默认下载库文件很慢),我们可以在用户目录下创建 touch ~/.sbt/repositories, 填上 阿里云的镜像 # vi ~/.sbt/repositories
[root@localhost kafka-manager-2.0.0.2]# touch ~/.sbt/repositories
[root@localhost kafka-manager-2.0.0.2]# vi ~/.sbt/repositories
# 内容
[repositories]
#local
public: http://maven.aliyun.com/nexus/content/groups/public/
typesafe:http://dl.bintray.com/typesafe/ivy-releases/ , [organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext], bootOnly
ivy-sbt-plugin:http://dl.bintray.com/sbt/sbt-plugin-releases/, [organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext]
sonatype-oss-releases
sonatype-oss-snapshots
# 进入kafka-manager的解压目录,编译kafka-manager,很慢(主要是下载依赖的jar包)
[root@localhost kafka-manager-2.0.0.2]# ./sbt clean dist
Getting org.scala-sbt sbt 1.2.8 (this may take some time)...
# 编译完成后, kafka-manager的解压目录下就会多了一个target文件夹
[root@localhost kafka-manager-2.0.0.2]# ls target
scala-2.12 streams universal web
[root@localhost kafka-manager-2.0.0.2]# ls target/universal/
kafka-manager-2.0.0.2.zip scripts
# 安装unzip
[root@localhost kafka-manager-2.0.0.2]# yum install unzip
# 解压编译好的universal目录中的kafka-manager-2.0.0.2.zip
[root@localhost kafka-manager-2.0.0.2]# mkdir /usr/local/kafka-manager
[root@localhost kafka-manager-2.0.0.2]# unzip target/universal/kafka-manager-2.0.0.2.zip -d /usr/local/kafka-manager
# 修改配置 conf/application.properties
[root@localhost kafka-manager-2.0.0.2]# vi /usr/local/kafka-manager/kafka-manager-2.0.0.2/conf/application.conf
# 内容
#kafka-manager.zkhosts="localhost:2181" ##注释这一行,下面添加一行
kafka-manager.zkhosts="192.168.77.132:2181"
# 启动
[root@localhost kafka-manager-2.0.0.2]# ./bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=8080
# 后台启动
[root@localhost kafka-manager-2.0.0.2]# nohup bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=8080 &
启动较慢需耐心等待。
http://192.168.77.132:8080/
-
创建集群
点击【Cluster】>【Add Cluster】打开如下添加集群的配置界面:
-
查看broker信息
-
创建topic
点击【Topic】>【Create】可以方便的创建并配置主题。
-
查看topic信息
Operations列表参数说明:
Generate Partition Assignments:
对多个topic来进行分区重分布计划的生成,知道就好,不推荐使用。
Run Partition Assignments:
对多个topic分区执行重分布计划,知道就好,不推荐使用。
Add Partitions:
对多个topic来进行添加分区操作,生成环境并不建议大家这样搞,避免误操作。
Topics列表参数说明:
Topic:
表示topic名称。
__consumer_offsets是kafka内置保存consumer offset的topic.
Partitions:
topic对应的分区数
Brokers:
topic对应的broker数
Brokers Spread%:
指broker的分布比例,计算公式:用topic的分区分布的broker数/总的broker数量。
Brokers Skew %:
Brokers的偏斜比例,计算公式:(Broker最多的分区数/Brokers最少的分区数) -1
Brokers Leader Skew%:
Brokers的leader偏斜比例,计算公式:(Broker最多的leader分区数/Brokers最少leader的分区数) -1
Replicas:
副本因子,即副本数。
Under Replicated %:
副本不足的比例,计算公式:副本不足的分区数/总分区数
Producer Message/Sec:
每秒产生的消息数量。
Summed Recent Offsets:
当前总计的消费偏移量。
-
查看已经消费者的详细信息
五、常见问题:
- OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c0000000, 1073741824, 0) failed; error='Cannot allocate memory' (errno=12)
原因:内存不足,查看启动配置
解决:
# 查看内存使用情况
free -m
WARN [Producer clientId=console-producer] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
原因:
配置的ip与使用的不致,如果是localhost都用localhost,如查是ip需都使用ip地址。[2019-09-27 17:30:32,359] WARN [Consumer clientId=consumer-1, groupId=console-consumer-99211] Error while fetching metadata with correlation id 2 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
解决:
# listeners = PLAINTEXT://your.host.name:9092
# 去掉注释,将各主机本地ip替换“your.host.name”,重启后恢复正常。
- Yikes! Ask timed out on [ActorSelection[Anchor(akka://kafka-manager-system/),
注释以下内容:
#kafka-manager.zkhosts=${?ZK_HOSTS}
- This application is already running (Or delete /usr/local/kafka-manager/kafka-manager-2.0.0.2/RUNNING_PID file).
停止
删除文件:
[root@localhost kafka-manager-2.0.0.2]# rm RUNNING_PID