redis实现消息发布订阅

redis只能支持简单的消息发布订阅,如果消息订阅需求复杂,可以选择其他MQ进行整合。

/**
 * Redis 订阅发布功能
 *
 * @author whucke
 * @since 2018/12/17 11:07
 */
public class PubSubscribe {

    private Jedis jedis;

    public PubSubscribe(Jedis jedis) {
        this.jedis = jedis;
    }

    /**
     * 发送订阅消息
     *
     * @param channel 通道名称
     * @param message 消息
     * @return
     */
    public boolean publish(String channel, String message) {
        Long result = jedis.publish(channel, message);
        return result != null && result > 0;
    }

}
/**
 * 消息订阅实现
 *
 * @author whucke
 * @since 2018/12/17 13:58
 */
public class Subscriber extends JedisPubSub {

    /**
     * 收到订阅消息处理
     *
     * @param channel
     * @param message
     */
    @Override
    public void onMessage(String channel, String message) {
        System.out.printf("通道%s 接收到信息:%s \r\n", channel, message);
    }

    /**
     * 订阅通道调用
     *
     * @param channel
     * @param subscribedChannels
     */
    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        System.out.printf("订阅通道信息调用,通道:%s ,值:%s \r\n", channel, subscribedChannels);
    }

    /**
     * 取消订阅调用
     *
     * @param channel
     * @param subscribedChannels
     */
    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
        System.out.printf("取消订阅信息调用,通道:%s ,值:%s \r\n", channel, subscribedChannels);
    }
}

/**
 * 单独开启一个线程监听订阅消息
 * @author whucke
 * @since 2018/12/17 14:25
 */
public class SubThread extends Thread {

    private Subscriber subscriber;
    private Jedis jedis;
    private String[] channels;

    public SubThread(Jedis jedis, Subscriber subscriber, String[] channels) {
        this.subscriber = subscriber;
        this.jedis = jedis;
        this.channels = channels;
    }

    @Override
    public void run() {
        jedis.subscribe(subscriber, channels);
    }
}

/**
 * 消息发布订阅测试
 *
 * @author whucke
 * @since 2018/12/17 14:19
 */
public class PubSubscribeTest {

    @Test
    public void testPub() {
        Jedis jedis = RedisFactory.getRedisClient();
        PubSubscribe subscribe = new PubSubscribe(jedis);
        for (int i = 0; i < 10; i++) {
            boolean result = subscribe.publish("channel1", "消息发布测试_"+i);
            System.out.println(result ? i+"_消息发送成功" : i+"_消息发送失败");
        }
        jedis.close();
    }


    public static void main(String[] args) {
        Subscriber subscriber = new Subscriber();
        Jedis jedis = RedisFactory.getRedisClient();
        String[] channels = {"channel1"};
        SubThread subThread = new SubThread(jedis, subscriber, channels);
        subThread.start();
    }

}
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容