RabbitMQ pika简单使用

MQ 全称为 Message Queue, 是一种应用程序对应用程序的通信方法。MQ 是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息。MQ 遵循了 AMQP 协议的具体实现和产品。

整篇代码撸下来预计耗时1.5h。

参考博主:anzhsoft
官网


闲聊
  • 什么是 RabbitMQ ?
  • 为什么要用 RabbitMQ ?
  • RabbitMQ 怎么用?

一、What RabbitMQ?

MQ 是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息。MQ 和 JMS 类似,但不同的是 JMS 是 SUN JAVA 消息中间件服务的一个标准和API定义,而 MQ 则是遵循了 AMQP协议的具体实现和产品。

这个系统架构图版权属于sunjun041640
  • 1、RabbitMQ Server
    • RabbitMQ isn’t a food truck, it’s a delivery service. 他的角色就是维护一条从 Producer 到 Consumer 的路线。
  • 2、Exchange
    • where producers publish their messages(发送方发布消息的地方)
  • 3、Queue
    • where the messages end up and are received by consumers(接收方获取消息的地方)
  • 4、Producer(Client A, Client B)
    • 数据的发送方
  • 5、Consumer(Client 1, Client 2, Client 3)
    • 数据的接收方
  • 6、Connection
    • 就是一个 TCP 的连接。Producer 和 Consumer 都是通过 TCP 连接到 RabbitMQ Server的。以后我们可以看到,程序的起始处就是建立这个 TCP 连接。
  • 7、Channel
    • 虚拟连接。它建立在上述的 TCP 连接中。数据流动都是在 Channel 中进行的。也就是说,一般情况是程序起始建立TCP 连接,第二步就是建立这个 Channel
  • 8、Bindings
    • Bindings are how the messages get routed from the exchange to particular queues.
二、Why RabbitMQ

对于一个大型的软件系统来说,它会有很多的组件或者说模块或者说子系统或者(subsystem or Component or submodule)。那么这些模块如何通信呢?可以使用 socket,或者 url 请求,但是还有很多问题需要解决,如:

  • 1)信息的发送者和接收者如何维持这个连接,如果一方的连接中断,这期间的数据如何方式丢失?
  • 2)如何降低发送者和接收者的耦合度?
  • 3)如何让 Priority 高的接收者先接到数据?
  • 4)如何做到 load balance?有效均衡接收者的负载?
  • 5)如何有效的将数据发送到相关的接收者?也就是说将接收者 subscribe 不同的数据,如何做有效的 filter。
  • 6)如何做到可扩展,甚至将这个通信模块发到 cluster 上?
  • 7)如何保证接收者接收到了完整,正确的数据
    AMDQ 协议解决了以上的问题,而 RabbitMQ 实现了 AMQP
三、How RabbitMQ

一套 MQ 完整流程如下:

首先将 RabbitMQ 服务启动

Producer

  • 1、创建一个 connection
  • 2、在 connection 基础上创建一个频道 channel
  • 3、在频道 channel 上声明一个 exchange,参数为 exchange 的类型和名称
  • 4、在频道 channel 上发布消息,参数为 exchange 的名称以及路由 (routing_key) 以及消息体
  • 5、关闭 connection

Consumer

  • 1、创建一个 connection
  • 2、在 connection 基础上创建一个 channel
  • 3、在频道 channel 上声明一个 exchange,参数为 exchange 的类型和名称
  • 4、在频道 channel 上声明一个 queue
  • 5、将 queue 绑定到声明的 exchange 上
  • 6、channel 开始监听对应 queue 上的消息,同时设置获取消息的回调
四、example
  • 1、直接使用 queue ,难度等级 ★☆☆☆☆

producer.py

import pika  

if __name__ == '__main__':
    # 创建一个connection
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))  
    channel = connection.channel()
    
    # 声明一个queue
    channel.queue_declare(queue='hello')  
    
    # exchange为空的时候,routing_key就是指定的queue值
    channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')  
    print(" [x] Sent 'Hello World!'")
    # 关闭连接
    connection.close()

consumer.py

import pika  

def callback(ch, method, properties, body):  
    print(" [x] Received %r" % (body,))

if __name__ == '__main__':
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))  
    # 创建频道
    channel = connection.channel()  
    # 声明queue
    channel.queue_declare(queue='hello')  
    
    print(' [*] Waiting for messages. To exit press CTRL+C')

    # 收到指定消息的回调设置 
    channel.basic_consume(callback, queue='hello', no_ack=True)  
    # 开始循环监听 
    channel.start_consuming()
  • 2、消息确认,难度等级 ★☆☆☆☆

producer.py

import pika
import sys

if __name__ == '__main__':

    message = ' '.join(sys.argv[1:]) or "Hello World!"
    # 创建一个 connection
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    # 创建一个 channel
    channel = connection.channel()
    # 声明一个queue
    channel.queue_declare(queue='hello')

    # 发布消息,exchange 为空的情况下,routing_key 值就是指定的 queue 名字,即将消息直接发送到指定的 queue
    channel.basic_publish(exchange='', routing_key='hello', body=str(message))
    print(" [x] Sent {}".format(message))
    connection.close()

consumer.py

import pika  
import time

def callback(ch, method, properties, body):  
    print(" [x] Received %r" % body)
    time.sleep(str(body).count('.'))
    ch.basic_ack(delivery_tag = method.delivery_tag)
        
if __name__ == '__main__':
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))  
    channel = connection.channel()  
      
    channel.queue_declare(queue='hello')  
      
    print(' [*] Waiting for messages. To exit press CTRL+C')
      
    channel.basic_consume(callback, queue='hello',no_ack=False)  
    
    channel.start_consuming()
  • 3、使用 fanout 类型 exchange,难度等级 ★☆☆☆☆

producer.py

import pika
import sys

if __name__ == '__main__':

    message = ' '.join(sys.argv[1:]) or "Hello World!"

    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  
    channel = connection.channel()
    channel.exchange_declare(exchange='logs', exchange_type='fanout')

    channel.basic_publish(exchange='logs', routing_key='', body=str(message))
    print(" [x] Sent {}".format(message))
    connection.close()

consumer.py

import pika  
import time

def callback(ch, method, properties, body):  
    print(" [x] Received %r" % body)
    # 确认消息
    ch.basic_ack(delivery_tag=method.delivery_tag)
    time.sleep(str(body).count('.'))
      
if __name__ == '__main__':
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))  
    channel = connection.channel()  
      
    # 声明一个 exchange
    channel.exchange_declare(exchange='logs', exchange_type='fanout')
    # 声明一个随机名字的 queue
    result = channel.queue_declare()  
      
    print(' [*] Waiting for messages. To exit press CTRL+C')
    # 获取 queue 的 name 
    queue_name = result.method.queue
    # 将 queue 绑定到 exchange
    channel.queue_bind(exchange='logs', queue=queue_name)
    # 设置监听的 queue
    channel.basic_consume(callback, queue=queue_name)  
    
    channel.start_consuming()
  • 4、使用 direct 类型exchange,难度等级 ★☆☆☆☆

producer.py

import pika
import sys

if __name__ == '__main__':

    message = ' '.join(sys.argv[1:]) or "Hello World!"

    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  
    channel = connection.channel()
    # 声明一个 direct 类型的 exchange
    channel.exchange_declare(exchange='logs_direct', exchange_type='direct')
    # 获取 routing_key 值
    severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
    channel.basic_publish(exchange='logs_direct', routing_key=severity, body=str(message))
    print(" [x] Sent {}".format(message))
    connection.close()

consumer.py

import pika  
import sys
import time

def callback(ch, method, properties, body):  
    print(" [x] Received %r" % body)
    # 确认消息
    ch.basic_ack(delivery_tag = method.delivery_tag)
    time.sleep(str(body).count('.'))
      
if __name__ == '__main__':
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))  
    channel = connection.channel()  
      
    # 声明一个 exchange
    channel.exchange_declare(exchange='logs_direct', exchange_type='direct')
    # 声明一个随机名字的 queue
    result = channel.queue_declare()  

    # 设置监听的 routing_key
    severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
      
    print(' [*] Waiting for messages. To exit press CTRL+C')
    # 获取 queue 的 name 
    queue_name = result.method.queue
    # 将 queue 绑定到 exchange
    channel.queue_bind(exchange='logs_direct', queue=queue_name, routing_key=severity)
    # 设置监听的 queue
    channel.basic_consume(callback, queue=queue_name)  
    
    channel.start_consuming()
  • 5、使用 topic 类型exchange难度等级 ★☆☆☆☆

producer.py

import pika
import sys

if __name__ == '__main__':

    message = ' '.join(sys.argv[1:]) or "Hello World!"

    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  
    channel = connection.channel()
    # 声明一个 topic 类型的 exchange
    channel.exchange_declare(exchange='logs_topic', exchange_type='topic')
    # 获取 routing_key 值
    severity = sys.argv[1] if len(sys.argv) > 1 else '111.111.111'
    channel.basic_publish(exchange='logs_topic', routing_key=severity, body=str(message))
    print(" [x] Sent {}".format(message))
    connection.close()

consumer.py

import pika  
import sys
import time

def callback(ch, method, properties, body):  
    print(" [x] Received %r" % body)
    # 确认消息
    ch.basic_ack(delivery_tag=method.delivery_tag)
    time.sleep(str(body).count('.'))
  
if __name__ == '__main__':
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))  
    channel = connection.channel()

    # 声明一个 exchange
    channel.exchange_declare(exchange='logs_topic', exchange_type='topic')
    # 声明一个随机名字的 queue
    result = channel.queue_declare()  

    # 设置监听的 routing_key
    severity = sys.argv[1] if len(sys.argv) > 1 else '111.111.111'
      
    print(' [*] Waiting for messages. To exit press CTRL+C')
    # 获取 queue 的 name 
    queue_name = result.method.queue
    # 将 queue 绑定到 exchange
    channel.queue_bind(exchange='logs_topic', queue=queue_name, routing_key=severity)
    # 设置监听的 queue
    channel.basic_consume(callback, queue=queue_name)  
    
    channel.start_consuming()
五、实现细节部分
  • 1、Exchange
    • 空:直接将消息绑定到指定 queue 处理
    • fanout:广播,所有绑定到该 exchange 的 queue 都会收到消息
    • direct:定向,所有绑定到该 exchange 并且其 routing_key 也相同的 queue 能收到消息
    • topic:主题,所有绑定到该 exchange 并且 routing_key 符合其匹配的 queue 能收到消息。匹配规则如下:* (星号) 代表任意 一个单词,# (hash) 0个或者多个单词。即 111.111.111 是和 *.111.111、#.111 匹配的。
六、RabbitMQ 常用指令
  • 1、服务器的启动与关闭·
启动: rabbitmq-server –detached

关闭: rabbitmqctl stop

若单机有多个实例,则在 rabbitmqctlh 后加 –n 指定名称
  • 2、获取服务器状态
服务器状态:rabbitmqctl status
  • 3、常用的命令
查看所有的消息队列
rabbitmqctl list_queues

清除所有队列
abbitmqctl reset

启动应用
rabbitmqctl start_app

关闭应用
rabbitmqctl stop_app

查看所有的 Exchanges
rabbitmqctl list_exchanges 

查看所有的 bindings
rabbitmqctl list_bindings
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,752评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,100评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,244评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,099评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,210评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,307评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,346评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,133评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,546评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,849评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,019评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,702评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,331评论 3 319
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,030评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,260评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,871评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,898评论 2 351

推荐阅读更多精彩内容

  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,353评论 2 34
  • 1. 历史 RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的...
    高广超阅读 6,093评论 3 51
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,644评论 18 139
  • rabbitMQ是一款基于AMQP协议的消息中间件,它能够在应用之间提供可靠的消息传输。在易用性,扩展性,高可用性...
    点融黑帮阅读 2,995评论 3 41
  • 什么叫消息队列 消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂...
    lijun_m阅读 1,340评论 0 1