学习RocketMQ的AsyncTraceDispatcher并发处理队列任务的技巧

RocketMQ提供消息轨迹追踪的功能特性,用来记录消息发送前后和消息消费前后的消息消息状态、消息流向、耗时等等数据,我们可以在console中查询这些数据,进而跟踪消息的轨迹。

对于消息轨迹数据处理:RockerMQ会将收集到的数据异步将数据提交给Producer,让其发送到特定topic的broker的。这个过程主要通过AsyncTraceDispatcher来实现。里面涉及多线程并发处理批量处理阻塞等待流量控制等等编程技巧是非常值得我们去学习。

注意:下面描述生产者和消费者,只是针对阻塞队列traceContextQueue角度来说的,非RocketMQ的Producer和Consumer。

  • 主要成员变量
public class AsyncTraceDispatcher implements TraceDispatcher {
    // 消息发送者,队列消费者
    private final DefaultMQProducer traceProducer;
    // 线程池
    private final ThreadPoolExecutor traceExecutor;
    // 阻塞队列,用来接收队列生产者发送的数据
    private final ArrayBlockingQueue<TraceContext> traceContextQueue;
    // 丢失任务数量
    private AtomicLong discardCount;
    // 单一线程,用来不断提取阻塞队列中的任务
    private Thread worker;
}
  • 构造器(完成成员变量的初始化)
public AsyncTraceDispatcher(String group, Type type, String traceTopicName, RPCHook rpcHook) {
    this.discardCount = new AtomicLong(0L);
    this.traceContextQueue = new ArrayBlockingQueue<TraceContext>(1024);
    this.appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize);
    this.traceExecutor = new ThreadPoolExecutor(10, 20, 1000 * 60, TimeUnit.MILLISECONDS, this.appenderQueue, new ThreadFactoryImpl("MQTraceSendThread_"));
    traceProducer = getAndCreateTraceProducer(rpcHook);
}
  • start,启动

利用CAS机制避免start()方法重复执行,然后启动一个后台线程,用来对阻塞队列任务的提取。

// 原子类标志位
private AtomicBoolean isStarted = new AtomicBoolean(false);

public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
    // 启动标志位
    if (isStarted.compareAndSet(false, true)) {
        traceProducer.setNamesrvAddr(nameSrvAddr);
        traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
        traceProducer.start();
    }
    this.accessChannel = accessChannel;
    // 启动后台单一线程
    this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
    this.worker.setDaemon(true);
    this.worker.start();
    this.registerShutDownHook();
}
  • 接收生产数据

生产者不断生产的数据,直接往阻塞队列添加任务,添加失败则记录丢失数量。

public boolean append(final Object ctx) {
    boolean result = traceContextQueue.offer((TraceContext) ctx);
    if (!result) {
        log.info("buffer full" + discardCount.incrementAndGet() + " ,context is " + ctx);
    }
    return result;
}

RocketMQ设置阻塞队列大小固定1024,使用offer()方法,队列已满情况下就不能往队列中添加任务。这种设计说明该种任务不重要,当处理任务繁忙时,可丢弃不重要的任务以达到流量控制的目的。

  • flush(流量控制)

调用flush方法,在500毫秒内不断竞争阻塞队列的锁(与提取队列任务给Producer持有同一把锁),主要目的是让提交给Producer的任务放缓一点,避免一直被写入Producer,导致消息发送无法返回。这也是流量控制的一种方式。

public void flush() {
    // The maximum waiting time for refresh,avoid being written all the time, resulting in failure to return.
    long end = System.currentTimeMillis() + 500;
    
    while (System.currentTimeMillis() <= end) {
        synchronized (traceContextQueue) {
            if (traceContextQueue.size() == 0 && appenderQueue.size() == 0) {
                break;
            }
        }
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            break;
        }
    }
    log.info("------end trace send " + traceContextQueue.size() + "   " + appenderQueue.size());
}
  • 处理队列任务

while(!stopped)时,不断地向队列中poll任务,并提交到一个list。当list满足batchSize后,就提交给线程池处理。

// AsyncTraceDispatcher的停止标志位,volatile修饰
private volatile boolean stopped = false;

class AsyncRunnable implements Runnable {
    private boolean stopped;

    @Override
    public void run() {
        while (!stopped) {
            // 批处理list
            List<TraceContext> contexts = new ArrayList<TraceContext>(batchSize);
            synchronized (traceContextQueue) {
                for (int i = 0; i < batchSize; i++) {
                    TraceContext context = null;
                    try {
                        //get trace data element from blocking Queue - traceContextQueue
                        context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                    }
                    if (context != null) {
                        contexts.add(context);
                    } else {
                        break;
                    }
                }
                if (contexts.size() > 0) {
                    // 将批量list提交到线程池批量消费
                    AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
                    traceExecutor.submit(request);
                } else if (AsyncTraceDispatcher.this.stopped) {
                    this.stopped = true;
                }
            }
        }
    }
}
  • shutdown
public void shutdown() {
    this.stopped = true;
    flush();
    this.traceExecutor.shutdown();
    if (isStarted.get()) {
        traceProducer.shutdown();
    }
    this.removeShutdownHook();
}

public void removeShutdownHook() {
    if (shutDownHook != null) {
        try {
            Runtime.getRuntime().removeShutdownHook(shutDownHook);
        } catch (IllegalStateException e) {
            // ignore - VM is already shutting down
        }
    }
}
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容