MQ!Rabbit-client 事务及消息确认机制

MQ!Rabbit-client 事务及消息确认机制

参考文档:

https://blog.csdn.net/u013256816/article/details/55515234

https://blog.csdn.net/hzw19920329/article/details/54315940

https://blog.csdn.net/anzhsoft/article/details/21603479

https://blog.csdn.net/hzw19920329/article/details/54340711

▥ ♬ゃō۰• •▥ ♬ゃō۰• •▥ ♬ゃō۰• •▥ ♬ゃō۰• •▥ ♬ゃō۰• •▥ ♬ゃō۰• •▥ ♬ゃō۰• •▥ ♬ゃō۰• •

本篇文章内容都是上面几篇内容的整合,写的也没上面参考资料写的好。如果对这部分内容感兴趣的强烈建议读上面的几篇博客(不是打广告🐷,而且这里也不适合打广告)。

事务机制

RabbitMQ中与事务机制有关的方法:txSelect(), txCommit()txRollback(), txSelect用于将当前channel设置成transaction模式,txCommit用于提交事务,txRollback用于回滚事务,在通过txSelect开启事务之后,我们便可以发布消息给broker代理服务器了,如果txCommit提交成功了,则消息一定到达了broker了,如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback回滚事务了。

代码位于AMQImplMQ!Rabbit-client AMQImpl

public static class Tx {
        public static final int INDEX = 90;

        public static class Select
            extends Method
            implements com.rabbitmq.client.AMQP.Tx.Select
        {
            public static final int INDEX = 10;


            public Select() {
            }
            public Select(MethodArgumentReader rdr) throws IOException {
                this();
            }

            public int protocolClassId() { return 90; }
            public int protocolMethodId() { return 10; }
            public String protocolMethodName() { return "tx.select";}

            public boolean hasContent() { return false; }

            public Object visit(MethodVisitor visitor) throws IOException
            {   return visitor.visit(this); }

            public void appendArgumentDebugStringTo(StringBuilder acc) {
                acc.append("()");
            }

            public void writeArgumentsTo(MethodArgumentWriter writer)
                throws IOException
            {
            }
        }

        public static class SelectOk
            extends Method
            implements com.rabbitmq.client.AMQP.Tx.SelectOk
        {
            public static final int INDEX = 11;


            public SelectOk() {
            }
            public SelectOk(MethodArgumentReader rdr) throws IOException {
                this();
            }

            public int protocolClassId() { return 90; }
            public int protocolMethodId() { return 11; }
            public String protocolMethodName() { return "tx.select-ok";}

            public boolean hasContent() { return false; }

            public Object visit(MethodVisitor visitor) throws IOException
            {   return visitor.visit(this); }

            public void appendArgumentDebugStringTo(StringBuilder acc) {
                acc.append("()");
            }

            public void writeArgumentsTo(MethodArgumentWriter writer)
                throws IOException
            {
            }
        }

        public static class Commit
            extends Method
            implements com.rabbitmq.client.AMQP.Tx.Commit
        {
            public static final int INDEX = 20;


            public Commit() {
            }
            public Commit(MethodArgumentReader rdr) throws IOException {
                this();
            }

            public int protocolClassId() { return 90; }
            public int protocolMethodId() { return 20; }
            public String protocolMethodName() { return "tx.commit";}

            public boolean hasContent() { return false; }

            public Object visit(MethodVisitor visitor) throws IOException
            {   return visitor.visit(this); }

            public void appendArgumentDebugStringTo(StringBuilder acc) {
                acc.append("()");
            }

            public void writeArgumentsTo(MethodArgumentWriter writer)
                throws IOException
            {
            }
        }

        public static class CommitOk
            extends Method
            implements com.rabbitmq.client.AMQP.Tx.CommitOk
        {
            public static final int INDEX = 21;


            public CommitOk() {
            }
            public CommitOk(MethodArgumentReader rdr) throws IOException {
                this();
            }

            public int protocolClassId() { return 90; }
            public int protocolMethodId() { return 21; }
            public String protocolMethodName() { return "tx.commit-ok";}

            public boolean hasContent() { return false; }

            public Object visit(MethodVisitor visitor) throws IOException
            {   return visitor.visit(this); }

            public void appendArgumentDebugStringTo(StringBuilder acc) {
                acc.append("()");
            }

            public void writeArgumentsTo(MethodArgumentWriter writer)
                throws IOException
            {
            }
        }

        public static class Rollback
            extends Method
            implements com.rabbitmq.client.AMQP.Tx.Rollback
        {
            public static final int INDEX = 30;


            public Rollback() {
            }
            public Rollback(MethodArgumentReader rdr) throws IOException {
                this();
            }

            public int protocolClassId() { return 90; }
            public int protocolMethodId() { return 30; }
            public String protocolMethodName() { return "tx.rollback";}

            public boolean hasContent() { return false; }

            public Object visit(MethodVisitor visitor) throws IOException
            {   return visitor.visit(this); }

            public void appendArgumentDebugStringTo(StringBuilder acc) {
                acc.append("()");
            }

            public void writeArgumentsTo(MethodArgumentWriter writer)
                throws IOException
            {
            }
        }

        public static class RollbackOk
            extends Method
            implements com.rabbitmq.client.AMQP.Tx.RollbackOk
        {
            public static final int INDEX = 31;


            public RollbackOk() {
            }
            public RollbackOk(MethodArgumentReader rdr) throws IOException {
                this();
            }

            public int protocolClassId() { return 90; }
            public int protocolMethodId() { return 31; }
            public String protocolMethodName() { return "tx.rollback-ok";}

            public boolean hasContent() { return false; }

            public Object visit(MethodVisitor visitor) throws IOException
            {   return visitor.visit(this); }

            public void appendArgumentDebugStringTo(StringBuilder acc) {
                acc.append("()");
            }

            public void writeArgumentsTo(MethodArgumentWriter writer)
                throws IOException
            {
            }
        }
    }

事务提交步骤

  • client发送Tx.Select
  • broker发送Tx.Select-Ok(之后publish)
  • client发送Tx.Commit
  • broker发送Tx.Commit-Ok

代码示例:

try {
    channel.txSelect();
    channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
    // 触发异常,走回滚
    int result = 1 / 0;
    channel.txCommit();
} catch (Exception e) {
    e.printStackTrace();
    channel.txRollback();
}

Confirm模式

RabbitMQ可能会遇到的一个问题,即生成者不知道消息是否真正到达broker,随后通过AMQP协议层面为我们提供了事务机制解决了这个问题,但是采用事务机制实现会降低RabbitMQ的消息吞吐量,那么有没有更加高效的解决方式呢?答案是采用Confirm模式。

producer端confirm模式的实现原理

生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理。

confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息。

在channel 被设置成 confirm 模式之后,所有被 publish 的后续消息都将被 confirm(即 ack) 或者被nack一次。但是没有对消息被 confirm 的快慢做任何保证,并且同一条消息不会既被 confirm又被nack 。

开启confirm模式的方法

生产者通过调用channel的confirmSelect方法将channel设置为confirm模式,如果没有设置no-wait标志的话,broker会返回confirm.select-ok表示同意发送者将当前channel信道设置为confirm模式(从目前RabbitMQ最新版本3.6来看,如果调用了channel.confirmSelect方法,默认情况下是直接将no-wait设置成false的,也就是默认情况下broker是必须回传confirm.select-ok的)。

已经在transaction事务模式的channel是不能再设置成confirm模式的,即这两种模式是不能共存的。

客户端实现生产者confirm有三种编程方式

  • 普通confirm模式:每发送一条消息后,调用waitForConfirms()方法,等待服务器端confirm。实际上是一种串行confirm了。

  • 批量confirm模式:每发送一批消息后,调用waitForConfirms()方法,等待服务器端confirm。

  • 异步confirm模式:提供一个回调方法,服务端confirm了一条或者多条消息后Client端会回调这个方法。

// 普通confirm模式
channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
if(!channel.waitForConfirms()){
    System.out.println("send message failed.");
}

// 批量confirm模式
channel.confirmSelect();
for(int i=0;i<batchCount;i++){
    channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
}
if(!channel.waitForConfirms()){
    System.out.println("send message failed.");
}

// 异步confirm模式
 SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
 channel.confirmSelect();
        channel.addConfirmListener(new ConfirmListener() {
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                if (multiple) {
                    confirmSet.headSet(deliveryTag + 1).clear();
                } else {
                    confirmSet.remove(deliveryTag);
                }
            }
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple);
                if (multiple) {
                    confirmSet.headSet(deliveryTag + 1).clear();
                } else {
                    confirmSet.remove(deliveryTag);
                }
            }
        });

        while (true) {
            long nextSeqNo = channel.getNextPublishSeqNo();
            channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
            confirmSet.add(nextSeqNo);
        }

消息确认(Consumer端)

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(ConfirmConfig.queueName, false, consumer);

while(true){
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    String msg = new String(delivery.getBody());
    // do something with msg. 
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,509评论 6 504
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,806评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,875评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,441评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,488评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,365评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,190评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,062评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,500评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,706评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,834评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,559评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,167评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,779评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,912评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,958评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,779评论 2 354

推荐阅读更多精彩内容