jedis高级特性初探

首先我们要谈一下redis,他是一款常见且常用的缓存数据库。他有几个显著的特性让开发与DBA所喜爱。

1. Redis支持数据的持久化,可以将内存中的数据保持在磁盘中,重启的时候可以再次加载进行使用。即使宕掉,数据也不会丢失。

2. Redis不仅仅支持简单的key-value类型的数据,同时还提供list,set,zset,hash等数据结构的存储。这样就给开发同学提供了更多的选择。

3. Redis支持数据的备份,即master-slave模式的数据备份,数据稳定存在

jedis是redis的一个性能良好的客户端,里面包含了所有的redis的特性,用起来相当爽,本文着重讲述jedis中的高级特性。本文实例是在2.9.0版本上测试通过。自己测试时可以引入下面的内容,下面开始。

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>

一. Publish/Subscribe(发布订阅)
观察者模式的具体体现。常用来解耦合,有很多版本都实现了此类的功能,比如说Rxjava,EventBus,Zk等。jedis也有自己的实现,首先说一下原理:首先客户端与服务端建立连接,客户端注册感兴趣的事件,同时定义一个当感兴趣的事件过来时,需要如何处理的回调(继承JedisPubSub类,覆写里面的方法,例如onMessage,是消息接受的方法)。客户端或设置与服务端的socket链接超时时间为0(不超时),然后阻塞在RedisInputStream中的read方法,等待消息的到来(也就是说RedisInputStream中有数据)。当有的客户端发布了感兴趣的信息,那么redis 的 server端就会查询哪些client,关注了这个消息,然后,将结果放入该客户端的RedisInputStream中,客户端就接收了感兴趣的消息,然后处理自己的逻辑。下面是测试代码实例:

static class MyListener extends JedisPubSub {

    @Override
    public void onMessage(String channel, String message) {
        System.out.println(channel + "============" + message);
        super.onMessage(channel, message);
    }
}
 
@Test
public void testSubscribe() {
    jedis.subscribe(new MyListener(), "channel1");
}

@Test
public void testPublish() {
    jedis.publish("channel1", "message1");
}

现在跟着代码看一下,关键性的步骤都增加了注释

public void proceed(Client client, String... channels) {
  this.client = client;
  client.subscribe(channels);   //订阅
  client.flush();               //数据flush,发送给server端
  process(client);              //do while 获取数据
}
private void process(Client client) {

  do {
    List<Object> reply = client.getRawObjectMultiBulkReply();    //阻塞获取server端传递的数据,根据数据,触发相应的回调
    final Object firstObj = reply.get(0);  //得到的列表中的第一个元素是服务端推送过来信息的事件类型,用来触发客户端定义的回调方法
    if (!(firstObj instanceof byte[])) {
      throw new JedisException("Unknown message type: " + firstObj);
    }
    final byte[] resp = (byte[]) firstObj;
    if (Arrays.equals(SUBSCRIBE.raw, resp)) {
      subscribedChannels = ((Long) reply.get(2)).intValue();
      final byte[] bchannel = (byte[]) reply.get(1);
      final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
      onSubscribe(strchannel, subscribedChannels);  //订阅上的回调
    } else if (Arrays.equals(UNSUBSCRIBE.raw, resp)) {
      subscribedChannels = ((Long) reply.get(2)).intValue();
      final byte[] bchannel = (byte[]) reply.get(1);
      final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
      onUnsubscribe(strchannel, subscribedChannels); //取消后的回调
    } else if (Arrays.equals(MESSAGE.raw, resp)) {
      final byte[] bchannel = (byte[]) reply.get(1);
      final byte[] bmesg = (byte[]) reply.get(2);
      final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
      final String strmesg = (bmesg == null) ? null : SafeEncoder.encode(bmesg);
      onMessage(strchannel, strmesg);  //接收到信息的回调
    } else if (Arrays.equals(PMESSAGE.raw, resp)) {
      final byte[] bpattern = (byte[]) reply.get(1);
      final byte[] bchannel = (byte[]) reply.get(2);
      final byte[] bmesg = (byte[]) reply.get(3);
      final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
      final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
      final String strmesg = (bmesg == null) ? null : SafeEncoder.encode(bmesg);
      onPMessage(strpattern, strchannel, strmesg);
    } else if (Arrays.equals(PSUBSCRIBE.raw, resp)) {
      subscribedChannels = ((Long) reply.get(2)).intValue();
      final byte[] bpattern = (byte[]) reply.get(1);
      final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
      onPSubscribe(strpattern, subscribedChannels);
    } else if (Arrays.equals(PUNSUBSCRIBE.raw, resp)) {
      subscribedChannels = ((Long) reply.get(2)).intValue();
      final byte[] bpattern = (byte[]) reply.get(1);
      final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
      onPUnsubscribe(strpattern, subscribedChannels);
    } else if (Arrays.equals(PONG.raw, resp)) {
      final byte[] bpattern = (byte[]) reply.get(1);
      final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
      onPong(strpattern);
    } else {
      throw new JedisException("Unknown message type: " + firstObj);
    }
  } while (isSubscribed());

  /* Invalidate instance since this thread is no longer listening */
  this.client = null;

  /*
   * Reset pipeline count because subscribe() calls would have increased it but nothing
   * decremented it.
   */
  client.resetPipelinedCount();
}
private static Object process(final RedisInputStream is) {

  final byte b = is.readByte();   //阻塞获取server端传递的数据,这里有必要点一下,因为这是核心,传统的IO流再获取数据的时候会阻塞,直到有数据过来。
  if (b == PLUS_BYTE) {
    return processStatusCodeReply(is);
  } else if (b == DOLLAR_BYTE) {
    return processBulkReply(is);
  } else if (b == ASTERISK_BYTE) {
    return processMultiBulkReply(is);
  } else if (b == COLON_BYTE) {
    return processInteger(is);
  } else if (b == MINUS_BYTE) {
    processError(is);
    return null;
  } else {
    throw new JedisConnectionException("Unknown reply: " + (char) b);
  }
}

二. pipeline(管道)
传统的交互方式是request->response request->response request->response,而pipeline是 request->request->request-> response->response->response,将多次请求的报文放到一个请求体中,服务端也是一样,将多次的响应信息合并成一次返回,所以 ,能不快吗?!这也就是pipeline的基本原理。下面是测试代码实例:

@Test
public void test36() {
    Pipeline pipelined = jedis.pipelined();
    pipelined.set("test1", "22");
    pipelined.set("test2", "33");

    pipelined.sync();   //无返回值

    pipelined.get("test1");
    pipelined.get("test2");

    List<Object> objects = pipelined.syncAndReturnAll();  //有返回值
    for (Object o : objects) {
        System.out.print(o + " ");
    }
}
 
result: 22 33

跟着代码看一下:

public Response<String> set(byte[] key, byte[] value) {
  getClient(key).set(key, value);     //执行set命令
  return getResponse(BuilderFactory.STRING);   //注意设置响应信息的顺序
}
protected Connection sendCommand(final Command cmd, final byte[]... args) {
  try {
    connect(); //与server端形成链接,实例化输入输出流
    Protocol.sendCommand(outputStream, cmd, args);   //往输出流中按照协议写入key与value,但不发送,调用sync()或syncAndReturnAll()时发送
    pipelinedCommands++;
    return this;
  } catch (JedisConnectionException ex) {
    /*
     * When client send request which formed by invalid protocol, Redis send back error message
     * before close connection. We try to read it to provide reason of failure.
     */
    try {
      String errorMessage = Protocol.readErrorLineIfPossible(inputStream);
      if (errorMessage != null && errorMessage.length() > 0) {
        ex = new JedisConnectionException(errorMessage, ex.getCause());
      }
    } catch (Exception e) {
      /*
       * Catch any IOException or JedisConnectionException occurred from InputStream#read and just
       * ignore. This approach is safe because reading error message is optional and connection
       * will eventually be closed.
       */
    }
    // Any other exceptions related to connection?
    broken = true;
    throw ex;
  }
}
protected <T> Response<T> getResponse(Builder<T> builder) {
  Response<T> lr = new Response<T>(builder);  //初始化一个Response
  pipelinedResponses.add(lr);    //放入列表中,保证顺序
  return lr;
}

当调用syncAndReturnAll时

public List<Object> syncAndReturnAll() {
  if (getPipelinedResponseLength() > 0) {    //判断Response列表中的数量
    List<Object> unformatted = client.getAll();    //获取所有的结果,阻塞获取
    List<Object> formatted = new ArrayList<Object>();

    for (Object o : unformatted) {
      try {
        formatted.add(generateResponse(o).get());    //解析每一个得到的信息,因为原来都是byte[],需要解析成String
      } catch (JedisDataException e) {
        formatted.add(e);
      }
    }
    return formatted;
  } else {
    return java.util.Collections.<Object> emptyList();
  }
}
public List<Object> getAll(int except) {
  List<Object> all = new ArrayList<Object>();
  flush();
  while (pipelinedCommands > except) {
    try {
      all.add(readProtocolWithCheckingBroken());  //根据pipelinedCommands值判断all里面需要多少跳数据
    } catch (JedisDataException e) {
      all.add(e);
    }
    pipelinedCommands--;
  }
  return all;
}

readProtocolWithCheckingBroken还是阻塞获取


private static Object process(final RedisInputStream is) {

  final byte b = is.readByte();  //阻塞获取值,判断第一个字节的类型,从而进入不同的处理
  if (b == PLUS_BYTE) {
    return processStatusCodeReply(is);
  } else if (b == DOLLAR_BYTE) {
    return processBulkReply(is);
  } else if (b == ASTERISK_BYTE) {
    return processMultiBulkReply(is);
  } else if (b == COLON_BYTE) {
    return processInteger(is);
  } else if (b == MINUS_BYTE) {
    processError(is);
    return null;
  } else {
    throw new JedisConnectionException("Unknown reply: " + (char) b);
  }
}

三. Transaction(事务)
1. 事务为提交之前出现异常,那么事务内的操作都不执行。这个很好理解,操作同pipeline,事务提交的时候才会flush数据同服务器交互。
2. 结合watch。事务开始前观察某个值,在事务期间如果其他的线程改了所观察的值,事务不执行!

@Test
public void testTransaction() {
    //要在事务执行之前进行监视
    jedis.watch("key100", "key101");
    Transaction multi = jedis.multi();

    multi.set("key100", "key100");
    multi.set("key101", "key101");

    try {
        Thread.sleep(10000);
    } catch (InterruptedException e) {
    }

     multi.exec();

    String key6 = jedis.get("key100");
    System.out.println(key6);
    String key7 = jedis.get("key101");
    System.out.println(key7);

}

@Test
public void test() {
    jedis.set("key100", "value3");
    String key10 = jedis.get("key100");
    System.out.println(key10);
}

先执行testTransaction,再执行test,test的结果

value3

10s后testTransaction的结果

value3
null
  1. redis的事务不支持回滚。因为这种复杂的功能和redis追求的简单高效的设计主旨不符合,并且他认为,redis事务的执行时错误通常都是编程错误造成的,这种错误通常只会出现在开发环境中,而很少会在实际的生产环境中出现,所以他认为没有必要为redis开发事务回滚功能。

总结: jedis这个类库比较轻便,可以帮助我们已更加原汁原味的方式操作redis。首先客户端与服务端的交互,第一个核心点就是协议的约定。服务端通过协议解析客户端传递过来的数据包,客户端根据协议解析服务端的数据。第二个就是网络编程,比较轻便也通俗易懂,方便我们入手网络,这也是我想写的第二个原因。有太多的时候我们接触更多的上层业务,忽略了基础构建,仿佛学习武功,只学习招式,不学习内功,最终顶多算是个杂耍者,永远成不了高手。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,142评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,298评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,068评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,081评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,099评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,071评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,990评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,832评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,274评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,488评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,649评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,378评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,979评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,625评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,643评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,545评论 2 352

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,651评论 18 139
  • 本文将从Redis的基本特性入手,通过讲述Redis的数据结构和主要命令对Redis的基本能力进行直观介绍。之后概...
    kelgon阅读 61,154评论 23 625
  • 转载:Redis 宝典 | 基础、高级特性与性能调优 本文由 DevOpsDays 本文由简书作者kelgon供稿...
    meng_philip123阅读 3,125评论 1 34
  • 1.给罗辑思维投稿 拖延症那篇 从知识到行动那篇 修订之后投稿 2.有系列文章《我读《好好学习》》 关于能力圈的结...
    王立刚_Leon阅读 143评论 0 0
  • 台灯 你以为 你开了一盏灯 其实是 开启了寂寞 你以为 开了灯 就能感受到 光明 事实是 心里的黑暗 一直在
    月色袭人阅读 127评论 0 2