kafka全面认知

什么是Kafka

Kafka是一款分布式消息发布和订阅系统,它的特点是高性能、高吞吐量。

最早设计的目的是作为LinkedIn的活动流和运营数据的处理管道。这些数据主要是用来对用户做用户画像分析以及服务器性能数据的一些监控。

所以kafka一开始设计的目标就是作为一个分布式、高吞吐量的消息系统,所以适合运用在大数据传输场景。

Kafka的应用场景

由于kafka具有更好的吞吐量、内置分区、冗余及容错性的优点(kafka每秒可以处理几十万消息),让kafka成为了一个很好的大规模消息处理应用的解决方案。所以在企业级应用长,主要会应用于如下几个方面

  • 行为跟踪:kafka可以用于跟踪用户浏览页面、搜索及其他行为。通过发布-订阅模式实时记录到对应的topic中,通过后端大数据平台接入处理分析,并做更进一步的实时处理和监控

  • 日志收集:日志收集方面,有很多比较优秀的产品,比如Apache Flume,很多公司使用kafka代理日志聚合。日志聚合表示从服务器上收集日志文件,然后放到一个集中的平台(文件服务器)进行处理。在实际应用开发中,我们应用程序的log都会输出到本地的磁盘上,排查问题的话通过linux命令来搞定,如果应用程序组成了负载均衡集群,并且集群的机器有几十台以上,那么想通过日志快速定位到问题,就是很麻烦的事情了。所以一般都会做一个日志统一收集平台管理log日志用来快速查询重要应用的问题。所以很多公司的套路都是把应用日志集中到kafka上,然后分别导入到es和hdfs上,用来做实时检索分析和离线统计数据备份等。而另一方面,kafka本身又提供了很好的api来集成日志并且做日志收集。

    image

Kafka的架构

一个典型的kafka集群包含若干Producer(可以是应用节点产生的消息,也可以是通过Flume收集日志产生的事件),若干个Broker(kafka支持水平扩展)、若干个Consumer Group,以及一个zookeeper集群。kafka通过zookeeper管理集群配置及服务协同。Producer使用push模式将消息发布到broker,consumer通过监听使用pull模式从broker订阅并消费消息或者broker通过push模式将消息推向consumer。

多个broker协同工作,producer和consumer部署在各个业务逻辑中。三者通过zookeeper管理协调请求和转发。这样就组成了一个高性能的分布式消息发布和订阅系统。

图上有一个细节是和其他mq中间件不同的点,producer 发送消息到broker的过程是push,而consumer从broker消费消息的过程是pull,主动去拉数据。而不是broker把数据主动发送给consumer。

image

consumer的pull模式和push模式的区别

push模式:broker将消息主动push给consumer。

  • 优点:1、broker能以最大速率发送消息给consumer;2、broker有消息时发送下游,没有消息则停止发送。2、以最大速率发送consumer也就代表着consumer的消息实时性高。
  • 缺点:1、无法感知consumer的消费状态和消费能力,容易导致producer压垮consumer。2、broker将消息推送到consumer后,如果要保证消息成功消费,需要consumer将消费状态再回传给broker。broker记录每条消息的消费状态显然很不切合实际。

pull模式:consumer主动pull拉去broker的消息。

  • 优点:consumer可以批量拉取,也可以单条拉取,也可以控制消费速度消费数量,保证comsumer不会出现饱和。
  • 缺点:consumer并不能感知broker是否有消息,所以会一直轮训拉取broker的消息。当broker中没有消息时,这样无脑的拉取无疑会造成consumer的资源浪费

名词解释

1)Broker
Kafka集群包含一个或多个服务器,这种服务器被称为broker。broker端不维护数据的消费状态,提升了性能。直接使用磁盘进行存储,线性读写,速度快:避免了数据在JVM内存和系统内存之间的复制,减少耗性能的创建对象和垃圾回收。
2)Producer
负责发布消息到Kafka broker
3)Consumer
消息消费者,向Kafka broker读取消息的客户端,consumer从broker拉取(pull)数据或者broker推送(push)消息给consumer并进行处理,俩种各有优缺点,上面章节已经介绍。
4)Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
5)Partition
Parition是物理上的概念,每个Topic包含一个或多个Partition.
6)Consumer Group
每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)
7)Topic & Partition
Topic在逻辑上可以被认为是一个queue,每条消费都必须指定它的Topic,可以简单理解为必须指明把这条消息放进哪个queue里。为了使得Kafka的吞吐率可以线性提高,物理上把Topic分成一个或多个Partition,每个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的所有消息和索引文件。若创建topic1和topic2两个topic,且分别有13个和19个分区,则整个集群上会相应会生成共32个文件夹(本文所用集群共8个节点,此处topic1和topic2 replication-factor均为1)。

Java中使用kafka进行通信

依赖

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.0.0</version>
</dependency>

发送端代码

public class Producer extends Thread{
      private final KafkaProducer<Integer,String> producer;
      private final String topic;
      public Producer(String topic) {
            Properties properties=new Properties();
 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.13.102:9092,192.168.13.103:9092,192.168.13.104:9092");
            properties.put(ProducerConfig.CLIENT_ID_CONFIG,"practice-producer");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        IntegerSerializer.class.getName());
         properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        StringSerializer.class.getName());
            producer=new KafkaProducer<Integer, String>(properties);
            this.topic = topic;

    }
    @Override
      public void run() {
            int num=0;
            while(num<50){
                  String msg="pratice test message:"+num;
                  try {
                        producer.send(new ProducerRecord<Integer, String>
                (topic,msg)).get();
                        TimeUnit.SECONDS.sleep(2);
                        num++;

            }
            catch (InterruptedException e) {
                        e.printStackTrace();

            }
            catch (ExecutionException e) {
                        e.printStackTrace();

            }              
        }        
    }
      public static void main(String[] args) {
            new Producer("test").start();        
    }
}

消费端代码

public class Consumer extends Thread{
      private final KafkaConsumer<Integer,String> consumer;
      private final String topic;
      public Consumer(String topic){
            Properties properties=new Properties();

         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.13.102:9092,192
.168.13.103:9092,192.168.13.104:9092");
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, "practice-consumer");
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        //设置offset自动提交
            properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        //自动提交间隔时间
            properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.IntegerDeserializer");
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        //对于当前groupid来说,消息的offset从最早的消息开始消费
            consumer= new KafkaConsumer<>(properties);
            this.topic=topic;

    }
    @Override
      public void run() {
            while(true) {
                  consumer.subscribe(Collections.singleton(this.topic));
                  ConsumerRecords<Integer, String> records =
            consumer.poll(Duration.ofSeconds(1));
                  records.forEach(record -> {
                        System.out.println(record.key() + " " + record.value() + " ->
offset:" + record.offset());

            }
            );

        }

    }
      public static void main(String[] args) {
            new Consumer("test").start();

    }
}

异步发送

kafka对于消息的发送,可以支持同步和异步,前面演示的案例中,我们是基于同步发送消息。同步会需要阻塞,而异步不需要等待阻塞的过程。
从本质上来说,kafka都是采用异步的方式来发送消息到broker,但是kafka并不是每次发送消息都会直接发送到broker上,而是把消息放到了一个发送队列中,然后通过一个后台线程不断从队列取出消息进行发送,发送成功后会触发callback。kafka客户端会积累一定量的消息统一组装成一个批量消息发送出去,触发条件是前面提到的batch.size和linger.ms。
而同步发送的方法,无非就是通过future.get()来等待消息的发送返回结果,但是这种方法会严重影响消息发送的性能。

public void run() {
        int num=0;
        while(num<50){
              String msg="pratice test message:"+num;
              try {
                    producer.send(new ProducerRecord<>(topic, msg), new Callback() {
                          @Override
                          public void onCompletion(RecordMetadata recordMetadata,
                Exception e) {
                                System.out.println("callback:
"+recordMetadata.offset()+"->"+recordMetadata.partition());

                }

            }
            );
                    TimeUnit.SECONDS.sleep(2);
                    num++;

        }
        catch (InterruptedException e) {
                    e.printStackTrace();

        }

    }

}

batch.size

生产者发送多个消息到broker上的同一个分区时,为了减少网络请求带来的性能开销,通过批量的方式来提交消息,可以通过这个参数来控制批量提交的字节数大小,默认大小是16384byte,也就是16kb,意味着当一批消息大小达到指定的batch.size的时候会统一发送

linger.ms

Producer默认会把两次发送时间间隔内收集到的所有Requests进行一次聚合然后再发送,以此提高吞吐量,而linger.ms就是为每次发送到broker的请求增加一些delay,以此来聚合更多的Message请求。这个有点想TCP里面的Nagle算法,在TCP协议的传输中,为了减少大量小数据包的发送,采用了Nagle算法,也就是基于小包的等-停协议。
batch.size和linger.ms这两个参数是kafka性能优化的关键参数,batch.size和linger.ms这两者的作用是一样的,如果两个都配置了,那么怎么工作的呢?实际上,当二者都配置的时候,只要满足其中一个要求,就会发送请求到broker上

一些基础配置分析

group.id

consumer group是kafka提供的可扩展且具有容错性的消费者机制。既然是一个组,那么组内必然可以有多个消费者或消费者实例(consumer instance),它们共享一个公共的ID,即group ID。组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。当然,每个分区只能由同一个消费组内的一个consumer来消费.如下图所示,分别有三个消费者,属于两个不同的group,那么对于firstTopic这个topic来说,这两个组的消费者都能同时消费这个topic中的消息,对于此时的架构来说,这个firstTopic就类似于ActiveMQ中的topic概念。如右图所示,如果3个消费者都属于同一个group,那么此时firstTopic就是一个Queue的概念

image
image

enable.auto.commit

消费者消费位移的提交方式,true为自动提交,即consumer poll消息后自动提交上次之前poll的所有消息位移,若为false则需要手动提交,即consumer poll出的消息需要手动提交消息位移,提交消息位移的方式有同步提交和异步提交。

auto.commit.interval.ms

在enable.auto.commit 为true的情况下, 自动提交消费位移的间隔,默认值5000ms。那么消费者会在poll方法调用后每隔5000ms(由auto.commit.interval.ms指定)提交一次位移。和很多其 他操作一样,自动提交消费位移也是由poll()方法来驱动的;在调用poll()时,消费者判断是否到达提交时间(auto.commit.interval.ms指定的值),如果是则提交上一次poll返回的最大位移。具体什么时候提交消息位移,请看这篇[文章]。(https://zhuanlan.zhihu.com/p/112745985)

auto.offset.reset

这个参数是针对新的groupid中的消费者而言的,当有新groupid的消费者来消费指定的topic时,对于该参数的配置,会有不同的语义。
auto.offset.reset=latest情况下,新的消费者将会从其他消费者最后消费的offset处开始消费Topic下的消息。
auto.offset.reset= earliest情况下,新的消费者会从该topic最早的消息开始消费。
auto.offset.reset=none情况下,新的消费者加入以后,由于之前不存在offset,则会直接抛出异常。

max.poll.records

consumer是通过轮训的方式使用poll()方法不断获取消息的,max.poll.records参数可以限制每次调用poll返回的消息数,默认是500条。

max.poll.interval.ms

默认值5分钟,表示若5分钟之内consumer没有消费完上一次poll的消息,也就是在5分钟之内没有调用下次的poll()函数,那么kafka会认为consumer已经宕机,所以会将该consumer踢出consumer group,紧接着就会发生rebalance,发生rebalance可能会发生重复消费的情况。

正常消费端伪代码如下

while (true) {
    //取出消息
    ConsumerRecords<String,String> records = consumer.poll(100);
    for (ConsumerRecord<String,String> record : records) {
         //执行消费消息
         dosomething
     }
}

看到这里需要保证poll出的所有消息消费时间总和不能大于max.poll.interval.ms,如果大于则会将consumer踢出consumer group,会进行rebalance操作了,所有每次poll消息的数量不能太大,避免发生rebalance。

关于Topic和Partition

Topic

在kafka中,topic是一个存储消息的逻辑概念,可以认为是一个消息集合。每条消息发送到kafka集群的消息都有一个类别。物理上来说,不同的topic的消息是分开存储的,
每个topic可以有多个生产者向它发送消息,也可以有多个消费者去消费其中的消息。

image

Partition(分区)

每个topic可以划分多个分区(每个Topic至少有一个分区),同一topic下的不同分区包含的消息是不同的,那么为什么要设置多partition呢?第一分区存储可以存储更多的消息,其次是为了提高吞吐量,如果只有一个partition,则所有消息只能存储在该partition内,消费时不管有多少个消费者也只能顺序读取该partition内的消息,如果是多个partition,那么消费者就可以同时从多个partition内并发读取消息,正是这个原因才提高了吞吐量。

每个消息在被添加到分区时,都会被分配一个offset(称之为偏移量),它是消息在此分区中的唯一编号,kafka通过offset保证消息在分区内的顺序,offset的顺序不跨分区,即kafka只保证在同一个分区内的消息是有序的。

在多partition和多consumer的情况下,生产的消息是具有顺序性的,且根据partition的分发策略依次插入到相应的partition中,但是由于kafak只保证同一个partition内的消息输出有序性,所以多partition依次输出的消息顺序并不能保证和生产消息写入的顺序是一样的。

下图中,对于名字为test的topic,做了3个分区,分别是p0、p1、p2.
每一条消息发送到broker时,会根据partition的规则选择存储到哪一个partition。如果partition规则设置合理,那么所有的消息会均匀的分布在不同的partition中,这样就有点类似数据库的分库分表的概念,把数据做了分片处理。

image

Topic&Partition的存储

Partition是以文件的形式存储在文件系统中,比如创建一个名为firstTopic的topic,其中有3个partition,那么在kafka的数据目录(/tmp/kafka-log)中就有3个目录,firstTopic-0~3, 命名规则是<topic_name>-<partition_id>

sh kafka-topics.sh --create --zookeeper 192.168.11.156:2181 --replication-factor 1 --partitions 3 --topic firstTopic

关于消息分发

kafka消息分发策略

消息是kafka中最基本的数据单元,在kafka中,一条消息由key、value两部分构成,在发送一条消息时,我们可以指定这个key,那么producer会根据key和partition机制来判断当前这条消息应该发送并存储到哪个partition中。我们可以根据需要进行扩展producer的partition机制。

自定义Partitioner

public class MyPartitioner implements Partitioner {
      private Random random = new Random();
      @Override
      public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
            //获取集群中指定topic的所有分区信息
            List<PartitionInfo> partitionInfos=cluster.partitionsForTopic(s);
            int numOfPartition=partitionInfos.size();
            int partitionNum=0;
            if(o==null){
                   //key没有设置
                  partitionNum=random.nextint(numOfPartition);
                //随机指定分区               
            } else{
                  partitionNum=Math.abs((o1.hashCode()))%numOfPartition;               
            }
            System.out.println("key->"+o+",value->"+o1+"->send to partition:"+partitionNum);
            return partitionNum;         
    }
}

发送端代码添加自定义分区

public KafkaProducerDemo(String topic,Boolean isAysnc){
      Properties properties=new Properties();
      properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
          "192.168.13.102:9092,192.168.13.103:9092,192.168.13.104:9092");
      properties.put(ProducerConfig.CLIENT_ID_CONFIG,"KafkaProducerDemo");
      properties.put(ProducerConfig.ACKS_CONFIG,"-1");
      properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.IntegerSerializer");
      properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer");
     properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.wei.kafka.MyPartitioner");
      producer=new KafkaProducer<Integer, String>(properties);
      this.topic=topic;
      this.isAysnc=isAysnc;
}

消息默认的分发机制

默认情况下,kafka采用的是hash取模的分区算法。如果Key为null,则会随机分配一个分区。这个随机是在这个参数”metadata.max.age.ms”的时间范围内随机选择一个。对于这个时间段内,如果key为null,则只会发送到唯一的分区。这个值值哦默认情况下是10分钟更新一次。

关于Metadata,这个之前没讲过,简单理解就是Topic/Partition和broker的映射关系,每一个topic的每一个partition,需要知道对应的broker列表是什么,leader是谁、follower是谁。这些信息都是存储在Metadata这个类里面。

消费端如何消费指定的分区

通过下面的代码,就可以消费指定该topic下的0号分区。其他分区的数据就无法接收

//消费指定分区的时候,不需要再订阅
//kafkaConsumer.subscribe(Collections.singletonList(topic));
//消费指定的分区
TopicPartition topicPartition=new TopicPartition(topic,0);
kafkaConsumer.assign(Arrays.asList(topicPartition));

消息的消费原理

在实际生产过程中,每个topic都会有多个partitions,多个partitions的好处在于,一方面能够对broker上的数据进行分片有效减少了消息的容量从而提升io性能。另外一方面,提高了消费端的消费能力,如果只有一个partitions,那么多consumer也只能顺序读取该partitions内的消息,如果是多个partitions的话,那么多consumer就可以从多partitions并发生读取topic消息,这样就提高了消息断的消费能力,所以一般会设置多个consumer去消费同一个topic的多个partitions, 也就是消费端的负载均衡机制。

这也就是我们接下来要了解的,在多个partition以及多个consumer的情况下,消费者是如何消费消息的。

kafka存在consumer group的概念,也就是group.id一样的consumer,这些consumer属于一个consumer group,组内的所有消费者协调在一起来消费订阅主题的所有分区。当然每一个分区只能由同一个消费组内的consumer来消费,那么同一个consumergroup里面的consumer是怎么去分配该消费哪个分区里的数据的呢?如下图所示,3个分区,3个消费者,那么哪个消费者消分哪个分区?

image

对于上面这个图来说,这3个消费者会分别消费test这个topic 的3个分区,也就是每个consumer消费一个partition。

  • 演示1(3个partiton对应3个consumer)
    Ø 创建一个带3个分区的topic
    Ø 启动3个消费者消费同一个topic,并且这3个consumer属于同一个组
    Ø 启动发送者进行消息发送

演示结果:consumer1会消费partition0分区、consumer2会消费partition1分区、consumer3会消费partition2分区
如果是2个consumer消费3个partition呢?会是怎么样的结果?

  • 演示2(3个partiton对应2个consumer)
    Ø 基于上面演示的案例的topic不变
    Ø 启动2个消费这消费该topic
    Ø 启动发送者进行消息发送
    演示结果:consumer1会消费partition0/partition1分区、consumer2会消费partition2分区

  • 演示3(3个partition对应4个或以上consumer)
    演示结果:仍然只有3个consumer对应3个partition,其他的consumer无法消费消息
    通过这个演示的过程,引出接下来需要了解的kafka的分区分配策略(Partition Assignment Strategy)

consumer和partition的数量建议

  1. 如果consumer比partition多,是浪费,因为kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数
  2. 如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀,被取的不均匀也就代表是消费能力不均匀最好partiton数目是consumer数目的整数倍,所以partition数目很重要,比如取24,就很容易设定consumer数目
  3. 如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同
  4. 增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化

什么是分区分配策略

通过前面的案例演示,我们应该能猜到,同一个group中的消费者对于一个topic中的多个partition,存在一定的分区分配策略,每个消费者都可以设置自己的分区分配策略,对于消费组而言,会从各个消费者上报过来的分区分配策略中选举一个彼此都赞同的策略来实现整体的分区分配,这个"赞同"的规则请继续往下看
在kafka中,存在三种分区分配策略,一种是Range(默认)、 另一种是RoundRobin(轮询)、StickyAssignor(粘性)。 在消费端中的ConsumerConfig中,通过这个属性来指定分区分配策略

public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG = "partition.assignment.strategy";

RangeAssignor(范围分区)

Range策略是对每个主题而言的,首先对同一个主题里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。

假设n = 分区数/消费者数量
m= 分区数%消费者数量
那么前m个消费者每个分配n+1个分区,后面的(消费者数量-m)个消费者每个分配n个分区

假设我们有10个分区,3个消费者,排完序的分区将会是0, 1, 2, 3, 4, 5, 6, 7, 8, 9;消费者线程排完序将会是C1-0, C2-0, C3-0。然后将partitions的个数除于消费者线程的总数来决定每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。在我们的例子里面,我们有10个分区,3个消费者线程, 10 / 3 = 3,而且除不尽,那么消费者线程 C1-0 将会多消费一个分区.
结果看起来是这样的:
C1-0 将消费 0, 1, 2, 3 分区
C2-0 将消费 4, 5, 6 分区
C3-0 将消费 7, 8, 9 分区

假如我们有11个分区,那么最后分区分配的结果看起来是这样的:
C1-0 将消费 0, 1, 2, 3 分区
C2-0 将消费 4, 5, 6, 7 分区
C3-0 将消费 8, 9, 10 分区

假如我们有2个主题(T1和T2),分别有10个分区,那么最后分区分配的结果看起来是这样的:
C1-0 将消费 T1主题的 0, 1, 2, 3 分区以及 T2主题的 0, 1, 2, 3分区
C2-0 将消费 T1主题的 4, 5, 6 分区以及 T2主题的 4, 5, 6分区
C3-0 将消费 T1主题的 7, 8, 9 分区以及 T2主题的 7, 8, 9分区

可以看出,C1-0 消费者线程比其他消费者线程多消费了2个分区,这种分配方式明显的一个问题是随着消费者订阅的Topic的数量的增加,不均衡的问题会越来越严重也就代表着C1-0这个消费者的消费能力会低于C2-0和C3-0消费者,导致的问题直接点说就是消费者的消费能力不平衡,所以最好的情况就是partiton数目是consumer数目的整数倍,可以有效避免这个弊端。

RoundRobinAssignor(轮询分区)

轮询分区策略是把所有partition和所有consumer线程都列出来,然后按照hashcode进行排序,注意上一种range分区是针对每一个topic而言的,而轮训分区是相对于所有的partition和consumer而言的,最后通过轮询算法分配partition给消费线程。如果消费组内,所有消费者订阅的Topic列表是相同的(每个消费者都订阅了相同的Topic),那么分配结果是尽量均衡的(消费者之间分配到的分区数的差值不会超过1)。如果订阅的Topic列表是不同的,那么分配结果是不保证“尽量均衡”的,因为某些消费者不参与一些Topic的分配。

在我们的例子里面,假如按照 hashCode 排序完的topic-partitions组依次为T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6, T1-9,我们的消费者线程排序为C1-0, C1-1, C2-0, C2-1(c1和c2 consumer group都订阅了t1),最后分区分配的结果为:
C1-0 将消费 T1-5, T1-2, T1-6 分区;
C1-1 将消费 T1-3, T1-1, T1-9 分区;
C2-0 将消费 T1-0, T1-4 分区;
C2-1 将消费 T1-8, T1-7 分区;

相对于RangeAssignor,在订阅多个Topic的情况下,RoundRobinAssignor的方式能消费者之间尽量均衡的分配到分区(分配到的分区数的差值不会超过1——RangeAssignor的分配策略可能随着订阅的Topic越来越多,差值越来越大)

对于订阅组内消费者订阅Topic不一致的情况:假设有三个消费者分别为C1-0、C2-0、C3-0,有3个Topic T1、T2、T3,分别拥有1、2、3个分区,并且C1-0订阅T1,C2-0订阅T1和T2,C3-0订阅T1、T2、T3,那么RoundRobinAssignor的分配结果如下:



看上去分配已经尽量的保证均衡了,不过可以发现C3-0承担了4个分区的消费而C2-0和C1-0都是承担一个分区,如果T2-1分配给c2-0,均衡性是不是更好呢?带个这个问题,继续下面的这次策略。

StrickyAssignor 分配策略

背景

尽管RoundRobinAssignor已经在RangeAssignor上做了一些优化来更均衡的分配分区,但是在一些情况下依旧会产生严重的分配偏差,比如消费组中订阅的Topic列表不相同的情况下更核心的问题是无论是RangeAssignor,还是RoundRobinAssignor,当前的分区分配算法都没有考虑上一次的分配结果显然,在执行一次新的分配之前,如果能考虑到上一次分配的结果,尽量少的调整分区分配的变动,显然是能节省很多开销的。

kafka在0.11.x版本支持了StrickyAssignor, 翻译过来叫粘性策略可以理解为分配结果是带“粘性的”——每一次分配变更相对上一次分配做最少的变动(上一次的结果是有粘性的),它主要有两个目的:

  • 分区的分配尽可能的均匀
  • 分区的分配尽可能和上次分配保持相同,也就是rebalance之后分区的分配尽量和之前的分区分配相同。

当两者发生冲突时, 第 一 个目标优先于第二个目标。 第一个目标是每个分配算法都尽量尝试去完成的,而第二个目标才真正体现出StickyAssignor特性的。

我们举俩个例子来体现StickyAssignor特性

第一个例子:所有consumer订阅的topic都相同的情况:

  • 有3个Consumer:C0、C1、C2

  • 有4个Topic:T0、T1、T2、T3,每个Topic有2个分区

  • 所有Consumer都订阅了这4个分区

StickyAssignor的分配结果如下图所示(增加RoundRobinAssignor分配作为对比):


上面的例子中,删除C1 consumerre然后balance,RoundRobin策略会将所有分区重新进行一遍分配,可以看到变动较大,而Sticky模式原来分配给C0、C2的分区都没有发生变动,且最终C0、C1达到的均衡的目的,这就体现了StickyAssignor策略的优越性

再举一个例子:所有consumer订阅的topic不相同的情况:

  • 有3个Consumer:C0、C1、C2

  • 3个Topic:T0、T1、T2,它们分别有1、2、3个分区

  • C0订阅T0;C1订阅T0、T1;C2订阅T0、T1、T2

分配结果如下图所示:


首先在所有consumer订阅的topic不相同的情况下,可以看出StickyAssignor策略相比于RoundRobin策略均衡性更好,体现了StickyAssignor策略的第一个特点:分区的分配尽可能的均匀,看到这里也解决了我们上节留下的疑问。

其次是,在删除C0消费者进行rebalance之后,可以看出使用RoundRobin策略的分区会重新进行一遍RoundRobin,而使用StickyAssignor策略的分区分配尽可能的和上次保持了最小变动。

以上俩个例子,完美体现了StickyAssignor策略的优越性。

rebalance触发的场景

在上面的例子中可以看到rebalance触发的场景大致有如下三种情况:
(1)Consumer增加或删除会触发 Consumer Group的Rebalance
(2)Broker的增加或者减少都会触发 Consumer Rebalance
(3)consumer在超过max.poll.interval.ms时间后没有再次poll的操作,kafka会认为该consumer宕机,也就会将该consumer踢出group,触发rebalance

谁来执行Rebalance以及管理consumer的group呢?

Kafka提供了一个角色:coordinator来执行对于consumer group的管理,当consumer group的第一个consumer启动的时候,它会去和kafka server确定谁是它们组的coordinator。之后该group内的所有成员都会和该coordinator进行协调通信

如何确定coordinator

consumer group如何确定自己的coordinator是谁呢, 消费者向kafka集群中的任意一个broker发送一个GroupCoordinatorRequest请求,服务端会返回一个负载最小的broker节点的id,并将该broker设置为coordinator

JoinGroup的过程

在rebalance之前,需要保证coordinator是已经确定好了的,整个rebalance的过程分为两个步骤,JoinSync

join: 表示加入到consumer group中,在这一步中,所有的成员都会向coordinator发送joinGroup的请求。一旦所有成员都发送了joinGroup请求,那么coordinator会选择一个consumer担任leader角色,并把组成员信息和订阅信息发送消费者
leader选举算法比较简单,如果消费组内没有leader,那么第一个加入消费组的消费者就是消费者leader,如果这个时候leader消费者退出了消费组,那么重新选举一个leader,这个选举很随意,类似于随机算法

image

protocol_metadata: 序列化后的消费者的订阅信息
leader_id: 消费组中的消费者,coordinator会选择一个座位leader,对应的就是member_id
member_metadata 对应消费者的订阅信息
members:consumer group中全部的消费者的订阅信息
generation_id: 年代信息,类似于之前讲解zookeeper的时候的epoch是一样的,对于每一轮rebalance,generation_id都会递增。主要用来保护consumer group。隔离无效的offset提交。也就是上一轮的consumer成员无法提交offset到新的consumer group中。

每个消费者都可以设置自己的分区分配策略,对于消费组而言,会从各个消费者上报过来的分区分配策略中选举一个彼此都赞同的策略来实现整体的分区分配,这个"赞同"的规则是,消费组内的各个消费者会通过投票来决定

  • 在joingroup阶段,每个consumer都会把自己支持的分区分配策略发送到coordinator
  • coordinator手机到所有消费者的分配策略,组成一个候选集
  • 每个消费者需要从候选集里找出一个自己支持的策略,并且为这个策略投票
  • 最终计算候选集中各个策略的选票数,票数最多的就是当前消费组的分配策略

Synchronizing Group State阶段

完成分区分配之后,就进入了Synchronizing Group State阶段,主要逻辑是向GroupCoordinator发送SyncGroupRequest请求,并且处理SyncGroupResponse响应,简单来说,就是leader将消费者对应的partition分配方案同步给consumer group 中的所有consumer

image

每个消费者都会向coordinator发送syncgroup请求,不过只有leader节点会发送分配方案,其他消费者只是打打酱油而已。当leader把方案发给coordinator以后,coordinator会把结果设置到SyncGroupResponse中。这样所有成员都知道自己应该消费哪个分区。

consumer group的分区分配方案是在客户端执行的!Kafka将这个权利下放给客户端主要是因为这样做可以有更好的灵活性

总结

我们再来总结一下consumer group rebalance的过程
Ø 对于每个consumer group子集,都会在服务端对应一个GroupCoordinator进行管理,GroupCoordinator会在zookeeper上添加watcher,当消费者加入或者退出consumer group时,会修改zookeeper上保存的数据,从而触发GroupCoordinator开始Rebalance操作
Ø 当消费者准备加入某个Consumer group或者GroupCoordinator发生故障转移时,消费者并不知道GroupCoordinator的在网络中的位置,这个时候就需要确定GroupCoordinator,消费者会向集群中的任意一个Broker节点发送ConsumerMetadataRequest请求,收到请求的broker会返回一个response作为响应,其中包含管理当前ConsumerGroup的GroupCoordinator,
Ø 消费者会根据broker的返回信息,连接到groupCoordinator,并且发送HeartbeatRequest,发送心跳的目的是要要奥噶苏GroupCoordinator这个消费者是正常在线的。当消费者在指定时间内没有发送心跳请求,则GroupCoordinator会触发Rebalance操作。

Ø 发起join group请求,两种情况

  • 如果GroupCoordinator返回的心跳包数据包含异常,说明GroupCoordinator因为前面说的几种情况导致了Rebalance操作,那这个时候,consumer会发起join group请求
  • 新加入到consumer group的consumer确定好了GroupCoordinator以后消费者会向GroupCoordinator发起join group请求,GroupCoordinator会收集全部消费者信息之后,来确认可用的消费者,并从中选取一个消费者成为group_leader。并把相应的信息(分区分配策略、leader_id、…)封装成response返回给所有消费者,但是只有group leader会收到当前consumer group中的所有消费者信息。当消费者确定自己是group leader以后,会根据消费者的信息以及选定分区分配策略进行分区分配
  • 接着进入Synchronizing Group State阶段,每个消费者会发送SyncGroupRequest请求到GroupCoordinator,但是只有Group Leader的请求会存在分区分配结果,GroupCoordinator会根据Group Leader的分区分配结果形成SyncGroupResponse返回给所有的Consumer。
  • consumer根据分配结果,执行相应的操作

到这里为止,我们已经知道了消息的发送分区策略,以及消费者的分区消费策略和rebalance。对于应用层面来说,还有一个最重要的东西没有讲解,就是offset,他类似一个游标,表示当前消费的消息的位置。

如何保存消费端的消费位置

什么是offset

前面在讲解partition的时候,提到过offset, 每个topic可以划分多个分区(每个Topic至少有一个分区),同一topic下的不同分区包含的消息是不同的。每个消息在被添加到分区时,都会被分配一个offset(称之为偏移量),它是消息在此分区中的唯一编号,kafka通过offset保证消息在分区内的顺序,offset的顺序不跨分区,即kafka只保证在同一个分区内的消息是有序的; 对于应用层的消费来说,每次消费一个消息并且提交以后,会保存当前消费到的最近的一个offset。那么offset保存在哪里?

image

offset在哪里维护?

在kafka中,提供了一个consumer_offsets_* 的一个topic,把offset信息写入到这个topic中。
consumer_offsets——按保存了每个consumer group某一时刻提交的offset信息。
__consumer_offsets 默认有50个分区。
根据前面我们演示的案例,我们设置了一个KafkaConsumerDemo的groupid。首先我们需要找到这个consumer_group保存在哪个分区中
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"KafkaConsumerDemo");
计算公式:
Math.abs(“groupid”.hashCode())%groupMetadataTopicPartitionCount ; 由于默认情况下groupMetadataTopicPartitionCount有50个分区,计算得到的结果为:35, 意味着当前的consumer_group的位移信息保存在__consumer_offsets的第35个分区
执行如下命令,可以查看当前consumer_goup中的offset位移提交的信息

kafka-console-consumer.sh --topic __consumer_offsets --partition 15 --bootstrap-server 192.168.13.102:9092,192.168.13.103:9092,192.168.13.104:9092
--formatter
'kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter'

从输出结果中,我们就可以看到test这个topic的offset的位移日志

分区的副本机制

我们已经知道Kafka的每个topic都可以分为多个Partition,并且同一topic的多个partition会均匀分布在集群的各个节点下虽然这种方式能够有效的对数据进行分片,但是对于每个partition来说,都是单点的,当其中一个partition不可用的时候,那么这部分消息就没办法消费。所以kafka为了提高partition的可靠性而提供了副本的概念(Replica),通过副本机制来实现冗余备份。

每个分区可以有多个副本,并且在副本集合中会存在一个leader的副本,所有的读写请求都是由leader副本来进行处理。剩余的其他副本都做为follower副本,follower副本会从leader副本同步消息日志,和redis cluster中的节点概念相同,leader副本为redis cluster中的主节点,follower副本为redis cluster中的备节点

一般情况下,同一个分区的多个副本会被均匀分配到集群中的不同broker上,当leader副本所在的broker出现故障后,可以重新选举新的leader副本继续对外提供服务。通过这样的副本机制来提高kafka集群的可用性。

创建一个带副本机制的topic

通过下面的命令去创建带2个副本的topic

sh kafka-topics.sh --create --zookeeper 192.168.11.156:2181 --replication-factor 3 --partitions 3 --topic secondTopic

然后我们可以在/tmp/kafka-log路径下看到对应topic的副本信息了。我们通过一个图形的方式来表达。
针对secondTopic这个topic的3个分区对应的3个副本

image

通常follower副本和leader副本不会在同一个broker上,这种是为了保证当leader副本所在broker宕机后,follower副本可继续提供服务。

如何知道哪个各个分区中对应的leader是谁呢?

在zookeeper服务器上,通过如下命令去获取对应分区的信息, 比如下面这个是获取secondTopic第1个分区的状态信息。

get /brokers/topics/secondTopic/partitions/1/state

{"controller_epoch":12,"leader":0,"version":1,"leader_epoch":0,"isr":[0,1]}
或通过这个命令

sh kafka-topics.sh --zookeeper 192.168.13.106:2181 --describe --topic test_partition

leader表示当前分区的leader是那个broker-id。下图中。绿色线条的表示该分区中的leader节点。其他节点就为follower

image

需要注意的是,kafka集群中的一个broker中最多只能有一个副本,leader副本所在的broker节点的分区叫leader节点,follower副本所在的broker节点的分区叫follower节点

副本的leader选举机制

Kafka提供了数据复制算法保证,如果leader副本所在的broker节点宕机或者出现故障,或者分区的leader节点发生故障,这个时候怎么处理呢?
那么,kafka必须要保证从follower副本中选择一个新的leader副本。那么kafka是如何实现选举的呢?
要了解leader选举,我们需要了解几个概念
Kafka分区下有可能有很多个副本(replica)用于实现冗余,从而进一步实现高可用。副本根据角色的不同可分为3类:

  • leader副本:响应clients端读写请求的副本
  • follower副本:被动地备份leader副本中的数据,不能响应clients端读写请求。
  • ISR副本:Zookeeper中为每一个partition动态的维护了一个ISR,这个ISR里的所有replica都跟上了leader,只有ISR里的成员才能有被选为leader的可能,ISR副本包含了leader副本和所有与leader副本保持同步的follower副本,注意是和保持同步,不包含和leader副本没保持同步的follower副本。

副本协同机制

刚刚提到了,消息的读写操作都只会由leader节点来接收和处理。follower副本只负责同步数据以及当leader副本所在的broker挂了以后,会从ISR副本中的follower副本中选取新的leader。

写请求首先由Leader副本处理,之后follower副本会从leader上拉取写入的消息,这个过程会有一定的延迟,导致follower副本中保存的消息略少于leader副本,但是只要没有超出阈值都可以容忍。但是如果一个follower副本出现异常,比如宕机、网络断开等原因长时间没有同步到消息,那这个时候,leader就会把它踢出去。kafka通过ISR集合来维护一个分区副本信息

image

一个新leader被选举并被接受客户端的消息成功写入。Kafka确保从同步副本列表中选举一个副本为leader;leader负责维护和跟踪ISR(in-Sync replicas , 副本同步队列)中所有follower滞后的状态。当producer发送一条消息到broker后,leader写入消息并复制到所有follower。消息提交之后才被成功复制到所有的同步副本。

ISR

ISR表示目前可用且消息量与leader相差不多的副本集合,这是整个副本集合的一个子集。怎么去理解可用和相差不多这两个词呢?具体来说,ISR集合中的副本必须满足两个条件:

  1. 副本所在节点必须维持着与zookeeper的连接
  2. 副本最后一条消息的offset与leader副本的最后一条消息的offset之间的差值不能超过指定的阈值。(replica.lag.time.max.ms) replica.lag.time.max.ms:如果该follower在此时间间隔内一直没有追上过leader的所有消息,则该follower就会被剔除isr列表
  3. ISR数据保存在Zookeeper的 /brokers/topics/<topic>/partitions/<partitionId>/state 节点中

follower副本把leader副本前的日志全部同步完成时,则认为follower副本已经追赶上了leader副本,这个时候会更新这个副本的lastCaughtUpTimeMs标识,kafka副本管理器会启动一个副本过期检查的定时任务,这个任务会定期检查当前时间与副本的lastCaughtUpTimeMs的差值是否大于参数replica.lag.time.max.ms 的值,如果大于,则会把这个副本踢出ISR集合

image

如何处理所有的Replica不工作的情况,也可以理解为leader的选举

在ISR中至少有一个follower时,Kafka可以确保已经commit的数据不丢失,但如果某个Partition的所有Replica都宕机了,就无法保证数据不丢失了。这种情况下有两种可行的方案:

  1. 等待ISR中的任一个Replica“活”过来,并且选它作为Leader
  2. 选择第一个“活”过来的Replica(不一定是ISR中的)作为Leader,默认配置。

这就需要在可用性和一致性当中作出一个简单的折中。

如果一定要等待ISR中的Replica“活”过来,那不可用的时间就可能会相对较长。而且如果ISR中的所有Replica都无法“活”过来了,或者数据都丢失了,这个Partition将永远不可用。

选择第一个“活”过来的Replica作为Leader,而这个Replica不是ISR中的Replica,那即使它并不保证已经包含了所有已commit的消息,它也会成为Leader而作为consumer的数据源(所有读写都由Leader完成)。

默认情况下Kafka采用第二种策略,即unclean.leader.election.enable=true,也可以将此参数设置为false来启用第一种策略。

副本数据同步原理

了解了副本的协同过程以后,还有一个最重要的机制,就是数据的同步过程。

下图中,深红色部分表示test_replica分区的leader副本,另外两个节点上浅色部分表示follower副本

image

Producer在发布消息到某个Partition时,

  • 先通过ZooKeeper找到该Partition的Leader get /brokers/topics/<topic>/partitions/2/state ,然后无论该Topic的Replication Factor为多少(也即该Partition有多少个Replica),Producer只将该消息发送到该Partition的Leader。
  • Leader会将该消息写入其本地Log。每个Follower都从Leader pull数据。这种方式上,Follower存储的数据顺序与Leader保持一致。
  • Follower在收到该消息并写入其Log后,向Leader发送ACK。
  • 一旦Leader收到了ISR中的所有Replica的ACK,该消息就被认为已经commit了,Leader将增加HW(HighWatermark)并且向Producer发送ACK。

LEO:即日志末端位移(log end offset),记录了该副本底层日志(log)中下一条消息的位移值。注意是下一条消息!也就是说,如果LEO=10,那么表示该副本保存了10条消息,位移值范围是[0, 9]。另外,leader LEO和follower LEO的更新是有区别的,可以看出leader副本和follower副本都有LEO

HW:即所有follower副本中相对于leader副本最小的LEO值。HW是相对leader副本而言的,其HW值不会大于LEO值。小于等于HW值的所有消息都被认为是“已备份”的(replicated)。同理,leader副本和follower副本的HW更新是有区别的

通过下面这幅图来表达LEO、HW的含义,随着follower副本不断和leader副本进行数据同步,follower副本的LEO主键会后移并且追赶到leader副本,这个追赶上的判断标准是当前副本的LEO是否大于或者等于leader副本的HW,如果follower在replica.lag.time.max.ms时间范围内追赶上了leader副本,该follower副本则加入到ISR副本内,也可以使得之前被踢出的follower副本重新加入到ISR集合中;如果在replica.lag.time.max.ms时间范围内follower副本没追赶上leader副本,该follower副本会被从ISR副本范围内踢出,可以看出ISR副本是一个由zookerper动态监控的变化的副本

另外, 假如说下图中的最右侧的follower副本被踢出ISR集合,也会导致这个分区的HW发生变化,变成了3

image

数据可靠性和持久性保证

producer数据不丢失

当producer向leader发送数据时,可以通过request.required.acks参数来设置数据可靠性的级别:

  • request.required.acks=0
    producer写入的一条消息会立即返回ack确认消息,不管leader副本是否同步完或者ISR中的follower副本是否同步完,此配置丢失数据风险很大,生产环境很少使用。
  • request.required.acks=1(默认配置)
    producer写入的一条消息后会等到leader副本同步完成(不需要等到ISR内的follower副本同步完成)后立即返回给客户端ack消息。该配置的风险是如果ISR内的follower副本还没有完成信息同步时,leader节点宕机了,然后通过选举一个follower副本做为新的节点,此时就会有数据丢失的问题,相当于mysql的主从同步,优点就是可用性强,缺点就是弱一致性,可能造成数据丢失。
  • request.required.acks=-1
    producer写入的一 条消息需要等到分区的leader 副本完成同步,且需要等待ISR集合中的所有follower副本都同步完之后才能返回producer确认的ack,这样就避免了部分数据被写进了leader,还没来得及被任何follower复制就宕机了,而造成数据丢失,类似于强一致性,追求强一致性也就意味着可用性(响应时间)会降低。设置成-1就可以保证写入的数据不丢失了吗?不一定,`比如当ISR中只有leader副本时(前面ISR那一节讲到,ISR副本中的成员由于某些情况会增加也会减少,最少就只剩一个leader),当leader副本宕机后,所有数据丢失。

为了避免数据的丢失,提高可靠性,避免ISR副本中只有一个leader副本情况的发生,可以使用参数min.insync.replicas来约束,该参数的意思是设定ISR中的最小副本数是多少,总数包含leader副本和follower副本之和,如果ISR中的副本数不够参数min.insync.replicas所设定的值,客户端会返回异常。

如果由于网络原因导致producer push数据失败了,我们可以设置retries参数来进行重试,总结:producer消息不丢失需要下面3中措施

  • request.required.acks=-1
  • 设置min.insync.replicas参数
  • 设置retries参数

broker数据不丢失

上面已经介绍过unclean.leader.election.enable=false参数。
这里设置unclean.leader.election.enable=false,表示:如果ISR副本全部宕机后,等到ISR副本中的里一个副本启动之后,并将他做为leader副本.

consumer数据不丢失

enable.auto.commit该参数默认为true,表明consumer在下次poll消息时自动提交上次poll出的所有消息的消费位移,如果设置为false,则需要用户手动提交手动提交所有消息的消费位移

消息重复消费和消息丢失的场景

当 enable.auto.commit设置为true的时候会有消息重复消费和消息丢失的场景。

当应用端消费消息时,还没有提交消费位移的时候,此时kafka出现宕机,那么在kafka恢复之后,这些消息将会重新被消费一遍,这就造成了重复消费。

比如consumer第一次poll出n条消息进行消费,达到auto.commit.interval.ms时间后,cosumer会进行下一次poll并提交上次poll出的n条消息的消费位移。如果第一次poll出的n条消息客户端还没有消费完,此时客户端宕机了,当客户端重启后,将会从第二次poll的位置开始拉取消息,从而丢失第一次未提交消费位移的消息,这就造成了数据丢失。

只能避免数据丢失而不能解决数据重复

当设置enable.auto.commit为false时,所有的消息位移提交都为手动提交了,所有可以避免上面提到的数据丢失问题,可以保证consumer消息时数据不会丢失。

手动提交有同步提交和异步提交,我们可以选择在应用端处理完消息后手动提交消费位移。如果在消费完消息准备提交消息位移的时候,应用端发生了宕机,那么重启之后这些消息还是会被重新消费一遍,所以通过配置enable.auto.commit参数为false只能避免消费端丢失消息而不能避免消费端重复消费消息.

kafka是怎么保证消息顺序消费的

生产者维度

首先要知道kafka只能保证同一个partion内消息消费顺序,而不能保证topic内的所有消息消费。每个partition会维护一个从0开始递增的offset,比如上个消息消费的是100的offset,那么下个消费的消息应该为offset为101,所以offset是保证partition可以顺序消费的前提。

如果需要顺序消费,怎么可以将消息写入到一个partition呢?


默认情况下,如果消息中的key为null,则随机分配到一个分区,如果key存在则会对消息中的key进行取模 hash(key)/patitionNum求得分区id。
因此如果消息是需要顺序消费的,那么将所有消息的key设置成相同值即可。

消费者维度

每个分区只能由group内的同一个消费者消费,不能被多个消费者消费,如果被多个消费者消费的话就保证不了消息的顺序性了,为什么呢?举个例子,如下:

比如消费者A和消费者B同时消费某个partition,且消费者 A 处理速度更快。例如,消费者 A 先获取到分区中的第 0 条消息,而消费者 B 获取分区中的第 1 条消息,由于消费者A消费速度更快且消费完了第0条,接着获取到了第2条消息进行消费,而此时由于消费者B消费较慢,刚刚消费完第1条消息,此时消息的顺序变为0->2->1,这就出现了消息乱序的情况。

此外,在某些情况下,消费者在处理消息时可能会失败,例如发生网络错误、消息处理异常等。如果消费者在处理消息时失败,那么 Kafka 会将该消息重新发送给消费者,这就会导致消息顺序错乱。

consumer-group的具体作用是什么

同一个topic可以有多个consumer-group组时,每个consumer-group都会订阅topic的所有消息,每个consumer-group内的所有消费者会协调消费topic内的所有分区。

这里引出一个问题,既然每个consumer-group都消费整个topic的消息,那么分group的作用是啥?

group顾名思义就是分组,可以对topic中的消息的某个字段进行分组消费。在Kafka中,Consumer Group是一个逻辑上的概念,用于将一组Consumer组织在一起,共同消费一个或多个Topic中的消息。具体的过滤过滤逻辑需要用户自己实现

如下:我们建了一个test-group的分组,该分组只对test-topic中消息字段cmd为first的消息进行消费,其他类型的消息都直接过滤。

public class ConsumerExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        //kafka的服务地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        //设置一个consumer-group为test-group
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        //将消息的KEY和VALUE反序列化
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        
        //实例化一个消费者,并将该消费者加入到test-group组中
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //该消费组订阅localhost:9092集群的test-topic
        consumer.subscribe(Collections.singleton("test-topic"));

        while (true) {
            //拉去消息
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            records.forEach(record -> {
                System.out.printf("topic = %s, partition = %s, offset = %d, key = %s, value = %s%n",
                        record.topic(), record.partition(), record.offset(), record.key(), record.value());
                //根据具体的业务代码上线group过滤:比如该group只接该topic中消息体cmd字段为first的消息,其他消息则过滤
                if(record.value().cmd == "first"){
                    //执行具体的业务代码。入库或者是调取api
                }
            });
        }
    }
}

如果想消费该topic种cmd为sencod的消息,则我们可以继续创建另一个消费者,可以发现,consumer-group的作用就是对topic中的消息进行分组,然后对分组后的数据自行实现具体的业务逻辑。

看了上面的代码,可能有个疑问,为什么没有消费者的地址?

因为 Kafka 消费者不需要显式地指定其 IP 地址或主机名。Kafka 消费者只需要知道 Kafka 集群的地址,即可与 Kafka 集群建立连接并消费消息。因此,Kafka 消费者可以在任何具有网络连接的机器上启动,并且可以连接到远程 Kafka 集群。

在上面的代码中,我们将 bootstrap.servers 属性设置为 "localhost:9092",这意味着 Kafka 消费者将连接到运行在本地主机上的 Kafka 集群。如果 Kafka 集群运行在远程主机上,则可以将 bootstrap.servers 属性设置为 Kafka 集群的公共 IP 地址或 DNS 名称。当 Kafka 消费者启动时,它将使用 bootstrap.servers 属性指定的地址和端口,尝试与 Kafka 集群建立连接。

consumer-group怎么控制下发qps

Kafka 消费者组并不提供直接控制下发消息速率的选项。Kafka 消费者消费的速率主要受到以下几个因素的影响:

  • 分区数:Kafka 消费者从多个分区消费消息,分区数越多,消费的消息速率也就越快。
  • 消费者数:如果 Kafka 消费者组中有多个消费者,那么消费者组将会同时从多个分区消费消息,从而提高消息消费的速率。
  • 消费者消费消息的速度:Kafka 消费者可以通过 poll() 方法控制从 Kafka 服务端获取消息的速率,消费者可以通过调整 poll() 方法的超时时间,来控制下发消息的速率。但是这种方法只能粗略地控制下发消息的速率,无法实现精确的 QPS 控制。

如果需要对下发消息的速率进行精确控制,可以考虑以下方法:

  • 控制生产者生产消息的速率:可以通过限制生产者发送消息的速率,从而限制下发消息的速率。
  • 手动提交消费位移:Kafka 消费者可以手动提交消费位移,这样就可以在消费者消费速率过快时暂停消费,以便控制下发消息的速率。可以使用 Consumer.pause() 方法暂停分区的消费,等待处理完成后再使用 Consumer.resume() 方法继续消费。
  • 使用 Kafka 的流处理 API:如果需要对下发消息的速率进行精确的控制,可以考虑使用 Kafka 的流处理 API。Kafka 流处理 API 提供了更细粒度的控制能力,可以根据具体需求对下发消息的速率进行定制化的控制。

使用手动位移控制qps的代码如下

public class ManualOffsetCommitConsumerExample {
   private static final String TOPIC_NAME = "test-topic";
   private static final String CONSUMER_GROUP_ID = "test-group";
   private static final String BOOTSTRAP_SERVERS = "localhost:9092";
   private static final int MAX_POLL_RECORDS = 100;
   private static final int MAX_POLL_INTERVAL_MS = 5000; // 5 seconds
   private static final int DESIRED_QPS = 20;
   private static final int MAX_BATCH_SIZE = DESIRED_QPS * 2; // allow for burst rate

   public static void main(String[] args) {
       Properties props = new Properties();
       props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
       props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
       props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
       props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
       props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

       KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
       consumer.subscribe(Collections.singleton(TOPIC_NAME));

       final int batchSize = Math.min(MAX_BATCH_SIZE, MAX_POLL_RECORDS);
       List<ConsumerRecord<String, String>> buffer = new ArrayList<>(batchSize);

       try {
           while (true) {
               ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(MAX_POLL_INTERVAL_MS));
               for (ConsumerRecord<String, String> record : records) {
                   buffer.add(record);
                   if (buffer.size() >= batchSize) {
                       processRecords(buffer);
                       buffer.clear();
                   }
               }
               // If we have remaining records in the buffer, process them as well.
               if (!buffer.isEmpty()) {
                   processRecords(buffer);
                   buffer.clear();
               }
               consumer.commitSync();
           }
       } catch (WakeupException e) {
           // Ignore the exception and close the consumer.
       } finally {
           consumer.close();
       }
   }

   private static void processRecords(List<ConsumerRecord<String, String>> records) {
       // Process the records in some way.
       for (ConsumerRecord<String, String> record : records) {
           System.out.printf("Consumed message: key=%s, value=%s, partition=%d, offset=%d%n",
                   record.key(), record.value(), record.partition(), record.offset());
       }
       // Wait to achieve the desired QPS.
       try {
           Thread.sleep(1000 / DESIRED_QPS);
       } catch (InterruptedException e) {
           // Ignore the exception and proceed.
       }
   }
}

topic怎么设置合理的分区数呢?

  • 同一topic的多个partition会均匀分布在集群的各个节点下,每个节点相同的partion数可以尽可能的使机器资源均匀。也就是partion数最好是broker节点的倍数。
  • 根据上面的分区分配策略,发现partition数最好为consumer数的整数倍,这样每个consumer负责消费的partition数,消费能力也就均匀相同。

所以,partition数的设置最好为为broker数和consumer数的公倍数,比如broker个数为6个,consumer个数为10个,那么可设置30个partition数。

consumer-group怎么避免重复消费

  • 消费组ID:消费组ID 是 Kafka 用于标识一组消费者的字符串,具有相同的消费组 ID 的多个消费者共同消费一个或多个主题。消费者在一个消费组中协调分配分区,这样每个分区只由一个消费者消费。因此,消费组ID 可以避免多个消费者重复消费同一条消息。

  • 消费位移(Offset):在消费消息时,Kafka 会记录每个消费者的消费位移,也就是消费者消费消息的偏移量。这些偏移量存储在 Kafka 中,消费者会定期地提交它们的偏移量。如果一个消费者发生故障,那么其他消费者可以接管它消费的分区,并继续从故障消费者消费位移的下一个消息开始消费,避免重复消费同一条消息。

  • 重复消息检测:消费者在消费消息时,可以根据消息的唯一标识(例如消息的键值)进行重复消息检测。如果消息已经被消费过,则可以忽略它,从而避免重复消费。

问:通常需要在consumer处做避免消息重复的处理吗?还是kafka不会出现这种情况?

答:在使用 Kafka 时,通常需要在消费者端进行消息去重处理,以避免重复消费。这是因为在 Kafka 中,同一个消息可能会被多个消费者消费,或者同一个消费者在不同的时间重复消费同一个消息。这种情况可能会在以下情况下发生:

  • 多个消费者订阅同一个主题,并且消费者组 ID 相同或不同,因此同一个消息可能会被多个消费者消费,这种属于正常现象,既然多个消费组消费同一topic,就肯定会出现重复消息的消费。

  • 消费者在处理消息时发生错误,导致消费位移提交失败,下一次启动时会重新消费之前未成功处理的消息。

  • 消费者在处理消息时发生错误,但是消费位移提交成功了,因此消费者认为消息已经处理成功,但实际上并没有。

为了避免这些情况,消费者通常需要对消息进行去重处理,例如记录已经处理过的消息的 ID 或者 offset,以避免重复处理。可以使用缓存或者数据库等方式存储已经处理过的消息的信息,并在消费时进行判断。同时,消费者也可以在处理完消息后立即提交消费位移,以保证消费位移的可靠提交,避免下次启动时重复消费。

在这个示例代码中,我们使用了一个 HashSet 来记录已经处理过的消息的 ID。在每次消费消息时,我们首先根据消息的 topic、partition 和 offset 来构造一个唯一的消息 ID,然后判断该消息 ID 是否已经在已处理集合中存在。如果存在,则说明该消息已经被处理过,可以跳过不做处理;否则,我们可以对消息进行处理,并将该消息 ID 记录到已处理集合中,以便在下次消费时判断该消息是否已经被处理过。

另外,我们在处理完消息后立即提交消费位移,以保证消费位移的可靠提交,避免下次启动时重复消费。

public class KafkaConsumerExample {
    private static final String TOPIC_NAME = "test-topic";
    private static final String GROUP_ID = "test-group";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    // 记录已经处理过的消息 ID 集合
    private static Set<String> processedMessageIds = new HashSet<>();

    public static void main(String[] args) {
        // 配置 Kafka 消费者
        Properties props = new Properties();
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.put("group.id", GROUP_ID);
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

            for (ConsumerRecord<String, String> record : records) {
                String messageId = record.topic() + ":" + record.partition() + ":" + record.offset();
                // 判断消息是否已经被处理过
                if (processedMessageIds.contains(messageId)) {
                    System.out.println("Message has already been processed: " + messageId);
                    continue;
                }

                // 处理消息
                System.out.println("Processing message: " + record.value());

                // 将消息 ID 记录到已处理集合中
                processedMessageIds.add(messageId);

                // 提交消费位移
                consumer.commitSync();
            }
        }
    }

利用HashSet来记录消息,如果消息量过大会出现内存溢出,这种方式不适合用在大消息场景。在大消息场景可以使用如下俩种方案

  • 消息去重表
    可以在消费者端维护一个消息去重表,记录已经消费过的消息的id或者offset,每次消费新消息时先在去重表中查询是否已经消费过,如果已经消费过,则跳过该消息,否则正常消费。缺点是需要消费者维护这个表,增加了代码复杂度和存储开销。

  • 分布式缓存
    使用分布式缓存(如Redis)作为消息去重表,每次消费新消息时先在缓存中查询是否已经消费过,如果已经消费过,则跳过该消息,否则正常消费。缺点是需要额外的缓存开销和网络通信开销。

  • 去重服务
    可以单独部署一个去重服务,消费者将消息发送到该服务进行去重,再将去重后的消息返回给消费者进行消费。优点是可以实现与消费业务的解耦,但是需要额外的网络通信和部署开销。

消息的幂等性是什么意思?

在 Kafka 中,消息幂等性指的是相同的消息被重复发送到同一个主题分区时,不会导致数据的重复写入。也就是说,即使同一条消息被多次写入,也只会保留一份数据。

幂等性是 Kafka 提供的一种可选特性,通过使用生产者端的幂等性保证消息不会重复写入到分区中,消费者端就可以避免消费到重复的消息,从而保证数据的一致性。

可通过俩种方式实现生产者的消息幂等性

  • 发送唯一标识符(如UUID)作为消息的key,保证相同key的消息总是被写入到同一个分区中。在写入消息之前,可以先查询该key对应的消息是否已经被写入到分区中,如果已经存在,则不再写入,从而实现幂等性。

  • 启用Kafka的生产者端幂等性特性。通过配置enable.idempotence=true启用生产者端幂等性特性,生产者端会自动在内部缓存和重试未确认的消息,并使用序列号对每个消息进行标记。这样,即使相同的消息被重复发送,只有第一次发送的消息会被写入分区中,后续发送的消息会被过滤掉。

幂等性是 Kafka 提供的一种可选特性,因为启用该特性会带来额外的开销。启用幂等性会增加生产者端的延迟和网络带宽消耗,因为生产者需要在本地缓存未确认的消息,并在发送完成确认之前,阻塞等待来自服务器的响应。同时,启用幂等性还可能增加服务器端的开销,因为服务器需要维护每个生产者的幂等性状态信息。因此,是否启用幂等性需要根据具体的业务场景和性能需求来进行决策。

问:可能有人疑问第一种方式,为什么必须写入同一分区才能判断?

答:在Kafka中,同一个分区中的消息是有序的,因此如果我们将具有相同key的消息写入同一个分区,那么它们将被存储在相邻的消息偏移量上,这样在进行幂等性检查时会更方便。如果将具有相同key的消息写入不同的分区,那么它们就可能被存储在不同的消息偏移量上,这样就需要在多个分区中进行幂等性检查,增加了实现难度。
基于上面第一种方案,在写入消息之前,可以先查询该key对应的消息是否已经被写入到分区中,可以使用Kafka提供的API中的KafkaConsumer来实现查询功能。具体代码实现如下如下:

// 创建一个KafkaConsumer实例
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// 订阅指定的topic
consumer.subscribe(Collections.singleton(topic));

// 构造查询的参数,key为需要查询的消息的key,partition为消息所在的分区
TopicPartition tp = new TopicPartition(topic, partition);
Map<TopicPartition, Long> query = new HashMap<>();
query.put(tp, 0L);

// 调用KafkaConsumer的seekToEnd方法获取指定分区的最后一个消息的offset
consumer.seekToEnd(Collections.singleton(tp));
long lastOffset = consumer.position(tp);

// 如果该分区的offset为0,则说明该分区没有消息,可以直接写入
if (lastOffset == 0) {
   // 写入消息
} else {
   // 如果该分区不为空,就需要查询该key对应的消息是否已经存在
   consumer.seek(tp, 0L);
   while (consumer.position(tp) < lastOffset) {
       ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
       for (ConsumerRecord<String, String> record : records) {
           // 判断消息是否匹配
           if (record.key().equals(key) && record.value().equals(value)) {
               // 如果消息已经存在,直接返回
               return;
           }
       }
   }
   // 如果该分区不存在该消息,可以写入
   // 写入消息
}

以上代码中,首先订阅指定的topic,然后构造查询的参数,查询该分区的最后一个消息的offset,如果该分区没有消息则可以直接写入。如果该分区不为空,就需要查询该key对应的消息是否已经存在,查询过程中通过调用poll方法获取分区中的消息,然后遍历消息列表进行比对,如果匹配到了则直接返回,否则可以写入新消息。

基于上面的第二种方案,代码如下

import org.apache.kafka.clients.producer.*;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaProducerDemo {

    private static final String TOPIC_NAME = "test-topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String CLIENT_ID = "test-producer-client";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, CLIENT_ID);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // 开启幂等性
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // 设置确认方式为all

        Producer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 10; i++) {
            String key = String.valueOf(i % 3); // 生成key,保证相同key的消息写入同一个分区
            String value = "hello world " + i;

            // 构造消息
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, key, value);

            try {
                // 发送消息
                RecordMetadata metadata = producer.send(record).get();
                System.out.printf("Produced message: key=%s, value=%s, partition=%d, offset=%d\n",
                        key, value, metadata.partition(), metadata.offset());
            } catch (ExecutionException | InterruptedException e) {
                e.printStackTrace();
            }
        }

        producer.close();
    }
}

在上述代码中,我们通过将enable.idempotence配置设置为true来开启生产端的幂等性,同时将acks配置设置为all,以保证消息被所有副本成功写入才被确认。在构造消息时,我们将相同的key传递给ProducerRecord构造函数,保证具有相同key的消息被写入同一个分区。这样,在生产端发送消息时,就能够保证具有相同key的消息只会被写入一次。

Kafka消费者push消息的模式

Kafka的发送模式由producer端的配置参数producer.type来设置,这个参数指定了在后台线程中消息的发送方式是同步的还是异步的,默认是同步的方式,即producer.type=sync

如果设置成异步的模式,即producer.type=async,可以是producer以batch的形式push数据,就是将消息按批量的方式发送,而不是一条一条的发送,这样会极大的提高broker的性能,但是这样会增加丢失数据的风险。如果需要确保消息的可靠性,必须要将producer.type设置为sync。

高可靠性配置

要保证数据写入到Kafka是安全的,高可靠的,需要如下的配置:

  • 分区副本, 你可以创建分区副本来提升数据的可靠性,避免数据丢失,但是分区数过多也会带来性能上的开销,一般来说,3个副本就能满足对大部分场景的可靠性要求
  • topic的配置:replication.factor>=3,指副本数至少是3个;2<=min.insync.replicas<=replication.factor,指ISR中的副本数大于等于2,且小于等于3
  • broker的配置:leader的选举条件unclean.leader.election.enable=false
  • producer的配置:request.required.acks=-1(all),producer.type=sync

消息的存储,消息的持久化

消息发送端发送消息到broker上以后,消息是如何持久化的呢?那么接下来去分析下消息的存储首先我们需要了解的是,kafka是使用日志文件的方式来保存生产者和发送者的消息,每条消息都有一个offset值来表示它在分区中的偏移量。Kafka中存储的一般都是海量的消息数据,为了避免日志文件过大,Log并不是直接对应在一个磁盘上的日志文件,而是对应磁盘上的一个目录,这个目录的命名规则是<topic_name>-<partition_id>

消息的文件存储机制

一个topic的多个partition在物理磁盘上的保存路径,路径保存在 /tmp/kafka-logs/topic-partition,包含日志文件、索引文件和时间索引文件

image

kafka是通过分段的方式将Log分为多个LogSegment,LogSegment是一个逻辑上的概念,一个LogSegment对应磁盘上的一个日志文件和一个索引文件,其中日志文件是用来记录消息的。索引文件是用来保存消息的索引。
segment的常用配置有:

#server.properties

#segment文件的大小,默认为 1G
log.segment.bytes=1024*1024*1024
#滚动生成新的segment文件的最大时长
log.roll.hours=24*7
#segment文件保留的最大时长,超时将被删除
log.retention.hours=24*7

那么这个LogSegment是什么呢?

LogSegment

假设kafka以partition为最小存储单位,那么我们可以想象当kafka producer不断发送消息,必然会引起partition文件的无线扩张,这样对于消息文件的维护以及被消费的消息的清理带来非常大的挑战,所以kafka 以segment为单位又把partition进行细分。每个partition相当于一个巨型文件被平均分配到多个大小相等的segment数据文件中(每个segment文件中的消息不一定相等),这种特性方便已经被消费的消息的清理,提高磁盘的利用率。

  • log.segment.bytes=107370 (设置分段大小),默认是1gb,我们把这个值调小以后,可以看到日志分段的效果

  • 抽取其中3个分段来进行分析

    image

    segment file由2大部分组成,分别为index file和data file,此2个文件一一对应,成对出现,后缀".index"和“.log”分别表示为segment索引文件、数据文件.
    segment文件命名规则:partion全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值进行递增。数值最大为64位long大小,20位数字字符长度,没有数字用0填充

查看segment文件命名规则

通过下面这条命令可以看到kafka消息日志的内容,注意grep必须加-a参数

grep -a 'logId' 00000000000000000000.log

假如第一个log文件的最后一个offset为:5376,所以下一个segment的文件命名为:
00000000000000005376.log。对应的index为00000000000000005376.index

segment中index和log的对应关系

从所有分段中,找一个分段进行分析
为了提高查找消息的性能,为每一个日志文件添加2个索引索引文件:OffsetIndex 和 TimeIndex,分别对应.index以及.timeindex, TimeIndex索引文件格式:它是映射时间戳和相对offset

查看索引内容:

sh kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test-0/00000000000000000000.index --print-data-log

展示如下

offset: 4561 position: 683858
offset: 4566 position: 688769
offset: 4573 position: 693871
offset: 4578 position: 700261
offset: 4583 position: 704568
offset: 4586 position: 714114
offset: 4591 position: 720123
offset: 4594 position: 727926
offset: 4601 position: 733000
offset: 4603 position: 742220
offset: 4605 position: 753894
offset: 4607 position: 764212
offset: 4609 position: 771133
offset: 4614 position: 776029
offset: 4617 position: 780633
offset: 4622 position: 785519
offset: 4628 position: 796098
offset: 4633 position: 1198989
offset: 4637 position: 1204712

index采用稀疏存储的方式,它不会为每一条message都建立索引,而是每隔一定的字节数建立一条索引,避免索引文件占用过多的空间。缺点是没有建立索引的offset不能一次定位到message的位置,需要做一次顺序扫描,但是扫描的范围很小。

image

如图所示,.index文件中存储了索引以及物理偏移量(position),.log文件存储了消息的内容

索引包含两个部分(均为4个字节的数字),分别为相对offset和position。相对offset表示segment文件中的offset,其实就是消息的唯一标识,同一个partition内的消息offset是唯一的,position表示在消息在.log文件中在数据文件中的位置,其实是消息真实的物理偏移地址。

Kafka采用整数值position记录单个分区的消费状态,当消费成功broker收到确认,position指向下一个offset。 由于消息一定时间内不清除,那么可以重置offset来重新消费消息。

在partition中如何通过offset查找message

查找的算法是

  1. 根据offset的值,查找segment段中的index索引文件。由于索引文件命名是以上一个文件的最后一个offset+1进行命名的,所以,使用二分查找算法能够根据offset快速定位到指定的索引文件。
  2. 找到索引文件后,根据offset进行定位,找到索引文件中的符合范围的索引。(kafka采用稀疏索引的方式来提高查找性能)
  3. 得到position以后,再到对应的log文件中,从position出开始查找offset对应的消息,将每条消息的offset与目标offset进行比较,直到找到消息

比如说,我们要查找offset=2490这条消息,那么先找到00000000000000000000.index, 然后找到[2487,49111]这个索引,再到log文件中,根据49111这个position开始查找,比较每条消息的offset是否大于等于2490。最后查找到对应的消息以后返回

Log文件的消息内容分析

sh kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test-0/00000000000000000000.log --print-data-log | grep position

前面我们通过kafka提供的命令,可以查看二进制的日志文件信息,一条消息,会包含很多的字段。

offset: 5371 position: 102124 CreateTime: 1531477349286 isvalid: true keysize:
-1 valuesize: 12 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1
sequence: -1 isTransactional: false headerKeys: [] payload: message_5371

offset和position这两个前面已经讲过了、 createTime表示创建时间、keysize和valuesize表示key和value的大小、 compresscodec表示压缩编码、payload:表示消息的具体内容

kafka提供的命令的参数

sh kafka-run-class.sh kafka.tools.DumpLogSegments
Parse a log file and dump its contents to the console, useful for debugging a seemingly corrupt log segment.
Option                               Description
------                               -----------
--deep-iteration                     if set, uses deep instead of shallow
                                       iteration.
--files <String: file1, file2, ...>  REQUIRED: The comma separated list of data
                                       and index log files to be dumped.
--help                               Print usage information.
--index-sanity-check                 if set, just checks the index sanity
                                       without printing its content. This is
                                       the same check that is executed on
                                       broker startup to determine if an index
                                       needs rebuilding or not.
--key-decoder-class [String]         if set, used to deserialize the keys. This
                                       class should implement kafka.serializer.
                                       Decoder trait. Custom jar should be
                                       available in kafka/libs directory.
                                       (default: kafka.serializer.StringDecoder)
--max-message-size <Integer: size>   Size of largest message. (default: 5242880)
--offsets-decoder                    if set, log data will be parsed as offset
                                       data from the __consumer_offsets topic.
--print-data-log                     if set, printing the messages content when
                                       dumping data logs. Automatically set if
                                       any decoder option is specified.
--transaction-log-decoder            if set, log data will be parsed as
                                       transaction metadata from the
                                       __transaction_state topic.
--value-decoder-class [String]       if set, used to deserialize the messages.
                                       This class should implement kafka.
                                       serializer.Decoder trait. Custom jar
                                       should be available in kafka/libs
                                       directory. (default: kafka.serializer.
                                       StringDecoder)
--verify-index-only                  if set, just verify the index log without
                                       printing its content.

日志的清除策略以及压缩策略

日志清除策略

前面提到过,日志的分段存储,一方面能够减少单个文件内容的大小,另一方面,方便kafka进行日志清理。日志的清理策略有两个:

  1. 根据消息的保留时间,当消息在kafka中保存的时间超过了指定的时间,就会触发清理过程
  2. 根据topic存储的数据大小,当topic所占的日志文件大小大于一定的阀值,则可以开始删除最旧的消息。kafka会启动一个后台线程,定期检查是否存在可以删除的消息

通过log.retention.bytes和log.retention.hours这两个参数来设置,当其中任意一个达到要求,都会执行删除。
默认的保留时间是:7天

日志压缩策略

Kafka还提供了“日志压缩(Log Compaction)”功能,通过这个功能可以有效的减少日志文件的大小,缓解磁盘紧张的情况,在很多实际场景中,消息的key和value的值之间的对应关系是不断变化的,就像数据库中的数据会不断被修改一样,消费者只关心key对应的最新的value。因此,我们可以开启kafka的日志压缩功能,服务端会在后台启动启动Cleaner线程池,定期将相同的key进行合并,只保留最新的value值。日志的压缩原理是

image

磁盘存储的性能问题

磁盘存储的性能优化

我们现在大部分企业仍然用的是机械结构的磁盘,如果把消息以随机的方式写入到磁盘,那么磁盘首先要做的就是寻址,也就是定位到数据所在的物理地址,在磁盘上就要找到对应的柱面、磁头以及对应的扇区;这个过程相对内存来说会消耗大量时间,为了规避随机读写带来的时间消耗,kafka采用顺序写的方式存储数据。即使是这样,但是频繁的I/O操作仍然会造成磁盘的性能瓶颈

零拷贝

消息从发送到落地保存,broker维护的消息日志本身就是文件目录,每个文件都是二进制保存,生产者和消费者使用相同的格式来处理。在消费者获取消息时,服务器先从硬盘读取数据到内存,然后把内存中的数据原封不动的通过socket发送给消费者。虽然这个操作描述起来很简单,但实际上经历了很多步骤。

操作系统将数据从磁盘读入到内核空间的页缓存:
▪ 应用程序将数据从内核空间读入到用户空间缓存中
▪ 应用程序将数据写回到内核空间到socket缓存中
▪ 操作系统将数据从socket缓冲区复制到网卡缓冲区,以便将数据经网络发出

image

通过“零拷贝”技术,可以去掉这些没必要的数据复制操作,同时也会减少上下文切换次数。现代的unix操作系统提供一个优化的代码路径,用于将数据从页缓存传输到socket;在Linux中,是通过sendfile系统调用来完成的。Java提供了访问这个系统调用的方法:FileChannel.transferTo API
使用sendfile,只需要一次拷贝就行,允许操作系统将数据直接从页缓存发送到网络上。所以在这个优化的路径中,只有最后一步将数据拷贝到网卡缓存中是需要的

image

页缓存

页缓存是操作系统实现的一种主要的磁盘缓存,但凡设计到缓存的,基本都是为了提升i/o性能,所以页缓存是用来减少磁盘I/O操作的。
磁盘高速缓存有两个重要因素:
第一,访问磁盘的速度要远低于访问内存的速度,若从处理器L1和L2高速缓存访问则速度更快。
第二,数据一旦被访问,就很有可能短时间内再次访问。正是由于基于访问内存比磁盘快的多,所以磁盘的内存缓存将给系统存储性能带来质的飞越。

当 一 个进程准备读取磁盘上的文件内容时, 操作系统会先查看待读取的数据所在的页(page)是否在页缓存(pagecache)中,如果存在(命中)则直接返回数据, 从而避免了对物理磁盘的I/0操作;如果没有命中, 则操作系统会向磁盘发起读取请求并将读取的数据页存入页缓存, 之后再将数据返回给进程。
同样,如果 一 个进程需要将数据写入磁盘, 那么操作系统也会检测数据对应的页是否在页缓存中,如果不存在, 则会先在页缓存中添加相应的页, 最后将数据写入对应的页。 被修改过后的页也就变成了脏页, 操作系统会在合适的时间把脏页中的数据写入磁盘, 以保持数据的 一 致性
Kafka中大量使用了页缓存, 这是Kafka实现高吞吐的重要因素之 一 。 虽然消息都是先被写入页缓存,然后由操作系统负责具体的刷盘任务的, 但在Kafka中同样提供了同步刷盘及间断性强制刷盘(fsync),可以通过 log.flush.interval.messages 和 log.flush.interval.ms 参数来控制。
同步刷盘能够保证消息的可靠性,避免因为宕机导致页缓存数据还未完成同步时造成的数据丢失。但是实际使用上,我们没必要去考虑这样的因素以及这种问题带来的损失,消息可靠性可以由多副本来解决,同步刷盘会带来性能的影响。 刷盘的操作由操作系统去完成即可

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
禁止转载,如需转载请通过简信或评论联系作者。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,125评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,293评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,054评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,077评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,096评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,062评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,988评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,817评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,266评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,486评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,646评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,375评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,974评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,621评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,642评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,538评论 2 352

推荐阅读更多精彩内容