上篇文章讲了简单队列的使用,这其实就是RMQ给的demo,实际并没有什么用
本篇讲讲工作模式队列,也称之为任务队列
一个生产者发布了多条消息,消费者A可以接受消息,接受消息后该消息就消除,消费者B可以接受其他消息
使用场景,一些数据库操作比较缓慢的话可以分别给多个接口调用,降低压力,或者抢单场景也能考虑,
比如就10个商品,100个消费者来抢单,前10个抢到了后,消息队列就为空了,那么第11个以后的所有消费者都不会抢到
代码示例:
生产者
public class Send {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 50; i++) {
// 消息内容
String message = "" + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
Thread.sleep(i * 10);
}
channel.close();
connection.close();
}
}
消费者1
public class Recv {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 同一时刻服务器只会发一条消息给消费者, 如果注释了就是指生产者平均分配任务给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成 设置fasle代表需要手动返回消息的确认状态
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
// 休眠
Thread.sleep(10);
// 手动确认 返回确认状态
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
消费者2
public class Recv2 {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 同一时刻服务器只会发一条消息给消费者, 如果注释了就是指生产者平均分配任务给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成状态 设置fasle代表需要手动返回消息的确认状态
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
// 休眠1秒
Thread.sleep(1000);
// 手动确认 返回确认状态
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}