在使用Message Queue的过程中,总会由于种种原因而导致消息失败。一个经典的场景是一个生成者向Queue中发消息,里面包含了一组邮件地址和邮件内容。而消费者从Queue中将消息一条条读出来,向指定邮件地址发送邮件。消费者在发送消息的过程中由于种种原因会导致失败,比如网络超时、当前邮件服务器不可用等。这样我们就希望建立一种机制,对于未发送成功的邮件再重新发送,也就是重新处理。重新处理超过一定次数还不成功,就放弃对该消息的处理,记录下来,继续对剩余消息进行处理。
ActiveMQ为我们实现了这一功能,叫做ReDelivery(重新投递)。当消费者在处理消息时有异常发生,会将消息重新放回Queue里,进行下一次处理。当超过重试次数时,会给broker发送一个"Poison ack",这个消息被认为是a poison pill(毒丸),这时broker会将这个消息发送到DLQ。
在以下四种情况中,ActiveMQ消息会被重发给客户端/消费者:
- 在一个事务session中,并且调用了session.rollback()方法。
- 在一个事务session中,session.commit()之前调用了commit.close()。
- 在session中使用CLIENT_ACKNOWLEDGE签收模式,并且调用了session.recover()方法。
- 在session中使用AUTO_ACKNOWLEDGE签收模式,在异步(messageListener)消费消息情况下,如果onMessage方法异常且没有被catch,此消息会被redelivery。
缺省情况下:持久消息过期,会被送到DLQ,非持久消息不会送到DLQ(不会redelivery)。
可以在connectionFactory中注入自定义的redeliveryPolicy来改变缺省参数:
<bean id="redeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
<!--是否在每次尝试重新发送失败后,增长这个等待时间-->
<property name="useExponentialBackOff" value="true"></property>
<!--重发次数,默认为6次-->
<property name="maximumRedeliveries" value="5"></property>
<!--重发时间间隔,默认为1秒-->
<property name="initialRedeliveryDelay" value="1000"></property>
<!--第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是value-->
<property name="backOffMultiplier" value="2"></property>
<!--最大传送延迟,只在useExponentialBackOff为true时有效,当重连间隔大于最大重连间隔时,以后每次重连间隔都为最大重连间隔。-->
<property name="maximumRedeliveryDelay" value="1000"></property>
</bean>
<!-- 在ConnectionFactory中应用这个Policy。 -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://188.166.236.173:61616"/>
<property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy"/>
<!-- <property name="useAsyncSend" value="true"/> 默认就是异步发送-->
</bean>
在ActiveMQ 服务端的conf/activemq.xmlzhong的broker节点下添加:
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">">
<deadLetterStrategy>
<individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true" />
<!-- 如果不想将过期消息放到DLQ中
<sharedDeadLetterStrategy processExpired="false" />
-->
<!-- 如果想将非持久消息放入DLQ
<sharedDeadLetterStrategy processNonPersistent="true" />
-->
</deadLetterStrategy>
</policyEntry>
<policyEntry topic=">" >
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
测试会重发消息(redelivery)的四种方法:
在一个事务session中,并且调用了session.rollback():
<bean id="jmsQueueContainerForDLQ" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="queueDestination" />
<property name="messageListener" ref="consumerMessageListenerDLQ" />
<!-- 如果支持事物的话,在接收消息后rollback会重发消息,进入死信队列,默认为false -->
<property name="sessionTransacted" value="true" />
</bean>
public class ConsumerMessageListenerDLQ implements SessionAwareMessageListener<TextMessage> {
public void onMessage(TextMessage message, Session session) {
if(message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
String text = textMessage.getText();
System.out.println(String.format("Received: %s",text));
if ("i want to redelivery".equals(text)){
throw new JMSException("process failed to test redelivery and DLQ");
}
} catch (JMSException e) {
System.out.println("there is JMS exception: " + e.getMessage() );
//throw JmsUtils.convertJmsAccessException(e);
try {
session.rollback();
} catch (JMSException e1) {
e1.printStackTrace();
}
}
}
}
}
在一个事务session中,session.commit()之前调用了commit.close():
<bean id="jmsQueueContainerForDLQ" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="queueDestination" />
<property name="messageListener" ref="consumerMessageListenerDLQ" />
<!-- 如果支持事物的话,在接收消息后rollback会重发消息,进入死信队列,默认为false -->
<property name="sessionTransacted" value="true" />
</bean>
public class ConsumerMessageListenerDLQ implements SessionAwareMessageListener<TextMessage> {
public void onMessage(TextMessage message, Session session) {
if(message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
String text = textMessage.getText();
System.out.println(String.format("Received: %s",text));
if ("i want to redelivery".equals(text)){
throw new JMSException("process failed to test redelivery and DLQ");
}
} catch (JMSException e) {
System.out.println("there is JMS exception: " + e.getMessage() );
//throw JmsUtils.convertJmsAccessException(e);
try {
session.close();
} catch (JMSException e1) {
e1.printStackTrace();
}
}
}
}
}
在session中使用CLIENT_ACKNOWLEDGE签收模式,并且调用了session.recover():
<bean id="jmsQueueContainerForDLQ" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="queueDestination" />
<property name="messageListener" ref="consumerMessageListenerDLQ" />
<!-- 自动应答模式消息不会重发,进入死信队列 -->
<property name="sessionAcknowledgeMode" value="2"/>
</bean>
public class ConsumerMessageListenerDLQ implements SessionAwareMessageListener<TextMessage> {
public void onMessage(TextMessage message, Session session) {
if(message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
String text = textMessage.getText();
System.out.println(String.format("Received: %s",text));
if ("i want to redelivery".equals(text)){
throw new JMSException("process failed to test redelivery and DLQ");
}
} catch (JMSException e) {
System.out.println("there is JMS exception: " + e.getMessage() );
//throw JmsUtils.convertJmsAccessException(e);
try {
session.recover();
} catch (JMSException e1) {
e1.printStackTrace();
}
}
}
}
}
在session中使用AUTO_ACKNOWLEDGE签收模式,异步Listener的onMessage()异常未被捕捉:
public class Listener implements MessageListener {
public void onMessage(Message message) {
int i = 8/0;//会导致redelivery
try {
if(message instanceof ActiveMQTextMessage){
ActiveMQTextMessage textMessage = (ActiveMQTextMessage) message;
System.out.println("收到的消息:" + textMessage.getText()); }
} catch (Exception e) {
e.printStackTrace();
}
}
}
持久消息过期,会被送到DLQ:
<!-- 定义JmsTemplate的Queue类型 -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<constructor-arg ref="connectionFactory" />
<property name="messageConverter" ref="messageConverter"></property>
<!-- 非pub/sub模型(发布/订阅),即队列模式 -->
<property name="pubSubDomain" value="false" />
<!-- 发送模式 DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久-->
<property name="deliveryMode" value="2" />
<!-- 2秒后过期,这个对点对点模式有效 -->
<property name="timeToLive" value="2000" />
</bean>
Junit测试:
@Test
public void testDLQ() throws Exception{
//jpaUserService.findOne("3");
for(int i = 1;i<=10;i++){
queueProducer.sendMessages("我是第"+i+"个");
}
Thread.sleep(50000);
System.out.print("全部执行完毕!!!");
}