一、前言
RabbitMQ其实是我最早接触的一个MQ框架,我记得当时是在大学的时候跑到图书馆一个人去看,由于RabbitMQ官网的英文还不算太难,因此也是参考官网学习的,一共有6章,当时是用Node来开发的,当时花了一下午看完了,也理解了。而现在回过头来再看,发现已经忘记了个差不多了,现在再回过头来继续看看,然乎记之。以防再忘,读者看时最好有一定的MQ基础。
二、RabbitMQ
首先我们需要知道的是RabbitMQ它是基于高级队列协议(AMQP)的,它是Elang编写的,下面将围绕RabbitMQ队列、交换机、RPC三个重点进行展开。
2.1、队列
存储消息的地方,多个生产者可以将消息发送到一个队列,多个消费者也可以消费同一个队列的消息。
注意:当多个消费者监听一个队列,此时生产者发送消息到队列只有一个消费者被消费,并且消费端的消费方式是按照消费端在内部启动的顺序轮询(round-robin)。
2.2、消费者
消费消息的一方
public class Send {
private final static String QUEUE_NAME = "hello";
private final static String IP = "172.16.12.162";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(IP);
factory.setUsername("admin");
factory.setPassword("admin");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}catch (Exception e){
e.printStackTrace();
}
}
}
public class Recv {
private final static String QUEUE_NAME = "hello";
private final static String IP = "172.16.12.162";
public static void main(String[] args) {
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(IP);
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}catch (Exception e){
e.printStackTrace();
}
}
}
2.3、小结
1、Rabbit是如何保证消息被消费的?
答:通过ack机制。每当一个消息被消费端消费的时候,消费端可以发送一个ack给RabbitMQ,这样RabbitMQ就知道了该条消息已经被完整消费并且可以被delete了。;如果一条消息被消费但是没有发送ack,那么此时RabbitMQ将会认为需要重新消费该消息,如果此时还有其它的消费者,那么此时RabbitMQ将会把这条消息交给它处理。
注意:开启ack机制的是autoAck=
false
;
2、消息如何进行持久化?
- 将queue持久化,即设置 channel.queueDeclare(QUEUE_NAME, true, false, false, null);第二个参数durable为true
- 设置消息持久化,即设置MessageProperties.PERSISTENT_TEXT_PLAIN
注意:消息持久化并不一定保证消息不会被丢失
3、RabbitMQ如何避免两个消费者一个非常忙一个非常闲的情况?
通过如下设置,保证一个消费者一次只能消费一个消息,只有当它消费完成并且返回ack给RabbitMQ之后才给它派发新的消息。
int prefetchCount = 1 ;
channel.basicQos(prefetchCount)
4、RabbitMQ异常情况下如何保证消息不会被重复消费?
需要业务自身实现密等性,RabbitMQ没有提供比较好的方式去保证。
2.2、交换机
在RabbitMQ中,生产者其实从来不会发送消息到队列,甚至,它不知道消息被发送到了哪个队列。那它被发送到了哪里呢?就是本节的重点:交换机,下面就是它在RabbitMQ中的介绍图。(X就是交换机)生产者发送消息给交换机,然后由交换机将消息转发给队列。
从上图就产生一个问题:X怎么将消息发给queue呢?它是把消息发给所有queue还是发给一个指定的queue或者丢弃消息呢?这就是看交换机的类型了。下面一起谈谈这几种类型
2.2.1、fanout
fanout:广播模式,这个比较好理解,就是所有的队列都能收到交换机的消息。
如上面,两个队列都能收到交换机的消息。
2.2.2、direct
这个模式相当于发布/订阅模式的一种,当交换机类型为direct的时候,此时我们需要设置两个参数:
- channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));第二个参数,我们可以把它称呼为routeKey
- channel.queueBind(queueName, EXCHANGE_NAME, "");第三个参数,我们把它称呼为bindKey
有了这两个参数,我们就可以指定我们订阅哪些消息了。
如图,Q1订阅了orange的消息,Q2订阅了black、green的消息。
2.2.3、topic
其实topic和direct有一点类似,它相当于对direct作了增强。在direct中,我们上面所说的bind routeKey为black、green的它是有限制的,它只能绝对的等于routeKey,但是有时候我们的需求不是这样,我们可能想要的是正则匹配即可,那么Topic就派上用场了。
当类型为topic时,它的bindKey对应字符串需要是以“.”分割,同时RabbitMQ还提供了两个符号:
- 星号(*):表示1个单词
- 井号(#):表示0、多个单词
上图的意思是:所有第二个单词为orange的消息发送个Q1,所有最后一个单词为rabbit或者第一个单词为lazy的消息发送给Q2。
2.2.4、header
这一种类型官方demo没有过多解释,这里也不研究了。
2.3、RPC
RabbitMQ 还可以实现RPC(远程过程调用)。什么是RPC,简单来说就是local调用remote方法。对应于RabbitMQ中则是Client发送一个request message,Server处理完成之后将其返回给Client。这里就有了一个疑问?Server是如何将response返回给Client的,这里RabbitMQ定义了一个概念:Callback Queue。
Callback Queue
注意这个队列是独一无二的String replyQueueName = channel.queueDeclare().getQueue();
。
首先我们需要明白一点的是为什么需要这个queue?我们知道在RabbitMQ作消息队列的时候,Client只需要将消息投放到queue中,然后Server从queue去取就可以了。但是在RabbitMQ作为RPC的时候多了一点就是,Client还需要返回结果,这时Server端怎么知道把消息发送给Client,这就是Callback Queue的用处了。
Correlation Id
在上面我们知道Server返回数据给Client是通过Callback Queue的,那么是为每一个request都创建一个queue吗?这未免太过浪费资源,RabbitMQ有更好的方案。在我们发送request,绑定一个唯一ID(correlationId),然后在消息被处理返回的时候取出这个ID和发出去的ID进行匹配。这样来说一个Callback Queue是Client级别而不是request级别的了。
实现
上面介绍了RabbitMQ实现RPC最重要的两个概念,具体代码比较简单还是贴下把。
client 端
public class RPCClient {
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
public RPCClient() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
}
public static void main(String[] argv) throws Exception{
RPCClient fibonacciRpc = new RPCClient();
try {
for (int i = 0; i < 32; i++) {
String i_str = Integer.toString(i);
System.out.println(" [x] Requesting fib(" + i_str + ")");
String response = fibonacciRpc.call(i_str);
System.out.println(" [.] Got '" + response + "'");
}
} catch (Exception e) {
e.printStackTrace();
}
}
public String call(String message) throws IOException, InterruptedException {
final String corrId = UUID.randomUUID().toString();
String replyQueueName = channel.queueDeclare().getQueue();
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response.offer(new String(delivery.getBody(), "UTF-8"));
}
}, consumerTag -> {
});
String result = response.take();
channel.basicCancel(ctag);
return result;
}
public void close() throws IOException {
connection.close();
}
}
服务端
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
private static int fib(int n) {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n - 1) + fib(n - 2);
}
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.queuePurge(RPC_QUEUE_NAME);
channel.basicQos(1);
System.out.println(" [x] Awaiting RPC requests");
Object monitor = new Object();
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
AMQP.BasicProperties replyProps = new AMQP.BasicProperties
.Builder()
.correlationId(delivery.getProperties().getCorrelationId())
.build();
String response = "";
try {
String message = new String(delivery.getBody(), "UTF-8");
int n = Integer.parseInt(message);
System.out.println(" [.] fib(" + message + ")");
response += fib(n);
} catch (RuntimeException e) {
System.out.println(" [.] " + e.toString());
} finally {
channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
// RabbitMq consumer worker thread notifies the RPC server owner thread
synchronized (monitor) {
monitor.notify();
}
}
};
channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> { }));
// Wait and be prepared to consume the message from RPC client.
while (true) {
synchronized (monitor) {
try {
monitor.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
三、总结
这次回头再看RabbitMQ,再次重新理解了以下RabbitMQ,有些东西还是要慢慢嚼的。当然这些也都是官网的入门例子,后续有机会的话再深入研究。