前言
最近在学习Redis的时候看到Redis在执行命令的时候是单线程的。这意味如果客户端向服务器发送查询请求时,Redis客户端将以阻塞的方式等待服务器的响应。这样后面的客户端请求命令将不得不等待。
因此,当Redis客户端需要连续执行许多请求时,将有可能影响Redis的性能。
举个例子
客户:INCR X
服务器:1
客户:INCR X
服务器:2
客户:INCR X
服务器:3
客户:INCR X
服务器:4
Pipeline 简介
Pipe 是一种经典的设计思路。 他的主要思想为批量发送请求,批量处理请求。这样的话可以最大限度的减少网络的IO
开销.
实际上,pipeline不仅可以减少网络开销,也可以减少系统调用的次数。我们知道,redis客户端写操作和读操作都会涉及 read()和write()系统调用,这意味着从用户域到内核域, 而上下文切换是巨大的速度损失。
Pipeline 代码举例
ShardedJedis shardedJedis = redisClient.borrowJedis();
try {
ShardedJedisPipeline pipelined = shardedJedis.pipelined();
String key = getUserKey(userId);
pipelined.hset(key, String.valueOf(userMissionId), Strings.EMPTY);
pipelined.pexpire(key, EXPIRE_TIME);
pipelined.sync();
} finally {
redisClient.returnJedis(shardedJedis);
}
ShardedJedisPipeline pipelined = shardedJedis.pipelined();
String key = getUserMissionAnswerCountKey(userMissionId);
pipelined.incr(key);
pipelined.pexpire(key, EXPIRE_TIME);
pipelined.sync();
pipeline 源码阅读
注: 下面仅代表个人阅读Redis pipeline源码的理解,不保证正确哦
pipelined()
看上去就是new
了一个ShardedJedisPipeline
对象,并没有做其他的事情
public ShardedJedisPipeline pipelined() {
ShardedJedisPipeline pipeline = new ShardedJedisPipeline();
pipeline.setShardedJedis(this);
return pipeline;
}
pipelined.doxxx()
- 主要是将cmd指令和指令参数写到outputStream输出流
- 不同doXXX()都共享一个outputStream流。因此,Redis 客户端本地将维护一个outputStream流。用于写入所有的操作指令
- 在connect()方法中对outputStream 和inputStream完成了初始化
outputStream = new RedisOutputStream(socket.getOutputStream());
inputStream = new RedisInputStream(socket.getInputStream());
protected Connection sendCommand(final Command cmd, final byte[]... args) {
try {
connect(); // 检查是否连接。如果已经连接则 do nothing
Protocol.sendCommand(outputStream, cmd, args); //将cmd指令和指令参数写到outputStream输出流
pipelinedCommands++; //计数器+ 1
return this;
} catch (JedisConnectionException ex) {
...
}
sync()
/**
* Syncronize pipeline by reading all responses. This operation closes the pipeline. In order to
* get return values from pipelined commands, capture the different Response<?> of the
* commands you execute.
*/
public void sync() {
for (Client client : clients) {
generateResponse(client.getOne());
}
}
其中generateResponse方法比较简单。
generateResponse
主要用来维护一个Response对象。并且填充Response对象 data字段的值。
//data: 可从下面的源码中看到,data 为Redis客户端与服务端之间连接的输入流
protected Response<?> generateResponse(Object data) {
Response<?> response = pipelinedResponses.poll();
if (response != null) {
response.set(data);
}
return response;
}
client.getOne()
这个方法不知道为什么叫这个名字。 感觉getOne()
可能想表达的是一次性获取所有的执行结果?
public Object getOne() {
flush(); /// 一次性 将之前攒压的批量命令全部写入Redis server (以outputstream流的形式)
pipelinedCommands--; //pipelinedCommands计数器 -1
return readProtocolWithCheckingBroken(); // 读取intputStream, 返回Redis服务器返回的结果,并赋值给Response的data属性。
}
syncAndReturnAll
/**
* Syncronize pipeline by reading all responses. This operation closes the pipeline. Whenever
* possible try to avoid using this version and use ShardedJedisPipeline.sync() as it won't go
* through all the responses and generate the right response type (usually it is a waste of time).
* @return A list of all the responses in the order you executed them.
*/
public List<Object> syncAndReturnAll() {
List<Object> formatted = new ArrayList<Object>();
for (Client client : clients) {
formatted.add(generateResponse(client.getOne()).get());
}
return formatted;
}
syncAndReturnAll
方法和 sync
方法作用类似,区别是syncAndReturnAll
使用了client.getOne().get()
这个命令使得syncAndReturnAll命令自动返回了序列化的结果
get()
方法主要执行BuildFactory的build方法。该方法会根据传入的泛型执行对应的序列化。 具体执行的泛型T根据接受参数类型决定:
public static final Builder<String> STRING = new Builder<String>() {
public String build(Object data) {
//注意这里的encode是Redis自己写的编码方法,其实底层实现是对二进制进行decode
return data == null ? null : SafeEncoder.encode((byte[]) data);
}
public String toString() {
return "string";
}
总结1
-
pipeline
doXXX 会将本地命令批量写入outputStream -
sync()
方法 会批量将redis命令发送到服务器,执行命令,并以inputStream二进制流的形式返回给Response 的data字段。- 如果需要获取Response的值,可以使用
Response.get()
获取
- 如果需要获取Response的值,可以使用
-
syncAndReturnAll
方法同样会将redis命令批量发送到服务器。同时会将redis服务器返回的结果序列化。注意,如果不需要返回结果的化,最好不要执行这个命令。而用上面的那个命令
总结2
原生批量命令与Pipeline 对比
可以使用Pipeline
模拟出批量操作的效果,但是在使用时要注意它与原生批量命令的区别,主要包括如下几点:
- 原生批量命令是原子的,
Pipeline
是非原子的 - 原生批量命令是一个命令对应多个
key
,Pipeline
支持多个命令 - 原生批量命令是
Redis
服务端支持实现的,而Pipeline
需要服务端和客户端的共同配合