RabbitMQ学习笔记(五)

重要的主题交换机

ps:使用pika Python客户端
之前的日志系统 中使用的直连交换机代替了扇型交换机。扇型交换机相当于全订阅,而直连交换机可以订阅队列感兴趣的数据。
但是在我们的日志系统中,我们不只是希望订阅基于严重程度的日志,同时还希望订阅基于发送来源的日志。为了实现这个目的,下面来看一下更加复杂的交换机——主题交换机。

主交换机

发送到主题交换机(topic exchange)的消息不可以携带任意的样子的路由建,他的路由键必须是一个由"."分隔开的单词列表。这些单词随便什么都可以,但是最好还是带一些和消息有关的词汇。
一个带有特定路由键的消息会被主题交换机投递到绑定与之匹配的队列。但是他的绑定键和路由键有两个特殊的应用

  • *(星号) 用来表示一个单词
  • #######(#号)用来表示任意数量的单词(0个或者多个)单词。

上图中的两个队列Q1和Q2分别绑定了不同的路由键。

  • Q1对多有的橘黄色的动物感兴趣
  • Q2对所有的兔子感兴趣
    例如:一个携带quick.orange.rabbit的消息会被分别投递到两个队列。携带azy.orange.elephant 的消息同样也会给两个队列都投递过去。

注意

主题交换机很强大,他可以完成其交换机相同的功能,当一个队列绑定键是井号的时候,这个队列就会无时消息的路由键,相当于扇型交换机。

组合代码

emit_log_topic.py

# coding:utf-8
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print " [x] Sent %r:%r" % (routing_key, message)
connection.close()

receive_logs_topic.py

# coding:utf-8
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    print >> sys.stderr, "Usage: %s [binding_key]..." % (sys.argv[0],)
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print ' [*] Waiting for logs. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] %r:%r" % (method.routing_key, body,)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

执行下面的命令则会接收所有的日志文件:

python receive_logs_topic.py "#"

只想下面的命令,接收来自"kern"设备的日志:

python receive_logs_topic.py "hern.*"

执行下面命令绑定多个:

python receive_logs_topic.py "kern.*" "*.critical"

执行下面的命令将会发送路由键为"kern.critical"的日志

python emit_log_topic.py "kern.critical" "A critical kernel error"

通过运行结果可以看出,主题交换机是怎样将消息发送到不同的队列中去的。

待续。。。

参考文章http://rabbitmq.mr-ping.com/tutorials_with_python/[5]Topics.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 135,671评论 19 139
  • 远程调用(RPC) ps: 使用pika以前的笔记中,已经知道如何使用工作队列,在多个工作者Worker中分发耗时...
    嘿嘿_小于同学阅读 2,755评论 0 2
  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 13,514评论 2 34
  • 什么叫消息队列 消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂...
    lijun_m阅读 5,157评论 0 1
  • 以前只是把词语的解释抄下来,但是并未用方法来记忆,所以效果也不是很好: 比如 靡(1)没有 靡不有初 (2)倒下所...