dataX文档中关于Job和TaskGroup的描述为:
- JobContainer: Job执行器,负责Job全局拆分、调度、前置语句和后置语句等工作的工作单元。类似Yarn中的JobTracker
- TaskGroupContainer: TaskGroup执行器,负责执行一组Task的工作单元,类似Yarn中的TaskTracker。
Yarn中的JobTracker和Yarn中的TaskTracker通过RPC进行通讯。
对于以standalone模式运行的dataX是如何进行通讯的,即JobContainer如何感知TaskGroupContainer和Task执行情况的呢?
通讯模块
统一口径
TaskGroupContainer向JobContainer上报信息,称JobContainer是TaskGroupContainer的上级;
Task向TaskGroupContainer上报信息,称TaskGroupContainer是Task的上级。
抽象类AbstractContainerCommunicator
dataX中提供了一个基类AbstractContainerCommunicator来处理JobContainer、TaskGroupContainer和Task的通讯。AbstractContainerCommunicator提供了注册、收集信息等接口,信息的单位是Communication(一个类)。AbstractContainerCommunicator主要将其功能委托给其主要包含两个模块:
AbstractContainerCommunicator#Collector
Collector负责管理下级注册到上级,搜集并合并下级所有的信息。 dataX提供一个基类AbstractCollector和一个实现类ProcessInnerCollector。实现类ProcessInnerCollector只实现 了一个方法collectFromTaskGroup。AbstractCollector同时包含将Task注册到TaskGroupContainer和将TaskGroupContainer注册到JobContainer的功能。
-
taskCommunicationMap
用于保存Task注册到TaskGroupContainer,当Task注册到TaskGroupContainer的时候将TaskId和新建的Communication对象保存进taskCommunicationMap
即可。 - TaskGroupContainer注册到JobContainer注册信息则是保存在全局变量LocalTGCommunicationManager中,便于全局访问。
此外AbstractCollector#collectFromTask提供搜集所有任务信息的功能;ProcessInnerCollector#collectFromTaskGroup提供搜集所有TaskGroupContainer的信息。
public abstract class AbstractCollector {
private Map<Integer, Communication> taskCommunicationMap = new ConcurrentHashMap<Integer, Communication>();
private Long jobId;
public Map<Integer, Communication> getTaskCommunicationMap() {
return taskCommunicationMap;
}
public Long getJobId() {
return jobId;
}
public void setJobId(Long jobId) {
this.jobId = jobId;
}
public void registerTGCommunication(List<Configuration> taskGroupConfigurationList) {
for (Configuration config : taskGroupConfigurationList) {
int taskGroupId = config.getInt(
CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID);
LocalTGCommunicationManager.registerTaskGroupCommunication(taskGroupId, new Communication());
}
}
public void registerTaskCommunication(List<Configuration> taskConfigurationList) {
for (Configuration taskConfig : taskConfigurationList) {
int taskId = taskConfig.getInt(CoreConstant.TASK_ID);
this.taskCommunicationMap.put(taskId, new Communication());
}
}
public Communication collectFromTask() {
Communication communication = new Communication();
communication.setState(State.SUCCEEDED);
for (Communication taskCommunication :
this.taskCommunicationMap.values()) {
communication.mergeFrom(taskCommunication);
}
return communication;
}
public abstract Communication collectFromTaskGroup();
public Map<Integer, Communication> getTGCommunicationMap() {
return LocalTGCommunicationManager.getTaskGroupCommunicationMap();
}
public Communication getTGCommunication(Integer taskGroupId) {
return LocalTGCommunicationManager.getTaskGroupCommunication(taskGroupId);
}
public Communication getTaskCommunication(Integer taskId) {
return this.taskCommunicationMap.get(taskId);
}
}
public class ProcessInnerCollector extends AbstractCollector {
public ProcessInnerCollector(Long jobId) {
super.setJobId(jobId);
}
@Override
public Communication collectFromTaskGroup() {
return LocalTGCommunicationManager.getJobCommunication();
}
}
AbstractContainerCommunicator#Reporter
Reporter的主要功能是将收集到的信息上报给上级。dataX提供一个基类AbstractReporter和一个实现类ProcessInnerCollector.
- ProcessInnerCollector#reportJobCommunication将job信息汇报给上级,job在dataX中是最上级,所以该方法没有操作。
- ProcessInnerCollector#reportTGCommunication将TaskGroupContianer的信息汇报给上级,操作也很简单直接更新注册时分配给该TaskGroup的Communication(Map中的值)
public class ProcessInnerReporter extends AbstractReporter {
@Override
public void reportJobCommunication(Long jobId, Communication communication) {
// do nothing
}
@Override
public void reportTGCommunication(Integer taskGroupId, Communication communication) {
LocalTGCommunicationManager.updateTaskGroupCommunication(taskGroupId, communication);
}
}
具体实现类
StandAloneJobContainerCommunicator
StandAloneJobContainerCommunicator是AbstractContainerCommunicator一个实现类,主要处理JobContainer和TaskGroupContainer之间的信息传递。
- 每个TaskGroupContainer通过StandAloneJobContainerCommunicator#registerCommunication注册
- 注册之后TaskGroupContainer每隔一段时间通过StandAloneJobContainerCommunicator#Reporter#report向JobContainer发送自己的状态。
- JobContainer每隔一段时间通过StandAloneJobContainerCommunicator#collect获取TaskGroup的信息。最后调用StandAloneJobContainerCommunicator#report向上级汇报,这里JobContainer已经是最上级了,向日志中输出先关信息即可。
###########StandAloneJobContainerCommunicator
StandAloneJobContainerCommunicator是AbstractContainerCommunicator的另一个实现类,该类主要处理TaskGroupContainer和Task之间的信息,处理逻辑和StandAloneJobContainerCommunicator差不多。