4. rabbitmq-消息详解

消息的属性

消息标准化可以减少开发人员重复使用的痛苦,所以 RabbitMQ 对消息进行了如下标准化:

按照 AMQP 的协议单个最大的消息大小为 16EB(2 的 64 次方),但是 RabbitMQ 将消息大小限定为 2GB(2 的 31 次方)。


image.png
image.png

在发送消息时,我们还可以对消息的属性做更细微的控制,比如利用reply-to属性构建 Request-Response 模式,

消息存活时间

当队列消息的 TTL 和消息 TTL 都被设置,时间短的 TTL 设置生效。

如果将一个过期消息发送给RabbitMQ,该消息不会路由到任何队列,而是直接丢弃。

为消息设置 TTL有一个问题:RabbitMQ只对处于队头的消息判断是否过期(即不会扫描队列),所以,很可能队列中已存在死消息,但是队列并不知情。这会影响队列统计数据的正确性,妨碍队列及时释放资源。

消息的持久化

默认情况下,队列和交换器在服务器重启后都会消失,消息当然也是。将队列和交换器的 durable 属性设为 true,缺省为 false,但是消息要持久化还不够,还需要将消息在发布前,将投递模式设置为2。消息要持久化,必须要有持久化的队列、交换器和投递模式都为 2。

交换机设置投递模式为2

image.png

声明持久化队列

image.png

app-id:应用程序的类型和版本号

image.png

reply-to:Request-Response 模式

我们前面的学习模式中都是一方负责发送消息而另外一方负责处理。而我们实际中的很多应用相当于一种一应一答的过程,需要双方都能给对方发送消息。于是请求-应答的这种通信方式也很重要。它也应用的很普遍。

image.png

生产者:

public class ReplyToProducer {

    public final static String EXCHANGE_NAME = "replyto";

    public static void main(String[] args)
            throws IOException, TimeoutException {
        /* 创建连接,连接到RabbitMQ*/
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        Connection connection = connectionFactory.newConnection();

        /*创建信道*/
        Channel channel = connection.createChannel();
        /*创建持久化交换器*/
        channel.exchangeDeclare(EXCHANGE_NAME,"direct",false);

        //TODO 响应QueueName ,消费者将会把要返回的信息发送到该Queue
        String responseQueue = channel.queueDeclare().getQueue();
        //TODO 消息的唯一id
        String msgId = UUID.randomUUID().toString();
        //TODO 设置消息中的应答属性
        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .replyTo(responseQueue)
                .messageId(msgId)
                .build();

        /*声明了一个消费者*/
        final Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Received["+envelope.getRoutingKey()
                        +"]"+message);
            }
        };
        //TODO 消费者应答队列上的消息
        channel.basicConsume(responseQueue,true,consumer);

        String msg = "Hello,RabbitMq";
        //TODO 发送消息时,把响应相关属性设置进去
        channel.basicPublish(EXCHANGE_NAME,"error",
                properties,
                msg.getBytes());
        System.out.println("Sent error:"+msg);
    }

}

消费者

public class ReplyToConsumer {

    public static void main(String[] argv)
            throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");

        // 打开连接和创建频道,与发送端一样
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        /*创建交换器*/
        channel.exchangeDeclare(ReplyToProducer.EXCHANGE_NAME,
                "direct",false);

        /*声明一个队列*/
        String queueName = "replyto";
        channel.queueDeclare(queueName,false,false,
                false,null);

        /*绑定,将队列和交换器通过路由键进行绑定*/
        String routekey = "error";/*表示只关注error级别的日志消息*/
        channel.queueBind(queueName,ReplyToProducer.EXCHANGE_NAME,routekey);

        System.out.println("waiting for message........");

        /*声明了一个消费者*/
        final Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Received["+envelope.getRoutingKey()
                        +"]"+message);
                //TODO 从消息中拿到相关属性(确定要应答的消息ID,)
                AMQP.BasicProperties respProp
                        = new AMQP.BasicProperties.Builder()
                        .replyTo(properties.getReplyTo())
                        .correlationId(properties.getMessageId())
                        .build();
                //TODO 消息消费时,同时需要生作为生产者生产消息(以OK为标识)
                channel.basicPublish("", respProp.getReplyTo() ,
                        respProp ,
                        ("OK,"+message).getBytes("UTF-8"));
           }
        };
        /*消费者正式开始在指定队列上消费消息*/
        channel.basicConsume(queueName,true,consumer);
    }
}

下一篇 5.rabbitmq-消息发布时的可靠性保障

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。