RabbitMQ(四): rabbitmq 的消息确认机制(事务+confirm)

在 rabbitmq 中我们可以通过持久化数据解决 rabbitmq 服务器异常的数据丢失问题。

问题:生产者将消息发送出去之后,消息到底有没有到达 rabbitmq 服务器。默认情况下是不知道的。

两种方式:

  • AMQP 实现了事务机制
  • Confirm 模式

事务机制

  • txSelect:用户将当前的 channel 设置成 transaction 模式
  • txCommit:用于提交事务
  • txRollback:回滚事务

缺点:降低了 rabbitmq 的吞吐量。

生产者

public class TxSend {
    private static final String QUEUE_NAME = "test_queue_tx";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        String msg = "hello tx message!";

        try {
            channel.txSelect();
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            // 出错测试
            int xx = 1 / 0;
            System.out.println("send: " + msg);
            channel.txCommit();
        } catch (Exception e) {
            channel.txRollback();
            System.out.println("send message rollback.");
        }

        channel.close();
        connection.close();
    }
}

消费者

public class TxRecv {
    private static final String QUEUE_NAME = "test_queue_tx";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("recv[tx] msg: " + new String(body, "utf-8"));
            }
        });
    }
}

Confirm 模式

生产者端 confirm 模式的实现原理

生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都会被指派一个唯一的 ID(从1开始),一旦消息被投递到所有的匹配队列之后,broker 就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 deliver-tag 域包含了确认消息的序列号,此外 broker 也可以设置 basic.ack 的 multiple 域,表示这个序列号之前的所有消息已经得到了处理。

confirm 模式最大的好处在于它是异步的。

开启 confirm 模式:

channel.confirmSelect();

编程模式:

  • 普通 发一条 waitForConfirms()
  • 批量 发一批 waitForConfirms()
  • 异步 confirm 模式:提供一个回调方法

confirm 单条

public class Send1 {
    private static final String QUEUE_NAME = "test_queue_confirm1";
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 生产者调用 confirmSelect 将 channel 设置成为 confirm 模式 (注意)
        channel.confirmSelect();
        String msg = "hello confirm message!";
        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        if (!channel.waitForConfirms()) {
            System.out.println("message send failed.");
        } else {
            System.out.println("message send ok.");
        }
        channel.close();
        connection.close();
    }
}

confirm 批量

public class Send2 {
    private static final String QUEUE_NAME = "test_queue_confirm1";
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 生产者调用 confirmSelect 将 channel 设置成为 confirm 模式 (注意)
        channel.confirmSelect();

        String msg = "hello confirm message batch!";
        // 批量模式
        for (int i = 0; i< 10; i++) {
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        }
        // 确认
        if (!channel.waitForConfirms()) {
            System.out.println("message send failed.");
        } else {
            System.out.println("message send ok.");
        }
        channel.close();
        connection.close();
    }
}

confirm 异步

Channel 对象提供的 ConfirmListener() 回调方法只包含 deliveryTag (当前 Channel 发出的消息序号),我们需要自己为每一个 Channel 维护一个 unconfirm 的消息序号集合,每 publish 一条数据,集合中元素加 1,每回调一次 handleAck 方法,unconfirm 集合删掉相应的一条(multiple=false)或多条(multiple=true)记录。从程序运行效率上看,这个 unconfirm 集合最好采用有序集合 SortedSet 存储结构。

public class Send3 {
    private static final String QUEUE_NAME = "test_queue_confirm3";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 生产者调用 confirmSelect 将 channel 设置为 confirm 模式
        channel.confirmSelect();

        // 未确认的消息标识
        final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());

        // 通道添加监听
        channel.addConfirmListener(new ConfirmListener() {
            // 没有问题的 handleAck
            public void handleAck(long l, boolean b) throws IOException {
                if (b) {
                    System.out.println("---handleAck---multiple");
                    confirmSet.headSet(l + 1).clear();
                } else {
                    System.out.println("---handleAck---multiple false");
                    confirmSet.remove(l);
                }
            }
            // handleNack 1s 3s 10s xxx...
            public void handleNack(long l, boolean b) throws IOException {
                if (b) {
                    System.out.println("---handleNack---multiple");
                    confirmSet.headSet(l + 1).clear();
                } else {
                    System.out.println("---handleNack---multiple false");
                    confirmSet.remove(l);
                }
            }
        });

        String msg = "sssss";
        while (true) {
            long seqNo = channel.getNextPublishSeqNo();
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            confirmSet.add(seqNo);
        }
    }
}

消费者

(只需修改 QUEUE_NAME)

public class Send3 {
    private static final String QUEUE_NAME = "test_queue_confirm3";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 生产者调用 confirmSelect 将 channel 设置为 confirm 模式
        channel.confirmSelect();

        // 未确认的消息标识
        final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());

        // 通道添加监听
        channel.addConfirmListener(new ConfirmListener() {
            // 没有问题的 handleAck
            public void handleAck(long l, boolean b) throws IOException {
                if (b) {
                    System.out.println("---handleAck---multiple");
                    confirmSet.headSet(l + 1).clear();
                } else {
                    System.out.println("---handleAck---multiple false");
                    confirmSet.remove(l);
                }
            }
            // handleNack 1s 3s 10s xxx...
            public void handleNack(long l, boolean b) throws IOException {
                if (b) {
                    System.out.println("---handleNack---multiple");
                    confirmSet.headSet(l + 1).clear();
                } else {
                    System.out.println("---handleNack---multiple false");
                    confirmSet.remove(l);
                }
            }
        });

        String msg = "sssss";
        while (true) {
            long seqNo = channel.getNextPublishSeqNo();
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            confirmSet.add(seqNo);
        }
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,997评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,603评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,359评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,309评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,346评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,258评论 1 300
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,122评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,970评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,403评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,596评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,769评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,464评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,075评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,705评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,848评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,831评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,678评论 2 354

推荐阅读更多精彩内容