前言
最近整理了一下RocketMQ Connect Runtime中处理connector/task 的config的逻辑,然后也尝试提出了几个优化RocketMQ Connect Runtime的想法,其中有不少问题,还希望大家一起讨论。同时本文里提出的一个可能的办法依赖于我对ServiceThread
的理解是对的:ServiceThread 周期性地执行一个函数,或者在其他线程调用wakeup()时立即执行一次函数。
- 当前RocketMQ connect runtime的config处理逻辑
- 如何理解Data Plane和Control Plane的解耦
- 如何实现强制退出connector/task
- 如何增强当前有的异常处理机制
- 资源隔离相关的思考
config update/remove的流程
当前新增connector是由putConnectorConfig()方法完成的,主要步骤如下
- 调用
connectorKeyValueStore.put(connectorName, configs);
,将新的connector 以及对应的config添加到当前worker内存中的keyValueStore中 - 触发一次
recomputeTaskConfigs(connectorName, connector, currentTimestamp);
- recomputeTaskConfig调用某个Connector的taskConfig() 方法获得该connector对应的task配置,然后添加到当前worker内存中的taskKeyValueStore
- 将更新后的connector/task config 发送至其他worker节点。该步骤逻辑上实现了不同worker间的config同步
- triggerListener():通知所有正在监听config改变事件的listener,这里面的listener主要是rebalanceService。 当rebalanceService 收到config发生变化的通知时,就会根据更新后的config触发rebalance()
而目前删除config的流程是
- 将待删除的connector的config设置为DELETED(并不删除keyValueStore中对应的entry,只是将config覆盖成DELETED)
- 同时将待删除该connector下所对应的所有task也设置为DELETED
- 将更新后的connector/task config 发送至其他worker节点。
- 没有调用triggerListener()
为什么在removeConfig的时候并没有调用triggerListener()嘞?也许是有特别的设计考量,不过考虑到remove和put的行为应当一致,或许可以考虑加上triggerListener()。
值得注意的是,被标记为 DELETED config并没有被完全删除。这里的没有被完全删除,指的是,无论是内存中的keyValueStore中所对应的那个map,还是磁盘上的json文件,都仍有被删除的connector所对应的一条entry。
不过这些没有被删除的config并不会造成影响,理由如下
- 所有取得当前的config的操作,几乎都是经由getConnectorConfigs()过滤之后的,也就是说getConnectorConfigs() 不包含已经被DELETED的config。而当putConnectorConfig()的时候,也会去检查configMap中是否已经含有这个connector所对应的config了,有的话也会直接覆盖掉。整体来说,要不要定期的清理被DELETED的config对现在runtime使用影响不大。
另外一个思考,config 的persist行为只是周期性的,并不是发生修改后立即就触发一次persist,为什么没有必要发生修改后立即触发呢?对于Runtime来说keyValueStore的persist过程应该是透明的,具体什么时候persist不由runtime来决定,类似于操作系统里page的flush过程。
另外startConnectors() 所传进来的connector config永远不可能包含是带有DELETED标志位的config, 这是因为startConnectors() 所使用进来的connector config 是经过了getConnectorConfig() 的filter的,因此这个if 语句里有一半永远不会被执行到。
for (WorkerConnector workerConnector : workingConnectors) {
String connectorName = workerConnector.getConnectorName();
ConnectKeyValue keyValue = connectorConfigs.get(connectorName);
// TODO here keyValue.getInt(RuntimeConfigDefine.CONFIG_DELETED) can never be 0, because
if (null == keyValue || 0 != keyValue.getInt(RuntimeConfigDefine.CONFIG_DELETED))
当然这样写也没有什么问题,我们这个方法不应该依赖于connectorConfig包不包含DELETED config。
Control Plane以及Data Plane的解耦
Worker类管理WorkerConnector/WorkerSinkTask的生命周期,同时Worker类还包含TaskPoitionCommitService所能用到的commitTaskPosition()方法。必须把commitTaskPosition()放到Worker类中的原因是:
public void commitTaskPosition
Map<ByteBuffer, ByteBuffer> positionData = new HashMap<>();
Map<ByteBuffer, ByteBuffer> offsetData = new HashMap<>();
for (Runnable task : workingTasks) {
if (task instanceof WorkerSourceTask) {
positionData.putAll(((WorkerSourceTask) task).getPositionData());
positionManagementService.putPosition(positionData);
} else if (task instanceof WorkerSinkTask) {
offsetData.putAll(((WorkerSinkTask) task).getOffsetData());
offsetManagementService.putPosition(offsetData);
}
}
}
该方法依赖到了Worker的成员变量 workingTasks
。这意味着Worker类还要负责提交消费进度。理论上Worker只负责管理connecor/task的生命周期,而把所有和pull messages, commit consume position等等和业务逻辑相关的代码都不由其管理.
Worker还负责管理producer的生命周期(所有的WorkerSourceTask都用的同一个producer, 不知道有没有办法像consumer一样每个WorkerSourceTask一个consumer的实例,我猜这是由RokcetMQ的性质决定的)
Worker类中还管理了WorkerSourceTask,不过和WorkerSinkTask类似,就不再重复叙述了
taskCommitService问题的本质是,虽然taskCommitSerice和task的生命周期无关,但是taskCommitService需要获取当前所有的task的信息。这就涉及到了thread-safe publish。之前的策略是由Worker并不直接publish 成员变量,而是在Worker里提供一个实现了具体功能的方法给其他类。这样的好处是并发管理相对简单,缺点是可扩展性低,且牺牲了一定的隔离性。
WorkerSinkTask类中也有耦合。在WorkerSinkTask的run() 方法里,既调用SinkTask#start(), 也有很多和从RocketMQ 存取数据相关的业务逻辑代码。其中和RocketMQ存取数据相关的业务逻辑代码是WorkerSinkTask的本职工作。接下来我们开始分析要怎么解耦。
目前stop一个WorkerSinkTask的代码是这样的
public void stop() {
isStopping.set(true);
consumer.shutdown();
sinkTask.stop();
}
前两步都是确定的行为,如果consumer.shutdown()出现异常,Worker有能力对其进行处理,问题就在sinkTask.stop()中。
SinkTask#start(), SinkTask#initialize() 可能会创建一部分依赖资源,比如数据库连接,而这些依赖是Runtime,或者说Worker类不知情的:Worker不可能提前知道某个Task会依赖哪些资源。SinkTask#stop() 同理,关闭资源的时候会上层的SinkTask会做什么是事先不知道的。因此SinkTask#stop() 的行为对于Worker来说完全是nondeterministic的。举个例子,SinkTask#stop()完全有可能一直调用sleep(),从而阻塞整个stop() 流程,我们要保证的就是这样的事不会发生。
之前在创建DatabaseConnection的时候会遇到创建DatabaseConnection报错了,但是因为没有catch住异常,或者因为DatabaseConnection会无限制的重试,我们的SinkConnector.start()被永久的block住了,没有办法退出也没有办法继续
还遇到过,connector依赖的资源没有创建好,但是仍然有对应task在运行。我们有必要确保只有connector处于健康的状态的时候,再去执行对应的task
我们仔细再想想,如果要让Worker管理WorkerSinkTask的生命周期,那么Worker必须要知道SinkTask#start() 或者SinkTask#stop() 是否成功,而这个成功与否又是应该由SinkTask来判断的,每个SinkTask对成功的定义都不一样。
想到SinkTask的状态有点像Kubernets里每一个pod的生命周期/状态,在k8s里每个pod可能处于
- Pending: 尝试创建
- Running: 正在工作
- Succeeded: 执行完毕
- Failed : 要么是出现了错误,要么是等待的某个资源不可获得,k8s负责定位具体错误原因的责任
- Unknown: 没法识别的错误就可以归到Unknown里去
类似的,也许我们可以给每个SinkTask定义一些状态。具体的状态转移要怎么完成,我可能没有太多的这类问题的经验,所以希望大家给点建议。不过核心的决策点在于
- 是由Runtime主动轮询SinkTask的状态,
- 还是由SinkTask主动通知runtime自己处于什么状态。
那么回到k8s。我们知道Kubernetes的容器生命周期有容器探针的参与(比如在pod里跑一个脚本根据执行后的返回值判断容器是否健康),Kubernetes也会去捕捉容器在创建过程中的错误,因此我猜测Kubernetes的整体实现是前者。
个人意见,Kubernetes之所以能够使用前一种方法,原因在于容器依赖的资源的创建(包括CPU资源分配,volume资源的创建,以及ip地址的分配)都是由kubernetes管理的,这也就意味着kubernetes有足够的信息判断一个pod是否创建成功:如果pod所依赖的资源的创建过程中出错了,那么Kubernetes可以很明确的将pod置于Failed状态或者重试。
回到我们的问题,SinkTask所依赖的资源是由SinkTask管理的,底层的runtime对此并不知情,因此也就没有足够的手段和信息来判断上层SinkTask应该处于什么状态。即使通过try catch能够捕捉到一部分的异常情况,我们也很难说所有的failure case都一定能被异常检测机制给侦测出来。如前所述,Kubernetes在无法得知用户自定义的依赖是否正常时,将决定pod状态的责任交由用户自己决定(用提供用户自定义probe(探针)的方式)
因此由Runtime去监测SinkTask的状态是可行的办法,我们再来看看第二种由SinkTask通知runtime的实现。
理论上底层runtime的实现应该对上层的SinkTask保持透明,也就是说,如果上层SinkTask想要改变其在runtime中对应的状态的话,只能通过openmessaging里定义的SinkTaskContext 来实现。不幸的是,openmessaing里的SinkTaskContext并没有定义相关接口。
SinkConnectorContext 或者 SinkTaskContext 可以看作上层connector/task 和底层runtime交互的接口, 比如在jdbc-sink-connector 里就会通过调用context的
requestTaskReconfiguration()
方法来通知runtime需要重新划分该connector下的task。
因此我们可以这样设计
- WorkerSinkTask负责管理SinkTask的状态,有New, Pending, Running, Stopping, Stopped, Error这样六种状态
- 把SinkTask#start() 和 SinkTask#stop() 都放到WorkerSinkTask#run() 方法中。这样做的原因是为了更好的隔离性:所有具有不确定行为的方法都应该丢到WorkerSinkTask#run()中,在一个独立的线程中运行,也就是说Worker的主线程不负责保证SinkTask的依赖被成功创建或者销毁。
- 在WorkerSinkTask#run() 中检测是否SinkTask#start()或者SinkTask#stop() 成功执行,比如说,如果SinkTask#start()执行完毕前处于Pending状态,如果没有异常的执行完毕则将状态改变为Running,如果出现了Exception则状态转移至Error状态
- 由Worker在每次startTasks()时管理每个task的状态
Connector可以用类似的思路,更详细的设计文档在
设计文档
当WorkerConnector和WorkerSinkTask的生命周期管理太复杂之后,我们也许可以考虑把Worker拆分成两个类,分别管理Connector和Task的生命周期,就像是Kubernetes里的Deployment(ReplicaSet) 与 Pod之间的关系。
当前Worker#stop() 并不管正在跑的线程的状态,如果采用了这种状态机的方式的话,我们可能得想办法把每个task的状态持久化下来
强制退出
每个WorkerSinkTasks也是一个Runnable,在被Worker提交到线程池之后对应的是ExecutorService中的一个线程,start就是把WorkerSinkTask提交到ExecutorService,然而目前的WorkerSinkTask.stop()并不是 cancel掉这个Runnable,而是通过设置线程内的一个isStopping 变量让这个Task完成执行从而退出。问题就出现在这里,WorkerSinkTask会调用SinkTask对象中的一些方法,比如put() 来向目标数据库存放数据,因此WorkerSinkTask完全有可能SinkTask的某个方法给block住。stop() 是non-blocking的,并不能得知WorkerSinkTask 是否正常完成。
...
workerSourceTask.stop(); // This is a non-blocking
stoppedTasks.add(workerSourceTask);
...
workingTasks.removeAll(stoppedTasks);
而假设stopedTask并没有退出,那么workingTask.removeAll(stoppedTasks)
就会丢失对stoppedTask的引用,而在ExecutorService中仍然保留着对stoppedTask的引用,这就造成了我们没有办法cancel掉本应stop但因为错误而没有stop的WorkerSinkTask。
几个问题
- 如何判断某个WorkerSinkTask无法正常退出,必须由Runtime强制退出
- 如果定位到WorkerSinkTask无法正常退出的原因,如果有Exception应该怎么处理
- 用户是否应该有权不经过gracefully的stop尝试,直接cancel掉某个WorkerSinkTask
结合之前状态机的思路,如果一个task长期处于Stopping状态,那么超过一定的实现之后我们可以认为它timeout了,那么我们就可以直接cancel掉这个正在执行的task。这个方法要求我们持有每个ExecutorService.submit(workerSinktask) 所返回的Future对象。
WorkerConnector的强制关闭可以采取和WorkerSinkTask相同的办法
资源隔离
我暂时还没想这个,因为task都是在线程池里运行的,已经有一定的隔离性了,我还没有想好更多的资源隔离方案。
异常处理
我们讨论异常处理主要有两方面的目的,
- 是确保更好的隔离性,尽量让所有的异常都能被正确的处理
- 在强制关闭那一节里提及的,runtime可以使用异常处理来判断connector所处于的状态
异常处理确实是一个很麻烦的问题,我们要理清楚有哪些类是需要重点关注的。
- Worker
- WorkerConnector
- WorkerSinkTask
- WorkerSourceTask
- 所有需要load class的地方
对于WorkerSinkTask,我们可以try catch每个task submit后返回的Future对象的get() 方法,来判断任务执行过程中是否出现了exception,比如
for (Runnable runnable: errorTasks) {
WorkerTask workerTask = (WorkerTask) runnable;
workerTask.cleanup();
Future future = taskToFutureMap.get(runnable);
try {
future.get(1000, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
Throwable t = e.getCause();
t.printStackTrace();
} catch (CancellationException e) {
} catch (TimeoutException e) {
} catch (InterruptedException e) {
}
finally {
taskToFutureMap.remove(runnable);
}
}
安全关闭RocketMQ Connect Runtime
我们可以在Worker#stop() 中安全的关闭ExecutorService
用户交互
如果引入状态管理了,我们也许可以增强一下当前的RESTful接口,使得用户可以看到每个task/connector的状态.
其他问题
现在Worker的stop() 似乎并不会清空所有正在跑的task(),而是单纯的释放了一些资源,我觉得这近似于暴力退出。我们可以思考一下Worker的stop() 应该具有什么样的语义,以及如何优雅的退出。(现在确实存在退出前正在跑的task会在重新启动runtime之后又出现)(另外这个涉及到interruption的处理问题,有机会了可以去看看我们现在的程序的interrupt处理机制是什么样的)
突然意识到一个挺严重的问题,如果reload balance