python实现RabbitMq

接下来的一个任务需要用到消息队列,公司用的rabbitmq,然后就对它进行了研究,记录下学习路程,编程语言选择的是python。

感谢大神:Itoddy
大神的源码:https://github.com/ltoddy/rabbitmq-tutorial

  1. 初识RabbitMq
  • 首先我们先创建01-send.py文件,用于发送消息队列。
# pip install pika 安装
import pika

# 连接RabbitMq 默认端口5672
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

# 订阅一个频道
channel = connection.channel()

# 声明一个叫hello的队列
channel.queue_declare(queue='hello')

# 消息不会直接发送到队列,先发送到交换机,exchange为空,默认交换--->允许我们准确的指定到那一个队列中,routing_key表示队列名称,body代表要发送过去的消息
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')

print(" [x] Sent 'Hello World!'")
# 刷新网络缓冲区,连接断开
connection.close()
  • 然后创建一个接收消息的02-receive.py文件
import pika

# 连接RabbitMq
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

# 订阅一个频道
channel = connection.channel()

# 确保队列存在,多次运行也只会创建一个,如果先运行接收队列的py文件,没有声明这个队列会报错。可以让它一直等待发送方发送消息。
channel.queue_declare(queue='hello')


# 接收到hello后执行的回调函数
def callback(ch, method, propertites, body):
    print(" [x] Received {}".format(body))


# 指定接收名为hello的队列, no_ack接收到后不发确认信息回去
channel.basic_consume(callback, queue='hello', no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')

# 消费者开始
channel.start_consuming()
  1. 查看队列:
$ docker container exec -it rabbitmq rabbitmqctl list_queues
Timeout: 60.0 seconds ...  
Listing queues for vhost / ...           
hello   1
task_queue      0
  1. worker默认调度方法

默认情况下,RabbitMQ将按顺序将每条消息发送给下一个使用者。平均而言,每个消费者将获得相同数量的消息。这种分配消息的方式称为循环法。
我们尝试开启多个receive.py,然后运行send.py。发现依次循环获得消息。

  1. 消息确认

假设当其中一个消费者的任务需要处理很久,处理到一半突然挂了。RabbitMq只要把任务发出去了,将任务标记为删除状态,这样的话任务就丢失了。为了保证永不丢失,RabbitMq支持消息确认,消费者发回ack(请求)告诉RabbitMQ已经收到,处理了特定的消息,并且RabbitMQ可以自由删除它。消息确认默认是被打开的。在前面的例子中,我们通过 no_ack = True 标志明确地将它们关闭。没收到ack,超时后会重新发送这个任务到worker。

  • 修改了下send.py来进行验证
import pika
import sys

# 对RabbtMq进行连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明一个队列
channel.queue_declare(queue='hello')
# 需要发送的消息,消息为python send.py后面的参数
message = ' '.join(sys.argv[1:]) or 'Hello World'

channel.basic_publish(exchange='', routing_key='hello', body=message)
print(" [x] Sent %r" % message)
# 连接关闭
connection.close()
  • 修改了下receive.py来进行验证
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')

def callback(ch, method, propertites, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))  # 根据消息中.的个数来设置延迟多少秒,然后强制退出,再运行receive.py文件,会发现任务继续执行。
    print("[x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(callback, queue='hello')

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
  1. 消息持久性

消费者死亡,任务不会丢失,如果生产者死亡,任务仍然会丢失。
当RabbitMQ退出或崩溃时,它会忘记队列和消息,除非你告诉它是永久的,原理就是会将任务写入到本地,如果数据量太大的话且消息丢失不是太重要的话,对性能来说的话可以考虑不用。

  • send.py
import pika
import sys

# 对RabbtMq进行连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明一个队列
channel.queue_declare(queue='task_queue', durable=True)

# 需要发送的消息
message = ' '.join(sys.argv[1:]) or 'Hello World'

channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties(
    delivery_mode=2))  # 消息持久化
print(" [x] Sent %r" % message)
# 连接关闭
connection.close()
  • receive.py
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)

def callback(ch, method, propertites, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print("[x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)  #  接收到消息后会给rabbitmq发送一个确认


channel.basic_consume(callback, queue='task_queue')

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
  1. 公平派遣

在派遣任务时,假设一名工人收到的任务很忙,另一个却很轻松,rabbitmq还是会公平派遣,均匀的发送消息。发生这种情况是因为RabbitMQ只在消息进入队列时调度消息。它没有考虑消费者未确认消息的数量。它只是盲目地将第n条消息分发给第n位消费者。为了解决这个问题,我们可以在消费者那里使用basic.qos方法和设置prefetch_count = 1。直到消费者处理并确认了前一个消息才给它分配任务。

  • receive.py
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)


def callback(ch, method, propertites, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print("[x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)  # 接收到消息后会给rabbitmq发送一个确认


# 消费者给rabbitmq发送一个信息:在消费者处理完消息之前不要再给消费者发送消息
channel.basic_qos(prefetch_count=1)

channel.basic_consume(callback, queue='task_queue')

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
  1. 交换机

生产者不是将消息发送给队列,而是将消息发送给交换机,由交换机决定将消息发送给哪个队列。所以exchange必须准确知道消息是要送到哪个队列,还是要被丢弃。因此要在exchange中给exchange定义规则,所有的规则都是在exchange的类型中定义的。
exchange有4个类型:direct, topic, headers ,fanout

  • fandout类型: 广播类型,生产者将消息发送给所有消费者,如果某个消费者没有收到当前消息,就再也收不到了。
  • send.py
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')  # 创建一个fanout(广播)类型的交换机exchange,名字为logs。

# 需要发送的消息
message = ' '.join(sys.argv[1:]) or 'Hello World'

# 指定交换机exchange为logs,这里只需要指定将消息发给交换机logs就可以了,不需要指定队列,因为生产者消息是发送给交换机的。
# 在fanout类型中,绑定关键字routing_key必须忽略,写空即可
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Send %r" % message)
# 连接关闭
connection.close()
  • receive.py
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')  # 消费者需再次声明一个exchange 以及类型。

# 创建一个队列,exclusive=True(唯一性)表示在消费者与rabbitmq断开连接时,该队列会自动删除
result = channel.queue_declare(exclusive=True)

# 因为rabbitmq要求新队列名必须是与现存队列名不同,所以为保证队列的名字是唯一的,method.queue方法会随机创建一个队列名字,如:‘amq.gen-JzTY20BRgKO-HjmUJj0wLg‘。
queue_name = result.method.queue

# 将交换机logs与接收消息的队列绑定。表示生产者将消息发给交换机logs,logs将消息发给随机队列queue,消费者在随机队列queue中取消息
channel.queue_bind(exchange='logs', queue=queue_name)

def callback(ch, method, propertites, body):
    print(" [x] Received %r" % body)

# 消费者给rabbitmq发送一个信息:在消费者处理完消息之前不要再给消费者发送消息
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue=queue_name, no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
  • direct:关键字类型,交换机根据生产者消息中含有的不同的关键字将消息发送给不同的队列,消费者根据不同的关键字从不同的队列取消息。
  • send.py
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 创建一个交换机并声明exchange的类型为:关键字类型,表示该交换机会根据消息中不同的关键字将消息发送给不同的队列
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

# severity这里只能为一个字符串,这里为‘info’表明本生产者只将下面的message发送到info队列中,消费者也只能从info队列中接收info消息
severity = 'err'

# 需要发送的消息
message = ' '.join(sys.argv[1:]) or 'Hello World'

# 绑定关键字,即将message与关键字info绑定,明确将消息发送到哪个关键字的队列中。
channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message)
print(" [x] Send %r" % message)
# 连接关闭
connection.close()
  • receive.py
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 创建交换机,命名为‘direct_logs’并声明exchange类型为关键字类型。
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

# 创建一个队列,exclusive=True(唯一性)表示在消费者与rabbitmq断开连接时,该队列会自动删除
result = channel.queue_declare(exclusive=True)

# 因为rabbitmq要求新队列名必须是与现存队列名不同,所以为保证队列的名字是唯一的,method.queue方法会随机创建一个队列名字,如:‘amq.gen-JzTY20BRgKO-HjmUJj0wLg‘。
queue_name = result.method.queue

severities = ['info', 'err']  # 可以接收绑定关键字info或err的消息,列表中也可以只有一个

if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)


for severity in severities:
    channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)


def callback(ch, method, propertites, body):
    print(" [消费者] %r Received %r" % (method.routing_key, body))


# 消费者给rabbitmq发送一个信息:在消费者处理完消息之前不要再给消费者发送消息
channel.basic_consume(callback, queue=queue_name, no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()  # 循环等待消息接收。
  • topic:模糊匹配类型
  • send.py
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 创建一个交换机并声明exchange的类型为:关键字类型,表示该交换机会根据消息中不同的关键字将消息发送给不同的队列
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

routing_key = 'info.warn'

# 需要发送的消息
message = ' '.join(sys.argv[1:]) or 'Hello World'

# 绑定关键字,即将message与关键字info绑定,明确将消息发送到哪个关键字的队列中。
channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message)
print(" [x] Send %r" % message)
# 连接关闭
connection.close()
  • receive.py
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 创建交换机,命名为‘topic_logs’并声明exchange类型为模糊匹配类型。
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

# 创建一个队列,exclusive=True(唯一性)表示在消费者与rabbitmq断开连接时,该队列会自动删除
result = channel.queue_declare(exclusive=True)

# 因为rabbitmq要求新队列名必须是与现存队列名不同,所以为保证队列的名字是唯一的,method.queue方法会随机创建一个队列名字,如:‘amq.gen-JzTY20BRgKO-HjmUJj0wLg‘。
queue_name = result.method.queue

# 绑定键。‘#’匹配所有字符,‘*’匹配一个单词。这里列表中可以为一个或多个条件,能通过列表中字符匹配到的消息,消费者都可以取到
binding_keys = ['[warn]', 'info.*']


for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key)


def callback(ch, method, propertites, body):
    print(" [消费者] %r Received %r" % (method.routing_key, body))


channel.basic_consume(callback, queue=queue_name, no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()  # 循环等待消息接收。
  1. 远程过程调用(RPC)

AMQP 0-9-1协议预定义了一组包含14个属性的消息。大多数属性很少使用,但以下情况除外:
delivery_mode:将消息标记为持久(值为2)或瞬态(任何其他值)。你可能会记得第二篇教程中的这个属性。
content_type:用于描述编码的MIME类型。例如,对于经常使用的JSON编码,将此属性设置为application/json是一种很好的做法。
reply_to:通常用于命名回调队列。
correlation_id:用于将RPC响应与请求关联起来。

  • server.py
import pika


class Center(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
        self.channel = self.connection.channel()
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue
        self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        self.response = body

    def request(self, n):
        self.response = None
        # 发送计算请求,并声明返回队列
        self.channel.basic_publish(exchange='',
                                   routing_key='compute_queue',
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,
                                   ),
                                   body=str(n))
        # 接收返回的数据
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)


center = Center()

print(" [x] Requesting increase(30)")
response = center.request(30)
print(" [.] Got %r" % (response,))
  • client.py
import pika

# 连接rabbitmq服务器
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

# 定义队列
channel.queue_declare(queue='compute_queue')
print(' [*] Waiting for n')


# 将n值加1
def increase(n):
    return n + 1


# 定义接收到消息的处理方法
def request(ch, method, properties, body):
    print(" [.] increase(%s)" % (body,))

    response = increase(int(body))

    # 将计算结果发送回控制中心
    ch.basic_publish(exchange='',
                     routing_key=properties.reply_to,
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
channel.basic_consume(request, queue='compute_queue')

channel.start_consuming()

服务端给客户端发送一个数,服务端对这个数加1然后返回。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,294评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,780评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,001评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,593评论 1 289
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,687评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,679评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,667评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,426评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,872评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,180评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,346评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,019评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,658评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,268评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,495评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,275评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,207评论 2 352

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,647评论 18 139
  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,354评论 2 34
  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981阅读 15,904评论 2 11
  • rabbitMQ是一款基于AMQP协议的消息中间件,它能够在应用之间提供可靠的消息传输。在易用性,扩展性,高可用性...
    点融黑帮阅读 2,997评论 3 41
  • 本文大纲 RabbitMQ 历史 RabbitMQ 应用场景 RabbitMQ 系统架构 RabbitMQ 基本概...
    Java_Explorer阅读 16,358评论 1 40