什么是死信队列?如何实现消息的幂等性?拿来吧你!

死信队列

  • 死信队列:DLX,dead-letter-exchange,一般是由于消息被否定了,消息过期了,或者消息队列超过最大长度导致信息不能被正常消费,那么这条消息就成了死信消息,如果我们绑定了死信队列,那么这个消息就会被投递到死信队列

  • 使用场景:
    使用rabbitmq的死信队列完成对库存、题库的回收工作,比如 某个商品被下单了减了库存,但是迟迟没有付款,超过30分钟我们就默认订单取消,并恢复库存。或者在医生抢题 答题的业务中,有的医生抢了10道题,但是只在有效期(比如1天)内答了3道题,但是剩余的7道题不可能一直被这个医生绑定,超过1天就要被解绑回库。

  • 先说明以下内容:
    1.队列可以在producer里面声明创建和绑定,也可以在consumer里面声明创建和绑定。declare并不关系和影响你的逻辑
    2.如果你的死信队列想使用fanout ,在绑定的时候不要绑定key 即可,即key为空,如果你想使用direct,则必须指定key,其他类型也是如此。

我们先来看消费的代码

package main

import (
    "fmt"
    "github.com/streadway/amqp"
)

func main() {
    //声明交换机和消费的队列和key
    var exchange = "direct_guofu_exchange"
    var queue = "direct_guofu_queue_dlx_key"
    var key = "direct_key"
    
    //声明死信队列的交换机和路由键
    var dlxExchange = "dlx_exchange"//死信队列交换机
    var dlxKey = "dlx_key"//死信队列路由键

    //建立连接  用户名+密码+ip+端口号+vhost
    conn, _ := amqp.Dial("amqp://guofu:guofu@172.16.131.3:5672/guofu_vhost")
    //建立通道
    ch, _ := conn.Channel()
    //试探性声明交换机类型
    ch.ExchangeDeclare(
        exchange,
        "direct",
        true,
        false,
        false,
        false,
        nil,
    )

    //声明queue 和相关属性 相关参数 name string, durable, autoDelete, exclusive, noWait bool, args Table
    _, err := ch.QueueDeclare(
        queue,
        true,
        false,
        false,
        false,
        //此处绑定死信交换机和路由键,不指定路由键默认是fanout模式
        amqp.Table{
            "x-dead-letter-exchange":dlxExchange,//交换机
            "x-message-ttl":6000,//消息过期时间
            "x-dead-letter-routing-key":dlxKey,//绑定死信队列的key
        },
    )
    if err != nil {
        panic(err)
    }
    //绑定队列 (name, key, exchange string, noWait bool, args Table) 发布订阅模式的key为空
    ch.QueueBind(queue, key, exchange, false, nil)
    //选择消费死信队列  Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table)
    msg, err := ch.Consume(
        queue,
        "",
        false,
        false,
        false,
        false,
        nil,
    )
    for d := range msg {
        fmt.Println(string(d.Body))
        d.Ack(false)

    }

}


  • 我们再来看消费死信队列的代码,这里面就是普通消费队列的代码,没有什么特殊
package main

import (
    "fmt"
    "github.com/streadway/amqp"
)

func main() {
    var dlxExchange = "dlx_exchange"//死信队列交换机
    var dlxKey = "dlx_key"//死信队列交换机
    var dxlxQueue="dlx_queue"
    //建立连接  用户名+密码+ip+端口号+vhost
    conn, _ := amqp.Dial("amqp://guofu:guofu@172.16.131.3:5672/guofu_vhost")
    //建立通道
    ch, _ := conn.Channel()
    //试探性声明交换机类型
    ch.ExchangeDeclare(
        dlxExchange,
        "direct",
        true,
        false,
        false,
        false,
        nil,
    )

    //试探性创建队列
    //声明queue 和相关属性 相关参数 name string, durable, autoDelete, exclusive, noWait bool, args Table
    _, err := ch.QueueDeclare(
        dxlxQueue,
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        panic(err)
    }
    //绑定队列 (name, key, exchange string, noWait bool, args Table) 发布订阅模式的key为空
    ch.QueueBind(dxlxQueue, dlxKey, dlxExchange, false, nil)

    //选择消费死信队列  Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table)
    msg, err := ch.Consume(
        dxlxQueue,
        "",
        false,
        false,
        false,
        false,
        nil,
    )
    for d := range msg {
        fmt.Println(string(d.Body))
        d.Ack(false)

    }

}

  • 生产代码,也是一段普通的生产代码
package main

import (
    "github.com/streadway/amqp"
)

/**
 * @Description: 演示死信队列,
 */
func main() {
    var exchange = "direct_guofu_exchange"
    var key = "direct_key"
    //建立连接  用户名+密码+ip+端口号+vhost
    conn, _ := amqp.Dial("amqp://guofu:guofu@172.16.131.3:5672/guofu_vhost")
    //建立通道
    ch, _ := conn.Channel()
    //声明交换机类型
    ch.ExchangeDeclare(
        exchange,
        "direct",
        true,
        false,
        false,
        false,
        nil,
    )

    //定义消息
    msgBody := "i am a dead_letter"
    //发送消息  相关参数 exchange, key string, mandatory, immediate bool, msg Publishing
    err := ch.Publish(
        exchange, //exchange
        key,      //routing key(queue name)
        false,
        false,
        amqp.Publishing{
            DeliveryMode: amqp.Persistent, //Msg set as persistent
            ContentType:  "text/plain",
            Body:         []byte(msgBody),
        })

    if err != nil {
        panic(err)
    }
}

  • 我们暂停队列的正常消费,看看6秒后能否进入到死信队列


    image.png
  • 从上图可以看到,死信消息在6s后被投递到死信队列

消息的幂等性参考

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容