【1、前言】最近项目遇到一个消息订阅的需求,感觉kafka和activeMQ用起来太繁琐,正好项目中都有用到redis做数据源,就之前用redis充当了消息订阅的中间介质,用了之后感觉非常方便(尽管功能上不及其他真正的MQ,但是完全满足当前业务需求),分享一下。
【2、场景描述】
项目组承接多个业务方,每个业务方接入后都需要所有的项目知道该业务方的接入,而后处理。由于项目多,开发语言的不同,前期都是在一个业务方接入之后,每个工程进行同步的数据添加此业务方信息(有加SQL、有加配置、还有加静态常量的)。总之,一个业务方的接入会造成每个系统的更改,无形中造成很多工作量。所以需要一个系统去统一的管理业务方信息而后统一调配。很明显消息订阅是最适合也最容易处理这种场景。
【3、流程设计】
【4、代码实现】
4.1:依赖
dependencies {
compile('org.springframework.boot:spring-boot-starter-data-redis')
compile('redis.clients:jedis:2.9.0')
}
4.2:application配置
spring.redis.database=0
spring.redis.host=10.168.99.149
spring.redis.port=6379
#spring.redis.password=crs-pgudbomq:Sogou@123
spring.redis.pool.max-active=50
## 连接池耗尽时,等待获得新连接的时间,10秒
spring.redis.pool.max-wait=10000
spring.redis.pool.max-idle=8
spring.redis.pool.min-idle=0
## 连接超时时间,200毫秒
spring.redis.timeout=200
4.3:redis服务类(工具类)
@Service
public class RedisService {
private Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private StringRedisTemplate redisTemplate;
@Value("${global.env}")
private String env;
public int keys(String regEx){
try {
return redisTemplate.keys(regEx).size();
}catch (Exception e) {
logger.error(e.getMessage(), e);
}
return -1;
}
public Object getValue(String key) {
try {
BoundValueOperations valueOperations = redisTemplate.boundValueOps(key);
Object retValue = valueOperations.get();
logger.info("get from redis {}:{}", key, retValue);
return retValue;
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return null;
}
public void setValue(String key, Object value) {
try {
BoundValueOperations valueOperations = redisTemplate.boundValueOps(key);
valueOperations.set(String.valueOf(value), 1, TimeUnit.DAYS);
logger.info("set to redis {}:{}", key, value);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
public void setValue(String key, Object value,int timeOut) {
try {
BoundValueOperations valueOperations = redisTemplate.boundValueOps(key);
valueOperations.set(String.valueOf(value), timeOut, TimeUnit.SECONDS);
logger.info("set to redis {}:{}", key, value);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
/**
* 自增1
*/
public long increment(String key) {
// return redisTemplate.opsForValue().increment(env.concat("-").concat(key), 1);
logger.info("increment redis {}", key);
return redisTemplate.opsForValue().increment(key, 1);
}
/**
* 初始化0
*/
public void init(String key){
redisTemplate.opsForValue().set(env.concat("-").concat(key), String.valueOf(0));
}
public void delteKey(String key){
try {
redisTemplate.delete(key);
}catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
/**
* 键是否存在
*/
public boolean hasKey(String key){
return redisTemplate.hasKey(key);
}
/**
* 设置hash并设置有效时间
*/
public void hset(String key, String field, String value, int seconds){
try {
HashOperations<String, String, String> vo = redisTemplate.opsForHash();
vo.put(key,field,value);
}catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
/**
* hash添加
*/
public void hset(String key, String field, String value){
try {
HashOperations<String, String, String> vo = redisTemplate.opsForHash();
vo.put(key,field,value);
}catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
/**
* hash获取
*/
public String hget(String key, String field){
String value = null;
try {
HashOperations<String, String, String> vo = redisTemplate.opsForHash();
if(vo.hasKey(key,field)){
value = vo.get(key,field);
}
}catch (Exception e) {
logger.error(e.getMessage(), e);
}
return value;
}
/**
* hash获取所有
*/
public Map<String,String> hgetall(String key){
HashOperations<String, String, String> vo = redisTemplate.opsForHash();
Map<String,String> map = new HashMap<>();
for(String haskey:vo.keys(key)){
String value = vo.get(key,haskey);
map.put(haskey,value);
}
return map;
}
}
4.4:消息发布类服务
/**
* @Author: LvFang
* @Date: Created in 2018/7/25.
* @Description:Redis消息生产类
*/
@Service
public class PublisherService {
private Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 发布消息接口
* @param channel
* @param message
*/
public boolean sendMessage(String channel, Serializable message){
try {
RcBusinessInfo info = JsonUtil.readJson2Entity(message.toString(),RcBusinessInfo.class);
redisTemplate.convertAndSend(channel, message);
logger.info("主题:{}",channel);
logger.info("消息内容:{}", info.getBusinessUid()+":"+info.getBusinessDesc()+":"+info.getMd5Salt());
return true;
} catch (Exception e) {
logger.info("消息发送失败!");
e.printStackTrace();
return false;
}
}
}
4.5:业务代码类
/**
* 添加业务方信息
* @param info
* @return
*/
public CallResult addRcBusinessInfo(RcBusinessInfo info){
//参数校验
//业务校验
//补全信息
//执行
Integer count = rcBusinessInfoDao.addRcBusinessInfo(info);
logger.info("方法addRcBusinessInfo进行了redis操作:{}",info.getBusinessUid()+":"+info.getBusinessDesc());
//-------------------------start-【Redis操作(持久化和消息发送)】-------------------------------
//同步NOSQL-redis
redisService.hset(Constants.REDIS_BUSINESS_KEY,info.getBusinessUid(), JsonUtil.toJson(info));
//发送消息
boolean falg = publisherService.sendMessage(Constants.REDIS_TOPIC_RC_CIKU,message);
if(!falg) return CallResult.fail(500,Constants.SEND_MESSAGE_FAIL);
//-------------------------end-【Redis操作(持久化和消息发送)】---------------------------------
return CallResult.success(count);
};
4.6:消息订阅类服务
/**
* @Author: LvFang
* @Date: Created in 2018/7/25.
* @Description:Redis消息消费类
*/
@Component("receiverService")
public class ReceiverService {
private static Logger logger = LoggerFactory.getLogger(ReceiverService.class);
public void receiveMessage(String message) {
try {
RcBusinessInfo info = JsonUtil.readJson2Entity(message.toString(),RcBusinessInfo.class);
logger.info("收到的mq消息:{}" ,info);
//消息处理
}catch (Exception e){
e.printStackTrace();
}
}
}
4.7:配置redis监听主题
/**
* @Author: LvFang
* @Date: Created in 2018/7/25.
* @Description:用于redis消息监听支持
*/
@Configuration
public class RedisReivceConfig {
private static Logger logger = LoggerFactory.getLogger(ReceiverService.class);
//---------------------------------------消息监听-----------------------------------------------------
/**
* redis消息监听器容器
* 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
* 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
* @param connectionFactory
* @param listenerAdapter
* @return
*/
@Bean //相当于xml中的bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//订阅了一个叫具体的 的通道
container.addMessageListener(listenerAdapter, new PatternTopic(Constants.REDIS_TOPIC_RC_OUTERAPI));
//这个container 可以添加多个 messageListener
logger.info("redis 对监听主题是:{}",Constants.REDIS_TOPIC_RC_OUTERAPI);
return container;
}
/**
* 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法
* @param receiver
* @return
*/
@Bean
MessageListenerAdapter listenerAdapter(ReceiverService receiver) {
/*这个地方 是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage”
也有好几个重载方法,这边默认调用处理器的方法 叫handleMessage 可以自己到源码里面看
这里实质就是监听调用ReceiverService类中的receiveMessage方法
*/
logger.info("redis 的监听处理类方法为:{}","com.autohome.outerapi.service.ReceiverService.receiveMessage()");
return new MessageListenerAdapter(receiver, "receiveMessage");
}
4.8:日志展示
4.8.1、消息生产方
2018-07-31 16:40:25.894 INFO 97080 --- [o-auto-1-exec-1] c.a.r.c.RcBusinessInfoController : 请求参数:RcBusinessInfo{id=null, interfaceType=2, businessUid='test_31', businessDesc='测试_31', md5Salt='3ef2a564efa4a23c7ac7944bfbeaf213', linkMan='lvfang', callbackUrl='http://www.baidu.com', managerUsername='admin1', managerPassword='123', aesSalt='这是一个手机号', scoreType=1, usableDate=Tue Jul 31 16:40:25 CST 2018, limitDay=10000, limitTotal=150000, infoState=1, createTime=null, modifyTime=null}
2018-07-31 16:40:26.032 INFO 97080 --- [o-auto-1-exec-1] com.alibaba.druid.pool.DruidDataSource : {dataSource-1} inited
2018-07-31 16:40:26.943 INFO 97080 --- [o-auto-1-exec-1] c.a.rc.service.RcBusinessInfoService : 方法addRcBusinessInfo进行了redis操作:test_31:测试_31
2018-07-31 16:40:27.024 INFO 97080 --- [o-auto-1-exec-1] c.autohome.rc.service.PublisherService : 主题:redis_topic_rc_ciku
2018-07-31 16:40:27.025 INFO 97080 --- [o-auto-1-exec-1] c.autohome.rc.service.PublisherService : 消息内容:test_31:测试_31:3ef2a564efa4a23c7ac7944bfbeaf213
2018-07-31 16:40:27.026 INFO 97080 --- [o-auto-1-exec-1] c.autohome.rc.service.PublisherService : 主题:redis_topic_rc_onlineapi
2018-07-31 16:40:27.026 INFO 97080 --- [o-auto-1-exec-1] c.autohome.rc.service.PublisherService : 消息内容:test_31:测试_31:3ef2a564efa4a23c7ac7944bfbeaf213
2018-07-31 16:40:27.028 INFO 97080 --- [o-auto-1-exec-1] c.autohome.rc.service.PublisherService : 主题:redis_topic_rc_nlpbe
2018-07-31 16:40:27.028 INFO 97080 --- [o-auto-1-exec-1] c.autohome.rc.service.PublisherService : 消息内容:test_31:测试_31:3ef2a564efa4a23c7ac7944bfbeaf213
2018-07-31 16:40:27.030 INFO 97080 --- [o-auto-1-exec-1] c.autohome.rc.service.PublisherService : 主题:redis_topic_rc_outerapi
2018-07-31 16:40:27.030 INFO 97080 --- [o-auto-1-exec-1] c.autohome.rc.service.PublisherService : 消息内容:test_31:测试_31:3ef2a564efa4a23c7ac7944bfbeaf213
2018-07-31 16:40:27.068 INFO 97080 --- [pool-2-thread-1] c.a.rc.service.RcBusinessInfoService : 数据同步mysql-redis , businessInfos size : 31
{"code":200,"message":"default success","result":1,"success":true}
2018-07-31 16:40:27.199 INFO 97080 --- [ Thread-4] ationConfigEmbeddedWebApplicationContext : Closing org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@5456afaa: startup date [Tue Jul 31 16:40:15 CST 2018]; root of context hierarchy
2018-07-31 16:40:27.246 INFO 97080 --- [ Thread-4] com.alibaba.druid.pool.DruidDataSource : {dataSource-1} closed
4.8.2、消息消费方
2018-07-31 16:39:22.876 [main] [INFO ] com.autohome.RcOnlineOuterapiApplication {StartupInfoLogger.java:59} - Started RcOnlineOuterapiApplication in 18.732 seconds (JVM running for 20.351)
2018-07-31 16:40:27.190 [container-2] [INFO ] com.autohome.outerapi.service.ReceiverService {ReceiverService.java:40} - 收到的mq消息:com.autohome.outerapi.po.RcBusinessInfo@5cc9f755
2018-07-31 16:40:27.705 [container-2] [INFO ] com.alibaba.druid.pool.DruidDataSource {DruidDataSource.java:785} - {dataSource-1} inited
Tue Jul 31 16:40:27 CST 2018 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
2018-07-31 16:40:28.989 [container-2] [DEBUG] com.autohome.outerapi.dao.RcAdminApiProfileDao.findInfoByUid {BaseJdbcLogger.java:159} - ==> Preparing: select * from rc_admin_api_profile WHERE uid = ?
2018-07-31 16:40:29.021 [container-2] [DEBUG] com.autohome.outerapi.dao.RcAdminApiProfileDao.findInfoByUid {BaseJdbcLogger.java:159} - ==> Parameters: test_31(String)
2018-07-31 16:40:29.057 [container-2] [DEBUG] com.autohome.outerapi.dao.RcAdminApiProfileDao.findInfoByUid {BaseJdbcLogger.java:159} - <== Total: 0
2018-07-31 16:40:29.060 [container-2] [DEBUG] com.autohome.outerapi.dao.RcAdminApiProfileDao.addProfile {BaseJdbcLogger.java:159} - ==> Preparing: INSERT INTO rc_admin_api_profile (uid, contact, contact_dev, md5_salt, aes_salt, limit_cnt, is_risk, ctime, utime) VALUES (?,?,?,?,?,?,?,?,?)
2018-07-31 16:40:29.063 [container-2] [DEBUG] com.autohome.outerapi.dao.RcAdminApiProfileDao.addProfile {BaseJdbcLogger.java:159} - ==> Parameters: test_31(String), 测试_31(String), lvfang(String), 3ef2a564efa4a23c7ac7944bfbeaf213(String), 这是一个手机号(String), 150000(Integer), 1(String), 2018-07-31 16:40:29.059(Timestamp), 2018-07-31 16:40:29.059(Timestamp)
2018-07-31 16:40:29.348 [container-2] [DEBUG] com.autohome.outerapi.dao.RcAdminApiProfileDao.addProfile {BaseJdbcLogger.java:159} - <== Updates: 1
2018-07-31 16:40:29.348 [container-2] [DEBUG] com.autohome.outerapi.dao.RcAdminApiProfileDao.addProfile!selectKey {BaseJdbcLogger.java:159} - ==> Preparing: SELECT LAST_INSERT_ID() AS ID
2018-07-31 16:40:29.349 [container-2] [DEBUG] com.autohome.outerapi.dao.RcAdminApiProfileDao.addProfile!selectKey {BaseJdbcLogger.java:159} - ==> Parameters:
2018-07-31 16:40:29.355 [container-2] [DEBUG] com.autohome.outerapi.dao.RcAdminApiProfileDao.addProfile!selectKey {BaseJdbcLogger.java:159} - <== Total: 1
2018-07-31 16:40:29.356 [container-2] [INFO ] com.autohome.outerapi.service.ReceiverService {ReceiverService.java:84} - add-rcAdminApiProfile消息对象持久化成功-success:test_31:测试_31
2018-07-31 16:40:29.357 [container-2] [DEBUG] com.autohome.outerapi.dao.RcAdminApiProfileDao.selectAll {BaseJdbcLogger.java:159} - ==> Preparing: select id, uid, contact, contact_dev, md5_salt, aes_salt, limit_cnt,is_risk,ctime, utime from rc_admin_api_profile
2018-07-31 16:40:29.357 [container-2] [DEBUG] com.autohome.outerapi.dao.RcAdminApiProfileDao.selectAll {BaseJdbcLogger.java:159} - ==> Parameters:
2018-07-31 16:40:29.366 [container-2] [DEBUG] com.autohome.outerapi.dao.RcAdminApiProfileDao.selectAll {BaseJdbcLogger.java:159} - <== Total: 18
The daemon will cancel the build.
over!!!!!!!!!!!!!