ubuntu20.04 kafka+zookeeper环境搭建

kafka 部署需要使用zookeeper来管理broker节点. kafka 2.8版本以后,自带zookeeper. 本文记录kafka+独立zookeeper以及和自带zookeeper的两种部署方式,测试demo使用golang版本的简单通信demo.

kafka zookeeper下载

本例选择的最新的版本的kafaka2.8以及zookeeper3.7

kafka 2.8.0

https://www.apache.org/dyn/closer.cgi?path=/kafka/2.8.0/kafka_2.12-2.8.0.tgz

zookeeper 3.7.0

https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz


# 解压
tar -zxvf kafka_2.12-2.8.0.tgz
tar -zxvf apache-zookeeper-3.7.0-bin.tar.gz
ls
renevy@redux-ThinkPad:~/workspace/redux/kafka$ ls
apache-zookeeper-3.7.0-bin         kafka_2.12-2.8.0
apache-zookeeper-3.7.0-bin.tar.gz  kafka_2.12-2.8.0.tgz

jdk1.8 配置

#添加环境变量
vi /etc/profile
#java 
export JAVA_HOME=/opt/software/jdk1.8.0_161
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/lib/tools.jar:${JRE_HOME}/lib
export PATH=$PATH:${JAVA_HOME}/bin:${JRE_HOME}/bin
#环境变量生效
source /etc/profile

kafka + 独立zookeeper

zookeeper配置&启动

cd apache-zookeeper-3.7.0-bin/
renevy@redux-ThinkPad:~/workspace/redux/kafka/apache-zookeeper-3.7.0-bin$ ls
bin  conf  docs  lib  LICENSE.txt  NOTICE.txt  README.md  README_packaging.md
# 重命名配置文件
renevy@redux-ThinkPad:~/workspace/redux/kafka/apache-zookeeper-3.7.0-bin$ mv conf/zoo_sample.cfg  conf/zoo.cfg 
# 修改配置文件
vi  conf/zoo.cfg 
dataDir=~/workspace/redux/kafka/zoodata
# 启动zookeeper
~/workspace/redux/kafka/apache-zookeeper-3.7.0-bin$ bin/zkServer.sh start
# 查看zookeeper 启动状态
renevy@redux-ThinkPad:~/workspace/redux/kafka/apache-zookeeper-3.7.0-bin$ jps
16795 Jps
16588 QuorumPeerMain
# 查看zookeeper 状态
renevy@redux-ThinkPad:~/workspace/redux/kafka/apache-zookeeper-3.7.0-bin$ bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /home/renevy/workspace/redux/kafka/apache-zookeeper-3.7.0-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: standalone
# Mode: standalone 表示当前zookeeper处于单机模式

kafka配置&启动

cd kafka_2.12-2.8.0/
renevy@redux-ThinkPad:~/workspace/redux/kafka/kafka_2.12-2.8.0$ ls
bin  config  libs  LICENSE  licenses  NOTICE  site-docs
# 修改配置文件
renevy@redux-ThinkPad:~/workspace/redux/kafka/kafka_2.12-2.8.0$ gedit config/server.properties
# 将以下内容从注释中打开
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://localhost:9092
log.dirs=/home/renevy/workspace/redux/kafka/kafkalogs
# 启动 kafka (-daemon后台启动)
renevy@redux-ThinkPad:~/workspace/redux/kafka/kafka_2.12-2.8.0$ bin/kafka-server-start.sh -daemon config/server.properties
# 查看kafka启动状态
renevy@redux-ThinkPad:~/workspace/redux/kafka/kafka_2.12-2.8.0$ jps
20992 Kafka
21090 Jps
16588 QuorumPeerMain
#  查看kafka在zookeeper中的节点信息
cd apache-zookeeper-3.7.0-bin/
bin/zkCli.sh
[zk: localhost:2181(CONNECTED) 1]  ls /brokers/ids
[0]
# 由上可以看到当前kafka的节点信息.

golang kafka 通信demo1

第一个demo选择的是golang的sarama作为client的通信
https://github.com/Shopify/sarama
producer

package main

import (
    "fmt"
    "github.com/Shopify/sarama"
)
func main() {
    fmt.Println("producer_test\n")
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    config.Producer.Return.Successes = true
    config.Producer.Return.Errors = true
    config.Version = sarama.V0_11_0_2

    producer, err := sarama.NewAsyncProducer([]string{"127.0.0.1:9092"}, config)
    if err != nil {
        fmt.Printf("producer_test create producer error :%s\n", err.Error())
        return
    }
    defer producer.AsyncClose()
    // send message
    msg := &sarama.ProducerMessage{
        Topic: "kafka_go_test",
        Key:   sarama.StringEncoder("go_test"),
    }
    value := "this is message"
    for {
        fmt.Scanln(&value)
        msg.Value = sarama.ByteEncoder(value)
        fmt.Printf("send [%s]\n", value)
        // send to chain
        producer.Input() <- msg
        select {
        case suc := <-producer.Successes():
            fmt.Printf("%s offset: %d \n", msg.Timestamp.String(), suc.Offset)
        case fail := <-producer.Errors():
            fmt.Println("err: %s\n", fail.Err.Error())
        }
    }
}

consumer

package main
import (
    "fmt"
    "github.com/Shopify/sarama"
)
func main() {
    fmt.Printf("consumer_test \n")
    config := sarama.NewConfig()
    config.Consumer.Return.Errors = true
    config.Version = sarama.V0_11_0_2
    // consumer
    consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
    if err != nil {
        fmt.Printf("consumer_test create consumer error %s\n", err.Error())
        return
    }
    defer consumer.Close()
    partition_consumer, err := consumer.ConsumePartition("kafka_go_test", 0, sarama.OffsetOldest)
    if err != nil {
        fmt.Printf("try create partition_consumer error %s\n", err.Error())
        return
    }
    defer partition_consumer.Close()
    for {
        select {
        case msg := <-partition_consumer.Messages():
            fmt.Printf("msg offset: %d, partition: %d, timestamp: %s, value: %s\n",
                msg.Offset, msg.Partition, msg.Timestamp.String(), string(msg.Value))
        case err := <-partition_consumer.Errors():
            fmt.Printf("err :%s\n", err.Error())
        }
    }

}

console log

# producer
renevy@redux-ThinkPad:~/workspace/redux/golearn/kafka-go/kafkaDemo$ go run producter/producter.go 
producer_test
vivi
send [vivi]
0001-01-01 00:00:00 +0000 UTC offset: 2 
# consumer
renevy@redux-ThinkPad:~/workspace/redux/golearn/kafka-go/kafkaDemo$ go run consumer/consumer.go 
consumer_test 
msg offset: 2, partition: 0, timestamp: 2021-08-21 11:40:22.906 +0800 CST, value: vivi

kafka + 自带zookeeper

zookeeper配置&启动

cd kafka_2.12-2.8.0/
#修改配置文件
gedit config/zookeeper.properties
dataDir=~/workspace/redux/kafka/zoodata
admin.serverPort=8088
# 启动
renevy@redux-ThinkPad:~/workspace/redux/kafka/kafka_2.12-2.8.0$ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
# 查看状态
renevy@redux-ThinkPad:~/workspace/redux/kafka/kafka_2.12-2.8.0$ jps
4432 QuorumPeerMain
4474 Jps

kafka 启动

bin/kafka-server-start.sh  config/server.properties

报错 cluster id重复:

[2021-08-21 12:26:41,092] ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kafka.common.InconsistentClusterIdException: The Cluster ID GcY08Ud7SFmKFlgfq8Ymhg doesn't match stored clusterId Some(6ffH--rpSBS35gEHm0lAdw) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.
    at kafka.server.KafkaServer.startup(KafkaServer.scala:218)
    at kafka.Kafka$.main(Kafka.scala:109)
    at kafka.Kafka.main(Kafka.scala)
[2021-08-21 12:26:41,094] INFO shutting down (kafka.server.KafkaServer)
[2021-08-21 12:26:41,098] INFO [feature-zk-node-event-process-thread]: Shutting down (kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread)

解决办法:

  1. 修改 cluster id,因为之前创建的kafka+zookeeper已经有了cluster id,现在需要替换成新的.
  2. 重新修改kafka log路径.
#将 cluster id改为GcY08Ud7SFmKFlgfq8Ymhg
gedit  /home/renevy/workspace/redux/kafka/kafkalogs/meta.properties

查看kafka运行状态

renevy@redux-ThinkPad:~/workspace/redux/kafka/kafka_2.12-2.8.0$ jps
4432 QuorumPeerMain
6278 Jps
5818 Kafka

golang kafka 通信demo2

第二个demo使用confluent-kafka-go
https://github.com/confluentinc/confluent-kafka-go.git
producer

package main

import (
    "fmt"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {

    p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
    if err != nil {
        panic(err)
    }

    defer p.Close()

    // Delivery report handler for produced messages
    go func() {
        for e := range p.Events() {
            switch ev := e.(type) {
            case *kafka.Message:
                if ev.TopicPartition.Error != nil {
                    fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
                } else {
                    fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
                }
            }
        }
    }()

    // Produce messages to topic (asynchronously)
    topic := "myTopic"
    for _, word := range []string{"Welcome", "to", "the", "Confluent", "Kafka", "Golang", "client"} {
        p.Produce(&kafka.Message{
            TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
            Value:          []byte(word),
        }, nil)
    }

    // Wait for message deliveries before shutting down
    p.Flush(15 * 1000)
}

consumer

package main

import (
    "fmt"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {

    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers": "localhost",
        "group.id":          "myGroup",
        "auto.offset.reset": "earliest",
    })

    if err != nil {
        panic(err)
    }

    c.SubscribeTopics([]string{"myTopic"}, nil)

    for {
        msg, err := c.ReadMessage(-1)
        if err == nil {
            fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
        } else {
            // The client will automatically try to recover from all errors.
            fmt.Printf("Consumer error: %v (%v)\n", err, msg)
        }
    }

    c.Close()
}

console log
producer

renevy@redux-ThinkPad:~/workspace/redux/golearn/kafka-go/kafkaDemo2$ go run producer/producer.go 
Delivered message to myTopic[0]@35
Delivered message to myTopic[0]@36
Delivered message to myTopic[0]@37
Delivered message to myTopic[0]@38
Delivered message to myTopic[0]@39
Delivered message to myTopic[0]@40
Delivered message to myTopic[0]@41

consumer

renevy@redux-ThinkPad:~/workspace/redux/golearn/kafka-go/kafkaDemo2$ go run consumer/consumer.go 

Message on myTopic[0]@28: Welcome
Message on myTopic[0]@29: to
Message on myTopic[0]@30: the
Message on myTopic[0]@31: Confluent
Message on myTopic[0]@32: Kafka
Message on myTopic[0]@33: Golang
Message on myTopic[0]@34: client
Message on myTopic[0]@35: Welcome
Message on myTopic[0]@36: to
Message on myTopic[0]@37: the
Message on myTopic[0]@38: Confluent
Message on myTopic[0]@39: Kafka
Message on myTopic[0]@40: Golang
Message on myTopic[0]@41: client
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,001评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,210评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,874评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,001评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,022评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,005评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,929评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,742评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,193评论 1 309
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,427评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,583评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,305评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,911评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,564评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,731评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,581评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,478评论 2 352

推荐阅读更多精彩内容