1.在进行负载分配时候,会把不属于的队列进行锁释放,如果释放成功,从待处理队列中移除掉时,
进行新队列加入操作,尝试对新加的队列加锁,如果锁成功,进入创建待处理队列放入,同时创建pullrequest请求放入请求缓存
2.消费线程每隔20秒获取当前所有待处理队列构造成brokername+queue,尝试给master对应队列加锁,所成功设置待处理队列状态为true,未成功或者为false(就是加锁的一个批量处理,拉取数据用)
3.在进行pull拉取数据时,对于顺序消息,判断该队列是否加锁,加锁的话往下进行,如果是首次加锁,pullreuqets的锁状态置为true,同时获取消费进度
4.从broker拉取数据,回调函数中,把数据放入processQueue中(treeMap中),交给消费线程处理,pullreuqets接着放入缓存进行下次拉取
5.消费线程中,获取当前队列锁,判断processQueue是否已加锁,判断待处理队列开始时间距离当前时间是否超过60秒(只有超过才会跳出当前线程,当前队列只有这一个线程能处理)从processQueue中取消费信息(并发是取直接拉取的信息),消费信息从treemap中移除,同时临时map中存有一份待处理消息,调用监听处理逻辑,返回处理状态
6.自动提交:返回成功,直接清除临时map,返回最后偏移量+1,存入消费进度,
挂起的话,判断当前重试次数是否超过,如果未超过或者超过但是发送Ack失败,重试+1,临时map清除,消息放入msgTreeMap,延迟重新消费,
超过并且Ack延迟成功,进入死信队列,清除临时map,返回当前偏移量+1,当消费成功处理
7.非自动提交:返回提交,和自动提交的成功一样,返回回滚,临时map的消息放入msgTreeMap,清除临时map,重新延时消费(相当于本次未消费),挂起,处理方式和上边一样
8.存入消费进度到内存