选择什么
官方的建议:
npm i --save amqplib amqp-connection-manager
但是我不推荐,我司实际项目中,我使用的是 @golevelup/nestjs-rabbitmq 点击去看GitHub的文档描述。
你到底要讲什么?
别急, 先从场景入手,我司有Java项目,也是用的RabbitMQ,Java和nestJs能不能通过RabbitMQ擦出点不一样的火花?
问题1:消息共享
Java推送到Queue的消息起初我这nestjs这边之间消费不了,擦,业务全写好了给我整这个? 嗯几经摸索,我发现哈,这个消息的content_type是:
application/x-java-serialized-object, node没法反序列化,需要把消息toString,作为text/plain:文本数据类型存储,在实际消费的时候,nest这边拿到的就是个Object对象,也不需要JSON.parse。
问题2:RabbitMQ延迟队列 nest怎么实现
// 伪代码
// 定义 exchanges
exchanges: [
{
name: "exchangeName",
type: "x-delayed-message",
options: {
durable: true,
arguments: {
"x-delayed-type": "direct",
},
},
}
]
// -----------发送延时消息--------
await this.amqpConnection.publish(
"exchangeName", // exchange
"queueIxxx", // routingKey
{
// 自己的message
},
/// 重要!!!这么设置就完成了
{
headers: {
"x-delay": 60 * 1000,
},
},
)
嗯 就这么简单
问题3:消费者数量
说实话像Java那种直接一个注解设置多少个消费者我还真没找到该怎么写。但是!channels可以设置好几个信道
// 伪代码
// 设置信道 和exchanges同级
channels: {
// 全局异步队列 默认消费者数量
global: {
prefetchCount: 5, // 这玩意就当是消费者数量吧
default: true,
},
// xxx有接口QPS限制,就使用一个消费者
a: {
prefetchCount: 1,
},
// xxx需求比较频繁,多给几个消费者
b: {
prefetchCount: 10,
},
},
// --------------------------------------------
@RabbitSubscribe({
exchange: "exchangeName",
routingKey: "queueIxxx",
queue: "queueIxxx",
queueOptions: {
durable: true,
channel: "b", // 这个不设置就是用的 global,也可以指定上面设置的 channel
},
})