RocketMQ Connect Runtime: 一些优化的想法

前言

最近整理了一下RocketMQ Connect Runtime中处理connector/task 的config的逻辑,然后也尝试提出了几个优化RocketMQ Connect Runtime的想法,其中有不少问题,还希望大家一起讨论。同时本文里提出的一个可能的办法依赖于我对ServiceThread
的理解是对的:ServiceThread 周期性地执行一个函数,或者在其他线程调用wakeup()时立即执行一次函数。

  • 当前RocketMQ connect runtime的config处理逻辑
  • 如何理解Data Plane和Control Plane的解耦
  • 如何实现强制退出connector/task
  • 如何增强当前有的异常处理机制
  • 资源隔离相关的思考

config update/remove的流程

当前新增connector是由putConnectorConfig()方法完成的,主要步骤如下

  1. 调用connectorKeyValueStore.put(connectorName, configs); ,将新的connector 以及对应的config添加到当前worker内存中的keyValueStore中
  2. 触发一次recomputeTaskConfigs(connectorName, connector, currentTimestamp);
  3. recomputeTaskConfig调用某个Connector的taskConfig() 方法获得该connector对应的task配置,然后添加到当前worker内存中的taskKeyValueStore
  4. 将更新后的connector/task config 发送至其他worker节点。该步骤逻辑上实现了不同worker间的config同步
  5. triggerListener():通知所有正在监听config改变事件的listener,这里面的listener主要是rebalanceService。 当rebalanceService 收到config发生变化的通知时,就会根据更新后的config触发rebalance()

而目前删除config的流程是

  1. 将待删除的connector的config设置为DELETED(并不删除keyValueStore中对应的entry,只是将config覆盖成DELETED)
  2. 同时将待删除该connector下所对应的所有task也设置为DELETED
  3. 将更新后的connector/task config 发送至其他worker节点。
  4. 没有调用triggerListener()

为什么在removeConfig的时候并没有调用triggerListener()嘞?也许是有特别的设计考量,不过考虑到remove和put的行为应当一致,或许可以考虑加上triggerListener()。

值得注意的是,被标记为 DELETED config并没有被完全删除。这里的没有被完全删除,指的是,无论是内存中的keyValueStore中所对应的那个map,还是磁盘上的json文件,都仍有被删除的connector所对应的一条entry。

不过这些没有被删除的config并不会造成影响,理由如下

  • 所有取得当前的config的操作,几乎都是经由getConnectorConfigs()过滤之后的,也就是说getConnectorConfigs() 不包含已经被DELETED的config。而当putConnectorConfig()的时候,也会去检查configMap中是否已经含有这个connector所对应的config了,有的话也会直接覆盖掉。整体来说,要不要定期的清理被DELETED的config对现在runtime使用影响不大。

另外一个思考,config 的persist行为只是周期性的,并不是发生修改后立即就触发一次persist,为什么没有必要发生修改后立即触发呢?对于Runtime来说keyValueStore的persist过程应该是透明的,具体什么时候persist不由runtime来决定,类似于操作系统里page的flush过程。

另外startConnectors() 所传进来的connector config永远不可能包含是带有DELETED标志位的config, 这是因为startConnectors() 所使用进来的connector config 是经过了getConnectorConfig() 的filter的,因此这个if 语句里有一半永远不会被执行到。

 for (WorkerConnector workerConnector : workingConnectors) {
            String connectorName = workerConnector.getConnectorName();
            ConnectKeyValue keyValue = connectorConfigs.get(connectorName);
            // TODO here keyValue.getInt(RuntimeConfigDefine.CONFIG_DELETED) can never be 0, because 
            if (null == keyValue || 0 != keyValue.getInt(RuntimeConfigDefine.CONFIG_DELETED)) 

当然这样写也没有什么问题,我们这个方法不应该依赖于connectorConfig包不包含DELETED config。

Control Plane以及Data Plane的解耦

Worker类管理WorkerConnector/WorkerSinkTask的生命周期,同时Worker类还包含TaskPoitionCommitService所能用到的commitTaskPosition()方法。必须把commitTaskPosition()放到Worker类中的原因是:

    public void commitTaskPosition
        Map<ByteBuffer, ByteBuffer> positionData = new HashMap<>();
        Map<ByteBuffer, ByteBuffer> offsetData = new HashMap<>();
        for (Runnable task : workingTasks) {
            if (task instanceof WorkerSourceTask) {
                positionData.putAll(((WorkerSourceTask) task).getPositionData());
                positionManagementService.putPosition(positionData);
            } else if (task instanceof WorkerSinkTask) {
                offsetData.putAll(((WorkerSinkTask) task).getOffsetData());
                offsetManagementService.putPosition(offsetData);
            }
        }
    }

该方法依赖到了Worker的成员变量 workingTasks。这意味着Worker类还要负责提交消费进度。理论上Worker只负责管理connecor/task的生命周期,而把所有和pull messages, commit consume position等等和业务逻辑相关的代码都不由其管理.

Worker还负责管理producer的生命周期(所有的WorkerSourceTask都用的同一个producer, 不知道有没有办法像consumer一样每个WorkerSourceTask一个consumer的实例,我猜这是由RokcetMQ的性质决定的)

Worker类中还管理了WorkerSourceTask,不过和WorkerSinkTask类似,就不再重复叙述了

taskCommitService问题的本质是,虽然taskCommitSerice和task的生命周期无关,但是taskCommitService需要获取当前所有的task的信息。这就涉及到了thread-safe publish。之前的策略是由Worker并不直接publish 成员变量,而是在Worker里提供一个实现了具体功能的方法给其他类。这样的好处是并发管理相对简单,缺点是可扩展性低,且牺牲了一定的隔离性。

WorkerSinkTask类中也有耦合。在WorkerSinkTask的run() 方法里,既调用SinkTask#start(), 也有很多和从RocketMQ 存取数据相关的业务逻辑代码。其中和RocketMQ存取数据相关的业务逻辑代码是WorkerSinkTask的本职工作。接下来我们开始分析要怎么解耦。

目前stop一个WorkerSinkTask的代码是这样的

    public void stop() {
        isStopping.set(true);
        consumer.shutdown();
        sinkTask.stop();
    }

前两步都是确定的行为,如果consumer.shutdown()出现异常,Worker有能力对其进行处理,问题就在sinkTask.stop()中。

SinkTask#start(), SinkTask#initialize() 可能会创建一部分依赖资源,比如数据库连接,而这些依赖是Runtime,或者说Worker类不知情的:Worker不可能提前知道某个Task会依赖哪些资源。SinkTask#stop() 同理,关闭资源的时候会上层的SinkTask会做什么是事先不知道的。因此SinkTask#stop() 的行为对于Worker来说完全是nondeterministic的。举个例子,SinkTask#stop()完全有可能一直调用sleep(),从而阻塞整个stop() 流程,我们要保证的就是这样的事不会发生。

之前在创建DatabaseConnection的时候会遇到创建DatabaseConnection报错了,但是因为没有catch住异常,或者因为DatabaseConnection会无限制的重试,我们的SinkConnector.start()被永久的block住了,没有办法退出也没有办法继续

还遇到过,connector依赖的资源没有创建好,但是仍然有对应task在运行。我们有必要确保只有connector处于健康的状态的时候,再去执行对应的task

我们仔细再想想,如果要让Worker管理WorkerSinkTask的生命周期,那么Worker必须要知道SinkTask#start() 或者SinkTask#stop() 是否成功,而这个成功与否又是应该由SinkTask来判断的,每个SinkTask对成功的定义都不一样。

想到SinkTask的状态有点像Kubernets里每一个pod的生命周期/状态,在k8s里每个pod可能处于

  • Pending: 尝试创建
  • Running: 正在工作
  • Succeeded: 执行完毕
  • Failed : 要么是出现了错误,要么是等待的某个资源不可获得,k8s负责定位具体错误原因的责任
  • Unknown: 没法识别的错误就可以归到Unknown里去

类似的,也许我们可以给每个SinkTask定义一些状态。具体的状态转移要怎么完成,我可能没有太多的这类问题的经验,所以希望大家给点建议。不过核心的决策点在于

  • 是由Runtime主动轮询SinkTask的状态,
  • 还是由SinkTask主动通知runtime自己处于什么状态。

那么回到k8s。我们知道Kubernetes的容器生命周期有容器探针的参与(比如在pod里跑一个脚本根据执行后的返回值判断容器是否健康),Kubernetes也会去捕捉容器在创建过程中的错误,因此我猜测Kubernetes的整体实现是前者。

个人意见,Kubernetes之所以能够使用前一种方法,原因在于容器依赖的资源的创建(包括CPU资源分配,volume资源的创建,以及ip地址的分配)都是由kubernetes管理的,这也就意味着kubernetes有足够的信息判断一个pod是否创建成功:如果pod所依赖的资源的创建过程中出错了,那么Kubernetes可以很明确的将pod置于Failed状态或者重试。

回到我们的问题,SinkTask所依赖的资源是由SinkTask管理的,底层的runtime对此并不知情,因此也就没有足够的手段和信息来判断上层SinkTask应该处于什么状态。即使通过try catch能够捕捉到一部分的异常情况,我们也很难说所有的failure case都一定能被异常检测机制给侦测出来。如前所述,Kubernetes在无法得知用户自定义的依赖是否正常时,将决定pod状态的责任交由用户自己决定(用提供用户自定义probe(探针)的方式)

因此由Runtime去监测SinkTask的状态是可行的办法,我们再来看看第二种由SinkTask通知runtime的实现。

理论上底层runtime的实现应该对上层的SinkTask保持透明,也就是说,如果上层SinkTask想要改变其在runtime中对应的状态的话,只能通过openmessaging里定义的SinkTaskContext 来实现。不幸的是,openmessaing里的SinkTaskContext并没有定义相关接口。

SinkConnectorContext 或者 SinkTaskContext 可以看作上层connector/task 和底层runtime交互的接口, 比如在jdbc-sink-connector 里就会通过调用context的requestTaskReconfiguration() 方法来通知runtime需要重新划分该connector下的task。

因此我们可以这样设计

  • WorkerSinkTask负责管理SinkTask的状态,有New, Pending, Running, Stopping, Stopped, Error这样六种状态
  • 把SinkTask#start() 和 SinkTask#stop() 都放到WorkerSinkTask#run() 方法中。这样做的原因是为了更好的隔离性:所有具有不确定行为的方法都应该丢到WorkerSinkTask#run()中,在一个独立的线程中运行,也就是说Worker的主线程不负责保证SinkTask的依赖被成功创建或者销毁。
  • 在WorkerSinkTask#run() 中检测是否SinkTask#start()或者SinkTask#stop() 成功执行,比如说,如果SinkTask#start()执行完毕前处于Pending状态,如果没有异常的执行完毕则将状态改变为Running,如果出现了Exception则状态转移至Error状态
  • 由Worker在每次startTasks()时管理每个task的状态

Connector可以用类似的思路,更详细的设计文档在
设计文档

当WorkerConnector和WorkerSinkTask的生命周期管理太复杂之后,我们也许可以考虑把Worker拆分成两个类,分别管理Connector和Task的生命周期,就像是Kubernetes里的Deployment(ReplicaSet) 与 Pod之间的关系。

当前Worker#stop() 并不管正在跑的线程的状态,如果采用了这种状态机的方式的话,我们可能得想办法把每个task的状态持久化下来

强制退出

每个WorkerSinkTasks也是一个Runnable,在被Worker提交到线程池之后对应的是ExecutorService中的一个线程,start就是把WorkerSinkTask提交到ExecutorService,然而目前的WorkerSinkTask.stop()并不是 cancel掉这个Runnable,而是通过设置线程内的一个isStopping 变量让这个Task完成执行从而退出。问题就出现在这里,WorkerSinkTask会调用SinkTask对象中的一些方法,比如put() 来向目标数据库存放数据,因此WorkerSinkTask完全有可能SinkTask的某个方法给block住。stop() 是non-blocking的,并不能得知WorkerSinkTask 是否正常完成。

...
workerSourceTask.stop(); // This is a non-blocking
stoppedTasks.add(workerSourceTask);
...
workingTasks.removeAll(stoppedTasks);

而假设stopedTask并没有退出,那么workingTask.removeAll(stoppedTasks) 就会丢失对stoppedTask的引用,而在ExecutorService中仍然保留着对stoppedTask的引用,这就造成了我们没有办法cancel掉本应stop但因为错误而没有stop的WorkerSinkTask。

几个问题

  • 如何判断某个WorkerSinkTask无法正常退出,必须由Runtime强制退出
  • 如果定位到WorkerSinkTask无法正常退出的原因,如果有Exception应该怎么处理
  • 用户是否应该有权不经过gracefully的stop尝试,直接cancel掉某个WorkerSinkTask

结合之前状态机的思路,如果一个task长期处于Stopping状态,那么超过一定的实现之后我们可以认为它timeout了,那么我们就可以直接cancel掉这个正在执行的task。这个方法要求我们持有每个ExecutorService.submit(workerSinktask) 所返回的Future对象。

WorkerConnector的强制关闭可以采取和WorkerSinkTask相同的办法

资源隔离

我暂时还没想这个,因为task都是在线程池里运行的,已经有一定的隔离性了,我还没有想好更多的资源隔离方案。

异常处理

我们讨论异常处理主要有两方面的目的,

  1. 是确保更好的隔离性,尽量让所有的异常都能被正确的处理
  2. 在强制关闭那一节里提及的,runtime可以使用异常处理来判断connector所处于的状态
    异常处理确实是一个很麻烦的问题,我们要理清楚有哪些类是需要重点关注的。
  • Worker
  • WorkerConnector
  • WorkerSinkTask
  • WorkerSourceTask
  • 所有需要load class的地方

对于WorkerSinkTask,我们可以try catch每个task submit后返回的Future对象的get() 方法,来判断任务执行过程中是否出现了exception,比如

for (Runnable runnable: errorTasks) {
            WorkerTask workerTask = (WorkerTask) runnable;
            workerTask.cleanup();
            Future future = taskToFutureMap.get(runnable);
            try {
                future.get(1000, TimeUnit.MILLISECONDS);
            } catch (ExecutionException e) {
                Throwable t = e.getCause();
                t.printStackTrace();
            } catch (CancellationException e) {
            } catch (TimeoutException e) {
            } catch (InterruptedException e) {
            }
            finally {
                taskToFutureMap.remove(runnable);
            }
        }

安全关闭RocketMQ Connect Runtime

我们可以在Worker#stop() 中安全的关闭ExecutorService

用户交互

如果引入状态管理了,我们也许可以增强一下当前的RESTful接口,使得用户可以看到每个task/connector的状态.

其他问题

  • 现在Worker的stop() 似乎并不会清空所有正在跑的task(),而是单纯的释放了一些资源,我觉得这近似于暴力退出。我们可以思考一下Worker的stop() 应该具有什么样的语义,以及如何优雅的退出。(现在确实存在退出前正在跑的task会在重新启动runtime之后又出现)(另外这个涉及到interruption的处理问题,有机会了可以去看看我们现在的程序的interrupt处理机制是什么样的)

  • 突然意识到一个挺严重的问题,如果reload balance

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,588评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,456评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,146评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,387评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,481评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,510评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,522评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,296评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,745评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,039评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,202评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,901评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,538评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,165评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,415评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,081评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,085评论 2 352