redis只能支持简单的消息发布订阅,如果消息订阅需求复杂,可以选择其他MQ进行整合。
/**
* Redis 订阅发布功能
*
* @author whucke
* @since 2018/12/17 11:07
*/
public class PubSubscribe {
private Jedis jedis;
public PubSubscribe(Jedis jedis) {
this.jedis = jedis;
}
/**
* 发送订阅消息
*
* @param channel 通道名称
* @param message 消息
* @return
*/
public boolean publish(String channel, String message) {
Long result = jedis.publish(channel, message);
return result != null && result > 0;
}
}
/**
* 消息订阅实现
*
* @author whucke
* @since 2018/12/17 13:58
*/
public class Subscriber extends JedisPubSub {
/**
* 收到订阅消息处理
*
* @param channel
* @param message
*/
@Override
public void onMessage(String channel, String message) {
System.out.printf("通道%s 接收到信息:%s \r\n", channel, message);
}
/**
* 订阅通道调用
*
* @param channel
* @param subscribedChannels
*/
@Override
public void onSubscribe(String channel, int subscribedChannels) {
System.out.printf("订阅通道信息调用,通道:%s ,值:%s \r\n", channel, subscribedChannels);
}
/**
* 取消订阅调用
*
* @param channel
* @param subscribedChannels
*/
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
System.out.printf("取消订阅信息调用,通道:%s ,值:%s \r\n", channel, subscribedChannels);
}
}
/**
* 单独开启一个线程监听订阅消息
* @author whucke
* @since 2018/12/17 14:25
*/
public class SubThread extends Thread {
private Subscriber subscriber;
private Jedis jedis;
private String[] channels;
public SubThread(Jedis jedis, Subscriber subscriber, String[] channels) {
this.subscriber = subscriber;
this.jedis = jedis;
this.channels = channels;
}
@Override
public void run() {
jedis.subscribe(subscriber, channels);
}
}
/**
* 消息发布订阅测试
*
* @author whucke
* @since 2018/12/17 14:19
*/
public class PubSubscribeTest {
@Test
public void testPub() {
Jedis jedis = RedisFactory.getRedisClient();
PubSubscribe subscribe = new PubSubscribe(jedis);
for (int i = 0; i < 10; i++) {
boolean result = subscribe.publish("channel1", "消息发布测试_"+i);
System.out.println(result ? i+"_消息发送成功" : i+"_消息发送失败");
}
jedis.close();
}
public static void main(String[] args) {
Subscriber subscriber = new Subscriber();
Jedis jedis = RedisFactory.getRedisClient();
String[] channels = {"channel1"};
SubThread subThread = new SubThread(jedis, subscriber, channels);
subThread.start();
}
}