Preface
I have recently been working on integrating StarRocks with data lakes (focusing on adapting SR 3.2 to Paimon 0.6), reading and modifying some of the code along the way, and found the StarRocks JNI Connector to be a genuinely well-crafted module: it gives all kinds of big data components a convenient entry point for integrating with StarRocks, enriching its federated query capability. This post briefly analyzes its design and, through the Paimon Reader, shows how StarRocks' data lake readers are built on top of it.
Overall Design
The idea behind the StarRocks JNI Connector is simple: it is an abstraction layer sitting between the C++-based BE and Java-based big data components, so that their Java SDKs can be reused directly, avoiding both intrusive changes to the BE code and the many inconveniences of accessing big data storage from C++. The schematic is shown below.
As the diagram shows, the JNI Connector provides a uniform contract on the Java side: an integration implements the three methods open() / getNext() / close() (all declared in the ConnectorScanner abstract class) and supplies the necessary information (such as data types), and the data can then be read out. Internally, the JNI Connector writes the data being processed into a native memory region that C++ can address (off-heap memory, from Java's point of view), and the BE side performs its Scan by reading this memory. The code of the ConnectorScanner abstract class follows.
import java.io.IOException;

public abstract class ConnectorScanner {
private OffHeapTable offHeapTable;
private String[] fields;
private ColumnType[] types;
private int tableSize;
/**
* Initialize the reader with parameters passed by the class constructor and allocate necessary resources.
* Developers can call the {@link ConnectorScanner#initOffHeapTableWriter(ColumnType[], String[], int)} method here
* to allocate memory space.
*/
public abstract void open() throws IOException;
/**
* Close the reader and release resources.
*/
public abstract void close() throws IOException;
/**
* Scan the original data and save it to the off-heap table.
*
* @return The number of rows scanned.
* The specific implementation needs to call the {@link ConnectorScanner#appendData(int, ColumnValue)} method
* to save data to the off-heap table.
* The number of rows scanned must be less than or equal to {@link ConnectorScanner#tableSize}.
*/
public abstract int getNext() throws IOException;
/**
* This method needs to be called before {@link ConnectorScanner#getNext()}.
*
* @param requiredTypes column types to scan
* @param requiredFields columns names to scan
* @param fetchSize number of rows
*/
protected void initOffHeapTableWriter(ColumnType[] requiredTypes, String[] requiredFields, int fetchSize) {
this.tableSize = fetchSize;
this.types = requiredTypes;
this.fields = requiredFields;
}
protected void appendData(int index, ColumnValue value) {
offHeapTable.appendData(index, value);
}
protected int getTableSize() {
return tableSize;
}
public OffHeapTable getOffHeapTable() {
return offHeapTable;
}
public long getNextOffHeapChunk() throws IOException {
initOffHeapTable();
int numRows = 0;
try {
numRows = getNext();
} catch (IOException e) {
releaseOffHeapTable();
throw e;
}
return finishOffHeapTable(numRows);
}
private void initOffHeapTable() {
offHeapTable = new OffHeapTable(types, fields, tableSize);
}
private long finishOffHeapTable(int numRows) {
offHeapTable.setNumRows(numRows);
return offHeapTable.getMetaNativeAddress();
}
protected void releaseOffHeapColumnVector(int fieldId) {
offHeapTable.releaseOffHeapColumnVector(fieldId);
}
protected void releaseOffHeapTable() {
if (offHeapTable != null) {
offHeapTable.close();
}
}
}
From the design and code above, there are two points the JNI Connector must pay special attention to: first, how to accommodate the storage types of different big data components while reading (ColumnValue and ColumnType); second, how to guarantee that the BE side accesses the memory region holding the external table data correctly and efficiently (OffHeapColumnVector and OffHeapTable). These are discussed in turn below.
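Before turning to those two topics, here is a minimal sketch of a hypothetical integration that serves three rows of a single BIGINT column from memory, just to make the contract concrete. Only the ConnectorScanner API shown above is assumed; the class itself and LongColumnValue (standing in for a real ColumnValue implementation) are invented for illustration.

import java.io.IOException;
import java.util.Map;

public class InMemoryScanner extends ConnectorScanner {
    private final int fetchSize;
    private long[] rows;
    private int cursor;

    public InMemoryScanner(int fetchSize, Map<String, String> params) {
        this.fetchSize = fetchSize;
    }

    @Override
    public void open() throws IOException {
        rows = new long[] {1L, 2L, 3L};  // pretend data source
        cursor = 0;
        // One BIGINT column named "id"; registers the schema and fetch size for the off-heap table.
        initOffHeapTableWriter(new ColumnType[] {new ColumnType("id", ColumnType.TypeValue.LONG)},
                new String[] {"id"}, fetchSize);
    }

    @Override
    public int getNext() throws IOException {
        int numRows = 0;
        while (cursor < rows.length && numRows < fetchSize) {
            // LongColumnValue is a hypothetical ColumnValue implementation.
            appendData(0, new LongColumnValue(rows[cursor++]));
            numRows++;
        }
        return numRows;  // must be <= getTableSize()
    }

    @Override
    public void close() throws IOException {
        rows = null;
    }
}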
Type Compatibility
The JNI Connector defines the ColumnValue interface to standardize how values of different data types are extracted from different components. There are currently three implementations, corresponding to Hive, Hudi, and Paimon; the class diagram is shown below.
As shown, it supports the common primitive types as well as composite types. Taking PaimonColumnValue in the paimon-reader module as an example, excerpts of the value extraction code for some primitive types follow.
@Override
public long getLong() {
return (long) fieldData;
}
@Override
public double getDouble() {
return (double) fieldData;
}
@Override
public String getString(ColumnType.TypeValue type) {
if (type == ColumnType.TypeValue.DATE) {
int epoch = (int) fieldData;
LocalDate date = LocalDate.ofEpochDay(epoch);
return PaimonScannerUtils.formatDate(date);
} else {
return fieldData.toString();
}
}
@Override
public String getTimestamp(ColumnType.TypeValue type) {
if (type == ColumnType.TypeValue.DATETIME_MILLIS) {
Timestamp ts = (Timestamp) fieldData;
LocalDateTime dateTime = ts.toLocalDateTime();
return PaimonScannerUtils.formatDateTime(dateTime);
} else {
return fieldData.toString();
}
}
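As a quick sanity check of the DATE branch above: Paimon stores a DATE as an int counting days since the Unix epoch, and LocalDate.ofEpochDay() simply reverses that encoding. A standalone snippet (the output here is plain ISO format, whereas PaimonScannerUtils.formatDate() applies the formatting StarRocks expects):

import java.time.LocalDate;

public class EpochDayDemo {
    public static void main(String[] args) {
        int epochDay = 19723;  // days since 1970-01-01, as Paimon stores a DATE
        LocalDate date = LocalDate.ofEpochDay(epochDay);
        System.out.println(date);  // 2024-01-01
    }
}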
For composite types (namely Array, Map, and Struct), each element has to be handled individually, as in the PaimonColumnValue#unpackMap() method:
@Override
public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) {
InternalMap map = (InternalMap) fieldData;
DataType keyType;
DataType valueType;
if (dataType instanceof MapType) {
keyType = ((MapType) dataType).getKeyType();
valueType = ((MapType) dataType).getValueType();
} else {
throw new UnsupportedOperationException("Unsupported type: " + dataType);
}
InternalArray keyArray = map.keyArray();
toPaimonColumnValue(keys, keyArray, keyType);
InternalArray valueArray = map.valueArray();
toPaimonColumnValue(values, valueArray, valueType);
}
private void toPaimonColumnValue(List<ColumnValue> values, InternalArray array, DataType dataType) {
for (int i = 0; i < array.size(); i++) {
PaimonColumnValue cv = null;
Object o = InternalRowUtils.get(array, i, dataType);
if (o != null) {
cv = new PaimonColumnValue(o, dataType);
}
values.add(cv);
}
}
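A hedged usage sketch of unpackMap(), assuming Paimon's GenericMap / BinaryString / DataTypes APIs to construct the input by hand; in a real read, the InternalMap would of course come from the RecordReader rather than being built like this.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericMap;
import org.apache.paimon.types.DataTypes;

public class UnpackMapDemo {
    public static void main(String[] args) {
        Map<Object, Object> raw = new HashMap<>();
        raw.put(1, BinaryString.fromString("a"));
        PaimonColumnValue cv = new PaimonColumnValue(new GenericMap(raw),
                DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()));
        List<ColumnValue> keys = new ArrayList<>();
        List<ColumnValue> values = new ArrayList<>();
        cv.unpackMap(keys, values);
        // keys now holds one PaimonColumnValue wrapping 1, values one wrapping "a";
        // each will later be appended into the corresponding child column vector.
    }
}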
The ColumnType class that appeared above mainly declares, explicitly, the 19 fixed-length primitive types StarRocks supports (INT, LONG, TIMESTAMP, and so on), 2 variable-length primitive types (VARCHAR, BINARY), and 3 composite types, and provides helper methods for parsing composite types; readers can consult the code themselves.
Off-Heap Memory Layout and Access
For the SR BE to be able to process table data read by the JNI Connector, the way the data is stored in off-heap memory must be natively recognizable to the BE. The native storage used by the BE's C++ code for the three categories of types above is as follows:

- A fixed-length primitive column FixedLengthColumn needs one container, vector<CppType> _data;
- A variable-length primitive column BinaryColumnBase needs two containers: a data container vector<uint8_t> _bytes recording each row's data for the column, and an offset container vector<uint32_t> _offsets recording where each row's data starts (a concrete sketch of this layout follows the list);
- Composite columns vary case by case: for example, the array column ArrayColumn needs two containers, ColumnPtr _elements and FixedLengthColumn<uint32_t>::Ptr _offsets (ColumnPtr is used so that nested types can be supported), while the map column MapColumn needs three containers, which readers can work out for themselves.

In particular, if a column is nullable, then per the definition of NullableColumn it additionally needs a null-flag container FixedLengthColumn<uint8_t>::Ptr _null_column to record whether each row of the column is null.
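To make the variable-length layout concrete, here is a small self-contained sketch (with made-up rows) that emulates the _bytes / _offsets convention: _offsets has one more entry than there are rows, and row i occupies bytes[offsets[i] .. offsets[i+1]).

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class BinaryLayoutDemo {
    public static void main(String[] args) {
        String[] rows = {"ab", "", "xyz"};
        StringBuilder bytes = new StringBuilder();
        int[] offsets = new int[rows.length + 1];  // offsets[0] == 0 always
        for (int i = 0; i < rows.length; i++) {
            bytes.append(rows[i]);
            offsets[i + 1] = offsets[i] + rows[i].getBytes(StandardCharsets.UTF_8).length;
        }
        System.out.println(bytes);                    // abxyz
        System.out.println(Arrays.toString(offsets)); // [0, 2, 2, 5]
    }
}

The empty middle row shows why the offsets are needed at all: it occupies zero bytes but still has a well-defined [2, 2) range.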
OffHeapColumnVector
Building on this knowledge, the JNI Connector designed OffHeapColumnVector, an off-heap container for external table column data. Its key fields, annotated below, are easy to follow.
// Null flags; corresponds to _null_column
private long nulls;
// Actual data; corresponds to _data
private long data;
// Offsets; corresponds to _offsets
private long offsetData;
// Initial capacity
private int capacity;
// Column type
private ColumnType type;
// Count of null elements
private int numNulls;
// Number of elements appended so far
protected int elementsAppended;
// Nested OffHeapColumnVectors, used for variable-length and composite types
private OffHeapColumnVector[] childColumns;
OffHeapColumnVector allocates its memory in a style similar to Spark Tungsten (covered in an earlier post on this blog): in test environments it calls the JVM Unsafe API, while in production it calls the SR BE's native Memory Tracker API. The reserveInternal() method shows the allocation logic for the different categories of types clearly.
private void reserveInternal(int newCapacity) {
int oldCapacity = (nulls == 0L) ? 0 : capacity;
long oldOffsetSize = (nulls == 0) ? 0 : (capacity + 1) * 4L;
long newOffsetSize = (newCapacity + 1) * 4L;
int typeSize = type.getPrimitiveTypeValueSize();
if (type.isUnknown()) {
// don't do anything.
} else if (typeSize != -1) {
this.data = Platform.reallocateMemory(data, oldCapacity * typeSize, newCapacity * typeSize);
} else if (type.isByteStorageType()) {
this.offsetData = Platform.reallocateMemory(offsetData, oldOffsetSize, newOffsetSize);
int childCapacity = newCapacity * DEFAULT_STRING_LENGTH;
this.childColumns = new OffHeapColumnVector[1];
this.childColumns[0] = new OffHeapColumnVector(childCapacity, new ColumnType(type.name + "#data",
ColumnType.TypeValue.BYTE));
} else if (type.isArray() || type.isMap() || type.isStruct()) {
if (type.isArray() || type.isMap()) {
this.offsetData = Platform.reallocateMemory(offsetData, oldOffsetSize, newOffsetSize);
}
int size = type.childTypes.size();
this.childColumns = new OffHeapColumnVector[size];
for (int i = 0; i < size; i++) {
this.childColumns[i] = new OffHeapColumnVector(newCapacity, type.childTypes.get(i));
}
} else {
throw new RuntimeException("Unhandled type: " + type);
}
this.nulls = Platform.reallocateMemory(nulls, oldCapacity, newCapacity);
Platform.setMemory(nulls + oldCapacity, (byte) 0, newCapacity - oldCapacity);
capacity = newCapacity;
if (offsetData != 0) {
// offsetData[0] == 0 always.
// we have to set it explicitly otherwise it's undefined value here.
Platform.putInt(null, offsetData, 0);
}
}
Now look at how fixed-length types are written and read, taking INT as an example. It is again Spark Tungsten style, much like manipulating C++ pointers. The Platform class is in fact lightly adapted from the Spark class of the same name; its getInt() and putInt() methods call the Unsafe API directly.
public int appendInt(int v) {
reserve(elementsAppended + 1);
putInt(elementsAppended, v);
return elementsAppended++;
}
private void putInt(int rowId, int value) {
Platform.putInt(null, data + 4L * rowId, value);
}
public int getInt(int rowId) {
return Platform.getInt(null, data + 4L * rowId);
}
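To see what Platform boils down to, the following self-contained sketch performs the same off-heap INT write and read through sun.misc.Unsafe directly (obtained via reflection, since the singleton is not public). This mirrors the test-environment path only; the production path goes through the BE's Memory Tracker instead.

import java.lang.reflect.Field;
import sun.misc.Unsafe;

public class OffHeapIntDemo {
    public static void main(String[] args) throws Exception {
        Field f = Unsafe.class.getDeclaredField("theUnsafe");
        f.setAccessible(true);
        Unsafe unsafe = (Unsafe) f.get(null);

        long data = unsafe.allocateMemory(4L * 8);  // room for 8 INTs
        unsafe.putInt(data + 4L * 0, 42);           // putInt-style write at rowId 0
        System.out.println(unsafe.getInt(data + 4L * 0));  // 42
        unsafe.freeMemory(data);
    }
}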
For variable-length types, the offset of each row must additionally be written; on reads, the offsets locate the corresponding block in the byte stream, which is then extracted and converted.
private int appendByteArray(byte[] value, int offset, int length) {
int copiedOffset = arrayData().appendBytes(length, value, offset);
reserve(elementsAppended + 1);
putArrayOffset(elementsAppended, copiedOffset, length);
return elementsAppended++;
}
private void putArrayOffset(int rowId, int offset, int length) {
Platform.putInt(null, offsetData + 4L * rowId, offset);
Platform.putInt(null, offsetData + 4L * (rowId + 1), offset + length);
}
public String getUTF8String(int rowId) {
if (isNullAt(rowId)) {
return null;
}
int start = getArrayOffset(rowId);
int end = getArrayOffset(rowId + 1);
int size = end - start;
byte[] bytes = arrayData().getBytes(start, size);
return new String(bytes, StandardCharsets.UTF_8);
}
Composite types mostly involve operations on childColumns, which follow the same ideas as above, so they are not repeated here.
OffHeapTable
As its name suggests, OffHeapTable is the component that manages all of the OffHeapColumnVectors belonging to one table. Its implementation is also very concise; an excerpt follows.
public class OffHeapTable {
public OffHeapColumnVector[] vectors;
public String[] fields;
public OffHeapColumnVector meta;
public int numRows;
public boolean[] released;
public OffHeapTable(ColumnType[] types, String[] fields, int capacity) {
this.fields = fields;
this.vectors = new OffHeapColumnVector[types.length];
this.released = new boolean[types.length];
int metaSize = 0;
for (int i = 0; i < types.length; i++) {
vectors[i] = new OffHeapColumnVector(capacity, types[i]);
metaSize += types[i].computeColumnSize();
released[i] = false;
}
this.meta = new OffHeapColumnVector(metaSize, new ColumnType("#meta", ColumnType.TypeValue.LONG));
this.numRows = 0;
}
public void appendData(int fieldId, ColumnValue o) {
vectors[fieldId].appendValue(o);
}
public void releaseOffHeapColumnVector(int fieldId) {
if (!released[fieldId]) {
vectors[fieldId].close();
released[fieldId] = true;
}
}
public long getMetaNativeAddress() {
meta.appendLong(numRows);
for (OffHeapColumnVector v : vectors) {
v.updateMeta(meta);
}
return meta.valuesNativeAddress();
}
}
Besides the necessary bookkeeping such as column names, the row count, and release flags, note in particular that OffHeapTable maintains one extra metadata-holding OffHeapColumnVector named meta, which stores the starting native address of every data container for fast lookup. The metadata update operation is shown below, followed by a worked example.
public void updateMeta(OffHeapColumnVector meta) {
if (type.isUnknown()) {
meta.appendLong(0);
} else if (type.isByteStorageType()) {
meta.appendLong(nullsNativeAddress());
meta.appendLong(arrayOffsetNativeAddress());
meta.appendLong(arrayDataNativeAddress());
} else if (type.isArray() || type.isMap() || type.isStruct()) {
meta.appendLong(nullsNativeAddress());
if (type.isArray() || type.isMap()) {
meta.appendLong(arrayOffsetNativeAddress());
}
for (OffHeapColumnVector c : childColumns) {
c.updateMeta(meta);
}
} else {
meta.appendLong(nullsNativeAddress());
meta.appendLong(valuesNativeAddress());
}
}
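Putting getMetaNativeAddress() and updateMeta() together: meta is just a flat array of longs. For a hypothetical table with a nullable INT column c1 and a nullable VARCHAR column c2, its contents would be laid out as follows (the addresses are placeholders; the ordering follows the branches of updateMeta() above):

// meta[0] = numRows
// meta[1] = address of c1's null flags   (INT takes the primitive branch: nulls + values)
// meta[2] = address of c1's data
// meta[3] = address of c2's null flags   (VARCHAR takes the byte-storage branch: nulls + offsets + bytes)
// meta[4] = address of c2's offsets
// meta[5] = address of c2's child BYTE vector's data

The BE then consumes these entries one by one, as we will see in _fill_column() below.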
From Paimon Reader to BE Scan
Having covered the design of type compatibility and off-heap memory access, we can now look at PaimonSplitScanner in the Paimon Reader to see how the JNI Connector works together with the BE.

The JNI Connector requires every ConnectorScanner implementation to take two fixed constructor parameters: the number of rows to read, and the table-type-specific parameters (column information, predicates, and so on):
public PaimonSplitScanner(int fetchSize, Map<String, String> params) {
this.fetchSize = fetchSize;
this.requiredFields = params.get("required_fields").split(",");
this.nestedFields = params.getOrDefault("nested_fields", "").split(",");
this.splitInfo = params.get("split_info");
this.predicateInfo = params.get("predicate_info");
this.encodedTable = params.get("native_table");
this.classLoader = this.getClass().getClassLoader();
}
As mentioned earlier, an integration must implement the three methods open() / getNext() / close(). Here is the implementation of PaimonSplitScanner#open():
@Override
public void open() throws IOException {
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
table = PaimonScannerUtils.decodeStringToObject(encodedTable);
parseRequiredTypes();
initOffHeapTableWriter(requiredTypes, requiredFields, fetchSize);
initReader();
} catch (Exception e) {
close();
String msg = "Failed to open the paimon reader.";
LOG.error(msg, e);
throw new IOException(msg, e);
}
}
private void initReader() throws IOException {
ReadBuilder readBuilder = table.newReadBuilder();
RowType rowType = table.rowType();
List<String> fieldNames = PaimonScannerUtils.fieldNames(rowType);
int[] projected = Arrays.stream(requiredFields).mapToInt(fieldNames::indexOf).toArray();
readBuilder.withProjection(projected);
List<Predicate> predicates = PaimonScannerUtils.decodeStringToObject(predicateInfo);
readBuilder.withFilter(predicates);
Split split = PaimonScannerUtils.decodeStringToObject(splitInfo);
RecordReader<InternalRow> reader = readBuilder.newRead().executeFilter().createReader(split);
iterator = new RecordReaderIterator<>(reader);
}
As we can see, open() decodes the serialized table object, obtains the column and type information, creates the OffHeapTable instance, and calls into the Paimon SDK to build a RecordReader instance carrying the column pruning, predicate pushdown, and other information, finally producing the iterator that actually reads the Paimon data. getNext() then reads rows through this iterator, converts each field into the PaimonColumnValue defined earlier, and calls the base-class method to write it into the corresponding OffHeapColumnVector. Everything falls neatly into place.
// Ultimately invoked by ConnectorScanner#getNextOffHeapChunk()
@Override
public int getNext() throws IOException {
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
int numRows = 0;
while (iterator.hasNext() && numRows < fetchSize) {
InternalRow row = iterator.next();
if (row == null) {
break;
}
for (int i = 0; i < requiredFields.length; i++) {
Object fieldData = InternalRowUtils.get(row, i, logicalTypes[i]);
if (fieldData == null) {
appendData(i, null);
} else {
ColumnValue fieldValue = new PaimonColumnValue(fieldData, logicalTypes[i]);
appendData(i, fieldValue);
}
}
numRows++;
}
return numRows;
} catch (Exception e) {
close();
String msg = "Failed to get the next off-heap table chunk of paimon.";
LOG.error(msg, e);
throw new IOException(msg, e);
}
}
Finally, let's look at how the BE makes use of everything described above. The corresponding C++ class on the BE side is JniScanner. In the JniScanner::_init_jni_table_scanner() method we can see the logic that obtains the corresponding ConnectorScanner instance and its parameters through a ScannerFactory instance (Java code omitted here; a minimal sketch follows the two C++ excerpts):
Status JniScanner::_init_jni_table_scanner(JNIEnv* _jni_env, RuntimeState* runtime_state) {
jclass scanner_factory_class = _jni_env->FindClass(_jni_scanner_factory_class.c_str());
jmethodID scanner_factory_constructor = _jni_env->GetMethodID(scanner_factory_class, "<init>", "()V");
jobject scanner_factory_obj = _jni_env->NewObject(scanner_factory_class, scanner_factory_constructor);
jmethodID get_scanner_method =
_jni_env->GetMethodID(scanner_factory_class, "getScannerClass", "()Ljava/lang/Class;");
_jni_scanner_cls = (jclass)_jni_env->CallObjectMethod(scanner_factory_obj, get_scanner_method);
RETURN_IF_ERROR(_check_jni_exception(_jni_env, "Failed to init the scanner class."));
_jni_env->DeleteLocalRef(scanner_factory_class);
_jni_env->DeleteLocalRef(scanner_factory_obj);
jmethodID scanner_constructor = _jni_env->GetMethodID(_jni_scanner_cls, "<init>", "(ILjava/util/Map;)V");
RETURN_IF_ERROR(_check_jni_exception(_jni_env, "Failed to get a scanner class constructor."));
jclass hashmap_class = _jni_env->FindClass("java/util/HashMap");
jmethodID hashmap_constructor = _jni_env->GetMethodID(hashmap_class, "<init>", "(I)V");
jobject hashmap_object = _jni_env->NewObject(hashmap_class, hashmap_constructor, _jni_scanner_params.size());
jmethodID hashmap_put =
_jni_env->GetMethodID(hashmap_class, "put", "(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;");
RETURN_IF_ERROR(_check_jni_exception(_jni_env, "Failed to get the HashMap methods."));
// ... filling the parameter HashMap and constructing the scanner instance omitted ...
}
as well as the logic that initializes the JNI methods corresponding to ConnectorScanner's open() / getNext() / close() contract:
Status JniScanner::_init_jni_method(JNIEnv* _jni_env) {
// init jmethod
_jni_scanner_open = _jni_env->GetMethodID(_jni_scanner_cls, "open", "()V");
RETURN_IF_ERROR(_check_jni_exception(_jni_env, "Failed to get `open` jni method"));
_jni_scanner_get_next_chunk = _jni_env->GetMethodID(_jni_scanner_cls, "getNextOffHeapChunk", "()J");
RETURN_IF_ERROR(_check_jni_exception(_jni_env, "Failed to get `getNextOffHeapChunk` jni method"));
_jni_scanner_close = _jni_env->GetMethodID(_jni_scanner_cls, "close", "()V");
RETURN_IF_ERROR(_check_jni_exception(_jni_env, "Failed to get `close` jni method"));
_jni_scanner_release_column = _jni_env->GetMethodID(_jni_scanner_cls, "releaseOffHeapColumnVector", "(I)V");
RETURN_IF_ERROR(_check_jni_exception(_jni_env, "Failed to get `releaseOffHeapColumnVector` jni method"));
_jni_scanner_release_table = _jni_env->GetMethodID(_jni_scanner_cls, "releaseOffHeapTable", "()V");
RETURN_IF_ERROR(_check_jni_exception(_jni_env, "Failed to get `releaseOffHeapTable` jni method"));
return Status::OK();
}
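On the Java side, the factory whose code was omitted above only has to match the JNI signatures the BE resolves: a no-argument constructor and a getScannerClass() method with signature ()Ljava/lang/Class;. A minimal hypothetical sketch is below; the real StarRocks factories also deal with details such as classloader isolation, which is left out here.

public class PaimonSplitScannerFactory {
    // Resolved by the BE via GetMethodID(..., "getScannerClass", "()Ljava/lang/Class;").
    public Class getScannerClass() {
        return PaimonSplitScanner.class;
    }
}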
After the BE fetches data into native memory by invoking open() and getNextOffHeapChunk() via JNI, it goes on to call the JniScanner::_fill_column() method to obtain each column's row data and metadata (that is, the memory addresses), and then dispatches to a type-specific append method. Note that because OffHeapColumnVector always carries the null-flag field, every column reconstructed from this memory is a NullableColumn, and the C++ memcpy() function is used to copy the memory regions pointed to by the metadata into the actual NullableColumn.
Status JniScanner::_fill_column(FillColumnArgs* pargs) {
FillColumnArgs& args = *pargs;
if (args.must_nullable && !args.column->is_nullable()) {
return Status::DataQualityError(fmt::format("NOT NULL column[{}] is not supported.", args.slot_name));
}
void* ptr = next_chunk_meta_as_ptr();
if (ptr == nullptr) {
// struct field mismatch.
args.column->append_default(args.num_rows);
return Status::OK();
}
if (args.column->is_nullable()) {
// if column is nullable, we parse `null_column`,
// and update `args.nulls` and set `data_column` to `args.column`
bool* null_column_ptr = static_cast<bool*>(ptr);
auto* nullable_column = down_cast<NullableColumn*>(args.column);
NullData& null_data = nullable_column->null_column_data();
null_data.resize(args.num_rows);
memcpy(null_data.data(), null_column_ptr, args.num_rows);
nullable_column->update_has_null();
auto* data_column = nullable_column->data_column().get();
pargs->column = data_column;
pargs->nulls = null_data.data();
} else {
// otherwise we skip this chunk meta, because in Java side
// we assume every column starts with `null_column`.
}
LogicalType column_type = args.slot_type.type;
if (column_type == LogicalType::TYPE_BOOLEAN) {
RETURN_IF_ERROR((_append_primitive_data<TYPE_BOOLEAN>(args)));
} else if (column_type == LogicalType::TYPE_TINYINT) {
RETURN_IF_ERROR((_append_primitive_data<TYPE_TINYINT>(args)));
} else if (column_type == LogicalType::TYPE_SMALLINT) {
RETURN_IF_ERROR((_append_primitive_data<TYPE_SMALLINT>(args)));
}
// ... handling of the remaining types omitted ...
else {
return Status::InternalError(fmt::format("Type {} is not supported for off-heap table scanner", column_type));
}
return Status::OK();
}
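To tie the two halves together, here is a hedged Java rendering of what the BE effectively does for one nullable INT column: walk the meta entries, then read the null flags and values from the returned addresses. The real work happens in C++ with memcpy; metaAddr stands for the value returned by getNextOffHeapChunk(), and the layout follows the worked meta example earlier.

import sun.misc.Unsafe;

public class MetaWalkDemo {
    // Hypothetical consumer of the meta layout described earlier.
    static void readNullableIntColumn(Unsafe unsafe, long metaAddr) {
        long numRows = unsafe.getLong(metaAddr);        // meta[0]: row count
        long nullsAddr = unsafe.getLong(metaAddr + 8);  // meta[1]: null flags, one byte per row
        long dataAddr = unsafe.getLong(metaAddr + 16);  // meta[2]: INT data
        for (int i = 0; i < numRows; i++) {
            boolean isNull = unsafe.getByte(nullsAddr + i) != 0;
            System.out.println(isNull ? "NULL" : unsafe.getInt(dataAddr + 4L * i));
        }
    }
}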
At this point, the data read by the Paimon Reader has been handed over to the BE's Scan pipeline, and the subsequent computation can proceed.
The End
Time to relax and watch the Liverpool vs. Manchester United derby. Good night, everyone.