在前面的教程里,我们改进了日志系统。我们用direct交换类型代替了fanout交换类型,并实现了可以有选择性的接收日志。
虽然使用direct类型成功的改进了我们的系统,但是它仍然有一定的局限性——它不能够基于多个标准进行路由。
在我们的日志系统中,我们可能希望自己不仅仅基于严重性去订阅日志,我们还应该关注发出日志的源。可能在syslog unix工具上可以了解到这个概念,这个工具根据“严重性”(info/warn/crit...)和“设施”(auth/cron/kern..)来路由(routing)日志。(routing这个词我实在不知道咋翻译了...)
这将给我们带来灵活性,我们想要收到来自"cron"的错误,也想要收到来自"kern"的所有日志。要在我们的日志系统中实现这一点,我们需要学习一个更复杂的——topic exchange
Topic Exchange
发送到Topic Exchange的消息,不能是任意的路由键。他必须遵守一些规则:由点分割的单词列表。单词可以是任意的东西,但通常他们是与消息有关的特性。
路由键比如:"stock.usd.nyse"、"nyse.vmw"、"quick.orange.rabbit".在路由键中可以有很多单词,最多可以达到255个字节。
绑定键也必须使用相同的方式,topic exchange背后的逻辑类似于direct exchange——一条带有特定路由键的消息将会被发送到能够匹配绑定键的所有队列!还有,绑定键有两个重要的特殊情况:
- “*”可以代替一个词。
- “#”可以代替0个或者多个单词。
用图解释一下:
在这个例子中,我们将会发送所有描述东西的信息。消息将会发送到一个由三个单词组成的路由键。路由键第一个词描述“速度”,第二个词描述“颜色”,第三个词描述“物种”。
两个队列,我们创建了三个绑定:
- Q1绑定".orange."
- Q2绑定" * . * .rabbit" 和 "lazy.#"
这些绑定概括成:
- Q1 对所有橙色(orange)动物感兴趣
- Q2 想要听到兔子(rabbit)的一切,也想听到懒惰(lazy)动物的一切~
如果一条消息的路由键为"quick.orange.rabbit",他将被传到两个队列中去。
如果消息路由键是"lazy.orange.elephant",也会被传递到两个队列中去。
如果消息路由键是"quick.orange.fox"将会只被发送到Q1队列
如果消息路由键是"lazy.brown.fox"将会只被发送到Q2队列
如果消息路由键是"lazy.pink.rabbit"它将只被发送到Q2队列,而且只有一次!
如果消息路由键是"quick.brown.fox"它没有匹配任何的绑定,所以他会被丢弃。
如果我们违反了设置,用一个单词或者四个单词发送一条信息。
比如“orange”或”quick.orange.male.rabbit“会怎么样???
答案是,这些消息不匹配任何绑定,将会丢失。
但是!!!如果是“lazy.orange.male.rabbit”,尽管他是四个单词,但是它和Q2的绑定匹配。所以它将会被传送到第二个队列。
Topic exchange 小知识
Topic exchange很好很强大,行为可以表现的和其他的交换类型一样,也可以有自己的扩展。
当队列与"#"绑定键绑定的时候,它将会化身为fanout类型,接受所有消息。我们就可以代替fanout类型进行使用了。
当特殊字符"*"和"#"没有在绑定中使用的话,那么topic exchange就会化身为direct exchange。
嗯~~~topic完成双杀!
完整代码
我们将在我们的日志系统里使用一个topic exchange。我们将会从一个工作的假设开始,也就是日志的路由键会有两个词:"<设施>"."<严重性>"
还有代码几乎和前一期一模一样!所以注意点小改动啦~
EmitLogTopic.java
public class EmitLogTopic {
//设置交换器的名字
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args){
Connection connection = null;
Channel channel = null;
try {
//获取连接
connection = ConnectionUtil.getConnection();
//创建通道
channel = connection.createChannel();
//声明交换器,给它名字,设置交换类型为direct
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
//代码里手动设置路由键——RoutingKey
String routingKey = getRouting(args);
//待传递的消息内容
String message = getMessage(args);
//发送消息
channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes("UTF-8"));
System.out.println("[x] Sent '"+routingKey+"':'"+message+"'");
}catch (Exception e){
e.printStackTrace();
}finally {
//关闭连接
if (connection!=null){
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
private static String getRouting(String[] strings) {
if (strings.length<1){
return "anonymous.info";
}
return strings[0];
}
private static String getMessage(String[] strings) {
if (strings.length<2){
return "hello world";
}
return joinStrings(strings," ",1);
}
private static String joinStrings(String[] strings, String delimiter,int startIndex) {
int length = strings.length;
if (length == 0){
return "";
}
if (length<startIndex){
return "";
}
StringBuilder words = new StringBuilder(strings[startIndex]);
for (int i = startIndex+1 ; i < length; i++){
words.append(delimiter).append(strings[i]);
}
return words.toString();
}
}
ReceiveLogsTopic.java
public class ReceiveLogsTopic {
//设置交换器的名字
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException {
Connection connection = null;
Channel channel = null;
//获取连接
connection = ConnectionUtil.getConnection();
//创建通道
channel = connection.createChannel();
//声明交换器,给它名字,设置交换类型为topic
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
//得到队列的名字
String queueName = channel.queueDeclare().getQueue();
//截一下输入错误的情况
if (args.length<1){
System.err.println("Usage: ReceiveLogsTopic [binding_key]");
System.exit(1);
}
String wholeSeverity = "";
//根据输入,确定程序想要收集的"RoutingKey",进行绑定。
for (String bindingKey : args){
channel.queueBind(queueName,EXCHANGE_NAME,bindingKey);
wholeSeverity = wholeSeverity + " " + bindingKey;
}
System.out.println("[*] Waiting for"+wholeSeverity+" message.To exit press CTRL+C");
//消费者
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body,"UTF-8");
System.out.println("[x] Received'"+envelope.getRoutingKey()+"':'"+message+"'");
}
};
channel.basicConsume(queueName,true,consumer);
}
}
这个太过于灵活,想给大家展示一下结果,但是图太多了。。。所以给大家贴一下官方的图,结合我前面几期写的就能得到结果!请大家请自行测试✧(≖ ◡ ≖✿)