什么是Topic
Topic 翻译过来是主题的意思,一个 Topic 就像数据库中的一张表。Producer发布的消息可以带有主题,主题相同的消息记录放在一个集合里。
什么是Partition
Partition 是最小的存储单元,每个 Partition 都是一个单独的 log 文件,每条记录都以追加的形式写入。
Topic在Partition里如何体现
如下图,一份topic的数据有三部分组成,下图设置的分片数是2。在下图中一份topic的数据=replica0_1+replica1_1+replica2_1,但是为了保证容错率每一部分的数据需要在另外一台机器(broker)进行备份,比如在Borker1这台机器上replica0_1的数据在Broker2备份,文件名为replica0_2。Producer会经过负载均衡任意选择一个partition写入数据,然后产生副本(replica)。Consumer在消费消息的时候也会根据元数据选择自己访问的topic消息在哪个partition里,然后进行访问操作。
在设置副本时,副本数是必须小于集群的 Broker 数的,副本只有设置在不同的机器上才有作用。
Producer生产消息到Partition保存
每个 Producer 线程都会有一个 RecordAccumulator 对象,它负责缓存要发送 RecordBatch、记录发送的状态并且进行相应的处理,如下图所示,每个 topic-partition 都有一个对应的 deque,deque 中存储的是 RecordBatch,它是发送的基本单位,只有这个 topic-partition 的 RecordBatch 达到大小或时间要求才会触发发送操作(但并不是只有达到这两个条件之一才会被发送)。
如何保证Partition的有序性
上述我们知道消息记录在partition里是有编号,如replica0_1、replica1_1....,因此保证消息的有序性。但在Producer生产消息的时候如何保证消息的有序性呢?
RecordAccumulator类里面的mutePartition() 与 unmutePartition()它们是保证有序性关键之一,相当于topic.lock(),topic.unlock。具体实现代码如下:
private final Set<TopicPartition> muted;
.....
private boolean isMuted(TopicPartition tp) { return muted.contains(tp);}
......
public void mutePartition(TopicPartition tp) { muted.add(tp);}
public void unmutePartition(TopicPartition tp) { muted.remove(tp);}
上述代码使用场景简要概况,如果想要对该topic进行操作,就先调用isMuted看看是否上锁,如果上了锁就不能操作,没上锁就要先调用mutePartition上锁进行操作,操作完调用unmutePartition解锁
有了锁机制就可以保证消息的有序性了