以单节点为例,看下Redisson是怎么实现到Redis服务器的连接的
Redisson 单节点模式
Redisson中支持Redis的多种模式,其中单节点模式是作为一种特殊的主从模式实现的
public class SingleConnectionManager extends MasterSlaveConnectionManager {
public SingleConnectionManager(SingleServerConfig cfg, ServiceManager serviceManager) {
super(create(cfg), serviceManager);
}
}
所以单节点的配置是直接继承主从模式的配置的,不一样的地方是读和订阅都会连接到主节点
newconfig.setReadMode(ReadMode.MASTER);
newconfig.setSubscriptionMode(SubscriptionMode.MASTER);
读写连接的初始化
Redisson中初始化到Redis服务器的连接时,会分别初始化读写连接和发布订阅连接,我们先看下读写连接是怎么初始化的
private CompletableFuture<RedisClient> setupMasterEntry(RedisClient client) {
CompletableFuture<InetSocketAddress> addrFuture = client.resolveAddr();
return addrFuture.thenCompose(res -> {
masterEntry = new ClientConnectionsEntry(
client,
config.getMasterConnectionMinimumIdleSize(),
config.getMasterConnectionPoolSize(),
idleConnectionWatcher,
NodeType.MASTER,
config);
List<CompletableFuture<Void>> futures = new ArrayList<>();
CompletableFuture<Void> writeFuture = writeConnectionPool.initConnections(masterEntry);
futures.add(writeFuture);
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}).whenComplete((r, e) -> {
if (e != null) {
client.shutdownAsync();
}
}).thenApply(r -> {
writeConnectionPool.addEntry(masterEntry);
return client;
});
}
再Redisson的源码中大量使用了CompletableFuture,所以读起来会相对晦涩一点,但是基本上都是异步调用、然后注册回调的模式
这里初始化连接的时候就是这样
先解析Redis的服务地址
解析成功后开始初始化读写连接
创建读写连接
与创建读写链接比较相关的几个配置:
masterConnectionMinimumIdleSize:最小空闲连接数量,默认值24
masterConnectionPoolSize:最大连接数量,默认值64
private void createConnection(ClientConnectionsEntry entry,
CompletableFuture<Void> initPromise, int minimumIdleSize, AtomicInteger initializedConnections) {
CompletableFuture<Void> f = acquireConnection(entry, null);
f.thenAccept(r -> {
CompletableFuture<T> promise = new CompletableFuture<T>();
createConnection(entry, promise);
promise.whenComplete((conn, e) -> {
// 正常创建连接
if (e == null) {
if (changeUsage()) {
conn.decUsage();
}
// 如果初始化还没完成,将刚刚创建的连接缓存起来
// 如果初始化已经完成了,说明刚刚创建的连接是多余的了,直接关闭连接
if (!initPromise.isDone()) {
entry.addConnection(conn);
} else {
conn.closeAsync();
}
}
// 连接创建成功了,释放最大连接数量,让后续的创建连接请求能够继续创建连接
releaseConnection(entry);
// 创建连接异常了
if (e != null) {
if (initPromise.isDone()) {
return;
}
// 关闭所有缓存的连接
for (RedisConnection connection : entry.getAllConnections()) {
if (!connection.isClosed()) {
connection.closeAsync();
}
}
entry.getAllConnections().clear();
// 异常初始化完成
initPromise.completeExceptionally(cause);
return;
}
// 创建的连接数是否达到了最小空闲连接数量
// 已经达到最小连接数量了,则初始化正常完成
// 还没达到最小连接数量, 则递归继续创建连接
int value = initializedConnections.decrementAndGet();
if (value == 0) {
if (initPromise.complete(null)) {
log.info("{} connections initialized for {}", minimumIdleSize, entry.getClient().getAddr());
}
} else if (value > 0 && !initPromise.isDone()) {
createConnection(entry, initPromise, minimumIdleSize, initializedConnections);
}
});
});
}
这个是创建连接比较重要的一个方法,我们分别看下
- 尝试获取连接:
acquireConnection
会首先尝试获取连接,这里获取连接的含义是判断当前的连接数量是否达到了最大连接数量,如果没达到最大连接数了,这里的CompletableFuture是可以顺利完成的,如果已经达到了最大连接数量,那么这里的CompletableFuture是无法顺利完成的,会一直保存再listeners这个链表中
- 尝试获取连接:
public CompletableFuture<Void> acquire() {
CompletableFuture<Void> future = new CompletableFuture<>();
listeners.add(future);
tryRun();
return future;
}
private void tryRun() {
while (true) {
if (counter.decrementAndGet() >= 0) {
CompletableFuture<Void> future = listeners.poll();
if (future == null) {
counter.incrementAndGet();
return;
}
if (future.complete(null)) {
return;
}
}
if (counter.incrementAndGet() <= 0) {
return;
}
}
}
这里可以学习下这种用法,这里通过AtomicInteger
维护了最大连接数量,尝试获取连接时会先创建一个CompletableFuture,并加入到ConcurrentLinkedQueue这个链表中
tryRun的时候会先尝试减少最大连接数量,减少后仍大于等于0,则正常完成刚刚创建的CompletableFuture
如果减少后小于0了,说明不能创建更多连接了,则撤回本次减少后,一直保存再链表中,等待有连接释放后唤醒它
-
- 获取连接成功后才会开始创建连接,创建连接成功后
如果正常创建连接,如果初始化CompletableFuture还没完成,将刚刚创建的连接缓存起来,如果初始化已经完成了,说明刚刚创建的连接是多余的了,直接关闭连接
如果创建连接异常,则关闭所有缓存的连接,初始化CompletableFuture异常完成
最后判断创建的连接数是否达到了最小空闲连接数量,已经达到最小连接数量了,则初始化CompletableFuture正常完成,还没达到最小连接数量, 则递归继续创建连接
- 创建连接
private void createConnection(ClientConnectionsEntry entry, CompletableFuture<T> promise) {
CompletionStage<T> connFuture = connect(entry);
connFuture.whenComplete((conn, e) -> {
if (e != null) {
releaseConnection(entry);
promiseFailure(entry, promise, e);
return;
}
if (changeUsage()) {
promise.thenApply(c -> c.incUsage());
}
connectedSuccessful(entry, promise, conn);
});
}
public CompletionStage<RedisConnection> connect() {
CompletionStage<RedisConnection> future = client.connectAsync();
return future.whenComplete((conn, e) -> {
if (e != null) {
return;
}
allConnections.add(conn);
});
}
public RFuture<RedisConnection> connectAsync() {
// 解析处Redis服务器的地址
CompletableFuture<InetSocketAddress> addrFuture = resolveAddr();
CompletableFuture<RedisConnection> f = addrFuture.thenCompose(res -> {
CompletableFuture<RedisConnection> r = new CompletableFuture<>();
// 通过Netty客户端发送异步连接
ChannelFuture channelFuture = bootstrap.connect(res);
// 注册一个ChannelFutureListener,在连接建立后会进行回调
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(final ChannelFuture future) throws Exception {
if (bootstrap.config().group().isShuttingDown()) {
RedisConnectionException cause = new RedisConnectionException("RedisClient is shutdown");
r.completeExceptionally(cause);
return;
}
if (future.isSuccess()) {
RedisConnection c = RedisConnection.getFrom(future.channel());
c.getConnectionPromise().whenComplete((res, e) -> {
bootstrap.config().group().execute(new Runnable() {
@Override
public void run() {
if (e == null) {
if (!r.complete(c)) {
c.closeAsync();
} else {
if (config.getConnectedListener() != null) {
config.getConnectedListener().accept(getAddr());
}
}
} else {
r.completeExceptionally(e);
c.closeAsync();
}
}
});
});
} else {
bootstrap.config().group().execute(new Runnable() {
public void run() {
r.completeExceptionally(future.cause());
}
});
}
}
});
return r;
});
return new CompletableFutureWrapper<>(f);
}
这里实际的创建链接过程
- 首先第一个方法注册的回调是:创建连接完成后,如果创建异常了,则连接CompletableFuture异常完成,并释放一个连接数量;如果没有异常,则正常完成连接CompletableFuture
public void release() {
counter.incrementAndGet();
tryRun();
}
释放连接数量就是对之前的AtomicInteger进行自增,然后再tryRun中尝试唤醒一个仍然链表中的CompletableFuture进行完成,让这个CompletableFuture完成然后继续它的创建连接过程
- 第二个方法注册的回调是:正常创建连接完成后,将连接加入到缓存中
- 第三个方法是实际的创建连接,注意这里也是异步创建连接,因此注册了一个ChannelFutureListener,在连接建立后进行回调,回调内容也是根据连接是否有异常对CompletableFuture进行完成
Netty客户端
上面的创建链接最终是通过Netty客户端来完成的,Netty客户端的Bootstrap在初始化的时候会创建
private Bootstrap createBootstrap(RedisClientConfig config, Type type) {
Bootstrap bootstrap = new Bootstrap()
.resolver(config.getResolverGroup())
.channel(config.getSocketChannelClass())
.group(config.getGroup());
bootstrap.handler(new RedisChannelInitializer(bootstrap, config, this, channels, type));
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout());
bootstrap.option(ChannelOption.SO_KEEPALIVE, config.isKeepAlive());
bootstrap.option(ChannelOption.TCP_NODELAY, config.isTcpNoDelay());
applyChannelOptions(config, bootstrap);
config.getNettyHook().afterBoostrapInitialization(bootstrap);
return bootstrap;
}
这里主要的处理器是RedisChannelInitializer
,我们重点看下这个
@Override
protected void initChannel(Channel ch) throws Exception {
initSsl(config, ch);
if (type == Type.PLAIN) {
ch.pipeline().addLast(new RedisConnectionHandler(redisClient));
}
ch.pipeline().addLast(
connectionWatchdog,
new CommandEncoder(config.getCommandMapper()),
CommandBatchEncoder.INSTANCE);
if (type == Type.PLAIN) {
ch.pipeline().addLast(new CommandsQueue());
}
if (pingConnectionHandler != null) {
ch.pipeline().addLast(pingConnectionHandler);
}
if (type == Type.PLAIN) {
ch.pipeline().addLast(new CommandDecoder(config.getAddress().getScheme()));
}
ch.pipeline().addLast(new ErrorsLoggingHandler());
}
这里首先是初始化SSL,我们先不考虑SSL的连接
然后Netty客户端中InboundHandler和OutboundHandler的处理流程
- 首先会经过
RedisConnectionHandler
这个处理器
- 首先会经过
public void channelActive(ChannelHandlerContext ctx) {
List<CompletableFuture<Object>> futures = new ArrayList<>(5);
InetSocketAddress addr = redisClient.resolveAddr().getNow(null);
RedisClientConfig config = redisClient.getConfig();
CompletionStage<Object> f = config.getCredentialsResolver().resolve(addr)
.thenCompose(credentials -> {
String password = Objects.toString(config.getAddress().getPassword(),
Objects.toString(credentials.getPassword(), config.getPassword()));
if (password != null) {
future = connection.async(RedisCommands.AUTH, password);
return future;
}
return CompletableFuture.completedFuture(null);
});
futures.add(f.toCompletableFuture());
if (config.getPingConnectionInterval() > 0) {
CompletionStage<Object> future = connection.async(RedisCommands.PING);
futures.add(future.toCompletableFuture());
}
CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
future.whenComplete((res, e) -> {
if (e != null) {
...
connection.closeAsync();
connectionPromise.completeExceptionally(e);
return;
}
ctx.fireChannelActive();
connectionPromise.complete(connection);
});
}
这个处理器在连接建立后(channelActive),会根据我们知否配置了密码,来发送Redis命令AUTH
到服务器,同时根据是否配置了心跳检测,来发送Redis命令PING
,发送命令也是异步的
然后等待这两个命令CompletableFuture完成,如果正常完成,则对创建连接的CompletableFuture进行完成,返回的就是这个连接
如果CompletableFuture异常完成,则对创建连接的CompletableFuture进行异常完成,返回的是异常信息
- 然后是
ConnectionWatchdog
和PingConnectionHandler
这两个处理器,分别对应的是连接重连和PING处理器,暂时忽略
- 然后是
- 由于OutboundHandler的处理流程和添加过程是相反的,所以会先经过
CommandsQueue
这个处理器
- 由于OutboundHandler的处理流程和添加过程是相反的,所以会先经过
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof QueueCommand) {
QueueCommand data = (QueueCommand) msg;
QueueCommandHolder holder = new QueueCommandHolder(data, promise);
Queue<QueueCommandHolder> queue = ctx.channel().attr(COMMANDS_QUEUE).get();
while (true) {
if (lock.compareAndSet(false, true)) {
try {
queue.add(holder);
try {
holder.getChannelPromise().addListener(future -> {
if (!future.isSuccess()) {
queue.remove(holder);
}
});
ctx.writeAndFlush(data, holder.getChannelPromise());
} catch (Exception e) {
queue.remove(holder);
throw e;
}
} finally {
lock.set(false);
}
break;
}
}
} else {
super.write(ctx, msg, promise);
}
}
这个处理器维护了一个双向队列,发送的Redis命令都会先进行入队操作,然后再发送这个命令到后面的处理器
可以学习下这里的写法,通过AtomicBoolean变量来确保并发时命令的顺序,只有先CAS操作成功的命令才会先入队
- 然后达到CommandBatchEncoder,它处理的对象是批量命令CommandsData,单独发送的命令类型的CommandData,所以这个处理器处理单个命令时不会生效,直接交给下个处理器
- 然后到达
CommandEncoder
,它处理的对象就是单个命令对象CommandData,它主要功能是对命令按照redis协议进行编码
- 然后到达
protected void encode(ChannelHandlerContext ctx, CommandData<?, ?> msg, ByteBuf out) throws Exception {
try {
out.writeByte("*");
int len = 1 + msg.getParams().length;
if (msg.getCommand().getSubName() != null) {
len++;
}
out.writeBytes(longToString(len));
out.writeBytes(CRLF);
String name = commandMapper.map(msg.getCommand().getName());
writeArgument(out, name.getBytes(CharsetUtil.UTF_8));
if (msg.getCommand().getSubName() != null) {
writeArgument(out, msg.getCommand().getSubName().getBytes(CharsetUtil.UTF_8));
}
for (Object param : msg.getParams()) {
ByteBuf buf = encode(param);
writeArgument(out, buf);
if (!(param instanceof ByteBuf)) {
buf.release();
}
}
} catch (Exception e) {
msg.tryFailure(e);
throw e;
}
}
可以看到这里都是按照redis协议的数组进行编码
首先写入的是
*
然后写入的是数组长度,数组长度应该是命令占一个、子命令占一个、每个参数占一个
然后写入换行符
\r\n
然后每个命令或者参数按照redis协议多行字符串写入
private void writeArgument(ByteBuf out, byte[] arg) {
out.writeByte("$");
out.writeBytes(longToString(arg.length));
out.writeBytes(CRLF);
out.writeBytes(arg);
out.writeBytes(CRLF);
}
首先写入
$
然后写入字符串长度
然后写入换行符
\r\n
,然后写入字符串内容,最后再写入一个换行符\r\n
比如AUTH命令的格式就是:
*2\r\n
$4\r\nAUTH\r\n
$6\r\n123456\r\n
PING命令的格式就是:
*1\r\n
$4\r\nPING\r\n
- 最后收到响应后,到达
CommandDecoder
这个处理器,这里主要就是对redis的响应数据进行解码,可以看到首先会读取CommandsQueue
处理器中的双向队列中的队首命令(不是出队)
- 最后收到响应后,到达
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
QueueCommandHolder holder = getCommand(ctx);
QueueCommand data = null;
if (holder != null) {
data = holder.getCommand();
}
if (state() == null) {
state(new State());
}
if (data == null) {
while (in.writerIndex() > in.readerIndex()) {
int endIndex = skipCommand(in);
try {
decode(ctx, in, null, 0);
} catch (Exception e) {
in.readerIndex(endIndex);
throw e;
}
}
} else {
if (holder.getChannelPromise().isDone() && !holder.getChannelPromise().isSuccess()) {
sendNext(ctx.channel());
// throw REPLAY error
in.indexOf(Integer.MAX_VALUE/2, Integer.MAX_VALUE, (byte) 0);
return;
}
int endIndex = 0;
if (!(data instanceof CommandsData)) {
endIndex = skipCommand(in);
} else {
endIndex = skipBatchCommand(in, (CommandsData) data);
}
if (data.isExecuted()) {
in.readerIndex(endIndex);
sendNext(ctx.channel());
return;
}
decode(ctx, in, data, endIndex);
}
}
这里说明下几个方法有利于理解解码过程
首先是这个skipCommand方法,它主要功能是找到不同命令的响应结果之间的分隔下标,因为不同命令都是先加入到队列中,然后一起发出去的,所以同一通道不同命令的响应结果可能是一起解析的
protected int skipCommand(ByteBuf in) throws Exception {
in.markReaderIndex();
skipDecode(in);
int res = in.readerIndex();
in.resetReaderIndex();
return res;
}
protected void skipDecode(ByteBuf in) throws IOException{
int code = in.readByte();
if (code == '+') {
skipString(in);
} else if (code == '-') {
skipString(in);
} else if (code == ':') {
skipString(in);
} else if (code == '$') {
skipBytes(in);
} else if (code == '*') {
long size = readLong(in);
for (int i = 0; i < size; i++) {
skipDecode(in);
}
}
}
private void skipString(ByteBuf in) {
int len = in.bytesBefore((byte) '\r');
in.skipBytes(len + 2);
}
这里只是为了找到不同命令响应数据之间的分隔下标,所以会通过markReaderIndex先把读下标保存起来,然后找到分隔下标后,再通过resetReaderIndex重置读下标,一遍再后续的解码中仍然能够处理到当前命令的响应结果
然后redis协议对响应数据的规范,单行字符串响应以+
开头、错误响应以-
开头、整数响应以:
开头、多行字符串响应以$
开头、数组响应以*
开头
最后skipString就是读到\r\n
换行符,这中间的长度就是第一个命令的完整响应长度了
记录这个长度是为了第一个命令处理响应结果处理异常时,方便处理第二个命令的响应结果
try {
decode(in, cmd, null, channel, false, null);
sendNext(channel, data);
} catch (Exception e) {
// 上一个命令解码异常,直接设置读下标为分隔下标
in.readerIndex(endIndex);
sendNext(channel);
cmd.tryFailure(e);
throw e;
}
protected void sendNext(Channel channel) {
Queue<QueueCommandHolder> queue = channel.attr(CommandsQueue.COMMANDS_QUEUE).get();
queue.poll();
state(null);
}
再解码的时候,如果上一个命令解码异常,直接设置读下标为分隔下标,然后进行出队操作移除队首的上一个命令(这里才是出队)
这样再解码下一个命令的响应结果时,就可以直接从下一个命令的响应结果开始处理了
最后命令的响应数据的解码操作就是重新读取这个命令的响应结果,这主要是因为在找分隔下标的时候通过暂存读下标和重置读下标来完成
然后按照第一个字符区别处理即可
如单行字符串,就是先读取第一个字节,然后一直读取到换行符\r\n
得到结果,然后通过命令结果的转换器对结果进行转换,最后使用这个结果完成通道CompletableFuture
int code = in.readByte();
if (code == '+') {
int len = in.bytesBefore((byte) '\r');
String result = in.toString(in.readerIndex(), len, CharsetUtil.UTF_8);
in.skipBytes(len + 2);
if (data != null && !skipConvertor) {
result = data.getCommand().getConvertor().convert(result);
}
if (parts != null) {
parts.add(result);
} else {
if (data != null) {
data.getPromise().complete(result);
}
}
}
如AUTH命令的响应结果:
+OK\r\n
再如AUTH和PING再同一通道一起发送时的响应结果,这时候找到的分隔下标就是6
+OK\r\n+PONG\r\n
至此,才完成了读写连接的创建,创建的连接会缓存下来,并且会同时缓存最小空闲连接数量(24)个连接
发布订阅连接的初始化
发布订阅连接的初始化过程和读写连接的初始化过程很多地方都是一致的,只在一些地方通过子类进行了重写
与发布订阅连接比较相关的几个配置:
subscriptionConnectionMinimumIdleSize:最小空闲连接数量,默认值1
subscriptionConnectionPoolSize:最大连接数量,默认值50
在创建发布订阅连接时也会遵循这个配置
在创建连接时,除了类型不一样,其它的都是一样的
bootstrap = createBootstrap(copy, Type.PLAIN);
pubSubBootstrap = createBootstrap(copy, Type.PUBSUB);
对于类型Type.PUBSUB
,处理器会有些不同
@Override
protected void initChannel(Channel ch) throws Exception {
initSsl(config, ch);
else {
ch.pipeline().addLast(new RedisPubSubConnectionHandler(redisClient));
}
ch.pipeline().addLast(
connectionWatchdog,
new CommandEncoder(config.getCommandMapper()),
CommandBatchEncoder.INSTANCE);
else {
ch.pipeline().addLast(new CommandsQueuePubSub());
}
if (pingConnectionHandler != null) {
ch.pipeline().addLast(pingConnectionHandler);
}
else {
ch.pipeline().addLast(new CommandPubSubDecoder(config));
}
ch.pipeline().addLast(new ErrorsLoggingHandler());
}
- 首先是RedisPubSubConnectionHandler这个处理器,它的处理逻辑和
RedisConnectionHandler
是一致的,他们都是BaseConnectionHandler
的子类,因此也会发送AUTH命令和PING命令;不同的是这个处理器创建的是RedisPubSubConnection
RedisPubSubConnection createConnection(ChannelHandlerContext ctx) {
return new RedisPubSubConnection(redisClient, ctx.channel(), connectionPromise);
}
- 然后是CommandsQueuePubSub这个处理器,它的处理逻辑相对比较绕一点,在发送命令时首先会经过这个处理器
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof QueueCommand) {
QueueCommand data = (QueueCommand) msg;
QueueCommandHolder holder = queue.peek();
if (holder != null && holder.getCommand() == data) {
super.write(ctx, msg, promise);
} else {
queue.add(new QueueCommandHolder(data, promise));
sendData(ctx.channel());
}
} else {
super.write(ctx, msg, promise);
}
}
private void sendData(Channel ch) {
QueueCommandHolder holder = queue.peek();
if (holder != null && holder.trySend()) {
QueueCommand data = holder.getCommand();
List<CommandData<Object, Object>> pubSubOps = data.getPubSubOperations();
if (!pubSubOps.isEmpty()) {
for (CommandData<Object, Object> cd : pubSubOps) {
for (Object channel : cd.getParams()) {
ch.pipeline().get(CommandPubSubDecoder.class).addPubSubCommand((ChannelName) channel, cd);
}
}
} else {
ch.attr(CURRENT_COMMAND).set(holder);
}
holder.getChannelPromise().addListener(listener);
ch.writeAndFlush(data, holder.getChannelPromise());
}
}
这里也同时维护了一个链表队列和一个通道属性,发送命令时
- 如果队列为空,则会把当前命令加入到队列中
- 如果是SUBSCRIBE订阅命令,那么它的参数肯定都是频道名称,会把这些频道名称都加入到解码器中、以便后续对响应数据解码
- 如果是其它的非订阅命令,则直接将命令设置到通道属性里
看到这里可能有点懵,但是先别急,再看下解码的处理器就能理解了
-
最后是CommandPubSubDecoder这个处理器,它负责响应数据的解码,他是
CommandDecoder
的子类,也就是它也遵循普通命令的解码逻辑,只不过对部分逻辑进行了重写- 首先是获取当前命令,对于普通的非订阅命令,它直接从通道属性中获取,这对应上面编码时候会将非订阅的普通命令直接设置到通道属性里;如果是订阅命令,那这里获取到的就是null了
@Override protected QueueCommandHolder getCommand(ChannelHandlerContext ctx) { return ctx.channel().attr(CommandsQueuePubSub.CURRENT_COMMAND).get(); }
-
然后是解码方法也重写了,这里的data就是上面获取到的当前命令,这里的endIndex就是不同命令响应结果之间的分隔下标,计算方法在
CommandDecoder
中已经详解过了;这里data为null对应的就是订阅命令了,其它情况就是普通的命令了
@Override protected void decodeCommand(Channel channel, ByteBuf in, QueueCommand data, int endIndex) throws Exception { if (data == null) { try { while (in.writerIndex() > in.readerIndex()) { decode(in, null, null, channel, false, null); } sendNext(channel); } catch (Exception e) { log.error("Unable to decode data. channel: {}, reply: {}", channel, LogHelper.toString(in), e); sendNext(channel); throw e; } } else if (data instanceof CommandData) { CommandData<Object, Object> cmd = (CommandData<Object, Object>) data; try { while (in.writerIndex() > in.readerIndex()) { decode(in, cmd, null, channel, false, null); } sendNext(channel, data); } catch (Exception e) { log.error("Unable to decode data. channel: {}, reply: {}", channel, LogHelper.toString(in), e); cmd.tryFailure(e); sendNext(channel); throw e; } } }
- 然后是触发解码下一个命令的响应数据,这里又回到了CommandsQueuePubSub编码流程里,会先把通道属性清空、然后队首出队,然后再通过sendData来发送下一个命令;可以看到这个编码流程和之前普通命令里的编码流程不一样了,普通命令里的编码流程是一起入队,然后一起发送,解码的时候再一起解析;而这里的流程是一起入队,然后发送队首,队首响应后进行解码,然后再发送下一个命令
public void sendNextCommand(Channel channel) { QueueCommandHolder holder = channel.attr(CommandsQueuePubSub.CURRENT_COMMAND).getAndSet(null); if (holder != null) { queue.poll(); } else { QueueCommandHolder c = queue.peek(); if (c != null) { QueueCommand data = c.getCommand(); List<CommandData<Object, Object>> pubSubOps = data.getPubSubOperations(); if (!pubSubOps.isEmpty()) { queue.poll(); } } } sendData(channel); }
最后再看下订阅响应数据的解码过程,需要注意的是这里的命令data是为null的;SUBSCRIBE命令的响应一般情况下是长这样
*3\r\n
$9\r\nsubscribe\r\n
$7\r\nmyTopic\r\n
:1\r\n
是一个长度为3的数组,第一个元素是多行字符串,订阅成功时固定为subscribe
;第二个也是多行字符串,为订阅的频道名称;第三个是整数,为订阅的频道数量
因此再解码的时候会首先解析code=*
,然后再递归解析每一个数组元素,对应的就会解析到code=$
和code=:
protected void decode(ByteBuf in, CommandData<Object, Object> data, List<Object> parts, Channel channel, boolean skipConvertor, List<CommandData<?, ?>> commandsData) throws IOException {
int code = in.readByte();
} else if (code == ':') {
Long result = readLong(in);
handleResult(data, parts, result, false);
} else if (code == '$') {
ByteBuf buf = readBytes(in);
Object result = null;
if (buf != null) {
Decoder<Object> decoder = selectDecoder(data, parts);
result = decoder.decode(buf, state());
}
handleResult(data, parts, result, false);
} else if (code == '*') {
long size = readLong(in);
List<Object> respParts = new ArrayList<Object>(Math.max((int) size, 0));
state().incLevel();
for (int i = respParts.size(); i < size; i++) {
decode(in, data, respParts, channel, skipConvertor, null);
}
state().decLevel();
}
}
通过上面的解析之后,解析到的数据都会先保存在respParts
这个数组中,然后还需要获取订阅命令的解码器,这时会遇到一个问题是由于当前命令data为null,无法知道它的解码器是什么;但是在CommandsQueuePubSub
编码的时候,已经将订阅的频道名称和订阅命令之间的映射关系保存在CommandPubSubDecoder
处理器中,而频道的名称现在已经解析到了,就可以通过频道名称获取到当前命令了
@Override
protected MultiDecoder<Object> messageDecoder(CommandData<Object, Object> data, List<Object> parts) {
if (parts.isEmpty() || parts.get(0) == null) {
return null;
}
// 响应的第一个元素在订阅成功时固定为subscribe
String command = parts.get(0).toString();
if ("subscribe".contains(command)) {
// 响应的第二个元素是频道名称
ChannelName channelName = new ChannelName((byte[]) parts.get(1));
PubSubKey key = new PubSubKey(channelName, command);
CommandData<Object, Object> commandData = commands.get(key);
if (commandData == null) {
return null;
}
return commandData.getCommand().getReplayMultiDecoder();
}
}
对于SUBSCRIBE命令,它的解码器就是PubSubStatusDecoder
,可以看到就是通过第一个元素和第二个元素构建一个PubSubStatusMessage对象
RedisCommand<Object> SUBSCRIBE = new RedisCommand<Object>("SUBSCRIBE", new PubSubStatusDecoder());
public class PubSubStatusDecoder implements MultiDecoder<Object> {
@Override
public PubSubStatusMessage decode(List<Object> parts, State state) {
PubSubType type = PubSubType.valueOf(parts.get(0).toString().toUpperCase());
ChannelName name = new ChannelName((byte[]) parts.get(1));
return new PubSubStatusMessage(type, name);
}
}
最后再对这个结果进行分发,PubSubStatusMessage
是实现了Message
接口的
protected void decodeResult(CommandData<Object, Object> data, List<Object> parts, Channel channel,
Object result) throws IOException {
if (result instanceof Message) {
checkpoint();
// 先把频道名称、当前命令信息保存到entries这个Map里
RedisPubSubConnection pubSubConnection = RedisPubSubConnection.getFrom(channel);
ChannelName channelName = ((Message) result).getChannel();
if (result instanceof PubSubStatusMessage) {
String operation = ((PubSubStatusMessage) result).getType().name().toLowerCase();
PubSubKey key = new PubSubKey(channelName, operation);
CommandData<Object, Object> d = commands.get(key);
if ("SUBSCRIBE".contains(d.getCommand().getName())) {
commands.remove(key);
entries.put(channelName, new PubSubEntry(d.getMessageDecoder()));
}
}
// 发布订阅的消息是否需要按顺序发送,默认是true
if (config.isKeepPubSubOrder()) {
PubSubEntry entry = entries.get(channelName);
if (entry != null) {
enqueueMessage(result, pubSubConnection, entry);
}
} else {
config.getExecutor().execute(new Runnable() {
@Override
public void run() {
if (result instanceof PubSubStatusMessage) {
pubSubConnection.onMessage((PubSubStatusMessage) result);
} else if (result instanceof PubSubMessage) {
pubSubConnection.onMessage((PubSubMessage) result);
} else if (result instanceof PubSubPatternMessage) {
pubSubConnection.onMessage((PubSubPatternMessage) result);
}
}
});
}
} else {
if (data != null && data.getCommand().getName().equals("PING")) {
super.decodeResult(data, parts, channel, result);
}
}
}
这里的处理流程
先将结果放到一个Map里
然后判断消息是否需要按顺序接收,默认的true
-
如果不需要按顺序接收,那么直接使用新线程消费这个消息,消费这个消息,就是把这个消息投递到我们注册的监听器里
public void onMessage(PubSubStatusMessage message) { for (RedisPubSubListener<Object> redisPubSubListener : listeners) { redisPubSubListener.onStatus(message.getType(), message.getChannel()); } }
-
如果需要按顺序消费,那么会先把这个消息放到队列里,然后启动新线程从队列中依次读取消息然后消费;这里通过CAS操作设置原子变量的方式,确保只会启动一个线程读取消息
private void enqueueMessage(Object res, RedisPubSubConnection pubSubConnection, PubSubEntry entry) { if (res != null) { entry.getQueue().add((Message) res); } if (!entry.getSent().compareAndSet(false, true)) { return; } config.getExecutor().execute(() -> { try { while (true) { Message result = entry.getQueue().poll(); if (result != null) { if (result instanceof PubSubStatusMessage) { pubSubConnection.onMessage((PubSubStatusMessage) result); } else if (result instanceof PubSubMessage) { pubSubConnection.onMessage((PubSubMessage) result); } else if (result instanceof PubSubPatternMessage) { pubSubConnection.onMessage((PubSubPatternMessage) result); } } else { break; } } } finally { entry.getSent().set(false); if (!entry.getQueue().isEmpty()) { enqueueMessage(null, pubSubConnection, entry); } } }); }
至此就完成了Redisson客户端的初始化流程
源码阅读最大的感受是Redisson中基本看不到同步锁操作(也有),大量使用无锁CAS思维来解决并发问题,值得借鉴学习
另外大量使用CompletableFuture,这种异步编程思路也值得借鉴学习