Redis总结(五)redis发布订阅模式

本篇将向大家介绍怎么通过redis来实现订阅和发布功能

首先介绍一下实现功能的主要几个命令:

  1. SUBSCRIBE 命令,这个命令可以让我们订阅任意数量的频道
  2. PUBLISH 命令,此命令是用来发布消息
  3. PSUBSCRIBE命令,此命令用来支持模糊订阅的功能

在展示具体的demo之前,我们先简单了解下这其中的原理:

在redisServer结构中的其中一个属性pubsub_channels是用来记录channel和客户端之间的关系,是使用key-->List的数据格式。如图:


图1.png

在我们使用SUBSCRIBE 命令在客户端client10086订阅了channel1 channel2,channel3

订阅:

SUBSCRIBE channel1 channel2,channel3

这时pubsub_channels的数据将会变为,如图:


图2.png

这就可以看出来执行SUBSCRIBE 命令就是将客户端信息添加到对应的channel对应列表的尾部。

模式订阅:
模式订阅设计到redisServer的另一个属性pubsub_patterns,也是一个链表,里面存储着客户端订阅的所有模式。结构如下图:

图3.png

当客户端订阅了一个模式,此时结构变为:


图4.png

发布:

PUBLISH 命令发布消息将消息推送到对应的客户端

在执行PUBLISH 命令发布消息的时候,首先会在pubsub_channels上找到对应的channel,遍历其中所有的client信息,将消息发送到所有client;同时也会在pubsub_patterns上遍历找到匹配的模式,发给对应的客户端

取消订阅:

UNSUBSCRIBE命令取消对应客户端的订阅

当执行UNSUBSCRIBE命令时则将对应的client从channel列表中移除

具体demo如下:

首先我们创建两个客户端执行体:

package com.example.redisdemo.service;


import redis.clients.jedis.JedisPubSub;

/**
 * 订阅消息消费体.
 * @author yzlu
 */
public class OneJedisPubSub extends JedisPubSub {

   //接收到消息时执行
    @Override
    public void  onMessage(String channel, String message){
        System.out.println("oneJedisPubSub message is" + message);
    }

    //接收到模式消息时执行
    @Override
    public void onPMessage(String pattern, String channel, String message){
        System.out.println("oneJedisPubSub pattern是"+pattern+"channel是"+channel + "message是" + message);
    }

    //订阅时执行
    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        System.out.println("oneJedisPubSub订阅成功");
    }

    //取消订阅时执行
    @Override
    public void onUnsubscribe(String channel, int subscribedChannels){
        System.out.println("oneJedisPubSub取消订阅"+channel);
    }


    //取消模式订阅时执行
    @Override
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
        System.out.println("oneJedisPubSub取消多订阅"+pattern);
    }

}

package com.example.redisdemo.service;

import redis.clients.jedis.JedisPubSub;

public class SecondJedisPubSub extends JedisPubSub {

  //接收到消息时执行
    @Override
    public void  onMessage(String channel, String message){
        System.out.println(" SecondJedisPubSub message is" + message);
    }

    //接收到模式消息时执行
    @Override
    public void onPMessage(String pattern, String channel, String message){
        System.out.println("SecondJedisPubSub pattern是"+pattern+"channel是"+channel + "message是" + message);
    }

    //取消订阅时执行
    @Override
    public void onUnsubscribe(String channel, int subscribedChannels){
        System.out.println("SecondJedisPubSub 取消订阅"+channel);
    }

    //订阅时执行
    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        System.out.println("SecondJedisPubSub订阅成功");
    }

    //取消模式订阅时执行
    @Override
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
        System.out.println("SecondJedisPubSub 取消多订阅"+pattern);
    }

}

然后我们开始给这两个客户端订阅消息

@RestController
@RequestMapping("test")
@Slf4j
public class TestController {
    @Autowired
    private RedisClient redisClient;
    private final OneJedisPubSub oneJedisPubSub = new OneJedisPubSub();

    private final SecondJedisPubSub secondJedisPubSub = new SecondJedisPubSub();
    
    @PostMapping("subscribe")
    public void  subscribe(@RequestBody QueueTest queueTest){
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    if("1".equals(queueTest.getTopic())){
                        redisClient.subscribe(oneJedisPubSub,"topic1","topic2");
                    }
                    if("2".equals(queueTest.getTopic())){
                        redisClient.subscribe(secondJedisPubSub,"topic2");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
    
    
}

请求情况如下图:


图5.png

图7.png

请求结果如图,可以看出我们将两个客户端都订阅了一定channel


图6.png
图8.png

此时OneJedisPubSub订阅了topic1和topic2,SecondJedisPubSub订阅了topic2,我们尝试推送消息,demo如下:

  @PostMapping("push")
    public void push(@RequestBody QueueTest queueTest){
        log.info("发布一条消息");
        Long publish = redisClient.publish(queueTest.getTopic(), queueTest.getName());
        System.out.println("消费者数量"+publish);
    }

请求如下:


图9.png

结果如下:


图10.png

可以看到我们往topic1发布了消息只有OneJedisPubSub接收到了消息,接下来我们往topic2发布消息

请求如下:

图11.png

结果如下:


图12.png

可以看到此时两个客户端都接收到了消息

在测试完毕客户端接收消息的能力,我们这时取消SecondJedisPubSub订阅topic2,demo如下:

  @PostMapping("unno")
    public void  unno(@RequestBody QueueTest queueTest){
        log.info("取消订阅消息");
        try {
            secondJedisPubSub.unsubscribe(queueTest.getTopic());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

请求如下:


图13.png

结果如下:


图14.png

在取消后我们再往topic2推送消息,可以看到只有一个客户端接收消息

如图:


图15.png
图16.png

至此我们实验了大部分场景,至于模式订阅由于贴图太麻烦,我就将代码提供出来,大家可以自己实验:

   @PostMapping("subscribe")
    public void  subscribe(@RequestBody QueueTest queueTest){
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    if("1".equals(queueTest.getTopic())){
                        redisClient.pubsubPattern(oneJedisPubSub,"topic*");
                    }
                    if("2".equals(queueTest.getTopic())){
                        redisClient.subscribe(secondJedisPubSub,"topic2");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
   @PostMapping("unno")
    public void  unno(@RequestBody QueueTest queueTest){
        log.info("取消模式订阅消息");
        try {
            secondJedisPubSub.punsubscribe(queueTest.getTopic());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

最后将redisclient的代码提供给大家

@Component("redisClient")
@Slf4j
public class RedisClient {
    @Resource
    private JedisPool jedisPool;
    
       /**
     * 发布消息
     * @param topic
     * @param message
     */
    public Long publish(String topic,String message){
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            return jedis.publish(topic, message);
        } catch (Exception e) {
            throw e;
        } finally {
            if(jedis != null){
                jedis.close();
            }
        }
    }

    /**
     * 订阅消息
     * @param jedisPubSub
     * @param topics
     */
    public void subscribe(JedisPubSub jedisPubSub, String... topics) throws Exception {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.subscribe(jedisPubSub,topics);
        } catch (Exception e) {
            throw e;
        } finally {
            if(jedis != null){
                jedis.close();
            }

        }
    }

    /**
     * 模式匹配订阅消息
     * @param topic
     */
    public void pubsubPattern(JedisPubSub jedisPubSub,String topic){
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.psubscribe(jedisPubSub,topic);
        } catch (Exception e) {
            throw e;
        } finally {
            if(jedis != null){
                jedis.close();
            }
        }
    }
}

以上就是reids的发布订阅功能,当然此篇和上一篇一样只是为了熟悉redis的一些功能,真正需要这种场景时我们完全可以使用mq的交换机同样可以实现,而且更专业,但是如果大家懒得维护mq那么这种情况也可以使用。

等redis系列写完后续也会陆续写出mq的系列文章,但是由于最近上班的开发任务比较众,所以能够抽出时间写文章的时间不多,更新可能比较慢,大家谅解。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,287评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,346评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,277评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,132评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,147评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,106评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,019评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,862评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,301评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,521评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,682评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,405评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,996评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,651评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,803评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,674评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,563评论 2 352

推荐阅读更多精彩内容