上文我们了解了 RabbitMQ 六种队列模式中的简单队列,代码也是非常的简单,比较容易理解。
但是简单队列有个缺点,简单队列是一一对应的关系,即点对点,一个生产者对应一个消费者,按照这个逻辑,如果我们有一些比较耗时的任务,也就意味着需要大量的时间才能处理完毕,显然简单队列模式并不能满足我们的工作需求,我们今天再来看看工作队列。
1. 什么是工作队列
工作队列:用来将耗时的任务分发给多个消费者
主要解决问题:处理资源密集型任务,并且还要等他完成。有了工作队列,我们就可以将具体的工作放到后面去做,将工作封装为一个消息,发送到队列中,一个工作进程就可以取出消息并完成工作。如果启动了多个工作进程,那么工作就可以在多个进程间共享。
2. 代码部分
生产者
package cn.lovingliu.rabbitmq_work.producer;
import cn.lovingliu.rabbitmq.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author:LovingLiu
* @Description: 生产者
* @Date:Created in 2020-01-16
*/
public class Producer {
/** 队列名称 */
private static final String QUEUE_NAME = "test_queue_work";
public static void main(String[] args) throws IOException, TimeoutException {
/** 1.获取连接 */
Connection newConnection = ConnectionUtil.getConnection();
/** 2.创建通道 */
Channel channel = newConnection.createChannel();
/** 3.创建队列声明 */
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/** 在收到消费者确认回执消息之前 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 避免重复消费 */
channel.basicQos(1);
for (int i = 1; i <= 100; i++) {
String msg = "生产者消息_" + i;
System.out.println("生产者发送消息:" + msg);
/** 4.发送消息 */
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
}
channel.close();
newConnection.close();
}
}
消费者1
package cn.lovingliu.rabbitmq_work.consumer;
import cn.lovingliu.rabbitmq.utils.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author:LovingLiu
* @Description: 消费者1
* @Date:Created in 2020-01-16
*/
public class Customer_1 {
/**
* 队列名称
*/
private static final String QUEUE_NAME = "test_queue_work";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("001");
/** 1.获取连接 */
Connection newConnection = ConnectionUtil.getConnection();
/** 2.获取通道 */
final Channel channel = newConnection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/** 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 */
channel.basicQos(1);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String msgString = new String(body, "UTF-8");
System.out.println("消费者获取消息:" + msgString);
try {
Thread.sleep(1000);// 阻塞事件更长,代表该消息处理事件更长
} catch (Exception e) {
} finally {
/** 手动回执消息 */
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
/** 3.监听队列 */
channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
}
}
消费者2
package cn.lovingliu.rabbitmq_work.consumer;
import cn.lovingliu.rabbitmq.utils.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author:LovingLiu
* @Description: 消费者1
* @Date:Created in 2020-01-16
*/
public class Customer_2 {
/**
* 队列名称
*/
private static final String QUEUE_NAME = "test_queue_work";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("002");
/** 1.获取连接 */
Connection newConnection = ConnectionUtil.getConnection();
/** 2.获取通道 */
final Channel channel = newConnection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/** 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 */
channel.basicQos(1);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String msgString = new String(body, "UTF-8");
System.out.println("消费者获取消息:" + msgString);
try {
Thread.sleep(10);
} catch (Exception e) {
} finally {
/** 手动回执消息 */
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
/** 3.监听队列 */
channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
}
}
3.测试结果
4.分发机制
轮询分发 :使用任务队列的优点之一就是可以轻易的并行工作。如果我们积压了好多工作,我们可以通过增加工作者(消费者)来解决这一问题,使得系统的伸缩性更加容易。在默认情况下,
RabbitMQ
将逐个发送消息到在序列中的下一个消费者(而不考虑每个任务的时长等等,且是提前一次性分配,并非一个一个分配)。平均每个消费者获得相同数量的消息。这种方式分发消息机制称为Round-Robin
(轮询)。公平分发 :虽然上面的分配法方式也还行,但是有个问题就是:比如:现在有2个消费者,所有的奇数的消息都是繁忙的,而偶数则是轻松的。按照轮询的方式,奇数的任务交给了第一个消费者,所以一直在忙个不停。偶数的任务交给另一个消费者,则立即完成任务,然后闲得不行。而
RabbitMQ
则是不了解这些的。这是因为当消息进入队列,RabbitMQ
就会分派消息。它不看消费者为应答的数目,只是盲目的将消息发给轮询指定的消费者。
5.实现公平分发
由于上方模拟的是非常简单的消息队列的消费,假如有一些非常耗时的任务,某个消费者在缓慢地进行处理,而另一个消费者则空闲,显然是非常消耗资源的。注意:要实现公平分发,必须关闭自动应答
公平分发
其实发生上述问题的原因是 RabbitMQ
收到消息后就立即分发出去,而没有确认各个工作者未返回确认的消息数量,类似于TCP/UDP
中的UDP
,面向无连接。
因此我们可以使用 basicQos
方法,并将参数 prefetchCount
设为1,告诉 RabbitMQ 我每次值处理一条消息,你要等我处理完了再分给我下一个。这样 RabbitMQ
就不会轮流分发了,而是寻找空闲的工作者进行分发。
关键性代码:
/** 2.获取通道 */
final Channel channel = newConnection.createChannel();
/** 设置为手动应答
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/** 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 */
channel.basicQos(1);
6.消息确认模式
消费者从队列中获取消息,服务端如何知道消息已经被消费呢?
模式1:自动确认
只要消息从队列中获取,无论消费者获取到消息后是否成功消息,都认为是消息已经成功消费。
模式2:手动确认
消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态,并在其他消费者空闲时,发给空闲消费者。
手动模式:
自动模式:
7.消息持久化
问题背景
上边我们提到的公平分发是由消费者收取消息时确认解决的,但是这里面又会出现被 kill
的情况。
当有多个消费者同时收取消息,且每个消费者在接收消息的同时,还要处理其它的事情,且会消耗很长的时间。在此过程中可能会出现一些意外,比如消息接收到一半的时候,一个消费者死掉了。
这种情况要使用消息接收确认机制,可以执行上次宕机的消费者没有完成的事情。
但是在默认情况下,我们程序创建的消息队列以及存放在队列里面的消息,都是非持久化的。当RabbitMQ
死掉了或者重启了,上次创建的队列、消息都不会保存。
怎么办呢?
参数配置
- 参数配置一:生产者创建队列声明时,修改第二个参数为
true
/**3.创建队列声明 */
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
- 参数配置二:生产者发送消息时,修改第三个参数为
MessageProperties.PERSISTENT_TEXT_PLAIN
for (int i = 1; i <= 50; i++) {
String msg = "生产者消息_" + i;
System.out.println("生产者发送消息:" + msg);
/**4.发送消息 */
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
}
实现持久化
package cn.lovingliu.rabbitmq.producer;
import cn.lovingliu.rabbitmq.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* @Author:LovingLiu
* @Description: 生产者发送消息
* @Date:Created in 2020-01-15
*/
public class Send {
private final static String QUEUE_NAME = "test_queue";
public static void main(String[] args) throws Exception {
/** 1.获取连接 */
Connection newConnection = ConnectionUtil.getConnection();
/** 2.创建通道 */
Channel channel = newConnection.createChannel();
/** 3.创建队列声明 */
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 持久化
String msg = "我是生产者生成的消息";
System.out.println("生产者发送消息:" + msg);
/** 4.发送消息 */
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
// channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); // 持久化
channel.close();
newConnection.close();
}
}
8. 工作队列总结
1、循环分发:消费者端在信道上打开消息应答机制,并确保能返回接收消息的确认信息,这样可以保证消费者发生故障也不会丢失消息。
2、消息持久化:服务器端和客户端都要指定队列的持久化和消息的持久化,这样可以保证RabbitMQ重启,队列和消息也不会。
3、公平分发:指定消费者接收的消息个数,避免出现消息均匀推送出现的资源不合理利用的问题。