Flink数据传输源码及原理

Flink数据交换源码及其原理

前言

对于里面内容我们需要一定的netty只是,在涉及网络交互的时候是基于netty的,所以不了解netty可能让我们难以理解其中逻辑

对于一些相关组件我们提前介绍一下,后面会常提到

ResultPartition: 算子的数据会写入ResultPartition,包含多个ResultSubPartition

ResultSubPartition : 每个task消费一个ResultSubPartition,task不会只消费一个ResultSubPartition

InputGate : 对数据输入的封装,Gate(翻译 : 门)就很容易理解,其实InputGate主要还是inputChannel的封装,InputGate中包含多个InputChannel

InputChannel : 实际工作的内容,分为本地和远程,每一个InputChannel接收一个ResultSubPartition输出

Task构建简单流程

我们从构建出来Task之后开始读取数据发送数据的流程,今天聊一聊Task是如何传递数据,这里是讨论数据交互前所需要的组件构建的过程以及交互过程

在chain中直接调用下游算子的processElement方法即可,如果是taskManger和跨网络中,会对数据进行序列化以及反序列写入到buffer(buffer包含一个MemorySegment)中,会通过bufferBuilder来讲数据写入到MemorySegment中,与BufferBuilder想对应的时候BufferConsumer位于下游task,负责读取MemorySegment的数据,一个bufferBuilder对应一个BufferConsumer

// 这是flink调度任务的一个具体执行类,继承runnable,表示一个task由一个线程执行
public  class  Task implements 
                                 Runnable,TaskSlotPayload,TaskActions,
                                     PartitionProducerStateProvider,
                   CheckpointListener,BackPressureSampleableTask{    }

    // Task.run,可以看到直接调用doRun方法,doRun方法也是主要逻辑存在的地方
  public void run() {
        try {
            doRun();
        } finally {
            terminationFuture.complete(executionState);
        }
    }

  // 这里主要保留了 最主要的几行代码,其他的都删掉了,这里通过invokable调用invoke方法来真正的启动任务
    // AbstractInvokable 一些实现类 StreamTask DataSrouceTask等.
    // 我们通过api的方式source生成SourceStreamTask,其他算子根据不同的输入生成不同的StreamTask
private void doRun() {
                    AbstractInvokable invokable = null;
            // 构建AbstractInvokable,实际上就是StreamTask,当然我说的是我这里的栗子,不同的模式可能会创建不同的AbstractInvokable,里面通过具体nameOfInvokableClass全限定类名来反射构建具体的AbstractInvokable对象,比如这里构建的是OneInputStreamTask
            invokable =loadAndInstantiateInvokable(userCodeClassLoader.asClassLoader(), nameOfInvokableClass, env);
            this.invokable = invokable;
            // 调用invoke开始执行
            invokable.invoke();
    }


// 在进入invoke的方法前,我们看看AbstractInvokable是如何构建出来的,以OneInputStreamTask为栗子
// 这里通过反射的方式构建了传入的Environment
  public OneInputStreamTask(Environment env) throws Exception {
            // 调用父类构造
        super(env);
    }
// 父类构造
protected StreamTask(Environment env) throws Exception {
        this(env, null);
    }
// 层层调用在这里注意一点
protected StreamTask(
            Environment environment,
            @Nullable TimerService timerService,
            Thread.UncaughtExceptionHandler uncaughtExceptionHandler,
            StreamTaskActionExecutor actionExecutor)
            throws Exception {
        this(
                environment,
                timerService,
                uncaughtExceptionHandler,
                actionExecutor,
                    // 创建了一个Mailbox对象,后面task的数据接收和发送都依赖于该对象,并将构建当前线程作为参数传入
                    // 表示当前线程才可以读取mail --- 毕竟这是属于你的邮件 
                new TaskMailboxImpl(Thread.currentThread()));
    }

// 最终执行到该构造方法中
// 只标注重点内容
  protected StreamTask(
            Environment environment,
            @Nullable TimerService timerService,
            Thread.UncaughtExceptionHandler uncaughtExceptionHandler,
            StreamTaskActionExecutor actionExecutor,
            TaskMailbox mailbox)
            throws Exception {
                // 调用父类构造即AbstractInvokable
        super(environment);

        this.configuration = new StreamConfig(getTaskConfiguration());
    
            // TODO 重点 后面需要
            // private final RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriter;
            // 构建recordWriter,实际构建的是RecordWriterDelegate,通过getRecordWriter方法获取具体的recordWriter
            // 下面介绍怎么构建出来recordWriter的
        this.recordWriter = createRecordWriterDelegate(configuration, environment);
    
            // 处理基于mailbox之外的一些动作,比如发送事件等动作
        this.actionExecutor = Preconditions.checkNotNull(actionExecutor);
    
            // TODO 重点
            // 构建mailbox处理器,用于处理mailbox收到的mail信息
            // 参数1:MailboxDefaultAction实现类,由lambda表达式方式编写,即调用当前类的processInput方法(即OneInputStreamTask)
            // 参数2:mailbox   --- 处理邮箱,你首先得有一个邮箱对吧
            // 参数3:处理邮箱的线程
        this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor);
        this.mailboxProcessor.initMetric(environment.getMetricGroup());
            // 用于执行mail的executor,类似于java的线程池,但不医院
        this.mainMailboxExecutor = mailboxProcessor.getMainMailboxExecutor();
        this.asyncExceptionHandler = new StreamTaskAsyncExceptionHandler(environment);
            // 异步处理snapshot的线程,嘿嘿嘿
        this.asyncOperationsThreadPool =
                Executors.newCachedThreadPool(
                        new ExecutorThreadFactory("AsyncOperations", uncaughtExceptionHandler));

        this.stateBackend = createStateBackend();
                // task处理checkpoint的conordinator
        this.subtaskCheckpointCoordinator =
                new SubtaskCheckpointCoordinatorImpl(
                        stateBackend.createCheckpointStorage(getEnvironment().getJobID()),
                        getName(),
                        actionExecutor,
                        getCancelables(),
                        getAsyncOperationsThreadPool(),
                        getEnvironment(),
                        this,
                        configuration.isUnalignedCheckpointsEnabled(),
                        this::prepareInputSnapshot);

        if (timerService == null) {
            ThreadFactory timerThreadFactory =
                    new DispatcherThreadFactory(
                            TRIGGER_THREAD_GROUP, "Time Trigger for " + getName());
            this.timerService =
                    new SystemProcessingTimeService(this::handleTimerException, timerThreadFactory);
        } else {
            this.timerService = timerService;
        }
                
            // 用于处理io的线程,比如恢复state
        this.channelIOExecutor =
                Executors.newSingleThreadExecutor(
                        new ExecutorThreadFactory("channel-state-unspilling"));

        injectChannelStateWriterIntoChannels();
    }

// 在上面我们已经了解了streamTask的构造方法,现在我们把recordWirter构建的过程介绍一下
// StreamTask.createRecordWriterDelegate
    public static <OUT>
            RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>>
                    createRecordWriterDelegate(
                            StreamConfig configuration, Environment environment) {
        List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWrites =
                // 构建task对应的writers,通常是1个
                createRecordWriters(configuration, environment);
        // 判断writer的输出数量,一般情况下就是1 也就是构建的都是SingleRecordWriter
        if (recordWrites.size() == 1) {
            // 直接获取对其包装一下即可
            return new SingleRecordWriter<>(recordWrites.get(0));
        } else if (recordWrites.size() == 0) {
            return new NonRecordWriter<>();
        } else {
            // 比如
            // 在下面中,map生成的时候就会生成MultipleRecordWriters
            // DataStreamSource<Object> source =....'
            // SingleOutputStreamOperator<U> map =source.map;
            // source.connect(map);
            return new MultipleRecordWriters<>(recordWrites);
        }
    }

// 构建task对应的recordWriter
// StreamTask.createRecordWriters
private static <OUT>
  List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createRecordWriters(
  StreamConfig configuration, Environment environment) {
  List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWriters =
    new ArrayList<>();
  // 找到operator的所有出边
  List<StreamEdge> outEdgesInOrder =
    configuration.getOutEdgesInOrder(
    environment.getUserCodeClassLoader().asClassLoader());
    
  // 遍历出边集合,为每一个出边构建一个recordWriter
  for (int i = 0; i < outEdgesInOrder.size(); i++) {
    StreamEdge edge = outEdgesInOrder.get(i);
    recordWriters.add(
      // 构建recordWriter
      // 具体不看了,简单介绍一下
      // 获取task对应的partitioner,获取对应的ResultPartitionWriter,然后通过构造者模式构建RecordWriter
      // 在内部会判断是不是广播的,如果是广播的则构建BroadcastRecordWriter,否则是ChannelSelectorRecordWriter(基本都是他)
      // ChannelSelectorRecordWriter的构造过程中最终要的两个参数ResultPartitionWriter, ChannelSelector
      createRecordWriter(
        edge,
        i,
        environment,
        environment.getTaskInfo().getTaskName(),
        edge.getBufferTimeout()));
  }
  return recordWriters;
}


在上面我们已经了解到了task的构建和recordWriter的构建了,现在我们要进入invoke方法,来看看怎么执行的了,当invoke调用的时候,Task会做一些执行前的准备工作,然后真正的开始调用userFunction读取数据发送数据的过程了,那么我们现在开始通过StreamTask来看内部如何实现的

// StreamTask是抽象类
// StreamTask.invoke方法
@Override
public final void invoke() throws Exception {
    try {
        // 最主要的两个方法
      
        // 在里面会初始化 input和output,来明确数据的输入和输出
        // 通过不同的输入输出来确定数据的交互方式,比如线程内,taskManager的task之间或者基于网络传输
        beforeInvoke();
        
        // 运行mail,开始持续的读取数据发送下游,后面看
        runMailboxLoop();
    }
  
    // 不重要代码去掉了
  protected void beforeInvoke() throws Exception {
            // 重点
            // 构建operatorChain,我们需要深入去看,        recordWriter在构造方法是构建的,前面已经讲解
        operatorChain = new OperatorChain<>(this, recordWriter);
            // 即运行在task里的具体的operator
        mainOperator = operatorChain.getMainOperator();
                
            // 做一些task的初始化工作
        actionExecutor.runThrowing(() -> {
                    // 读取state的reader
                    SequentialChannelStateReader reader =
                            getEnvironment().getTaskStateManager()
                                    .getSequentialChannelStateReader();
                    reader.readOutputData(getEnvironment().getAllWriters(), false);
                                        // operator初始化,会调用的算子的open方法
                    operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer());
                                        // 读取state数据
                    channelIOExecutor.execute(() -> {
                                try {
                                    reader.readInputData(getEnvironment().getAllInputGates());
                                } catch (Exception e) {
                                    asyncExceptionHandler.handleAsyncException(
                                            "Unable to read channel state", e);
                                }
                            });
                });
        isRunning = true;
    }
  

  // beforeInvoke的最终要一点就是构建的OperatorChain,我们一起深入看看operatorChain的构造方法
  // 代码很多,由于我们只关注数据交换的内容,其他的地方不做讲解
  public OperatorChain(
    StreamTask<OUT, OP> containingTask,
    RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate) {

    this.operatorEventDispatcher =
      new OperatorEventDispatcherImpl(
      containingTask.getEnvironment().getUserCodeClassLoader().asClassLoader(),
      containingTask.getEnvironment().getOperatorCoordinatorEventGateway());

    final ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
    final StreamConfig configuration = containingTask.getConfiguration();

    StreamOperatorFactory<OUT> operatorFactory =
      configuration.getStreamOperatorFactory(userCodeClassloader);

    // we read the chained configs, and the order of record writer registrations by output name
    Map<Integer, StreamConfig> chainedConfigs =
      configuration.getTransitiveChainedTaskConfigsWithSelf(userCodeClassloader);

    // create the final output stream writers
    // we iterate through all the out edges from this job vertex and create a stream output
    List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(userCodeClassloader);
    Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap =
      new HashMap<>(outEdgesInOrder.size());
    this.streamOutputs = new RecordWriterOutput<?>[outEdgesInOrder.size()];

    // from here on, we need to make sure that the output writers are shut down again on failure
    boolean success = false;
    try {
      createChainOutputs(
        outEdgesInOrder,
        recordWriterDelegate,
        chainedConfigs,
        containingTask,
        streamOutputMap);

      // we create the chain of operators and grab the collector that leads into the chain
      List<StreamOperatorWrapper<?, ?>> allOpWrappers =
        new ArrayList<>(chainedConfigs.size());
      this.mainOperatorOutput =
            // 重点
            // 构建output,即数据的输出
        createOutputCollector(
        containingTask,
        configuration,
        chainedConfigs,
        userCodeClassloader,
        streamOutputMap,
        allOpWrappers,
        containingTask.getMailboxExecutorFactory());

      if (operatorFactory != null) {
        Tuple2<OP, Optional<ProcessingTimeService>> mainOperatorAndTimeService =
          StreamOperatorFactoryUtil.createOperator(
          operatorFactory,
          containingTask,
          configuration,
          mainOperatorOutput,
          operatorEventDispatcher);

        OP mainOperator = mainOperatorAndTimeService.f0;
        mainOperator
          .getMetricGroup()
          .gauge(
          MetricNames.IO_CURRENT_OUTPUT_WATERMARK,
          mainOperatorOutput.getWatermarkGauge());
        this.mainOperatorWrapper =
          createOperatorWrapper(
          mainOperator,
          containingTask,
          configuration,
          mainOperatorAndTimeService.f1,
          true);

        // add main operator to end of chain
        allOpWrappers.add(mainOperatorWrapper);

        this.tailOperatorWrapper = allOpWrappers.get(0);
      } else {
        checkState(allOpWrappers.size() == 0);
        this.mainOperatorWrapper = null;
        this.tailOperatorWrapper = null;
      }

      this.chainedSources =
        createChainedSources(
        containingTask,
        configuration.getInputs(userCodeClassloader),
        chainedConfigs,
        userCodeClassloader,
        allOpWrappers);

      this.numOperators = allOpWrappers.size();

      firstOperatorWrapper = linkOperatorWrappers(allOpWrappers);

      success = true;
    } finally {
      // make sure we clean up after ourselves in case of a failure after acquiring
      // the first resources
      if (!success) {
        for (RecordWriterOutput<?> output : this.streamOutputs) {
          if (output != null) {
            output.close();
          }
        }
      }
    }
  }

 // 在这里面构建output
 // OperatorChain.createOutputCollector
    private <T> WatermarkGaugeExposingOutput<StreamRecord<T>> createOutputCollector(
            StreamTask<?, ?> containingTask,
            StreamConfig operatorConfig,
            Map<Integer, StreamConfig> chainedConfigs,
            ClassLoader userCodeClassloader,
            Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
            List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
            MailboxExecutorFactory mailboxExecutorFactory) {
        List<Tuple2<WatermarkGaugeExposingOutput<StreamRecord<T>>, StreamEdge>> allOutputs =
                new ArrayList<>(4);

        // 当调用了startNewChain或进行chain之后会进入这里,构建非chain的output
        // 这里的需要跨网络传输或者线程之间传输
        for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
            RecordWriterOutput<T> output = (RecordWriterOutput<T>) streamOutputs.get(outputEdge);
            allOutputs.add(new Tuple2<>(output, outputEdge));
        }

        // 默认情况下开启chain,都会进入到该逻辑,生成对应的output
        // 开启chain会进入到这个循环,这个循环里构建的output都是在一个task中,即一个线程中执行
        for (StreamEdge outputEdge : operatorConfig.getChainedOutputs(userCodeClassloader)) {
            int outputId = outputEdge.getTargetId();
            StreamConfig chainedOpConfig = chainedConfigs.get(outputId);
                        /*
            这里简单介绍一下这个里的递归操作
                假设 : chain1 =>  source->map1->map2   | chain2 flatMap->sink
                          我们跟踪chain1的构建过程
                当构建operatorChain的output时候,首先构建source的output,由于会进入chained的循环,开始递归,进入map1的递归,
                此时map1的也会进入chained的循环,继续递归,此时构建map2的output,由于map2后面是非chained(因为是新的chain)一
                起的,所以此时map2进入进入nonChained的循环,构建出需要网络的传输的output,构建完成map2的output,开始弹栈,对之
                前入栈的chained的构建output的方法,开始完成递归的构造output
                
 结果 :    
 调用 :    input.processElement()        input.processElement()      (包装了RecordWriterOuput)序列化发送buffer
chain1 :  source(CopyingChainingOutput)->map1(CopyingChainingOutput)->map2(CountingOuput(RecordWriterOuput)) 
            */
            WatermarkGaugeExposingOutput<StreamRecord<T>> output =
                        // 这个方法会递归的调用createOutputCollector(当前方法)方法
                        // 因为需要对chain在一起的每个operator创建一个对应的output
                        // 具体构建代码我们就不看了,简单介绍一下
                        // 内部条件判断,是否开启了reuse,如果开启了则创建ChainingOutput,否则创建CopyingChainingOutput
                        // CopyingChainingOutput(通过StreamRecord.copy创建一个新的)和ChainingOutput(复用StreamRecord)
                    createOperatorChain(
                            containingTask,
                            chainedOpConfig,
                            chainedConfigs,
                            userCodeClassloader,
                            streamOutputs,
                            allOperatorWrappers,
                            outputEdge.getOutputTag(),
                            mailboxExecutorFactory);
            allOutputs.add(new Tuple2<>(output, outputEdge));
        }
                
      // 下面进行返回,如果上面构建了output,基本都会进入该逻辑,如果广播/sink则会进行下面的逻辑
        if (allOutputs.size() == 1) {
            return allOutputs.get(0).f0;
        } else {
         
            Output<StreamRecord<T>>[] asArray = new Output[allOutputs.size()];
            for (int i = 0; i < allOutputs.size(); i++) {
                asArray[i] = allOutputs.get(i).f0;
            }
                        // 如果是sink asArray一定是0,所以下面的collector不会输出任何结果,嘿嘿嘿
            if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
                return closer.register(new CopyingBroadcastingOutputCollector<>(asArray, this));
            } else {
                return closer.register(new BroadcastingOutputCollector<>(asArray, this));
            }
        }
    }




通过上面我们已经了解到了数据传输所需要的相关组件,现在根据不同的交互方式来观察一下数据的走向

一. 线程内数据传递

在上面我们看到了构建output的过程,那么数据发送到下游就是通过output来完成,在上面了解都开启operatorChain生成output是ChainingOutput,否则创建CopyingChainingOutput,那我们就看看不复用streamRecord的CopyingChainingOutput

// 未开启reuse的output,这里面内容比较简单
final class CopyingChainingOutput<T> extends ChainingOutput<T> {

    private final TypeSerializer<T> serializer;

    public CopyingChainingOutput(
            OneInputStreamOperator<T, ?> operator,
            TypeSerializer<T> serializer, // 用于序列化的
            OutputTag<T> outputTag,
            StreamStatusProvider streamStatusProvider) {
        // 调用父类的构造即ChainingOutput,operator复制给成员变量input
        // 注意 : input对应的就是下游算子
        //    这么理解 :   op1 -> output -> input -> op2 -> output
        super(operator, streamStatusProvider, outputTag);
        this.serializer = serializer;
    }
        
    // collector.collect方法,毕竟是子类,想想我们在算子中,向下游发送数据,比如process中也是通过out.collect方法
    @Override
    public void collect(StreamRecord<T> record) {
        if (this.outputTag != null) {
            // we are not responsible for emitting to the main output.
            return;
        }
                // 具体的发送到下游的方法
        pushToOperator(record);
    }

    @Override
    public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
        if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
            // we are not responsible for emitting to the side-output specified by this
            // OutputTag.
            return;
        }

        pushToOperator(record);
    }

    @Override
    protected <X> void pushToOperator(StreamRecord<X> record) {
        try {
            StreamRecord<T> castRecord = (StreamRecord<T>) record;
            // metric内容
            numRecordsIn.inc();
            // 调用record的copy方法获取一个新的record,里面是new了一个新的record
            // 这里调用了序列化的方法,嘿嘿 后面想起来了就一起看看
            StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue()));
            input.setKeyContextElement(copy);
            // 直接调用下游的processElement方法,所以不需要对数据进行序列化和反序列
            input.processElement(copy);
        } catch (ClassCastException e) {
            if (outputTag != null) {
                // Enrich error message
                ClassCastException replace =
                        new ClassCastException(
                                String.format("%s. Failed to push OutputTag with id '%s' to operator. "+ "This                                      can occur when multiple OutputTags with different types "+ "but identical names are being used.",
                                              e.getMessage(), outputTag.getId()));

                throw new ExceptionInChainedOperatorException(replace);
            } else {
                throw new ExceptionInChainedOperatorException(e);
            }
        } catch (Exception e) {
            throw new ExceptionInChainedOperatorException(e);
        }
    }
}

// map算子的包装类,在同一个chain中,会直接调用算子的processElement方法
// StreamMap.processElement
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
  // 接收到record后直接调用userFunction的map方法,然后将结果发送下游
  output.collect(element.replace(userFunction.map(element.getValue())));
}

可以看到对于在operatorChain中数据传递就是直接调用下游的processElement方法,当位于operatorChain最后一个的operator,他会持有一个RecordWriterOutput,将数据发送到下一个task(chained/operator),对于数据交互,我们分别通过数据的输入和数据的输出来进行讲解

二. TaskManager内Task数据传递

虽然Task都在一个JVM进程内,但是数据在传递的过程中也是需要进行序列化和反序列化

在同一个TaskManger内Task传递数据主要通过一下几个组件完成的,ResultSubPartition,InputGate,InputChannel ,BufferBuilder
流程 :

上游算子持续写入数据,将数据写入到ResultSubPartition中,ResultSubPartition接收到数据会通知InputGate,InputGate将InputChannel加入的可用的Channel队列中,下游算子通过channel.pollNext()读取数据进行消费交给用户的代码执行,对于简述的流程我们在下面会看到对应实现

1.数据输出

接上上面,在chain中,一层一层调用下游的processElement方法,直到chain的最后一个operator的output是RecordWriterOutput,该operator通过该output发送到下游(当然不是真的发送到下游,而是发送到subpartition中)

// 直接进入源码,看看数据到底是怎么发送的

public class RecordWriterOutput<OUT> implements WatermarkGaugeExposingOutput<StreamRecord<OUT>> {

  public RecordWriterOutput(
    RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
    TypeSerializer<OUT> outSerializer,
    OutputTag outputTag,
    StreamStatusProvider streamStatusProvider,
    boolean supportsUnalignedCheckpoints) {
    // 真正干活的
    // flink在stream中数据传递的抽象通过record表示,该writer用于将数据序列化并写入的subPartitin中
    this.recordWriter =(RecordWriter<SerializationDelegate<StreamElement>>) (RecordWriter<?>) recordWriter;
    // 数据序列化的包装器,里面包装了数据类型的序列化器
    TypeSerializer<StreamElement> outRecordSerializer = new StreamElementSerializer<>(outSerializer);
    if (outSerializer != null) {
      serializationDelegate = new SerializationDelegate<StreamElement>(outRecordSerializer);
    }
    // 忽略非重点....
  }

  @Override
  public void collect(StreamRecord<OUT> record) {
    if (this.outputTag != null) {
      return;
    }
    // 真正干活的方法
    pushToRecordWriter(record);
  }

  private <X> void pushToRecordWriter(StreamRecord<X> record) {
    // 将数据设置到序列化包装器的instance中
    serializationDelegate.setInstance(record);
    try {
      // 通过recordWriter将包装器发送 
      // recordWriter的实现类是ChannelSelectorRecordWriter
      recordWriter.emit(serializationDelegate);
    } catch (Exception e) {
      throw new RuntimeException(e.getMessage(), e);
    }
  }
  // 一些事件相关的发送逻辑此处省略............
}



// --------------------------------------------------------------------------------------------

// 简单先了解一下ChannelSelector,主要返回一个int值,该值代表的是数据发送下游的具体channel的索引
public class RebalancePartitioner<T> extends StreamPartitioner<T> {
  @Override
  public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    // 一个轮训的算法
    nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
    return nextChannelToSendTo;
  }
}

// 通过上面的逻辑我们可以看到,collect方法最终调用了recordWriter.emit()方法进行发送数据,那么我们进入具体的实现类看看细节

// 顾名思义,通过名字我们大致可以猜到,对于record的发送,我们需要选择发送到对应的channel中
public final class ChannelSelectorRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {

  @Override
  public void emit(T record) throws IOException {
    // 方法负载,调用父类emit方法
    // 通过selectorChannel方法计算要发送到的具体channel的索引.
    // 这其实就是数据的分发策略了,比如shuffle,keyby等,表示数据的发送策略,发送哪个channel
    emit(record, channelSelector.selectChannel(record));
  }

  // RecordWriter.emit()
  protected void emit(T record, int targetSubpartition) throws IOException {
    // targetPartition = PipelinedResultPartition
    // serializeRecord方法传入序列化器和record,将record使用指定的序列化器进行序列化会返回一个java的ByteBuffer
    // 通过emitRecord方法发送数据
    targetPartition.emitRecord(serializeRecord(serializer, record), targetSubpartition);

    // 数据传输中,肯定是一次发送批量性能会更高,所以在flink中也会以这种形式发送数据
    // 在父类中RecordWriter有个内部类(OutputFlusher),继承Thread并重写run方法,在父类构造方法中构建此对象,
    // 如果传入的timeout>0则创建OutputFlusher对象否则不创建直接走下面判断逻辑,在run方法中会循环调用
    // Thread.sleep(timeout)进行睡眠,当睡眠结束后调用flushAll()
    // flushAlways=(timeout==0)的时候,每接收一条数据发送一条数据
    // timeout时间配置 DataStream.setBufferTimeout();
    if (flushAlways) {

      targetPartition.flush(targetSubpartition);
    }
  }
}


// 进入 BufferWritingResultPartition.emitRecord()
// 这个用于将数据放入buffer中在resultPartitin中,有一个localBufferPool,缓存这可用的buffer,
// 当buffer不可用的时候请求全局的networkBufferPool来申请可用的buf
@Override
public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
  // 通过appendUnicastDataForNewRecord方法申请buffer,具体代码不看了,有兴趣自己研究一下
  // 简单介绍一下
  // 内部通过LocalBuuferPool来申请buffer,在申请buffer时候如果local的用完,则申请network的buffer
  // 如果network的buffer申请超过限制或者network没有可用buffer则会阻塞等待,直到有可用buffer,联想一下`反压`
  // 当申请到buffer之后会构建一个BufferBuilder,然后调用对应的targetSubpartition的add方法传入一个
  // BufferConsumer(通过buffer内部方法构建的),在add方法中会调用notifyDataAvailable()方法,最终会调用
  // 到channel的notifyDataAvailable()方法开始触发消费
  // 上面走完后,将record写入的buffer中
  // 该方法我们在数据输入的时候进行讲解
  BufferBuilder buffer = appendUnicastDataForNewRecord(record, targetSubpartition);

  // 对于上面的数据写入到buffer后,可能会出现下面三种情况,每种情况对应不同的处理方式
  
  // buffer满了,但是record之存入了一部分
  while (record.hasRemaining()) {
    // full buffer, partial record
    // 将buffer进行finish,表示buffer不可在写入
    finishUnicastBufferBuilder(targetSubpartition);
    // 重亲申请一个buffer,写入剩余的数据,直到退出循环
    buffer = appendUnicastDataForRecordContinuation(record, targetSubpartition);
  }
    
  // buffer满了,record也全部存入了
  if (buffer.isFull()) {
    // full buffer, full record
    // 将buffer进行finish,表示buffer不可在写入
    finishUnicastBufferBuilder(targetSubpartition);
  }
    
  // buffer没满,但record全部存入了
  // 什么也不做,只要record是完整的存入就可以了
  // partial buffer, full record
}

// 到此record已经写入到buffer,等待下游去消费数据了

2.数据输入

// 对于上面,我们已经了解到数据是如何写入到buffer(`在SubPartition中`)中的了,

// 那么我们接着上面是如何通知下游去触发消费的

// 该方法在emitRecord()调用
// BufferWritingResultPartition.appendUnicastDataForNewRecord()
private BufferBuilder appendUnicastDataForNewRecord(
  final ByteBuffer record, final int targetSubpartition) throws IOException {
  // 获取bufferBuilder,当数据写完之后没有写满buffer
  BufferBuilder buffer = unicastBufferBuilders[targetSubpartition];
  // 如果等于null则说明已经写满发送了,或者刚初始化的时候
  if (buffer == null) {
    buffer = requestNewUnicastBufferBuilder(targetSubpartition);
    // 通知数据可用的方法在add中被调用
    // 添加buffer消费者,                                                     构建一个buffer消费者,  0 表示要跳过的数据长度
    subpartitions[targetSubpartition].add(buffer.createBufferConsumerFromBeginning(), 0);
  }
  // 追加的形式写入数据
  buffer.appendAndCommit(record);
  return buffer;
}

// PipelinedSubpartition.add()
@Override
public boolean add(BufferConsumer bufferConsumer, int partialRecordLength) {
  return add(bufferConsumer, partialRecordLength, false);
}
// 继续走
private boolean add(BufferConsumer bufferConsumer, int partialRecordLength, boolean finish) {
  checkNotNull(bufferConsumer);
  synchronized (buffers) {
    // 构建一个BufferConsumerWithPartialRecordLength添加到buffers中,在后面会用到
    if (addBuffer(bufferConsumer, partialRecordLength)) {
                prioritySequenceNumber = sequenceNumber;
            }
  }
  //    其他内容省略,不是终点,通过notifyDataAvailable()方法,来通知数据可用,可以被消费了
  if (notifyDataAvailable) {
    notifyDataAvailable();
  }
  return true;
}
// 下面就是一长串的调用
private void notifyDataAvailable() {
  final PipelinedSubpartitionView readView = this.readView;
  if (readView != null) {
    readView.notifyDataAvailable();
  }
}

// PipelinedSubpartitionView
@Override
public void notifyDataAvailable() {
  availabilityListener.notifyDataAvailable();
}
// LocalInputChannel
@Override
public void notifyDataAvailable() {
  notifyChannelNonEmpty();
}
// InputChanne
protected void notifyChannelNonEmpty() {
  // 传入了this,表示该channel有数据了,可以被消费
  inputGate.notifyChannelNonEmpty(this);
}
// SingleInputChanne
void notifyChannelNonEmpty(InputChannel channel) {
  queueChannel(checkNotNull(channel), null);
}
// 最终会调用SingleInputGate.queueChannel
// 代码走到这里在后面都是有OutputFlusher线程去执行到的(不是绝对的),
// 如果不记得了可以看看上面有RecordWriter内部的flush线程做的
// 当代码走完了,说明有数据的channel已经被放入队列中,执行当前task的线程会开始
// 读取数据通过pollnext方法
private void queueChannel(InputChannel channel, @Nullable Integer prioritySequenceNumber) {
  try (GateNotificationHelper notification =
       new GateNotificationHelper(this, inputChannelsWithData)) {
    synchronized (inputChannelsWithData) {
      boolean priority = prioritySequenceNumber != null;
      if (priority
          && isOutdated(
            prioritySequenceNumber,
            lastPrioritySequenceNumber[channel.getChannelIndex()])) {
        // priority event at the given offset already polled (notification is not atomic
        // in respect to
        // buffer enqueuing), so just ignore the notification
        return;
      }

      // 尝试将channel放入inputChannelsWithData队列
      // 如果放入队列,
      if (!queueChannelUnsafe(channel, priority)) {
        return;
      }

      if (priority && inputChannelsWithData.getNumPriorityElements() == 1) {
        notification.notifyPriority();
      }
      // 如果队列有channel可用,调用该方法,方法内部会调用inputChannelsWithData.notifyAll唤醒wait的线程
      // inputChannelsWithData 里面存的是可用的channel,即该channel有数据
      if (inputChannelsWithData.size() == 1) {
        notification.notifyDataAvailable();
      }
    }
  }
}

// 上面有数据的channel已经被加入到队列了,对于当前task需要从队列拉取到channel来消费数据即可
// 在上面构建task的时候我们构建了邮箱(mailbox)以及相关的处理程序
// 那么task是怎么运行起来的呢,我们聊一下
// 在invokeable调用invoke()后,我们在里面先调用了beforeInvoke构建operatorChain调用open等初始化的逻辑
// 然后调用runMailboxLoop()方法真的开始执行task
// 我们直接进入该方法看看
public void runMailboxLoop() throws Exception {
  // 循环mailbox处理器
  mailboxProcessor.runMailboxLoop();
}

//MailboxProcessor.runMailboxLoop()
public void runMailboxLoop() throws Exception {
  final TaskMailbox localMailbox = mailbox;
  Preconditions.checkState(localMailbox.isMailboxThread(),"Method must be executed by declared mailbox thread!");
  assert localMailbox.getState() == TaskMailbox.State.OPEN : "Mailbox must be opened!";
  final MailboxController defaultActionContext = new MailboxController(this);
    
  // 基于邮箱的线程模型消费数据,这里不展开
  while (isMailboxLoopRunning()) {
    // default action 可用之前会阻塞
    processMail(localMailbox, false);
    if (isMailboxLoopRunning()) {
      // 执行默认的action,里面执行的任务,在构建task的时候,构建的通过lambda表达式传入了 this::processInput
      
      // 执行StreamTask.processInput()方法
      mailboxDefaultAction.runDefaultAction(defaultActionContext); 
    }
  }
}

// StreamTask.processInput()
    protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
      // 调用输入处理器的processInput方法进行数据的读取操作,inputPorcessor在初始化的时候调用构造的
      // 这里进入方法里
        InputStatus status = inputProcessor.processInput();
        if (status == InputStatus.MORE_AVAILABLE && recordWriter.isAvailable()) {
            return;
        }
        if (status == InputStatus.END_OF_INPUT) {
            controller.allActionsCompleted();
            return;
        }
        CompletableFuture<?> jointFuture = getInputOutputJointFuture(status);
        MailboxDefaultAction.Suspension suspendedDefaultAction = controller.suspendDefaultAction();
        assertNoException(jointFuture.thenRun(suspendedDefaultAction::resume));
    }

//StreamOneInputProcessor.processInput()
   public InputStatus processInput() throws Exception {
     // 这里又是 input和output,可能不太好理解, 可以想一想flink的物理执行图的样子
  
     // input = inputGate  这里表示数据的输入,上游算子将数据发送到subPartition,inputGate的inputChannel消费其数据
     //          数据是序列化后的buffer
     // output = 用于将数据拿到反序列化后的record,发送给operator,output持有operator的引用
         // 数据流向      
     // op1(map) --> subPartition --> inputGate --> output --> op2(flatMap) --> subPartition
     
     // 将input的数据通过output传递
        InputStatus status = input.emitNext(output);

        if (status == InputStatus.END_OF_INPUT) {
            endOfInputAware.endInput(input.getInputIndex() + 1);
        }

        return status;
    }

// StreamTaskNetworkInput.emitNext()
  public InputStatus emitNext(DataOutput<T> output) throws Exception {
            
    // 可以看到这里是一个循环,正常的数据读取会循环两次
    // 第一次进入processBuffer方法,currentRecordDeserializer对其赋值
    // 第二次将reocrd反序列化进行发送下游
        while (true) {
            // get the stream element from the deserializer
            // 
            if (currentRecordDeserializer != null) {
                DeserializationResult result;
                try {
                    // 将数据 序列化到deserializationDelegate
                    // deserializationDelegate里面会通过对buf的数据反序列化,复制到instance变量
                    result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
                } catch (IOException e) {
                    throw new IOException(
                            String.format("Can't get next record for channel %s", lastChannel), e);
                }
                if (result.isBufferConsumed()) {
                    currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                    currentRecordDeserializer = null;
                }
                                // 如果结果是一个完成的record,那么直接发送下游不需要在等新的buffer了
                if (result.isFullRecord()) {
                    // 想下游发送数据
                    // 实际就是 output.emitRecord(reocrd) -> operator.processElement  假设 : op = streamMap
                    // 到这里就完成了数据的发送
                    processElement(deserializationDelegate.getInstance(), output);
                    return InputStatus.MORE_AVAILABLE;
                }
            }
                        
          // 从inputGate中获取buffer,
            Optional<BufferOrEvent> bufferOrEvent = checkpointedInputGate.pollNext();
            if (bufferOrEvent.isPresent()) {
                // return to the mailbox after receiving a checkpoint barrier to avoid processing of
                // data after the barrier before checkpoint is performed for unaligned checkpoint
                // mode
                if (bufferOrEvent.get().isBuffer()) {
                    // 处理record的buf
                    // 并获取对应的序列化器赋值给currentRecordDeserializer
                    processBuffer(bufferOrEvent.get());
                } else {
                  // 处理事件的buf
                    processEvent(bufferOrEvent.get());
                    return InputStatus.MORE_AVAILABLE;
                }
            } else {
                if (checkpointedInputGate.isFinished()) {
                    checkState(
                            checkpointedInputGate.getAvailableFuture().isDone(),
                            "Finished BarrierHandler should be available");
                    return InputStatus.END_OF_INPUT;
                }
                return InputStatus.NOTHING_AVAILABLE;
            }
        }
    }

//CheckpointedInputGate.pollNext()
//CheckpointedInputGate[InputGateWithMetrics[SingleInputGate]],想想装饰者模式,每次封装都是对其功能的增强
// 我们需要看buffer是怎么拿到的
   public Optional<BufferOrEvent> pollNext() throws IOException, InterruptedException {
     // 我们直接进入SingleInputGate.pollNext方法
        Optional<BufferOrEvent> next = inputGate.pollNext();
                // 省略无用代码
        return next;
    }

//SingleInputGate.pollNext()
    public Optional<BufferOrEvent> pollNext() throws IOException, InterruptedException {
      // false表示是否要阻塞
        return getNextBufferOrEvent(false);
    }


   private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking)
            throws IOException, InterruptedException {
     // 省略无用代码

        Optional<InputWithData<InputChannel, BufferAndAvailability>> next =
          // 主要获取buf数据的方法
                waitAndGetNextData(blocking);

        InputWithData<InputChannel, BufferAndAvailability> inputWithData = next.get();
        return Optional.of( transformToBufferOrEvent( // 构建BufferOrEvent对象
                        inputWithData.data.buffer(),
                        inputWithData.moreAvailable,
                        inputWithData.input,
                        inputWithData.morePriorityEvents));
    }

// 如果在传入blocking==true,如果没有可用channel的情况下则进入wait
// 这里buffer获取到了,然后就是一层层的封装过程,返回到最后就是一个BufferOrEvent

// 至此 buffer获取到,开始数据的解析,反序列化的过程
private Optional<InputWithData<InputChannel, BufferAndAvailability>> waitAndGetNextData(
            boolean blocking) throws IOException, InterruptedException {
        while (true) {
            synchronized (inputChannelsWithData) {
              // 从inputChannelsWithData队列获取可用channel,该队列前面提到过,如果blocking=true
              // 在没有可用的channel的时候会进入wait状态让出lock
                Optional<InputChannel> inputChannelOpt = getChannel(blocking);
                if (!inputChannelOpt.isPresent()) {
                    return Optional.empty();
                }

                final InputChannel inputChannel = inputChannelOpt.get();
              // inputChannel = LocalInputChannel
              // LocalInputChannel持有subpartitionView引用.可以直接从subpartitionView的buffers获取buf
              // 在上面addBuffer中添加的
              // 在getNextBuffer中重新包装buffer,包装为BufferAndAvailability对象
                Optional<BufferAndAvailability> bufferAndAvailabilityOpt =
                        inputChannel.getNextBuffer();

                if (!bufferAndAvailabilityOpt.isPresent()) {
                    checkUnavailability();
                    continue;
                }

                final BufferAndAvailability bufferAndAvailability = bufferAndAvailabilityOpt.get();
                if (bufferAndAvailability.moreAvailable()) {
                    // enqueue the inputChannel at the end to avoid starvation
                    queueChannelUnsafe(inputChannel, bufferAndAvailability.morePriorityEvents());
                }

                final boolean morePriorityEvents =
                        inputChannelsWithData.getNumPriorityElements() > 0;
                if (bufferAndAvailability.hasPriority()) {
                    lastPrioritySequenceNumber[inputChannel.getChannelIndex()] =
                            bufferAndAvailability.getSequenceNumber();
                    if (!morePriorityEvents) {
                        priorityAvailabilityHelper.resetUnavailable();
                    }
                }

                checkUnavailability();
                                
                return Optional.of(
                        new InputWithData<>(
                                inputChannel,
                                bufferAndAvailability,
                                !inputChannelsWithData.isEmpty(),
                                morePriorityEvents));
            }
        }
    }

在上面StreamTaskNetworkInput.emitNext()方法中最后通过processElement方法,将数据传入当前的operator,然后开始调用算子处理数据

三. TaskManager和TaskManager传递数据

TaskManagerRunner.runTaskManager()中会构建TaskManagerRunner对象,该对象的构建过程很复杂,一层一层封装,最终调用start方法启动tm,在构造taskManager的时候,会构建ShuffleEnvironment<NettyShuffleEnvironment>,在ShuffleEnvironment持有一个ConnectionManager,该ConnectionManager会持有netty的client和server,当需要网络请求的时候会通过ConnectionManager去进行网络连接做请求

// -----------------   由于代码比较逻辑比较多,这里直接截取重点地方  ----------------------------
// 说一下创建流程
TaskManagerRunner.runTaskManager() // createTaskExecutorService 构造方法调用createTaskExecutorService()
TaskManagerRunner. createTaskExecutorService()  // 构建taskExecutor
TaskManagerRunner.startTaskManager() // 启动TaskManagerService
TaskManagerServices.fromConfiguration() // 构建tm的内部组件
TaskManagerServices.createShuffleEnvironment //构建ShuffleEnvironment并调用start方法 -- 内部构建connectionManager
NettyShuffleEnvironment.start() // 最终会调用connectionManager.start方法,初始化netty的clinet和server


  // 我们看看NettyShuffleEnvironment的构建已经connectionManager的构建过程中

  static NettyShuffleEnvironment createNettyShuffleEnvironment(
  NettyShuffleEnvironmentConfiguration config,
  ResourceID taskExecutorResourceId,
  TaskEventPublisher taskEventPublisher,
  ResultPartitionManager resultPartitionManager,
  MetricGroup metricGroup,
  Executor ioExecutor) {
  checkNotNull(config);
  checkNotNull(taskExecutorResourceId);
  checkNotNull(taskEventPublisher);
  checkNotNull(resultPartitionManager);
  checkNotNull(metricGroup);
    
  NettyConfig nettyConfig = config.nettyConfig();

  FileChannelManager fileChannelManager =
    new FileChannelManagerImpl(config.getTempDirs(), DIR_NAME_PREFIX);
    
  // 如果本地测试构建的是LocalConnectionManger,当然如果一个tm就已经把任务启动起来了,
  // 那么也是LocalConnectionManger,否则都是基于netty的
  
  // 这里我们稍后看内容
  ConnectionManager connectionManager =
    nettyConfig != null? new NettyConnectionManager(resultPartitionManager, taskEventPublisher, nettyConfig)
                                    : new LocalConnectionManager();
    
  // 全局的buffer,是基于netty的
  // 回想一下,每个task有自己的localBufferPool,当buffer不够了,就回去请求netty申请buffer
  // 是的,没错,请求的就是这个pool
  NetworkBufferPool networkBufferPool =
    new NetworkBufferPool(
    config.numNetworkBuffers(),
    config.networkBufferSize(),
    config.getRequestSegmentsTimeout());

  registerShuffleMetrics(metricGroup, networkBufferPool);

  ResultPartitionFactory resultPartitionFactory =
    new ResultPartitionFactory(
    resultPartitionManager,
    fileChannelManager,
    networkBufferPool,
    config.getBlockingSubpartitionType(),
    config.networkBuffersPerChannel(),
    config.floatingNetworkBuffersPerGate(),
    config.networkBufferSize(),
    config.isBlockingShuffleCompressionEnabled(),
    config.getCompressionCodec(),
    config.getMaxBuffersPerChannel(),
    config.sortShuffleMinBuffers(),
    config.sortShuffleMinParallelism(),
    config.isSSLEnabled());
    
  // 这里可以看到是一个SingleInputGate的工厂,为什么这么做呢,因为task在初始化的时候,需要针对该task每个要消费的分区构建
  // 对应的inputGate,通过工厂可以简化构建过程
  SingleInputGateFactory singleInputGateFactory =
    new SingleInputGateFactory(
    taskExecutorResourceId,
    config,
    connectionManager,
    resultPartitionManager,
    taskEventPublisher,
    networkBufferPool);
    
  return new NettyShuffleEnvironment(taskExecutorResourceId,config,networkBufferPool,connectionManager,
    resultPartitionManager, fileChannelManager,resultPartitionFactory,singleInputGateFactory,ioExecutor);
}

上面我们看到了如果构建出NettyShuffleEnvironment,在NettyConnectionManager中管理netty的client和server,作为TaskManager的tcp连接,所有Task共用

// 管理netty的client和server

public class NettyConnectionManager implements ConnectionManager {
  public NettyConnectionManager(
    ResultPartitionProvider partitionProvider,
    TaskEventPublisher taskEventPublisher,
    NettyConfig nettyConfig) {
        // server : 用于其他client连接自己的    clinet : 用于连接其他server
        // server和client就是对netty代码的一层简单封装,内部就是构建bootstrap然后配置一些option和handler等
    this.server = new NettyServer(nettyConfig);
    this.client = new NettyClient(nettyConfig);
    this.bufferPool = new NettyBufferPool(nettyConfig.getNumberOfArenas());

    this.partitionRequestClientFactory =
      new PartitionRequestClientFactory(client, nettyConfig.getNetworkRetries());
        // 内部提供了两个方法,分别是提供server和client的handlers方法
    this.nettyProtocol =
      new NettyProtocol(
      checkNotNull(partitionProvider), checkNotNull(taskEventPublisher));
  }

  @Override
  public int start() throws IOException {
    // 初始化clinet 和 server
    client.init(nettyProtocol, bufferPool);
    return server.init(nettyProtocol, bufferPool);
  }
    
  // 请求partition,并为其创建一个client
  // 里面主要包含的两个内容
  // CreditBasedPartitionRequestClientHandler channelHandler  
  // Channel tcpChannel  task用于发送tcp请求
  @Override
  public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId)
    throws IOException, InterruptedException {
    return partitionRequestClientFactory.createPartitionRequestClient(connectionId);
  }
    
  // 省略其他代码 ......
}

上面我们看到TaskManager已经构建出了network相关的所有内容了,那么数据交换的流程是什么样子的呢

其实和TaskManager内的task之间的交互流程很相似,对于上面TaskManager内数据传输的基础上加了四个组件分别是PartitonRequestQueue,PartitionRequestServerHandler(server端的handler),PartitionRequestClient,CreditBasedClientHandler,四个组件

流程 :

上游算子写入数据到subPartiton中,有PartitonRequestQueue读取buffer,将buffer写入netty的channel,在netty出站之前会被netty的handler进行处理,即PartitionRequestServerHandler,在PartitionRequestClient读取到server发来的数据,通过CreditBasedPartitionRequestClientHandler(client端handler)的处理写入InputChannel

注意 : Flink数据交互是基于生产者消费者模型的

// 对于通过网络传输数据来说,数据写入到buffer与tm内传输是一致的都是调用addBuffer方法将生成的buffer加入的buffers队列中,其中不同的区别在于消费数据的对象有一些变更,在后面我们会看到

// 由于其中涉及网络通信可能看起来比较乱,如果我们把思路捋清楚之后其实就会变得很简单
// 网络传输是基于netty的所以下面内容需要一定的netty知识,如果netty不太了解,建议先了解一下netty


// 继承netty的入栈处理器,具体调用时机可以参考netty
class PartitionRequestServerHandler extends SimpleChannelInboundHandler<NettyMessage> {
  // 其他方法忽略掉

  // 当有数据进入的时候入栈处理器的该方法被调用
  // 请求时机,下游的PartitionRequestClient发起请求,当前task接收到对应的请求后,会调用该方法进行响应的处理
  // 要注意在进入该handler之前会先进入decodeHandler对数据进行解码,解码出来的对象就是NettyMessage对象
  protected void channelRead0(ChannelHandlerContext ctx, NettyMessage msg) throws Exception {
    try {
      Class<?> msgClazz = msg.getClass();
      // 根据不同的请求做出不同的处理,我们只关注ResumeConsumption这个事件即可
      if (msgClazz == PartitionRequest.class) {
        PartitionRequest request = (PartitionRequest) msg;
        try {
          NetworkSequenceViewReader reader;
          reader =
            new CreditBasedSequenceNumberingViewReader(
            request.receiverId, request.credit, outboundQueue);

          reader.requestSubpartitionView(
            partitionProvider, request.partitionId, request.queueIndex);

          outboundQueue.notifyReaderCreated(reader);
        } catch (PartitionNotFoundException notFound) {
          respondWithError(ctx, notFound, request.receiverId);
        }
      }else if (msgClazz == TaskEventRequest.class) {
        TaskEventRequest request = (TaskEventRequest) msg;

        if (!taskEventPublisher.publish(request.partitionId, request.event)) {
          respondWithError(
            ctx,
            new IllegalArgumentException("Task event receiver not found."),
            request.receiverId);
        }
      } else if (msgClazz == CancelPartitionRequest.class) {
        CancelPartitionRequest request = (CancelPartitionRequest) msg;
        outboundQueue.cancel(request.receiverId);
      } else if (msgClazz == CloseRequest.class) {
        outboundQueue.close();
      } 
      // 收到client发来的credit,flink的流控机制是基于credit
      // 收到credit表示这我一次性要发送多少数据,如果对方credit过低,则只会
      // 发送对应credit的数据,不然会导致整的tcp连接反压
      // credit 表示 下游消费者可用的buffer数量
      else if (msgClazz == AddCredit.class) {
        AddCredit request = (AddCredit) msg;
        // 添加credit或者resume之后,消费者可以开始消费数据,即有请求我们基于响应
        // resume是在checkpoint barrier对齐阶段的,当前barrier之后的数据不可以被消费
        outboundQueue.addCreditOrResumeConsumption(

          request.receiverId,
          // 将当前的credit与历史的credit累加 lambda表达式
          reader -> reader.addCredit(request.credit));
      } else if (msgClazz == ResumeConsumption.class) {
        ResumeConsumption request = (ResumeConsumption) msg;

        outboundQueue.addCreditOrResumeConsumption(
          request.receiverId, NetworkSequenceViewReader::resumeConsumption);
      } else {
        LOG.warn("Received unexpected client request: {}", msg);
      }
    } catch (Throwable t) {
      respondWithError(ctx, t);
    }
  }

}


//PartitonRequestQueue.addCreditOrResumeConsumption()
// 上面是我们netty收到credit请求,那我们需要给消费者响应数据
void addCreditOrResumeConsumption(
  InputChannelID receiverId, Consumer<NetworkSequenceViewReader> operation)
  throws Exception {
  if (fatalError) {
    return;
  }
  // 通过对应的recevierId获取对应的reader读取数据
  // recevierId记录了发起请求或者需要接收的inputChannel编号
  NetworkSequenceViewReader reader = allReaders.get(receiverId);
  if (reader != null) {
    // 传入的表达式,累加credit
    operation.accept(reader);

    // 尝试将reader加入的队列,加入队列后,会被循环的调用处理该reader读取数据
    // 后面一层一层调用  availableReaders.add(reader); 将其加入到队列中
    enqueueAvailableReader(reader);
  } else {
    throw new IllegalStateException(
      "No reader for receiverId = " + receiverId + " exists.");
  }
}


private void enqueueAvailableReader(final NetworkSequenceViewReader reader) throws Exception {
  if (reader.isRegisteredAsAvailable() || !reader.isAvailable()) {
    return;
  }

  boolean triggerWrite = availableReaders.isEmpty();
  registerAvailableReader(reader);
  // 如果为空则开始触发写
  if (triggerWrite) {
    writeAndFlushNextMessageIfPossible(ctx.channel());
  }
}


// 如果仔细看下面的内容其实和tm内数据传输很相似
// 区别就是通过一个是先获取对应reader,在调用subpartitionView.pollNext,
// 而tm内传输就是通过直接调用subpartitionView

// 注意他们的类也不一样,该类是PartitonRequestQueue
private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IOException {
  if (fatalError || !channel.isWritable()) {
    return;
  }
  BufferAndAvailability next = null;
  try {
    while (true) {
      // 从可用reader中获取一个reader,如果存在的话则开始进行处理
      NetworkSequenceViewReader reader = pollAvailableReader();
      if (reader == null) {
        return;
      }
            // reader里是调用了subpartitionView.pollNext,与tm内tsask交互类似
      next = reader.getNextBuffer();
      if (next == null) {
        if (!reader.isReleased()) {
          continue;
        }

        Throwable cause = reader.getFailureCause();
        if (cause != null) {
          ErrorResponse msg =
            new ErrorResponse(
            new ProducerFailedException(cause), reader.getReceiverId());

          ctx.writeAndFlush(msg);
        }
      } else {
        // 如果还有数据则将reader在次加入队列中,等待下次被调用,注意这里是循环
        if (next.moreAvailable()) {
          registerAvailableReader(reader);
        }
                
        // 构建一个response
        // 该response的响应带有buffer,buffer内是record,注意是被序列化的
        BufferResponse msg =
          new BufferResponse(
          next.buffer(),
          next.getSequenceNumber(),
          reader.getReceiverId(),
          next.buffersInBacklog());

       // 将数据发送给clinet端, 当前属于server端
        // 时刻要记住现在是在netty阶段,不要忘记网络传输
        channel.writeAndFlush(msg)
          // 当flush完成会尝试处理下一个buffer
          // 添加一个listener,该listener也会做与上面相同的内容
          .addListener(writeListener);

        return;
      }
    }
  } catch (Throwable t) {
    if (next != null) {
      next.buffer().recycleBuffer();
    }

    throw new IOException(t.getMessage(), t);
  }
}


上面的内容是下游算子发送请求到上游,netty接收到下游的请求,对消息进行decode,最后封装成nettyMessage,在PartitionRequestServerHandler判断事件类型,进行响应的处理,由于我们关注的是数据交互内容,所以我们关注的是AddCredit事件,收到该实际后找到请求的inpuCannel对应的reader,然后通过reader调用pollNext读取数据反馈给client响应

下面我们需要对client的响应内容进行分析

netty的server端即producer,收到client端即consumer发送的请求后,处理完成后会进行响应,数据是通过tcp进行发送的,在netty对数据处理抽象成了一个pipeline,pipeline是由多个handler组成的,当netty接收的数据的时候会触发inboundHander的计算,当发送数据出去的时候会触发outboundHander的计算,所以netty的client端接收到数据后对进入decode进行解码响应,解析的response会向下游的handler发送,下游的handler就是CreditBasedPartitionRequestClientHandler,所以我们的入口就是这个类

注 : 由于涉及网络连接我们对于其关系的调用可能会比较混乱,毕竟跨网络其代码复杂度就会增加,在后面我会通过画图的方式将其原理讲解出来

class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdapter
  implements NetworkClientHandler {

  // netty client接收到数据的时候调用该方法
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    try {
      // 解码消息
      decodeMsg(msg);
    } catch (Throwable t) {
      notifyAllChannelsOfErrorAndClose(t);
    }
  }

  private void decodeMsg(Object msg) throws Throwable {
    final Class<?> msgClazz = msg.getClass();

    // 解码实际上在数据接入的时候已经解码,现在只不过是对msg做了强转
    if (msgClazz == NettyMessage.BufferResponse.class) {
      NettyMessage.BufferResponse bufferOrEvent = (NettyMessage.BufferResponse) msg;

      RemoteInputChannel inputChannel = inputChannels.get(bufferOrEvent.receiverId);
      if (inputChannel == null || inputChannel.isReleased()) {
        bufferOrEvent.releaseBuffer();
        cancelRequestFor(bufferOrEvent.receiverId);
        return;
      }
      try {
        // 就当层层调用吗
        decodeBufferOrEvent(inputChannel, bufferOrEvent);
      } catch (Throwable t) {
        inputChannel.onError(t);
      }
    } else if (msgClazz == NettyMessage.ErrorResponse.class) {
      // 省略 .....
    } else {
      throw new IllegalStateException( "Received unknown message from producer: " + msg.getClass());
    }
  }


  private void decodeBufferOrEvent(
    RemoteInputChannel inputChannel, NettyMessage.BufferResponse bufferOrEvent)
    throws Throwable {
    if (bufferOrEvent.isBuffer() && bufferOrEvent.bufferSize == 0) {
      inputChannel.onEmptyBuffer(bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
    } else if (bufferOrEvent.getBuffer() != null) {
      // 如果有buffer调用inputChannel的onBuffer方法
      // inputChannel此时是RemoteInputChannel
      // 主要就是将buffer加入到队列中
      inputChannel.onBuffer(
        bufferOrEvent.getBuffer(), bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
    } else {
      throw new IllegalStateException(
        "The read buffer is null in credit-based input channel.");
    }
  }

}


// RemoteInputChannel.onBuffer

   public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOException {
        boolean recycleBuffer = true;

        try {
            if (expectedSequenceNumber != sequenceNumber) {
                onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber));
                return;
            }

            final boolean wasEmpty;
            boolean firstPriorityEvent = false;
            synchronized (receivedBuffers) {
                NetworkActionsLogger.traceInput(
                        "RemoteInputChannel#onBuffer",
                        buffer,
                        inputGate.getOwningTaskName(),
                        channelInfo,
                        channelStatePersister,
                        sequenceNumber);
           
                if (isReleased.get()) {
                    return;
                }

                wasEmpty = receivedBuffers.isEmpty();

                SequenceBuffer sequenceBuffer = new SequenceBuffer(buffer, sequenceNumber);
                DataType dataType = buffer.getDataType();
                if (dataType.hasPriority()) {
                    firstPriorityEvent = addPriorityBuffer(sequenceBuffer);
                    recycleBuffer = false;
                } else {
                    receivedBuffers.add(sequenceBuffer);
                    recycleBuffer = false;
                    channelStatePersister.maybePersist(buffer);
                    if (dataType.requiresAnnouncement()) {
                        firstPriorityEvent = addPriorityBuffer(announce(sequenceBuffer));
                    }
                }
                ++expectedSequenceNumber;
            }

            if (firstPriorityEvent) {
                notifyPriorityEvent(sequenceNumber);
            }
          // 改代码前面遇到过很多次了
            if (wasEmpty) {
              // 通知channel有数据,将channel加入可用channel,和前面的代码一样
              // 都是调用到了inputGate的notifyChannelNonEmpty方法
              // 并唤醒在inputChannelsWithData等待的线程
                notifyChannelNonEmpty();
            }

            if (backlog >= 0) {
                onSenderBacklog(backlog);
            }
        } finally {
            if (recycleBuffer) {
                buffer.recycleBuffer();
            }
        }
    }


上面已经完成了数据进入队列和可用channel进入队列,后面的内容与tm内数据交互基本一直只不过在请求inputChannel的时候从LocalInputChannel变成了RemotInputChannel,其余逻辑都是相同的

到这里我们已经把数据交换的内容已经讲解完成,下面是图解,对于里面的内容在代码中都有体现

数据网络交换流程
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,923评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,154评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,775评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,960评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,976评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,972评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,893评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,709评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,159评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,400评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,552评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,265评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,876评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,528评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,701评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,552评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,451评论 2 352

推荐阅读更多精彩内容