Redis pipeline 简介

前言

最近在学习Redis的时候看到Redis在执行命令的时候是单线程的。这意味如果客户端向服务器发送查询请求时,Redis客户端将以阻塞的方式等待服务器的响应。这样后面的客户端请求命令将不得不等待。

因此,当Redis客户端需要连续执行许多请求时,将有可能影响Redis的性能。

举个例子

客户:INCR X
服务器:1
客户:INCR X
服务器:2
客户:INCR X
服务器:3
客户:INCR X
服务器:4

Pipeline 简介

Pipe 是一种经典的设计思路。 他的主要思想为批量发送请求,批量处理请求。这样的话可以最大限度的减少网络的IO开销.
实际上,pipeline不仅可以减少网络开销,也可以减少系统调用的次数。我们知道,redis客户端写操作和读操作都会涉及 read()和write()系统调用,这意味着从用户域到内核域, 而上下文切换是巨大的速度损失。

image.png

Pipeline 代码举例

ShardedJedis shardedJedis = redisClient.borrowJedis();
        try {
            ShardedJedisPipeline pipelined = shardedJedis.pipelined();
            String key = getUserKey(userId);
            pipelined.hset(key, String.valueOf(userMissionId), Strings.EMPTY);
            pipelined.pexpire(key, EXPIRE_TIME);
            pipelined.sync();
        } finally {
            redisClient.returnJedis(shardedJedis);
 }
ShardedJedisPipeline pipelined = shardedJedis.pipelined();
 String key = getUserMissionAnswerCountKey(userMissionId);
pipelined.incr(key);
pipelined.pexpire(key, EXPIRE_TIME);
pipelined.sync();

pipeline 源码阅读

注: 下面仅代表个人阅读Redis pipeline源码的理解,不保证正确哦

pipelined()

看上去就是new了一个ShardedJedisPipeline对象,并没有做其他的事情

public ShardedJedisPipeline pipelined() {
    ShardedJedisPipeline pipeline = new ShardedJedisPipeline();
    pipeline.setShardedJedis(this);
    return pipeline;
}

pipelined.doxxx()

  • 主要是将cmd指令和指令参数写到outputStream输出流
  • 不同doXXX()都共享一个outputStream流。因此,Redis 客户端本地将维护一个outputStream流。用于写入所有的操作指令
  • 在connect()方法中对outputStream 和inputStream完成了初始化
outputStream = new RedisOutputStream(socket.getOutputStream());
inputStream = new RedisInputStream(socket.getInputStream());
protected Connection sendCommand(final Command cmd, final byte[]... args) {
    try {
      connect(); // 检查是否连接。如果已经连接则 do nothing
      Protocol.sendCommand(outputStream, cmd, args);  //将cmd指令和指令参数写到outputStream输出流
      pipelinedCommands++; //计数器+ 1 
      return this; 
    } catch (JedisConnectionException ex) {
          ...
}

sync()

  /**
   * Syncronize pipeline by reading all responses. This operation closes the pipeline. In order to
   * get return values from pipelined commands, capture the different Response<?> of the
   * commands you execute.
   */
  public void sync() {
    for (Client client : clients) {
      generateResponse(client.getOne());
    }
  }

其中generateResponse方法比较简单。

generateResponse

主要用来维护一个Response对象。并且填充Response对象 data字段的值。

//data:  可从下面的源码中看到,data 为Redis客户端与服务端之间连接的输入流
  protected Response<?> generateResponse(Object data) {
    Response<?> response = pipelinedResponses.poll();
    if (response != null) {
      response.set(data);
    }
    return response;
  }

client.getOne()

这个方法不知道为什么叫这个名字。 感觉getOne() 可能想表达的是一次性获取所有的执行结果?

  public Object getOne() {
    flush(); /// 一次性 将之前攒压的批量命令全部写入Redis server (以outputstream流的形式)
    pipelinedCommands--; //pipelinedCommands计数器 -1 
    return readProtocolWithCheckingBroken(); // 读取intputStream, 返回Redis服务器返回的结果,并赋值给Response的data属性。
  }

syncAndReturnAll

  /**
   * Syncronize pipeline by reading all responses. This operation closes the pipeline. Whenever
   * possible try to avoid using this version and use ShardedJedisPipeline.sync() as it won't go
   * through all the responses and generate the right response type (usually it is a waste of time).
   * @return A list of all the responses in the order you executed them.
   */
  public List<Object> syncAndReturnAll() {
    List<Object> formatted = new ArrayList<Object>();
    for (Client client : clients) {
      formatted.add(generateResponse(client.getOne()).get());
    }
    return formatted;
  }

syncAndReturnAll 方法和 sync方法作用类似,区别是syncAndReturnAll使用了client.getOne().get()

这个命令使得syncAndReturnAll命令自动返回了序列化的结果
get()方法主要执行BuildFactory的build方法。该方法会根据传入的泛型执行对应的序列化。 具体执行的泛型T根据接受参数类型决定:

  public static final Builder<String> STRING = new Builder<String>() {
    public String build(Object data) {
     //注意这里的encode是Redis自己写的编码方法,其实底层实现是对二进制进行decode
      return data == null ? null : SafeEncoder.encode((byte[]) data);
    }

    public String toString() {
      return "string";
    }

总结1

  • pipeline doXXX 会将本地命令批量写入outputStream
  • sync() 方法 会批量将redis命令发送到服务器,执行命令,并以inputStream二进制流的形式返回给Response 的data字段。
    • 如果需要获取Response的值,可以使用Response.get()获取
  • syncAndReturnAll 方法同样会将redis命令批量发送到服务器。同时会将redis服务器返回的结果序列化。注意,如果不需要返回结果的化,最好不要执行这个命令。而用上面的那个命令

总结2

原生批量命令与Pipeline 对比
可以使用Pipeline模拟出批量操作的效果,但是在使用时要注意它与原生批量命令的区别,主要包括如下几点:

  • 原生批量命令是原子的,Pipeline是非原子的
  • 原生批量命令是一个命令对应多个keyPipeline支持多个命令
  • 原生批量命令是Redis服务端支持实现的,而Pipeline需要服务端和客户端的共同配合
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容