一、准备工作
1、安装easy_install
easy_install是python包setuptools的一部分,它能帮我们安装需要的python包,这里我们使用pika库来编写小程序,安装参考:easy_install的安装与使用。
2、安装pika
$ easy_install pika
PS.《玩转RabbitMQ》系列文章的实例都是在centos7+python+pika+RabbitMQ的环境上完成的,如果你的环境是windows,请自行搜索搭建环境教程。
二、安装IDE
你可以选择使用编辑器,但是我还是更喜欢装个IDE,这样还能自动补全还有报错提示,提高写程序的效率,pycharm是一个比较不错的python IDE,安装教程可以参考: CentOS 7安装Pycharm 简记,如果你需要注册码,可以点击链接IntelliJ IDEA 注册码。
三、开始小程序
终于要动手写代码了,心情还有点小激动,你可以专门创建一个目录来放置代码,并且为每个章节创建一个目录,这样到后面当你需要回顾前面的章节和例子时,方便查找。
1、生产者
代码如下:
#coding=utf-8
import pika, sys
# 1. Built connect to proxy server
credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials = credentials))
# 2. Get channel
channel = connection.channel()
# 3. Declare exchanger
channel.exchange_declare(exchange="hello-exchange",
exchange_type="direct",
passive=False,
durable=True,
auto_delete=False)
# 4. Create message
msg = sys.argv[1]
msg_props = pika.BasicProperties()
msg_props.content_type = "text/plain"
# 5. Release message
channel.basic_publish(exchange='hello-exchange',
routing_key='hola',
body=msg,
properties=msg_props)
下面逐行解读代码:
第一行声明了编码为UTF-8,如果不加则打印中文时可能会出现乱码。
#coding=utf-8
第二行引入了需要的包。
import pika, sys
RabbitMQ默认有一个用户名为guest,密码为guest的用户,我们使用这个用户来连接本地的MQ代理服务器。
# 1. Built connect to proxy server
credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials = credentials))
成功连接上后就会创建一个AMQP的TCP连接,通过这个连接来获取信道。
# 2. Get channel
channel = connection.channel()
接下来声明一个交换器,交换器名称是hello-exchange,交换器类型是direct,这里要注意一下,RabbitMQ 3.0之前的版本声明交换器类型用的属性是type(即type="direct"),3.0之后的版本的属性值改为exchange_type,需要根据使用MQ版本来调整代码,否则程序运行会报错;passive属性如果设为true,那么会去检测交换器是否已创建,有则成功返回,无则失败报错,这里我们是想创建交换器,所以设为false;durable属性设为True则会创建一个持久化的交换器;auto_delete属性设为False则交换器不会被自动删除,一般跟durable配合使用。这样就创建了一个交换器名为hello-exchange,类型为direct,可持久化不会自动删除的交换器。
# 3. Declare exchanger
channel.exchange_declare(exchange="hello-exchange",
exchange_type="direct",
passive=False,
durable=True,
auto_delete=False)
生产者会发布消息,这里从输入获取消息内容,输入就是执行py文件时传入的参数,并且设置了消息内容类型是text/plain。
# 4. Create message
msg = sys.argv[1]
msg_props = pika.BasicProperties()
msg_props.content_type = "text/plain"
最后一步,将消息发布到交换器上,并声明绑定规则(或者称路由键)为“hola”,消息内容和属性分别通过body和properties设置。
# 5. Release message
channel.basic_publish(exchange='hello-exchange',
routing_key='hola',
body=msg,
properties=msg_props)
2、消费者
import pika
# 1. Build connect to proxy server
credentials = pika.PlainCredentials("guest", "guest")
conn_params = pika.ConnectionParameters("localhost", credentials=credentials)
conn_broker = pika.BlockingConnection(conn_params)
# 2. Get channel
channel = conn_broker.channel()
# 3. Declare exchanger
channel.exchange_declare(exchange="hello-exchange",
exchange_type="direct",
passive=False,
durable=True,
auto_delete=False)
# 4. Declare queue and Bind queue with exchanger by routing_key
channel.queue_declare(queue="hello-queue")
channel.queue_bind(queue="hello-queue",
exchange="hello-exchange",
routing_key="hola")
# 5. Declare function to deal with info
def msg_consumer(channel, method, header, body):
channel.basic_ack(delivery_tag=method.delivery_tag)
if body == "quit":
channel.basic_cancel(consumer_tag="hello-consumer")
channel.stop_consuming()
else:
print body
return
channel.basic_consume(msg_consumer,
queue="hello-queue",
consumer_tag="hello-consumer")
channel.start_consuming()
连接代理服务器,获取信道,声明交换器,前面三步的代码跟生产者完全一样,第四步声明名字叫hello-queue的队列,并将队列绑定到交换器hello-exchange,路由键是"hola",这样上面生产者发布的消息就能到达消费者了,如果生产者发布的消息到达交换器时找不到匹配路由键的队列,那消息将进入“黑洞”。
# 4. Declare queue and Bind queue with exchanger by routing_key
channel.queue_declare(queue="hello-queue")
channel.queue_bind(queue="hello-queue",
exchange="hello-exchange",
routing_key="hola")
来到最后一步,前面定义了一个函数,当消息到达消费者时,函数会被消费者调用,函数被调用时,先通过信道向MQ确认收到了消息,接着输出消息内容,为了能让生产者控制消费者关闭信道停止消费,假如消息内容为"quit",则消费者关闭信道停止消费;将消费者连接到信道上并开启消费,这样就开始监听信道里的消息了。
channel.basic_consume(msg_consumer,
queue="hello-queue",
consumer_tag="hello-consumer")
channel.start_consuming()
四、执行小程序
1、进入执行目录,启动RabbitMQ服务
[root@localhost sbin]# rabbitmq-server -detached
Warning: PID file not written; -detached was passed.
PS.如果consumer启动失败报错,如下图所示:
[root@localhost chapter-2]# python hello_world_consumer.py
Traceback (most recent call last):
File "hello_world_consumer.py", line 35, in <module>
consumer_tag="hello-consumer")
TypeError: basic_consume() got multiple values for keyword argument 'queue'
证明安装的pika版本可能太新了,可覆盖安装一个低版本的pika
[root@localhost ~]# pip install pika==0.12
Collecting pika==0.12
Downloading https://files.pythonhosted.org/packages/bf/48/72de47f63ba353bacd74b76bb65bc63620b0706d8b0471798087cd5a4916/pika-0.12.0-py2.py3-none-any.whl (108kB)
100% |████████████████████████████████| 112kB 46kB/s
Installing collected packages: pika
Found existing installation: pika 1.0.1
Uninstalling pika-1.0.1:
Successfully uninstalled pika-1.0.1
Successfully installed pika-0.12.0
还有另外一种方式是按照新版本pika的语法改代码,参考:
https://cloud.tencent.com/developer/article/1445712
2、启动消费者
[root@localhost chapter-2]# ll
总用量 12
-rwxrwxrwx. 1 root root 1251 8月 25 14:17 hello_world_confirm_producer.py
-rwxrwxrwx. 1 root root 1221 8月 19 22:56 hello_world_consumer.py
-rwxrwxrwx. 1 root root 823 8月 20 00:24 hello_world_producer.py
[root@localhost chapter-2]# python hello_world_consumer.py
3、生产者发布消息
[root@localhost chapter-2]# python hello_world_producer.py hello-world
4、查看消费者终端输出
[root@localhost chapter-2]# ll
总用量 12
-rwxrwxrwx. 1 root root 1251 8月 25 14:17 hello_world_confirm_producer.py
-rwxrwxrwx. 1 root root 1221 8月 19 22:56 hello_world_consumer.py
-rwxrwxrwx. 1 root root 823 8月 20 00:24 hello_world_producer.py
[root@localhost chapter-2]# python hello_world_consumer.py
hello-world
5、让消费者停止消费退出
(1)生产者发布消息
[root@localhost chapter-2]# python hello_world_producer.py quit
(2)消费者收到消息后退出
[root@localhost chapter-2]# ll
总用量 12
-rwxrwxrwx. 1 root root 1251 8月 25 14:17 hello_world_confirm_producer.py
-rwxrwxrwx. 1 root root 1221 8月 19 22:56 hello_world_consumer.py
-rwxrwxrwx. 1 root root 823 8月 20 00:24 hello_world_producer.py
[root@localhost chapter-2]# python hello_world_consumer.py
hello-world
[root@localhost chapter-2]#
第一个hello-world的小程序成功了!!!