rabbitMQ

RabbitMQ安装

我的系统版本:

[root@jinbo ~]#cat /etc/issue
CentOS release 6.5 (Final)
  1. 安装epel库:EPEL 是yum的一个软件源,里面包含了许多基本源里没有的软件。
wget  http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm
rpm -ivh epel-release-6-8.noarch.rpm
yum repolist    #看到epel,说明安装成功了

2.安装erlang,rabbitmq

yum install erlang -y    #rabbitmq是erlang语言开发的
yum install rabbitmq-server -y
  1. service rabbitmq-server start 默认端口5672

  2. 启用维护插件:rabbitmq-plugins enable rabbitmq_management
    界面 http://ip:15672/ 用户名密码 guest
    无法登陆解决方法: vi /etc/rabbitmq.config 写入信息,
    [{rabbit, [{loopback_users, []}]}]. 注意 . 一定要有,保存
    service rabbitmq-server restart(如果重启出现 错误 请把楼上的配置文件保存Ansi 编码)

Python操作RabbitMQ

基本用法

发布端:

import pika

#创建一个基本的socket连接对象
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='192.168.1.200')
)

channel = connection.channel()  #创建一个管道对象

#声明queue
channel.queue_declare(queue='hello')

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')

connection.close()

接收端:

import pika, time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.1.200'))
channel = connection.channel()

#You may ask why we declare the queue again ‒ we have already declared it in our previous code.
# We could avoid that if we were sure that the queue already exists. For example if send.py program
#was run before. But we're not yet sure which program to run first. In such cases it's a good
# practice to repeat declaring the queue in both programs.
channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print("Received %r" %body)
    ch.basic_ack(delivery_tag=method.delivery_tag)  #客户端主动确认


channel.basic_consume(callback,
                      queue='hello',
                      no_ack=False)

channel.start_consuming()
no-ack
  • no-ack=False 表示消费完以后不主动把状态通知rabbitmq
  • no-ack=True 当程序断开将丢掉消息

no-ack = False,如果消费者遇到情况(its channel is closed, connection is closed,
or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。

  • 回调函数中的ch.basic_ack(delivery_tag=method.delivery_tag)
  • basic_comsume中的no_ack=False
消息持久化

We have learned how to make sure that even if the consumer dies, the task isn't lost(by default, if wanna disable use no_ack=True). But our tasks will still be lost if RabbitMQ server stops.

When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. Two things are required to make sure that messages aren't lost: we need to mark both the queue and messages as durable.

First, we need to make sure that RabbitMQ will never lose our queue. In order to do so, we need to declare it as durable:

channel.queue_declare(queue='hello', durable=True)

This queue_declare change needs to be applied to both the producer and consumer code.

At that point we're sure that the task_queue queue won't be lost even if RabbitMQ restarts. Now we need to mark our messages as persistent - by supplying a delivery_mode property with a value 2.

channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                      delivery_mode = 2,       # make message persistent
                      ))

生产者和消费者端:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.1.200"))
channel = connection.channel()
channel.queue_declare(queue='cc', durable=True)  #如果有cc的队列,略过;如果没有,创建cc的队列(持久化队列)

channel.basic_publish(exchange='',
                      routing_key='cc',
                      body='hello world!!!',
                      properties=pika.BasicProperties(delivery_mode=2))  #消息持久化
connection.close()
import pika

connection =pika.BlockingConnection(pika.ConnectionParameters(host="192.168.1.200"))
channel = connection.channel()
channel.queue_declare(queue='cc', durable=True)

def callback(ch, method, properties, body):
    print('Received %r' %body)
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_consume(callback, queue='cc')
channel.start_consuming()

查看当前队列: #rabbitmqctl list_queues (usr/sbin/rabbitmqctl)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • **2014真题Directions:Read the following text. Choose the be...
    又是夜半惊坐起阅读 9,934评论 0 23
  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,408评论 2 34
  • python使用pika模块操作RabbitMQ,我们可以通过sudo pip3 install pika来安装p...
    drfung阅读 2,027评论 1 0
  • 注:这份文档是我和几个朋友学习后一起完成的。 目录 RabbitMQ 概念 exchange交换机机制什么是交换机...
    Mooner_guo阅读 33,405评论 8 97
  • 平时外出应酬,和朋友吃饭,免不了觥筹交错,喝点小酒,以致尽兴,喝了酒自然不敢开车,呼叫代驾已成为一种习惯,近一点的...
    zwj发如雪阅读 231评论 4 11