在使用Spring boot2 RabbitMQ的时候,如果不设置spring.rabbitmq.lisener.simple.acknowledge-mode
默认的方式是auto
,在发生错误的时候会把消息扔回MQ中,然后MQ会再次调用consumer,就会造成不断的报错,这种方式显然不可取。如果设置为manual
,就要手动去ack的,每个consumer都要去异常处理,然后把再去手动ack,这种方式太麻烦,重复代码过多,不是我想要的方式。
Spring有一个retry的包,在spring-boot-starter-amqp
中对retry进行了实现,可以利用retry的特性对MQ进行出错后的处理
下边的代码是当retry结束后,还失败,你需要进行的处理,比如放在另外一个队列中,或者记录到库里
import com.alibaba.fastjson.JSON;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import java.util.Map;
import static com.google.common.base.Throwables.getStackTraceAsString;
import static org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer.*;
/**
* @author Jeff
* created on 2019-03-22 10:24
*/
public class RetryMessageRecoverer implements MessageRecoverer {
@Override
public void recover(Message message, Throwable cause) {
//do some things you want
}
}
在下边的代码里设置mq的重试机制,这里是重试五次,注意:这里的重试只是在当前线程里的重试,就是让线程休眠了,不是重回到broke
@Bean
public RetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder
.stateless()
.maxAttempts(5)
.backOffOptions(1000, 2, 5000)
.recoverer(new RetryMessageRecoverer(redisService, rabbitTemplate))
.build();
}
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(jackson2JsonMessageConverter);
factory.setAdviceChain(interceptor());
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
factory.setPrefetchCount(4);
factory.setConcurrentConsumers(4);
factory.setMaxConcurrentConsumers(8);
factory.setDefaultRequeueRejected(false);
return factory;
}