dataX的流量控制

TaskGroupContainer中Task执行逻辑

这个和流量控制貌似没有关系,其实理解Task的执行逻辑对理解流量控制有很大的帮助。
TaskGroupContainer的架构:


image.png

TaskGroupContainer由JobContainer#Scheduler利用线程池启动,然后根据Channel的个数启动Task。一个Task会对应一个TaskExecutor,TaskExecutor包含一个Channel,一个ReaderRunner线程和一WriterRunner线程。

  • ReaderRunner通过自己内部的BufferedRecordExchanger向channel中push数据,
  • WriterRunner通过自己内部的BufferedRecordExchanger向Channel中pull数据。

BufferedRecordExchanger本身也带有一定的缓存功能,BufferedRecordExchanger是ReaderRunner和WriterRunner是线程私有的,同一时刻只有一个线程操作BufferedRecordExchanger,所以BufferedRecordExchanger不需要同步机制。

如果一次TaskExecutor执行失败会尝试shutdown这个TaskExecutor,TaskExecutor#shutdown会触发ReaderRunner#shutdwon和WriterRunner#shutdown最终完成对BufferedRecordExchanger和Channel中资源的清理。当有一个Task执行失败之后会清除其对应的TaskExecutor,如果Task配置了Fialover(失败自动切换)机制下次重新运行的时候,会重新构建一个TaskExecutor来执行当前的任务。。。并没有复用之前的TaskExecutor。

限流逻辑

可以看到做数据转换的地方是在Channel中,dataX中的限流是限制单个Task的对Channel的写入速度Channel的默认实现是MemoryChannel,将所有的数据保存在一个ArrayBlocking中。
Channel中相关数据指标:


image.png

可以看到dataX不仅配置了最大的Channel最大容纳的字节数和record数,还配置了写入速度byteSpeed的默认值是1MB/s, recordSpeed的默认值是10000条/s。具体限流逻辑是在Channel#statPush中,每次ReaderRunner执行push之后都会触发这个逻辑。currentCommunication是当前Channel注册对应的Task注册在TaskGroup的communication,lastCommunication是Channel的内部成员变量,用于保存上次push完之后所有数据的技术统计,包括读成功字节数、读成功record数、读失败字节数和读失败record数。每次push完数据之后,如果距上次时间超过了配置的flowControlInterval就会做流量监控,就是相关数据统计除以时间即可得到速度,如果当前的流量超过了配置的速度,ReaderRunner休眠下即可。

// Channel#statPush
private void statPush(long recordSize, long byteSize) {
        currentCommunication.increaseCounter(CommunicationTool.READ_SUCCEED_RECORDS,
                recordSize);
        currentCommunication.increaseCounter(CommunicationTool.READ_SUCCEED_BYTES,
                byteSize);
        //在读的时候进行统计waitCounter即可,因为写(pull)的时候可能正在阻塞,但读的时候已经能读到这个阻塞的counter数

        currentCommunication.setLongCounter(CommunicationTool.WAIT_READER_TIME, waitReaderTime);
        currentCommunication.setLongCounter(CommunicationTool.WAIT_WRITER_TIME, waitWriterTime);

        boolean isChannelByteSpeedLimit = (this.byteSpeed > 0);
        boolean isChannelRecordSpeedLimit = (this.recordSpeed > 0);
        if (!isChannelByteSpeedLimit && !isChannelRecordSpeedLimit) {
            return;
        }

        long lastTimestamp = lastCommunication.getTimestamp();
        long nowTimestamp = System.currentTimeMillis();
        long interval = nowTimestamp - lastTimestamp;
        if (interval - this.flowControlInterval >= 0) {
            long byteLimitSleepTime = 0;
            long recordLimitSleepTime = 0;
            if (isChannelByteSpeedLimit) {
                long currentByteSpeed = (CommunicationTool.getTotalReadBytes(currentCommunication) -
                        CommunicationTool.getTotalReadBytes(lastCommunication)) * 1000 / interval;
                if (currentByteSpeed > this.byteSpeed) {
                    // 计算根据byteLimit得到的休眠时间
                    byteLimitSleepTime = currentByteSpeed * interval / this.byteSpeed
                            - interval;
                }
            }

            if (isChannelRecordSpeedLimit) {
                long currentRecordSpeed = (CommunicationTool.getTotalReadRecords(currentCommunication) -
                        CommunicationTool.getTotalReadRecords(lastCommunication)) * 1000 / interval;
                if (currentRecordSpeed > this.recordSpeed) {
                    // 计算根据recordLimit得到的休眠时间
                    recordLimitSleepTime = currentRecordSpeed * interval / this.recordSpeed
                            - interval;
                }
            }

            // 休眠时间取较大值
            long sleepTime = byteLimitSleepTime < recordLimitSleepTime ?
                    recordLimitSleepTime : byteLimitSleepTime;
            if (sleepTime > 0) {
                try {
                    Thread.sleep(sleepTime);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }

            lastCommunication.setLongCounter(CommunicationTool.READ_SUCCEED_BYTES,
                    currentCommunication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES));
            lastCommunication.setLongCounter(CommunicationTool.READ_FAILED_BYTES,
                    currentCommunication.getLongCounter(CommunicationTool.READ_FAILED_BYTES));
            lastCommunication.setLongCounter(CommunicationTool.READ_SUCCEED_RECORDS,
                    currentCommunication.getLongCounter(CommunicationTool.READ_SUCCEED_RECORDS));
            lastCommunication.setLongCounter(CommunicationTool.READ_FAILED_RECORDS,
                    currentCommunication.getLongCounter(CommunicationTool.READ_FAILED_RECORDS));
            lastCommunication.setTimestamp(nowTimestamp);
        }
    }
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。