kafka的基本概念

一、基本概念

1、broker

broker 指的一个kafka服务器,一个kafka集群是由多个 kafka broker 组成。

2、producer

producer 指的是消息生产者,即发送消息到 kafka broker 的客户端。

3、consumer

consumer 指的消息消费者,即从 kafka broker 获取消息的客户端。

4、cousumer group

consumer group 指的是消费者组,拥有相同的 group id 的消费者构成一个消费者组。

  1. 消费者组与消费者之间互不影响。
  2. 消费者组内的每个消费者负责消费不同的分区的数据。
  3. 一个分区只能有同一个分区中的一个消费者消费

5、topic

topic 指的是主题。生产者生产消息、消费者消费消息,都需要指定一个topic。

6、partition

partition 指的是分区。

  1. 一个 topic 可以存在多个分区。
  2. 一个分区,只能被同一个消息者组中的某一个消费者消费。
  3. 每个分区上的消息都是有序的,但是主题(topic)上的消息不是有序的。
  4. 每个分区可能存在多个follower,其中负责读/写的是leader
  5. 每个kafka broker可以是当前分区的leader,也可以是其它分区的follower
  6. 多个分区可以提高程序的并发性,因为一个分区只能一个分区消息,多个分区可以多个消费者同时消费。

7、replicas

replicas 指的是副本。数据冗余,保证集群的可用性。

  1. leader 指的是topic主题的分区中,每个分区的 。生产者发送数据、消费者消费数据都是从 leader分区 中操作的。

  2. follower 指的是 topic主题的分区中,每个分区的 。负责从leader分区同步数据,当 leader 宕机时,follower 可能成为新的 leader。默认是从 ISR 中进行选举。设置unclean.leader.election.enable = true可以从非ISR节点中进行选举,这样可以导致丢失数据,默认值是false,建议是false

  3. ISR(in-sync replicas)

    1. 与leader保持同步的follower集合
    2. 如果follower在超过 replica.lag.time.max.ms 毫秒,没有与leader进行同步,则踢出ISR。
      replica.lag.time.max.ms
  4. OSR(out-sync replicas)

    1. 落后leader太多的副本
  5. AR

    1. ISR + OSR

是否允许非ISR的副本参与选举。

unclean.leader.elect的解释ion.enable

二、生产者分区策略

即生产者,发送发送消息后,该消息保存到kafka broker上的topic上的那个partition上。

1、默认分区策略

默认的分区策略使用的是 org.apache.kafka.clients.producer.internals.DefaultPartitioner 这个类。
即我们代码中使用org.apache.kafka.clients.producer.ProducerRecord发送消息时。

  1. 在指定了 partition 的情况下,直接使用指定的 分区。
  2. 没有指定 partition 但指定了key的情况下,将key的hash值与topic的partition
    数进行取余得到partition值。
  3. 没有指partitionkey,在一个批次满的时候会发送到同一个分区,当一个新的批次创建时,会发送到另外的分区中。可以通过 KIP-480 获取更多的粘性分区知识。

2、自定义分区策略

  1. 实现 org.apache.kafka.clients.producer.Partitioner 接口。
  2. 代码实现
Properties prop = new Properties();
prop.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"自定义实现的分区器的全路径");
KafkaProducer<String, String> producer = new KafkaProducer<>(prop);

三、生产者 acks 应答机制

1、acks = 0

producer不等待broker返回ack信息,只是将消息加入到socket的缓冲中,可能还未发送出去。

  1. 重试(retries)不会生效。
  2. 为每个记录返回的偏移量始终是-1
  3. 可能会存在丢失数据的情况。

2、acks = 1

producer等待broker返回ack信息,只要这个partitionleader将消息成功保存后返回ack

  1. 如果leader返回ack后,但是还未同步给follower,此时leader宕机了,可能会出现丢失数据的情况。

3、acks = [all | -1]

producer等待broker返回ack信息,partitionleaderfollower全部将消息保存成功后,才返回ack信息。

  • 如果在leaderfollower将消息都保存完毕,但是在返回ack之前,leader宕机了,此时可能导致 消息重复

  • 此处指的是 所有的follower 都不同完,而不是半数以上同步完,才发送ack。

    - 因为,当需要容忍 `n` 台节点故障时,`半数以上` 需要`2n+1`个副本,而`所有follower`都同步完,只需要`n+1`个副本。
    

四、日志文件的 HW 和 LEO

1、HW

HW : 即 High Watermark,指的是该分区(partition)中所有副本(leader+follower)中最小的LEO

  • 上面中所有的副本:指的是 该分区的 ISR集合。
  • HW 之前的消息是可消费的,比如 HW=5,那么可以消费的 offset 是 [0-4] 不包括5.

2、LEO

LEO :即 Log End Offset,即当前日志文件中,下一条消息待写入的 offset。

3、举例说明

比如,一个分区下有3个ISR,一个leader和2个follower。
leader 的leo 是 6
follower01 的leo是 5
follower02 的 leo 是 4
那么该分区的 hw 是 4,即最小的leo。

HW和LEO举例说明

五、消费者的分区策略

我们知道,我们的消费者(consumer)是隶属于消费者组(consumer group)的,消费者组中可以存在多个消费者,每个消费者可以消费多个主题(topic),每个主题又存在多个分区(partition),每个分区只能由一个消费者来消费,每个消费者又可以消费多个分区。那么就必然涉及到 如何将某个分区分配给那个消费者消费

1、RangeAssignor - 基于订阅的 topic来分配
     默认分区策略
2、RoundRobinAssignor - 基于consumer group来分配,可能出现误消费别的主题的情况

eg:
     topicA 存在 0,1,2 三个分区
     topicB 也存在 0,1,2三个分区
consumerA 和 consumerB 同属于一个组
    consumerA 订阅 topicA
    consumerB 订阅 tocpiA 和 topicB
此时按照 RoundRobinAssignor 策略,会先将所有的分区进行排序,则会产生6个主题分区对象(`TopicPartition`),因为topicA和topicB 每个主题共有3个分区。此时就有可能将topicB的消息发送给了cousumerA,导致消费错误。

3、StickyAssignor -
4、CooperativeStickyAssignor -

六、消费者offset的维护

1、自动提交offset

Properties prop = new Properties();
// 设置自动提交offset
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
// 自动提交offset的间隔,单位ms
prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(prop);

2、手动提交offset

consumer.commitAsync() : 异步提交
consumer.commitSync() : 同步提交

Properties prop = new Properties();
// 设置手动提交offset
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
consumer.subscribe(Arrays.asList("topicA", "topicB"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("offset: " + record.offset() + "value:" + record.value());
    }
    // 异步提交offset
    consumer.commitAsync();
    // 同步提交offset
    // consumer.commitSync();
}

3、自定义提交offset

需要实现 org.apache.kafka.clients.consumer.ConsumerRebalanceListener 接口,在消费者发生 rebalance 时,保存或获取自定义的offset。

手动提交offset

七、拦截器

1、生产者拦截器

实现org.apache.kafka.clients.producer.ProducerInterceptor接口。可以实现消息发送到kafka broker之前的消息拦截。

1、onSend(ProducerRecord<K, V> record)方法
该方法运行在主线程中,我们可以在该方法中对消费进行任何操作,但是最好不要修改topicpartition,否则可能影响消息目标分区的计算。

2、onAcknowledgement(RecordMetadata metadata, Exception exception)
该方法运行在producer 的 I/O线程中,因此不要执行一些比较耗时的操作,否则会拖慢producer的发送消息的速度。该方法在消息发送kafka broker之后返回ack之后执行或发送到kafka broker的过程中发生异常执行。

需要自己保证实现的拦截器的线程安全问题。

Properties prop = new Properties();
prop.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Collections.singletonList("实现ProducerInterceptor类的全类名"));
KafkaProducer<String, String> producer = new KafkaProducer<>(prop);

2、消费者拦截器

实现org.apache.kafka.clients.consumer.ConsumerInterceptor类,可以对从kafka broker获取到的消息进行拦截。和consumer#poll运行在同一个线程中。

Properties prop = new Properties();
prop.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, Collections.singletonList("实现ConsumerInterceptor类的全类名"));
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);

八、参考文档

1、http://kafka.apache.org/documentation/#gettingStarted

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342

推荐阅读更多精彩内容

  • 1. Kafka是什么? Kafka最初由LinkedIn公司使用Scala开发的一个分布式、多分区、多副本,基于...
    wanting1024阅读 225评论 0 1
  • kafka的基本概念 kafka是什么 ? ​ kafka是一个多分区、多副本且基于zookeeper协调的分...
    昂迪梵德阅读 192评论 0 0
  • 1. 什么是Kafka 很多人常常把Kafka和消息队列相提并论,事实上这是一个错误的概念。 根据官网的解释,它的...
    ElliotG阅读 827评论 0 0
  • 应用场景 日志聚合,一般kafka 使用来记录日志信息。 限流削峰,当大量数据同时请求到服务的时候,可以造成服务宕...
    进击的辉仔阅读 168评论 0 0
  • 应用场景 日志聚合,一般kafka 使用来记录日志信息。 限流削峰,当大量数据同时请求到服务的时候,可以造成服务宕...
    进击的辉仔阅读 331评论 0 0