如何使用消息队列发布与订阅【易扩展】

redis消息队列实现

定义消息接受注解

@Component
@Retention(value = RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface RedisTopic {
    String value();
}

定义消息接受interface

public interface RedisTopicInterface<T> {
    public final static String PREX = "houdamis_";
    /**
     * 发布消息
     * @return
     */
    public String getChannel();
    /**
     * 接受消息
     * @return
     */
    public boolean receiveMsg(T message,String channel);
    /**
     * 接受消息
     * @return
     */
    public boolean isReceiver( String channel);

}

消息接受代码示例

@RedisTopic(value = TOPIC)
public class DataSourceReloadService  implements RedisTopicInterface {
    /**
        必须参数
     * 定义消息队列主题
     */
    public final static String TOPIC = RedisTopicInterface.PREX + "dbsync";

    public   final Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public String getChannel( ) {
        return TOPIC;
    }

    /**
     * 接受消息
     *
     * @param message 消息内容
     * @param channel 消息主题 同topic
     * @return
     */
    @Override
    public boolean receiveMsg(Object message, String channel) {
        if(StringUtils.isNotEmpty(channel) && TOPIC.equals(channel)){
            try {
//               TODO 进行业务处理
                xxxx
            } catch (Exception e) {
                e.printStackTrace();
                logger.error("{} 接受消息失败:{}",TOPIC ,e.getMessage());
            }
        }
        return false;
    }

    /**
     * 接受消息
     *
     * @param channel
     * @return
     */
    @Override
    public boolean isReceiver(String channel) {
        if(StringUtils.isNotEmpty(channel) && TOPIC.equals(channel)){
            return true;
        }
        return false;
    }
}

消息队列工具类

public class RedisTopicUtils {
    private static RedisTemplate redisTemplate = SpringContextHolder.getBean("redisTemplate");
    private static ConcurrentHashMap<String , Set<RedisTopicInterface>> topicBeans = new ConcurrentHashMap<String, Set<RedisTopicInterface>>();

    /**
     * 发布消息
     * @param channel
     * @param messgae
     * @return
     */
    public static boolean sendMessage(String channel ,Object messgae){
        redisTemplate.convertAndSend(channel, messgae);
        return true;
    }

    /**
     * 接受消息
     * @param channel
     * @param messgae
     */
    public static void receiveMessage(String channel, Object messgae) {
        Set<RedisTopicInterface> beans = getReceiver(channel);
        for (RedisTopicInterface v : beans) {
            try {
                v.receiveMsg(messgae,channel);
            } catch (Throwable e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 注册RedisTopicInterface
     * @param receiver
     */
    public static void regist(RedisTopicInterface receiver){
        String key = receiver.getClass().getAnnotation(RedisTopic.class).value();
        Set<RedisTopicInterface>  beans = getReceiver(key);
        beans.add(receiver);
        topicBeans.put(key,beans);
    }

    public static Set<RedisTopicInterface> getReceiver(String key){
        Set<RedisTopicInterface>  beans = topicBeans.get(key);
        if(beans == null){
            beans = Sets.newHashSet();
        }
        return beans;
    }

    /**
     * 初始化
     */
    public static void init(){
        try {
            Map<String, Object> serviceBeanMap = SpringContextHolder.getApplicationContext().getBeansWithAnnotation(RedisTopic.class);
            if (serviceBeanMap != null && !serviceBeanMap.isEmpty()) {
                for (Object serviceBean : serviceBeanMap.values()) {
                    String interfaceName = serviceBean.getClass().getAnnotation(RedisTopic.class).value();
                    if(null != interfaceName){
                         regist( (RedisTopicInterface) serviceBean);
                        System.out.println(serviceBean.getClass().getName()+ " regist to RedisTopic[" + interfaceName+"]");
                    }else{
                        System.err.println(serviceBean.getClass().getName()+ "can't regist to RedisTopic[" + interfaceName+"]");
                    }
                }
            }
        } catch ( Throwable e) {
            e.printStackTrace();
        }
    }


}

消息队列-接受者注册

系统启动时进行注册

@Service
public class DataSourceInitListener implements ApplicationListener<ContextRefreshedEvent> {

    @Override
    public void onApplicationEvent(ContextRefreshedEvent evt) {
        
        RedisTopicUtils.init();
    }
}

redis xml配置

<!-- 定义监听类 -->
    <bean id="redisMessageListener" class="com.thinkgem.jeesite.common.redis.topic.RedisMessageListener">
        <property name="redisTemplate" ref="redisTemplate"/>
    </bean>
    <!-- 定义监听容器 -->
    <bean id="redisMessageListenerContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer"
          destroy-method="destroy">
        <property name="connectionFactory" ref="jedisConnectionFactory"/>
        <!-- 任务执行器 -->
        <property name="taskExecutor">
            <bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler">
                <property name="poolSize" value="10"/>
            </bean>
        </property>
        <!-- 消息监听器 -->
        <property name="messageListeners">
            <map>
                <entry key-ref="redisMessageListener">
                    <list>
                        <bean class="org.springframework.data.redis.listener.PatternTopic">
                            <constructor-arg value="*" />
                        </bean>
                    </list>
                </entry>
            </map>
        </property>
    </bean>

消息监听

RedisMessageListener监听到消息之后,交由RedisTopicUtils处理,RedisTopicUtils根据topic找到已注册的Set<RedisTopicInterface>,然后通知每个RedisTopicInterface元素进行处理。


/**
 * redis消息接受
 */
public class RedisMessageListener implements MessageListener {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Override
    public void onMessage(Message message, byte[] pattern) {
        byte[] body = message.getBody();
        byte[] channel = message.getChannel();
        String msgChannel = (String) getRedisTemplate().getKeySerializer().deserialize(channel);
        RedisTopicUtils.receiveMessage(msgChannel,getRedisTemplate().getValueSerializer().deserialize(body));
    }
    public RedisTemplate<String, Object> getRedisTemplate() {
        return redisTemplate;
    }
    public void setRedisTemplate(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }
    
}

消息发布

//        发布 消息
        RedisTopicUtils.sendMessage(DataSourceReloadService.TOPIC, “这是xx消息”;

如何使用消息队列发布与订阅

订阅消息

新增代码 实现RedisTopicInterface,添加@RedisTopic注解,并设置TOPIC对应的主题,实现3个方法(有2个直接使用示例代码即可)
主要实现receiveMsg()方法。

@RedisTopic(value = TOPIC)
public class AAAService  implements RedisTopicInterface {
/** * 定义消息队列主题 */
public final static String TOPIC = RedisTopicInterface.PREX + "dbsync";
    /**
     * 接受消息
     * @param message
     * @param channel
     * @return
     */
    @Override
    public boolean receiveMsg(Object message, String channel) {
        if(StringUtils.isNotEmpty(channel) && TOPIC.equals(channel)){
            try {
//             TODO   进行业务处理
               此处实现
            } catch (Exception e) {
                e.printStackTrace();
                logger.error("{} 接受消息失败:{}",TOPIC ,e.getMessage());
            }
        }
        return false;
    }
     @Override
    public String getChannel( ) {
        return TOPIC;
    }

    /**
     * 接受消息
     *
     * @param channel
     * @return
     */
    @Override
    public boolean isReceiver(String channel) {
        if(StringUtils.isNotEmpty(channel) && TOPIC.equals(channel)){
            return true;
        }
        return false;
    }

发布消息

//        发布 消息
       RedisTopicUtils.sendMessage(AAAService.TOPIC, “这是xx消息”;
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,651评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,468评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,931评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,218评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,234评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,198评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,084评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,926评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,341评论 1 311
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,563评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,731评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,430评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,036评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,676评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,829评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,743评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,629评论 2 354