Redisson 中的 CommandExecutor

上篇Redisson 分布式锁实现分析中提到了RedissonLock中的redis命令都是通过CommandExecutor来发送到redis服务执行的,本篇就来了解一下它的实现方式。

先来看其源码

public interface CommandExecutor extends CommandSyncExecutor, CommandAsyncExecutor {
}

可以看到它同时继承了 同步和异步(sync/async) 两种调用方式。

Note:

  • 在分布式锁的实现中是用了同步的 CommandExecutor,是因为锁的获取和释放是有强一致性要求的,需要实时知道结果方可进行下一步操作。
  • 上篇分布式锁分析中我提到 Redisson 的同步实现实际上是基于异步实现的,这在下文中也会得到解释。

在Redisson中,除了提供同步和异步的方式执行命令之外,还通过 Reactive Streams 实现了 Reactive 方式的命令执行器。见下图

预备知识

Redisson 大量使用了 Redis 的 EVAL 命令来执行 Lua 脚本,所以先要对 EVAL 有所了解。

EVAL命令格式和示例

EVAL script numkeys key [key ...] arg [arg ...]
> eval "return redis.call('set',KEYS[1],ARGV[1])" 1 foo bar
OK

从 Redis 2.6.0 版本开始,通过内置的 Lua 解释器,可以使用 EVAL 命令对 Lua 脚本进行求值。

参数的说明本文不再详述,可查阅 Redis命令参考

重点是这个:Redis 使用单个 Lua 解释器去运行所有脚本,并且 Redis 也保证脚本会以原子性(atomic)的方式执行,当某个脚本正在运行的时候,不会有其他脚本或 Redis 命令被执行。所以 Redisson 中使用了 EVAL 来保证执行命令操作数据时的安全性。

例子

这里就使用 Redisson 参考文档中的一个 RAtomicLong 对象的例子吧。

RedissonClient client = Redisson.create(config);
RAtomicLong longObject = client.getAtomicLong('myLong');
// 同步方式
longObject.compareAndSet(3, 401);
// 异步方式
longObject.compareAndSetAsync(3, 401);

RedissonReactiveClient client = Redisson.createReactive(config);
RAtomicLongReactive longObject = client.getAtomicLong('myLong');
// reactive方式
longObject.compareAndSet(3, 401);

根据此例,我们分别来看 compareAndSet/compareAndSetAsync 的实现,其他命令原理都一样。

异步

既然同步和Reactive的实现都继承了异步的实现,那我们就先来看看CommandAsyncService吧。

例子中的 longObject.compareAndSetAsync(3, 401); 实际执行的是 RedissonAtomicLong 实现类的 compareAndSetAsync 方法,如下

public Future<Boolean> compareAndSetAsync(long expect, long update) {
    return commandExecutor.evalWriteAsync(getName(),
                                          StringCodec.INSTANCE,
                                          RedisCommands.EVAL_BOOLEAN,
                                          "...此处省略...",
                                          Collections.<Object>singletonList(getName()),
                                          expect, update);
}

此处的 evalWriteAsync 就是在 CommandAsyncService 中实现的,如下

public <T, R> Future<R> evalWriteAsync(String key,
                                       Codec codec,
                                       RedisCommand<T> evalCommandType,
                                       String script,
                                       List<Object> keys,
                                       Object ... params) {
    NodeSource source = getNodeSource(key);
    return evalAsync(source, false, codec, evalCommandType, script, keys, params);
}

private <T, R> Future<R> evalAsync(NodeSource nodeSource,
                                   boolean readOnlyMode,
                                   Codec codec,
                                   RedisCommand<T> evalCommandType,
                                   String script,
                                   List<Object> keys,
                                   Object ... params) {
    Promise<R> mainPromise = connectionManager.newPromise();
    List<Object> args = new ArrayList<Object>(2 + keys.size() + params.length);
    args.add(script);
    args.add(keys.size());
    args.addAll(keys);
    args.addAll(Arrays.asList(params));
    async(readOnlyMode, nodeSource, codec, evalCommandType, args.toArray(), mainPromise, 0);
    return mainPromise;
}

追根溯源,最后来看看 async 方法的实现,

protected <V, R> void async(final boolean readOnlyMode,
                            final NodeSource source,
                            final Codec codec,
                            final RedisCommand<V> command,
                            final Object[] params,
                            final Promise<R> mainPromise,
                            final int attempt) {
    // ....省略部分代码....
    // AsyncDetails 是一个包装对象,用来将异步调用过程中的对象引用包装起来方便使用
    final AsyncDetails<V, R> details = AsyncDetails.acquire();
    details.init(connectionFuture, attemptPromise,
            readOnlyMode, source, codec, command, params, mainPromise, attempt);

    // retryTimerTask 用来实现 Redisson 提供的重试机制
    final TimerTask retryTimerTask = new TimerTask() {

        @Override
        public void run(Timeout t) throws Exception {
            // ....省略部分代码....
            int count = details.getAttempt() + 1;
            // ....省略部分代码....
            async(details.isReadOnlyMode(), details.getSource(),
                    details.getCodec(), details.getCommand(),
                    details.getParams(), details.getMainPromise(), count);
            AsyncDetails.release(details);
        }
    };
    // 启用重试机制
    Timeout timeout = connectionManager.newTimeout(retryTimerTask,
            connectionManager.getConfig().getRetryInterval(),
            TimeUnit.MILLISECONDS);
    details.setTimeout(timeout);

    // checkConnectionFuture 用于检查客户端是否与服务端集群建立连接,如果连接建立
    // 则可发送命令到服务端执行
    if (connectionFuture.isDone()) {
        checkConnectionFuture(source, details);
    } else {
        connectionFuture.addListener(new FutureListener<RedisConnection>() {
            @Override
            public void operationComplete(Future<RedisConnection> connFuture) throws Exception {
                checkConnectionFuture(source, details);
            }
        });
    }

    // ....省略部分代码....
}

private <R, V> void checkConnectionFuture(final NodeSource source,
        final AsyncDetails<V, R> details) {
    // ....省略部分代码....
    // 获取客户端与服务端集群建立的连接
    final RedisConnection connection = details.getConnectionFuture().getNow();

    if (details.getSource().getRedirect() == Redirect.ASK) {
        // 客户端接收到 ASK 转向, 先发送一个 ASKING 命令,然后再发送真正的命令请求
        // ....省略部分代码....
    } else {
        // ....省略部分代码....
        // 客户端发送命令到服务端
        ChannelFuture future = connection.send(new CommandData<V, R>(details.getAttemptPromise(),
                details.getCodec(), details.getCommand(), details.getParams()));
        details.setWriteFuture(future);
    }
    // ....省略部分代码....
    // 释放本次连接
    releaseConnection(source, details.getConnectionFuture(), details.isReadOnlyMode(),
            details.getAttemptPromise(), details);
}

由于代码太长,我只贴出了和执行命令有关的部分代码,我们可以从上面代码中看到

  • Redisson 对每次操作都提供了重试机制,可配置 retryAttempts 来控制重试次数(缺省为3次),可配置 retryInterval 来控制重试间隔(缺省为 1000 ms)。Redisson 中使用了 Netty 的 TimerTaskTimeout 工具来实现其重试机制。
  • Redisson 中也大量使用了 Netty 实现的异步工具 FutureFutureListener,使得异步调用执行完成后能够立刻做出对应的操作。
  • RedissonConnection 是基于 Netty 实现的,发送命令的 send 方法实现是使用 Netty 的 Channel.writeAndFlush 方法。

以上便是 Redisson 的异步实现。

同步

Redisson 里的同步都是基于异步来实现的,为什么这么说,来看看 RedissonAtomicLongcompareAndSet 方法,

public boolean compareAndSet(long expect, long update) {
    return get(compareAndSetAsync(expect, update));
}

可见是在之前的异步方法外套了一个 get 方法,而这个 get 方法实际上也是在异步实现类 CommandAsyncService 中实现的,至于同步的实现类 CommandSyncService 有兴趣大家可以去看看,基本上都是在异步实现返回的 Future 外套了一个 get 方法。那么我们就看看 get 的实现,

public <V> V get(Future<V> future) {
    final CountDownLatch l = new CountDownLatch(1);
    future.addListener(new FutureListener<V>() {
        @Override
        public void operationComplete(Future<V> future) throws Exception {
            l.countDown();
        }
    });
    try {
        // 阻塞当前线程
        l.await();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    if (future.isSuccess()) {
        return future.getNow();
    }
    throw convertException(future);
}

原来是利用了 CountDownLatch 在异步调用结果返回前将当前线程阻塞,然后通过 Netty 的 FutureListener 在异步调用完成后解除阻塞,并返回调用结果。

Reactive

从例子中可以看到,Reactive 的客户端和对象实现都是独立的,先来看看 RedissonAtomicLongReactivecompareAndSet 方法,

public Publisher<Boolean> compareAndSet(long expect, long update) {
    return commandExecutor.evalWriteReactive(getName(), StringCodec.INSTANCE,
            RedisCommands.EVAL_BOOLEAN,
            "if redis.call('get', KEYS[1]) == ARGV[1] then "
                 + "redis.call('set', KEYS[1], ARGV[2]); "
                 + "return 1 "
               + "else "
                 + "return 0 end",
            Collections.<Object>singletonList(getName()), expect, update);
}

它调用的是 CommandReactiveService 中实现的 evalWriteReactive 方法,

public <T, R> Publisher<R> evalWriteReactive(String key, Codec codec,
        RedisCommand<T> evalCommandType, String script, List<Object> keys,
        Object... params) {
  Future<R> f = evalWriteAsync(key, codec, evalCommandType, script, keys, params);
  return new NettyFuturePublisher<R>(f);
}

我们看到这里还是基于异步调用实现的,只是将异步调用返回的 Future 封装在了一个 NettyFuturePublisher 对象中返回,这个对象继承了 Reactive Streams 中的 Stream,所以我的解读也只能到此为止了,Reactive Streams 的相关知识目前我还不具备。

总结

  • Redisson 提供了 同步、异步 和 Reactive 三种命令执行方式。
  • 同步 和 Reactive 的实现是基于 异步 的实现的。
  • Redisson 使用 Netty 连接 Redis 服务,并依赖 Netty 异步工具类来实现异步通信、重试机制、阻塞等特性。
  • Redisson 使用 Reactive Streams 来实现 Reactive 特性。

本文同时发布于我的微信订阅号


无罔
无罔
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,444评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,421评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,363评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,460评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,502评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,511评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,280评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,736评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,014评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,190评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,848评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,531评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,159评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,411评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,067评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,078评论 2 352

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,644评论 18 139
  • 本文将从Redis的基本特性入手,通过讲述Redis的数据结构和主要命令对Redis的基本能力进行直观介绍。之后概...
    kelgon阅读 61,147评论 23 626
  • 感受父亲小时候感受过的景色,听他似孩子般讲述印象中的童年景色与趣事。就是生活中的确幸。
    若愚233阅读 292评论 0 0
  • 最初建立时间:2017年1月1日 这是你第一次把你的愿望用文字全面呈现,那么这一天就是你人生中的历史时刻。 最后更...
    精良疆域阅读 489评论 0 0
  • 勤俭节约自古以来就一直是一种美德,可自古以来就一直出现一种现象~贫富差距过来。即使到了近代也一然出现这种现象,...
    徐猛_Merlin阅读 537评论 0 1