Topics
在上一个教程中我们改进了我们的日志系统:使用direct
路由器替代了fanout
路由器,从而可以选择性地接收日志。
尽管使用direct路由器给我们的日志系统带了了改进,但仍然有一些限制:不能基于多种标准进行路由。
在我们的日志系统中,我们可能不仅需要根据日志的严重级别来接收日志,而且有时想基于日志来源进行路由。如果你知道syslog这个Unix工具,你可能了解这个概念,sysylog
会基于日志严重级别(info/warn/crit...
)和设备(auth/cron/kern...
)进行日志分发。
如果我们可以监听来自corn
的错误日志,同时也监听kern
的所有日志,那么我们的日志系统就会更加灵活。
为了实现这个功能,我们需要了解一个复杂的路由器:topic
路由器。
主题路由器(Topic Exchange)
发送到topic
路由器的消息的路由键routing_key
不能任意给定:它必须是一些单词的集合,中间用点号.
分割。这些单词可以是任意的,但通常会体现出消息的特征。一些有效的路由键示例:stock.usd.nyse
,nyse.vmw
,quick.orange.rabbit
。这些路由键可以包含很多单词,但路由键总长度不能超过255个字节。
绑定键binding key
也必须是这种形式。topic
路由器背后的逻辑与direct
路由器类似:以特定路由键发送的消息将会发送到所有绑定键与之匹配的队列中。但绑定键有两种特殊的情况:
①*(星号)仅代表一个单词
②#(井号)代表任意个单词
下图可以很好地解释这两个符号的含义:
对于上图的例子,我们将会发送描述动物的消息。这些消息将会以由三个单词组成的路由键发送。路由键中的第一个单词描述了速度,第二个描述了颜色,第三个描述了物种:<speed>.<colour>.<species>
。
我们创建了三个绑定,Q1的绑定键为*.orange.*
,Q2的绑定键有两个,分别是*.*.rabbit
和lazy.#
。
上述绑定关系可以描述为:
①Q1关注所有颜色为orange
的动物。
②Q2关注所有的rabbit
,以及所有的lazy
的动物。
如果一个消息的路由键是quick.orange.rabbit
,那么Q1和Q2都可以接收到,路由键是lazy.orange.elephant
的消息同样如此。但是,路由键是quick.orange.fox
的消息只会到达Q1,路由键是lazy.brown.fox
的消息只会到达Q2。注意,路由键为lazy.pink.rabbit
的消息只会到达Q2一次,尽管它匹配了两个绑定键。路由键为quick.brown.fox
的消息因为不和任意的绑定键匹配,所以将会被丢弃。
假如我们不按常理出牌:发送一个路由键只有一个单词或者四个单词的消息,像orange
或者quick.orange.male.rabbit
,这样的话,这些消息因为不和任意绑定键匹配,都将会丢弃。但是,lazy.orange.male.rabbit
消息因为和lazy.#
匹配,所以会到达Q2,尽管它包含四个单词。
Topic exchange
Topic exchange
非常强大,可以实现其他任意路由器的功能。
</br>当一个队列以绑定键#
绑定,它将会接收到所有的消息,而无视路由键(实际是绑定键#
匹配了任意的路由键)。----这和fanout
路由器一样了。
</br>当*
和#
这两个特殊的字符不出现在绑定键中,Topic exchange
就会和direct exchange
类似了。
放在一块
我们将会在我们的日志系统中使用主题路由器Topic exchange
,并假设所有的日志消息以两个单词<facility>.<severity>
为路由键。
代码和上个教程几乎一样。
生产者EmitLogTopic.java
:
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) {
Connection connection = null;
Channel channel = null;
try {
//建立连接和通道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
//声明路由器和路由器类型
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//定义路由键和消息
String routingKey = "";
String message = "msg.....";
//发布消息
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
} catch (Exception e) {
e.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (Exception ignore) {
}
}
}
}
}
消费者ReceiveLogsTopic.java
:
import com.rabbitmq.client.*;
import java.io.IOException;
public class ReceiveLogsTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
//建立连接和通道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明路由器和路由器类型
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = channel.queueDeclare().getQueue();
//
String bingingKeys[] = {""};
for (String bindingKey : bingingKeys) {
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//监听消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
现在,可以动手实验了。
开头提到的:日志严重级别info/warn/crit...
和设备auth/cron/kern...
。
消费者:
将String bingingKeys[] = {""}
改为String bingingKeys[] = {"#"}
,启动第一个消费者;
再改为String bingingKeys[] = {"kern.*"}
,启动第二个消费者;
再改为String bingingKeys[] = {"*.critical"}
,启动第三个消费者;
再改为String bingingKeys[] = {"kern.*", "*.critical"}
,启动第四个消费者。
生产者,发送多个消息,如:
路由键为kern.critical
的消息:A critical kernel error
;
路由键为kern.info
的消息:A kernel info
;
路由键为kern.warn
的消息:A kernel warning
;
路由键为auth.critical
的消息:A critical auth error
;
路由键为cron.warn
的消息:A cron waning
;
路由键为cron.critical
的消息:A critical cron error
;
试试最后的结果:第一个消费者将会接收到所有的消息,第二个消费者将会kern
的所有严重级别的日志,第三个消费者将会接收到所有设备的critical
消息,第四个消费者将会接收到kern
设备的所有消息和所有
critical
消息。
下个教程,我们将会学习如何让消息往返,以此来作为一个远程过程调用(RPC)。
说明
①与原文略有出入,如有疑问,请参阅原文
②原文均是编译后通过javacp命令直接运行程序,我是在IDE中进行的,相应的操作做了修改。