Java 如何使用redis实现发布订阅

背景:

项目中采用多节点部署,保证http请求可以在多个节点执行,提高业务运行效率;由于不愿引入MQ消息队列,所以利用redis的发布订阅功能来实现类似消息队列功能,话不多说直接上代码。


步骤一:

项目中引入redis依赖

<dependency>

    <groupId>org.springframework.boot</groupId>

    <artifactId>spring-boot-starter-data-redis</artifactId>

</dependency>


步骤二:

添加redis订阅频道:

package cn.yzw.ibuild.domain.redis.subscribe;

import cn.yzw.ibuild.domain.support.enums.RedisMessageChannelEnum;

import org.springframework.context.annotation.Bean;

import org.springframework.data.redis.connection.RedisConnectionFactory;

import org.springframework.data.redis.listener.ChannelTopic;

import org.springframework.data.redis.listener.RedisMessageListenerContainer;

import org.springframework.stereotype.Component;

import java.util.Arrays;

@Component

public class RedisSubConfig {

@Bean

    public RedisMessageListenerContainercontainer(RedisConnectionFactory factory, RedisMessageListener listener) {

RedisMessageListenerContainer container =new RedisMessageListenerContainer();

        container.setConnectionFactory(factory);

        // 添加redis订阅频道, 频道配置在 RedisMessageChannelEnum枚举中

        Arrays.stream(RedisMessageChannelEnum.values()).forEach(item ->container.addMessageListener(listener, new ChannelTopic(item.code())));

        return container;

    }

}


步骤三:

添加redis发布端代码

@Component

public class RedisMessagePublish {

@Resource

    private RedisTemplateredisTemplate;

    @Resource

    private RedissonClientredissonClient;

    public void publishWorkerReportMessage(String channel, RedisMessageUserVO redisMessageUserVO){

        redisTemplate.convertAndSend(channel, redisMessageUserVO);

    }

}


步骤四:

编写消费者端代码,

@Component

@Slf4j

public class RedisMessageListenerimplements MessageListener {

@Resource

    private RedisTemplateredisTemplate;

    @Resource

    private RedissonClientredissonClient;

    @Resource

    private IWorkerStatisticsworkerStatistics;

    @Resource

    private ObjectMapperobjectMapper;

    @Override

    public void onMessage(Message message, byte[] pattern) {

// 获取消息

        byte[] messageBody = message.getBody();

        // 使用值序列化器转换

        Object msg =redisTemplate.getValueSerializer().deserialize(messageBody);

        // 获取监听的频道

        byte[] channelByte = message.getChannel();

        // 使用字符串序列化器转换

        Object channel =redisTemplate.getStringSerializer().deserialize(channelByte);

        System.out.println("---频道---: " + channel);

        System.out.println("---消息内容---: " + msg);

        RedisMessageUserVO redisMessageUserVO =objectMapper.convertValue(msg, RedisMessageUserVO.class);

if (Objects.equals(channel, RedisMessageChannelEnum.REPORT_WORKER_PRD.code()) && Objects.equals(SpringUtils.getActiveProfile(), "prd")) {

workerStatistics.workerStatisticsTask(redisMessageUserVO.getUserVOList());

            }else if (Objects.equals(channel, RedisMessageChannelEnum.REPORT_WORKER_STG.code())) {

workerStatistics.workerStatisticsTask(redisMessageUserVO.getUserVOList());

            }

}

}

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容