分布式消息队列rabbitmq 总结

什么时候使用

用于系统的物理解耦和逻辑解耦

场景

  • 削峰填谷
  • 数据驱动的任务依赖
  • 多个接收方,上游不关心多下游执行结果
  • upsteam 关注结果但时间很长

消息队列的推送

  • 推送 push
    • 优点
      • 实时性好
      • 中间件服务器做负载均衡
    • 缺点
      • 需要确认收到
  • 拉取 pull
    • 优点
      • 可以拉取多条
      • 服务端逻辑少
    • 缺点:
      1. 可能导致消息堆积
      2. 消费端主动轮训

rabbitmq的push和pull


  • 这种方式Api比较简单,但是需要自己控制拉取节奏,
func (ch *Channel) Get(queue string, autoAck bool) (msg Delivery, ok bool, err error)

Get synchronously receives a single Delivery from the head of a queue from the server to the client. In almost all cases, using Channel.Consume will be preferred.

If there was a delivery waiting on the queue and that delivery was received, the second return value will be true. If there was no delivery waiting or an error occurred, the ok bool will be false.

func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error)
c.deliveryChan, err = c.channel.Consume(c.queueName,
        c.tag,
        false,
        false,
        false,
        false,
        nil)
    if err != nil {
        c.RabbitMQClient.Close()
        return err
    }
    
    // important: 在一个goroutine中同时对msgs和notifyClose两个channel进行读取可能会导致死锁。
    // 因为msgs被关闭就会结束相应的goroutine,
    // 此时notifyClose因为没有接收者,而在amqp.channel关闭的过程中出现死锁。
    go c.handle(c.deliveryChan) //另外启动一个携程处理任务

消息不重复消费

在消息中添加唯一的消息ID;同时确保消息的幂等性;

消息不丢

消息发送的时候Producer要收到rabbitmq的Confirm消息;消费端收到消息后应该给rabbitmq发送ACK;

保证送达

  1. 在保证消息不丢的前提下,在发送到rabbitmq之后写入数据库,当消息被consumer处理之后更新数据库中的状态;
  2. 启动一个异步认为定时检查数据库中的任务,如果状态没有被更新就取出来重新发送到消息队列;
  3. 在保证消息幂等性的前提下,可以保证消息被送达

消息顺序性

消息中只有一个接收者的情况下,可以保证消息的顺序消费

消息队列的延时以及过期失效问题

延时队列可以通过以下2种方式实现:

  1. 死信队列

    • 死信产生:

      • 消息被拒绝(basic.reject / basic.nack),并且requeue = false
      • 消息TTL过期
      • 队列达到最大长度
    • 死信说明
      DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任 何的队列上被指定,实际上就是设置某个队列的属性。当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。可以监听这个队列中的消息做相应的处理。

    • 延时消息
      基于上面的说明,发送一条消息到一个没有consumer的exchange 并设置ttl的过期时间为我们需要延时的时间比如30(秒),当ttl过期之后就会根据私信dlx.exchange路由到指定的queue中;然后再死信队列中的consumer复制消费这个消息

  2. 使用插件 delayed_exchange
    rabbitmq-delayed-message-exchange
    创建exchange的时候需要按照下图所示指定类型

    arg := make(map[string]interface{})
    arg["x-delayed-type"] = "topic"
    ex := mq.Exchange{
        Name: "delay-task2",
        Kind: "x-delayed-message",
        Args: arg,
    }
    

    发送message的时候指定消息的delay的时间

     table := make(map[string]interface{})
     table["x-delay"] = 3000  // 指定delay 3s
     err := p.Publish(ctx, d, "delay.order.abc", table);
    

限流

  // 限定prefetch count  prefetch_size, global
  //Qos controls how many messages or how many bytes the server will try to keep on the network for consumers before receiving delivery acks. The intent of Qos is to make sure the network buffers stay full between the server and client.
  if err := c.channel.Qos(1, 0, false); err != nil {
  }
c.deliveryChan, err = c.channel.Consume(c.queueName,
      c.tag,
      false, // 关闭auto ack
      false,
      false,
      false,
      nil)
  if err != nil {
      c.RabbitMQClient.Close()
      return err
  }

断线重连

单独启动一个协程,执行下面的操作

func (c *RabbitMQConsumer) reconnect() error {
    // 重新连接成功之后,重新执行consume;

    for {
        // 是否发生错误
        select {
        case err := <-c.connNotify:
            if err != nil {
                log.Println("rabbitmq consumer - connection NotifyClose: ", err)
            }
        case err := <-c.channelNotify:
            if err != nil {
                log.Println("rabbitmq consumer - channel NotifyClose: ", err)
            }
        case <-c.quit:
            return nil
        }
        // 连接未关闭
        if !c.conn.IsClosed() {
            var errConn, errChannel *amqp.Error
            for errChannel = range c.channelNotify {
                log.Println(errChannel)
            }
            for errConn = range c.connNotify {
                log.Println(errConn)
            }

            // 关闭 SubMsg common delivery
            if err := c.channel.Cancel(c.tag, true); err != nil {
                log.Println("rabbitmq consumer - channel cancel failed: ", err)
            }
            if err := c.channel.Close(); err != nil {

            }
            if err := c.conn.Close(); err != nil {
                log.Println("rabbitmq consumer - connection close failed: ", err)
            }
        } else {
            log.Println("conn is closed")
        }
        // IMPORTANT: 必须清空 Notify,否则死连接不会释放 如果还有error一起读完否则连接不能释放

    retry:
        for {
            select {
            case <-c.quit:
                return nil
            default:
                log.Println("rabbitmq consumer - reconnect")
                // 第二次重新连接
                if err := c.Init(); err != nil {
                    // 等待;然后重试
                    time.Sleep(time.Second * 10)
                    log.Println("loop again continue")
                    continue
                }
                break retry
            }
        }
    }
}

高可用

  1. 主备
    只有主节点提供读写;备用节点只是在主节点挂掉的情况下服务;并发量并不大的情况可以使用haproxy做主备;

  2. 镜像模式
    mirror镜像队列;保证rabbitmq数据的高可靠性,实现数据同步2-3个节点的数据同步;前端需要自己做负载均衡


    image.png
  3. Federation
    在broker之间传输消息的插件
    https://github.com/rabbitmq/rabbitmq-federation

image.png

如上图所示,Federation Exchanges,可以看成Downstream从Upstream主动拉取消息,但并不是拉取所有消息,必须是在Downstream上已经明确定义Bindings关系的Exchange,也就是有实际的物理Queue来接收消息,才会从Upstream拉取消息到Downstream。使用AMQP协议实施代理间通信,Downstream会将绑定关系组合在一起,绑定/解绑命令将会发送到Upstream交换机。因此,FederationExchange只接收具有订阅的消息。

rabbitmq 思维导图

rabbit mq.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,254评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,875评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,682评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,896评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,015评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,152评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,208评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,962评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,388评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,700评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,867评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,551评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,186评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,901评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,142评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,689评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,757评论 2 351

推荐阅读更多精彩内容