消息中间件能做什么
消费中间件用来解决分布式系统之间消息传递,从而实现应用程序之间的协同如异步化处理。
Java中使用kafka进行通信
同步发送和异步发送
异步发送
当消息发送到buffer队列时立即返回;当消息被确认后发生回调。
producer.send(new ProducerRecord<>(topic, msg), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
}
});
批量发送
batch.size:控制批量提交的字节数大小,当批量的消息大小超过这个大小,会统一发送。
linger.ms:控制发送时间的间隔,当批量的消息间隔超过这个值,会统一发送。
当两个都配置的话,只需要满足一个就批量发送。
同步发送
异步发送返回后阻塞,等消息完成则继续执行
RecordMetadata recordMetadata = producer.send(new ProducerRecord<>(topic, msg)).get()
基础配置分析
group.id
对消费者进行分组,同一个消费组只能有一个消费者来消费消息,不同组可以同时消费同一个消息。
enable.auto.commit
只有提交后的消息,下次才不能被接受到。
enable.auto.commit表示是否允许自动提交,若允许则可以通过auto.commit.interval.ms来控制提交频率;若不允许则可以通过consumer.commitSync()来手动提交
auto.offset.reset
针对有新groupid的消费者来消费指定的topic
auto.offset.reset=latest,新的消费者从其他消费者最后一个offset开始消费topic下的消息
auto.offset.reset= earliest:新的消费者从该topic最早的消息开始消费。
max.poll.records
限制每次调用poll返回的消息数
Topic和Partition
Topic
表示消息类别,是逻辑上的概念。
Partition
每个topic分为多个Partition,同一个topic下不同分区的消息是不同的,Offset表示消息在分区的偏移量即位置。
存储
以文件形式存储,命名规则是<topic_name>-<partition_id>
关于消息分发
自定义Partitioner
通过实现Partitioner来自定义分发策略
消息默认的分发机制
若key有值,则采用key的hash取模的分区算法;如果key为null,则metadata.max.age.ms范围内随机选择一个。这个值是默认十分钟更新一次。
消费端如何消费指定的分区
TopicPartition topicPartition=new TopicPartition(topic,0);
consumer.assign(Arrays.asList(topicPartition));
消息的消费原理
kafka消息消费原理演示
3个消费者3个分区
consumer1会消费partition0分区、consumer2会消费partition1分区、consumer3会消费 partition2分区
3个partiton对应2个consumer
consumer1会消费partition0/partition1分区、consumer2会消费partition2分区
3个partition对应4个或以上consumer
仍然只有3个consumer对应3个partition,其他的consumer无法消费消息
consumer和partition的数量建议
1 若consumer大于partition是浪费,因为一个分区只能被一个消费者消费,从而导致消费者空闲
2 若consumer小于等于partition,为了分配均匀,最好保证partition是consumer的整数倍。
3 若consumer只能保证一个partition的顺序是有序的,从多个partition因为读取数据是不一样的,从而使无序的。
什么时候会触发分区策略(rebalance)
1 消费者的数量发生变化:如新增一个消费者或去掉一个消费者
2 分区的数量发生变化
分区分配策略
定义
对同一个topic,同一个消费组,如何分配消费者和分区的对应关系,可以通过PARTITION_ASSIGNMENT_STRATEGY_CONFIG来指定分区策略
RangeAssignor(范围分区)
前提:同一个topic下
1 对分区按序号排序,对消费者按字母顺序排序
2 前m个消费者每个分配n+l个分区,后面的(消费者数量-m)个消费者每个分配n个分区:n = 分区数/消费者数量,m= 分区数%消费者数量
C1-0 将消费 0, 1, 2, 3 分区
C2-0 将消费 4, 5, 6 分区
C3-0 将消费 7, 8, 9 分区
弊端是一个topic时,c1-0多分配一个分区,若多个topic时,c1-0就会多分配一个分区。
RoundRobinAssignor(轮询分区)
1 把分区和消费者分别按照haseCode排序
2 通过轮询算法分配分区给消费者
假如按照 hashCode 排序完的topic-partitions组依次为T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6, T1-9,我们的消费者线程排序为C1-0, C1-1, C2-0, C2-1,最后分区分配的结果为:
C1-0 将消费 T1-5, T1-2, T1-6 分区;
C1-1 将消费 T1-3, T1-1, T1-9 分区;
C2-0 将消费 T1-0, T1-4 分区;
C2-1 将消费 T1-8, T1-7 分区;
StrickyAssignor 分配策略
1 分区的分配尽可能的均匀,分区的分配尽可能和上次分配保持相同
2 当两者发生冲突时, 第 一 个目标优先于第二个目标
执行Rebalance以及管理consumer的group
coordinator(协调者)来执行对于consumer group的管理,当consumer group的第一个consumer启动的时候,它会去和kafka server确定谁是它们组的coordinator。之后该group内的所有成员都会和该coordinator进行协调通信。
如何确定coordinator
消费者向kafka集群中的任意一个broker发送一个GroupCoordinatorRequest请求,服务端会返回一个负载最小的broker节点的id,并将该broker设置为coordinator。
JoinGroup的过程
join阶段:消费者向coordinator发送joinGroup请求,coordinator选举一个cosumer为leader,并把组成员信息和订阅信息发送给消费者
1 选举算法:若消费组没有leader,则选择第一个加入消费组为消费者leader,若这是leader挂了,则随机选举一个leader。
2 每个消费者都可以设置自己的分区策略,coordinator会根据组内消费者的投票解决来实现消费组的分区分配。
Sync阶段:leader将消费者对应的partition分配方案同步给consumer group 中的所有consumer。
如何保存消费端的消费位置
什么是offset
offset表示分区中消息的唯一编号
offset在哪里维护
1 将offset信息保存在consumer_offsets_*的文件
2 通过 Math.abs(“groupid”.hashCode())%groupMetadataTopicPartitionCount 来计算位移在那个分区里; 由于默认情况下groupMetadataTopicPartitionCount有50个分区。
分区的副本机制
通过副本机制实现冗余,一个分区存在多个副本,leader副本实现所有请求的读写,剩余的是follower副本,主要从leader副本中同步消息日志。
可以通过replication-factor来创建副本,通过get /brokers/topics/secondTopic/partitions/1/state来获取各个分区中的leader是什么。
副本的leader选举
略