nestjs 中使用RabbitMQ

选择什么

官方的建议:npm i --save amqplib amqp-connection-manager
但是我不推荐,我司实际项目中,我使用的是 @golevelup/nestjs-rabbitmq 点击去看GitHub的文档描述。

你到底要讲什么?

别急, 先从场景入手,我司有Java项目,也是用的RabbitMQ,Java和nestJs能不能通过RabbitMQ擦出点不一样的火花?

问题1:消息共享

Java推送到Queue的消息起初我这nestjs这边之间消费不了,擦,业务全写好了给我整这个? 嗯几经摸索,我发现哈,这个消息的content_type是:application/x-java-serialized-object, node没法反序列化,需要把消息toString,作为text/plain:文本数据类型存储,在实际消费的时候,nest这边拿到的就是个Object对象,也不需要JSON.parse。

问题2:RabbitMQ延迟队列 nest怎么实现

// 伪代码
// 定义 exchanges

exchanges: [
  {
    name: "exchangeName",
    type: "x-delayed-message",
    options: {
      durable: true,
      arguments: {
        "x-delayed-type": "direct",
      },
    },
  }
]

// -----------发送延时消息--------
await this.amqpConnection.publish(
  "exchangeName", // exchange
  "queueIxxx", // routingKey
  {
    // 自己的message
  },
  ///  重要!!!这么设置就完成了
  {
    headers: {
      "x-delay": 60 * 1000,
    },
  },
)

嗯 就这么简单

问题3:消费者数量

说实话像Java那种直接一个注解设置多少个消费者我还真没找到该怎么写。但是!channels可以设置好几个信道

// 伪代码
// 设置信道 和exchanges同级
    channels: {
      // 全局异步队列 默认消费者数量
      global: {
        prefetchCount: 5,  // 这玩意就当是消费者数量吧
        default: true,
      },
      // xxx有接口QPS限制,就使用一个消费者
      a: {
        prefetchCount: 1,
      },
      // xxx需求比较频繁,多给几个消费者
      b: {
        prefetchCount: 10,
      },
    },
// --------------------------------------------
@RabbitSubscribe({
    exchange: "exchangeName",
    routingKey: "queueIxxx",
    queue: "queueIxxx",
    queueOptions: {
      durable: true,
      channel: "b", // 这个不设置就是用的 global,也可以指定上面设置的 channel
    },
  })

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
禁止转载,如需转载请通过简信或评论联系作者。

推荐阅读更多精彩内容