Routing
在上一个教程中,我们创建了一个简单的日志系统。我们可以将日志消息广播给所有的接收者(消费者)。
在这个教程中,我们将为我们的日志系统添加一个功能:仅仅订阅一部分消息。比如,我们可以直接将关键的错误类型日志消息保存到日志文件中,还可以同时将所有的日志消息打印到控制台。
绑定(Bindings)
在之前的例子中,我们已经创建了绑定:
channel.queueBind(queueName, EXCHANGE_NAME, "");
一个绑定是建立在一个队列和一个路由器之间的关系,可以解读为:该队列对这个路由器中的消息感兴趣。
绑定可以设置另外的参数:路由键routingKey
。为了避免和void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
中的routingKey混淆,我们将这里的key称为绑定键binding key
,下面的代码展示了如何使用绑定键来创建一个绑定关系:
channel.queueBind(queueName, EXCHANGE_NAME, "black");
绑定键的含义取决于路由器的类型,我们之前使用的fanout
类型路由器会忽略该值。
直接路由器 (Direct Exchange)
我们之前的日志系统会将所有消息广播给所有消费者。现在我们想根据日志的严重程度来过滤日志。比如,我们想要一个程序来将error
日志写到磁盘文件中,而不要将warning
或info
日志写到磁盘中,以免浪费磁盘空间。
我们之前使用的fanout
路由器缺少灵活性,它只是没头脑地广播消息。所以,我们用direct
路由器来替换它。direct
路由器背后的路由算法很简单:只有当消息的路由键routing key
与队列的绑定键binding key
完全匹配时,该消息才会进入该队列。
为了演示上面拗口的表述中的意思,考虑下面的设置:
上图中,直接路由器x
与两个队列绑定。第一个队列以绑定键orange
来绑定,第二个队列以两个绑定键black
和green
和路由器绑定。
按照这种设置,路由键为orange
的消息以发布给路由器后,将会被路由到队列Q1,路由键为black
或者green
的消息将会路由到队列Q2。
多重绑定(Multiple bindings)
多个队列以相同的绑定键binding key
绑定到同一个Exchange上,是完全可以的。按照这种方式设置的话,直接路由器就会像fanout
路由器一样,将消息广播给所有符合路由规则的队列。一个路由键为black
的消息将会发布到队列Q1和Q2。
发布消息
在这个教程中,我们使用direct
路由器来代替上个教程中的fanout
路由器。同时,我们为日志设置严重级别,并将此作为路由键。这样,接收者(消费者)就可以选择性地接收日志消息。
首先,创建一个路由器:
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
接着,发送一个消息:
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
简单起见,我们假设severity
只能是info
、warning
、error
中的一种。
消息订阅
接收消息将会和之前的教程类似,只是我们会为每一个级别的消息来创建不同的绑定:
String queueName = channel.queueDeclare().getQueue();
for(String severity : argv){
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
放在一块
生产者EmitLogDirect.java
的完整代码:
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
public class EmitLogDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
//创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明路由器和路由器的类型
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String severity = "info";
String message = ".........i am msg.........";
//发布消息
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
channel.close();
connection.close();
}
}
消费者ReceiveLogsDirect.java
的完整代码如下:
import com.rabbitmq.client.*;
import java.io.IOException;
public class ReceiveLogsDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
//建立连接和通道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明路由器和类型
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//声明队列
String queueName = channel.queueDeclare().getQueue();
//定义要监听的级别
String[] severities = {"info", "warning", "error"};
//根据绑定键绑定
for (String severity : severities) {
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
现在可以进行测试了。首先,启动一个消费者实例(ReceiveLogsDirect.java
),然后将其中的要监听的级别改为String[] severities = {"error"};
,再启动另一个消费者实例。此时,这两个消费者都开始监听了,一个监听所有级别的日志消息,另一个监听error
日志消息。
然后,启动生产者(EmitLogDirect.java
),之后将String severity = "info";
中的info
,分别改为warning
、error
后运行。
这样,就可以在控制台看到如下输出:
//生产者
[x] Sent 'warning':'.........i am msg.........'
[x] Sent 'info':'.........i am msg.........'
[x] Sent 'error':'.........i am msg.........'
//消费者1
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'info':'.........i am msg.........'
[x] Received 'error':'.........i am msg.........'
[x] Received 'warning':'.........i am msg.........'
//消费者2
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'error':'.........i am msg.........'
说明
①与原文略有出入,如有疑问,请参阅原文
②原文均是编译后通过javacp
命令直接运行程序,我是在IDE中进行的,相应的操作做了修改。