Flink Data Exchange: Source Code and Principles
Preface
Some knowledge of Netty is required here: all network interaction in Flink is built on Netty, so without a basic understanding of Netty the logic can be hard to follow.
Let's first introduce a few components that will come up repeatedly later:
ResultPartition: an operator's output is written into a ResultPartition, which consists of multiple ResultSubPartitions
ResultSubPartition: each ResultSubPartition is consumed by a single downstream task, but one task may consume more than one ResultSubPartition
InputGate: the abstraction over data input. The name "gate" is easy to picture; an InputGate is essentially a wrapper around InputChannels and contains multiple of them
InputChannel: where the actual work happens. It comes in a local and a remote flavor, and each InputChannel receives the output of exactly one ResultSubPartition
A brief walkthrough of Task construction
We start from the point where the Task has been built and walk through how it reads and sends data. This part covers how the components needed for data exchange are constructed and how the exchange itself works.
Within an operator chain, the upstream operator simply calls the downstream operator's processElement method. Across tasks in a TaskManager or across the network, records are serialized/deserialized and written into buffers (a buffer wraps a MemorySegment). A BufferBuilder writes the data into the MemorySegment; its counterpart, the BufferConsumer, reads the data back out on the consuming side. One BufferBuilder corresponds to exactly one BufferConsumer.
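To make the BufferBuilder/BufferConsumer pairing easier to picture, here is a minimal sketch in plain Java (all class names are invented for illustration, this is not Flink's API): a builder appends bytes into a fixed-size segment and commits them, and its single paired consumer only ever sees the committed part.
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical stand-ins for BufferBuilder / BufferConsumer sharing one segment.
class SegmentBuilder {
    final ByteBuffer segment = ByteBuffer.allocate(64); // plays the role of a MemorySegment
    private int committed;                               // bytes visible to the paired consumer

    /** Appends as much of the record as fits, commits it, and returns the number of bytes written. */
    int appendAndCommit(byte[] record, int offset) {
        int len = Math.min(record.length - offset, segment.remaining());
        segment.put(record, offset, len);
        committed = segment.position();
        return len;
    }

    boolean isFull() { return !segment.hasRemaining(); }
    int committedBytes() { return committed; }
}

class SegmentConsumer {
    private final SegmentBuilder builder;
    private int readPos;

    SegmentConsumer(SegmentBuilder builder) { this.builder = builder; }

    /** Returns only the bytes committed since the last poll. */
    byte[] poll() {
        int end = builder.committedBytes();
        byte[] out = Arrays.copyOfRange(builder.segment.array(), readPos, end);
        readPos = end;
        return out;
    }
}
One builder is paired with exactly one consumer, which is the same one-to-one relationship described above.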
// This is the concrete execution class for a task scheduled by Flink; it implements Runnable, i.e. one task is executed by one thread
public class Task implements
Runnable,TaskSlotPayload,TaskActions,
PartitionProducerStateProvider,
CheckpointListener,BackPressureSampleableTask{ }
// Task.run simply delegates to doRun, which is where the main logic lives
public void run() {
try {
doRun();
} finally {
terminationFuture.complete(executionState);
}
}
// Only the most important lines are kept here; everything else was removed. invokable.invoke() is what actually starts the task
// Some implementations of AbstractInvokable: StreamTask, DataSourceTask, etc.
// With the DataStream API the source becomes a SourceStreamTask, and other operators produce different StreamTask subclasses depending on their inputs
private void doRun() {
AbstractInvokable invokable = null;
// Build the AbstractInvokable, which here is in fact a StreamTask (in this example at least; other modes may create a different AbstractInvokable). The concrete object is created via reflection from the fully qualified class name nameOfInvokableClass, e.g. a OneInputStreamTask here
invokable =loadAndInstantiateInvokable(userCodeClassLoader.asClassLoader(), nameOfInvokableClass, env);
this.invokable = invokable;
// Call invoke to start execution
invokable.invoke();
}
// Before stepping into invoke, let's look at how the AbstractInvokable is constructed, using OneInputStreamTask as the example
// This constructor is what gets invoked reflectively with the Environment passed in
public OneInputStreamTask(Environment env) throws Exception {
// Call the parent constructor
super(env);
}
// Parent constructor
protected StreamTask(Environment env) throws Exception {
this(env, null);
}
// The constructors keep delegating; one thing to note here:
protected StreamTask(
Environment environment,
@Nullable TimerService timerService,
Thread.UncaughtExceptionHandler uncaughtExceptionHandler,
StreamTaskActionExecutor actionExecutor)
throws Exception {
this(
environment,
timerService,
uncaughtExceptionHandler,
actionExecutor,
// A TaskMailbox is created here; the task's later data receiving and sending all rely on it. The constructing (current) thread is passed in,
// meaning only that thread is allowed to take mail out of it -- it is your mailbox, after all
new TaskMailboxImpl(Thread.currentThread()));
}
// We finally end up in this constructor
// only the important parts are annotated
protected StreamTask(
Environment environment,
@Nullable TimerService timerService,
Thread.UncaughtExceptionHandler uncaughtExceptionHandler,
StreamTaskActionExecutor actionExecutor,
TaskMailbox mailbox)
throws Exception {
// Call the parent constructor, i.e. AbstractInvokable
super(environment);
this.configuration = new StreamConfig(getTaskConfiguration());
// TODO important, needed later
// private final RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriter;
// Build the recordWriter; what is actually built is a RecordWriterDelegate, and getRecordWriter returns the concrete recordWriter
// how the recordWriter gets built is shown further below
this.recordWriter = createRecordWriterDelegate(configuration, environment);
// Executes a few actions outside the mailbox, e.g. sending events
this.actionExecutor = Preconditions.checkNotNull(actionExecutor);
// TODO important
// Build the mailbox processor, which handles the mail the mailbox receives
// Param 1: a MailboxDefaultAction implementation written as a lambda, i.e. this class's processInput method (here OneInputStreamTask)
// Param 2: the mailbox -- you need a mailbox before you can process mail
// Param 3: the actionExecutor used for actions around the mailbox
this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor);
this.mailboxProcessor.initMetric(environment.getMetricGroup());
// The executor used to run mails, similar to a Java thread pool but not quite the same
this.mainMailboxExecutor = mailboxProcessor.getMainMailboxExecutor();
this.asyncExceptionHandler = new StreamTaskAsyncExceptionHandler(environment);
// Thread pool that handles snapshots asynchronously
this.asyncOperationsThreadPool =
Executors.newCachedThreadPool(
new ExecutorThreadFactory("AsyncOperations", uncaughtExceptionHandler));
this.stateBackend = createStateBackend();
// The coordinator that handles checkpoints on the task side
this.subtaskCheckpointCoordinator =
new SubtaskCheckpointCoordinatorImpl(
stateBackend.createCheckpointStorage(getEnvironment().getJobID()),
getName(),
actionExecutor,
getCancelables(),
getAsyncOperationsThreadPool(),
getEnvironment(),
this,
configuration.isUnalignedCheckpointsEnabled(),
this::prepareInputSnapshot);
if (timerService == null) {
ThreadFactory timerThreadFactory =
new DispatcherThreadFactory(
TRIGGER_THREAD_GROUP, "Time Trigger for " + getName());
this.timerService =
new SystemProcessingTimeService(this::handleTimerException, timerThreadFactory);
} else {
this.timerService = timerService;
}
// Thread used for I/O work, e.g. restoring state
this.channelIOExecutor =
Executors.newSingleThreadExecutor(
new ExecutorThreadFactory("channel-state-unspilling"));
injectChannelStateWriterIntoChannels();
}
// Now that we have seen StreamTask's constructor, let's look at how the recordWriter is built
// StreamTask.createRecordWriterDelegate
public static <OUT>
RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>>
createRecordWriterDelegate(
StreamConfig configuration, Environment environment) {
List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWrites =
// Build the task's writers; usually there is just one
createRecordWriters(configuration, environment);
// Check the number of writers; normally it is 1, so a SingleRecordWriter is built
if (recordWrites.size() == 1) {
// Just take the single writer and wrap it
return new SingleRecordWriter<>(recordWrites.get(0));
} else if (recordWrites.size() == 0) {
return new NonRecordWriter<>();
} else {
// For example,
// in the snippet below, the map will lead to a MultipleRecordWriters being created:
// DataStreamSource<Object> source = ....
// SingleOutputStreamOperator<U> map = source.map(...);
// source.connect(map);
return new MultipleRecordWriters<>(recordWrites);
}
}
// Build the recordWriters for the task
// StreamTask.createRecordWriters
private static <OUT>
List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createRecordWriters(
StreamConfig configuration, Environment environment) {
List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWriters =
new ArrayList<>();
// Find all outgoing edges of the operator
List<StreamEdge> outEdgesInOrder =
configuration.getOutEdgesInOrder(
environment.getUserCodeClassLoader().asClassLoader());
// Iterate over the out edges and build one recordWriter per edge
for (int i = 0; i < outEdgesInOrder.size(); i++) {
StreamEdge edge = outEdgesInOrder.get(i);
recordWriters.add(
// Build the recordWriter
// We won't step through it; briefly:
// get the partitioner for the task and the corresponding ResultPartitionWriter, then build the RecordWriter via its builder
// Internally it checks whether the output is broadcast: if so a BroadcastRecordWriter is built, otherwise a ChannelSelectorRecordWriter (the usual case)
// The two most important constructor arguments of ChannelSelectorRecordWriter are the ResultPartitionWriter and the ChannelSelector
createRecordWriter(
edge,
i,
environment,
environment.getTaskInfo().getTaskName(),
edge.getBufferTimeout()));
}
return recordWriters;
}
We now know how the Task and the recordWriter are built, so let's enter the invoke method and see how execution works. When invoke is called, the Task does some preparation and then really starts calling the userFunction to read and emit data. Let's look at how StreamTask implements this.
// StreamTask is an abstract class
// StreamTask.invoke
@Override
public final void invoke() throws Exception {
try {
// The two most important calls:
// beforeInvoke initializes the input and the output, i.e. where the data comes from and where it goes;
// the kind of input/output determines how data is exchanged: within a thread, between tasks of one TaskManager, or over the network
beforeInvoke();
// Run the mailbox loop, which keeps reading data and sending it downstream; covered later
runMailboxLoop();
}
// unimportant code removed
protected void beforeInvoke() throws Exception {
// Important:
// build the operatorChain, which we need to dig into. The recordWriter was built in the constructor, as explained above
operatorChain = new OperatorChain<>(this, recordWriter);
// The concrete operator that runs inside this task
mainOperator = operatorChain.getMainOperator();
// Some task initialization work
actionExecutor.runThrowing(() -> {
// Reader for reading state
SequentialChannelStateReader reader =
getEnvironment().getTaskStateManager()
.getSequentialChannelStateReader();
reader.readOutputData(getEnvironment().getAllWriters(), false);
// Operator initialization; this calls the operators' open methods
operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer());
// Read state data
channelIOExecutor.execute(() -> {
try {
reader.readInputData(getEnvironment().getAllInputGates());
} catch (Exception e) {
asyncExceptionHandler.handleAsyncException(
"Unable to read channel state", e);
}
});
});
isRunning = true;
}
// The most important part of beforeInvoke is the construction of the OperatorChain, so let's look at its constructor together
// There is a lot of code; since we only care about data exchange, the rest is not covered
public OperatorChain(
StreamTask<OUT, OP> containingTask,
RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate) {
this.operatorEventDispatcher =
new OperatorEventDispatcherImpl(
containingTask.getEnvironment().getUserCodeClassLoader().asClassLoader(),
containingTask.getEnvironment().getOperatorCoordinatorEventGateway());
final ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
final StreamConfig configuration = containingTask.getConfiguration();
StreamOperatorFactory<OUT> operatorFactory =
configuration.getStreamOperatorFactory(userCodeClassloader);
// we read the chained configs, and the order of record writer registrations by output name
Map<Integer, StreamConfig> chainedConfigs =
configuration.getTransitiveChainedTaskConfigsWithSelf(userCodeClassloader);
// create the final output stream writers
// we iterate through all the out edges from this job vertex and create a stream output
List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(userCodeClassloader);
Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap =
new HashMap<>(outEdgesInOrder.size());
this.streamOutputs = new RecordWriterOutput<?>[outEdgesInOrder.size()];
// from here on, we need to make sure that the output writers are shut down again on failure
boolean success = false;
try {
createChainOutputs(
outEdgesInOrder,
recordWriterDelegate,
chainedConfigs,
containingTask,
streamOutputMap);
// we create the chain of operators and grab the collector that leads into the chain
List<StreamOperatorWrapper<?, ?>> allOpWrappers =
new ArrayList<>(chainedConfigs.size());
this.mainOperatorOutput =
// Important:
// build the output, i.e. where the data is emitted
createOutputCollector(
containingTask,
configuration,
chainedConfigs,
userCodeClassloader,
streamOutputMap,
allOpWrappers,
containingTask.getMailboxExecutorFactory());
if (operatorFactory != null) {
Tuple2<OP, Optional<ProcessingTimeService>> mainOperatorAndTimeService =
StreamOperatorFactoryUtil.createOperator(
operatorFactory,
containingTask,
configuration,
mainOperatorOutput,
operatorEventDispatcher);
OP mainOperator = mainOperatorAndTimeService.f0;
mainOperator
.getMetricGroup()
.gauge(
MetricNames.IO_CURRENT_OUTPUT_WATERMARK,
mainOperatorOutput.getWatermarkGauge());
this.mainOperatorWrapper =
createOperatorWrapper(
mainOperator,
containingTask,
configuration,
mainOperatorAndTimeService.f1,
true);
// add main operator to end of chain
allOpWrappers.add(mainOperatorWrapper);
this.tailOperatorWrapper = allOpWrappers.get(0);
} else {
checkState(allOpWrappers.size() == 0);
this.mainOperatorWrapper = null;
this.tailOperatorWrapper = null;
}
this.chainedSources =
createChainedSources(
containingTask,
configuration.getInputs(userCodeClassloader),
chainedConfigs,
userCodeClassloader,
allOpWrappers);
this.numOperators = allOpWrappers.size();
firstOperatorWrapper = linkOperatorWrappers(allOpWrappers);
success = true;
} finally {
// make sure we clean up after ourselves in case of a failure after acquiring
// the first resources
if (!success) {
for (RecordWriterOutput<?> output : this.streamOutputs) {
if (output != null) {
output.close();
}
}
}
}
}
// The output is built in here
// OperatorChain.createOutputCollector
private <T> WatermarkGaugeExposingOutput<StreamRecord<T>> createOutputCollector(
StreamTask<?, ?> containingTask,
StreamConfig operatorConfig,
Map<Integer, StreamConfig> chainedConfigs,
ClassLoader userCodeClassloader,
Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
MailboxExecutorFactory mailboxExecutorFactory) {
List<Tuple2<WatermarkGaugeExposingOutput<StreamRecord<T>>, StreamEdge>> allOutputs =
new ArrayList<>(4);
// We get here for non-chained outputs (e.g. after startNewChain, or wherever the chain is broken);
// these outputs have to cross the network or go to another thread
for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
RecordWriterOutput<T> output = (RecordWriterOutput<T>) streamOutputs.get(outputEdge);
allOutputs.add(new Tuple2<>(output, outputEdge));
}
// With chaining enabled (the default) we enter this loop and generate the corresponding outputs;
// the outputs built in this loop all stay within one task, i.e. they run in a single thread
for (StreamEdge outputEdge : operatorConfig.getChainedOutputs(userCodeClassloader)) {
int outputId = outputEdge.getTargetId();
StreamConfig chainedOpConfig = chainedConfigs.get(outputId);
/*
A quick note on the recursion here.
Assume: chain1 => source->map1->map2 | chain2 => flatMap->sink
We follow the construction of chain1.
When the operatorChain's output is built, the source's output is built first. It enters the chained loop and recurses into map1;
map1 also enters the chained loop and recurses again, now building map2's output. Since whatever follows map2 is not chained
with it (it starts a new chain), map2 takes the nonChained branch and gets an output that goes over the network. Once map2's
output is done, the stack unwinds and the outputs of the chained operators pushed earlier are completed recursively.
Result:
calls: input.processElement() -> input.processElement() -> (wrapped in a RecordWriterOutput) serialize and send buffers
chain1: source(CopyingChainingOutput)->map1(CopyingChainingOutput)->map2(CountingOutput(RecordWriterOutput))
*/
WatermarkGaugeExposingOutput<StreamRecord<T>> output =
// This method recursively calls createOutputCollector (the current method),
// because an output has to be created for every operator chained together.
// We won't go through the construction code; in short:
// it checks whether object reuse is enabled: if so a ChainingOutput is created, otherwise a CopyingChainingOutput
// (CopyingChainingOutput creates a new record via StreamRecord.copy, ChainingOutput reuses the StreamRecord)
createOperatorChain(
containingTask,
chainedOpConfig,
chainedConfigs,
userCodeClassloader,
streamOutputs,
allOperatorWrappers,
outputEdge.getOutputTag(),
mailboxExecutorFactory);
allOutputs.add(new Tuple2<>(output, outputEdge));
}
// The return value. If outputs were built above we usually take the first branch; broadcast/sink go through the else branch
if (allOutputs.size() == 1) {
return allOutputs.get(0).f0;
} else {
Output<StreamRecord<T>>[] asArray = new Output[allOutputs.size()];
for (int i = 0; i < allOutputs.size(); i++) {
asArray[i] = allOutputs.get(i).f0;
}
// For a sink, asArray has length 0, so the collector below will never emit anything
if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
return closer.register(new CopyingBroadcastingOutputCollector<>(asArray, this));
} else {
return closer.register(new BroadcastingOutputCollector<>(asArray, this));
}
}
}
We have now seen all the components needed for data transfer; next, let's follow the data for each kind of exchange.
I. Data transfer within a thread
We saw above how the output is built, and it is through this output that data is sent downstream. With object reuse enabled the output is a ChainingOutput, otherwise a CopyingChainingOutput. Let's look at CopyingChainingOutput, the one that does not reuse the StreamRecord.
// The output used when object reuse is not enabled; the content here is fairly simple
final class CopyingChainingOutput<T> extends ChainingOutput<T> {
private final TypeSerializer<T> serializer;
public CopyingChainingOutput(
OneInputStreamOperator<T, ?> operator,
TypeSerializer<T> serializer, // used for serialization
OutputTag<T> outputTag,
StreamStatusProvider streamStatusProvider) {
// Call the parent constructor (ChainingOutput); the operator is assigned to the field `input`
// Note: `input` is the downstream operator.
// Think of it as: op1 -> output -> input -> op2 -> output
super(operator, streamStatusProvider, outputTag);
this.serializer = serializer;
}
// The collect method -- this is a Collector after all. Recall that inside an operator, e.g. in process, we also send data downstream via out.collect
@Override
public void collect(StreamRecord<T> record) {
if (this.outputTag != null) {
// we are not responsible for emitting to the main output.
return;
}
// The method that actually pushes the record downstream
pushToOperator(record);
}
@Override
public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
// we are not responsible for emitting to the side-output specified by this
// OutputTag.
return;
}
pushToOperator(record);
}
@Override
protected <X> void pushToOperator(StreamRecord<X> record) {
try {
StreamRecord<T> castRecord = (StreamRecord<T>) record;
// metrics
numRecordsIn.inc();
// Call record.copy to get a new record (internally it news up a fresh one);
// this goes through the serializer's copy method
StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue()));
input.setKeyContextElement(copy);
// Directly call the downstream operator's processElement, so no serialization/deserialization is needed
input.processElement(copy);
} catch (ClassCastException e) {
if (outputTag != null) {
// Enrich error message
ClassCastException replace =
new ClassCastException(
String.format("%s. Failed to push OutputTag with id '%s' to operator. "+ "This can occur when multiple OutputTags with different types "+ "but identical names are being used.",
e.getMessage(), outputTag.getId()));
throw new ExceptionInChainedOperatorException(replace);
} else {
throw new ExceptionInChainedOperatorException(e);
}
} catch (Exception e) {
throw new ExceptionInChainedOperatorException(e);
}
}
}
// The wrapper class of the map operator; within the same chain the operator's processElement is called directly
// StreamMap.processElement
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
// On receiving a record, call the userFunction's map method and send the result downstream
output.collect(element.replace(userFunction.map(element.getValue())));
}
So within an operatorChain, passing a record downstream is just a direct call to the downstream operator's processElement. The last operator of an operatorChain holds a RecordWriterOutput, through which the data is sent to the next task. For the cross-task exchange we will look at the output side and the input side separately.
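The in-chain transfer really is nothing more than a nested method call. A rough sketch of the idea (hypothetical classes, not Flink code): each operator's output just holds the next operator and calls it directly.
import java.util.function.Function;

// Hypothetical chained operators: collect() downstream is a plain method call on the same thread.
class ChainedOperator<IN, OUT> {
    private final Function<IN, OUT> userFunction;
    private final ChainedOperator<OUT, ?> downstream; // null for the last operator in the chain

    ChainedOperator(Function<IN, OUT> userFunction, ChainedOperator<OUT, ?> downstream) {
        this.userFunction = userFunction;
        this.downstream = downstream;
    }

    void processElement(IN record) {
        OUT result = userFunction.apply(record);
        if (downstream != null) {
            downstream.processElement(result); // same thread, no serialization
        } else {
            System.out.println("end of chain: " + result); // the real chain would hit a RecordWriterOutput here
        }
    }
}
Calling processElement on the head operator walks the whole chain within a single stack, which is exactly why chained operators are so cheap.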
II. Data transfer between Tasks within a TaskManager
Even though the Tasks live in the same JVM process, the data still needs to be serialized and deserialized on the way.
Within one TaskManager, data is passed between Tasks mainly through the following components: ResultSubPartition, InputGate, InputChannel and BufferBuilder.
Flow:
the upstream operator keeps writing records into a ResultSubPartition; when the ResultSubPartition receives data it notifies the InputGate, which adds the corresponding InputChannel to the queue of available channels; the downstream task then reads the data via channel.pollNext() and hands it to the user code. The implementation of this flow is shown below.
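The notify/poll handshake in this flow is essentially a producer-consumer queue of "channels that currently have data". A compressed model of the pattern (hypothetical; the real code uses the inputChannelsWithData structure shown later, not a BlockingQueue):
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Toy model of "subpartition got data -> enqueue its channel -> task thread polls a channel".
class AvailableChannels {
    private final BlockingQueue<Integer> channelsWithData = new ArrayBlockingQueue<>(16);

    /** Producer side: called when a subpartition receives its first buffer. */
    void notifyChannelNonEmpty(int channelIndex) {
        channelsWithData.offer(channelIndex);
    }

    /** Consumer side: the task thread blocks until some channel has data. */
    int pollNextChannel() throws InterruptedException {
        return channelsWithData.take();
    }
}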
1. Data output
Continuing from above: inside a chain, processElement is called layer by layer until the last operator of the chain, whose output is a RecordWriterOutput. That operator uses this output to send data downstream (not literally to the downstream task, but into a subpartition).
// Straight into the source code, to see how the data is actually sent
public class RecordWriterOutput<OUT> implements WatermarkGaugeExposingOutput<StreamRecord<OUT>> {
public RecordWriterOutput(
RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
TypeSerializer<OUT> outSerializer,
OutputTag outputTag,
StreamStatusProvider streamStatusProvider,
boolean supportsUnalignedCheckpoints) {
// The object doing the real work:
// in the streaming runtime, data transfer is abstracted as records; this writer serializes the record and writes it into a subpartition
this.recordWriter =(RecordWriter<SerializationDelegate<StreamElement>>) (RecordWriter<?>) recordWriter;
// Wrapper used for serialization; it wraps the serializer of the record's data type
TypeSerializer<StreamElement> outRecordSerializer = new StreamElementSerializer<>(outSerializer);
if (outSerializer != null) {
serializationDelegate = new SerializationDelegate<StreamElement>(outRecordSerializer);
}
// non-essential parts omitted ....
}
@Override
public void collect(StreamRecord<OUT> record) {
if (this.outputTag != null) {
return;
}
// The method doing the real work
pushToRecordWriter(record);
}
private <X> void pushToRecordWriter(StreamRecord<X> record) {
// Set the record as the serialization delegate's instance
serializationDelegate.setInstance(record);
try {
// Send the delegate through the recordWriter;
// the concrete recordWriter implementation is ChannelSelectorRecordWriter
recordWriter.emit(serializationDelegate);
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}
// Event-related sending logic is omitted here............
}
// --------------------------------------------------------------------------------------------
// A quick look at ChannelSelector first: it returns an int, the index of the downstream channel the record should go to
public class RebalancePartitioner<T> extends StreamPartitioner<T> {
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
// a round-robin algorithm
nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
return nextChannelToSendTo;
}
}
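As a quick sanity check of the round-robin behaviour, this standalone snippet (not Flink code) reproduces the same modulo logic and cycles through the three channels:
// Standalone illustration of the round-robin channel selection above.
public class RoundRobinDemo {
    public static void main(String[] args) {
        int numberOfChannels = 3;
        int nextChannelToSendTo = 0; // Flink initializes this randomly so not every task starts at channel 0
        for (int record = 0; record < 6; record++) {
            nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
            System.out.println("record " + record + " -> channel " + nextChannelToSendTo);
        }
    }
}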
// From the logic above, collect eventually calls recordWriter.emit() to send the data, so let's step into the concrete implementation
// As the name suggests, to emit a record we have to select which channel it should be sent to
public final class ChannelSelectorRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {
@Override
public void emit(T record) throws IOException {
// Method overload, delegating to the parent's emit;
// selectChannel computes the index of the channel to send to.
// This is the data distribution strategy (shuffle, keyBy, ...): it decides which channel the record is sent to
emit(record, channelSelector.selectChannel(record));
}
// RecordWriter.emit()
protected void emit(T record, int targetSubpartition) throws IOException {
// targetPartition = PipelinedResultPartition
// serializeRecord takes the serializer and the record, serializes the record and returns a java.nio.ByteBuffer;
// emitRecord then sends the data
targetPartition.emitRecord(serializeRecord(serializer, record), targetSubpartition);
// Sending in batches is of course faster than record-by-record, and Flink sends data that way too.
// The parent class RecordWriter has an inner class (OutputFlusher) that extends Thread and overrides run; it is created in the
// parent constructor if the configured timeout > 0 (otherwise no OutputFlusher is created and only the check below applies).
// Its run method loops: Thread.sleep(timeout), and when it wakes up it calls flushAll().
// flushAlways == (timeout == 0): in that case every record is flushed as soon as it is written.
// The timeout is configured via DataStream.setBufferTimeout()
if (flushAlways) {
targetPartition.flush(targetSubpartition);
}
}
}
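A minimal sketch of the periodic-flush pattern described in the comments above (the names are made up; the real OutputFlusher is an inner class of RecordWriter): a daemon thread sleeps for the configured timeout and then flushes whatever has accumulated.
// Hypothetical sketch of the OutputFlusher idea: flush the buffered output every `timeoutMillis`.
class PeriodicFlusher extends Thread {
    private final long timeoutMillis;
    private final Runnable flushAll;          // e.g. () -> targetPartition.flushAll()
    private volatile boolean running = true;

    PeriodicFlusher(long timeoutMillis, Runnable flushAll) {
        this.timeoutMillis = timeoutMillis;
        this.flushAll = flushAll;
        setName("OutputFlusher-sketch");
        setDaemon(true);
    }

    void terminate() {
        running = false;
        interrupt();
    }

    @Override
    public void run() {
        while (running) {
            try {
                Thread.sleep(timeoutMillis);  // wait for the buffer timeout
            } catch (InterruptedException e) {
                if (!running) {
                    return;                   // shutting down
                }
            }
            flushAll.run();                   // push whatever is buffered downstream
        }
    }
}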
// Into BufferWritingResultPartition.emitRecord()
// This writes the data into a buffer. The ResultPartition has a LocalBufferPool that caches available buffers;
// when no buffer is available it requests more from the global NetworkBufferPool
@Override
public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
// appendUnicastDataForNewRecord requests a buffer; we won't read its code in detail, but briefly:
// internally the LocalBufferPool is asked for a buffer; if the local pool is exhausted, a network buffer is requested.
// If the network buffer request exceeds its limit, or no network buffer is available, the call blocks until one frees up
// (think `backpressure`). Once a buffer is obtained, a BufferBuilder is created and the target subpartition's add method
// is called with a BufferConsumer (created from the buffer). Inside add, notifyDataAvailable() is called, which eventually
// reaches the channel's notifyDataAvailable() and triggers consumption.
// After all of that, the record is written into the buffer.
// We walk through this method in the data-input part
BufferBuilder buffer = appendUnicastDataForNewRecord(record, targetSubpartition);
// After the record has been written into the buffer, one of three cases applies, each handled differently:
// the buffer is full, but only part of the record was written
while (record.hasRemaining()) {
// full buffer, partial record
// Finish the buffer, meaning it cannot be written to anymore
finishUnicastBufferBuilder(targetSubpartition);
// Request a new buffer and write the remaining bytes, until the loop exits
buffer = appendUnicastDataForRecordContinuation(record, targetSubpartition);
}
// The buffer is full and the whole record was written
if (buffer.isFull()) {
// full buffer, full record
// Finish the buffer, meaning it cannot be written to anymore
finishUnicastBufferBuilder(targetSubpartition);
}
// The buffer is not full and the whole record was written:
// nothing to do, as long as the record was stored completely we are fine
// partial buffer, full record
}
// At this point the record has been written into a buffer and waits to be consumed downstream
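The three cases above boil down to "keep requesting fresh segments until the whole record fits, and leave the last one open". A tiny standalone loop with the same shape (fixed 32-byte segments, invented names, not Flink code):
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Standalone illustration of one record spilling across several fixed-size buffers.
public class SplitRecordDemo {
    public static void main(String[] args) {
        byte[] record = new byte[80];               // record larger than one segment
        List<ByteBuffer> finishedBuffers = new ArrayList<>();
        ByteBuffer current = ByteBuffer.allocate(32);
        int written = 0;
        while (written < record.length) {
            int len = Math.min(record.length - written, current.remaining());
            current.put(record, written, len);
            written += len;
            if (!current.hasRemaining()) {          // "full buffer": finish it...
                finishedBuffers.add(current);
                current = ByteBuffer.allocate(32);  // ...and request a new one
            }
        }
        // "partial buffer, full record": the last buffer stays open for the next record
        System.out.println("finished buffers: " + finishedBuffers.size()
                + ", bytes in the open buffer: " + current.position());
    }
}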
2. Data input
// We now know how the data gets written into a buffer (inside the SubPartition);
// next, how is the downstream side notified so that consumption is triggered?
// The following method is called from emitRecord()
// BufferWritingResultPartition.appendUnicastDataForNewRecord()
private BufferBuilder appendUnicastDataForNewRecord(
final ByteBuffer record, final int targetSubpartition) throws IOException {
// Get the current BufferBuilder, i.e. a buffer that was not filled up by the previous write
BufferBuilder buffer = unicastBufferBuilders[targetSubpartition];
// null means the previous buffer was full and handed off, or we have only just been initialized
if (buffer == null) {
buffer = requestNewUnicastBufferBuilder(targetSubpartition);
// the data-available notification is triggered inside add();
// add a buffer consumer: build a BufferConsumer for the buffer; the 0 is the number of bytes to skip
subpartitions[targetSubpartition].add(buffer.createBufferConsumerFromBeginning(), 0);
}
// Append the data to the buffer
buffer.appendAndCommit(record);
return buffer;
}
// PipelinedSubpartition.add()
@Override
public boolean add(BufferConsumer bufferConsumer, int partialRecordLength) {
return add(bufferConsumer, partialRecordLength, false);
}
// keep going
private boolean add(BufferConsumer bufferConsumer, int partialRecordLength, boolean finish) {
checkNotNull(bufferConsumer);
synchronized (buffers) {
// Build a BufferConsumerWithPartialRecordLength and add it to `buffers`; it is used later
if (addBuffer(bufferConsumer, partialRecordLength)) {
prioritySequenceNumber = sequenceNumber;
}
}
// Other parts omitted, they are not the point; notifyDataAvailable() announces that data is available and can be consumed
if (notifyDataAvailable) {
notifyDataAvailable();
}
return true;
}
// What follows is a long chain of calls
private void notifyDataAvailable() {
final PipelinedSubpartitionView readView = this.readView;
if (readView != null) {
readView.notifyDataAvailable();
}
}
// PipelinedSubpartitionView
@Override
public void notifyDataAvailable() {
availabilityListener.notifyDataAvailable();
}
// LocalInputChannel
@Override
public void notifyDataAvailable() {
notifyChannelNonEmpty();
}
// InputChannel
protected void notifyChannelNonEmpty() {
// `this` is passed in, meaning this channel now has data and can be consumed
inputGate.notifyChannelNonEmpty(this);
}
// SingleInputGate
void notifyChannelNonEmpty(InputChannel channel) {
queueChannel(checkNotNull(channel), null);
}
// Eventually SingleInputGate.queueChannel is reached.
// From here on the code is often executed by the OutputFlusher thread (not always);
// if you forgot about it, look back at the flush thread inside RecordWriter.
// Once this code has finished, a channel that has data is sitting in the queue, and the thread executing the
// current task will start reading it via pollNext
private void queueChannel(InputChannel channel, @Nullable Integer prioritySequenceNumber) {
try (GateNotificationHelper notification =
new GateNotificationHelper(this, inputChannelsWithData)) {
synchronized (inputChannelsWithData) {
boolean priority = prioritySequenceNumber != null;
if (priority
&& isOutdated(
prioritySequenceNumber,
lastPrioritySequenceNumber[channel.getChannelIndex()])) {
// priority event at the given offset already polled (notification is not atomic
// in respect to
// buffer enqueuing), so just ignore the notification
return;
}
// Try to put the channel into the inputChannelsWithData queue;
// return if it was not (or did not need to be) enqueued
if (!queueChannelUnsafe(channel, priority)) {
return;
}
if (priority && inputChannelsWithData.getNumPriorityElements() == 1) {
notification.notifyPriority();
}
// If the queue now holds exactly one available channel, call notifyDataAvailable, which internally calls inputChannelsWithData.notifyAll to wake up waiting threads;
// inputChannelsWithData stores the available channels, i.e. channels that have data
if (inputChannelsWithData.size() == 1) {
notification.notifyDataAvailable();
}
}
}
}
// The channel that has data is now in the queue; the current task only needs to take a channel from the queue and consume its data.
// When the task was built above, we created the mailbox and its processing machinery.
// So how does the task actually get running? Let's go through it.
// After invoke() is called on the invokable, beforeInvoke first builds the operatorChain and runs open and the other initialization logic,
// then runMailboxLoop() really starts executing the task.
// Let's step straight into that method
public void runMailboxLoop() throws Exception {
// Loop the mailbox processor
mailboxProcessor.runMailboxLoop();
}
//MailboxProcessor.runMailboxLoop()
public void runMailboxLoop() throws Exception {
final TaskMailbox localMailbox = mailbox;
Preconditions.checkState(localMailbox.isMailboxThread(),"Method must be executed by declared mailbox thread!");
assert localMailbox.getState() == TaskMailbox.State.OPEN : "Mailbox must be opened!";
final MailboxController defaultActionContext = new MailboxController(this);
// Data is consumed with the mailbox threading model; we won't expand on it here
while (isMailboxLoopRunning()) {
// blocks until the default action becomes available
processMail(localMailbox, false);
if (isMailboxLoopRunning()) {
// Run the default action. The action to run was passed in as the lambda this::processInput when the task was built,
// so this executes StreamTask.processInput()
mailboxDefaultAction.runDefaultAction(defaultActionContext);
}
}
}
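The mailbox model itself boils down to: one thread owns a queue of Runnables ("mails", e.g. checkpoint triggers or timers); it first drains pending mail and then runs the default action (processInput) once, over and over. A compressed sketch of that loop (hypothetical, not Flink's MailboxProcessor; the real loop also suspends when neither mail nor input is available, which is omitted here):
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical single-threaded mailbox loop: drain mails, then run one step of the default action.
class MiniMailboxLoop {
    private final Queue<Runnable> mails = new ConcurrentLinkedQueue<>();
    private final Runnable defaultAction;            // plays the role of this::processInput
    private volatile boolean running = true;

    MiniMailboxLoop(Runnable defaultAction) { this.defaultAction = defaultAction; }

    /** Other threads hand work to the task thread by enqueuing a mail. */
    void putMail(Runnable mail) { mails.add(mail); }

    void stop() { running = false; }

    void runMailboxLoop() {
        while (running) {
            Runnable mail;
            while ((mail = mails.poll()) != null) {  // processMail: control events first
                mail.run();
            }
            defaultAction.run();                     // then one round of data processing
        }
    }
}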
// StreamTask.processInput()
protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
// Call the input processor's processInput method to read data; the inputProcessor was created during initialization.
// Let's step into this method
InputStatus status = inputProcessor.processInput();
if (status == InputStatus.MORE_AVAILABLE && recordWriter.isAvailable()) {
return;
}
if (status == InputStatus.END_OF_INPUT) {
controller.allActionsCompleted();
return;
}
CompletableFuture<?> jointFuture = getInputOutputJointFuture(status);
MailboxDefaultAction.Suspension suspendedDefaultAction = controller.suspendDefaultAction();
assertNoException(jointFuture.thenRun(suspendedDefaultAction::resume));
}
//StreamOneInputProcessor.processInput()
public InputStatus processInput() throws Exception {
// input and output again, which can be confusing; picture Flink's physical execution graph:
// input = the inputGate, i.e. the data input. The upstream operator writes into a subPartition and the inputGate's inputChannel consumes it;
// the data here is a serialized buffer
// output = takes the deserialized record and hands it to the operator; the output holds a reference to the operator
// Data flow:
// op1(map) --> subPartition --> inputGate --> output --> op2(flatMap) --> subPartition
// Pass the input's data on through the output
InputStatus status = input.emitNext(output);
if (status == InputStatus.END_OF_INPUT) {
endOfInputAware.endInput(input.getInputIndex() + 1);
}
return status;
}
// StreamTaskNetworkInput.emitNext()
public InputStatus emitNext(DataOutput<T> output) throws Exception {
// Note that this is a loop; a normal record read goes around twice:
// the first pass enters processBuffer, which assigns currentRecordDeserializer;
// the second pass deserializes the record and emits it downstream
while (true) {
// get the stream element from the deserializer
//
if (currentRecordDeserializer != null) {
DeserializationResult result;
try {
// Deserialize the data into deserializationDelegate;
// the delegate deserializes the buffer's bytes and copies the result into its instance field
result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
} catch (IOException e) {
throw new IOException(
String.format("Can't get next record for channel %s", lastChannel), e);
}
if (result.isBufferConsumed()) {
currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
currentRecordDeserializer = null;
}
// If the result is a complete record, emit it downstream right away, no need to wait for a new buffer
if (result.isFullRecord()) {
// Send the record downstream;
// effectively output.emitRecord(record) -> operator.processElement, assuming e.g. op = StreamMap.
// At this point the record has been delivered
processElement(deserializationDelegate.getInstance(), output);
return InputStatus.MORE_AVAILABLE;
}
}
// Get a buffer from the inputGate
Optional<BufferOrEvent> bufferOrEvent = checkpointedInputGate.pollNext();
if (bufferOrEvent.isPresent()) {
// return to the mailbox after receiving a checkpoint barrier to avoid processing of
// data after the barrier before checkpoint is performed for unaligned checkpoint
// mode
if (bufferOrEvent.get().isBuffer()) {
// Handle a record buffer
// and assign the matching deserializer to currentRecordDeserializer
processBuffer(bufferOrEvent.get());
} else {
// Handle an event buffer
processEvent(bufferOrEvent.get());
return InputStatus.MORE_AVAILABLE;
}
} else {
if (checkpointedInputGate.isFinished()) {
checkState(
checkpointedInputGate.getAvailableFuture().isDone(),
"Finished BarrierHandler should be available");
return InputStatus.END_OF_INPUT;
}
return InputStatus.NOTHING_AVAILABLE;
}
}
}
//CheckpointedInputGate.pollNext()
//CheckpointedInputGate[InputGateWithMetrics[SingleInputGate]] -- think of the decorator pattern, each wrapper adds functionality.
// We need to see how the buffer is obtained
public Optional<BufferOrEvent> pollNext() throws IOException, InterruptedException {
// Go straight into SingleInputGate.pollNext
Optional<BufferOrEvent> next = inputGate.pollNext();
// irrelevant code omitted
return next;
}
//SingleInputGate.pollNext()
public Optional<BufferOrEvent> pollNext() throws IOException, InterruptedException {
// false means: do not block
return getNextBufferOrEvent(false);
}
private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking)
throws IOException, InterruptedException {
// irrelevant code omitted
Optional<InputWithData<InputChannel, BufferAndAvailability>> next =
// the method that actually fetches the buffer
waitAndGetNextData(blocking);
InputWithData<InputChannel, BufferAndAvailability> inputWithData = next.get();
return Optional.of( transformToBufferOrEvent( // build the BufferOrEvent object
inputWithData.data.buffer(),
inputWithData.moreAvailable,
inputWithData.input,
inputWithData.morePriorityEvents));
}
// If blocking == true is passed and no channel is available, the call waits.
// Here the buffer has been obtained; what follows is wrapping, layer by layer, until a BufferOrEvent is returned.
// With the buffer in hand, parsing and deserialization can begin
private Optional<InputWithData<InputChannel, BufferAndAvailability>> waitAndGetNextData(
boolean blocking) throws IOException, InterruptedException {
while (true) {
synchronized (inputChannelsWithData) {
// Take an available channel from the inputChannelsWithData queue mentioned earlier; if blocking == true
// and no channel is available, the thread waits and gives up the lock
Optional<InputChannel> inputChannelOpt = getChannel(blocking);
if (!inputChannelOpt.isPresent()) {
return Optional.empty();
}
final InputChannel inputChannel = inputChannelOpt.get();
// inputChannel = LocalInputChannel.
// A LocalInputChannel holds a reference to the subpartitionView and can read buffers directly from the view's `buffers`,
// which were added in addBuffer above.
// getNextBuffer re-wraps the buffer as a BufferAndAvailability object
Optional<BufferAndAvailability> bufferAndAvailabilityOpt =
inputChannel.getNextBuffer();
if (!bufferAndAvailabilityOpt.isPresent()) {
checkUnavailability();
continue;
}
final BufferAndAvailability bufferAndAvailability = bufferAndAvailabilityOpt.get();
if (bufferAndAvailability.moreAvailable()) {
// enqueue the inputChannel at the end to avoid starvation
queueChannelUnsafe(inputChannel, bufferAndAvailability.morePriorityEvents());
}
final boolean morePriorityEvents =
inputChannelsWithData.getNumPriorityElements() > 0;
if (bufferAndAvailability.hasPriority()) {
lastPrioritySequenceNumber[inputChannel.getChannelIndex()] =
bufferAndAvailability.getSequenceNumber();
if (!morePriorityEvents) {
priorityAvailabilityHelper.resetUnavailable();
}
}
checkUnavailability();
return Optional.of(
new InputWithData<>(
inputChannel,
bufferAndAvailability,
!inputChannelsWithData.isEmpty(),
morePriorityEvents));
}
}
}
In StreamTaskNetworkInput.emitNext() above, processElement finally hands the record to the current operator, which then runs the user code on it.
III. Data transfer between TaskManagers
TaskManagerRunner.runTaskManager() builds the TaskManagerRunner object. Its construction is quite involved, wrapping layer upon layer, and in the end start is called to launch the TaskManager. While the TaskManager is being constructed, a ShuffleEnvironment (NettyShuffleEnvironment) is built; the ShuffleEnvironment holds a ConnectionManager, which in turn holds the Netty client and server. Whenever a network request is needed, the ConnectionManager establishes the connection and issues the request.
// ----------------- There is a lot of code here, so only the key places are shown ----------------------------
// The creation flow:
TaskManagerRunner.runTaskManager() // the constructor calls createTaskExecutorService()
TaskManagerRunner.createTaskExecutorService() // builds the TaskExecutor
TaskManagerRunner.startTaskManager() // starts the TaskManager services
TaskManagerServices.fromConfiguration() // builds the TM's internal components
TaskManagerServices.createShuffleEnvironment // builds the ShuffleEnvironment and calls start -- the connectionManager is created inside
NettyShuffleEnvironment.start() // eventually calls connectionManager.start, which initializes the Netty client and server
// Let's look at how the NettyShuffleEnvironment and its connectionManager are built
static NettyShuffleEnvironment createNettyShuffleEnvironment(
NettyShuffleEnvironmentConfiguration config,
ResourceID taskExecutorResourceId,
TaskEventPublisher taskEventPublisher,
ResultPartitionManager resultPartitionManager,
MetricGroup metricGroup,
Executor ioExecutor) {
checkNotNull(config);
checkNotNull(taskExecutorResourceId);
checkNotNull(taskEventPublisher);
checkNotNull(resultPartitionManager);
checkNotNull(metricGroup);
NettyConfig nettyConfig = config.nettyConfig();
FileChannelManager fileChannelManager =
new FileChannelManagerImpl(config.getTempDirs(), DIR_NAME_PREFIX);
// For a local test a LocalConnectionManager is built; likewise, if a single TM is enough to run the whole job,
// it is also a LocalConnectionManager; otherwise it is Netty-based.
// We'll come back to this in a moment
ConnectionManager connectionManager =
nettyConfig != null? new NettyConnectionManager(resultPartitionManager, taskEventPublisher, nettyConfig)
: new LocalConnectionManager();
// The global buffer pool used for network transfer.
// Recall that every task has its own localBufferPool; when it runs out of buffers it requests more --
// and yes, this pool is exactly what it requests from
NetworkBufferPool networkBufferPool =
new NetworkBufferPool(
config.numNetworkBuffers(),
config.networkBufferSize(),
config.getRequestSegmentsTimeout());
registerShuffleMetrics(metricGroup, networkBufferPool);
ResultPartitionFactory resultPartitionFactory =
new ResultPartitionFactory(
resultPartitionManager,
fileChannelManager,
networkBufferPool,
config.getBlockingSubpartitionType(),
config.networkBuffersPerChannel(),
config.floatingNetworkBuffersPerGate(),
config.networkBufferSize(),
config.isBlockingShuffleCompressionEnabled(),
config.getCompressionCodec(),
config.getMaxBuffersPerChannel(),
config.sortShuffleMinBuffers(),
config.sortShuffleMinParallelism(),
config.isSSLEnabled());
// As you can see, this is a factory for SingleInputGate. Why? Because when a task is initialized it needs an inputGate
// for every partition it consumes, and the factory simplifies that construction
SingleInputGateFactory singleInputGateFactory =
new SingleInputGateFactory(
taskExecutorResourceId,
config,
connectionManager,
resultPartitionManager,
taskEventPublisher,
networkBufferPool);
return new NettyShuffleEnvironment(taskExecutorResourceId,config,networkBufferPool,connectionManager,
resultPartitionManager, fileChannelManager,resultPartitionFactory,singleInputGateFactory,ioExecutor);
}
Above we saw how the NettyShuffleEnvironment is built. The NettyConnectionManager manages the Netty client and server that make up the TaskManager's TCP connection, shared by all Tasks.
// Manages the Netty client and server
public class NettyConnectionManager implements ConnectionManager {
public NettyConnectionManager(
ResultPartitionProvider partitionProvider,
TaskEventPublisher taskEventPublisher,
NettyConfig nettyConfig) {
// server: lets other clients connect to us; client: connects to other servers.
// server and client are thin wrappers around Netty: internally they build a bootstrap and configure options, handlers, etc.
this.server = new NettyServer(nettyConfig);
this.client = new NettyClient(nettyConfig);
this.bufferPool = new NettyBufferPool(nettyConfig.getNumberOfArenas());
this.partitionRequestClientFactory =
new PartitionRequestClientFactory(client, nettyConfig.getNetworkRetries());
// Provides two methods, returning the server-side and client-side handlers respectively
this.nettyProtocol =
new NettyProtocol(
checkNotNull(partitionProvider), checkNotNull(taskEventPublisher));
}
@Override
public int start() throws IOException {
// Initialize the client and the server
client.init(nettyProtocol, bufferPool);
return server.init(nettyProtocol, bufferPool);
}
// Request a partition and create a client for it.
// It mainly contains two things:
// CreditBasedPartitionRequestClientHandler -- the channelHandler
// Channel tcpChannel -- used by the task to send TCP requests
@Override
public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId)
throws IOException, InterruptedException {
return partitionRequestClientFactory.createPartitionRequestClient(connectionId);
}
// other code omitted ......
}
The TaskManager has now built all the network-related machinery, so what does the data exchange flow look like?
It is very similar to the exchange between tasks inside one TaskManager. On top of the in-TM transfer, four components are added: PartitionRequestQueue, PartitionRequestServerHandler (the server-side handler), PartitionRequestClient and CreditBasedPartitionRequestClientHandler.
Flow:
the upstream operator writes data into a subpartition; the PartitionRequestQueue reads the buffers and writes them into the Netty channel; on the server side the incoming requests are handled by the Netty handler PartitionRequestServerHandler; the PartitionRequestClient reads the data sent by the server, and the client-side handler (CreditBasedPartitionRequestClientHandler) writes it into the InputChannel.
Note: Flink's data exchange follows the producer-consumer model.
// For network transfer, writing data into buffers works the same as inside a TM: addBuffer adds the produced buffer to the buffers queue; the difference is who consumes the data, which we will see below.
// Because network communication is involved this may look messy at first, but once the flow is laid out it becomes quite simple.
// The transport is based on Netty, so the content below requires some Netty knowledge; if Netty is new to you, it is worth reading up on it first
// Extends Netty's inbound handler; see Netty for when exactly it is invoked
class PartitionRequestServerHandler extends SimpleChannelInboundHandler<NettyMessage> {
// Other methods are skipped.
// This inbound-handler method is called whenever data arrives.
// When? The downstream PartitionRequestClient sends a request; when the current task receives it, this method handles it.
// Note that before this handler, the bytes already went through the decoder handler; the decoded object is the NettyMessage passed in here
protected void channelRead0(ChannelHandlerContext ctx, NettyMessage msg) throws Exception {
try {
Class<?> msgClazz = msg.getClass();
// Handle each request type differently; for data exchange we mainly care about the credit-related events (AddCredit / ResumeConsumption)
if (msgClazz == PartitionRequest.class) {
PartitionRequest request = (PartitionRequest) msg;
try {
NetworkSequenceViewReader reader;
reader =
new CreditBasedSequenceNumberingViewReader(
request.receiverId, request.credit, outboundQueue);
reader.requestSubpartitionView(
partitionProvider, request.partitionId, request.queueIndex);
outboundQueue.notifyReaderCreated(reader);
} catch (PartitionNotFoundException notFound) {
respondWithError(ctx, notFound, request.receiverId);
}
}else if (msgClazz == TaskEventRequest.class) {
TaskEventRequest request = (TaskEventRequest) msg;
if (!taskEventPublisher.publish(request.partitionId, request.event)) {
respondWithError(
ctx,
new IllegalArgumentException("Task event receiver not found."),
request.receiverId);
}
} else if (msgClazz == CancelPartitionRequest.class) {
CancelPartitionRequest request = (CancelPartitionRequest) msg;
outboundQueue.cancel(request.receiverId);
} else if (msgClazz == CloseRequest.class) {
outboundQueue.close();
}
// A credit sent by the client arrived; Flink's flow control is credit-based.
// The credit says how much data we may send at once; if the receiver's credit is low, only that much data is sent,
// otherwise the whole TCP connection would be backpressured.
// credit = number of buffers available at the downstream consumer
else if (msgClazz == AddCredit.class) {
AddCredit request = (AddCredit) msg;
// After adding credit (or resuming), the consumer can consume data: there is a request, so we respond.
// resume belongs to the checkpoint barrier alignment phase, during which data after the current barrier must not be consumed
outboundQueue.addCreditOrResumeConsumption(
request.receiverId,
// accumulate the new credit onto the existing credit (lambda)
reader -> reader.addCredit(request.credit));
} else if (msgClazz == ResumeConsumption.class) {
ResumeConsumption request = (ResumeConsumption) msg;
outboundQueue.addCreditOrResumeConsumption(
request.receiverId, NetworkSequenceViewReader::resumeConsumption);
} else {
LOG.warn("Received unexpected client request: {}", msg);
}
} catch (Throwable t) {
respondWithError(ctx, t);
}
}
}
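The credit mechanism handled above can be summarised as: the sender keeps a per-channel credit counter, only sends buffers while credit > 0 and decrements it per buffer; the receiver announces new credit whenever it frees buffers. A toy model of that accounting (invented names, no networking, not Flink code):
// Toy model of credit-based flow control between one producer and one consumer channel.
class CreditedChannel {
    private int credit;    // buffers the receiver can currently accept
    private int backlog;   // finished buffers queued on the producer side

    /** Consumer side: a freed buffer becomes one new credit announced to the producer. */
    synchronized void addCredit(int newCredit) {
        credit += newCredit;
    }

    /** Producer side: a finished buffer joins the backlog. */
    synchronized void enqueueBuffer() {
        backlog++;
    }

    /** Producer side: send as many backlog buffers as the current credit allows. */
    synchronized int sendWhatCreditAllows() {
        int sent = 0;
        while (backlog > 0 && credit > 0) { // without credit nothing is written to the TCP channel
            backlog--;
            credit--;
            sent++;
        }
        return sent;
    }
}
When the consumer's credit reaches zero, the producer simply stops writing to that logical channel, which keeps the backpressure local instead of stalling the shared TCP connection.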
//PartitionRequestQueue.addCreditOrResumeConsumption()
// Above, Netty received the credit request; now we need to answer the consumer with data
void addCreditOrResumeConsumption(
InputChannelID receiverId, Consumer<NetworkSequenceViewReader> operation)
throws Exception {
if (fatalError) {
return;
}
// Use the receiverId to find the reader that reads the data;
// receiverId identifies the inputChannel that issued the request / should receive the data
NetworkSequenceViewReader reader = allReaders.get(receiverId);
if (reader != null) {
// the lambda passed in: accumulate the credit
operation.accept(reader);
// Try to enqueue the reader; once it is in the queue it will be picked up in a loop to read data.
// A few calls further down, availableReaders.add(reader) puts it into the queue
enqueueAvailableReader(reader);
} else {
throw new IllegalStateException(
"No reader for receiverId = " + receiverId + " exists.");
}
}
private void enqueueAvailableReader(final NetworkSequenceViewReader reader) throws Exception {
if (reader.isRegisteredAsAvailable() || !reader.isAvailable()) {
return;
}
boolean triggerWrite = availableReaders.isEmpty();
registerAvailableReader(reader);
// If the queue was empty before, trigger a write now
if (triggerWrite) {
writeAndFlushNextMessageIfPossible(ctx.channel());
}
}
// If you look closely, what follows is very similar to the in-TM transfer.
// The difference is that here we first fetch the corresponding reader and then call subpartitionView.pollNext through it,
// whereas inside a TM the subpartitionView is called directly.
// Note the classes are different too; this one is PartitionRequestQueue
private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IOException {
if (fatalError || !channel.isWritable()) {
return;
}
BufferAndAvailability next = null;
try {
while (true) {
// Take a reader from the available readers; if there is one, process it
NetworkSequenceViewReader reader = pollAvailableReader();
if (reader == null) {
return;
}
// the reader internally calls subpartitionView.pollNext, similar to the in-TM task exchange
next = reader.getNextBuffer();
if (next == null) {
if (!reader.isReleased()) {
continue;
}
Throwable cause = reader.getFailureCause();
if (cause != null) {
ErrorResponse msg =
new ErrorResponse(
new ProducerFailedException(cause), reader.getReceiverId());
ctx.writeAndFlush(msg);
}
} else {
// If there is more data, enqueue the reader again so it is picked up next time round; note that this is a loop
if (next.moreAvailable()) {
registerAvailableReader(reader);
}
// Build a response;
// the response carries a buffer, and the buffer contains the record -- note it is still serialized
BufferResponse msg =
new BufferResponse(
next.buffer(),
next.getSequenceNumber(),
reader.getReceiverId(),
next.buffersInBacklog());
// Send the data to the client side; we are the server side here.
// Keep in mind we are inside Netty now -- this goes over the network
channel.writeAndFlush(msg)
// When the flush completes, try to process the next buffer:
// a listener is added that does the same thing as above
.addListener(writeListener);
return;
}
}
} catch (Throwable t) {
if (next != null) {
next.buffer().recycleBuffer();
}
throw new IOException(t.getMessage(), t);
}
}
To recap: the downstream operator sends a request upstream; Netty receives it, decodes the message into a NettyMessage, and PartitionRequestServerHandler dispatches on the event type. Since our focus is data exchange, the event we care about is AddCredit: on receiving it, the reader for the requesting inputChannel is looked up, pollNext is called through the reader, and the data is sent back to the client as the response.
Next we need to analyse how the client handles that response.
The Netty server side, i.e. the producer, responds after processing the request from the client side, i.e. the consumer, and the data travels over TCP. Netty models data processing as a pipeline made up of handlers: when data is received the inbound handlers fire, when data is sent out the outbound handlers fire. So when the Netty client receives data it first goes through the decoder; the decoded response is then passed to the next handler, which is CreditBasedPartitionRequestClientHandler -- that class is our entry point.
Note: because a network connection is involved, the call relationships can get confusing; crossing the network always adds complexity. Later I will explain the whole picture with diagrams.
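If the pipeline wording is unfamiliar, the tiny Netty example below (standalone, using Netty's EmbeddedChannel test helper; not Flink code) has the same shape as the client side here: a "decoder" handler sits in front and hands the decoded message to the next inbound handler, just like Flink's decoder hands the BufferResponse to CreditBasedPartitionRequestClientHandler.
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.embedded.EmbeddedChannel;

// Minimal Netty inbound pipeline: "decoder" first, then the business handler.
public class PipelineDemo {
    public static void main(String[] args) {
        EmbeddedChannel channel = new EmbeddedChannel(
                new ChannelInboundHandlerAdapter() {           // plays the decoder's role
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) {
                        ctx.fireChannelRead("decoded:" + msg); // pass the decoded message downstream
                    }
                },
                new ChannelInboundHandlerAdapter() {           // plays the client handler's role
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) {
                        System.out.println("handler received " + msg);
                    }
                });
        channel.writeInbound("raw-bytes"); // simulate data arriving from the wire
        channel.finish();
    }
}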
class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdapter
implements NetworkClientHandler {
// Called when the Netty client receives data
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
// Decode the message
decodeMsg(msg);
} catch (Throwable t) {
notifyAllChannelsOfErrorAndClose(t);
}
}
private void decodeMsg(Object msg) throws Throwable {
final Class<?> msgClazz = msg.getClass();
// The real decoding already happened when the bytes came in; here msg is just cast
if (msgClazz == NettyMessage.BufferResponse.class) {
NettyMessage.BufferResponse bufferOrEvent = (NettyMessage.BufferResponse) msg;
RemoteInputChannel inputChannel = inputChannels.get(bufferOrEvent.receiverId);
if (inputChannel == null || inputChannel.isReleased()) {
bufferOrEvent.releaseBuffer();
cancelRequestFor(bufferOrEvent.receiverId);
return;
}
try {
// just treat it as more layers of calls
decodeBufferOrEvent(inputChannel, bufferOrEvent);
} catch (Throwable t) {
inputChannel.onError(t);
}
} else if (msgClazz == NettyMessage.ErrorResponse.class) {
// omitted .....
} else {
throw new IllegalStateException( "Received unknown message from producer: " + msg.getClass());
}
}
private void decodeBufferOrEvent(
RemoteInputChannel inputChannel, NettyMessage.BufferResponse bufferOrEvent)
throws Throwable {
if (bufferOrEvent.isBuffer() && bufferOrEvent.bufferSize == 0) {
inputChannel.onEmptyBuffer(bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
} else if (bufferOrEvent.getBuffer() != null) {
// If there is a buffer, call the inputChannel's onBuffer method;
// the inputChannel here is a RemoteInputChannel.
// onBuffer mainly adds the buffer to a queue
inputChannel.onBuffer(
bufferOrEvent.getBuffer(), bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
} else {
throw new IllegalStateException(
"The read buffer is null in credit-based input channel.");
}
}
}
// RemoteInputChannel.onBuffer
public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOException {
boolean recycleBuffer = true;
try {
if (expectedSequenceNumber != sequenceNumber) {
onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber));
return;
}
final boolean wasEmpty;
boolean firstPriorityEvent = false;
synchronized (receivedBuffers) {
NetworkActionsLogger.traceInput(
"RemoteInputChannel#onBuffer",
buffer,
inputGate.getOwningTaskName(),
channelInfo,
channelStatePersister,
sequenceNumber);
if (isReleased.get()) {
return;
}
wasEmpty = receivedBuffers.isEmpty();
SequenceBuffer sequenceBuffer = new SequenceBuffer(buffer, sequenceNumber);
DataType dataType = buffer.getDataType();
if (dataType.hasPriority()) {
firstPriorityEvent = addPriorityBuffer(sequenceBuffer);
recycleBuffer = false;
} else {
receivedBuffers.add(sequenceBuffer);
recycleBuffer = false;
channelStatePersister.maybePersist(buffer);
if (dataType.requiresAnnouncement()) {
firstPriorityEvent = addPriorityBuffer(announce(sequenceBuffer));
}
}
++expectedSequenceNumber;
}
if (firstPriorityEvent) {
notifyPriorityEvent(sequenceNumber);
}
// We have run into this code several times already:
// notify that the channel has data and add it to the available channels, same as before --
// it ends up calling the inputGate's notifyChannelNonEmpty method
// and wakes up the threads waiting on inputChannelsWithData
notifyChannelNonEmpty();
}
if (backlog >= 0) {
onSenderBacklog(backlog);
}
} finally {
if (recycleBuffer) {
buffer.recycleBuffer();
}
}
}
Now the data has been put into the queue and the available channel has been enqueued. What follows is basically identical to the in-TM exchange, except that the inputChannel is a RemoteInputChannel instead of a LocalInputChannel; the rest of the logic is the same.
That concludes the walkthrough of data exchange. Diagrams follow below; everything in them is reflected in the code above.