Redis Queue

消息模式

一般来说,消息队列有两种模式:

  • 生产者消费者模式(Queue);
image.png

消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。
消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。

  • 一种是发布者订阅者模式(Topic);
image.png

消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费。

利用redis这两种场景的消息队列都能实现。

Queue 模式

生产者生产消息放到队列中,多个消费者同时监听队列,谁先抢到消息谁就会从队列中取走消息,即对于每个消息最多只能被一个消费者拥有。
具体的方法就是创建一个任务队列,生产者主动lpush消息,而消费者去rpop数据。但是这样存在一个问题,就是消费者需要主动去请求数据,周期性的请求会造成资源的浪费。如果可以实现一旦有新消息加入队列就通知消费者就好了,这时借助brpop命令就可以实现这样的需求。brpop和rpop命令相似,唯一区别就是当列表中没有元素时,brpop命令会一直阻塞住连接,直到有新元素加入。

package com.hand.hap.message;

import redis.clients.jedis.Jedis;

import java.util.List;

public class TestQueue {
    static Jedis jedis = new Jedis("localhost", 6379);

    public static void main(String[] args) throws Exception {
        TestQueue test = new TestQueue();
        Thread thred2 = new Thread(test.runnable, "消息发送者线程");
        thred2.start();

        while (true) {
            Thread.currentThread().sleep(1);

            // 设置超时时间为0,表示无限期阻塞
            List<String> message = jedis.brpop(0, "queue1");

            System.out.println(message.toString());
        }
    }


    Runnable runnable = () -> {
        Long count = 0L;
        while (count < 10) {
            System.out.println(Thread.currentThread().getName());
            count++;
            jedis.lpush("queue1", "message: hello redis queue" + count);

        }
    };

}

分别在命令行下执行以下命令,结果如下

brpop queue1 0

pop queue1 0
image.png

发布者订阅者模式

发布者生产消息放到队列里,多个监听队列的订阅者都会受到同一份消息,订阅者可以订阅多个Topic。

package com.hand.hap.message;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

import java.util.Date;

public class TestTopic {
    static Jedis jedis = new Jedis("localhost", 6379);

    public static void main(String[] args) throws Exception {
        MessageHandler n = new MessageHandler();
        Thread thread1 = new Thread(n);
        thread1.start();

        Thread thread2 = new Thread(n);
        thread2.start();

        Thread thread3 = new Thread(n);
        thread3.start();

        Thread.currentThread().sleep(1000);

        // 向“channel1”的频道发送消息, 返回订阅者的数量
        Long publishCount = jedis.publish("channel1", new Date() + ": hello redis channel1");
        System.out.println("发送成功,该频道有" + publishCount + "个订阅者");
        jedis.publish("channel1", "close channel");


    }
}


class MessageHandler extends JedisPubSub implements Runnable {

    /**
     * channel频道接收到新消息后,执行的逻辑
     *
     * @param channel
     * @param message
     */
    @Override
    public void onMessage(String channel, String message) {
        // 执行逻辑
        System.out.println(channel + "频道发来消息:" + message);
        // 如果消息为 close channel, 则取消此频道的订阅
        if ("close channel".equals(message)) {
            this.unsubscribe(channel);
        }
    }

    /**
     * channel频道有新的订阅者时执行的逻辑
     *
     * @param channel
     * @param subscribedChannels
     */
    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        System.out.println(channel + "频道新增了" + subscribedChannels + "个订阅者");
    }


    /**
     * channel频道有订阅者退订时执行的逻辑
     *
     * @param channel
     * @param subscribedChannels
     */
    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
        System.out.println(channel + "频道退订成功");
    }

    @Override
    public void run() {
        MessageHandler handler = new MessageHandler();
        Jedis jedis = new Jedis("localhost", 6379);
        jedis.subscribe(handler, "channel1");

        /**
         * 使用下面会报错
         * ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context
         */
//        TestTopic.jedis.subscribe(handler, "channel1");
    }
}

Spring继承

本章节要介绍 Redis两种消息队列模式在Spring中的应用,依赖于spring-data-redis。所以在此之前,简单的了解以下spring-data-redis。

RedisMessageListenerContainer

RedisMessageListenerContainer在spring-data-redis中负责消息监听。客户程序需要自己实现MessageListener,并以指定的topic注册到RedisMessageListenerContainer,这样,在指定的topic上如果有消息,RedisMessageListenerContainer便会通知该MessageListener。好了,有关于对 spring-data-redis 的了解到这里就可以结束了,因为本文的主要目的是介绍Redis两种消息队列模式在Hap中的应用,其它的可以根据自己兴趣选择看或者不看。

配置文件

Hap中提供了两种消息队列支持:Redis 和 RabbitMQ。相对来说 RabbitMQ 相对 Redis 更重量级一些,如果只是一些简单的业务场景,使用 Redis 作为消息队列也足够了。RabbitMQ不属本文的讲解范文,所以主要看看 Redis 消息队列的配置文件。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd">


    <bean id="mapSerializer" class="org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer">
        <constructor-arg type="java.lang.Class" value="java.util.HashMap"/>
        <property name="objectMapper" ref="objectMapper"/>
    </bean>

    <!-- 发布消息工具类 -->
    <bean class="com.hand.hap.message.impl.MessagePublisherImpl"/>

    <!--发布/订阅监听-->
    <bean class="com.hand.hap.message.TopicListenerContainer">
        <property name="connectionFactory" ref="v2redisConnectionFactory"/>
        <property name="recoveryInterval" value="10000"/>
        <!--<property name="messageListeners" ref="messageListeners"/>-->
    </bean>

    <bean id="simpleQueueConsumer" class="com.hand.hap.message.impl.SimpleMessageConsumer"/>

    <!--队列监听-->
    <bean class="com.hand.hap.message.QueueListenerContainer">
        <property name="connectionFactory" ref="v2redisConnectionFactory"/>
        <property name="recoveryInterval" value="5000"/>
        <property name="stringRedisSerializer" ref="stringRedisSerializer"/>
        <property name="listeners">
            <list>
                <!-- auto detect bean with annotation QueueMonitor -->
            </list>
        </property>
    </bean>

</beans>

v2redisConnectionFactory 在application-redis.x配置文件中已经定义过了,这里没必要刨根问底这些配置项对应的的各个含义,没有太大意思。SimpleMessageConsumer 在这里没什么做哦有那个,所以,这里主要关注的内容有个:MessagePublisherImplTopicListenerContainer、、QueueListenerContainer。除了MessagePublisherImpl是和消息发布有关,其它两个都是属于消息监听,刚好对应Redis消息队列的两种模式,TopicListenerContainer 对应 Topic 模式, QueueListenerContainer对应 Queue模式。

MessagePublisherImpl

这其实是一个发布消息工具类。不太清楚为什么要在配置文件里面配置,加个注解不是会更好吗?或许也是为了方便我们理解吧。其源码如下

/*
 * #{copyright}#
 */

package com.hand.hap.message.impl;

import com.hand.hap.message.components.ChannelAndQueuePrefix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.hand.hap.message.IMessagePublisher;

@Component
public class MessagePublisherImpl implements IMessagePublisher {

    @Autowired
    private ObjectMapper objectMapper;

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    private Logger logger = LoggerFactory.getLogger(MessagePublisherImpl.class);

    @Override
    public void publish(String channel, Object message) {
        //添加前缀
        channel = ChannelAndQueuePrefix.addPrefix(channel);
        if (message instanceof String || message instanceof Number) {
            redisTemplate.convertAndSend(channel, message.toString());
        } else {
            try {
                redisTemplate.convertAndSend(channel, objectMapper.writeValueAsString(message));
            } catch (JsonProcessingException e) {
                if (logger.isErrorEnabled()) {
                    logger.error("publish message failed.", e);
                }
            }
        }
    }

    @Override
    public void rPush(String list, Object message) {
        message(list, message);
    }

    @Override
    public void message(String name, Object message) {
        if (message instanceof String || message instanceof Number) {
            redisTemplate.opsForList().rightPush(name, message.toString());
        } else {
            try {
                redisTemplate.opsForList().rightPush(name, objectMapper.writeValueAsString(message));
            } catch (JsonProcessingException e) {
                if (logger.isErrorEnabled()) {
                    logger.error("push data failed.", e);
                }
            }
        }
    }
}

发布消息的时候,也是调用spring-data-redis的API,这里就不过多说明。

TopicListenerContainer

这个类要具体分析,起源吗如下:

/*
 * Copyright Hand China Co.,Ltd.
 */

package com.hand.hap.message;

import com.hand.hap.core.AppContextInitListener;
import com.hand.hap.message.components.ChannelAndQueuePrefix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;
import org.springframework.data.redis.serializer.RedisSerializer;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @author shengyang.zhou@hand-china.com
 */
public class TopicListenerContainer extends RedisMessageListenerContainer implements AppContextInitListener {

    private Logger logger = LoggerFactory.getLogger(TopicListenerContainer.class);

    @Override
    public boolean isAutoStartup(){
        return false;
    }

    @Override
    public void contextInitialized(ApplicationContext applicationContext) {
        Map<String, Object> monitors = applicationContext.getBeansWithAnnotation(TopicMonitor.class);
        Map<MessageListener, Collection<? extends Topic>> listeners = new HashMap<>();
        monitors.forEach((k, v) -> {
            Class<?> clazz = v.getClass();
            TopicMonitor tm = clazz.getAnnotation(TopicMonitor.class);
            String mn = MethodReflectUtils.getTopicMethodName(tm.method(), v);
            List<Method> avaMethods = MethodReflectUtils.findMethod(clazz, new MethodReflectUtils.FindDesc(mn, 2));

            if (avaMethods.isEmpty()) {
                if (logger.isErrorEnabled()) {
                    logger.error("can not find proper method of name '{}' for bean {}", mn, v);
                }
                return;
            }

            SimpleMessageListener adaptor = new SimpleMessageListener(v, avaMethods.get(0));
            List<Topic> topics = new ArrayList<>();
            for (String t : tm.channel()) {
                //添加前缀
                t = ChannelAndQueuePrefix.addPrefix(t);
                Topic topic = new PatternTopic(t);
                topics.add(topic);
            }

            listeners.put(adaptor, topics);
        });
        setMessageListeners(listeners);
        // start(); // auto call
//        if (listeners != null) {
//            for (ITopicMessageListener receiver : listeners) {
//                MessageListenerAdapter messageListener = new MessageListenerAdapter(receiver, "onTopicMessage");
//                if (receiver.getRedisSerializer() != null) {
//                    messageListener.setSerializer(receiver.getRedisSerializer());
//                }
//                messageListener.afterPropertiesSet();
//                List<Topic> topics = new ArrayList<>();
//                for (String t : receiver.getTopic()) {
//                    Topic topic = new PatternTopic(t);
//                    topics.add(topic);
//                }
//                listeners.put(messageListener, topics);
//            }
//        }
    }

    private static class SimpleMessageListener implements MessageListener {
        private RedisSerializer redisSerializer;

        private Object target;
        private Method method;

        private Logger logger;

        SimpleMessageListener(Object target, Method method) {
            this.target = target;
            this.method = method;
            Class p0 = method.getParameterTypes()[0];
            redisSerializer = MethodReflectUtils.getProperRedisSerializer(p0);
            logger = LoggerFactory.getLogger(target.getClass());
        }

        @Override
        public void onMessage(Message message, byte[] pattern) {
            try {
                Object obj = redisSerializer.deserialize(message.getBody());
                String p = new String(pattern, "UTF-8");
                //去掉前缀
                p = ChannelAndQueuePrefix.removePrefix(p);
                method.invoke(target, obj, p);
            } catch (Exception e) {
                Throwable thr = e;
                while (thr.getCause() != null) {
                    thr = thr.getCause();
                }
                if (logger.isErrorEnabled()) {
                    logger.error(thr.getMessage(), thr);
                }
            }
        }
    }
}

实现了 AppContextInitListener 类,所以在项目启动的时候 contextInitialized 方法将被调用。整体思路就是:在项目启动的时候,根据注解找到所以的 订阅者,然后维护“订阅者”和“主题”的关系,然后交给消息监听器。

        Map<String, Object> monitors = applicationContext.getBeansWithAnnotation(TopicMonitor.class);

就是通过这行代码找到所有的消息订阅者。TopicMonitor 注解如下,包括两个属性:channel 和 method。channel 就是主题,;可能大家对method 的作用不是很理解,我们知道,实现了 MessageListener 的消息监听器,在收到消息的时候,会执行 onMessage() 方法,这个 method ,就是为了让消息监听器收到消息时可以执行别的方法,而这个方法名就是通过 method 属性定义。

package com.hand.hap.message;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;


@Target({ ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
public @interface TopicMonitor {
    String[] channel() default {};

    /**
     * default empty,auto detect depends on object type.
     * <p>
     * IQueueMessageListener:onQueueMessage<br>
     * OTHERS:onMessage
     *
     */
    String method() default "";
}

获取到所有的消息执订阅者之后,然后进行遍历,下面就是获取方法名的具体逻辑

    String mn = MethodReflectUtils.getTopicMethodName(tm.method(), v);

    public static String getTopicMethodName(String mn, Object target) {
        if (org.apache.commons.lang.StringUtils.isBlank(mn)) {
            if (target instanceof ITopicMessageListener) {
                mn = ITopicMessageListener.DEFAULT_METHOD_NAME;
            } else {
                mn = IMessageConsumer.DEFAULT_METHOD_NAME;
            }
        }
        return mn;
    }

也就是说,如果代码中是同通过 实现 ITopicMessageListener 来实现消息监听的话,则会执行 onTopicMessage 方法,或者会执行 onMessage 方法。

image.png

image.png

如上所示,在获取到方法名之后,组装 "消息订阅者" 和 "Topic" 为Map对象, 因为 一个 订阅者 可以订阅多个 Topic,所以是一对多的关系。组装之后,然后将 listeners 交给 spring-data-redi管理,最后通过反射执行具体的逻辑代码。

QueueListenerContainer

QueueListenerContainer没有继承RedisMessageListenerContainer,所以它的实现方式有些不同,相当于是框架为我们封装了一套API,其具体实现如下:

/*
 * #{copyright}#
 */

package com.hand.hap.message;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.SmartLifecycle;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.util.Assert;

import com.hand.hap.core.AppContextInitListener;

import redis.clients.jedis.Jedis;


public class QueueListenerContainer implements AppContextInitListener, DisposableBean, SmartLifecycle {

    private Logger logger = LoggerFactory.getLogger(QueueListenerContainer.class);

    private RedisConnectionFactory connectionFactory;

    private static final int PHASE = 9999;

    private static final long MIN_RECOVERY_INTERVAL = 2000L;

    private static final long DEFAULT_RECOVERY_INTERVAL = 5000L;

    /**
     * 100ms.
     */
    private static final long IDLE_SLEEP_TIME = 100L;

    private long recoveryInterval = DEFAULT_RECOVERY_INTERVAL;

    private volatile boolean running = false;

    private ExecutorService executorService;

    private List<IQueueMessageListener<?>> listeners;

    private List<MonitorTask> monitorTaskList = new ArrayList<>();

    private RedisSerializer<String> stringRedisSerializer;

    public RedisConnectionFactory getConnectionFactory() {
        return connectionFactory;
    }

    public void setConnectionFactory(RedisConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    public long getRecoveryInterval() {
        return recoveryInterval;
    }

    public void setRecoveryInterval(long recoveryInterval) {
        this.recoveryInterval = recoveryInterval;
        if (recoveryInterval < MIN_RECOVERY_INTERVAL) {
            if (logger.isWarnEnabled()) {
                logger.warn("minimum for recoveryInterval is {}", MIN_RECOVERY_INTERVAL);
            }
            this.recoveryInterval = MIN_RECOVERY_INTERVAL;
        }
    }

    public List<IQueueMessageListener<?>> getListeners() {
        return listeners;
    }

    public void setListeners(List<IQueueMessageListener<?>> listeners) {
        this.listeners = listeners;
    }

    public RedisSerializer<String> getStringRedisSerializer() {
        return stringRedisSerializer;
    }

    @Autowired
    public void setStringRedisSerializer(RedisSerializer<String> stringRedisSerializer) {
        this.stringRedisSerializer = stringRedisSerializer;
    }

    @Override
    public void destroy() throws Exception {
        stop();
    }

    @Override
    public void contextInitialized(ApplicationContext applicationContext) {
        if (listeners == null) {
            listeners = new ArrayList<>();
        }
        Map<String, Object> lts = applicationContext.getBeansWithAnnotation(QueueMonitor.class);
        lts.forEach((k, v) -> {
            Class clazz = v.getClass();
            QueueMonitor qm = (QueueMonitor) clazz.getAnnotation(QueueMonitor.class);
            final String queue = qm.queue();
            String mn = MethodReflectUtils.getQueueMethodName(qm.method(), v);
            List<Method> methods = MethodReflectUtils.findMethod(clazz, new MethodReflectUtils.FindDesc(mn, 2));
            if (methods.isEmpty()) {
                if (logger.isErrorEnabled()) {
                    logger.error("can not find proper method of name '{}' for bean {}", mn, v);
                }
                return;
            }
            final Method method = methods.get(0);
            IQueueMessageListener qml = new SimpleQueueListener(queue, v, method);
            listeners.add(qml);

        });
        executorService = Executors.newFixedThreadPool(listeners.size());
        for (IQueueMessageListener<?> receiver : listeners) {
            MonitorTask task = new MonitorTask(receiver);
            monitorTaskList.add(task);
            executorService.execute(task);
        }
    }

    @Override
    public boolean isAutoStartup() {
        return true;
    }

    @Override
    public void stop(Runnable callback) {
        stop();
        callback.run();
    }

    @Override
    public void start() {
        if (!running) {
            running = true;

            if (logger.isDebugEnabled()) {
                logger.debug("startup success");
            }
        }
    }

    @Override
    public void stop() {
        if (isRunning()) {
            running = false;
            monitorTaskList.forEach(MonitorTask::stop);
            executorService.shutdownNow();
            if (logger.isDebugEnabled()) {
                logger.debug("shutdown complete");
            }
        }
    }

    @Override
    public boolean isRunning() {
        return running;
    }

    @Override
    public int getPhase() {
        return PHASE;
    }

    private static class SimpleQueueListener implements IQueueMessageListener {
        private String queue;
        private Object target;
        private Method method;
        private RedisSerializer redisSerializer;
        private Logger logger;

        SimpleQueueListener(String queue, Object target, Method method) {
            this.queue = queue;
            this.target = target;
            this.method = method;
            this.redisSerializer = MethodReflectUtils.getProperRedisSerializer(method.getParameterTypes()[0]);
            this.logger = LoggerFactory.getLogger(target.getClass());
        }

        @Override
        public String getQueue() {
            return queue;
        }

        @Override
        public RedisSerializer getRedisSerializer() {
            return redisSerializer;
        }

        @Override
        public void onQueueMessage(Object message, String queue) {
            try {
                method.invoke(target, message, queue);
            } catch (Exception e) {
                Throwable thr = e;
                while (thr.getCause() != null) {
                    thr = thr.getCause();
                }
                if (logger.isErrorEnabled()) {
                    logger.error(thr.getMessage(), thr);
                }
            }
        }
    }

    /**
     * 
     * @param <T>
     */
    private class MonitorTask<T> implements SchedulingAwareRunnable {

        private IQueueMessageListener<T> receiver;
        private RedisConnection connection;

        private boolean running = false;

        MonitorTask(IQueueMessageListener<T> receiver) {
            this.receiver = receiver;
            Assert.notNull(receiver, "receiver is null.");
            Assert.hasText(receiver.getQueue(), "queue is not valid");
        }

        public void stop() {
            running = false;
            safeClose(true);
        }

        @Override
        public void run() {
            running = true;
            T message;
            while (running) {
                try {
                    if (connection == null) {
                        connection = connectionFactory.getConnection();
                    }
                    message = fetchMessage(connection, receiver.getQueue());
                    if (message == null) {
                        sleep_(IDLE_SLEEP_TIME);
                        continue;
                    }
                } catch (Throwable thr) {
                    if (!running) {
                        break;
                    }
                    safeClose();
                    if (logger.isDebugEnabled()) {
                        logger.error("exception occurred while get message from queue [" + receiver.getQueue() + "]",
                                thr);
                        logger.debug("try recovery after {}ms", getRecoveryInterval());
                    }
                    sleep_(getRecoveryInterval());
                    continue;
                }
                try {
                    receiver.onQueueMessage(message, receiver.getQueue());
                } catch (Throwable thr) {
                    if (logger.isWarnEnabled()) {
                        logger.warn("exception occurred while receiver consume message.", thr);
                    }
                }
            }
            if (logger.isDebugEnabled()) {
                logger.debug("stop monitor:" + this);
            }
            safeClose();
        }

        T fetchMessage(RedisConnection connection, String queue) {
            List<byte[]> bytes = connection.bLPop(0, stringRedisSerializer.serialize(queue));
            if (bytes == null || bytes.isEmpty()) {
                return null;
            }
            return receiver.getRedisSerializer().deserialize(bytes.get(1));
        }

        void safeClose(boolean... closeNative) {
            if (connection != null) {
                try {
                    if (closeNative.length > 0 && closeNative[0]) {
                        // close native connection to interrupt blocked
                        // operation
                        ((Jedis) connection.getNativeConnection()).disconnect();
                    }
                    connection.close();
                } catch (Exception e) {
                    // if (logger.isErrorEnabled()) {
                    // logger.error(e.getMessage(), e);
                    // }
                }
            }
            connection = null;
        }

        void sleep_(long time) {
            try {
                Thread.sleep(time);
            } catch (Exception e) {
                // if (logger.isErrorEnabled()) {
                // logger.error(e.getMessage(), e);
                // }
            }
        }

        @Override
        public boolean isLongLived() {
            return true;
        }
    }
}

前面的实现基本类似,区别如果实现了IQueueMessageListener 类,消息监听器收到消息的时候会执行 onQueueMessage 方法;另一个区别就是这里用了另外一个 注解 QueueMonitor

package com.hand.hap.message;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;


@Target({ ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
public @interface QueueMonitor {
    String queue() default "";

    /**
     * default empty,auto detect depends on object type.
     * <p>
     * ITopicMessageListener:onTopicMessage<br>
     * OTHERS:onMessage
     * 
     */
    String method() default "";

}
image.png

image.png

获取到所有的消息监听器之后,开启多线程执行消息监听。因为存在多个消费者监听同一个队列的情况,存在消费者竞争,所以需要判断一下消息是否还在。

使用方式

package com.hand.hap.activiti.manager;

import com.hand.hap.activiti.util.ActivitiUtils;
import com.hand.hap.hr.dto.Employee;
import com.hand.hap.hr.dto.Position;
import com.hand.hap.hr.mapper.EmployeeMapper;
import com.hand.hap.hr.mapper.PositionMapper;
import com.hand.hap.message.IMessageConsumer;
import com.hand.hap.message.TopicMonitor;
import org.activiti.engine.identity.Group;
import org.activiti.engine.impl.persistence.entity.UserEntity;
import org.activiti.engine.impl.persistence.entity.data.impl.MybatisUserDataManager;
import org.activiti.spring.SpringProcessEngineConfiguration;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @author shengyang.zhou@hand-china.com
 */
@TopicMonitor(channel = "employee.change")
public class CustomUserDataManager extends MybatisUserDataManager
        implements IMessageConsumer<Employee>, InitializingBean {

    @Autowired
    private PositionMapper positionMapper;

    @Autowired
    private EmployeeMapper employeeMapper;

    public Map<String, UserEntity> userCache = new HashMap<>();

    @Autowired
    private SpringProcessEngineConfiguration pec;

    public CustomUserDataManager() {
        super(null);
    }

    @Override
    public List<Group> findGroupsByUser(String userId) {
        List<Position> positions = positionMapper.getPositionByEmployeeCode(userId);
        List<Group> gs = new ArrayList<>();
        for (Position position : positions) {
            gs.add(ActivitiUtils.toActivitiGroup(position));
        }
        return gs;
    }

    /**
     * 这个方法使用非常频繁,做缓存支持
     *
     * @param entityId
     * @return
     */
    @Override
    public UserEntity findById(String entityId) {
        UserEntity userEntity = userCache.get(entityId);
        if (userEntity == null) {
            Employee employee = employeeMapper.queryByCode(entityId);
            userEntity = ActivitiUtils.toActivitiUser(employee);
            userCache.put(entityId, userEntity);
        }
        return userEntity;
    }

    @Override
    public void onMessage(Employee message, String pattern) {
        userCache.remove(message.getEmployeeCode());
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        this.processEngineConfiguration = pec;
    }
}
package com.hand.hap.hr.service.impl;

import com.github.pagehelper.PageHelper;
import com.github.pagehelper.StringUtil;
import com.hand.hap.account.dto.User;
import com.hand.hap.account.dto.UserRole;
import com.hand.hap.account.mapper.UserRoleMapper;
import com.hand.hap.account.service.IUserRoleService;
import com.hand.hap.account.service.IUserService;
import com.hand.hap.cache.impl.UserCache;
import com.hand.hap.core.IRequest;
import com.hand.hap.hr.dto.Employee;
import com.hand.hap.hr.dto.UserAndRoles;
import com.hand.hap.hr.mapper.EmployeeAssignMapper;
import com.hand.hap.hr.mapper.EmployeeMapper;
import com.hand.hap.hr.service.IEmployeeService;
import com.hand.hap.message.IMessagePublisher;
import com.hand.hap.message.TopicMonitor;
import com.hand.hap.mybatis.common.Criteria;
import com.hand.hap.system.dto.DTOStatus;
import com.hand.hap.system.service.impl.BaseServiceImpl;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;

@Service
public class EmployeeServiceImpl extends BaseServiceImpl<Employee> implements IEmployeeService {

    @Autowired
    EmployeeMapper employeeMapper;

    @Autowired
    private IMessagePublisher messagePublisher;

    @Autowired
    private IUserRoleService userRoleService;

    @Autowired
    private IUserService userService;

    @Autowired
    private UserRoleMapper userRoleMapper;

    @Autowired
    private EmployeeAssignMapper employeeAssignMapper;

    @Autowired
    private UserCache userCache;

    @Autowired
    private ApplicationContext applicationContext;

    @Override
    public List<Employee> submit(IRequest request, List<Employee> list) {
        self().batchUpdate(request, list);
        for (Employee e : list) {
            messagePublisher.publish("employee.change", e);
        }
        return list;
    }
.......
}

当在界面修改了员工信息的时候,会调用 messagePublisher.publish("employee.change", e); 方法,因为 CustomUserDataManager 监听了 "employee.change" 主题的消息,所以在发布之后,会执行CustomUserDataManager .onMessage 的方法,消息队列的使用方式与之类似。

思考

在Hap中,定义过多的 Queue 消息监听器会影响系统性能吗?一个消息监听器就要开启一个线程,如果消息监听器太多,线程池够用吗?不知道自己的思路有没有问题,值得验证。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,245评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,749评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,960评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,575评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,668评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,670评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,664评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,422评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,864评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,178评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,340评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,015评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,646评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,265评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,494评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,261评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,206评论 2 352

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,647评论 18 139
  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 172,018评论 25 707
  • 朋友难求,能碰到志同道合之人,更是人生中莫大的幸事。有心人会发现,我是很少为某个人写东西的,但,今天我有一些话要送...
    左左兮阅读 299评论 0 0
  • 沙沙大盗阅读 105评论 0 0
  • 今天的我,又在看过去的动漫,电视剧了。我很喜欢怀念过去,怀念童年,因为现在的日子过得乏味无趣,但我也懒得去...
    芝芝AZ阅读 187评论 0 0