当我们想通过多条线程处理activemq中的消息,直觉上会使用固定大小线程池去处理,然而这种方式并不妥当,这么做我们只是将消息从activemq转移到线程池的阻塞队列之中,当线程池开始工作,activemq中的消息快速被消费完毕,而消息所代表的任务却并未真正被处理, 他们被堆积在处理程序的内存中,并陆续由线程中的线程处理。这会产生副作用,此时当处理程序因为某种原因而崩溃,这些待处理的任务都将丢失。
如何实现既能通过多个线程处理任务,又能保证未完成的任务的安全性,此时 SynchronousQueue 就有了用武之地。
我们可以把SynchronousQueue 当作长度为1的阻塞队列,当队列被塞入一个元素,假如这个元素未被消费掉,那么后续的塞入操作将被阻塞。我们可以利用它的这个特性,把它当作是activemq与处理线程之间的缓冲层。在 SynchronousQueue 的一端,我们从activemq中读取一个元素,并将它put进SynchronousQueue 。在另一端,多条线程分别从 SynchronousQueue 中 take 元素进行处理,只有当 SynchronousQueue 中不存在任何元素,也就是线程们将当前的任务都处理完毕,还有一端的从activemq中提取消息的操作才能执行,反之则将被阻塞。 通过这种方式,我们便能保证任务不丢失的同时又能通过多线程处理它们。示例代码如下
初始化一个 SynchronousQueue
private SynchronousQueue<ActiveMQObjectMessage> synchronousQueue = new SynchronousQueue<>();
从activemq中将消息转移至synchronousQueue,一次转移一条,如果上一条未被处理,下一条不能继续
ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection
.DEFAULT_PASSWORD, brokerUrl);
connection = factory.createConnection();
connection.start();
session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
Destination destination = session.createQueue(dest);
MessageConsumer consumer = session.createConsumer(destination);
while (true) {
try {
Message message = consumer.receive();
if (message instanceof ActiveMQObjectMessage) {
ActiveMQObjectMessage activeMQObjectMessage = (ActiveMQObjectMessage) message;
synchronousQueue.put(activeMQObjectMessage);
} else {
if (message != null) {
message.acknowledge();
logger.error("消息格式错误,msg={}",message.toString());
}
}
} catch (JMSException | InterruptedException e) {
e.printStackTrace();
}
}
开启多条线程同时处理消息
Runnable task = () -> {
while (true) {
try {
ActiveMQObjectMessage activeMQObjectMessage = synchronousQueue.take();
//消费消息,处理成功后确认
boolean complete = handle(msg);
if (complete) {
activeMQObjectMessage.acknowledge();
}
} catch ( JMSException e) {
e.printStackTrace();
}
}
};
for (int i = 0; i < threads; i++) {
Thread thread = new Thread(task);
thread.setName("log-task-" + i);
thread.start();
}