RabbitMQ的消息模式

RabbitMQ在应用中一定包含下面三种角色

  • 一个rabbitmq-client,作为消息发出者。
  • 一个rabbitmq-client,作为消息接收者。
  • 一个rabbitmq-server,作为中间转发broker。

生产者 - 消费者模式

一个消息只能被一个消费者取走
client端充当生产者或者消费者,消息都存储在server端的消息队列中。

客户端,不管是生产者还是消费者,都要连接到server先。

connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.117.175.50'))  //连接到server端

channel = connection.channel() // 

channel.queue_declare(queue='hello')   //声明server端消息队列的名称

对于生产者,向队列publish

channel.basic_publish(exchange='', routing_key='hello',  body='Hello World!') //routing_key设置为queue的名称

对于消费者,从队列中consume

def callback(ch, method, properties, body):
     ...
channel.basic_consume(callback, queue='hello', no_ack=True)   // 设计一个回调函数来处理消息。

参数delivery_mode的作用

delivery_mode = 2 意味着要publish的这条消息在server重启之后依然要不丢失。

channel.basic_qos(prefetch_count=X)的作用

在有多个消费者的情况下,默认的如果不设置prefetch_count,server会把消息轮流分给各个消费者去处理。

消费者如果设置了prefetch_count=1,那么它在处理完消息之后需要发出确认ack,
ch.basic_ack(delivery_tag = method.delivery_tag)
server只会在收到它的ack之后才会再分给它新消息。
server端挑选消费者的一个依据就是看消费者对应的channel上未ack的消息数是否达到其设置的prefetch_count个数。

发布 - 订阅模式

多播,一个消息被所有订阅者接收
发布者可以有多个,订阅者可以有多个。

需要设置exchange
channel.exchange_declare(exchange='logs', type='fanout') // 设置exchange的类型

对于消息发布者

channel.basic_publish(exchange='logs', routing_key='', body=message)  // routing_key设置为空

对于消息订阅者

result = channel.queue_declare(exclusive=True)

channel.queue_bind(exchange='logs', queue=result.method.queue)  // 绑定到特定的queue

def callback(ch, method, properties, body):
  print(" [x] %r" % body)

channel.basic_consume(callback, queue=result.method.queue, no_ack=True)

channel.start_consuming()

消息分类的发布-订阅模式

消息是有分类的,比如log条目的级别error/warn/info。
消息根据routing_key的设定进行Routing。
发布者按照分类进行消息发布,订阅者按需进行消息订阅。

对于发布者

channel.exchange_declare(exchange='direct_logs', type='direct')
channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message)  // 需要设定当前消息的routing_key

对于订阅者

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

for severity in severities:
 channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)  // 需要按需绑定routing_key

消息多重分类的发布-订阅模式

消息的分类更多,还支持对分类进行正则匹配。注意通配符和别的地方不太一样。

  • . (dot) 点符号用来隔离不同级别的分类。
  • * (star) can substitute for exactly one word.
  • # (hash) can substitute for zero or more words.

对于发布者

channel.exchange_declare(exchange='topic_logs', type='topic')

channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) // 需要设定当前消息的

对于订阅者

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

for binding_key in binding_keys:
  channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key)  // 需要按需绑定routing_key

RPC请求 - 响应模式

将rabbitmq设计成一个RPC服务框架。这个设计很有意思。
RPC请求者是主动方。PRC服务者是被动方。

RPC请求者,首先订阅消息,回调函数是处理PRC响应值。 客户端需要主动发布消息来作为RPC请求。

RPC服务者,首先订阅消息,回调函数是处理PRC的请求,计算,并发布消息来作为PRC的响应。

服务者取出一条消息(作为RPC请求,来自于某个请求者),之后会再发一条消息(作为RPC响应)给那个请求者。
所以需要设置 properties=pika.BasicProperties(correlation_id = props.correlation_id) 以记住消息请求者的身份。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 135,107评论 19 139
  • 1. 历史 RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的...
    高广超阅读 6,138评论 3 51
  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,424评论 2 34
  • 本文章翻译自http://www.rabbitmq.com/api-guide.html,并没有及时更新。 术语对...
    joyenlee阅读 7,737评论 0 3
  • 对思维导图很感兴趣,自己也用了几次,感觉耳目一新,学习效果,分析效果超好。 读书时,先按照作者的思路画一张思维导图...
    爱存心间阅读 435评论 0 0