分片完毕之后会在JobContianer种生成一系列的Task,因为dataX中执行任务的单元是任务组TaskGroup,在执行任务之前会将这些任务公平的分给分配给任务组TaskGroup。具体的分配逻辑是在JonAssignUtil#assignFairly,主要包含以下几个部分。
TaskGroup个数的确定
分配之前需要确定TaskGroup的个数
-
channelNumber
是分片这篇文章(https://www.jianshu.com/writer#/notebooks/47375836/notes/75672025
)确定的needChannelNumber和taskNumber的较小值; -
channelsPerTaskGroup
是用户配置的core.container.taskGroup.channel
的值; -
taskGroupNumber
=needChannelNumber
/channelsPerTaskGroup
- Math.ceil(1.0 *5 / 2) = 3
this.needChannelNumber = Math.min(this.needChannelNumber, taskNumber);
int channelsPerTaskGroup = this.configuration.getInt(
CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, 5);
int taskGroupNumber = (int) Math.ceil(1.0 * channelNumber / channelsPerTaskGroup);
任务分组的过程
任务分组之前会进行分片,分片的时候会设置每个任务的reader.parameter.loadBalanceResourceMark
属性,默认是每个任务对应的table所在数据库的IP
。如果所有的任务都没有设置reader.parameter.loadBalanceResourceMark
这个属性,那么在JonAssignUtil#assignFairly中会将所有任务的reader.parameter.loadBalanceResourceMark
这个属性设置为"aFakeResourceMarkForLoadBalance"。
任务有了reader.parameter.loadBalanceResourceMark
之后会将所有的任务放入一个LinkedHashMap<String, List<Integer>>,key为reader.parameter.loadBalanceResourceMark
对应的值,value为key相同的任务的taskId组成的List。
有了上述的LinkedHashMap的之后便可以对任务进行分组了。
以一个例子进行说明分配的过程,源码的逻辑是在JobAssignUtil#doAssign
假设当前LinkedHashMap有3组值分别为:
172.10.10.1
--> [0, 1, 2]
172.10.10.2
--> [ 3, 4]
172.10.10.3
--> [5, 6, 7]当前的有4个任务组分别为group0, group1, group2和group3.
分配过程为:
- 将任务0分配给groupo,将任务3分配给group1,将任务5分配给gorup2,将任务1分配给gorup3;
- 将任务4分配给group1,将任务6分配给group2,将任务2分配给group3;将任务7分配给gorup3.
即以列优先遍历LinkedHashMap#values组成的这个二维数组
,将遍历得到的task依次分配给group.
通常我们自己配置的时候会只有一个IP
,那就是直接遍历一个一维数组,将遍历得到的task依次分配给group,可以看做是roundrobin了。
给每个任务组分配channel的个数
具体逻辑是在JobAssginUtil#adjustChannelNumPerTaskGroup,过程很简单:将用户配置的所有channel个数平均分配给所有的任务组,除不尽怎么办?那么会有几个任务组多分配一个channel。
// JobAssginUtil#adjustChannelNumPerTaskGroup
private static void adjustChannelNumPerTaskGroup(List<Configuration> taskGroupConfig, int channelNumber) {
int taskGroupNumber = taskGroupConfig.size();
int avgChannelsPerTaskGroup = channelNumber / taskGroupNumber;
int remainderChannelCount = channelNumber % taskGroupNumber;
// 表示有 remainderChannelCount 个 taskGroup,其对应 Channel 个数应该为:avgChannelsPerTaskGroup + 1;
// (taskGroupNumber - remainderChannelCount)个 taskGroup,其对应 Channel 个数应该为:avgChannelsPerTaskGroup
int i = 0;
for (; i < remainderChannelCount; i++) {
taskGroupConfig.get(i).set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, avgChannelsPerTaskGroup + 1);
}
for (int j = 0; j < taskGroupNumber - remainderChannelCount; j++) {
taskGroupConfig.get(i + j).set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, avgChannelsPerTaskGroup);
}
}
一个栗子
- 设置job的channel=10:
job.setting.speed.channel
= 10 (配置10的目的是让taskNumber起作用); - 设置分片数=5:querySql模式弄5个sql即可;
- 任务组对应的channel=2:
core.container.taskGroup.channel
= 2; - 任务组的个数为: (int) Math.ceil(1.0*5 / 2) = 3
配置文件如下
{
"core": {
"container": {
"taskGroup": {
"channel": 2
}
}
},
"job": {
"setting": {
"speed": {
"channel": 10
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"column": ["*"],
"connection": [
{
"querySql": ["select* from user where id = 1",
"select* from user where id = 2",
"select* from user where id = 3",
"select* from user where id = 4",
"select* from user where id = 5"
],
"jdbcUrl": ["jdbc:mysql://172.10.10.231:3306/test"]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "root",
"password": "123456",
"column": ["*"],
"connection": [
{
"jdbcUrl": "jdbc:mysql://172.10.10.231:3306/test1",
"table": [
"user1"
]
}
]
}
}
}
]
}
}