dataX中Job、TaskGroup和Task的通讯机制

dataX文档中关于Job和TaskGroup的描述为:

  1. JobContainer: Job执行器,负责Job全局拆分、调度、前置语句和后置语句等工作的工作单元。类似Yarn中的JobTracker
  2. TaskGroupContainer: TaskGroup执行器,负责执行一组Task的工作单元,类似Yarn中的TaskTracker。
    Yarn中的JobTracker和Yarn中的TaskTracker通过RPC进行通讯。

对于以standalone模式运行的dataX是如何进行通讯的,即JobContainer如何感知TaskGroupContainer和Task执行情况的呢?

通讯模块

统一口径

TaskGroupContainer向JobContainer上报信息,称JobContainer是TaskGroupContainer的上级;
Task向TaskGroupContainer上报信息,称TaskGroupContainer是Task的上级。

抽象类AbstractContainerCommunicator

dataX中提供了一个基类AbstractContainerCommunicator来处理JobContainer、TaskGroupContainer和Task的通讯。AbstractContainerCommunicator提供了注册、收集信息等接口,信息的单位是Communication(一个类)。AbstractContainerCommunicator主要将其功能委托给其主要包含两个模块:

AbstractContainerCommunicator#Collector

Collector负责管理下级注册到上级,搜集并合并下级所有的信息。 dataX提供一个基类AbstractCollector和一个实现类ProcessInnerCollector。实现类ProcessInnerCollector只实现 了一个方法collectFromTaskGroup。AbstractCollector同时包含将Task注册到TaskGroupContainer和将TaskGroupContainer注册到JobContainer的功能。

  • taskCommunicationMap用于保存Task注册到TaskGroupContainer,当Task注册到TaskGroupContainer的时候将TaskId和新建的Communication对象保存进taskCommunicationMap即可。
  • TaskGroupContainer注册到JobContainer注册信息则是保存在全局变量LocalTGCommunicationManager中,便于全局访问。

此外AbstractCollector#collectFromTask提供搜集所有任务信息的功能;ProcessInnerCollector#collectFromTaskGroup提供搜集所有TaskGroupContainer的信息。

public abstract class AbstractCollector {
    
    private Map<Integer, Communication> taskCommunicationMap = new ConcurrentHashMap<Integer, Communication>();
    private Long jobId;

    public Map<Integer, Communication> getTaskCommunicationMap() {
        return taskCommunicationMap;
    }

    public Long getJobId() {
        return jobId;
    }

    public void setJobId(Long jobId) {
        this.jobId = jobId;
    }
    
    public void registerTGCommunication(List<Configuration> taskGroupConfigurationList) {
        for (Configuration config : taskGroupConfigurationList) {
            int taskGroupId = config.getInt(
                    CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID);
            LocalTGCommunicationManager.registerTaskGroupCommunication(taskGroupId, new Communication());
        }
    }

    public void registerTaskCommunication(List<Configuration> taskConfigurationList) {
        for (Configuration taskConfig : taskConfigurationList) {
            int taskId = taskConfig.getInt(CoreConstant.TASK_ID);
            this.taskCommunicationMap.put(taskId, new Communication());
        }
    }

    public Communication collectFromTask() {
        Communication communication = new Communication();
        communication.setState(State.SUCCEEDED);

        for (Communication taskCommunication :
                this.taskCommunicationMap.values()) {
            communication.mergeFrom(taskCommunication);
        }

        return communication;
    }

    public abstract Communication collectFromTaskGroup();

    public Map<Integer, Communication> getTGCommunicationMap() {
        return LocalTGCommunicationManager.getTaskGroupCommunicationMap();
    }

    public Communication getTGCommunication(Integer taskGroupId) {
        return LocalTGCommunicationManager.getTaskGroupCommunication(taskGroupId);
    }

    public Communication getTaskCommunication(Integer taskId) {
        return this.taskCommunicationMap.get(taskId);
    }
}


public class ProcessInnerCollector extends AbstractCollector {

    public ProcessInnerCollector(Long jobId) {
        super.setJobId(jobId);
    }

    @Override
    public Communication collectFromTaskGroup() {
        return LocalTGCommunicationManager.getJobCommunication();
    }

}
AbstractContainerCommunicator#Reporter

Reporter的主要功能是将收集到的信息上报给上级。dataX提供一个基类AbstractReporter和一个实现类ProcessInnerCollector.

  • ProcessInnerCollector#reportJobCommunication将job信息汇报给上级,job在dataX中是最上级,所以该方法没有操作。
  • ProcessInnerCollector#reportTGCommunication将TaskGroupContianer的信息汇报给上级,操作也很简单直接更新注册时分配给该TaskGroup的Communication(Map中的值)
public class ProcessInnerReporter extends AbstractReporter {

    @Override
    public void reportJobCommunication(Long jobId, Communication communication) {
        // do nothing
    }

    @Override
    public void reportTGCommunication(Integer taskGroupId, Communication communication) {
        LocalTGCommunicationManager.updateTaskGroupCommunication(taskGroupId, communication);
    }
}

具体实现类

StandAloneJobContainerCommunicator

StandAloneJobContainerCommunicator是AbstractContainerCommunicator一个实现类,主要处理JobContainer和TaskGroupContainer之间的信息传递。

  • 每个TaskGroupContainer通过StandAloneJobContainerCommunicator#registerCommunication注册
  • 注册之后TaskGroupContainer每隔一段时间通过StandAloneJobContainerCommunicator#Reporter#report向JobContainer发送自己的状态。
  • JobContainer每隔一段时间通过StandAloneJobContainerCommunicator#collect获取TaskGroup的信息。最后调用StandAloneJobContainerCommunicator#report向上级汇报,这里JobContainer已经是最上级了,向日志中输出先关信息即可。
    ###########StandAloneJobContainerCommunicator
    StandAloneJobContainerCommunicator是AbstractContainerCommunicator的另一个实现类,该类主要处理TaskGroupContainer和Task之间的信息,处理逻辑和StandAloneJobContainerCommunicator差不多。
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。