背景:
项目中采用多节点部署,保证http请求可以在多个节点执行,提高业务运行效率;由于不愿引入MQ消息队列,所以利用redis的发布订阅功能来实现类似消息队列功能,话不多说直接上代码。
步骤一:
项目中引入redis依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
步骤二:
添加redis订阅频道:
package cn.yzw.ibuild.domain.redis.subscribe;
import cn.yzw.ibuild.domain.support.enums.RedisMessageChannelEnum;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
import java.util.Arrays;
@Component
public class RedisSubConfig {
@Bean
public RedisMessageListenerContainercontainer(RedisConnectionFactory factory, RedisMessageListener listener) {
RedisMessageListenerContainer container =new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
// 添加redis订阅频道, 频道配置在 RedisMessageChannelEnum枚举中
Arrays.stream(RedisMessageChannelEnum.values()).forEach(item ->container.addMessageListener(listener, new ChannelTopic(item.code())));
return container;
}
}
步骤三:
添加redis发布端代码
@Component
public class RedisMessagePublish {
@Resource
private RedisTemplateredisTemplate;
@Resource
private RedissonClientredissonClient;
public void publishWorkerReportMessage(String channel, RedisMessageUserVO redisMessageUserVO){
redisTemplate.convertAndSend(channel, redisMessageUserVO);
}
}
步骤四:
编写消费者端代码,
@Component
@Slf4j
public class RedisMessageListenerimplements MessageListener {
@Resource
private RedisTemplateredisTemplate;
@Resource
private RedissonClientredissonClient;
@Resource
private IWorkerStatisticsworkerStatistics;
@Resource
private ObjectMapperobjectMapper;
@Override
public void onMessage(Message message, byte[] pattern) {
// 获取消息
byte[] messageBody = message.getBody();
// 使用值序列化器转换
Object msg =redisTemplate.getValueSerializer().deserialize(messageBody);
// 获取监听的频道
byte[] channelByte = message.getChannel();
// 使用字符串序列化器转换
Object channel =redisTemplate.getStringSerializer().deserialize(channelByte);
System.out.println("---频道---: " + channel);
System.out.println("---消息内容---: " + msg);
RedisMessageUserVO redisMessageUserVO =objectMapper.convertValue(msg, RedisMessageUserVO.class);
if (Objects.equals(channel, RedisMessageChannelEnum.REPORT_WORKER_PRD.code()) && Objects.equals(SpringUtils.getActiveProfile(), "prd")) {
workerStatistics.workerStatisticsTask(redisMessageUserVO.getUserVOList());
}else if (Objects.equals(channel, RedisMessageChannelEnum.REPORT_WORKER_STG.code())) {
workerStatistics.workerStatisticsTask(redisMessageUserVO.getUserVOList());
}
}
}