简书:亚武de小文 【原创:转载请注明出处】
生产者与消费者模型
RabbitMQ 整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。可以把消息传递的过程想象成:当你将一个包裹送到邮局,邮局会暂存并最终将邮件通过邮递员送到收件人的手上, RabbitMQ 就好比由邮局、邮箱和邮递员组成的一个系统。
一、基本模型图
二、工作流程
- 发送端
1)创建连接 2)创建通道 3)声明队列 4)发送消息
- 接收端
1)创建连接 2)创建通道 3)声明队列 4)监听队列 5)接收消息 6)ack回复
三、代码
生产者-发件人
-
MsgProducer.java
package com.yawu.xiaowen.pc; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 生产者-发件人 * @date 2019.06.25 * @author yawu */ public class MsgProducer { private static final String QUEUE_NAME = "mq_pc_hello"; private static final Logger LOGGER = LoggerFactory.getLogger(MsgProducer.class); public static void main(String[] args) throws IOException, TimeoutException { Connection connection = null; Channel channel = null; try { // 连接管理器:应用程序与RabbitMQ建立连接的管理器。 ConnectionFactory factory = new ConnectionFactory(); // 服务器地址 factory.setHost("127.0.0.1"); // 帐号密码:默认为guest/guest,可省略 factory.setUsername("guest"); factory.setPassword("guest"); // 新建连接 connection = factory.newConnection(); // 再创建一个信道 channel = connection.createChannel(); //1、在信道中声明一个队列 /** * 参数详解 * queue:要创建的队列名 * durable:是否持久化。如果为true,可以在RabbitMQ崩溃后恢复消息 * exclusive:true表示一个队列是否独占连接, * autoDelete:true表示服务器不在使用这个队列是会自动删除它 * arguments:其它属性参数 */ channel.queueDeclare(QUEUE_NAME, true, false, false, null); //2、创建一条消息 String message = "Hello,亚武de小文!"; // 3、采用二进制流的方式传输 byte[] msg = message.getBytes("UTF-8"); // 4、channel是一个信道,它接收到msg数据,并将纳入到QUEUE_NAME队列中 /** * 消息发布方法参数详解: * exchange:如果没有指定,则使用Default Exchange * routingKey:消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列 * props:消息包含的属性 * body:消息体 */ channel.basicPublish("", QUEUE_NAME, null, msg); LOGGER.info("发件人---发送消息:{}", message); } catch (Exception e) { e.printStackTrace(); } finally { if (channel != null) { channel.close(); } if (connection != null) { connection.close(); } } } }
消费者-收件人
-
MsgConsumer.java
package com.yawu.xiaowen.pc; import com.rabbitmq.client.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; /** * 消费者-收件人 * @date 2019.06.25 * @author yawu */ public class MsgConsumer { private static final String QUEUE_NAME = "mq_pc_hello"; private static final Logger LOGGER = LoggerFactory.getLogger(MsgConsumer.class); public static void main(String[] args) { try { // 应用程序与RabbitMQ建立连接的管理器。 ConnectionFactory factory = new ConnectionFactory(); // 服务器地址 factory.setHost("127.0.0.1"); // 新建一个连接 Connection connection = factory.newConnection(); // 创建一个信道 Channel channel = connection.createChannel(); //1、首先在通道中申明一个队列 /** * 参数详解 * queue:要创建的队列名 * durable:是否持久化。如果为true,可以在RabbitMQ崩溃后恢复消息 * exclusive:true表示一个队列只能被一个消费者占有并消费 * autoDelete:true表示服务器不在使用这个队列是会自动删除它 * arguments:其它参数 */ channel.queueDeclare(QUEUE_NAME, true, false, false, null); //2、创建消费者,并重写如何消费的方法,eg:输出消息 //3、首先从信道里面获取数据 Consumer consumer = new DefaultConsumer(channel) { /** * 消费者接收消息调用此方法 * @param consumerTag 消费者的标签,在channel.basicConsume()去指定 * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志 * (收到消息失败后是否需要重新发送) * @param properties * @param body * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); LOGGER.info("收件人---收到消息:{}", message); } }; /** * 4、收到了消息后,提示信道已经收到消息了。可以继续发送其它消息 * 【注】第二个参数autoAck如果为false,那么消息会一直保存在RabbitMQ服务器中,Unacked * 消费者没有确认消息被消费,消息一直留在队列中,只有当从有新的消费者加入时,消息被分发到新的消费者。 */ channel.basicConsume(QUEUE_NAME, true, consumer); } catch (Exception e) { e.printStackTrace(); } } }
四、几种情况运行与分析
1、分别启动生产者服务和消费者服务
2、关闭生产者服务,开启消费者服务
3、关闭消费者服务,开启生产者服务
该信息处于队列中等待状态,等待消费者消费
4、服务都保持启动
-
设置autoAck参数为false
/** * 4、收到了消息后,提示信道已经收到消息了。可以继续发送其它消息 * 【注】第二个参数autoAck如果为false,那么消息会一直保存在RabbitMQ服务器中 * 消费者没有确认消息被消费,消息一直留在队列中,只有当从有新的消费者加入时,消息被分发到新的消费者。 */ channel.basicConsume(QUEUE_NAME, false, consumer);
-
生产者发送多条信息(此处我发出五条消息)