消息中间件--Kafka(三)

一、Kafka简介

  Apache Kafka最初由LinkedIn开发,并在2011年初开源。Kafka是一个分布式消息队列。★Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接受者称为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)称为broker。无论是kafka集群,还是consumer都依赖于zookeeper集群保存一些meta信息,来保证系统可用性。

Kafka 和传统的消息系统不同在于:
1.Kafka是一个分布式系统,易于向外扩展。
2.它同时为发布和订阅提供高吞吐量。
3.它支持多订阅者,当失败时能自动平衡消费者。
4.消息的持久化。

  1. 消息传输流程:

    消息传输流程

      Producer即生产者,向Kafka集群发送消息,在发送消息之前,会对消息进行分类,即Topic,上图展示了两个producer发送了分类为topic1的消息,另外一个发送了topic2的消息。
      Topic即主题,通过对消息指定主题可以将消息分类,消费者可以只关注自己需要的Topic中的消息
      Consumer即消费者,消费者通过与kafka集群建立长连接的方式,不断地从集群中拉取消息,然后可以对这些消息进行处理。
      从上图中就可以看出同一个Topic下的消费者和生产者的数量并不是对应的。

  2. kafka服务器消息存储策略:

    kafka服务器消息存储策略

      谈到kafka的存储,就不得不提到分区,即partitions,创建一个topic时,同时可以指定分区数目,分区数越多,其吞吐量也越大,但是需要的资源也越多,同时也会导致更高的不可用性,kafka在接收到生产者发送的消息之后,会根据均衡策略将消息存储到不同的分区中。
    分区

     在每个分区中,消息以顺序存储,最晚接收的的消息会最后被消费。

  3. 与生产者的交互:


    与生产者的交互

     生产者在向kafka集群发送消息的时候,可以通过指定分区来发送到指定的分区中
     也可以通过指定均衡策略来将消息发送到不同的分区中
     如果不指定,就会采用默认的随机均衡策略,将消息随机的存储到不同的分区中

  4. 与消费者的交互

    与消费者的交互

    在消费者消费消息时,kafka使用offset来记录当前消费的位置
     在kafka的设计中,可以有多个不同的group来同时消费同一个topic下的消息,如图,我们有两个不同的group同时消费,他们的的消费的记录位置offset各不项目,不互相干扰。
     对于一个group而言,消费者的数量不应该多余分区的数量,因为在一个group中,每个分区至多只能绑定到一个消费者上,即一个消费者可以消费多个分区,一个分区只能给一个消费者消费
     因此,若一个group中的消费者数量大于分区数量的话,多余的消费者将不会收到任何消息。
    官网:http://kafka.apache.org/

二、 Kafka下载安装

前置条件:安装scala。

  1. 下载
    http://kafka.apache.org/downloads
    下载
# 下载
[root@localhost ~]# wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz

# 解压
[root@localhost ~]# tar -zxvf kafka_2.12-2.3.0.tgz -C /usr/local

  1. 配置
# 修改配置文件
[root@localhost ~]# cd /usr/local/kafka_2.12-2.3.0/
# 去掉注释,添加主机名,一定不要写IP地址
listeners=PLAINTEXT://localhost:9092

配置文件

zookeeper.properties 是zookeeper的配置文件,默认端口号2181,可不做修改
server.properties 是kafka配置文件,将 zookeeper.connect 这行 改为自己的zookeeper地址和端口号

# 指定kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=192.168.77.132:9092
#=============== provider  =======================
spring.kafka.producer.retries=0
# 每次批量发送消息的数量
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432

# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer  =======================
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100

# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

三、 启动测试

  1. 启动
# 启动Zookeeper server(自带)--端口为:2181
[root@localhost kafka_2.12-2.3.0]# ./bin/zookeeper-server-start.sh config/zookeeper.properties &
# 启动Kafka server--端口为:9092 
[root@localhost kafka_2.12-2.3.0]# ./bin/kafka-server-start.sh config/server.properties &  

# 停止Kafka server
[root@localhost kafka_2.12-2.3.0]# ./bin/kafka-server-stop.sh  
# 停止Zookeeper server
[root@localhost kafka_2.12-2.3.0]# ./bin/zookeeper-server-stop.sh 

  1. 单机连通性测试
    启动2个客户端,一个用于生产者发送消息,一个用于消费者接受消息。
# 运行producer
[root@localhost kafka_2.12-2.3.0]#  ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

# 运行consumer
[root@localhost kafka_2.12-2.3.0]# ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

生产端发送消息:


生产端发送消息
消费端接收消息

注意:
producer,指定的Socket(localhost+9092),说明生产者的消息要发往kafka,也即是broker
consumer, 指定的Socket(localhost+2181),说明消费者的消息来自zookeeper(协调转发)

四、Kafka的web 监控平台

  为了简化开发者和服务工程师维护Kafka集群的工作,yahoo构建了一个叫做Kafka管理器的基于Web工具,叫做 Kafka Manager。这个管理工具可以很容易地发现分布在集群中的哪些topic分布不均匀,或者是分区在整个集群分布不均匀的的情况。它支持管理多个集群、选择副本、副本重新分配以及创建Topic。同时,这个管理工具也是一个非常好的可以快速浏览这个集群的工具,有如下功能:
1.管理多个kafka集群
2.便捷的检查kafka集群状态(topics,brokers,备份分布情况,分区分布情况)
3.选择你要运行的副本
4.基于当前分区状况进行
5.可以选择topic配置并创建topic(0.8.1.1和0.8.2的配置不同)
6.删除topic(只支持0.8.2以上的版本并且要在broker配置中设置delete.topic.enable=true)
7.Topic list会指明哪些topic被删除(在0.8.2以上版本适用)
8.为已存在的topic增加分区
9.为已存在的topic更新配置
10.在多个topic上批量重分区
11.在多个topic上批量重分区(可选partition broker位置)
kafka-manager 项目地址:https://github.com/yahoo/kafka-manager

  1. 下载安装
# 下载
[root@localhost ~]# wget https://github.com/yahoo/kafka-manager/archive/2.0.0.2.tar.gz
# 解压
[root@localhost ~]# tar -zxvf 2.0.0.2.tar.gz -C /usr/local
# 安装sbt
[root@localhost kafka-manager-2.0.0.2]# curl https://bintray.com/sbt/rpm/rpm > bintray-sbt-rpm.repo
[root@localhost kafka-manager-2.0.0.2]# mv bintray-sbt-rpm.repo /etc/yum.repos.d/
[root@localhost kafka-manager-2.0.0.2]# yum install sbt
# 验证:检查sbt是否安装成功,查看命令输出
[root@localhost kafka-manager-2.0.0.2]# sbt -version
[info] [launcher] getting org.scala-sbt sbt 1.2.8  (this may take some time)...
# 安装完成后,一般你电脑的主目录/home/your_user_name/下一般都会有.sbt这个文件夹
[root@localhost ~]# ls -a 
.sbt
# 在根目录下新建文件repositories,并编辑如下内容
# 修改仓库地址:(sbt 默认下载库文件很慢),我们可以在用户目录下创建 touch ~/.sbt/repositories, 填上 阿里云的镜像   # vi ~/.sbt/repositories  
[root@localhost kafka-manager-2.0.0.2]# touch ~/.sbt/repositories
[root@localhost kafka-manager-2.0.0.2]#  vi ~/.sbt/repositories  
# 内容
[repositories]
#local
public: http://maven.aliyun.com/nexus/content/groups/public/
typesafe:http://dl.bintray.com/typesafe/ivy-releases/ , [organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext], bootOnly
ivy-sbt-plugin:http://dl.bintray.com/sbt/sbt-plugin-releases/, [organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext]
sonatype-oss-releases

sonatype-oss-snapshots

# 进入kafka-manager的解压目录,编译kafka-manager,很慢(主要是下载依赖的jar包)
[root@localhost kafka-manager-2.0.0.2]# ./sbt clean dist
Getting org.scala-sbt sbt 1.2.8  (this may take some time)...

# 编译完成后, kafka-manager的解压目录下就会多了一个target文件夹
[root@localhost kafka-manager-2.0.0.2]# ls target
scala-2.12  streams  universal  web
[root@localhost kafka-manager-2.0.0.2]# ls target/universal/
kafka-manager-2.0.0.2.zip  scripts
# 安装unzip
[root@localhost kafka-manager-2.0.0.2]# yum install unzip

# 解压编译好的universal目录中的kafka-manager-2.0.0.2.zip
[root@localhost kafka-manager-2.0.0.2]# mkdir /usr/local/kafka-manager
[root@localhost kafka-manager-2.0.0.2]# unzip target/universal/kafka-manager-2.0.0.2.zip -d /usr/local/kafka-manager

# 修改配置 conf/application.properties
[root@localhost kafka-manager-2.0.0.2]# vi /usr/local/kafka-manager/kafka-manager-2.0.0.2/conf/application.conf 
# 内容
#kafka-manager.zkhosts="localhost:2181"       ##注释这一行,下面添加一行
kafka-manager.zkhosts="192.168.77.132:2181"
# 启动
[root@localhost kafka-manager-2.0.0.2]#  ./bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=8080
# 后台启动
[root@localhost kafka-manager-2.0.0.2]#  nohup bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=8080 &

启动较慢需耐心等待。
http://192.168.77.132:8080/

启动测试

  1. 创建集群
    点击【Cluster】>【Add Cluster】打开如下添加集群的配置界面:


    创建集群

    创建集群
  2. 查看broker信息


    查看broker信息
  1. 创建topic
    点击【Topic】>【Create】可以方便的创建并配置主题。


    创建topic

    创建topic
  1. 查看topic信息


    查看topic信息
Operations列表参数说明:
Generate Partition Assignments:
  对多个topic来进行分区重分布计划的生成,知道就好,不推荐使用。
Run Partition Assignments:
  对多个topic分区执行重分布计划,知道就好,不推荐使用。
Add Partitions:
  对多个topic来进行添加分区操作,生成环境并不建议大家这样搞,避免误操作。
Topics列表参数说明:
Topic:
    表示topic名称。
    __consumer_offsets是kafka内置保存consumer offset的topic.
Partitions:
    topic对应的分区数
Brokers:
    topic对应的broker数
Brokers Spread%:
    指broker的分布比例,计算公式:用topic的分区分布的broker数/总的broker数量。
Brokers Skew %:
    Brokers的偏斜比例,计算公式:(Broker最多的分区数/Brokers最少的分区数) -1 
Brokers Leader Skew%:
    Brokers的leader偏斜比例,计算公式:(Broker最多的leader分区数/Brokers最少leader的分区数) -1
Replicas:
    副本因子,即副本数。
Under Replicated %:
    副本不足的比例,计算公式:副本不足的分区数/总分区数
Producer Message/Sec:
    每秒产生的消息数量。
Summed Recent Offsets:
    当前总计的消费偏移量。
  1. 查看已经消费者的详细信息


    查看已经消费者的详细信息

    已经消费者的详细信息

五、常见问题:

  1. OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c0000000, 1073741824, 0) failed; error='Cannot allocate memory' (errno=12)
    原因:内存不足,查看启动配置
    解决:
# 查看内存使用情况
free -m
  1. WARN [Producer clientId=console-producer] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
    原因:
    配置的ip与使用的不致,如果是localhost都用localhost,如查是ip需都使用ip地址。

  2. [2019-09-27 17:30:32,359] WARN [Consumer clientId=consumer-1, groupId=console-consumer-99211] Error while fetching metadata with correlation id 2 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
    解决:

#     listeners = PLAINTEXT://your.host.name:9092
#  去掉注释,将各主机本地ip替换“your.host.name”,重启后恢复正常。
  1. Yikes! Ask timed out on [ActorSelection[Anchor(akka://kafka-manager-system/),
    注释以下内容:
 #kafka-manager.zkhosts=${?ZK_HOSTS}
  1. This application is already running (Or delete /usr/local/kafka-manager/kafka-manager-2.0.0.2/RUNNING_PID file).
    停止
    删除文件:
[root@localhost kafka-manager-2.0.0.2]#  rm RUNNING_PID

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,163评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,301评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,089评论 0 352
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,093评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,110评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,079评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,005评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,840评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,278评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,497评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,667评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,394评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,980评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,628评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,649评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,548评论 2 352

推荐阅读更多精彩内容