rabbitmq使用

RabbitMQ概述

参考博客

MQ全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法;

RabbitMQ是开源的,实现了AMQP协议的,采用Erlang(面向并发编程语言)编写的,可复用的企业级消息系统;

AMQP(高级消息队列协议)是一个异步消息传递所使用应用层协议规范,为面向消息中间件设计,基于此协议的客户端与消息中间件可以无视消息来源传递消息,不受客户端、消息中间件、不同的开发语言环境等条件的限制;

支持主流操作系统:Linux、Windows,MacOX等;

支持多种客户端开发语言:Java、Python、Ruby、.NET,PHP、C/C++、Node.js等

2.RabbitMQ安装

centos上安装

1.安装前准备

  wget http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm
  rpm -ivh epel-release-6-8.noarch.rpm
  wget -P /etc/yum.repos.d/ http://repos.fedorapeople.org/repos/peter/erlang/epel-erlang.repo 
  yum clean all 
  yum -y install erlang

2.安装rabbitmq

 rpm --import http://www.rabbitmq.com/rabbitmq-signing-key-public.asc 
 wget http://www.rabbitmq.com/releases/rabbitmq-server/v2.8.5/rabbitmq-server-2.8.5-1.noarch.rpm 
 rpm -ivh rabbitmq-server-2.8.5-1.noarch.rpm

3.启动rabbitmq并设置开机启动

systemctl start rabbitmq-server
chkconfig rabbitmq-server on

4.检查rabbitmq是否启动

ps aux|grep rabbitmq    或者    systemctl status rabbitmq-server

5.开启web管理页面

rabbitmq-plugins enable rabbitmq_management
service rabbitmq-server restart

6.防火墙开放15672端口

/sbin/iptables -I INPUT -p tcp --dport 15672 -j ACCEPT
/etc/rc.d/init.d/iptables save

mac

brew install rabbitmq

windows安装

2.基本使用

1.启动rabbitmq-server

2.在rabbitmq server上创建一个用户
rabbitmqctl add_user zou 123

3.配置权限,允许从外面访问
rabbitmqctl set_permissions -p / zou ".*" ".*" ".*"

4.查看当前队列
rabbitmqctl list_queues

基本模型

producer

import pika

# 和本地服务器建立连接
credentials = pika.PlainCredentials("zou","123")
parameters = pika.ConnectionParameters(host="192.168.56.10", credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

# 声明队列名
channel.queue_declare(queue="hello")

# 在 RabbitMQ 中,消息并不能直接发送到队列中,而总会被传递给代理(exchange)
# 我们可以在参数中将 exchange 设定为空字符串,
# 则可以使用一个默认的 exchange。这个 exchange 非常特殊:
# 允许我们指定使用哪个队列,我们可以使用 routing_key 参数指定队列名称:
channel.basic_publish(
    exchange="",
    routing_key="hello",
    body="fuck you",
)

print("Send 'fuck you' ")

# 关闭连接
connection.close()

consumer

import pika

credentials = pika.PlainCredentials('zou', '123')

parameters = pika.ConnectionParameters(host='192.168.56.10', credentials=credentials)
connection = pika.BlockingConnection(parameters)

channel = connection.channel()  # 队列连接通道
channel.queue_declare(queue="hello")


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)


channel.basic_consume(callback,  # 取到消息后,调用callback 函数
                      queue='hello',
                      no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()  # 阻塞模式

Work Queues

消息持久化

消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保证消息可靠性的呢——消息持久化。
为了保证RabbitMQ在退出或者crash等异常情况下数据没有丢失,需要将queue,exchange和Message都持久化。

channel.queue_declare(queue="hello3",durable=True)
channel.basic_publish(
    exchange="",
    routing_key="hello3",
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=2  # 使消息持久化,rabbitmq挂了,队列消息仍在
    )
)

消息公平分发

如果Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,很可能出现,一个机器配置不高的消费者那里堆积了很多消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,可以在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。

[图片上传失败...(image-d63693-1526605794569)]

消息持久化+公平分发的完整代码

Producer

import pika,time

# 和本地服务器建立连接
credentials = pika.PlainCredentials("zou","123")
parameters = pika.ConnectionParameters(host="192.168.56.10", credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

# 声明队列名
channel.queue_declare(queue="hello3",durable=True) # 队列持久化

import sys
message = ' '.join(sys.argv[1:]) or "Hello World! %s" % time.time()
channel.basic_publish(
    exchange="",
    routing_key="hello3",
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=2  # 使消息持久化,rabbitmq挂了,队列消息仍在
    )
)
print("[p] Send %s " % (message))

# 关闭连接
connection.close()

consumer

import pika,time

credentials = pika.PlainCredentials('zou', '123')

parameters = pika.ConnectionParameters(host='192.168.56.10', credentials=credentials)
connection = pika.BlockingConnection(parameters)

channel = connection.channel()  # 队列连接通道
# channel.queue_declare(queue="hello3")

def callback(ch, method, properties, body):
    print(" [x] Received %s" % body)
    time.sleep(6)
    print(" [x] Done")
    # print("method.delivery_tag",method.delivery_tag)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,  # 取到消息后,调用callback 函数
                      queue='hello3',
                      )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()  # 阻塞模式

Publish\Subscribe(消息发布\订阅)

fanout: 所有bind到此exchange的queue都可以接收消息
direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息
topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息

广播模式

Prodcer

import pika,time

# 和本地服务器建立连接
credentials = pika.PlainCredentials("zou","123")
parameters = pika.ConnectionParameters(host="192.168.56.10", credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

# 声明队列名
# channel.queue_declare(queue="hello3",durable=True)
channel.exchange_declare(exchange='logs',type='fanout') # 广播模式

import sys
message = ' '.join(sys.argv[1:]) or "Hello World! %s" % time.time()
channel.basic_publish(
    exchange="logs",
    routing_key="",
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=2  # 使消息持久化,rabbitmq挂了,队列消息仍在
    )
)
print("[p] Send %s " % (message))

# 关闭连接
connection.close()

consumer

import pika,time

credentials = pika.PlainCredentials('zou', '123')

parameters = pika.ConnectionParameters(host='192.168.56.10', credentials=credentials)
connection = pika.BlockingConnection(parameters)

channel = connection.channel()  # 队列连接通道

#不指定queue名字,rabbit会随机分配一个名字,
# exclusive=True会在使用此queue的消费者断开后,自动将queue删除
channel.exchange_declare(exchange='logs',type='fanout')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs',
                   queue=queue_name)

def callback(ch, method, properties, body):
    print(" [x] Received %s" % body)
    time.sleep(1)
    print(" [x] Done")
    # print("method.delivery_tag",method.delivery_tag)
    ch.basic_ack(delivery_tag=method.delivery_tag)  # 代表消费完毕

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,  # 取到消息后,调用callback 函数
                      queue=queue_name,
                      )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()  # 阻塞模式

组播

Producer

__author__ = 'Administrator'
import pika
import sys

credentials = pika.PlainCredentials('alex', 'alex3714')

parameters = pika.ConnectionParameters(host='192.168.11.106',credentials=credentials)
connection = pika.BlockingConnection(parameters)

channel = connection.channel() #队列连接通道

channel.exchange_declare(exchange='direct_log',type='direct')

log_level =  sys.argv[1] if len(sys.argv) > 1 else 'info'

message = ' '.join(sys.argv[1:]) or "info: Hello World!"

channel.basic_publish(exchange='direct_log',
                      routing_key=log_level,
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

consumer

__author__ = 'Administrator'
import pika,sys
credentials = pika.PlainCredentials('alex', 'alex3714')

parameters = pika.ConnectionParameters(host='192.168.11.106',credentials=credentials)
connection = pika.BlockingConnection(parameters)

channel = connection.channel() #队列连接通道

queue_obj = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
queue_name = queue_obj.method.queue
print('queue name',queue_name,queue_obj)

log_levels = sys.argv[1:] # info warning errr

if not log_levels:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)


for level in log_levels:
    channel.queue_bind(exchange='direct_log',
                       queue=queue_name,
                       routing_key=level) #绑定队列到Exchange

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,queue=queue_name, no_ack=True)

channel.start_consuming()

Topic播

producer

import pika
import sys

credentials = pika.PlainCredentials('alex', 'alex3714')

parameters = pika.ConnectionParameters(host='192.168.11.106',credentials=credentials)
connection = pika.BlockingConnection(parameters)

channel = connection.channel() #队列连接通道

channel.exchange_declare(exchange='topic_log',type='topic')

#log_level =  sys.argv[1] if len(sys.argv) > 1 else 'info'
log_level =  sys.argv[1] if len(sys.argv) > 1 else 'all.info'

message = ' '.join(sys.argv[1:]) or "all.info: Hello World!"

channel.basic_publish(exchange='topic_log',
                      routing_key=log_level,
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

consumer

import pika,sys
credentials = pika.PlainCredentials('alex', 'alex3714')

parameters = pika.ConnectionParameters(host='192.168.11.106',credentials=credentials)
connection = pika.BlockingConnection(parameters)

channel = connection.channel() #队列连接通道

queue_obj = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
queue_name = queue_obj.method.queue


log_levels = sys.argv[1:] # info warning errr

if not log_levels:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)


for level in log_levels:
    channel.queue_bind(exchange='topic_log',
                       queue=queue_name,
                       routing_key=level) #绑定队列到Exchange

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,queue=queue_name, no_ack=True)

channel.start_consuming()

Remote procedure call (RPC)

Rpc-Server

__author__ = 'Administrator'

#1 。 定义fib函数
#2. 声明接收指令的队列名rpc_queue
#3. 开始监听队列,收到消息后 调用fib函数
#4 把fib执行结果,发送回客户端指定的reply_to 队列
import subprocess
import pika
import time
credentials = pika.PlainCredentials('zou', '123')

parameters = pika.ConnectionParameters(host='192.168.56.10',credentials=credentials)
connection = pika.BlockingConnection(parameters)

channel = connection.channel() #队列连接通道

channel.queue_declare(queue='rpc_queue2')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)


def run_cmd(cmd):
    cmd_obj = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
    result = cmd_obj.stdout.read() + cmd_obj.stderr.read()

    return result


def on_request(ch, method, props, body):
    cmd = body.decode("utf-8")

    print(" [.] run (%s)" % cmd)
    response = run_cmd(cmd)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to, #队列
                     properties=pika.BasicProperties(correlation_id = \
                                                         props.correlation_id),
                     body=response)

    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(on_request, queue='rpc_queue2')

print(" [x] Awaiting RPC requests")
channel.start_consuming()

Rpc-Client

# 1.声明一个队列,作为reply_to返回消息结果的队列
# 2.  发消息到队列,消息里带一个唯一标识符uid,reply_to
# 3.  监听reply_to 的队列,直到有结果
import queue

import pika
import uuid

class CMDRpcClient(object):
    def __init__(self):
        credentials = pika.PlainCredentials('zou', '123')
        parameters = pika.ConnectionParameters(host='192.168.11.106',credentials=credentials)
        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue #命令的执行结果的queue

        #声明要监听callback_queue
        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        """
        收到服务器端命令结果后执行这个函数
        :param ch:
        :param method:
        :param props:
        :param body:
        :return:
        """
        if self.corr_id == props.correlation_id:
            self.response = body.decode("gbk") #把执行结果赋值给Response

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4()) #唯一标识符号
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue2',
                                   properties=pika.BasicProperties(
                                         reply_to = self.callback_queue,
                                         correlation_id = self.corr_id,
                                         ),
                                   body=str(n))


        while self.response is None:
            self.connection.process_data_events()  # 检测监听的队列里有没有新消息,如果有,收,如果没有,返回None
            #检测有没有要发送的新指令
        return self.response

cmd_rpc = CMDRpcClient()

print(" [x] Requesting fib(30)")
response = cmd_rpc.call('ipconfig')

print(response)
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,594评论 18 139
  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,343评论 2 34
  • 1. 历史 RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的...
    高广超阅读 6,092评论 3 51
  • 本文大纲 RabbitMQ 历史 RabbitMQ 应用场景 RabbitMQ 系统架构 RabbitMQ 基本概...
    Java_Explorer阅读 16,287评论 1 40
  • 消息队列是后台开发常用的中间件,使用消息队列有下列好处:1、可以使系统异步化,降低响应时间;2、减少不同模块的耦合...
    millions_chan阅读 5,525评论 0 5