RabbitMQ和kafka对于消费失败处理总结

一、kafka

1、kafka消息模型总结

发送消息到topic,每个topic可以分成多个Partition,每个Partition对用一个消费者消费,属于无状态消息,Partition每个消息对应唯一的offset,通过zk保存信息,消费端维护offset,持久化属于日志型持久话默认七天删除消息。


kafka
2、消费失败处理方案(个人思考)

业务处理异常时,暂不提交offset,利用数据库(关系型或非关系型)保存失败的消息记录,根据失败策略处理相应消息。保存好记录之后可以提交offset。
失败处理可以隔五分钟再往对应的消息队列发送该消息(发送成功就次数+1,将消息id也传入消息队列,方便记录失败次数)复杂情况可能需要记录消息失败的次数,到达一定次数后,改为手工处理

3、保证不丢失消息处理

参考:https://www.jianshu.com/p/7a6deaba34d2

一般是要求起码设置如下4个参数:
1、给topic设置replication.factor参数:
这个值必须大于1,要求每个partition必须有至少2个副本在kafka服务端
2、设置min.insync.replicas参数:
这个值必须大于1,这个是要求一个leader至少感知到有至少一个follower还跟自己保持联系,没掉队,这样才能确保leader挂了还有一个follower吧在producer端
3、设置acks=all:
这个是要求每条数据,必须是写入所有replica之后,才能认为是写成功了在producer端
4、设置retries=MAX
(很大很大很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试,卡在这里了我们生产环境就是按照上述要求配置的,这样配置之后,至少在kafka broker端就可以保证在leader所在broker发生故障,进行leader切换时,数据不会丢失

二、RabbitMQ

1、消费失败处理方案
(1)相关配置
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    listener:
      simple:
        retry:                    #抛异常会按retry策略重发(建议不在程序内抛异常,记录失败消息,然后确认)
          enabled: true           #允许重发
          max-attempts: 5         #重发次数
          initial-interval: 30000 #重发间隔时间
        acknowledge-mode: manual  #不确认宕机重启时会重新消费,确认失败会一直重发
    publisher-confirms: true      # 如果消息没有到exchange,则confirm回调,ack=false,
                                  # 如果消息到达exchange,则confirm回调,ack=true
    publisher-returns: true       #exchange到queue成功,则不回调return
                                  #exchange到queue失败,则回调return(需设置mandatory=true,否则不回回调,消息就丢了)

消息手动确认模式的几点说明:
1、监听的方法内部必须使用channel进行消息确认,包括消费成功或消费失败
2、如果不手动确认,也不抛出异常,消息不会自动重新推送(包括其他消费者),因为对于rabbitmq来说始终没有接收到消息消费是否成功的确认,并且Channel是在消费端有缓存的,没有断开连接
3、如果rabbitmq断开,连接后会自动重新推送(不管是网络问题还是宕机)
4、如果消费端应用重启,消息会自动重新推送
5、如果消费端处理消息的时候宕机,消息会自动推给其他的消费者
6、如果监听消息的方法抛出异常,消息会按照listener.retry的配置进行重发,但是重发次数完了之后还抛出异常的话,消息不会重发(也不会重发到其他消费者),只有应用重启后会重新推送。因为retry是消费端内部处理的,包括异常也是内部处理,对于rabbitmq是不知道的(此场景解决方案后面有)
7、spring.rabbitmq.listener.retry配置的重发是在消费端应用内处理的,不是rabbitqq重发

(2)方案描述

参考:https://my.oschina.net/dengfuwei/blog/1595047

消费确认机制改为manual手动确认,在消费方法中try catch中,记录消费失败的消息,然后basicAck确认,通过自定义重试策略取出失败的消息重新消费,失败达到一定次数手动处理
需要注意的 basicAck 方法需要传递两个参数:
(1)deliveryTag(唯一标识 ID):当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel
(2)multiple:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息

2、保证消息不丢失

失败重发参考:https://www.cnblogs.com/xujishou/p/6288623.html

1、设置消息持久化
2、利用confirm模式,发送失败重新放送
(很多帖子说,confirm模式但是confirm回调测试没有消息数据无法重发,建议:https://www.cnblogs.com/xujishou/p/6288623.html)
3、return exchange到队列失败回调,可以获取到消息相关消息可重发

confirm模式 重发消息,生成CorrelationData,重新发送

private CorrelationData getCorrelationData(String exchange, String routeKey, byte[] body) {
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setReceivedExchange(exchange);
        messageProperties.setReceivedRoutingKey(routeKey);
        Message message = new Message(body, messageProperties);
        CorrelationData correlationData = new CorrelationData();
        correlationData.setReturnedMessage(message);
        return correlationData;
    }

@Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            System.out.println("confirm消息发送成功:" + cause);
        } else {
            String msg = new String(correlationData.getReturnedMessage().getBody());
            System.out.println("confirm消息发送失败:" + msg);
            MessageProperties messageProperties = correlationData.getReturnedMessage().getMessageProperties();
            rabbitTemplate.convertAndSend(messageProperties.getReceivedExchange(),
                    messageProperties.getReceivedRoutingKey(),
                    correlationData.getReturnedMessage(),
                    correlationData);
        }
    }
publisher-confirms: true      # 如果消息没有到exchange,则confirm回调,ack=false,
                                  # 如果消息到达exchange,则confirm回调,ack=true
publisher-returns: true       #exchange到queue成功,则不回调return
                                  #exchange到queue失败,则回调return(需设置mandatory=true,否则不回回调,消息就丢了)

三、疑问

发送消息时,消息发送成功,业务失败。业务成功消息发送失败?
保证业务处理成功后发送消息,发送失败一直重试发送消息。

四、demo

kafka:https://github.com/huangxiongbiao12/kafka.git
rabbitmq:https://github.com/huangxiongbiao12/rabbitmq-demo.git

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,271评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,275评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,151评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,550评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,553评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,559评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,924评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,580评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,826评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,578评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,661评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,363评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,940评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,926评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,156评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,872评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,391评论 2 342

推荐阅读更多精彩内容

  • 大致可以通过上述情况进行排除 1.kafka服务器问题 查看日志是否有报错,网络访问问题等。 2. kafka p...
    生活的探路者阅读 7,569评论 0 10
  • 一、Kafka简介 Kafka (科技术语)。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规...
    边学边记阅读 1,716评论 0 14
  • 一、入门1、简介Kafka is a distributed,partitioned,replicated com...
    HxLiang阅读 3,339评论 0 9
  • Kafka史上最详细原理总结分为上下两部分,承上启下 Kafka史上最详细原理总结上 Kafka史上最详细原理总结...
    小波同学阅读 22,129评论 1 115
  • 西寺沟一日游 很早定好了座位,和一行陌生人开始了一天的旅途,韩姐,娇娇,壮壮,亢亢,六个人的行程,一堆人的旅行,我...
    木西草丁阅读 143评论 0 0