RabbitMQ使用的一些问题

先说点闲话,这个问题的发生是因为当年我的垃圾代码导致,嗯,垃圾到我自己不想看。刚毕业的年轻人总是想探索一下未知,于是一知半解之下就上了MQ,但是话说回来,这种自讨苦吃是要的,不然真的会一直垃圾下去。

问题的发生

项目中的消息推送使用MQ做了异步处理,有一天消息推送突然中断了,排查了好久,日志也没有,这个也是,嗯,知识太少。好久之后,我想不会是MQ出问题了吧,我一看MQ,果然,这里不是原图,Unacknowledged状态的有8,Ready的有好多,再打开消费端的配置一看,

MQ管理

消费端配置
,qos设置正好为8,通过以上排查基本可以确定队列堵塞是由于消费者线程取走了消息,但是既没有ACK,也没有NACK,这样的消息个数到达Qos设置的值后,队列就会堵塞。

  • 问题代码
@Component
public class MsgQueueListener extends MessageListenerAdapter {

  private static Logger logger = LoggerFactory.getLogger(MsgQueueListener.class);
  @Autowired
  private RabbitTemplate rabbitTemplate;
  @Autowired
  private MessageSendFacadeService messageSendFacadeService;
  @Autowired
  private TaskExecutor taskExecutor;
  @Autowired
  private RedisUtil redisUtil;

  @Override
  public void onMessage(final Message message, final Channel channel) throws IOException {
    final WechatTemMessageDTO dto = (WechatTemMessageDTO) rabbitTemplate.getMessageConverter()
        .fromMessage(message);
    try {
     //业务代码处理
      channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    } catch (AnswerException e) {
      if (getRetryCount(message.getMessageProperties() < 3) {
        dto.setErrMsg(e.getErrorMsg());
        //重试次数小于3 ,投递到重试队列
        rabbitTemplate.convertAndSend("*", "*",
            dto, new MessagePostProcessor() {
              @Override
              public Message postProcessMessage(Message message) throws AmqpException {
                MessageProperties properties = new MessageProperties();
                properties.setHeader("x-orig-routing-key", "*");
                return message;
              }
            });
      } else {
        dto.setErrMsg(e.getErrorMsg());
        rabbitTemplate.convertAndSend("*failed", "*failed", dto,
            new MessagePostProcessor() {
              @Override
              public Message postProcessMessage(Message message) throws AmqpException {
                MessageProperties properties = new MessageProperties();
                properties.setHeader("x-orig-routing-key", "*");
                return message;
              }
            });
      }
      channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
  }

问题出在catch块,

  1. catch只捕获了业务异常
    对于非业务异常,显然无法捕获,导致消费者没有ack,这段代码主要的原因再调用业务代码的时候,我尽可能的将业务代码捕获的异常转换为了业务异常,但是产生了遗漏,而且这里这样做也有很大的坏处,没有将异常分类,来决定失败消息如何处理,下面再细说。
  2. catch块中可能又会抛出异常
    catch块中抛出的异常显然导致消费者没有ack,也没有finally进行处理,所以消费者慢慢的阻塞。
    解决办法应该是在finally语句中来执行这些操作,消费者从队列中取出消息后,无非是三种处理结果:1、处理成功,这种时候应该用basicAck确认消息;2、可重试的处理失败,这时候应该用basicNack将消息重新入列或者丢入死信队列3、不可重试的处理失败,这时候应该使用basicNack将消息丢弃或者丢入失败队列进行相应的业务操作
  • 正确示例
enum ProcessResult{
//这里只举几个简单的例子
  SUCCESS,  // 处理成功
  RETRY,   // 可以重试的错误
  FAIL,  // 无法重试的错误
}
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        WechatMessageDTO messageDTO = null;
        try {
            messageDTO = rabbitMQService.getMessageBody(message);
        }
        catch (Exception e) {
            logger.error("MQ 数据转换异常",e);
        } finally {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
        ProcessResult result=null;
        try {
            result = messageService.processMsg(messageDTO);
        } catch(UserDefineException e){
          logger.error("消费者处理失败:(消息ID:" + messageDTO.getId() + ")", e);
          //根据业务异常类型进行处理
          result = ;
        }catch (Exception e) {
            logger.error("消费者处理失败:(消息ID:" + messageDTO.getId() + ")", e);
            result = ;
        } finally {
            postProcessByResult(result);
        }
    }

容易出问题的点

  • 自动ack机制导致消息丢失以及客户端崩溃
    MQ只要确认消息发送成功,无须等待应答就会丢弃消息,虽然自动ack机制可以防止队列阻塞的问题,但是无法得知消费者的处理情况。自动ack没有qos控制,只要客户端队列不为空,则不断推送消息,可能导致消费者假死或者崩溃。
    qos是rabbitMQ一种消费限流的手段,上面提到的prefetch属性指定每个消费者最大的unacked messages数目。消费者每次最多可以取prefetch条消息缓存在客户端,Java客户端内部维护了一个BlockingQueue用来缓存从queue获取的message,默认值会设为Integer.MAX_VALUE,如果不设置qos可能会导致队列不断膨胀,最终OOM;Spring amqp提供了类似的功能,队列的大小是prefetch的大小,默认是1,关于prefetch的设置可以参考Some queuing theory: throughput, latency and bandwidth
    假设有客户端两个消费者线程,prefetch都是10,意味着每个消费者线程每次会从queue中预抓取 10 条消息到本地缓存着等待消费。这里对于MQ来说,客户端只是一个消费者,他们之间建立的Connection(包含多个channel,通常每个消费者线程使用一个)的unacked数变为20,但是对于客户端来说,可能是多个消费者线程,每个channel的unack数量达到prefetch预设值,并且达到最大的最大消费者线程数。便会停止投递新的message到该消费者中直到它发出ack。关于这部分大家可以看我以前写的去了解RabbitMQ连接池,Rabbit将会停止投递新的message到该消费者中直到它发出ack。
  • nack机制导致的死循环
    消息处理失败时使用Nack,等下一次重新消费,导致队列中Ready状态的消息暴增,
  • 启用ack机制但是没有启动qos
    如我上面发生的问题,如果没有qos,消息处理发生异常后,无法ack,队列的Unacked消息数暴涨,导致MQ响应越来越慢,甚至崩溃。

一些消息可靠性保证措施

对于生产者:

  • 发送确认
    实现ReturnCallback接口来得到消息发送失败的原因
  • 发送失败返回
    实现ConfirmCallback接口来确认是否正确到达exchange,当使用confirm时候,如果channel或者connection失败,生产者应该重新发送所有没有来得及提交的数据。但是服务器broker可能已经发送确认数据到生产者了,因此消费者要具有幂等性。
    对于消费者:
  • 通过redelivedred来确认消息是否重复发送
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 什么叫消息队列 消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂...
    lijun_m阅读 1,366评论 0 1
  • 应用场景 异步处理 场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种: 1.串行的方式 2.并行的...
    lijun_m阅读 1,861评论 0 3
  • RabbitMQ的学习笔记 关于RabbitMQ的几个角色如下: 关于名词的通俗解析: 首先我们肯定知道Rabbi...
    ChinaXieShuai阅读 1,490评论 0 2
  • 【六项精进打卡】 2019.4.9日 姓名:陈岗 企业名称:上海孚因流体动力设备股份有限公司 打卡第349天 【知...
    我心依旧_79e2阅读 172评论 0 0
  • 今天FOX放出了DC美剧《哥谭》最终季的终极版预告片,而所有的重要角色全都登场了。 看起来每个角色在最终季里都会有...
    DC中文网阅读 1,529评论 0 0