Kafka 入门1:系统架构、基本概念以及伪集群搭建方法

一、Kafka 是什么?

Apache Kafka 本质上是一种消息中间件,用来可靠传递消息事件,用来管理消息队列(Message Queue),具有如下特点:

  • 分布式的,支持在线水平扩展;
  • 高吞吐、高性能:kafka 具有高的吞吐量,内部采用消息的批量处理zero-copy 机制,数据的存储和获取是本地磁盘顺序批量操作,具有 O(1)的复杂度,消息处理的效率很高;
  • 分布式发布/订阅(生产/消费);
  • 依赖于 Zookeeper 保存 meta 信息,以保证系统HA;
  • Kafka的动态扩容是通过Zookeeper来实现的;
  • 起源于Linkedin,使用 Scala 语言编写。


    消息队列--发布/订阅

二、Kafka 集群架构

一个 Kafka 系统架构包括如下几个部分:

  • Producer:向 broker 发布消息(push 方式)的 client;
  • Consumer:从 broker 消费(之前订阅的)消息(pull 方式)的 client;
  • Consumer Group:消费组(同类消费者进行归类),一个 Consumer 只能属于某一个组;
  • Topic:主题,对同类消息进行归类,一个 Topic 可有多个 Consumer;
  • Partition:分区,用来存储消息,一个 Topic 可包含多个分区;
  • Broker:即 Kafka server(扮演两种角色:leader、follower),也就是常说的 Kafka 节点,可以通过增加 server 对集群进行横向扩展;
  • Zookeeper 集群:管理 Kafka 的 meta 数据(比如partition offset、消费者生产者的状态),当 Consumer Group 发生变化时 rebalance。
Kafka 集群架构图

为便于理解,可以将 Kafka 系统类比为一个工厂仓库:

kafka 系统 工厂仓库
消息 加工件
broker 库管员
topic 货架(同类加工件放在同一个货架)
partition 货架上的某一层
producer 上一级工序的产出
consumer 下一级工序的输入

2.1 Partition 内部结构

Producer 向 broker push消息时,会将该消息写入到某topic的某partition下的某 LogSegment 文件中。每个 consumer 会保留它读取到某个 partition 的 offset,而 consumer 是通过zookeeper来保留offset的。
上面提到,一个broker 可以管理多个 Topic,一个 Topic 也可能包含多个 Partition。每个 Partition都是一个有序的、不可变的结构化的提交日志记录的序列

topic 中的 partition

一个 Partition 会映射到一个逻辑 log 文件,一个 partition 又包含多个 LogSegment(每个segment大小一样),一个 LogSegment 中又包含多条message 记录,LogSegment 中使用唯一标识 offset(64位Long型)来唯一标识某条 message。

Partition 的 log 文件内部结构

LogSegment文件由两部分组成,分别为:

  • .index 文件: segment 索引文件;
  • .log 文件:数据文件。

这两个文件的命令规则为:partition 全局的第一个 segment 从0开始,后续每个 segment 文件名为上一个 segment 文件最后一条消息的 offset 值。

需要注意的是:

  1. 消息在Kafka中的保存时间是有有效期的,一旦超过有效期即被丢弃;
  2. partition中的数据是有序的,不同partition间的数据丢失了数据的顺序。如果topic有多个partition,消费数据时就不能保证数据的顺序。在需要严格保证消息的消费顺序的场景下,需要将partition数目设为1。

2.2 分区备份和负载均衡

为保证容错性和 HA,Kafka 集群会将某个topic下的某partition 同时备份在多个 broker中。至于备份多少份、备份在哪些 broker上是可以配置的。


分区备份

在一个kafka集群中,每个broker 通常会扮演两个角色:

  1. 在一个 partition 中扮演 leader(broker 的 leader 选举由 Zookeeper 帮助完成);
  2. 在其它的 partition 中扮演 followers。

Leader是最繁忙的,要处理读写请求。这样将leader均分到不同的broker上,目的自然是要确保负载均衡


三、Mac 上安装配置 Kafka 伪集群

Kafka 集群的概念是指由多台机器组成的集群中每台机器上各运行着一个Kafka-server 进程(即broker 进程),伪集群即指在一台机器上运行着多个Kafka-server 进程,是对集群的一种模拟。

3.1 搭建 Kafka 伪集群

1. 首先,确保系统安装了JDK、Scala 以及 Zookeeper

  • Kafka 集群运作依赖于Zookeeper;
  • Kafka 使用 Scala编写。

2. 其次,下载解压 kafka 安装包

  • Kafka 官网下载对应版本的Kafka安装包,比如笔者下载的是 kafka_2.11-2.4.0,其中2.11 是scala版本;
  • 解压安装包:
tar -zxvf ~/Downloads/kafka_2.11-2.4.0 -C ~/software-package-install/kafka_install/

解压缩后可以看到如下目录:

-rw-r--r--@   1 ycaha  1699762527  32216 Dec 10 00:46 LICENSE
-rw-r--r--@   1 ycaha  1699762527    337 Dec 10 00:46 NOTICE
drwxr-xr-x@  35 ycaha  1699762527   1120 Jan 14 11:35 bin/
drwxr-xr-x@  19 ycaha  1699762527    608 Jan 14 13:31 config/
drwxr-xr-x@ 104 ycaha  1699762527   3328 Dec 10 00:51 libs/
drwxr-xr-x   35 ycaha  1699762527   1120 Jan 14 13:08 logs/
drwxr-xr-x@   3 ycaha  1699762527     96 Dec 10 00:51 site-docs/

其中:

  • config 目录:存放各种配置文件;
  • bin 目录:存放各种命令文件,比如启动停止集群、创建查看 topic 等。

3. 最后,添加修改配置项
要搭建集群模式,有两个配置项 broker.id 和 listeners 是必须要进行修改的:

  • broker.id: kafka-server 的 id,每个 kafka-server 进程对应的 id 都是唯一的;
  • listener:监听,PLAINTEXT://ip:port

如果是伪集群(单机)模式,由于 ip 是一致的,因此需要用不同 port 来区分不同的 kafka-server 进程。此外还有一个配置项 log.dirs 表示服务器的 log 文件的存放路径,由于是单机,因此需要为不同的 kafka-server 进程设置不同的 log.dirs。

${kafka_home}/config/server.properties 文件的配置项:

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
# The address the socket server listens on. 
listeners=PLAINTEXT://localhost:9092
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

复制 server.properties 为 server-1.properties 和 server-2.properties:

cp  server.properties  server-1.properties 
cp  server.properties  server-2.properties 

${kafka_home}/config/server-1.properties 文件的配置项:

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# The address the socket server listens on. 
listeners=PLAINTEXT://localhost:9093
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs1

${kafka_home}/config/server-2.properties 文件的配置项:

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
# The address the socket server listens on. 
listeners=PLAINTEXT://localhost:9094
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs2

至此 伪集群搭建完成,接下来可以启动3个(因为3个不同的配置文件)不同的 kafka-server 进程。


四、常见的 Kafka操作(shell和java API)

4.1 Kafka shell 操作

Kafka 提供了若干个脚本来完成集群的启动关闭、topic 的创建等,这些脚本就在目录 ${kafka_home}/bin 中。

1. 启动关闭 Kafka 集群
启动 Kafka 之前必须先启动 Zookeeper 集群。

# 分别启动3个 kafka-server 进程
cd ${kafka_home}/bin
sh kafka-server-start.sh ../config/server.properties &
sh kafka-server-start.sh ../config/server-1.properties &
sh kafka-server-start.sh ../config/server-2.properties &

# 查看启动的3个 kafka-server 进程
ps -ef | grep kafka

# 关闭 kafka-server 进程
sh kafka-server-stop.sh 

2. 创建查看 topic

# 创建名为  test05 的 topic
sh kafka-topics.sh --create --zookeeper localhost:2181  --replication-factor 1 --partitions 1 --topic test05
# 列出所有的 topic
sh kafka-topics.sh --list --zookeeper localhost:2181
# 获取所有 topic 的描述
sh kafka-topics.sh --describe --zookeeper localhost:2181
# 获取 topic test06 的描述
sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic test06
# 删除 topic test05
sh kafka-topics.sh --zookeeper localhost:2181 --delete --topic test05

更多的命令解释可以通过在 shell 中使用--help命令来获取,比如关于 topic 方面的:

sh kafka-topics.sh --help


五、测试 producer 产生事件 & consumer 消费事件

1. 首先打开一个终端shell,启动 producer console

sh kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --topic test1

进入 producer 控制台,push 事件到 broker 的 topic test1 中:

>hello world
>this is kafka
>test1

2. 然后新开一个终端shell,启动 consumer console

sh kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --from-beginning --topic test1

进入 consumer 控制台,发现 consumer 自动从 broker 的 topic test1 中pull 了事件:

hello world
this is kafka
test1


六、Java API 操作 Kafka

  • 模拟 Kafka producer 发布产生事件和 consumer 订阅消费事件;
  • 自定义Partitioner;

maven pom.xml

<properties>
  <kafka.version>2.4.0</kafka.version>
</properties> 
<dependency>
  <groupId>org.apache.kafka</groupId>
   <artifactId>kafka_2.11</artifactId>
   <version>${kafka.version}</version>
</dependency>

import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
import org.apache.kafka.common.TopicPartition;

import java.util.*;

public class MyKafkaPartitioner extends AbstractPartitionAssignor {

    @Override
    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, Map<String, Subscription> subscriptions) {
        Map<String, List<String>> consumersPerTopic =
                consumersPerTopic(subscriptions);
        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        for (String memberId : subscriptions.keySet()) {
            assignment.put(memberId, new ArrayList<>());
        }

        // 针对每一个topic进行分区分配
        for (Map.Entry<String, List<String>> topicEntry :
                consumersPerTopic.entrySet()) {
            String topic = topicEntry.getKey();
            List<String> consumersForTopic = topicEntry.getValue();
            int consumerSize = consumersForTopic.size();

            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
            if (numPartitionsForTopic == null) {
                continue;
            }

            // 当前topic下的所有分区
            List<TopicPartition> partitions =
                    AbstractPartitionAssignor.partitions(topic,
                            numPartitionsForTopic);
            // 将每个分区随机分配给一个消费者
            for (TopicPartition partition : partitions) {
                int rand = new Random().nextInt(consumerSize);
                String randomConsumer = consumersForTopic.get(rand);
                assignment.get(randomConsumer).add(partition);
            }
        }
        return assignment;
//        return null;
    }

    // 获取每个topic所对应的消费者列表,即:[topic, List[consumer]]
    private Map<String, List<String>> consumersPerTopic(
            Map<String, Subscription> consumerMetadata) {
        Map<String, List<String>> res = new HashMap<>();
        for (Map.Entry<String, Subscription> subscriptionEntry :
                consumerMetadata.entrySet()) {
            String consumerId = subscriptionEntry.getKey();
            for (String topic : subscriptionEntry.getValue().topics())
                put(res, topic, consumerId);
        }
        return res;
    }

    @Override
    public String name() {
//        return null;
        return "MyKafkaPartitioner";
    }
}




import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Properties;

public class Main {
    public static void main(String[] args) {
        initProducer();
        initConsumer();

    }

    private static void initProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 2);
        props.put("linger.ms", 1);
//        props.put("partitioner.class", "com.example.demo.MyPartitioner");
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"com.saicmotor.kafka.MyKafkaPartitioner");
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<String, String>("powerTopic", Integer.toString(i), Integer.toString(i)));
        }

        producer.close();
    }


    private static void initConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        @SuppressWarnings("resource")
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("powerTopic", "topic1"), new ConsumerRebalanceListener() {
            // kafka在有新消费者加入或者撤出时,会触发rebalance操作,在subscribe订阅主题的时候,我们可以编写回掉函数,在触发rebalance操作之前和之后,提交相应偏移量和获取偏移量
            // 注意enable.auto.commit为false,防止自动提交偏移量
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            //在rebalance操作之前调用,用于我们提交消费者偏移

            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                //onPartitionAssigned会在rebalance操作之后调用,用于我们接取新的分配区的偏移量
                Map<TopicPartition,Long> beginningOffset = consumer.beginningOffsets(collection);
                for(Map.Entry<TopicPartition,Long> entry : beginningOffset.entrySet()) {
                    consumer.seekToBeginning(collection);
                }
            }

        });
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("partition:" + record.partition() + ",key:" + record.key() + ",value:" + record.value());
                consumer.commitAsync();
            }
        }
    }
    
}


七、常问问题

7.1 Kafka 针对 Topic 为什么要进行分区?日志为什么要分 segment?

https://www.zhihu.com/question/28925721

7.2 Kafka 可靠性如何保证?

replication 副本;


参考:
https://cloud.tencent.com/developer/article/1446017

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342

推荐阅读更多精彩内容