Task execution logic in TaskGroupContainer
This may look unrelated to flow control, but understanding how a Task executes makes the flow-control logic much easier to follow.
Architecture of TaskGroupContainer:
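TaskGroupContainer is started by JobContainer#Scheduler on a thread pool, and it then launches Tasks according to the number of Channels. Each Task corresponds to one TaskExecutor, and each TaskExecutor contains one Channel, one ReaderRunner thread and one WriterRunner thread.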
- ReaderRunner pushes data into the Channel through its own internal BufferedRecordExchanger.
- WriterRunner pulls data from the Channel through its own internal BufferedRecordExchanger.
BufferedRecordExchanger also does some buffering of its own. Each BufferedRecordExchanger is private to its ReaderRunner or WriterRunner thread, so only one thread ever operates on a given instance and it needs no synchronization.
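The thread-confinement argument can be illustrated with a small sketch. BufferedExchangerSketch is a hypothetical, much-simplified stand-in for BufferedRecordExchanger, not DataX code: the reader's buffer is a plain ArrayList touched by a single thread, and only the shared queue (standing in for the Channel) is thread-safe. Unlike DataX, which pushes individual Records, this sketch moves whole batches through the queue and uses an empty sentinel list to mark the end of data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch, not DataX code: the local buffer is owned by one thread,
// so it needs no locking; only the shared queue (the "Channel") is thread-safe.
class BufferedExchangerSketch {
    // Sentinel batch marking the end of data (compared by reference).
    static final List<Integer> TERMINATE = new ArrayList<>();

    private final BlockingQueue<List<Integer>> channel;
    private final int batchSize;
    private List<Integer> buffer = new ArrayList<>(); // thread-private, no synchronization

    BufferedExchangerSketch(BlockingQueue<List<Integer>> channel, int batchSize) {
        this.channel = channel;
        this.batchSize = batchSize;
    }

    // Reader side: collect records locally and push a full batch at once.
    void send(int record) throws InterruptedException {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    void flush() throws InterruptedException {
        if (!buffer.isEmpty()) {
            channel.put(buffer);        // blocks while the channel is full
            buffer = new ArrayList<>(); // hand the old list over, start a new one
        }
    }

    void terminate() throws InterruptedException {
        flush();
        channel.put(TERMINATE);
    }

    // Writer side: pull whole batches until the sentinel arrives.
    static long drain(BlockingQueue<List<Integer>> channel) throws InterruptedException {
        long received = 0;
        for (List<Integer> batch = channel.take(); batch != TERMINATE; batch = channel.take()) {
            received += batch.size();
        }
        return received;
    }

    // Runs one reader (the calling thread) and one writer thread; returns records received.
    static long run(int records, int batchSize) {
        BlockingQueue<List<Integer>> channel = new ArrayBlockingQueue<>(4);
        long[] received = new long[1];
        Thread writer = new Thread(() -> {
            try {
                received[0] = drain(channel);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        writer.start();
        try {
            BufferedExchangerSketch reader = new BufferedExchangerSketch(channel, batchSize);
            for (int i = 0; i < records; i++) {
                reader.send(i);
            }
            reader.terminate();
            writer.join(); // join() makes received[0] visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received[0];
    }
}
```

For example, BufferedExchangerSketch.run(1000, 32) moves 31 full batches plus one final batch of 8 records and returns 1000. Batching amortizes the cost of the thread-safe queue over many records, which is the same reason the real exchanger buffers before touching the Channel.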
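If a TaskExecutor fails, the container tries to shut it down: TaskExecutor#shutdown triggers ReaderRunner#shutdown and WriterRunner#shutdown, which ultimately release the resources held by the BufferedRecordExchangers and the Channel. Once a Task has failed, its TaskExecutor is discarded. If the Task is configured with failover (automatic retry on failure), the next run builds a new TaskExecutor for the task; the previous TaskExecutor is not reused.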
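A minimal sketch of that retry behaviour, assuming a hypothetical AttemptExecutor interface and runWithFailover helper (neither is part of DataX): each attempt asks a factory for a brand-new executor, and a failed executor is shut down and then dropped rather than reused.

```java
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch, not DataX API: a failed executor is shut down and discarded;
// each retry obtains a brand-new executor from the factory.
class FailoverSketch {
    interface AttemptExecutor {
        boolean run();   // true if the task finished successfully
        void shutdown(); // release channel and buffer resources
    }

    // Returns the attempt number that succeeded, or -1 if every attempt failed.
    // Every executor that was built is appended to 'created' for inspection.
    static int runWithFailover(Supplier<AttemptExecutor> factory, int maxAttempts,
                               List<AttemptExecutor> created) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            AttemptExecutor executor = factory.get(); // never reuse the previous instance
            created.add(executor);
            if (executor.run()) {
                return attempt;
            }
            executor.shutdown(); // clean up before the next attempt
        }
        return -1;
    }
}
```

Building a fresh executor per attempt means no half-drained buffer or stale Channel state from the failed run can leak into the retry.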
Rate-limiting logic
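As the architecture above shows, data is handed from reader to writer inside the Channel, so rate limiting in DataX works by limiting how fast a single Task writes into its Channel. The default Channel implementation is MemoryChannel, which keeps all records in an ArrayBlockingQueue.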
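DataX bounds a Channel both by the number of records and by the number of bytes it may hold. The following is a minimal sketch of such a bounded, MemoryChannel-style buffer, not DataX's implementation: BoundedChannelSketch, offer, push, pull and byteCount are hypothetical names, records are modelled as plain byte arrays, and the rule that an empty channel always accepts one record is an assumption of this sketch. The ArrayBlockingQueue bounds the record count and a lock-protected counter bounds the bytes held, so a reader that pushes too fast blocks until the writer pulls.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified MemoryChannel-style buffer (not DataX's actual code):
// the ArrayBlockingQueue bounds the number of records, a byte counter bounds memory.
class BoundedChannelSketch {
    private final ArrayBlockingQueue<byte[]> queue;
    private final long byteCapacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition spaceAvailable = lock.newCondition();
    private long bytesInChannel = 0; // guarded by lock

    BoundedChannelSketch(int recordCapacity, long byteCapacity) {
        this.queue = new ArrayBlockingQueue<>(recordCapacity);
        this.byteCapacity = byteCapacity;
    }

    // Non-blocking attempt; fails if either the byte or the record limit would be exceeded.
    // Assumption: an empty channel accepts any single record, so an oversized record cannot deadlock.
    boolean offer(byte[] record) {
        lock.lock();
        try {
            if (bytesInChannel > 0 && bytesInChannel + record.length > byteCapacity) {
                return false;
            }
            if (!queue.offer(record)) {
                return false; // record capacity exhausted
            }
            bytesInChannel += record.length;
            return true;
        } finally {
            lock.unlock();
        }
    }

    // Blocking push for the reader side: waits until the record fits.
    void push(byte[] record) throws InterruptedException {
        lock.lock();
        try {
            while (!offer(record)) { // ReentrantLock lets offer() re-acquire the held lock
                spaceAvailable.await();
            }
        } finally {
            lock.unlock();
        }
    }

    // Blocking pull for the writer side: frees space and wakes waiting pushers.
    byte[] pull() throws InterruptedException {
        byte[] record = queue.take();
        lock.lock();
        try {
            bytesInChannel -= record.length;
            spaceAvailable.signalAll();
        } finally {
            lock.unlock();
        }
        return record;
    }

    long byteCount() {
        lock.lock();
        try {
            return bytesInChannel;
        } finally {
            lock.unlock();
        }
    }
}
```

With a record capacity of 3 and a byte capacity of 25, offer accepts two 10-byte records, rejects a third 10-byte record (30 > 25), accepts a 5-byte record, and then rejects even an empty record because the queue is full. This capacity bound only provides back-pressure; it does not cap throughput, which is what the speed limits described next are for.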
Channel-related metrics:
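DataX configures not only the maximum number of bytes and records a Channel may hold, but also its write speeds: byteSpeed defaults to 1MB/s and recordSpeed defaults to 10000 records/s. The rate-limiting logic itself lives in Channel#statPush, which runs after every push performed by ReaderRunner. currentCommunication is the Communication that the Channel's Task registered with its TaskGroup. lastCommunication is a member field of Channel that holds a snapshot of the counters taken at the previous flow-control check: bytes read successfully, records read successfully, bytes that failed to read and records that failed to read. After each push, if at least flowControlInterval milliseconds have elapsed since that snapshot, the Channel checks the flow: dividing the growth of each counter by the elapsed time gives the current speed. If the current speed exceeds the configured limit, ReaderRunner simply sleeps long enough to bring the average back down to the limit.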
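The sleep time follows from one step of arithmetic. Writing $\Delta N$ for the growth of a counter (bytes or records) since the last check, $\Delta t$ for the elapsed interval in milliseconds and $L$ for the configured per-second limit (symbols introduced here for illustration), the current speed, the time the transfer should have taken at the limit, and the resulting sleep are:

$$
v = \frac{\Delta N \cdot 1000}{\Delta t}, \qquad
T_{\text{target}} = \frac{\Delta N \cdot 1000}{L} = \frac{v\,\Delta t}{L}, \qquad
t_{\text{sleep}} = T_{\text{target}} - \Delta t = \frac{v\,\Delta t}{L} - \Delta t \quad (v > L)
$$

For example, with $L = 1\,048\,576$ B/s (1MB/s), $\Delta t = 1000$ ms and $\Delta N = 3\,145\,728$ B, the speed is $v = 3\,145\,728$ B/s and $t_{\text{sleep}} = 3000 - 1000 = 2000$ ms: sleeping 2 s stretches the 3MB transfer over 3 s, which is 1MB/s on average. When both limits are active, the larger of the two sleep times is used.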
```java
// Channel#statPush
private void statPush(long recordSize, long byteSize) {
    currentCommunication.increaseCounter(CommunicationTool.READ_SUCCEED_RECORDS,
            recordSize);
    currentCommunication.increaseCounter(CommunicationTool.READ_SUCCEED_BYTES,
            byteSize);
    // The wait counters only need to be recorded on the read (push) side: the write (pull)
    // side may be blocked right now, but the reader can already see the blocked counters.
    currentCommunication.setLongCounter(CommunicationTool.WAIT_READER_TIME, waitReaderTime);
    currentCommunication.setLongCounter(CommunicationTool.WAIT_WRITER_TIME, waitWriterTime);

    boolean isChannelByteSpeedLimit = (this.byteSpeed > 0);
    boolean isChannelRecordSpeedLimit = (this.recordSpeed > 0);
    if (!isChannelByteSpeedLimit && !isChannelRecordSpeedLimit) {
        return; // no speed limit configured
    }

    long lastTimestamp = lastCommunication.getTimestamp();
    long nowTimestamp = System.currentTimeMillis();
    long interval = nowTimestamp - lastTimestamp;
    // Check the flow only once at least flowControlInterval ms have passed since the last check
    if (interval - this.flowControlInterval >= 0) {
        long byteLimitSleepTime = 0;
        long recordLimitSleepTime = 0;
        if (isChannelByteSpeedLimit) {
            // Bytes per second over the last interval
            long currentByteSpeed = (CommunicationTool.getTotalReadBytes(currentCommunication) -
                    CommunicationTool.getTotalReadBytes(lastCommunication)) * 1000 / interval;
            if (currentByteSpeed > this.byteSpeed) {
                // Sleep time required by the byte limit
                byteLimitSleepTime = currentByteSpeed * interval / this.byteSpeed
                        - interval;
            }
        }

        if (isChannelRecordSpeedLimit) {
            // Records per second over the last interval
            long currentRecordSpeed = (CommunicationTool.getTotalReadRecords(currentCommunication) -
                    CommunicationTool.getTotalReadRecords(lastCommunication)) * 1000 / interval;
            if (currentRecordSpeed > this.recordSpeed) {
                // Sleep time required by the record limit
                recordLimitSleepTime = currentRecordSpeed * interval / this.recordSpeed
                        - interval;
            }
        }

        // Use the larger of the two sleep times
        long sleepTime = byteLimitSleepTime < recordLimitSleepTime ?
                recordLimitSleepTime : byteLimitSleepTime;
        if (sleepTime > 0) {
            try {
                Thread.sleep(sleepTime);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
            }
        }

        // Take a new snapshot for the next check
        lastCommunication.setLongCounter(CommunicationTool.READ_SUCCEED_BYTES,
                currentCommunication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES));
        lastCommunication.setLongCounter(CommunicationTool.READ_FAILED_BYTES,
                currentCommunication.getLongCounter(CommunicationTool.READ_FAILED_BYTES));
        lastCommunication.setLongCounter(CommunicationTool.READ_SUCCEED_RECORDS,
                currentCommunication.getLongCounter(CommunicationTool.READ_SUCCEED_RECORDS));
        lastCommunication.setLongCounter(CommunicationTool.READ_FAILED_RECORDS,
                currentCommunication.getLongCounter(CommunicationTool.READ_FAILED_RECORDS));
        lastCommunication.setTimestamp(nowTimestamp);
    }
}
```
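To check the throttling arithmetic without a DataX runtime, the same steps can be re-implemented with an injectable clock and sleep function. This is a hedged sketch, not DataX code: SpeedLimiterSketch, sleepFor and the constructor parameters are hypothetical names, the counters mirror currentCommunication and lastCommunication only loosely (failed-read counters are omitted), and the guard against a zero interval is an addition that the original method does not have.

```java
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

// Hypothetical re-implementation of the statPush arithmetic, not DataX code.
// Clock and sleep are injected so the logic can be exercised without real waiting.
class SpeedLimiterSketch {
    private final long byteSpeed;           // bytes/s; <= 0 disables the byte limit
    private final long recordSpeed;         // records/s; <= 0 disables the record limit
    private final long flowControlInterval; // minimum ms between two checks
    private final LongSupplier clock;       // current time in ms
    private final LongConsumer sleeper;     // sleeps for the given number of ms

    private long totalBytes, totalRecords;              // like currentCommunication
    private long lastBytes, lastRecords, lastTimestamp; // like lastCommunication

    SpeedLimiterSketch(long byteSpeed, long recordSpeed, long flowControlInterval,
                       LongSupplier clock, LongConsumer sleeper) {
        this.byteSpeed = byteSpeed;
        this.recordSpeed = recordSpeed;
        this.flowControlInterval = flowControlInterval;
        this.clock = clock;
        this.sleeper = sleeper;
        this.lastTimestamp = clock.getAsLong();
    }

    // Called after each push; returns the sleep time that was applied (0 if none).
    long statPush(long recordSize, long byteSize) {
        totalRecords += recordSize;
        totalBytes += byteSize;
        if (byteSpeed <= 0 && recordSpeed <= 0) {
            return 0;
        }
        long now = clock.getAsLong();
        long interval = now - lastTimestamp;
        if (interval < flowControlInterval) {
            return 0; // too early for another check
        }
        long byteSleep = sleepFor(totalBytes - lastBytes, interval, byteSpeed);
        long recordSleep = sleepFor(totalRecords - lastRecords, interval, recordSpeed);
        long sleep = Math.max(byteSleep, recordSleep); // the stricter limit wins
        if (sleep > 0) {
            sleeper.accept(sleep);
        }
        // Snapshot uses the timestamp taken before sleeping, as statPush does
        lastBytes = totalBytes;
        lastRecords = totalRecords;
        lastTimestamp = now;
        return sleep;
    }

    // Sleep (ms) needed so that 'delta' units over 'intervalMs' average out to the limit.
    static long sleepFor(long delta, long intervalMs, long limitPerSecond) {
        if (limitPerSecond <= 0 || intervalMs <= 0) {
            return 0;
        }
        long currentSpeed = delta * 1000 / intervalMs;
        return currentSpeed > limitPerSecond
                ? currentSpeed * intervalMs / limitPerSecond - intervalMs
                : 0;
    }
}
```

Pushing 3MB within the first 1000 ms against a 1MB/s limit yields a 2000 ms sleep, matching the hand calculation. Because the snapshot timestamp is taken before sleeping, the sleep itself is counted in the next interval, which lowers the next measured speed.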