rabbitmq指南

rabbitmq 简介

RabbitMQ 是一个用 erlang 开发的 AMQP(Advanced Message Queue)的开源实现,AMQP(高级消息队列协议)是一个网络协议。它支持符合要求的客户端应用和消息中间代理之间进行通讯。

AMQP 模型简介

image

消息(message)被发布者(publisher)发送给交换机(exchange),交换机常常被比喻成邮局或者邮箱。然后交换机将收到的消息根据路由规则分发给绑定的队列(queue)。最后AMQP代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。

AMQP 是一个可编的程协议

AMQP 是一个可编程协议,某种意义上说 AMQP 的实体和路由规则是由应用本身定义的,而不是由消息代理定义。包括像声明队列和交换机,定义他们之间的绑定,订阅队列等等关于协议本身的操作。

这虽然能让开发人员自由发挥,但也需要他们注意潜在的定义冲突。当然这在实践中很少会发生,如果发生,会以配置错误(misconfiguration)的形式表现出来。

应用程序(Applications)声明AMQP实体,定义需要的路由方案,或者删除不再需要的AMQP实体。

Exchange

交换机是用来发送消息的AMQP实体。交换机拿到一个消息之后将它路由给一个或零个队列。它使用哪种路由算法是由交换机类型和被称作绑定(bindings)的规则所决定的。AMQP 的代理提供了四种交换机。

Name(交换机类型) Default pre-declared names(预声明的默认名称)
Direct exchange(直连交换机) (Empty string) and amq.direct
Fanout exchange(扇型交换机) amq.fanout
Topic exchange(主题交换机) amq.topic
Headers exchange(头交换机) amq.match (and amq.headers in RabbitMQ)

下列示例代码可参考官方文档:https://www.rabbitmq.com/tutorials/tutorial-one-python.html

默认 exchanage

exchanage 可以使用空字符串代替,消息会根据指定的 routing_key 分发到与 routing_key 同名的队列。

image

消息发送端 python 代码:

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = "Hello World!"

count = 1

while True:
    channel.basic_publish(exchange='',
                          routing_key='task_queue',
                          body=str(count) + message,
                          properties=pika.BasicProperties(
                              delivery_mode=2
                          ))

    count += 1
    print("[x] Send %d %r" % (count, message))
    time.sleep(1)

消息接收端 python 代码:

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print('[*] Waiting for message. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print("[x] Worker1 Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)
    time.sleep(5)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()

Direct exchange(直连交换机)

消息发送到与 exchange 绑定的 queue 上,且 routing_key 必须精确匹配。

image

发布端 python 代码:

import pika
import time
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 定义一个 Direct 类型的 exchanage
channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'

# 发送消息给 exchange,如果 queue 不存在消息会丢失
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)

print ("[x] Sent %r:%r" % (severity, message))
connection.close()

接受端 python 代码:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

channel.queue_declare(queue='aaaa', durable=True)


severities = sys.argv[1:]
if not severities:
    sys.exit(1)

for severity in severities:
    # exchange 跟 queue 绑定,且 routing_key 必须精确匹配才能接收
    channel.queue_bind(exchange='direct_logs',
                       queue='aaaa',
                       routing_key=severity)

print (' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):

    print (" [x] %r:%r" % (method.routing_key, body,))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(callback,
                      queue='aaaa')

channel.start_consuming()

Fanout exchange(扇型交换机)

将消息发送给绑定到 exchange 上的所有 queue。就是发布/订阅模式。

image

发布端 python 代码:

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 定义一个 faout 类型的 exchanage
channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

count = 1

message = 'Hello World!'

while True:
    # faout 会忽略 routing_key,所以这里为空
    channel.basic_publish(
        exchange='logs',
        routing_key='',
        body=str(count) + message
    )

    count += 1

    print("[x] Sent %d %r" %(count, message))

    time.sleep(3)

接收端 python 代码:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# exchange 与 queue 绑定
channel.queue_bind(exchange='logs',
                   queue=queue_name)

def callback(ch, method, properties, body):
    print("[x] %r" % body)


channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

Topic exchange(主题交换机)

与 Direct exchange 类似,routing_key 可使用通配符匹配。

image

安装向导

这里直接使用二进制包的方式安装:https://www.rabbitmq.com/install-generic-unix.html

  1. 系统需要安装 Erlang
  2. 内核参数系统 limits 调整。
    • 设置打开文件最大数,推荐至少 65536
  3. 下载二进制包 rabbitmq-server-generic-unix-3.7.8.tar.xz
  4. 解压至 /user/local 并将 sbin 目录添加至 $PATH 中。
  5. 启动,默认的数据目录在 ./var 下,sbin/rabbitmq-server 或者 sbin/rabbitmq-server -detached 后台运行。
  6. 停止,sbin/rabbitmqctl shutdown
  7. 配置文件 $RABBITMQ_HOME/etc/rabbitmq/rabbitmq.conf
  8. 开启 WEB UI rabbitmq-plugins enable rabbitmq_management

rabbitmq 相关 文件&目录 路径

https://www.rabbitmq.com/relocate.html

你可以通过环境变量来设置 rmq 相关文件或目录的位置,但是大多数情况下使用默认的即可。

Deb/RPM 包安装的情况下(${install_prefix} 为空)

Name Location
RABBITMQ_BASE (Not used - Windows only)
RABBITMQ_CONFIG_FILE ${install_prefix}/etc/rabbitmq/rabbitmq
RABBITMQ_MNESIA_BASE ${install_prefix}/var/lib/rabbitmq/mnesia
RABBITMQ_MNESIA_DIR RABBITMQ_MNESIA_BASE/RABBITMQ_NODENAME
RABBITMQ_LOG_BASE ${install_prefix}/var/log/rabbitmq
RABBITMQ_LOGS RABBITMQ_LOG_BASE/RABBITMQ_NODENAME.log
RABBITMQ_SASL_LOGS RABBITMQ_LOG_BASE/RABBITMQ_NODENAME-sasl.log
RABBITMQ_PLUGINS_DIR /usr/lib/rabbitmq/plugins:$RABBITMQ_HOME/plugins
RABBITMQ_ENABLED_PLUGINS_FILE ${install_prefix}/etc/rabbitmq/enabled_plugins
RABBITMQ_PID_FILE $RABBITMQ_MNESIA_DIR.pid

二进制包安装的情况 (${RABBITMQ_HOME} 是指二进制包解压的目录)

Name Location
RABBITMQ_BASE (Not used)
RABBITMQ_CONFIG_FILE $RABBITMQ_HOME/etc/rabbitmq/rabbitmq
RABBITMQ_MNESIA_BASE $RABBITMQ_HOME/var/lib/rabbitmq/mnesia
RABBITMQ_MNESIA_DIR RABBITMQ_MNESIA_BASE/RABBITMQ_NODENAME
RABBITMQ_LOG_BASE $RABBITMQ_HOME/var/log/rabbitmq
RABBITMQ_LOGS RABBITMQ_LOG_BASE/RABBITMQ_NODENAME.log
RABBITMQ_SASL_LOGS RABBITMQ_LOG_BASE/RABBITMQ_NODENAME-sasl.log
RABBITMQ_PLUGINS_DIR $RABBITMQ_HOME/plugins
RABBITMQ_ENABLED_PLUGINS_FILE $RABBITMQ_HOME/etc/rabbitmq/enabled_plugins
RABBITMQ_PID_FILE $RABBITMQ_MNESIA_DIR.pid

备份&恢复

https://www.rabbitmq.com/backup.html#rabbitmq-definitions

rmq 中有两种类型的数据:

  • definitions (metadata, schema/topology)【Users, vhosts, queues, exchanges, bindings, runtime parameters all fall into this category.】
  • message 数据

exporting definitions

有两种方式:

  • rabbitmqadmin export rabbit.definitions.json (# => Exported configuration for localhost to "rabbit.config")
  • GET /api/definitions,需要开启 rabbitmq_management 插件。

import definitions

有两种方式:

  • rabbitmqadmin -q import rabbit.definitions.json
  • POST /api/definitions

手动备份

  1. 通过 rabbitmqctl eval 'rabbit_mnesia:dir().' 查出数据目录并备份,如果是备份 message 数据,则需要将 node 停止,如果集群中队列是 mirror 则需要将整个集群停止。
  2. (可选)如果 node 的名字改变了需要使用 rabbitmqctl rename_cluster_node <oldnode> <newnode>

手动恢复

  1. 将上诉备份的目录拷贝至相应的目录。

集群

https://www.rabbitmq.com/clustering.html

Virtual hosts, exchanges, users, and permissions are automatically mirrored across all nodes in a cluster. Queues may be located on a single node, or mirrored across multiple nodes. A client connecting to any node in a cluster can see all queues in the cluster, even if they are not located on that node.

通常一些分布式系统会有 master 跟 node 节点,但是 rabbitmq 并不是这样。在集群中所有节点都是平等的 (equal peer) 。集群中的节点使用 /var/lib/rabbitmq/.erlang.cookie 来允许它们彼此之间通讯,改文件权限必须是 600

前提

  1. 集群中的各个节点使用域名通讯,确保各个节点之间的 hostname 都能够解析。

  2. 以下端口确保互通:

    • 4369: epmd, a peer discovery service used by RabbitMQ nodes and CLI tools
    • 5672, 5671: used by AMQP 0-9-1 and 1.0 clients without and with TLS
    • 25672: used for inter-node and CLI tools communication (Erlang distribution server port) and is allocated from a dynamic range (limited to a single port by default, computed as AMQP port + 20000). Unless external connections on these ports are really necessary (e.g. the cluster uses federation or CLI tools are used on machines outside the subnet), these ports should not be publicly exposed. See networking guide for details.
    • 35672-35682: used by CLI tools (Erlang distribution client ports) for communication with nodes and is allocated from a dynamic range (computed as server distribution port + 10000 through server distribution port + 10010). See networking guide for details.
    • 15672: HTTP API clients, management UI and rabbitmqadmin (only if the management plugin is enabled)
    • 61613, 61614: STOMP clients without and with TLS (only if the STOMP plugin is enabled)
    • 1883, 8883: (MQTT clients without and with TLS, if the MQTT plugin is enabled
    • 15674: STOMP-over-WebSockets clients (only if the Web STOMP plugin is enabled)
    • 15675: MQTT-over-WebSockets clients (only if the Web MQTT plugin is enabled)

手动创建集群

三个节点名字为 rabbit1 rabbit2 rabbit3

  1. 分别启动三台 rabbitmq

    rabbit1$ rabbitmq-server -detached
    rabbit2$ rabbitmq-server -detached
    rabbit3$ rabbitmq-server -detached
    
  2. 将其他两个节点(如:rabbit2 rabbit3)加入 rabbit1 组成集群

    # rabbit2 节点操作
    rabbit2$ rabbitmqctl stop_app
    Stopping node rabbit@rabbit2 ...done.
    
    rabbit2$ rabbitmqctl join_cluster rabbit@rabbit1
    Clustering node rabbit@rabbit2 with [rabbit@rabbit1] ...done.
    
    rabbit2$ rabbitmqctl start_app
    Starting node rabbit@rabbit2 ...done.
    
    # rabbit3 节点操作
    rabbit3$ rabbitmqctl stop_app
    Stopping node rabbit@rabbit3 ...done.
    
    rabbit3$ rabbitmqctl join_cluster rabbit@rabbit2
    Clustering node rabbit@rabbit3 with rabbit@rabbit2 ...done.
    
    rabbit3$ rabbitmqctl start_app
    Starting node rabbit@rabbit3 ...done.
    
  3. 查看集群状态

    rabbit1$ rabbitmqctl cluster_status
    Cluster status of node rabbit@rabbit1 ...
    [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]},
     {running_nodes,[rabbit@rabbit3,rabbit@rabbit2,rabbit@rabbit1]}]
    ...done.
    
  4. 重启节点,已加入集群的节点可以任意重启,当节点恢复后它就会从其他节点同步数据。

    • 当一个节点重启时,他会联系对等体 10 次,每次超时 30s,如果通讯成功,则启动成功,并且同步对等体数据。
    • 当一个节点关闭时没有其他的对等体了(最后一个关闭的节点),它启动的时候不会作为一个独立节点,它将等待对等体加入。
    • 当所有节点关闭后,集群也就关闭了。当再次启动一个节点,在指定的时间内(默认为 5min, 可以通过下面的配置文件修改),其他节点再次启动就会自动加入原来的集群。
      # wait for 60 seconds instead of 30
      mnesia_table_loading_retry_timeout = 60000
      
      # retry 15 times instead of 10
      mnesia_table_loading_retry_limit = 15
      

rabbitmq-peer-discovery-k8s 插件自动创建集群

https://github.com/rabbitmq/rabbitmq-peer-discovery-k8s

可以使用 helm 的 rabbitmq-ha chart 在 kubernetes 集群中快速部署一套 rabbitmq cluster.

参考

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • 什么叫消息队列? 消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复...
    Agile_dev阅读 6,932评论 0 24
  • 本文大纲 RabbitMQ 历史 RabbitMQ 应用场景 RabbitMQ 系统架构 RabbitMQ 基本概...
    Java_Explorer阅读 16,754评论 1 40
  • 整体架构 部署步骤 基于 Docker 基本概念内存节点只保存状态到内存,例外情况是:持久的 queue 的内容将...
    mvictor阅读 14,370评论 5 30
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 136,073评论 19 139
  • RabbitMQ采用Erlang编写,需安装语言库才能运行RabbitMQ代理服务器。AMQP:高级消息队列协议。...
    JAVA觅音阁阅读 9,304评论 0 7

友情链接更多精彩内容