RabbitMQ快速熟悉

RabbitMQ Tutorials

一、简介

RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。

RabbitMQ是消息代理:它接受并转发消息。可以将其视为邮局:将要发布的邮件放在邮箱中时,可以确保邮递员最终将邮件传递给收件人。以此类推,RabbitMQ是一个邮箱,一个邮局和一个邮递员。

img

1.组成部分

image
  • Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue。
  • Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过滤。
    • exchange的作用就是类似路由器,routing key 就是路由键,服务器会根据路由键将消息从交换器路由到队列上去。
    • 交换机类型:(direct)1:1,(fanout)1:N,(topic)N:1
  • Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。
    • 推模式:通过AMQP的basic.consume命令订阅,有消息会自动接收,吞吐量高
    • 拉模式:通过AMQP的bsaic.get命令
  • Producer:消息生产者,即生产方客户端,生产方客户端将消息发送到MQ。
  • Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。

2.消息发布接收流程

  • 发送消息

    • 生产者和Broker建立TCP连接。
    • 生产者和Broker建立通道。
      • 通道是建立在TCP连接上的虚拟连接,这个TCP被多个线程共享,每个线程对应一个信道,信道在rabbit都有唯一的ID。
      • 类似概念:TCP是电缆,通道就是里面的光纤,每个光纤都是独立的,互不影响。
    • 生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
    • Exchange将消息转发到指定的Queue(队列)
  • 接收消息

    • 消费者和Broker建立TCP连接
    • 消费者和Broker建立通道
    • 消费者监听指定的Queue(队列)
    • 当有消息到达Queue时Broker默认将消息推送给消费者。
    • 消费者接收到消息。

二、安装RabbitMQ服务器

Downloading and Installing RabbitMQ

安装RabbitMQ需要运行Erlang/OTP,并保持版本匹配。

支持的distribution list

  • focal for Ubuntu 20.04
  • bionic for Ubuntu 18.04
  • xenial for Ubuntu 16.04
  • buster for Debian Buster
  • stretch for Debian Stretch

component是指Erlang/OTP 版本

  • 23.x
  • 22.3.x
  • 21.3.x
  • 20.3.x
  • master (24.x)
  • R16B03 (16.x)

1.在ubuntu下直接安装

sudo apt-get update -y
sudo apt-get install curl gnupg -y

#安装RabbitMQ签名密钥 
curl -fsSL https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc | sudo apt-key add -

# 使用密钥服务器
sudo apt-key adv --keyserver "hkps://keys.openpgp.org" --recv-keys "0x0A9AF2115F4687BD29803A206B73A36E6026DFCA"

sudo apt-get install apt-transport-https

# 添加源列表文件
sudo vim /etc/apt/sources.list.d/bintray.erlang.list
    ## Installs the latest Erlang 23.x release.
    ## Change component to "erlang-22.x" to install the latest 22.x version.
    ## "bionic" as distribution name should work for any later Ubuntu or Debian release.
    ## See the release to distribution mapping table in RabbitMQ doc guides to learn more.
    deb https://dl.bintray.com/rabbitmq-erlang/debian bionic erlang
    ## Installs latest RabbitMQ release
    deb https://dl.bintray.com/rabbitmq/debian bionic main
    
# 安装rabbitmq-server软件包
sudo apt-get update -y
sudo apt-get install rabbitmq-server -y --fix-missing

2.使用docker安装【推荐】

docker hub镜像仓库地址

  • rabbitmq:镜像未配有控制台

  • rabbitmq:management:镜像配有控制台

# 拉取RabbitMQ镜像
$ sudo docker pull rabbitmq:management
Status: Downloaded newer image for rabbitmq:management
docker.io/library/rabbitmq:management

$ sudo docker images |grep rabbit
rabbitmq                                                  management                                 5726af297dd4        5 days ago          187MB

# 将RabbitMQ 镜像放入容器中(docker create), 然后将容器启动(docker start)
$ sudo docker run --name rabbitmq -d -p 15672:15672 -p 5672:5672 rabbitmq:management
    # d 后台运行容器;
    # --name 指定容器名;
    # -p 指定服务运行的端口(5672:应用访问端口;15672:控制台Web端口号);

$ sudo docker ps
CONTAINER ID        IMAGE                 COMMAND                  CREATED             STATUS              PORTS                                                                                                         NAMES
85a2ce7d3dfb        rabbitmq:management   "docker-entrypoint.s…"   6 seconds ago       Up 5 seconds        4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp   rabbitmq

# 停止 RabbitMQ 容器
$ sudo docker stop rabbitmq

# 启动 RabbitMQ 容器
$ sudo docker start rabbitmq

控制台信息

  • 启动容器后,可以浏览器中访问http://localhost:15672来查看控制台信息。

    • RabbitMQ默认的用户名:guest,密码:guest
    image

三、主机使用rabbitmq 服务器

# 启动服务器
service rabbitmq-server start

1.端口访问

RabbitMQ节点绑定到端口(开放服务器TCP套接字),以便接受客户端和CLI工具连接。需要确保可以访问以下端口:

  • 4369:epmd,RabbitMQ节点和CLI工具使用的对等发现服务
  • 5672、5671:由不带TLS和带TLS的AMQP 0-9-1和1.0客户端使用
  • 25672:用于节点间和CLI工具通信(Erlang分发服务器端口),并从动态范围分配(默认情况下限制为单个端口,计算为AMQP端口+ 20000)。除非确实需要这些端口上的外部连接,否则这些端口不应公开。
  • 35672-35682:由CLI工具用于与节点进行通信,并从动态范围分配(通过服务器分发端口+ 10010计算为服务器分发端口+ 10000)。
  • 15672:HTTP API客户端,管理UIRabbitmqadmin (仅在启用了管理插件的情况下)
  • 61613、61614:不带TLS和带TLS的STOMP客户端(仅在启用STOMP插件的情况下)
  • 1883、8883 :(不带和带有TLS的MQTT客户端,如果启用了MQTT插件
  • 15674:STOMP-over-WebSockets客户端(仅在启用了Web STOMP插件的情况下)
  • 15675:MQTT-over-WebSockets客户端(仅当启用了Web MQTT插件时
  • 15692:Prometheus指标(仅在启用Prometheus插件的情况下)

2.调整Linux上系统限制

消息队列在linux系统中运行主要受到两个限制:

  • 操作系统内核允许的最大打开文件数(fs.file-max)

    • 修改配置文件/etc/systemd/system/rabbitmq-server.service.d/limits.conf,设置LimitNOFILE=64000。

    • 修改Docker包含的内核限制:/etc/docker/daemon.json

      {
         “ default-ulimits”:{
           “ nofile”:{
             “ Name”:“ nofile”,
             “ Hard”:64000,
             “ Soft”:64000 
          } 
        } 
      }
      
  • 每个用户的限制打开文件数(ulimit -n)

调整系统限制和内核参数,才能处理相当数量的并发连接和队列。建议在生产环境中为用户Rabbitmq至少允许65536个文件描述符。

验证内核限制

$ rabbitmqctl status

# 查看进程的限制
$ cat /proc/$RABBITMQ_BEAM_PROCESS_PID/limits

四、使用RabbitMQ客户端

1.RabbitMQ支持的客户端

Clients Libraries and Developer Tools

RabbitMQ支持以下语言的客户端

2.RabbitMQ客户端的工作模式

1)简单模式(hello world)

img
  • 一个生产者,一个消费者)

2)工作队列(Work queues)

img
  • 一个生产者,多个消费者
  • 一条消息只会被一个消费者接收;
  • 采用轮询的方式将消息是平均发送给消费者的;
  • 消费者在处理完某条消息后,才会收到下一条消息。

3)发布/订阅(Publish/Subscribe)

img
  • 每个消费者监听自己的队列。
  • 生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。

4)路由模式(Routing)

img
  • 每个消费者监听自己的队列,并且设置routing key。
  • 生产者将消息发给交换机,由交换机根据routing key来转发消息到指定的队列。

5)主题模式(Topics)

img

主题模式应该算是路由模式的一种,也是通过 routing_key 来分发,只不过是 routing_key 支持了正则表达式,更加灵活。

  • 星号井号代表通配符
  • 星号代表多个单词,井号代表一个单词
  • 路由功能添加模糊匹配
  • 消息产生者产生消息,把消息交给交换机
  • 交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费

6)RPC

img

RPC即客户端远程调用服务端的方法 ,使用MQ可以实现RPC的异步调用,基于Direct交换机实现。

  • 客户端即是生产者也是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列。
  • 服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果。
  • 服务端将RPC方法 的结果发送到RPC响应队列。
  • 客户端(RPC调用方)监听RPC响应队列,接收到RPC调用结果。

2.Go RabbitMQ 客户端使用

Go code for RabbitMQ tutorials

当前主要介绍Go语言的Rabbit 客户端使用。

  • 获取Go RabbitMQ客户端库

    go get github.com/streadway/amqp
    

    package amqp spi说明

  • 发送消息(生产者)

    image
    // 1.import the library
    package main
    import (
      "log"
      "github.com/streadway/amqp"
    )
    
    func failOnError(err error, msg string) {
      if err != nil {
          log.Fatalf("%s: %s", msg, err)
      }
    }
    
    func main() {
        // 2.connect to RabbitMQ server
        //该连接抽象了套接字连接,并处理协议版本协商和认证等。
      conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
      failOnError(err, "Failed to connect to RabbitMQ")
      defer conn.Close()
    
        // 3.create a channel
      ch, err := conn.Channel()
      failOnError(err, "Failed to open a channel")
      defer ch.Close()
    
        // 4.使用channel声明一个队列
      q, err := ch.QueueDeclare(
          "hello", // name
          false,   // durable
          false,   // delete when unused
          false,   // exclusive
          false,   // no-wait
          nil,     // arguments
      )
      failOnError(err, "Failed to declare a queue")
    
        // 5.使用通道发布消息到队列中
      body := "Hello World!"
      err = ch.Publish(
          "",     // exchange
          q.Name, // routing key
          false,  // mandatory
          false,  // immediate
          amqp.Publishing{
              ContentType: "text/plain",
              Body:        []byte(body),
          })
      log.Printf(" [x] Sent %s", body)
      failOnError(err, "Failed to publish a message")
    }
    
  • 接收消息(消费者)

    image
    // 1.import the library
    package main
    import (
      "log"
      "github.com/streadway/amqp"
    )
    
    func failOnError(err error, msg string) {
      if err != nil {
          log.Fatalf("%s: %s", msg, err)
      }
    }
    
    func main() {
        // 2.connect to RabbitMQ server
      conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
      failOnError(err, "Failed to connect to RabbitMQ")
      defer conn.Close()
    
        // 3.create a channel
      ch, err := conn.Channel()
      failOnError(err, "Failed to open a channel")
      defer ch.Close()
    
         // 4.使用channel声明一个队列
      q, err := ch.QueueDeclare(
          "hello", // name
          false,   // durable
          false,   // delete when unused
          false,   // exclusive
          false,   // no-wait
          nil,     // arguments
      )
      failOnError(err, "Failed to declare a queue")
    
        //5. 接收(消费)队列消息
      msgs, err := ch.Consume(
          q.Name, // queue
          "",     // consumer
          true,   // auto-ack
          false,  // exclusive
          false,  // no-local
          false,  // no-wait
          nil,    // args
      )
      failOnError(err, "Failed to register a consumer")
    
      forever := make(chan bool)
    
      go func() {
            //6. 打印队列消息的内容
          for d := range msgs {
              log.Printf("Received a message: %s", d.Body)
          }
      }()
    
      log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
      <-forever
    }
    
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,875评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,569评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,475评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,459评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,537评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,563评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,580评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,326评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,773评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,086评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,252评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,921评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,566评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,190评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,435评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,129评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,125评论 2 352