今天遇到了这样一个需求: 在每个WebSocketServer实例启动时,动态声明一个RabbitMQ Queue,然后监听这个Queue,并将queueName存储在redis上.
正常来说应该是这样实现:
public class RabbitQueueDeclarer implements ApplicationRunner(){
public void run(){
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.1.80");
factory.setPort(5672);
try(Connection connection = factory.newConnection();
Channel channel = connection.createChannel()
){
String queueName = channel.queueDeclare().getQueue();
JedisUtil.setString("queueName", "userId", 60000);
channel.basicConsume("exchange", (s, delivery) -> {
System.out.println(delivery.getBody());
}, consumerTag ->{});
}
}
}
但是这样的话有个问题, 就是这个Connection的生命周期没有办法和Application同步, 要想延长它的声明周期,需要把这个方法移到Application的main里去执行. 但总觉得那样的写法不太方便,也不够优雅. 所以就想到让Springboot整合RabbitMQ, 把整个RabbitMQ的生命周期交给它去管理, 日后添加一些功能也比较方便.
首先万年不变的pom.xml:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
如果之前引过rabbitmq的包的话,删了就可以了,这个依赖里面有.
然后配置application.yml:
spring.rabbitmq.host=192.168.1.80
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
因为我们只是消费者所以不用配置太多.
接下来我们的监听器:
@Component
@Slf4j
public class RabbitQueueDeclarer {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${im.nodename}", durable = "false", exclusive = "true"),
exchange = @Exchange(value = "${im.rabbitmq.msgpushexchange}", ignoreDeclarationExceptions = "true"),
key = "${im.nodename}")
)
public void handleMessagePush(Message delivery, Channel channel, @Headers Map<String,Object> headers){
byte[] message = delivery.getBody();
try {
ImMessagePush messagePush = ImServerMessage.parseFrom(message).getImMessagePush();
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(message.length);
buf.writeBytes(message);
NettyHelper.imUsers
.writeAndFlush(new BinaryWebSocketFrame(buf), (io.netty.channel.Channel c) ->
c.attr(NettyHelper.userID).get().equals(messagePush.getToUserId())
);
} catch (InvalidProtocolBufferException e) {
log.error("can not parse ImMessagePush : [" + Arrays.toString(message) + "]", e);
}
}
}
这里的@RabbitListener支持简单的声明,比如这样
@RabbitListener(queues="myqueue")
这里用了@QueueBinding来动态声明一个queue,使用节点本身的名称作为queueName和routingKey, 将其绑定到一个设置好的exchange上.
这样就搞定了,比起自己写Connection然后consume要方便得多, 也可以通过实现一个SimpleListenerContainer来监听.
如果需要操作rabbit的话, Spring提供了AmqpAdmin和AmqpTemplate工具类来实现一些常用操作.