kafka相关知识

为什么要分区

1.方便在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个topic又可以由多个Partition组成,因此整个集群就可以适应任意大小的数据
2.可以提高并发,因为可以以Partition为单位读写了。

分区引发的问题

1.生产者如何将消息分发到不同分区中(生产者的分区分配策略)

 public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) 
 public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)
 public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers)
 public ProducerRecord(String topic, Integer partition, K key, V value)
 public ProducerRecord(String topic, K key, V value)
 public ProducerRecord(String topic, V value)

(1)可以指定消费哪个分区
(2)没有指明Partition值,但是有key的情况下,将key的hash值与topic的partition值进行取余得到partition值
(3)既没有partition值有没有key值的情况下,第一次调用时会随机生成一个整数(后面每次调用在这个整数上自增),将这个值与topic可用的partition总结书取余得到partition值,也就是常说的round-robin算法

2.消费者组中,怎么指定哪个partition由哪个consumer来消费(消费者的分区分配策略)
(1)RangeAssignor(范围分区)
(2)RoundRobinAssignor(轮询分区)
(3)StrickyAssignor 分配策略
(4)可以指定消费哪个分区

//消费指定分区的时候,不需要再订阅 
//kafkaConsumer.subscribe(Collections.singletonList(topic));
//消费指定的分区 
TopicPartition topicPartition=new TopicPartition(topic,0); 
kafkaConsumer.assign(Arrays.asList(topicPartition));

producer如何最大化确保消息发送到broker上不被丢失且不重复

在介绍怎么保证消息不丢不重之前,先介绍一些概念

生产者ack应答机制
 //设置ack应答,分为0,1,-1(all)
 properties.put(ProducerConfig.ACKS_CONFIG,"all");

acks参数配置
0: (At Most Once)producer不等待broker的ack,这一操作提供了一个最低的延迟,broker一接收到还没有写入磁盘就已经返回,当broker故障时有可能丢失数据
1: producer等待broker的ack,partition的leader落盘成功后返回ack,如果在follower同步成功之前leader故障,那么会丢失数据;
all( At Least Once):producer等待broker的ack,partition的leader和follower全部落盘成功后才返回ack,但是如果在follower同步完成后,broker发送ack之前,leader发生故障,那么会造成数据重复;

partition副本

每个分区可以有多个副本,并且在副本集合中会存在一个leader的副本,所有的读写请求都是由leader副本来进行处理。剩余的其他副本都做为follower副本,follower副本会从leader副本同步消息日志。

** 通过下面的命令去创建带2个副本的topic
sh kafka-topics.sh --create --zookeeper 192.168.11.156:2181 --replication-factor 3 --partitions 3 --topic secondTopic
幂等性

所谓的幂等性就是指Producer不论向server发送多少次的重复数据,server端都只会持久化一次,幂等性结合At Least Once,就不会造成数据重复;

 //启用幂等性
 properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true");

开启幂等性的Producer在初始化的时候会被分配一个PID,发往同一个Partition的消息会附带Sequence Number,而broker端会对<PID,Partition,SeqNumner>做缓存,当具有想沟通主键的消息提交时,broker只会持久化一条。

但是Producer重启就会变化,同时不同的Partition也具有不同的主键,所以幂等性无法保证跨分区跨会话的Exactly Once

事务

幂等性只能保证单分区单会话内数据只持久化一条,对于跨分区跨会话


image.png

事务性事例:
Kafka 事务性的使用方法也非常简单,用户只需要在 Producer 的配置中配置 transactional.id,通过 initTransactions() 初始化事务状态信息,再通过 beginTransaction() 标识一个事务的开始,然后通过 commitTransaction() 或 abortTransaction() 对事务进行 commit 或 abort,示例如下所示:

Properties props = new Properties();
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "ProducerTranscationnalExample");
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "test-transactional");
props.put("acks", "all");
KafkaProducer producer = new KafkaProducer(props);
producer.initTransactions();

try {
 String msg = "matt test";
 producer.beginTransaction();
 producer.send(new ProducerRecord(topic, "0", msg.toString()));
 producer.send(new ProducerRecord(topic, "1", msg.toString()));
 producer.send(new ProducerRecord(topic, "2", msg.toString()));
 producer.commitTransaction();
} catch (ProducerFencedException e1) {
 e1.printStackTrace();
 producer.close();
} catch (KafkaException e2) {
 e2.printStackTrace();
 producer.abortTransaction();
}
producer.close();

参考:http://matt33.com/2018/11/04/kafka-transaction/#%E4%BA%8B%E5%8A%A1%E6%80%A7%E7%A4%BA%E4%BE%8B

producer如何最大化确保消息发送到broker上不被丢失且不重复

副本+ack、幂等性,事务

1.每个partition设置多个副本
2.acks设置为all
3.幂等性虽然acks设置为all,但是可能数据重复,所以引入了幂等性
At Least Once + 幂等性 = Exactly Once
4.对于跨分区跨会话就不能用幂等性,要用事务

consumer如何最大化确保消息发送到broker上不被丢失且不重复

由自动提交改为手动提交

ISR

ISR表示目前“可用且消息量与leader相差不多的副本集合,这是整个副本集合的一个子集”。怎么去理解
可用和相差不多这两个词呢?具体来说,ISR集合中的副本必须满足两个条件

  1. 副本所在节点必须维持着与zookeeper的连接
  2. 副本最后一条消息的offset与leader副本的最后一条消息的offset之间的差值不能超过指定的阈值
    (replica.lag.time.max.ms) replica.lag.time.max.ms:如果该follower在此时间间隔内一直没有追
    上过leader的所有消息,则该follower就会被剔除isr列表
  3. ISR数据保存在Zookeeper的 /brokers/topics/<topic>/partitions/<partitionId>/state
    节点中
    follower副本把leader副本LEO之前的日志全部同步完成时,则认为follower副本已经追赶上了leader
    副本,这个时候会更新这个副本的lastCaughtUpTimeMs标识,kafk副本管理器会启动一个副本过期检
    查的定时任务,这个任务会定期检查当前时间与副本的lastCaughtUpTimeMs的差值是否大于参数
    replica.lag.time.max.ms 的值,如果大于,则会把这个副本踢出ISR集合
image.png
LEO和HW
image.png

image.png
控制器

控制器其实就是一个 broker, 只不过它除了具有一般 broker 的功能之外, 还负责分区
首领的选举。集群里第一个启动的 broker 通过在 Zookeeper 里创建一个临时节点/controuer
让自己成为控制器。 其他 broker 在启动时也会尝试创建这个节点,不过它们会收到一个“节
点已存在”的异常,然后“意识”到控制器节点已存在, 也就是说集群里已经有一个控制器了 。
其他 broker 在控制器节点上创建 Zookeeperwatch 对象,这样它们就可以收到这个节点的变
更通知。这种方式可以确保集群里一次只有一个控制器存在。
如果控制器被关闭或者与 Zookeeper 断开连接, zookeeper 上的临时节点就会消失。 集
群里的其他 broker 通过 watch 对象得到控制器节点消失的通知, 它们会尝试让自己成为新
的控制器。 第一个在 Zookeeper 里成功创建控制器节点的 broker 就会成为新的控制器, 其
他节点会收到“节点已存在”的异常,然后在新的控制器节点上再次创建 watch 对象。
当控制器发现一个 broker 已经离开集群,它就知道,那些失去首领的分区需要一个新首
领 (这些分区的首领刚好是在这个 broker 上)。 控制器遍历这些分区, 并确定谁应该成为新
首领 (简单来说就是分区副本列表里的下一个副本) , 然后向所有包含新首领或现有跟随者
的 broker 发送请求。该请求消息包含了谁是新首领以及谁是分区跟随者的信息。随后,新首
领开始处理来自生产者和消费者的情求,而跟随者开始从新首领那里复制消息。
当控制器发现一个 broker 加入集群时, 它会使用 broker ID 来检査新加入的 broker 是
否包含现有分区的副本。 如果有, 控制器就把变更通知发送给新加入的 broker 和其他
broker, 新 broker 上的副本开始从首领那里复制消息。
简而言之, Kafka 使用 Zookeeper 的临时节点来选举控制器,并在节点加入集群或退出集
群时通知控制器。 控制器负责在节点加入或离开集群时进行分区首领选举。

群组协调器

用于消费者分区再分配。
消费者要加入群组时,会向群组协调器发送一个 JoinGroup 请求,第一个加入群主的消
费者成为群主,群主会获得群组的成员列表,并负责给每一个消费者分配分区。分配完毕后,
群组把分配情况发送给群组协调器,协调器再把这些信息发送给所有的消费者,每个消费者
只能看到自己的分配信息,只有群主知道群组里所有消费者的分配信息。这个过程在每次再
均衡时都会发生。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • Design 1. Motivation 我们设计Kafka用来作为统一的平台来处理大公司可能拥有的所有实时数据源...
    BlackManba_24阅读 5,439评论 0 8
  • 1 Kafka概述 1.1 定义 Kafka是一个分布式的基于发布/订阅模式的消息队列,主要应用于大数据实时处理领...
    djm猿阅读 3,892评论 0 4
  • Kafka 是一个java开发的mq中间件,依赖于zookeper,有高可用,高吞吐量等特点。 优势 可靠性:pa...
    何笙阅读 15,127评论 1 9
  • 一、概述 Kafka是一个具有高吞吐量,高拓展性,高性能和高可靠的基于发布订阅模式的消息队列,是由领英基于Java...
    服务端开发阅读 4,174评论 1 5
  • 2月26日是一个很有压力的日子,因为,在那一天,我迎来两门新课程,其中一门,就是以作业量大以强度高著称的自诩...
    笨鱼森森阅读 2,144评论 0 1