5.rabbitmq-消息发布时的可靠性保障

消息链路

在 RabbitMQ 在设计的时候,特意让生产者和消费者“脱钩”,也就是消息的发布和消息的消费之间是解耦的。
在 RabbitMQ 中,有不同的投递机制(生产者),但是每一种机制都对性能有一定的影响。一般来讲速度快的可靠性低,可靠性好的性能差,具体怎
么使用需要根据你的应用程序来定,所以说没有最好的方式,只有最合适的方式。

各种可靠性机制对性能的影响

接下来我们分别介绍下这几种投递机制

1. 无保障模式

通过 basicPublish 发布你的消息并使用正确的交换器和路由信息,你的消息会被接收并发送到合适的
队列中。但是如果有网络问题,或者消息不可路由,或者 RabbitMQ 自身有问题的话,这种方式就有风险。所以无保证的消息发送一般情况下不推荐。

2. 失败确认模式

失败确认图

在发送消息时设置 mandatory 标志,告诉 RabbitMQ,如果消息不可路由,应该将消息返回给发送者,并通知失败。可以这样认为,开启 mandatory
是开启故障检测模式

注意:它只会让 RabbitMQ 向你通知失败,而不会通知成功。如果消息正确路由到队列,则发布者不会受到任何通知。带来的问题是无法确保发布消息一定是成功的,因为通知失败的消息可能会丢失

代码演示:

生产者代码:

1.发布消息时,传入mandatory为true
2.channal增加消息路由失败的回调处理
public class MandatoryProducer {
    //队列名称
    public static final String DIRECT_NAME = "lb_direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //声明一个direct队列
        channel.exchangeDeclare(DIRECT_NAME, BuiltinExchangeType.DIRECT);
        //增加消息发布失败的回调处理
        channel.addReturnListener(
                (replyCode, replyText, exchange, routingKey, properties, body) -> {
                    String message = new String(body);
                    System.out.println("返回的message:" + message);
                    System.out.println("返回的replycode:" + replyCode);
                    System.out.println("返回的replyText:" + replyText);
                    System.out.println("返回的exchange:" + exchange);
                    System.out.println("返回的routeKey:" + routingKey);
                });
        for (int i = 1; i < 4; i++) {
            if (i != 1) {
                //发布这条消息时,乱写一个路由键,使其无法匹配到相应队列,看是否会进入上面回调处理的方法中
                channel.basicPublish(DIRECT_NAME, "乱打的", true, null, ("hello,world!" + i).getBytes());
            } else {
                //以下这种为正常发布,是可以匹配到队列的
                channel.basicPublish(DIRECT_NAME, "lb", true, null, ("hello,world!" + i).getBytes());
            }
        }
        channel.close();
        connection.close();
    }
}

消费者:只创建绑定'lb'这个路由键的队列,并消费该队列消息

public class MandatoryConsumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //声明一个队列
        String queueName = "lb-queue";
        channel.queueDeclare(queueName, false, false, false, null);
        //队列绑定交换机"lb_direct",并指定路由键为"lb"
        channel.queueBind(queueName, "lb_direct", "lb");
        //消费投递至该队列的消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                System.out.println("消费者接收到消息***:" + new String(body));
            }
        });
    }
}

先启动消费者等待消费消息,再启动生产者生产消息

消费者端会接受到hellworld2,hellworld3两条消息

image.png

生产者因为hellworld无法路由到具体的队列,从而进入路由失败的回调处理方法中

image.png

3. 发送方确认模式

基于事务的性能问题,RabbitMQ 团队为我们拿出了更好的方案,即采用发送方确认模式,该模式比事务更轻量,性能影响几乎可以忽略不计。

原理:生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),由这
个 id 在生产者和 RabbitMQ 之间进行消息的确认。
不可路由的消息,当交换器发现,消息不能路由到任何队列,会进行确认操作,表示收到了消息。如果发送方设置了 mandatory 模式,则会先调用
addReturnListener 监听器。

可路由的消息,要等到消息被投递到所有匹配的队列之后,broker 会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确
到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了
确认消息的序列号。


image.png

confirm 模式最大的好处在于他可以是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最
终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产
者应用程序同样可以在回调方法中处理该 nack 消息决定下一步的处理。

Confirm 的三种实现方式:

方式一:channel.waitForConfirms()普通发送方确认模式;消息到达交换器,就会返回 true。

代码演示:

生产者:

/**
 * 生产者
 */
public class ConfirmProducer {

    //交换机名称
    public static final String DIRECT_NAME = "lb_direct";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //声明一个direct队列
        channel.exchangeDeclare(DIRECT_NAME, BuiltinExchangeType.DIRECT);
        //开启发送者确认模式
        channel.confirmSelect();
        channel.basicPublish(DIRECT_NAME, "乱打的", true, null, "hello,world!".getBytes());
        //判断信道中刚才的消息是否发布成功
        if (channel.waitForConfirms()) {
            System.out.println("msg send success");
        } else {
            System.out.println("msg send fail");
        }
        channel.close();
        connection.close();
    }
}

执行结果:我们可以发现消息已经发布成功。


image.png

注意点:上图中我们给定的路由键是"乱打的",是匹配不到队列的。但是在这个模式下,仍然返回了发布成功的信息。因此这里需要强调一点,消息发布成功只是代表已经成功发送至交换机,并不代表成功的路由到对应队列,更不代表消息已经被消费成功。

接下来结合上面的失败确认模式,看看结果

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //声明一个direct队列
        channel.exchangeDeclare(DIRECT_NAME, BuiltinExchangeType.DIRECT);
        //增加消息路由失败的回调处理
        channel.addReturnListener(
                (replyCode, replyText, exchange, routingKey, properties, body) -> {
                    String message = new String(body);
                    System.out.println("返回的message:" + message);
                    System.out.println("返回的replycode:" + replyCode);
                    System.out.println("返回的replyText:" + replyText);
                    System.out.println("返回的exchange:" + exchange);
                    System.out.println("返回的routeKey:" + routingKey);
                });
        //开启发送者确认模式
        channel.confirmSelect();
        channel.basicPublish(DIRECT_NAME, "乱打的", true, null, "hello,world!".getBytes());
        if (channel.waitForConfirms()) {
            System.out.println("msg send success");
        } else {
            System.out.println("msg send fail");
        }
        channel.close();
        connection.close();
    }
image.png

可以发现,消息是发布成功,但是是路由失败的

方式二:channel.waitForConfirmsOrDie()批量确认模式;使用同步方式等所有的消息发送之后才会执行后面代码,只要有一个消息未到达交换器就会抛出 IOException 异常。

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //声明一个direct队列
        channel.exchangeDeclare(DIRECT_NAME, BuiltinExchangeType.DIRECT);
        //开启发送者确认模式
        channel.confirmSelect();
        //发送4条消息
        for (int i = 0; i < 4; i++) {
            channel.basicPublish(DIRECT_NAME, "乱打的", true, null, "hello,world!".getBytes());
        }
        //将上面的确认方式改成批量确认
        channel.waitForConfirmsOrDie();
        channel.close();
        connection.close();
    }

方式三:channel.addConfirmListener()异步监听发送方确认模式

public class ConfirmAsyncProducer {

    //交换机名称
    public static final String DIRECT_NAME = "lb_direct";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //声明一个direct队列
        channel.exchangeDeclare(DIRECT_NAME, BuiltinExchangeType.DIRECT);
        //开启发送者确认模式
        channel.confirmSelect();
        //发送确认监听
        channel.addConfirmListener(new ConfirmListener() {
            //deliveryTag 每成功发送一条消息自增1
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                 System.out.println("发送成功回调,deliveryTag=" + deliveryTag);
                //doSomeThingAfterSuccess.....
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                //doSomeThingAfterFail.....
            }
        });
        //发送四条消息
        for (int i = 0; i < 4; i++) {
            channel.basicPublish(DIRECT_NAME, "乱打的", true, null, "hello,world!".getBytes());
        }
        channel.close();
        connection.close();
    }
}

启动消费者,4条消息已经发送成功,deliveryTag每次自增

image.png

4. 备用交换机

在第一次声明交换器时被指定,用来提供一种预先存在的交换器,如果主交换器无法路由消息,那么消息将被路由到这个新的备用交换器

使用备用交换器,向往常一样,声明 Queue 和备用交换器,把 Queue 绑定到备用交换器上。然后在声明主交换器时,通过交换器的参数,alternate-exchange,,将备用交换器设置给主交换器。


image.png
生产者:
public class BackupProducer {

    //正常的交换机
    public static final String DIRECT_NAME = "lb_direct";
    //备用交换机
    public static final String DIRECT_BACKUP_NAME = "lb_backup_direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //声明备用交换机
        channel.exchangeDeclare(DIRECT_BACKUP_NAME, BuiltinExchangeType.FANOUT);
        //声明主交换机
        Map<String, Object> hashMap = new HashMap<>();
        //通过键值对的方式将备用交换机的名称设置进去
        hashMap.put("alternate-exchange", DIRECT_BACKUP_NAME);
        //声明主交换机,最后一个参数是上面的hashMap
        channel.exchangeDeclare(DIRECT_NAME, BuiltinExchangeType.DIRECT, false, false, hashMap);
        channel.basicPublish(DIRECT_NAME, "不知名路由键", false, null, "this is backup exchange test".getBytes());
        channel.close();
        connection.close();
    }
}
消费者:
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //声明一个队列
        String queueName = "lb-backup-queue";
        channel.queueDeclare(queueName, false, false, false, null);
        //绑定备用交换机,路由键的其实无所谓,因为备用交换机是一个fanout类型的交换机
        channel.queueBind(queueName, BackupProducer.DIRECT_BACKUP_NAME, DirectProducer.ROUTE_KEY);
        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                System.out.println(consumerTag + ":" + "receiver[" + envelope.getRoutingKey() + "] : " + new String(body));
            }
        });
    }

启动消费者,再启动生产者发送消息


image.png

结果表明,当主交换机发送消息,无法发送至指定队列时,会传送到备用交换机,绑定备用交换机的队列会收到消息

5. 事务

开启事务后,会执行下面步骤
    1.生产者发送Tx.Select
    2.服务端回复Tx.Select-Ok
    3.生产者发送消息
    4.生产者发送Tx.Commit
    5.服务端回复Tx.Commit-Ok
    6.如果提交前发生异常,生产者发送Tx.Rollback
    7.服务端回复Tx.Rollback-Ok
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("maomaoyu.xyz");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(TRANSACTION_EXCHANGE, BuiltinExchangeType.DIRECT);
        try {
            //开启事务
            channel.txSelect();
            channel.basicPublish(TRANSACTION_EXCHANGE, "lb1", true, null, "hello world".getBytes());
            //事务提交
            channel.txCommit();
        } catch (Exception e) {
            e.printStackTrace();
            //出现异常回滚
            channel.txRollback();
        }
    }
客户端与服务器端数据包收发情况

事务是非常消耗性能的且是同步的。如果需要发送多条消息,建议一次性提交,如果每次单独提交的话,步骤4和步骤5每次都需要执行,会消耗更多的性能。

下一篇 6.rabbitmq-消息消费时的可靠性保障

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,053评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,527评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,779评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,685评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,699评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,609评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,989评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,654评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,890评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,634评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,716评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,394评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,976评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,950评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,191评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,849评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,458评论 2 342