RocketMQ provides a message trace feature that records message state, message flow, elapsed time, and other data before and after a message is sent and before and after it is consumed. This data can be queried in the console to trace a message's path.
For trace data processing, RocketMQ asynchronously hands the collected data to a Producer, which sends it to the broker under a dedicated trace topic. This process is implemented mainly by AsyncTraceDispatcher, and the programming techniques it involves (multithreading, concurrent processing, batching, blocking waits, flow control, and so on) are well worth studying.
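For orientation, the trace feature is switched on from the client side. A minimal sketch, assuming the DefaultMQProducer constructor overload that takes an enableMsgTrace flag (available since RocketMQ 4.4) and a local NameServer address:

import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class TraceEnabledProducerDemo {
    public static void main(String[] args) throws Exception {
        // the second argument enables message trace; an AsyncTraceDispatcher is created behind the scenes
        DefaultMQProducer producer = new DefaultMQProducer("trace_demo_group", true);
        producer.setNamesrvAddr("127.0.0.1:9876"); // assumed NameServer address
        producer.start();

        producer.send(new Message("TopicTest", "hello trace".getBytes(StandardCharsets.UTF_8)));
        producer.shutdown();
    }
}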
Note: in what follows, "producer" and "consumer" refer to the roles around the blocking queue traceContextQueue, not to RocketMQ's Producer and Consumer.
- Key member fields
public class AsyncTraceDispatcher implements TraceDispatcher {
    // message sender; the consumer side of the blocking queue
    private final DefaultMQProducer traceProducer;
    // thread pool that sends trace batches
    private final ThreadPoolExecutor traceExecutor;
    // blocking queue that receives the data appended by queue producers
    private final ArrayBlockingQueue<TraceContext> traceContextQueue;
    // work queue backing traceExecutor (referenced by the constructor and flush below)
    private final ArrayBlockingQueue<Runnable> appenderQueue;
    // number of discarded trace entries
    private AtomicLong discardCount;
    // single worker thread that keeps draining tasks from the blocking queue
    private Thread worker;
}
- Constructor (initializes the member fields)
public AsyncTraceDispatcher(String group, Type type, String traceTopicName, RPCHook rpcHook) {
    this.discardCount = new AtomicLong(0L);
    // blocking queue with a fixed capacity of 1024
    this.traceContextQueue = new ArrayBlockingQueue<TraceContext>(1024);
    // bounded work queue for the trace-sending thread pool
    this.appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize);
    // 10 core threads, 20 max, 60s keep-alive, named threads via ThreadFactoryImpl
    this.traceExecutor = new ThreadPoolExecutor(10, 20, 1000 * 60, TimeUnit.MILLISECONDS, this.appenderQueue, new ThreadFactoryImpl("MQTraceSendThread_"));
    traceProducer = getAndCreateTraceProducer(rpcHook);
}
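The construction pairs a bounded work queue with a named ThreadFactory, so the pool both limits buffering and produces recognizable thread names. A plain-JDK sketch of the same shape (capacities and names here are illustrative, and an anonymous ThreadFactory stands in for RocketMQ's ThreadFactoryImpl):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedTracePoolSketch {
    public static void main(String[] args) {
        // bounded work queue, playing the role of appenderQueue (capacity chosen arbitrarily here)
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(128);

        // name the worker threads so they are easy to spot in thread dumps
        ThreadFactory namedFactory = new ThreadFactory() {
            private final AtomicInteger index = new AtomicInteger(0);
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "MQTraceSendThread_" + index.getAndIncrement());
            }
        };

        // same shape as traceExecutor: 10 core threads, 20 max, 60s keep-alive
        ThreadPoolExecutor executor =
            new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, workQueue, namedFactory);

        executor.submit(() -> System.out.println(Thread.currentThread().getName() + " running"));
        executor.shutdown();
    }
}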
- start
An AtomicBoolean CAS keeps the trace Producer from being started more than once; the method then starts a daemon background thread that keeps pulling tasks out of the blocking queue. A standalone sketch of the same idiom follows the snippet below.
// atomic flag marking whether the dispatcher has been started
private AtomicBoolean isStarted = new AtomicBoolean(false);

public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
    // CAS: only the first caller gets to start the trace Producer
    if (isStarted.compareAndSet(false, true)) {
        traceProducer.setNamesrvAddr(nameSrvAddr);
        traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
        traceProducer.start();
    }
    this.accessChannel = accessChannel;
    // start the single background worker thread that drains the queue
    this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
    this.worker.setDaemon(true);
    this.worker.start();
    this.registerShutDownHook();
}
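Viewed in isolation, the guard is the standard "start exactly once" idiom: compareAndSet succeeds for exactly one thread, and every other caller falls through. A minimal sketch with illustrative class and method names, not taken from the RocketMQ source:

import java.util.concurrent.atomic.AtomicBoolean;

public class OnceStartedWorker {
    private final AtomicBoolean started = new AtomicBoolean(false);
    private volatile boolean stopped = false;

    public void start() {
        // only the first successful CAS runs the startup logic, even if start() is called concurrently
        if (!started.compareAndSet(false, true)) {
            return;
        }
        Thread worker = new Thread(() -> {
            while (!stopped) {
                // drain work here; sleep briefly so the demo loop does not spin hot
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    return;
                }
            }
        }, "Demo-Worker-Thread");
        worker.setDaemon(true); // a daemon thread does not keep the JVM alive on its own
        worker.start();
    }

    public void stop() {
        stopped = true;
    }
}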
- append (receiving produced data)
Whatever the queue producers generate is offered straight to the blocking queue; when the offer fails, the discard counter is incremented.
public boolean append(final Object ctx) {
    boolean result = traceContextQueue.offer((TraceContext) ctx);
    if (!result) {
        log.info("buffer full" + discardCount.incrementAndGet() + " ,context is " + ctx);
    }
    return result;
}
RocketMQ fixes the blocking queue capacity at 1024 and inserts with offer(), so once the queue is full no more tasks can be added. The design implies that trace tasks are not critical: when the system is busy, non-critical tasks can simply be dropped, which serves as a form of flow control.
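The difference between the non-blocking offer() used here and a blocking put() is easy to see in a tiny self-contained demo (queue capacity and element values are arbitrary):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

public class OfferVsPutDemo {
    public static void main(String[] args) {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2); // tiny capacity for the demo
        AtomicLong discardCount = new AtomicLong(0);

        for (int i = 0; i < 5; i++) {
            // offer() returns immediately, false once the queue is full, so the caller never blocks
            if (!queue.offer("trace-" + i)) {
                discardCount.incrementAndGet();
            }
        }
        System.out.println("kept=" + queue.size() + ", discarded=" + discardCount.get()); // kept=2, discarded=3

        // put() would instead block the calling thread until space frees up,
        // which is exactly what a trace dispatcher must not do on the send path:
        // queue.put("trace-blocking");
    }
}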
- flush (flow control)
flush() spends at most 500 ms repeatedly acquiring the blocking queue's monitor (the same lock held while tasks are drained and handed to the Producer) and checking whether both queues are empty. The 500 ms cap keeps flush() from being stuck forever when new data keeps being written, so the caller can always return, while holding the lock briefly slows the producers writing into the queue. This too is a form of flow control; a generic version of the bounded wait is sketched after the method below.
public void flush() {
    // The maximum waiting time for refresh, avoid being written all the time, resulting in failure to return.
    long end = System.currentTimeMillis() + 500;
    while (System.currentTimeMillis() <= end) {
        synchronized (traceContextQueue) {
            if (traceContextQueue.size() == 0 && appenderQueue.size() == 0) {
                break;
            }
        }
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            break;
        }
    }
    log.info("------end trace send " + traceContextQueue.size() + " " + appenderQueue.size());
}
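The bounded busy-wait can be lifted into a small standalone helper; a sketch of the pattern, with the 500 ms budget and 1 ms sleep mirroring the method above and the helper name being illustrative:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedDrainWait {
    // Wait until both queues are empty, but never longer than timeoutMillis.
    // Returns true if the queues drained in time, false if the budget ran out.
    static boolean awaitDrained(BlockingQueue<?> dataQueue, BlockingQueue<?> workQueue, long timeoutMillis)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (System.currentTimeMillis() <= deadline) {
            if (dataQueue.isEmpty() && workQueue.isEmpty()) {
                return true;
            }
            Thread.sleep(1); // yield briefly instead of spinning hard
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> data = new ArrayBlockingQueue<>(16);
        BlockingQueue<Runnable> work = new ArrayBlockingQueue<>(16);
        System.out.println("drained in time: " + awaitDrained(data, work, 500));
    }
}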
- Draining the queue
While !stopped, the worker keeps polling tasks out of the blocking queue into a list. Once the list reaches batchSize (or the queue runs dry), the batch is handed to the thread pool. A standalone sketch of this drain-and-batch loop follows the class below.
// stop flag of AsyncTraceDispatcher, declared volatile
private volatile boolean stopped = false;

class AsyncRunnable implements Runnable {
    private boolean stopped;

    @Override
    public void run() {
        while (!stopped) {
            // list that collects one batch
            List<TraceContext> contexts = new ArrayList<TraceContext>(batchSize);
            synchronized (traceContextQueue) {
                for (int i = 0; i < batchSize; i++) {
                    TraceContext context = null;
                    try {
                        // get one trace data element from the blocking queue traceContextQueue
                        context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        // ignore and keep polling
                    }
                    if (context != null) {
                        contexts.add(context);
                    } else {
                        break;
                    }
                }
                if (contexts.size() > 0) {
                    // submit the batch to the thread pool for asynchronous consumption
                    AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
                    traceExecutor.submit(request);
                } else if (AsyncTraceDispatcher.this.stopped) {
                    this.stopped = true;
                }
            }
        }
    }
}
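The drain-and-batch loop can be reproduced with plain JDK types; a self-contained sketch, with batchSize, the 5 ms poll timeout, and the loop structure mirroring the worker above (the println stands in for the submit to traceExecutor):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BatchingWorkerSketch {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1024);
        int batchSize = 100;
        for (int i = 0; i < 250; i++) {
            queue.offer("ctx-" + i);
        }

        while (!queue.isEmpty()) {
            List<String> batch = new ArrayList<>(batchSize);
            // collect up to batchSize elements, waiting at most 5 ms per element,
            // so a sparse stream still gets flushed in small batches instead of stalling
            for (int i = 0; i < batchSize; i++) {
                String ctx = queue.poll(5, TimeUnit.MILLISECONDS);
                if (ctx == null) {
                    break;
                }
                batch.add(ctx);
            }
            if (!batch.isEmpty()) {
                // in AsyncTraceDispatcher this is where the batch would go to traceExecutor
                System.out.println("would submit a batch of " + batch.size());
            }
        }
    }
}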
- shutdown
Shutting down sets the stop flag, flushes the remaining trace data, shuts down the thread pool, stops the trace Producer if it was started, and removes the JVM shutdown hook.
public void shutdown() {
    this.stopped = true;
    flush();
    this.traceExecutor.shutdown();
    if (isStarted.get()) {
        traceProducer.shutdown();
    }
    this.removeShutdownHook();
}

public void removeShutdownHook() {
    if (shutDownHook != null) {
        try {
            Runtime.getRuntime().removeShutdownHook(shutDownHook);
        } catch (IllegalStateException e) {
            // ignore - VM is already shutting down
        }
    }
}
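registerShutDownHook() is not shown above; the idea is to flush buffered trace data when the JVM exits, while removeShutdownHook() undoes the registration during an orderly shutdown. A minimal sketch of the registration side, with the hook body an assumption rather than the exact RocketMQ implementation:

private volatile Thread shutDownHook;

public void registerShutDownHook() {
    if (shutDownHook == null) {
        shutDownHook = new Thread(new Runnable() {
            private volatile boolean hasShutdown = false;
            @Override
            public void run() {
                synchronized (this) {
                    // flush whatever is still buffered before the JVM goes away (assumed behavior)
                    if (!this.hasShutdown) {
                        this.hasShutdown = true;
                        flush();
                    }
                }
            }
        }, "ShutdownHookMQTrace");
        Runtime.getRuntime().addShutdownHook(shutDownHook);
    }
}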