docker-compose搭建Zookeeper集群以及Kafka集群

一、安装docker-compose

这里不使用官方链接进行安装,因为会很慢

https://github.com/docker/compose/releases

可以前往官网查看目前最新版,然后下面自行更换


curl -L https://get.daocloud.io/docker/compose/releases/download/1.25.0/docker-compose-`uname -s`-`uname -m` > /usr/local/bin/docker-compose

sudo chmod +x /usr/local/bin/docker-compose

#验证是否安装成功

docker-compose --version

准备工作:


#创建两个文件夹分别存放docker-compose.yml文件,方便管理

cd /usr/local

mkdir docker

cd docker

mkdir zookeper

mkdir kafka

因为考虑到有时候只需要启动zookeeper而并不需要启动kafka,例如:使用Dubbo,SpringCloud的时候利用Zookeeper当注册中心。所以本次安装分成两个docker-compose.yml来安装和启动

二、搭建zookeeper集群


cd /usr/local/docker/zookeeper

vim docker-compose.yml

docker-compose.yml文件内容


version: '3.3'

services:

  zoo1:

    image: zookeeper

    restart: always

    hostname: zoo1

    ports:

      - 2181:2181

    environment:

      ZOO_MY_ID: 1

      ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181

  zoo2:

    image: zookeeper

    restart: always

    hostname: zoo2

    ports:

      - 2182:2181

    environment:

      ZOO_MY_ID: 2

      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=zoo3:2888:3888;2181

  zoo3:

    image: zookeeper

    restart: always

    hostname: zoo3

    ports:

      - 2183:2181

    environment:

      ZOO_MY_ID: 3

      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=0.0.0.0:2888:3888;2181

参考:

DockerHub zookeeper链接:https://hub.docker.com/_/zookeeper

接着:


# :wq 保存退出之后

cd /usr/local/docker/zookeeper #确保docker-compose.yml在当前目录,且自己目前也在当前目录

docker-compose up -d

#等待安装和启动

docker ps #查看容器状态

#参考命令

docker-compose ps #查看集群容器状态

docker-compose stop #停止集群容器

docker-compose restart #重启集群容器

在这里插入图片描述

如图代表zookeeper已经安装以及启动成功,可自行使用端口扫描工具扫描,等待kafka安装成功以后集中测试

三、Kafka集群搭建

确保已经搭建完成zookeeper环境


cd /usr/local/docker/kafka

vim dokcer-compose.yml

docker-compose.yml内容:


version: '2'

services:

  kafka1:

    image: wurstmeister/kafka

    ports:

      - "9092:9092"

    environment:

      KAFKA_ADVERTISED_HOST_NAME: 192.168.0.1                    ## 修改:宿主机IP

      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.1:9092    ## 修改:宿主机IP

      KAFKA_ZOOKEEPER_CONNECT: 192.168.0.1:2181, 192.168.0.1:2182, 192.168.0.1:2183 #刚刚安装的zookeeper宿主机IP以及端口

      KAFKA_ADVERTISED_PORT: 9092

    container_name: kafka1

  kafka2:

    image: wurstmeister/kafka

    ports:

      - "9093:9092"

    environment:

      KAFKA_ADVERTISED_HOST_NAME: 192.168.0.1                  ## 修改:宿主机IP

      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.1:9093        ## 修改:宿主机IP

      KAFKA_ZOOKEEPER_CONNECT: 192.168.0.1:2181, 192.168.0.1:2182, 192.168.0.1:2183 #刚刚安装的zookeeper宿主机IP以及端口

      KAFKA_ADVERTISED_PORT: 9093

    container_name: kafka2

  kafka-manager:

    image: sheepkiller/kafka-manager              ## 镜像:开源的web管理kafka集群的界面

    environment:

        ZK_HOSTS: 192.168.0.1                ## 修改:宿主机IP

    ports:

      - "9000:9000"                              ## 暴露端口

kafka-manager可以自行选择是否安装,不需要安装去除即可

接着:


# :wq 保存退出之后

cd /usr/local/docker/kafka#确保docker-compose.yml在当前目录,且自己目前也在当前目录

docker-compose up -d

#等待安装和启动

docker ps #查看容器状态

#参考命令

docker-compose ps #查看集群容器状态

docker-compose stop #停止集群容器

docker-compose restart #重启集群容器

在这里插入图片描述

如图代表kafka已经安装以及启动成功,接下来进行测试

四、使用Java代码进行测试

(1)引入pom.xml依赖


        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->

        <dependency>

            <groupId>org.apache.kafka</groupId>

            <artifactId>kafka_2.12</artifactId>

            <version>2.1.1</version>

        </dependency>

        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->

        <dependency>

            <groupId>com.alibaba</groupId>

            <artifactId>fastjson</artifactId>

            <version>1.2.62</version>

        </dependency>

        <dependency>

            <groupId>org.projectlombok</groupId>

            <artifactId>lombok</artifactId>

            <version>1.18.10</version>

        </dependency>

(2)创建pojo对象以及Consumer和Producter

User对象


import lombok.Data;

@Data

public class User {

    private String id;

    private String name;

}

Producter


import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.ProducerRecord;

import org.apache.kafka.clients.producer.RecordMetadata;

import com.alibaba.fastjson.JSON;

public class CollectKafkaProducer {

    // 创建一个kafka生产者

    private final KafkaProducer<String, String> producer;

    // 定义一个成员变量为topic

    private final String topic;

    // 初始化kafka的配置文件和实例:Properties & KafkaProducer

    public CollectKafkaProducer(String topic) {

        Properties props = new Properties();

        // 配置broker地址

        props.put("bootstrap.servers", "192.168.0.1:9092");

        // 定义一个 client.id

        props.put("client.id", "demo-producer-test");

        // 其他配置项:

//  props.put("batch.size", 16384); //16KB -> 满足16KB发送批量消息

//  props.put("linger.ms", 10); //10ms -> 满足10ms时间间隔发送批量消息

//  props.put("buffer.memory", 33554432);   //32M -> 缓存提性能

        // kafka 序列化配置:

        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 创建 KafkaProducer 与 接收 topic

        this.producer = new KafkaProducer<>(props);

        this.topic = topic;

    }

    // 发送消息 (同步或者异步)

    public void send(Object message, boolean syncSend) throws InterruptedException {

        try {

            // 同步发送

            if(syncSend) {

                producer.send(new ProducerRecord<>(topic, JSON.toJSONString(message)));

            }

            // 异步发送(callback实现回调监听)

            else {

                producer.send(new ProducerRecord<>(topic,

                                JSON.toJSONString(message)),

                        new Callback() {

                            @Override

                            public void onCompletion(RecordMetadata recordMetadata, Exception e) {

                                if (e != null) {

                                    System.err.println("Unable to write to Kafka in CollectKafkaProducer [" + topic + "] exception: " + e);

                                }

                            }

                        });

            }

        } catch (Exception e) {

            e.printStackTrace();

        }

    }

    // 关闭producer

    public void close() {

        producer.close();

    }

    // 测试函数

    public static void main(String[] args) throws InterruptedException {

        String topic = "topic1";

        CollectKafkaProducer collectKafkaProducer = new CollectKafkaProducer(topic);

        for(int i = 0 ; i < 10; i ++) {

            User user = new User();

            user.setId(i+"");

            user.setName("张三");

            collectKafkaProducer.send(user, true);

        }

        Thread.sleep(Integer.MAX_VALUE);

    }

}

Consumer


import java.util.Collections;

import java.util.List;

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;

import org.apache.kafka.common.TopicPartition;

import lombok.extern.slf4j.Slf4j;

@Slf4j

public class CollectKafkaConsumer {

    // 定义消费者实例

    private final KafkaConsumer<String, String> consumer;

    // 定义消费主题

    private final String topic;

    // 消费者初始化

    public CollectKafkaConsumer(String topic) {

        Properties props = new Properties();

        // 消费者的zookeeper 地址配置

        props.put("zookeeper.connect", "192.168.0.1:2181");

        // 消费者的broker 地址配置

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092");

        // 消费者组定义

        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group-id");

        // 是否自动提交(auto commit,一般生产环境均设置为false,则为手工确认)

        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        // 自动提交配置项

//  props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

        // 消费进度(位置 offset)重要设置: latest,earliest

        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // 超时时间配置

        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");

        // kafka序列化配置

        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        // 创建consumer对象 & 赋值topic

        consumer = new KafkaConsumer<>(props);

        this.topic = topic;

        // 订阅消费主题

        consumer.subscribe(Collections.singletonList(topic));

    }

    // 循环拉取消息并进行消费,手工ACK方式

    private void receive(KafkaConsumer<String, String> consumer) {

        while (true) {

            // 拉取结果集(拉取超时时间为1秒)

            ConsumerRecords<String, String> records = consumer.poll(1000);

            //  拉取结果集后获取具体消息的主题名称 分区位置 消息数量

            for (TopicPartition partition : records.partitions()) {

                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);

                String topic = partition.topic();

                int size = partitionRecords.size();

                log.info("获取topic:{},分区位置:{},消息数为:{}", topic, partition.partition(), size);

                // 分别对每个partition进行处理

                for (int i = 0; i< size; i++) {

                    System.err.println("-----> value: " + partitionRecords.get(i).value());

                    long offset = partitionRecords.get(i).offset() + 1;

                    // consumer.commitSync(); // 这种提交会自动获取partition 和 offset

                    // 这种是显示提交partition 和 offset 进度

                    consumer.commitSync(Collections.singletonMap(partition,

                            new OffsetAndMetadata(offset)));

                    log.info("同步成功, topic: {}, 提交的 offset: {} ", topic, offset);

                }

            }

        }

    }

    // 测试函数

    public static void main(String[] args) {

        String topic = "topic1";

        CollectKafkaConsumer collectKafkaConsumer = new CollectKafkaConsumer(topic);

        collectKafkaConsumer.receive(collectKafkaConsumer.consumer);

    }

}

先启动Producter,随后启动Consumer

在这里插入图片描述

成功消费消息,所有配置OK

注意:所有IP请结合自己实际使用,本人安装时是使用的服务器IP,故文章内统一改成了192.168.0.1

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,186评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,858评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,620评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,888评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,009评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,149评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,204评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,956评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,385评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,698评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,863评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,544评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,185评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,899评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,141评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,684评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,750评论 2 351