dataX中任务分配任务组的过程

分片完毕之后会在JobContianer种生成一系列的Task,因为dataX中执行任务的单元是任务组TaskGroup,在执行任务之前会将这些任务公平的分给分配给任务组TaskGroup。具体的分配逻辑是在JonAssignUtil#assignFairly,主要包含以下几个部分。

TaskGroup个数的确定

分配之前需要确定TaskGroup的个数

  • channelNumber 是分片这篇文章(https://www.jianshu.com/writer#/notebooks/47375836/notes/75672025
    )确定的needChannelNumber和taskNumber的较小值;
  • channelsPerTaskGroup是用户配置的core.container.taskGroup.channel的值;
  • taskGroupNumber = needChannelNumber/channelsPerTaskGroup
  • Math.ceil(1.0 *5 / 2) = 3
this.needChannelNumber = Math.min(this.needChannelNumber, taskNumber);
int channelsPerTaskGroup = this.configuration.getInt(
                CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, 5);
int taskGroupNumber = (int) Math.ceil(1.0 * channelNumber / channelsPerTaskGroup);

任务分组的过程

任务分组之前会进行分片,分片的时候会设置每个任务的reader.parameter.loadBalanceResourceMark属性,默认是每个任务对应的table所在数据库的IP。如果所有的任务都没有设置reader.parameter.loadBalanceResourceMark这个属性,那么在JonAssignUtil#assignFairly中会将所有任务的reader.parameter.loadBalanceResourceMark这个属性设置为"aFakeResourceMarkForLoadBalance"。
任务有了reader.parameter.loadBalanceResourceMark之后会将所有的任务放入一个LinkedHashMap<String, List<Integer>>,key为reader.parameter.loadBalanceResourceMark对应的值,value为key相同的任务的taskId组成的List。
有了上述的LinkedHashMap的之后便可以对任务进行分组了。

以一个例子进行说明分配的过程,源码的逻辑是在JobAssignUtil#doAssign

  1. 假设当前LinkedHashMap有3组值分别为:
    172.10.10.1 --> [0, 1, 2]
    172.10.10.2 --> [ 3, 4]
    172.10.10.3 --> [5, 6, 7]

  2. 当前的有4个任务组分别为group0, group1, group2和group3.

分配过程为:

  • 将任务0分配给groupo,将任务3分配给group1,将任务5分配给gorup2,将任务1分配给gorup3;
  • 将任务4分配给group1,将任务6分配给group2,将任务2分配给group3;将任务7分配给gorup3.

即以列优先遍历LinkedHashMap#values组成的这个二维数组,将遍历得到的task依次分配给group.
通常我们自己配置的时候会只有一个IP,那就是直接遍历一个一维数组,将遍历得到的task依次分配给group,可以看做是roundrobin了。

给每个任务组分配channel的个数

具体逻辑是在JobAssginUtil#adjustChannelNumPerTaskGroup,过程很简单:将用户配置的所有channel个数平均分配给所有的任务组,除不尽怎么办?那么会有几个任务组多分配一个channel。

// JobAssginUtil#adjustChannelNumPerTaskGroup
private static void adjustChannelNumPerTaskGroup(List<Configuration> taskGroupConfig, int channelNumber) {
        int taskGroupNumber = taskGroupConfig.size();
        int avgChannelsPerTaskGroup = channelNumber / taskGroupNumber;
        int remainderChannelCount = channelNumber % taskGroupNumber;
        // 表示有 remainderChannelCount 个 taskGroup,其对应 Channel 个数应该为:avgChannelsPerTaskGroup + 1;
        // (taskGroupNumber - remainderChannelCount)个 taskGroup,其对应 Channel 个数应该为:avgChannelsPerTaskGroup

        int i = 0;
        for (; i < remainderChannelCount; i++) {
            taskGroupConfig.get(i).set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, avgChannelsPerTaskGroup + 1);
        }

        for (int j = 0; j < taskGroupNumber - remainderChannelCount; j++) {
            taskGroupConfig.get(i + j).set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, avgChannelsPerTaskGroup);
        }
    }

一个栗子

  • 设置job的channel=10: job.setting.speed.channel = 10 (配置10的目的是让taskNumber起作用);
  • 设置分片数=5:querySql模式弄5个sql即可;
  • 任务组对应的channel=2:core.container.taskGroup.channel = 2;
  • 任务组的个数为: (int) Math.ceil(1.0*5 / 2) = 3

配置文件如下

{
    "core": {
        "container": {
            "taskGroup": {
                "channel": 2
            }
        }
    },
    "job": {
        "setting": {
            "speed": {
                 "channel": 10
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "123456",
                        "column": ["*"],
                        "connection": [
                            {
                                "querySql": ["select* from user where id = 1",
                                             "select* from user where id = 2",
                                             "select* from user where id = 3",
                                             "select* from user where id = 4",
                                             "select* from user where id = 5"
                                            ],
                                "jdbcUrl": ["jdbc:mysql://172.10.10.231:3306/test"]
                            }
                        ]
                    }
                },
               "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "root",
                        "password": "123456",
                        "column": ["*"],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://172.10.10.231:3306/test1",
                                "table": [
                                    "user1"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。