This article draws on design documents by Apache IoTDB community members Tian Yuan and Wang Zhong; because the Feishu links are access-restricted, the reference links are not included here.
Background
The Apache IoTDB query engine currently uses an MPP architecture; a SQL query roughly goes through several planning stages before being executed in a distributed fashion.
A FragmentInstance is the unit that is actually dispatched to each node for execution after the distributed plan has been split up. The resulting FragmentInstances still logically form a tree: a parent node needs its children's output as input (i.e. a downstream FragmentInstance consumes the output of its upstream FragmentInstances) to complete its own logic.
Because FragmentInstances may be dispatched to different nodes, data transfer requires network communication and carries dependencies between instances, so the transfer between FragmentInstances has to be managed; this is why the asynchronous data exchange module was introduced. The module can be seen as one implementation of the ExchangeOperator in the volcano model and follows a producer-consumer pattern.
Key concepts
- ISinkHandle: each FragmentInstance usually holds one ISinkHandle, through which it asynchronously sends its results to the downstream FragmentInstance.
- ExchangeOperator: the data-transfer logic between FragmentInstances is decoupled from a FragmentInstance's execution logic. The operator tree of a FragmentInstance may contain ExchangeOperators, and each ExchangeOperator holds an ISourceHandle from which it obtains its input from upstream.
- ISourceHandle: paired one-to-one with an ISinkHandle; it receives the ISinkHandle's results and hands them to the ExchangeOperator (a simplified sketch of these pieces follows this list).
- MPPDataExchangeManager: the management hub of the data exchange module on the current node; it owns the thread resources and acts as the intermediary through which ISinkHandle and ISourceHandle interact.
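To make the relationships concrete, here is a minimal sketch using only the methods discussed in this article; the real IoTDB interfaces carry more methods, so treat the signatures as simplifications rather than the actual API (ListenableFuture is Guava's com.google.common.util.concurrent.ListenableFuture, TsBlock is IoTDB's columnar batch):
// Simplified sketch, not the actual IoTDB interfaces.
interface ISinkHandle {
  ListenableFuture<Void> isFull();    // completes once there is room to send the next TsBlock
  void send(TsBlock tsBlock);         // called only after the future returned by isFull() completes
  void close();
  void abort();
}
interface ISourceHandle {
  ListenableFuture<Void> isBlocked(); // completes once a TsBlock is ready to be received
  TsBlock receive();                  // called only after the future returned by isBlocked() completes
  boolean isFinished();
  void close();
  void abort();
}
// The ExchangeOperator in the operator tree simply delegates to its ISourceHandle,
// which is how transfer logic stays decoupled from execution logic.
class ExchangeOperator {
  private final ISourceHandle sourceHandle;
  ExchangeOperator(ISourceHandle sourceHandle) { this.sourceHandle = sourceHandle; }
  ListenableFuture<Void> isBlocked() { return sourceHandle.isBlocked(); }
  TsBlock next() { return sourceHandle.receive(); }
  boolean isFinished() { return sourceHandle.isFinished(); }
}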
Implementation details
MPPDataExchangeManager
MPPDataExchangeManager is the management hub of the data exchange module on the current node, with the following responsibilities:
- Creating ISinkHandles and ISourceHandles. A FragmentInstance must create its ISinkHandle and ISourceHandles through the MPPDataExchangeManager, which maintains two maps (see the lookup sketch after this list).
// A FragmentInstance may contain multiple ExchangeOperators and therefore multiple ISourceHandles,
// so this is a two-level map: FragmentInstance -> PlanNodeId -> ISourceHandle
private final Map<TFragmentInstanceId, Map<String, ISourceHandle>> sourceHandles;
// FragmentInstance -> SinkHandle
private final Map<TFragmentInstanceId, ISinkHandle> sinkHandles;
- Defining SinkHandleListener and SourceHandleListener. ISinkHandle and ISourceHandle define methods such as abort() and close(); these listeners are invoked inside those methods to notify the owning FragmentInstance and to update the two maps above.
- Implementing MPPDataExchangeService.Iface. Nodes communicate with each other via Thrift RPC; MPPDataExchangeService.Iface defines how SinkHandle and SourceHandle interact, and the concrete logic behind it is analyzed below.
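As an illustration of how the two maps above are used, a hypothetical lookup helper (the method name is an assumption for this sketch, not part of the actual Iface) would resolve the target ISourceHandle for an incoming event like this:
// Hypothetical sketch: locate the ISourceHandle an incoming event from a remote SinkHandle is destined for.
ISourceHandle findSourceHandle(TFragmentInstanceId targetFragmentInstanceId, String targetPlanNodeId) {
  // First level: which FragmentInstance on this node is the target.
  Map<String, ISourceHandle> handlesOfInstance =
      sourceHandles.getOrDefault(targetFragmentInstanceId, Collections.emptyMap());
  // Second level: which ExchangeOperator (identified by its plan node id) inside that FragmentInstance.
  return handlesOfInstance.get(targetPlanNodeId);
}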
SinkHandle and SourceHandle
SinkHandle and SourceHandle are one pair of ISinkHandle/ISourceHandle implementations, used for data exchange between FragmentInstances running on different nodes.
Their data exchange consists of three main steps (a sketch of the underlying RPC interface follows the list):
- For every TsBlock it produces, the SinkHandle sends the SourceHandle a NewDataBlockEvent carrying the TsBlock's sequenceId and its size in memory (if no more data will be produced, an EndOfDataBlockEvent is sent instead). The SinkHandle keeps the TsBlock until the SourceHandle acknowledges it.
- After receiving NewDataBlockEvents, the SourceHandle selects a contiguous range of sequenceIds it can hold in memory and sends the SinkHandle a request to pull those blocks.
- Once the SourceHandle has pulled the data, it sends an ack message to the SinkHandle, which can then release the corresponding TsBlocks.
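These three steps ride on the MPPDataExchangeService.Iface mentioned above. The sketch below shows the rough shape of that interface in Java form; the method names and parameters are assumptions reconstructed from the events described here and may differ in detail from the actual Thrift definition:
// Assumed shape of the Thrift service used by SinkHandle and SourceHandle; not the exact IoTDB IDL.
interface MPPDataExchangeServiceSketch {
  // Step 1: SinkHandle -> SourceHandle, "blocks starting at startSequenceId are ready, with these sizes".
  void onNewDataBlockEvent(TFragmentInstanceId targetInstance, String targetPlanNodeId,
      int startSequenceId, List<Long> blockSizes);
  // Sent instead of step 1 once the SinkHandle will produce no more TsBlocks.
  void onEndOfDataBlockEvent(TFragmentInstanceId targetInstance, String targetPlanNodeId,
      int lastSequenceId);
  // Step 2: SourceHandle -> SinkHandle, pull the serialized TsBlocks in [startSequenceId, endSequenceId).
  List<ByteBuffer> getDataBlocks(TFragmentInstanceId sourceInstance, int startSequenceId, int endSequenceId);
  // Step 3: SourceHandle -> SinkHandle, blocks received; the SinkHandle may now release them.
  void onAcknowledgeDataBlockEvent(TFragmentInstanceId sourceInstance, int startSequenceId, int endSequenceId);
}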
Let us first look at how SinkHandle sends data (send is called only after the Future returned by the SinkHandle's isFull() has completed; see Driver#processInternal for the details):
@Override
public synchronized void send(TsBlock tsBlock) {
long startTime = System.nanoTime();
try {
Validate.notNull(tsBlock, "tsBlocks is null");
checkState();
if (!blocked.isDone()) {
throw new IllegalStateException("Sink handle is blocked.");
}
if (noMoreTsBlocks) {
return;
}
long retainedSizeInBytes = tsBlock.getRetainedSizeInBytes();
int startSequenceId;
startSequenceId = nextSequenceId;
blocked =
localMemoryManager
.getQueryPool()
.reserve(
localFragmentInstanceId.getQueryId(),
localFragmentInstanceId.getInstanceId(),
localPlanNodeId,
retainedSizeInBytes,
maxBytesCanReserve)
.left;
bufferRetainedSizeInBytes += retainedSizeInBytes;
sequenceIdToTsBlock.put(nextSequenceId, new Pair<>(tsBlock, currentTsBlockSize));
nextSequenceId += 1;
currentTsBlockSize = retainedSizeInBytes;
// TODO: consider merge multiple NewDataBlockEvent for less network traffic.
submitSendNewDataBlockEventTask(startSequenceId, ImmutableList.of(retainedSizeInBytes));
} finally {
QUERY_METRICS.recordDataExchangeCost(
SINK_HANDLE_SEND_TSBLOCK_REMOTE, System.nanoTime() - startTime);
}
}
- The SinkHandle reserves memory from the memory pool when it is initialized, which is also when the blocked Future is first created.
- Reaching send() implies blocked.isDone() == true. send() does not transmit the TsBlock itself; it only submits a task that sends a NewDataBlockEvent (see the sketch after this list), and the SourceHandle later pulls the TsBlock identified by its sequenceId.
- send() uses the size of the TsBlock being sent as an estimate of the next TsBlock's size, which is why the reserve() call that refreshes blocked passes the current TsBlock's retainedSizeInBytes.
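Conceptually, the submitted task just fires step 1 of the protocol for the block stored above. A hedged sketch, reusing the assumed RPC names from the earlier interface sketch (client, remoteFragmentInstanceId and remotePlanNodeId stand in for fields of the SinkHandle):
// Hedged sketch of what submitSendNewDataBlockEventTask boils down to; retry handling omitted.
executorService.submit(() ->
    // Tell the downstream SourceHandle that the block at startSequenceId is ready and how big it is,
    // so the SourceHandle can reserve memory before pulling it.
    client.onNewDataBlockEvent(
        remoteFragmentInstanceId, remotePlanNodeId, startSequenceId, ImmutableList.of(retainedSizeInBytes)));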
Next, the SourceHandle's logic for pulling TsBlocks; the inline comments walk through each step, and a sketch of GetDataBlocksTask follows the code:
private synchronized void trySubmitGetDataBlocksTask() {
if (aborted || closed) {
return;
}
if (blockedOnMemory != null && !blockedOnMemory.isDone()) {
return;
}
final int startSequenceId = nextSequenceId;
int endSequenceId = nextSequenceId;
long reservedBytes = 0L;
Pair<ListenableFuture<Void>, Boolean> pair = null;
long blockedSize = 0L;
// Select a contiguous range of sequenceIds
while (sequenceIdToDataBlockSize.containsKey(endSequenceId)) {
Long bytesToReserve = sequenceIdToDataBlockSize.get(endSequenceId);
if (bytesToReserve == null) {
throw new IllegalStateException("Data block size is null.");
}
// Reserve memory from the memory pool
pair =
localMemoryManager
.getQueryPool()
.reserve(
localFragmentInstanceId.getQueryId(),
localFragmentInstanceId.getInstanceId(),
localPlanNodeId,
bytesToReserve,
maxBytesCanReserve);
bufferRetainedSizeInBytes += bytesToReserve;
endSequenceId += 1;
reservedBytes += bytesToReserve;
// Memory reservation is blocked; stop extending the range
if (!pair.right) {
blockedSize = bytesToReserve;
break;
}
}
if (pair == null) {
// Next data block not generated yet. Do nothing.
return;
}
nextSequenceId = endSequenceId;
// Register a callback: once the reservation future completes (memory granted), pull the TsBlock with the blocked sequenceId
if (!pair.right) {
endSequenceId--;
reservedBytes -= blockedSize;
// The future being not completed indicates,
// 1. Memory has been reserved for blocks in [startSequenceId, endSequenceId).
// 2. Memory reservation for block whose sequence ID equals endSequenceId - 1 is blocked.
// 3. Have not reserve memory for the rest of blocks.
//
// startSequenceId endSequenceId - 1 endSequenceId
// |-------- reserved --------|--- blocked ---|--- not reserved ---|
// Schedule another call of trySubmitGetDataBlocksTask for the rest of blocks.
blockedOnMemory = pair.left;
final int blockedSequenceId = endSequenceId;
final long blockedRetainedSize = blockedSize;
blockedOnMemory.addListener(
() ->
executorService.submit(
new GetDataBlocksTask(
blockedSequenceId, blockedSequenceId + 1, blockedRetainedSize)),
executorService);
}
if (endSequenceId > startSequenceId) {
executorService.submit(new GetDataBlocksTask(startSequenceId, endSequenceId, reservedBytes));
}
}
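GetDataBlocksTask itself is not quoted in this article. It is created inside SourceHandle (so it can use the handle's fields) and performs steps 2 and 3 of the protocol; the sketch below reuses the assumed RPC names from the earlier interface sketch, with client and remoteFragmentInstanceId standing in for the enclosing handle's fields:
// Hedged sketch of GetDataBlocksTask; details are illustrative, not the exact IoTDB implementation.
class GetDataBlocksTask implements Runnable {
  private final int startSequenceId; // inclusive
  private final int endSequenceId;   // exclusive
  private final long reservedBytes;  // memory already reserved for these blocks

  GetDataBlocksTask(int startSequenceId, int endSequenceId, long reservedBytes) {
    this.startSequenceId = startSequenceId;
    this.endSequenceId = endSequenceId;
    this.reservedBytes = reservedBytes;
  }

  @Override
  public void run() {
    try {
      // Step 2: pull the serialized TsBlocks in [startSequenceId, endSequenceId) from the remote SinkHandle.
      List<ByteBuffer> blocks = client.getDataBlocks(remoteFragmentInstanceId, startSequenceId, endSequenceId);
      // Step 3: acknowledge, so the SinkHandle may release its retained copies of these TsBlocks.
      client.onAcknowledgeDataBlockEvent(remoteFragmentInstanceId, startSequenceId, endSequenceId);
      // The blocks are then deserialized and buffered so that receive() can hand them to the ExchangeOperator.
    } catch (Throwable t) {
      // On failure, reservedBytes must be returned to the MemoryPool and the error propagated to the
      // FragmentInstance; otherwise the reservation would leak.
    }
  }
}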
LocalSinkHandle and LocalSourceHandle
LocalSinkHandle and LocalSourceHandle are the other pair of ISinkHandle/ISourceHandle implementations, used for data exchange between FragmentInstances on the same node. SinkHandle and SourceHandle are not reused here because RPC is unnecessary within a single node, so the network overhead can be avoided.
LocalSinkHandle and LocalSourceHandle communicate through a shared blocking queue, SharedTsBlockQueue.
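For orientation, the state inside SharedTsBlockQueue that the send/receive and remove/add code below relies on looks roughly like this (a simplified sketch; fields and types are inferred from the code quoted in this article):
// Simplified sketch of SharedTsBlockQueue's state, inferred from the methods quoted below.
class SharedTsBlockQueueSketch {
  private final Queue<TsBlock> queue = new LinkedList<>();        // the shared buffer between the two handles
  private SettableFuture<Void> blocked = SettableFuture.create(); // completed when the consumer may call remove()
  private ListenableFuture<Void> blockedOnMemory;                 // completed when the producer's memory reservation succeeds
  private long bufferRetainedSizeInBytes;                         // bytes currently reserved for buffered TsBlocks
  private boolean noMoreTsBlocks;                                 // set once the producer will add nothing more
  private boolean closed;                                         // set when the queue is destroyed
  // It also references the LocalSinkHandle, the LocalMemoryManager and the ids passed to reserve()/free().
}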
LocalSinkHandle's send logic (the send happens only after the Future returned by its isFull() completes, and it simply puts the TsBlock into the queue):
@Override
public void send(TsBlock tsBlock) {
long startTime = System.nanoTime();
try {
Validate.notNull(tsBlock, "tsBlocks is null");
synchronized (this) {
checkState();
if (!blocked.isDone()) {
throw new IllegalStateException("Sink handle is blocked.");
}
}
synchronized (queue) {
if (queue.hasNoMoreTsBlocks()) {
return;
}
logger.debug("[StartSendTsBlockOnLocal]");
synchronized (this) {
blocked = queue.add(tsBlock);
}
}
} finally {
QUERY_METRICS.recordDataExchangeCost(
SINK_HANDLE_SEND_TSBLOCK_LOCAL, System.nanoTime() - startTime);
}
}
LocalSourceHandle's receive logic (called only when the Future returned by isBlocked() has completed):
@Override
public TsBlock receive() {
long startTime = System.nanoTime();
try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
checkState();
if (!queue.isBlocked().isDone()) {
throw new IllegalStateException("Source handle is blocked.");
}
TsBlock tsBlock;
synchronized (queue) {
tsBlock = queue.remove();
}
if (tsBlock != null) {
logger.debug(
"[GetTsBlockFromQueue] TsBlock:{} size:{}",
currSequenceId,
tsBlock.getRetainedSizeInBytes());
currSequenceId++;
}
checkAndInvokeOnFinished();
return tsBlock;
} finally {
QUERY_METRICS.recordDataExchangeCost(
SOURCE_HANDLE_GET_TSBLOCK_LOCAL, System.nanoTime() - startTime);
}
}
Both LocalSinkHandle#send and LocalSourceHandle#receive are fairly straightforward; the real interaction happens through SharedTsBlockQueue. Below are SharedTsBlockQueue's remove and add methods:
/**
* Remove a tsblock from the head of the queue and return. Should be invoked only when the future
* returned by {@link #isBlocked()} completes.
*/
public TsBlock remove() {
if (closed) {
throw new IllegalStateException("queue has been destroyed");
}
TsBlock tsBlock = queue.remove();
// Every time LocalSourceHandle consumes a TsBlock, it needs to send the event to
// corresponding LocalSinkHandle.
if (sinkHandle != null) {
sinkHandle.checkAndInvokeOnFinished();
}
// Free the memory this TsBlock occupies in the MemoryPool
localMemoryManager
.getQueryPool()
.free(
localFragmentInstanceId.getQueryId(),
localFragmentInstanceId.getInstanceId(),
localPlanNodeId,
tsBlock.getRetainedSizeInBytes());
bufferRetainedSizeInBytes -= tsBlock.getRetainedSizeInBytes();
if (blocked.isDone() && queue.isEmpty() && !noMoreTsBlocks) {
blocked = SettableFuture.create();
}
return tsBlock;
}
/**
* Add tsblocks to the queue. Except the first invocation, this method should be invoked only when
* the returned future of last invocation completes.
*/
public ListenableFuture<Void> add(TsBlock tsBlock) {
if (closed) {
logger.warn("queue has been destroyed");
return immediateVoidFuture();
}
Validate.notNull(tsBlock, "TsBlock cannot be null");
Validate.isTrue(blockedOnMemory == null || blockedOnMemory.isDone(), "queue is full");
Pair<ListenableFuture<Void>, Boolean> pair =
localMemoryManager
.getQueryPool()
.reserve(
localFragmentInstanceId.getQueryId(),
localFragmentInstanceId.getInstanceId(),
localPlanNodeId,
tsBlock.getRetainedSizeInBytes(),
maxBytesCanReserve);
blockedOnMemory = pair.left;
bufferRetainedSizeInBytes += tsBlock.getRetainedSizeInBytes();
// reserve memory failed, we should wait until there is enough memory
if (!pair.right) {
blockedOnMemory.addListener(
() -> {
synchronized (this) {
queue.add(tsBlock);
if (!blocked.isDone()) {
blocked.set(null);
}
}
},
directExecutor());
} else { // reserve memory succeeded, add the TsBlock directly
queue.add(tsBlock);
if (!blocked.isDone()) {
blocked.set(null);
}
}
return blockedOnMemory;
}
MemoryPool
Because transfer is asynchronous, a SinkHandle has to keep computed TsBlocks in memory until they are actually fetched, and a SourceHandle has to reserve memory before receiving TsBlocks. To keep the memory used by the data exchange module under control, SinkHandles and SourceHandles must reserve that memory through a MemoryPool.
Each node holds one MemoryPool, whose size is determined by configuration parameters:
public class LocalMemoryManager {
private final MemoryPool queryPool;
public LocalMemoryManager() {
queryPool =
new MemoryPool(
"query",
IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForDataExchange(),
IoTDBDescriptor.getInstance().getConfig().getMaxBytesPerFragmentInstance());
}
public MemoryPool getQueryPool() {
return queryPool;
}
}
public MemoryPool(String id, long maxBytes, long maxBytesPerFragmentInstance) {
this.id = Validate.notNull(id);
Validate.isTrue(maxBytes > 0L, "max bytes should be greater than zero: %d", maxBytes);
// maxBytes is the maximum amount of memory the whole pool may use
this.maxBytes = maxBytes;
Validate.isTrue(
maxBytesPerFragmentInstance > 0L && maxBytesPerFragmentInstance <= maxBytes,
"max bytes per query should be greater than zero while less than or equal to max bytes. maxBytesPerQuery: %d, maxBytes: %d",
maxBytesPerFragmentInstance,
maxBytes);
// maxBytesPerFragmentInstance is the upper bound on the combined memory used by the
// ISinkHandle and ISourceHandles of a single FragmentInstance
this.maxBytesPerFragmentInstance = maxBytesPerFragmentInstance;
}
ISinkHandle and ISourceHandle reserve memory through MemoryPool#reserve, which performs a two-level check to decide whether a reservation succeeds:
- First it checks whether the request would exceed the MemoryPool's overall limit; maxBytes - reservedBytes < bytesToReserve means it would.
- Each ISinkHandle/ISourceHandle also has its own limit, so the second check verifies that the request would not push the calling handle past maxBytesCanReserve, i.e. the comparison against that handle's entry in queryMemoryReservations.
If the reservation succeeds, the MemoryPool's used bytes and the calling handle's usage (queryMemoryReservations) are updated, and Futures.immediateFuture(null) is returned;
if it fails, a MemoryReservationFuture is created and added to a list the pool maintains; when MemoryPool#free later releases memory, futures from that list are selected and completed.
Here is the source of reserve (a caller-side usage sketch follows it):
/** @return if reserve succeed, pair.right will be true, otherwise false */
public Pair<ListenableFuture<Void>, Boolean> reserve(
String queryId,
String fragmentInstanceId,
String planNodeId,
long bytesToReserve,
long maxBytesCanReserve) {
Validate.notNull(queryId);
Validate.notNull(fragmentInstanceId);
Validate.notNull(planNodeId);
Validate.isTrue(
bytesToReserve > 0L && bytesToReserve <= maxBytesPerFragmentInstance,
"bytes should be greater than zero while less than or equal to max bytes per fragment instance: %d",
bytesToReserve);
if (bytesToReserve > maxBytesCanReserve) {
LOGGER.warn(
"Cannot reserve {} bytes memory from MemoryPool for planNodeId{}",
bytesToReserve,
planNodeId);
throw new IllegalArgumentException(
"Query is aborted since it requests more memory than can be allocated.");
}
ListenableFuture<Void> result;
synchronized (this) {
if (maxBytes - reservedBytes < bytesToReserve
|| maxBytesCanReserve
- queryMemoryReservations
.getOrDefault(queryId, Collections.emptyMap())
.getOrDefault(fragmentInstanceId, Collections.emptyMap())
.getOrDefault(planNodeId, 0L)
< bytesToReserve) {
LOGGER.debug(
"Blocked reserve request: {} bytes memory for planNodeId{}",
bytesToReserve,
planNodeId);
result =
MemoryReservationFuture.create(
queryId, fragmentInstanceId, planNodeId, bytesToReserve, maxBytesCanReserve);
memoryReservationFutures.add((MemoryReservationFuture<Void>) result);
return new Pair<>(result, Boolean.FALSE);
} else {
reservedBytes += bytesToReserve;
queryMemoryReservations
.computeIfAbsent(queryId, x -> new HashMap<>())
.computeIfAbsent(fragmentInstanceId, x -> new HashMap<>())
.merge(planNodeId, bytesToReserve, Long::sum);
result = Futures.immediateFuture(null);
return new Pair<>(result, Boolean.TRUE);
}
}
}
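From the caller's side the contract is: check pair.right, and if the reservation was blocked, register follow-up work on pair.left. A minimal usage sketch (pool, executorService and doWorkThatNeedsTheMemory are placeholders for this example):
// Minimal sketch of how a handle consumes the reserve() contract shown above.
Pair<ListenableFuture<Void>, Boolean> pair =
    pool.reserve(queryId, fragmentInstanceId, planNodeId, bytesToReserve, maxBytesCanReserve);
if (pair.right) {
  // Memory was granted immediately; pair.left is already completed.
  doWorkThatNeedsTheMemory();
} else {
  // Memory is not available yet; pair.left completes once MemoryPool#free hands the bytes over.
  pair.left.addListener(() -> doWorkThatNeedsTheMemory(), executorService);
}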
When MemoryPool#free is called, it first updates the memory held by the MemoryPool and by the calling ISinkHandle/ISourceHandle.
It then iterates over memoryReservationFutures to find futures that can now be completed:
public void free(String queryId, String fragmentInstanceId, String planNodeId, long bytes) {
List<MemoryReservationFuture<Void>> futureList = new ArrayList<>();
synchronized (this) {
Validate.notNull(queryId);
Validate.isTrue(bytes > 0L);
Long queryReservedBytes =
queryMemoryReservations
.getOrDefault(queryId, Collections.emptyMap())
.getOrDefault(fragmentInstanceId, Collections.emptyMap())
.get(planNodeId);
Validate.notNull(queryReservedBytes);
Validate.isTrue(bytes <= queryReservedBytes);
queryReservedBytes -= bytes;
if (queryReservedBytes == 0) {
queryMemoryReservations.get(queryId).get(fragmentInstanceId).remove(planNodeId);
} else {
queryMemoryReservations
.get(queryId)
.get(fragmentInstanceId)
.put(planNodeId, queryReservedBytes);
}
reservedBytes -= bytes;
if (memoryReservationFutures.isEmpty()) {
return;
}
Iterator<MemoryReservationFuture<Void>> iterator = memoryReservationFutures.iterator();
while (iterator.hasNext()) {
MemoryReservationFuture<Void> future = iterator.next();
if (future.isCancelled() || future.isDone()) {
continue;
}
long bytesToReserve = future.getBytesToReserve();
String curQueryId = future.getQueryId();
String curFragmentInstanceId = future.getFragmentInstanceId();
String curPlanNodeId = future.getPlanNodeId();
// check total reserved bytes in memory pool
if (maxBytes - reservedBytes < bytesToReserve) {
continue;
}
// check total reserved bytes of one Sink/Source handle
if (future.getMaxBytesCanReserve()
- queryMemoryReservations
.getOrDefault(curQueryId, Collections.emptyMap())
.getOrDefault(curFragmentInstanceId, Collections.emptyMap())
.getOrDefault(curPlanNodeId, 0L)
>= bytesToReserve) {
reservedBytes += bytesToReserve;
queryMemoryReservations
.computeIfAbsent(curQueryId, x -> new HashMap<>())
.computeIfAbsent(curFragmentInstanceId, x -> new HashMap<>())
.merge(curPlanNodeId, bytesToReserve, Long::sum);
futureList.add(future);
iterator.remove();
}
}
}
// why we need to put this outside MemoryPool's lock?
// If we put this block inside the MemoryPool's lock, we will get deadlock case like the
// following:
// Assuming that thread-A: LocalSourceHandle.receive() -> A-SharedTsBlockQueue.remove() ->
// MemoryPool.free() (hold MemoryPool's lock) -> future.set(null) -> try to get
// B-SharedTsBlockQueue's lock
// thread-B: LocalSourceHandle.receive() -> B-SharedTsBlockQueue.remove() (hold
// B-SharedTsBlockQueue's lock) -> try to get MemoryPool's lock
for (MemoryReservationFuture<Void> future : futureList) {
try {
future.set(null);
} catch (Throwable t) {
// ignore it, because we still need to notify other future
LOGGER.error("error happened while trying to free memory: ", t);
}
}
}
Summary
The flow described above has a few drawbacks:
- The SourceHandle cannot assume anything about the order in which events arrive, which makes its implementation somewhat involved: it has to cope with events arriving out of order.
- Even assuming NewDataBlockEvents arrive in order and the SourceHandle consumes exactly as fast as the SinkHandle produces, transferring a single TsBlock takes 3 RPCs, which is a fairly high network overhead:
  - The SinkHandle sends a NewDataBlockEvent
  - The SourceHandle pulls the TsBlock with the sequenceId carried by the NewDataBlockEvent
  - After successfully pulling the TsBlock, the SourceHandle sends an ack message
The design rationale for spending three RPCs per TsBlock:
- Bound the number of data-transfer threads and keep any single SourceHandle from occupying a thread so long that other SourceHandles cannot issue their own transfer requests:
  - Thrift RPC is synchronous, so if a SourceHandle pulled blindly without a NewDataBlockEvent, the data on the SinkHandle side might not be ready yet. Blocking inside that RPC until the SinkHandle produced data would let one SourceHandle hold a data-transfer thread for too long; with a fixed number of such threads, other SourceHandles' requests would be starved.
  - The SinkHandle therefore sends a NewDataBlockEvent RPC when its data is ready, telling the SourceHandle to pull. The pull RPC is still synchronous, but because the data on the SinkHandle side is guaranteed to be ready, it blocks a transfer thread only briefly, so other SourceHandles' requests do not queue up excessively.
- Prevent TsBlock loss by using the ack message for fault tolerance:
  - If the SinkHandle released a TsBlock as soon as it received the pull RPC, data could be lost: should that RPC fail due to a network problem, the SourceHandle would retry, but the SinkHandle would already have released the TsBlock while handling the previous request, so the retry would be futile.
- Keep memory control simple: because each TsBlock's size is known before it is pulled, the SourceHandle can check whether it has enough memory and only pull when it does, reserving memory on demand instead of pre-allocating memory to the query.