rabbitMQ 和 AMQP

https://www.cnblogs.com/frankyou/p/5283539.html
AMQP 是个协议, RabbitMQ 是根据这个协议开发的可以直接使用的工具
rabbitMQ 官网练习
我们就以 AMQP 0.9.1 client with RabbitMQ extensions in Go 来做练习
AMQP 0-9-1 简介

AMQP基本组成及基本概念

生产者(publisher/producer)生产具有指定路由键(routing key)的消息将其发布给消息代理(broker 即rabbitmq)中的交换机(exchange),交换机将消息中的路由键与队列(queue)绑定信息(binding)中存储的路由键对比,将消息路由到匹配的队列中。队列再将信息通过连接(connection)中的通道(channel)推送给订阅了消息的消费者(consumer)

amqp 的一些流程完全适用于 rabbitMQ


amqp 的知识点目录

由于, amqp 协议比较重要,所以我们,重点来看几个方面的概念

  • 发布 -> 交换机 -> 管道 -> 消费者 流程图

发布 -> 交换机 -> 管道 -> 消费者
  • 交换机和交换机类型

default
就是一个 默认的直连 交换机, 不像 topic 等,他的名字是 空字符, 下面用 go 客户端第一个就是默认交换机。
默认交换机看起来貌似能够直接将消息投递给队列, 就是说,你发布的 routing key 写上 队列的名字,他就会自动发消息到队列里,好像没有中间交换机转发的过程。

direct
队列通过一个 routing key 绑定到 某个交换机
消息携带 routing key ,直连交换机 把他转发到 同样 routing key 的队列里

消息的负载均衡 =》多个消费者绑定同一个队列, 是按轮询方式派发消息的。

funout
所谓的 broadcast routing, 吧消息copy 到所有绑定 fanout 交换机的队列
自动忽略 routing key

topic
就是正则匹配 routing key

headers

  • 队列

声明:
队列在声明(declare)后才能被使用,重复声明队列,如果信息一致会忽略,如果信息不一可能 406 异常抛出

队列名:
队列的名字可以由应用(application)来取,也可以让消息代理(broker)直接生成一个

队列持久化,就是代理重启后,队列会自动重新创建(注意,除非做了消息持久化,否则,消息是不会重新恢复的)

AMQP的消息无法路由到队列,根据发布者设置的消息属性, 把消息销毁或者返回给发布者

  • 消费者

消费者从管道里获取数据,两个方式:

1 将消息投递给应用 ("push API") 下面的 Consume 函数返回的就是push 的队列
2 应用根据需要主动获取消息 ("pull API") 主动 pull 使用的是下面的 Get 函数

func (ch *Channel) Get(queue string, autoAck bool) (msg Delivery, ok bool, err error)

具体区别在于:https://cloud.tencent.com/document/product/406/4791

  • 消息确认

可能由于一些原因导致,消息被消费成功与否,并不清楚,所以需要二次确认,之后队列才会清除此消息

两个方式
1 自动确认模式: 当消息代理(broker)将消息发送给应用后立即删除
2 显示确认: 消费者主动ack(具体阶段由消费者决定)之后删除
对于 2 主动确认,假设 消费者(通道关闭、连接关闭或 TCP 连接丢失)死亡,则消息会发给另外的消费者
如果没死,有没发 ack,则 RabbitMQ 等消息代理 将消耗越来越多的内存,因为不能释放 没死还没有 ack 的消息。

  • 拒绝消息

就是消费者收到消息,但是不想执行,告诉代理销毁或者重新放入队列给其他消费者

func (d Delivery) Reject(requeue bool) error  
//requeue为true,则RMQ会把这条消息重新加入队列,如果requeue为false,则RMQ会丢弃本条消息
  • 通道

就是相当于客户端,可以把通道理解成共享一个TCP连接的多个轻量化连接

---------------------------------- 以上是amqp的协议 ------------------------------------

go rabbit mq 绑定

未来可能参见 github 自己的练习代码

消息和队列持久化

  • 队列持久化


    队列持久化
  • 消息持久化


    消息持久化

这里只列出 一些 api
链接 mq

conn, err := amqp.Dial("amqp://guest:guest@172.10.50.42:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

创建 Channel

ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

创建管道

q, err := ch.QueueDeclare(
        "hello", // name
        false,   // 消息代理重启后,队列依旧存在
        false,   // 当最后一个消费者退订后即被删除
        false,   // 只被一个连接(connection)使用,而且当连接关闭后队列即被删除
        false,   // no-wait
        nil,     // arguments
    )
    failOnError(err, "Failed to declare a queue")

发布消息

err = ch.Publish(
        "",     // exchange    默认交换机
        q.Name, // routing key
        false,  // mandatory
        false,  // immediate
        amqp.Publishing {
            ContentType: "text/plain",
            Body:        []byte(body),
        })
    failOnError(err, "Failed to publish a message")

消费者接受消息 ( Consume 函数为 push 模式)

msg2, err := ch.Consume(  // 第二个 worker
        q.Name, // queue
        "",     // consumer
        true,
        // auto-ack, 自动确认,即取出任务立刻清楚任务管道这条记录
        // false 则,需要 d.Ack(false) 向服务请求次任务执行完,可以删除
        // 如果消费者死亡(通道关闭、连接关闭或 TCP 连接丢失),没有发 d.Ack,RabbitMQ将理解消息未完全处理并将重新排队或立即交给另一个消费者
        // 如果消费者没死,但是忘记 发 d.Ack, RabbitMQ 将消耗越来越多的内存,因为它无法释放任何未确认的消息
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )

从队列取消息
for d := range msgs1 {
            log.Printf("worker1 Received a message: %s", d.Body)
            dotCount := bytes.Count(d.Body, []byte("."))
            t := time.Duration(dotCount)
            time.Sleep(t * time.Second)
            log.Printf("Done")
            //d.Ack(false)   ch.Consume autoAck 为false 的时候
                        // d.Ack(true)   表示批量确认 当前 channel 的 所有 ack, false 为只确
认这一条信息
        }

更多 exchane 类型,后面在看吧

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,242评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,769评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,484评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,133评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,007评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,080评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,496评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,190评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,464评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,549评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,330评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,205评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,567评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,889评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,160评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,475评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,650评论 2 335

推荐阅读更多精彩内容