消息模式
一般来说,消息队列有两种模式:
- 生产者消费者模式(Queue);
消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。
消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。
- 一种是发布者订阅者模式(Topic);
消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费。
利用redis这两种场景的消息队列都能实现。
Queue 模式
生产者生产消息放到队列中,多个消费者同时监听队列,谁先抢到消息谁就会从队列中取走消息,即对于每个消息最多只能被一个消费者拥有。
具体的方法就是创建一个任务队列,生产者主动lpush消息,而消费者去rpop数据。但是这样存在一个问题,就是消费者需要主动去请求数据,周期性的请求会造成资源的浪费。如果可以实现一旦有新消息加入队列就通知消费者就好了,这时借助brpop命令就可以实现这样的需求。brpop和rpop命令相似,唯一区别就是当列表中没有元素时,brpop命令会一直阻塞住连接,直到有新元素加入。
package com.hand.hap.message;
import redis.clients.jedis.Jedis;
import java.util.List;
public class TestQueue {
static Jedis jedis = new Jedis("localhost", 6379);
public static void main(String[] args) throws Exception {
TestQueue test = new TestQueue();
Thread thred2 = new Thread(test.runnable, "消息发送者线程");
thred2.start();
while (true) {
Thread.currentThread().sleep(1);
// 设置超时时间为0,表示无限期阻塞
List<String> message = jedis.brpop(0, "queue1");
System.out.println(message.toString());
}
}
Runnable runnable = () -> {
Long count = 0L;
while (count < 10) {
System.out.println(Thread.currentThread().getName());
count++;
jedis.lpush("queue1", "message: hello redis queue" + count);
}
};
}
分别在命令行下执行以下命令,结果如下
brpop queue1 0
pop queue1 0
发布者订阅者模式
发布者生产消息放到队列里,多个监听队列的订阅者都会受到同一份消息,订阅者可以订阅多个Topic。
package com.hand.hap.message;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
import java.util.Date;
public class TestTopic {
static Jedis jedis = new Jedis("localhost", 6379);
public static void main(String[] args) throws Exception {
MessageHandler n = new MessageHandler();
Thread thread1 = new Thread(n);
thread1.start();
Thread thread2 = new Thread(n);
thread2.start();
Thread thread3 = new Thread(n);
thread3.start();
Thread.currentThread().sleep(1000);
// 向“channel1”的频道发送消息, 返回订阅者的数量
Long publishCount = jedis.publish("channel1", new Date() + ": hello redis channel1");
System.out.println("发送成功,该频道有" + publishCount + "个订阅者");
jedis.publish("channel1", "close channel");
}
}
class MessageHandler extends JedisPubSub implements Runnable {
/**
* channel频道接收到新消息后,执行的逻辑
*
* @param channel
* @param message
*/
@Override
public void onMessage(String channel, String message) {
// 执行逻辑
System.out.println(channel + "频道发来消息:" + message);
// 如果消息为 close channel, 则取消此频道的订阅
if ("close channel".equals(message)) {
this.unsubscribe(channel);
}
}
/**
* channel频道有新的订阅者时执行的逻辑
*
* @param channel
* @param subscribedChannels
*/
@Override
public void onSubscribe(String channel, int subscribedChannels) {
System.out.println(channel + "频道新增了" + subscribedChannels + "个订阅者");
}
/**
* channel频道有订阅者退订时执行的逻辑
*
* @param channel
* @param subscribedChannels
*/
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
System.out.println(channel + "频道退订成功");
}
@Override
public void run() {
MessageHandler handler = new MessageHandler();
Jedis jedis = new Jedis("localhost", 6379);
jedis.subscribe(handler, "channel1");
/**
* 使用下面会报错
* ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context
*/
// TestTopic.jedis.subscribe(handler, "channel1");
}
}
Spring继承
本章节要介绍 Redis两种消息队列模式在Spring中的应用,依赖于spring-data-redis。所以在此之前,简单的了解以下spring-data-redis。
RedisMessageListenerContainer
RedisMessageListenerContainer在spring-data-redis中负责消息监听。客户程序需要自己实现MessageListener,并以指定的topic注册到RedisMessageListenerContainer,这样,在指定的topic上如果有消息,RedisMessageListenerContainer便会通知该MessageListener。好了,有关于对 spring-data-redis 的了解到这里就可以结束了,因为本文的主要目的是介绍Redis两种消息队列模式在Hap中的应用,其它的可以根据自己兴趣选择看或者不看。
配置文件
Hap中提供了两种消息队列支持:Redis 和 RabbitMQ。相对来说 RabbitMQ 相对 Redis 更重量级一些,如果只是一些简单的业务场景,使用 Redis 作为消息队列也足够了。RabbitMQ不属本文的讲解范文,所以主要看看 Redis 消息队列的配置文件。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd">
<bean id="mapSerializer" class="org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer">
<constructor-arg type="java.lang.Class" value="java.util.HashMap"/>
<property name="objectMapper" ref="objectMapper"/>
</bean>
<!-- 发布消息工具类 -->
<bean class="com.hand.hap.message.impl.MessagePublisherImpl"/>
<!--发布/订阅监听-->
<bean class="com.hand.hap.message.TopicListenerContainer">
<property name="connectionFactory" ref="v2redisConnectionFactory"/>
<property name="recoveryInterval" value="10000"/>
<!--<property name="messageListeners" ref="messageListeners"/>-->
</bean>
<bean id="simpleQueueConsumer" class="com.hand.hap.message.impl.SimpleMessageConsumer"/>
<!--队列监听-->
<bean class="com.hand.hap.message.QueueListenerContainer">
<property name="connectionFactory" ref="v2redisConnectionFactory"/>
<property name="recoveryInterval" value="5000"/>
<property name="stringRedisSerializer" ref="stringRedisSerializer"/>
<property name="listeners">
<list>
<!-- auto detect bean with annotation QueueMonitor -->
</list>
</property>
</bean>
</beans>
v2redisConnectionFactory 在application-redis.x配置文件中已经定义过了,这里没必要刨根问底这些配置项对应的的各个含义,没有太大意思。SimpleMessageConsumer 在这里没什么做哦有那个,所以,这里主要关注的内容有个:MessagePublisherImpl、TopicListenerContainer、、QueueListenerContainer。除了MessagePublisherImpl是和消息发布有关,其它两个都是属于消息监听,刚好对应Redis消息队列的两种模式,TopicListenerContainer 对应 Topic 模式, QueueListenerContainer对应 Queue模式。
MessagePublisherImpl
这其实是一个发布消息工具类。不太清楚为什么要在配置文件里面配置,加个注解不是会更好吗?或许也是为了方便我们理解吧。其源码如下
/*
* #{copyright}#
*/
package com.hand.hap.message.impl;
import com.hand.hap.message.components.ChannelAndQueuePrefix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.hand.hap.message.IMessagePublisher;
@Component
public class MessagePublisherImpl implements IMessagePublisher {
@Autowired
private ObjectMapper objectMapper;
@Autowired
private RedisTemplate<String, String> redisTemplate;
private Logger logger = LoggerFactory.getLogger(MessagePublisherImpl.class);
@Override
public void publish(String channel, Object message) {
//添加前缀
channel = ChannelAndQueuePrefix.addPrefix(channel);
if (message instanceof String || message instanceof Number) {
redisTemplate.convertAndSend(channel, message.toString());
} else {
try {
redisTemplate.convertAndSend(channel, objectMapper.writeValueAsString(message));
} catch (JsonProcessingException e) {
if (logger.isErrorEnabled()) {
logger.error("publish message failed.", e);
}
}
}
}
@Override
public void rPush(String list, Object message) {
message(list, message);
}
@Override
public void message(String name, Object message) {
if (message instanceof String || message instanceof Number) {
redisTemplate.opsForList().rightPush(name, message.toString());
} else {
try {
redisTemplate.opsForList().rightPush(name, objectMapper.writeValueAsString(message));
} catch (JsonProcessingException e) {
if (logger.isErrorEnabled()) {
logger.error("push data failed.", e);
}
}
}
}
}
发布消息的时候,也是调用spring-data-redis的API,这里就不过多说明。
TopicListenerContainer
这个类要具体分析,起源吗如下:
/*
* Copyright Hand China Co.,Ltd.
*/
package com.hand.hap.message;
import com.hand.hap.core.AppContextInitListener;
import com.hand.hap.message.components.ChannelAndQueuePrefix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;
import org.springframework.data.redis.serializer.RedisSerializer;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author shengyang.zhou@hand-china.com
*/
public class TopicListenerContainer extends RedisMessageListenerContainer implements AppContextInitListener {
private Logger logger = LoggerFactory.getLogger(TopicListenerContainer.class);
@Override
public boolean isAutoStartup(){
return false;
}
@Override
public void contextInitialized(ApplicationContext applicationContext) {
Map<String, Object> monitors = applicationContext.getBeansWithAnnotation(TopicMonitor.class);
Map<MessageListener, Collection<? extends Topic>> listeners = new HashMap<>();
monitors.forEach((k, v) -> {
Class<?> clazz = v.getClass();
TopicMonitor tm = clazz.getAnnotation(TopicMonitor.class);
String mn = MethodReflectUtils.getTopicMethodName(tm.method(), v);
List<Method> avaMethods = MethodReflectUtils.findMethod(clazz, new MethodReflectUtils.FindDesc(mn, 2));
if (avaMethods.isEmpty()) {
if (logger.isErrorEnabled()) {
logger.error("can not find proper method of name '{}' for bean {}", mn, v);
}
return;
}
SimpleMessageListener adaptor = new SimpleMessageListener(v, avaMethods.get(0));
List<Topic> topics = new ArrayList<>();
for (String t : tm.channel()) {
//添加前缀
t = ChannelAndQueuePrefix.addPrefix(t);
Topic topic = new PatternTopic(t);
topics.add(topic);
}
listeners.put(adaptor, topics);
});
setMessageListeners(listeners);
// start(); // auto call
// if (listeners != null) {
// for (ITopicMessageListener receiver : listeners) {
// MessageListenerAdapter messageListener = new MessageListenerAdapter(receiver, "onTopicMessage");
// if (receiver.getRedisSerializer() != null) {
// messageListener.setSerializer(receiver.getRedisSerializer());
// }
// messageListener.afterPropertiesSet();
// List<Topic> topics = new ArrayList<>();
// for (String t : receiver.getTopic()) {
// Topic topic = new PatternTopic(t);
// topics.add(topic);
// }
// listeners.put(messageListener, topics);
// }
// }
}
private static class SimpleMessageListener implements MessageListener {
private RedisSerializer redisSerializer;
private Object target;
private Method method;
private Logger logger;
SimpleMessageListener(Object target, Method method) {
this.target = target;
this.method = method;
Class p0 = method.getParameterTypes()[0];
redisSerializer = MethodReflectUtils.getProperRedisSerializer(p0);
logger = LoggerFactory.getLogger(target.getClass());
}
@Override
public void onMessage(Message message, byte[] pattern) {
try {
Object obj = redisSerializer.deserialize(message.getBody());
String p = new String(pattern, "UTF-8");
//去掉前缀
p = ChannelAndQueuePrefix.removePrefix(p);
method.invoke(target, obj, p);
} catch (Exception e) {
Throwable thr = e;
while (thr.getCause() != null) {
thr = thr.getCause();
}
if (logger.isErrorEnabled()) {
logger.error(thr.getMessage(), thr);
}
}
}
}
}
实现了 AppContextInitListener 类,所以在项目启动的时候 contextInitialized 方法将被调用。整体思路就是:在项目启动的时候,根据注解找到所以的 订阅者,然后维护“订阅者”和“主题”的关系,然后交给消息监听器。
Map<String, Object> monitors = applicationContext.getBeansWithAnnotation(TopicMonitor.class);
就是通过这行代码找到所有的消息订阅者。TopicMonitor 注解如下,包括两个属性:channel 和 method。channel 就是主题,;可能大家对method 的作用不是很理解,我们知道,实现了 MessageListener 的消息监听器,在收到消息的时候,会执行 onMessage() 方法,这个 method ,就是为了让消息监听器收到消息时可以执行别的方法,而这个方法名就是通过 method 属性定义。
package com.hand.hap.message;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target({ ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
public @interface TopicMonitor {
String[] channel() default {};
/**
* default empty,auto detect depends on object type.
* <p>
* IQueueMessageListener:onQueueMessage<br>
* OTHERS:onMessage
*
*/
String method() default "";
}
获取到所有的消息执订阅者之后,然后进行遍历,下面就是获取方法名的具体逻辑
String mn = MethodReflectUtils.getTopicMethodName(tm.method(), v);
public static String getTopicMethodName(String mn, Object target) {
if (org.apache.commons.lang.StringUtils.isBlank(mn)) {
if (target instanceof ITopicMessageListener) {
mn = ITopicMessageListener.DEFAULT_METHOD_NAME;
} else {
mn = IMessageConsumer.DEFAULT_METHOD_NAME;
}
}
return mn;
}
也就是说,如果代码中是同通过 实现 ITopicMessageListener 来实现消息监听的话,则会执行 onTopicMessage 方法,或者会执行 onMessage 方法。
如上所示,在获取到方法名之后,组装 "消息订阅者" 和 "Topic" 为Map对象, 因为 一个 订阅者 可以订阅多个 Topic,所以是一对多的关系。组装之后,然后将 listeners 交给 spring-data-redi管理,最后通过反射执行具体的逻辑代码。
QueueListenerContainer
QueueListenerContainer没有继承RedisMessageListenerContainer,所以它的实现方式有些不同,相当于是框架为我们封装了一套API,其具体实现如下:
/*
* #{copyright}#
*/
package com.hand.hap.message;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.SmartLifecycle;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.util.Assert;
import com.hand.hap.core.AppContextInitListener;
import redis.clients.jedis.Jedis;
public class QueueListenerContainer implements AppContextInitListener, DisposableBean, SmartLifecycle {
private Logger logger = LoggerFactory.getLogger(QueueListenerContainer.class);
private RedisConnectionFactory connectionFactory;
private static final int PHASE = 9999;
private static final long MIN_RECOVERY_INTERVAL = 2000L;
private static final long DEFAULT_RECOVERY_INTERVAL = 5000L;
/**
* 100ms.
*/
private static final long IDLE_SLEEP_TIME = 100L;
private long recoveryInterval = DEFAULT_RECOVERY_INTERVAL;
private volatile boolean running = false;
private ExecutorService executorService;
private List<IQueueMessageListener<?>> listeners;
private List<MonitorTask> monitorTaskList = new ArrayList<>();
private RedisSerializer<String> stringRedisSerializer;
public RedisConnectionFactory getConnectionFactory() {
return connectionFactory;
}
public void setConnectionFactory(RedisConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
public long getRecoveryInterval() {
return recoveryInterval;
}
public void setRecoveryInterval(long recoveryInterval) {
this.recoveryInterval = recoveryInterval;
if (recoveryInterval < MIN_RECOVERY_INTERVAL) {
if (logger.isWarnEnabled()) {
logger.warn("minimum for recoveryInterval is {}", MIN_RECOVERY_INTERVAL);
}
this.recoveryInterval = MIN_RECOVERY_INTERVAL;
}
}
public List<IQueueMessageListener<?>> getListeners() {
return listeners;
}
public void setListeners(List<IQueueMessageListener<?>> listeners) {
this.listeners = listeners;
}
public RedisSerializer<String> getStringRedisSerializer() {
return stringRedisSerializer;
}
@Autowired
public void setStringRedisSerializer(RedisSerializer<String> stringRedisSerializer) {
this.stringRedisSerializer = stringRedisSerializer;
}
@Override
public void destroy() throws Exception {
stop();
}
@Override
public void contextInitialized(ApplicationContext applicationContext) {
if (listeners == null) {
listeners = new ArrayList<>();
}
Map<String, Object> lts = applicationContext.getBeansWithAnnotation(QueueMonitor.class);
lts.forEach((k, v) -> {
Class clazz = v.getClass();
QueueMonitor qm = (QueueMonitor) clazz.getAnnotation(QueueMonitor.class);
final String queue = qm.queue();
String mn = MethodReflectUtils.getQueueMethodName(qm.method(), v);
List<Method> methods = MethodReflectUtils.findMethod(clazz, new MethodReflectUtils.FindDesc(mn, 2));
if (methods.isEmpty()) {
if (logger.isErrorEnabled()) {
logger.error("can not find proper method of name '{}' for bean {}", mn, v);
}
return;
}
final Method method = methods.get(0);
IQueueMessageListener qml = new SimpleQueueListener(queue, v, method);
listeners.add(qml);
});
executorService = Executors.newFixedThreadPool(listeners.size());
for (IQueueMessageListener<?> receiver : listeners) {
MonitorTask task = new MonitorTask(receiver);
monitorTaskList.add(task);
executorService.execute(task);
}
}
@Override
public boolean isAutoStartup() {
return true;
}
@Override
public void stop(Runnable callback) {
stop();
callback.run();
}
@Override
public void start() {
if (!running) {
running = true;
if (logger.isDebugEnabled()) {
logger.debug("startup success");
}
}
}
@Override
public void stop() {
if (isRunning()) {
running = false;
monitorTaskList.forEach(MonitorTask::stop);
executorService.shutdownNow();
if (logger.isDebugEnabled()) {
logger.debug("shutdown complete");
}
}
}
@Override
public boolean isRunning() {
return running;
}
@Override
public int getPhase() {
return PHASE;
}
private static class SimpleQueueListener implements IQueueMessageListener {
private String queue;
private Object target;
private Method method;
private RedisSerializer redisSerializer;
private Logger logger;
SimpleQueueListener(String queue, Object target, Method method) {
this.queue = queue;
this.target = target;
this.method = method;
this.redisSerializer = MethodReflectUtils.getProperRedisSerializer(method.getParameterTypes()[0]);
this.logger = LoggerFactory.getLogger(target.getClass());
}
@Override
public String getQueue() {
return queue;
}
@Override
public RedisSerializer getRedisSerializer() {
return redisSerializer;
}
@Override
public void onQueueMessage(Object message, String queue) {
try {
method.invoke(target, message, queue);
} catch (Exception e) {
Throwable thr = e;
while (thr.getCause() != null) {
thr = thr.getCause();
}
if (logger.isErrorEnabled()) {
logger.error(thr.getMessage(), thr);
}
}
}
}
/**
*
* @param <T>
*/
private class MonitorTask<T> implements SchedulingAwareRunnable {
private IQueueMessageListener<T> receiver;
private RedisConnection connection;
private boolean running = false;
MonitorTask(IQueueMessageListener<T> receiver) {
this.receiver = receiver;
Assert.notNull(receiver, "receiver is null.");
Assert.hasText(receiver.getQueue(), "queue is not valid");
}
public void stop() {
running = false;
safeClose(true);
}
@Override
public void run() {
running = true;
T message;
while (running) {
try {
if (connection == null) {
connection = connectionFactory.getConnection();
}
message = fetchMessage(connection, receiver.getQueue());
if (message == null) {
sleep_(IDLE_SLEEP_TIME);
continue;
}
} catch (Throwable thr) {
if (!running) {
break;
}
safeClose();
if (logger.isDebugEnabled()) {
logger.error("exception occurred while get message from queue [" + receiver.getQueue() + "]",
thr);
logger.debug("try recovery after {}ms", getRecoveryInterval());
}
sleep_(getRecoveryInterval());
continue;
}
try {
receiver.onQueueMessage(message, receiver.getQueue());
} catch (Throwable thr) {
if (logger.isWarnEnabled()) {
logger.warn("exception occurred while receiver consume message.", thr);
}
}
}
if (logger.isDebugEnabled()) {
logger.debug("stop monitor:" + this);
}
safeClose();
}
T fetchMessage(RedisConnection connection, String queue) {
List<byte[]> bytes = connection.bLPop(0, stringRedisSerializer.serialize(queue));
if (bytes == null || bytes.isEmpty()) {
return null;
}
return receiver.getRedisSerializer().deserialize(bytes.get(1));
}
void safeClose(boolean... closeNative) {
if (connection != null) {
try {
if (closeNative.length > 0 && closeNative[0]) {
// close native connection to interrupt blocked
// operation
((Jedis) connection.getNativeConnection()).disconnect();
}
connection.close();
} catch (Exception e) {
// if (logger.isErrorEnabled()) {
// logger.error(e.getMessage(), e);
// }
}
}
connection = null;
}
void sleep_(long time) {
try {
Thread.sleep(time);
} catch (Exception e) {
// if (logger.isErrorEnabled()) {
// logger.error(e.getMessage(), e);
// }
}
}
@Override
public boolean isLongLived() {
return true;
}
}
}
前面的实现基本类似,区别如果实现了IQueueMessageListener 类,消息监听器收到消息的时候会执行 onQueueMessage 方法;另一个区别就是这里用了另外一个 注解 QueueMonitor
package com.hand.hap.message;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target({ ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
public @interface QueueMonitor {
String queue() default "";
/**
* default empty,auto detect depends on object type.
* <p>
* ITopicMessageListener:onTopicMessage<br>
* OTHERS:onMessage
*
*/
String method() default "";
}
获取到所有的消息监听器之后,开启多线程执行消息监听。因为存在多个消费者监听同一个队列的情况,存在消费者竞争,所以需要判断一下消息是否还在。
使用方式
package com.hand.hap.activiti.manager;
import com.hand.hap.activiti.util.ActivitiUtils;
import com.hand.hap.hr.dto.Employee;
import com.hand.hap.hr.dto.Position;
import com.hand.hap.hr.mapper.EmployeeMapper;
import com.hand.hap.hr.mapper.PositionMapper;
import com.hand.hap.message.IMessageConsumer;
import com.hand.hap.message.TopicMonitor;
import org.activiti.engine.identity.Group;
import org.activiti.engine.impl.persistence.entity.UserEntity;
import org.activiti.engine.impl.persistence.entity.data.impl.MybatisUserDataManager;
import org.activiti.spring.SpringProcessEngineConfiguration;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author shengyang.zhou@hand-china.com
*/
@TopicMonitor(channel = "employee.change")
public class CustomUserDataManager extends MybatisUserDataManager
implements IMessageConsumer<Employee>, InitializingBean {
@Autowired
private PositionMapper positionMapper;
@Autowired
private EmployeeMapper employeeMapper;
public Map<String, UserEntity> userCache = new HashMap<>();
@Autowired
private SpringProcessEngineConfiguration pec;
public CustomUserDataManager() {
super(null);
}
@Override
public List<Group> findGroupsByUser(String userId) {
List<Position> positions = positionMapper.getPositionByEmployeeCode(userId);
List<Group> gs = new ArrayList<>();
for (Position position : positions) {
gs.add(ActivitiUtils.toActivitiGroup(position));
}
return gs;
}
/**
* 这个方法使用非常频繁,做缓存支持
*
* @param entityId
* @return
*/
@Override
public UserEntity findById(String entityId) {
UserEntity userEntity = userCache.get(entityId);
if (userEntity == null) {
Employee employee = employeeMapper.queryByCode(entityId);
userEntity = ActivitiUtils.toActivitiUser(employee);
userCache.put(entityId, userEntity);
}
return userEntity;
}
@Override
public void onMessage(Employee message, String pattern) {
userCache.remove(message.getEmployeeCode());
}
@Override
public void afterPropertiesSet() throws Exception {
this.processEngineConfiguration = pec;
}
}
package com.hand.hap.hr.service.impl;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.StringUtil;
import com.hand.hap.account.dto.User;
import com.hand.hap.account.dto.UserRole;
import com.hand.hap.account.mapper.UserRoleMapper;
import com.hand.hap.account.service.IUserRoleService;
import com.hand.hap.account.service.IUserService;
import com.hand.hap.cache.impl.UserCache;
import com.hand.hap.core.IRequest;
import com.hand.hap.hr.dto.Employee;
import com.hand.hap.hr.dto.UserAndRoles;
import com.hand.hap.hr.mapper.EmployeeAssignMapper;
import com.hand.hap.hr.mapper.EmployeeMapper;
import com.hand.hap.hr.service.IEmployeeService;
import com.hand.hap.message.IMessagePublisher;
import com.hand.hap.message.TopicMonitor;
import com.hand.hap.mybatis.common.Criteria;
import com.hand.hap.system.dto.DTOStatus;
import com.hand.hap.system.service.impl.BaseServiceImpl;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
@Service
public class EmployeeServiceImpl extends BaseServiceImpl<Employee> implements IEmployeeService {
@Autowired
EmployeeMapper employeeMapper;
@Autowired
private IMessagePublisher messagePublisher;
@Autowired
private IUserRoleService userRoleService;
@Autowired
private IUserService userService;
@Autowired
private UserRoleMapper userRoleMapper;
@Autowired
private EmployeeAssignMapper employeeAssignMapper;
@Autowired
private UserCache userCache;
@Autowired
private ApplicationContext applicationContext;
@Override
public List<Employee> submit(IRequest request, List<Employee> list) {
self().batchUpdate(request, list);
for (Employee e : list) {
messagePublisher.publish("employee.change", e);
}
return list;
}
.......
}
当在界面修改了员工信息的时候,会调用 messagePublisher.publish("employee.change", e); 方法,因为 CustomUserDataManager 监听了 "employee.change" 主题的消息,所以在发布之后,会执行CustomUserDataManager .onMessage 的方法,消息队列的使用方式与之类似。
思考
在Hap中,定义过多的 Queue 消息监听器会影响系统性能吗?一个消息监听器就要开启一个线程,如果消息监听器太多,线程池够用吗?不知道自己的思路有没有问题,值得验证。