kafka集群搭建与使用

安装前的环境准备

由于Kafka是用Scala语言开发的,运行在JVM上,因此在安装Kafka之前需要先安装JDK。

yum install java-1.8.0-openjdk* -y

kafka依赖zookeeper,所以需要先安装zookeeper

wget http://mirror.bit.edu.cn/apache/zookeeper/stable/zookeeper-3.4.12.tar.gz

tar -zxvf zookeeper-3.4.12.tar.gz

cd zookeeper-3.4.12

cp conf/zoo_sample.cfg conf/zoo.cfg

启动zookeeper

bin/zkServer.sh start

bin/zkCli.sh

ls / #查看zk的根目录相关节点

[zk: localhost:2181(CONNECTED) 8] ls /
[dubbo, zookeeper, locks]
[zk: localhost:2181(CONNECTED) 9] 

第一步:下载安装包

下载1.1.0 release版本,并解压:

wget https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz
tar -xzf kafka_2.11-1.1.0.tgz
cd kafka_2.11-1.1.0

第二步:启动服务

现在来启动kafka服务:
启动脚本语法:kafka-server-start.sh [-daemon] server.properties
可以看到,server.properties的配置路径是一个强制的参数,-daemon表示以后台进程运行,否则ssh客户端退出后,就会停止服务。(注意,在启动kafka时会使用linux主机名关联的ip地址,所以需要把主机名和linux的ip映射配置到本地host里,用vim /etc/hosts)
bin/kafka-server-start.sh -daemon config/server.properties
我们进入zookeeper目录通过zookeeper客户端查看下zookeeper的目录树
bin/zkCli.sh
ls / #查看zk的根目录kafka相关节点
ls /brokers/ids #查看kafka节点

[zk: localhost:2181(CONNECTED) 3] ls /
[cluster, controller, brokers, zookeeper, dubbo, admin, isr_change_notification, log_dir_event_notification, locks, controller_epoch, consumers, latest_producer_id_block, config]
[zk: localhost:2181(CONNECTED) 4] ls /brokers
[ids, topics, seqid]
[zk: localhost:2181(CONNECTED) 5] 

第三步:创建主题

现在我们来创建一个名字为“test”的Topic,这个topic只有一个partition,并且备份因子也设置为1:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
现在我们可以通过以下命令来查看kafka中目前存在的topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
除了我们通过手工的方式创建Topic,我们可以配置broker,当producer发布一个消息某个指定的Topic,但是这个Topic并不存在时,就自动创建。

[root@iZ2ze8dv3a3mevar9w80f1Z kafka_2.12-2.1.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".
[root@iZ2ze8dv3a3mevar9w80f1Z kafka_2.12-2.1.0]# 
[root@iZ2ze8dv3a3mevar9w80f1Z kafka_2.12-2.1.0]# 

[zk: localhost:2181(CONNECTED) 6] ls /brokers/topics
[test]
[zk: localhost:2181(CONNECTED) 7] 

第四步:发送消息

kafka自带了一个producer命令客户端,可以从本地文件中读取内容,或者我们也可以以命令行中直接输入内容,并将这些内容以消息的形式发送到kafka集群中。在默认情况下,每一个行会被当做成一个独立的消息。
首先我们要运行发布消息的脚本,然后在命令中输入要发送的消息的内容:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
>this is a msg
>this is a another msg 

第五步:消费消息

对于consumer,kafka同样也携带了一个命令行客户端,会将获取到内容在命令中进行输出:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning #老版本
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup --consumer-property client.id=consumer-1 --topic test #新版本
如果你是通过不同的终端窗口来运行以上的命令,你将会看到在producer终端输入的内容,很快就会在consumer的终端窗口上显示出来。
以上所有的命令都有一些附加的选项;当我们不携带任何参数运行命令的时候,将会显示出这个命令的详细用法。
还有一些其他命令如下:

查看组名
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list --new-consumer
查看消费者的消费偏移量
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group testGroup
消费多主题
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist "test|test-2"

单播消费

一条消息只能被某一个消费者消费的模式,类似queue模式,只需让所有消费者在同一个消费组里即可
分别在两个客户端执行如下消费命令,然后往主题里发送消息,结果只有一个客户端能收到消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup --topic test

多播消费
一条消息能被多个消费者消费的模式,类似publish-subscribe模式费,针对Kafka同一条消息只能被同一个消费组下的某一个消费者消费的特性,要实现多播只要保证这些消费者属于不同的消费组即可。我们再增加一个消费者,该消费者属于testGroup-2消费组,结果两个客户端都能收到消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup-2 --topic tes

第六步:kafka集群配置

到目前为止,我们都是在一个单节点上运行broker,这并没有什么意思。对于kafka来说,一个单独的broker意味着kafka集群中只有一个接点。要想增加kafka集群中的节点数量,只需要多启动几个broker实例即可。为了有更好的理解,现在我们在一台机器上同时启动三个broker实例。
首先,我们需要建立好其他2个broker的配置文件:
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties

config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1

config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2

broker.id属性在kafka集群中必须要是唯一的。我们需要重新指定port和log目录,因为我们是在同一台机器上运行多个实例。如果不进行修改的话,consumer只能获取到一个instance实例的信息,或者是相互之间的数据会被影响。
目前我们已经有一个zookeeper实例和一个broker实例在运行了,现在我们只需要在启动2个broker实例即可:

bin/kafka-server-start.sh -daemon config/server-1.properties
bin/kafka-server-start.sh -daemon config/server-2.properties

现在我们创建一个新的topic,备份因子设置为3:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

现在我们已经有了集群,并且创建了一个3个备份因子的topic,但是到底是哪一个broker在为这个topic提供服务呢(因为我们只有一个分区,所以肯定同时只有一个broker在处理这个topic)?

 bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
image.png

以下是输出内容的解释,第一行是所有分区的概要信息,之后的每一行表示每一个partition的信息。因为目前我们只有一个partition,因此关于partition的信息只有一行。
leader节点负责给定partition的所有读写请求。
replicas 表示某个partition在哪几个broker上存在备份。不管这个几点是不是”leader“,甚至这个节点挂了,也会列出。
isr 是replicas的一个子集,它只列出当前还存活着的,并且备份了该partition的节点。
现在我们的案例中,0号节点是leader,即使用server.properties启动的那个进程。
我们可以运行相同的命令查看之前创建的名称为”test“的topic

 bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test 
image.png

之前设置了topic的partition数量为1,备份因子为1,因此显示就如上所示了。
现在我们向新建的topic中发送一些message:

 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
>my test msg 1
>my test msg 2

现在开始消费:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
my test msg 1
my test msg 2

现在我们来测试我们容错性,因为broker0目前是leader,所以我们要将其kill

ps -ef | grep server.properties
kill -9 1177
现在再执行命令:
bin/kafka-topics.sh --describe --zookeeper localhost:9092 --topic my-replicated-topic
image.png

我们可以看到,leader节点已经变成了broker 2.要注意的是,在Isr中,已经没有了0号节点。leader的选举也是从ISR(in-sync replica)中进行的。
此时,我们依然可以 消费新消息:

 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
my test msg 1
my test msg 2

查看主题分区对应的leader信息:


image.png
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,313评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,369评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,916评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,333评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,425评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,481评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,491评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,268评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,719评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,004评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,179评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,832评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,510评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,153评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,402评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,045评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,071评论 2 352

推荐阅读更多精彩内容