Components of an ORC File
Part | Count | Contents | Size |
---|---|---|---|
stripes | several | a group of rows | 250 MB by default |
file footer | 1 | see below | |
postscript | 1 | see below | less than 255 bytes |
1. file footer
- a list of all the stripes in the file
- the number of rows in each stripe
- the data type of each column
- column-level aggregates: count, min, max, and sum
```protobuf
message Footer {
  optional uint64 headerLength = 1;
  optional uint64 contentLength = 2;
  repeated StripeInformation stripes = 3;
  repeated Type types = 4;
  repeated UserMetadataItem metadata = 5;
  optional uint64 numberOfRows = 6;
  repeated ColumnStatistics statistics = 7;
  optional uint32 rowIndexStride = 8;
}

message StripeInformation {
  optional uint64 offset = 1;
  optional uint64 indexLength = 2;
  optional uint64 dataLength = 3;
  optional uint64 footerLength = 4;
  optional uint64 numberOfRows = 5;
}

message Type {
  enum Kind {
    BOOLEAN = 0;
    BYTE = 1;
    SHORT = 2;
    INT = 3;
    LONG = 4;
    FLOAT = 5;
    DOUBLE = 6;
    STRING = 7;
    BINARY = 8;
    TIMESTAMP = 9;
    LIST = 10;
    MAP = 11;
    STRUCT = 12;
    UNION = 13;
    DECIMAL = 14;
    DATE = 15;
    VARCHAR = 16;
    CHAR = 17;
  }
  optional Kind kind = 1;
  repeated uint32 subtypes = 2 [packed=true];
  repeated string fieldNames = 3;
  optional uint32 maximumLength = 4;
  optional uint32 precision = 5;
  optional uint32 scale = 6;
}

message UserMetadataItem {
  optional string name = 1;
  optional bytes value = 2;
}

message ColumnStatistics {
  optional uint64 numberOfValues = 1;
  optional IntegerStatistics intStatistics = 2;
  optional DoubleStatistics doubleStatistics = 3;
  optional StringStatistics stringStatistics = 4;
  optional BucketStatistics bucketStatistics = 5;
  optional DecimalStatistics decimalStatistics = 6;
  optional DateStatistics dateStatistics = 7;
  optional BinaryStatistics binaryStatistics = 8;
  optional TimestampStatistics timestampStatistics = 9;
  optional bool hasNull = 10;
}

// 8 kinds of statistics in total
message IntegerStatistics {
  optional sint64 minimum = 1;
  optional sint64 maximum = 2;
  optional sint64 sum = 3;
}

message DoubleStatistics {
  optional double minimum = 1;
  optional double maximum = 2;
  optional double sum = 3;
}

message StringStatistics {
  optional string minimum = 1;
  optional string maximum = 2;
  // sum will store the total length of all strings in a stripe
  optional sint64 sum = 3;
}

message BucketStatistics {
  repeated uint64 count = 1 [packed=true];
}

message DecimalStatistics {
  optional string minimum = 1;
  optional string maximum = 2;
  optional string sum = 3;
}

message DateStatistics {
  // min,max values saved as days since epoch
  optional sint32 minimum = 1;
  optional sint32 maximum = 2;
}

message TimestampStatistics {
  // min,max values saved as milliseconds since epoch
  optional sint64 minimum = 1;
  optional sint64 maximum = 2;
}

message BinaryStatistics {
  // sum will store the total binary blob length in a stripe
  optional sint64 sum = 1;
}
```
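Everything listed above can be pulled straight from the footer without scanning any data. A minimal sketch using the `org.apache.orc` reader API (the file path is made up for illustration):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.orc.ColumnStatistics;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.StripeInformation;

public class FooterDump {
  public static void main(String[] args) throws Exception {
    Reader reader = OrcFile.createReader(new Path("/tmp/example.orc"),
        OrcFile.readerOptions(new Configuration()));
    System.out.println("rows: " + reader.getNumberOfRows());   // Footer.numberOfRows
    for (StripeInformation stripe : reader.getStripes()) {     // Footer.stripes
      System.out.println("stripe rows: " + stripe.getNumberOfRows());
    }
    for (ColumnStatistics stats : reader.getStatistics()) {    // Footer.statistics
      System.out.println(stats);  // count, plus min/max/sum where applicable
    }
  }
}
```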
2. postscript
The postscript holds what a reader needs before it can interpret anything else: the compression codec and block size, the length of the (compressed) file footer, and the file-format version. It is never compressed and is read first.

```protobuf
message PostScript {
  optional uint64 footerLength = 1;
  optional CompressionKind compression = 2;
  optional uint64 compressionBlockSize = 3;
  // the version of the file format
  //   [0, 11] = Hive 0.11
  //   [0, 12] = Hive 0.12
  repeated uint32 version = 4 [packed = true];
  optional uint64 metadataLength = 5;
  // Version of the writer:
  //   0 (or missing) = original
  //   1 = HIVE-8732 fixed
  optional uint32 writerVersion = 6;
  // Leave this last in the record
  optional string magic = 8000;
}

enum CompressionKind {
  NONE = 0;
  ZLIB = 1;
  SNAPPY = 2;
  LZO = 3;
}
```
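Because the postscript is never compressed and sits right before the final byte of the file (which stores the postscript's serialized length), a reader can bootstrap itself from the file tail alone. A minimal sketch mirroring what the reader does internally, assuming `tail` already holds the last bytes of the file:

```java
import com.google.protobuf.CodedInputStream;
import org.apache.orc.OrcProto;

public class PostScriptSketch {
  static OrcProto.PostScript parsePostScript(byte[] tail) throws java.io.IOException {
    int psLen = tail[tail.length - 1] & 0xff;  // last byte = PostScript length
    int psOffset = tail.length - 1 - psLen;    // the PostScript precedes that byte
    CodedInputStream in = CodedInputStream.newInstance(tail, psOffset, psLen);
    // footerLength/compression in the result tell us how to read the footer
    return OrcProto.PostScript.parseFrom(in);
  }
}
```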
3. stripe
- index data: the min/max values of each column and the row positions within each column
- row data: the actual row data
- stripe footer: contains a directory of stream locations

Both the index data and the row data are split up by column. The layout arithmetic is sketched after the message definition below.
```protobuf
message StripeInformation {
  // the start of the stripe within the file
  optional uint64 offset = 1;
  // the length of the indexes in bytes
  optional uint64 indexLength = 2;
  // the length of the data in bytes
  optional uint64 dataLength = 3;
  // the length of the footer in bytes
  optional uint64 footerLength = 4;
  // the number of rows in the stripe
  optional uint64 numberOfRows = 5;
  // If this is present, the reader should use this value for the encryption
  // stripe id for setting the encryption IV. Otherwise, the reader should
  // use one larger than the previous stripe's encryptStripeId.
  // For unmerged ORC files, the first stripe will use 1 and the rest of the
  // stripes won't have it set. For merged files, the stripe information
  // will be copied from their original files and thus the first stripe of
  // each of the input files will reset it to 1.
  // Note that 1 was chosen, because protobuf v3 doesn't serialize
  // primitive types that are the default (eg. 0).
  optional uint64 encryptStripeId = 6;
  // For each encryption variant, the new encrypted local key to use until we
  // find a replacement.
  repeated bytes encryptedLocalKeys = 7;
}
```
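Since the three regions of a stripe are laid out back to back (index data, then row data, then the stripe footer), their absolute file offsets follow directly from the message above. A sketch, assuming `stripe` is an `org.apache.orc.StripeInformation` obtained from the reader:

```java
long indexStart  = stripe.getOffset();                    // index data starts here
long dataStart   = indexStart + stripe.getIndexLength();  // then row data
long footerStart = dataStart + stripe.getDataLength();    // then the stripe footer
long stripeEnd   = footerStart + stripe.getFooterLength();
```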
4. Index
ORC provides indexes at three levels:
level | where it lives in the ORC file | contents |
---|---|---|
file level | file footer | column-level statistics for the entire file |
stripe level | file footer | column-level statistics for each stripe |
row level | the start of each stripe | column-level statistics for each RowGroup, plus the start position of each RowGroup |
These indexes are not like MySQL indexes, which can locate an individual record directly; their purpose is to skip whole files, stripes, or RowGroups as quickly as possible.
By default every 10,000 rows form a RowGroup, and the row-level index is placed at the very beginning of the stripe.
```protobuf
message RowIndexEntry {
  repeated uint64 positions = 1 [packed=true];
  optional ColumnStatistics statistics = 2;
}

message RowIndex {
  repeated RowIndexEntry entry = 1;
}
```
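In practice the skipping is driven by predicate pushdown: the caller hands a `SearchArgument` to the reader, which compares the predicate against the file/stripe/RowGroup statistics and skips whatever cannot match. A minimal sketch (the column name `id` and the path are made up for illustration):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;

public class PushdownSketch {
  public static void main(String[] args) throws Exception {
    // only RowGroups whose [min, max] statistics overlap [100, 200] are read
    SearchArgument sarg = SearchArgumentFactory.newBuilder()
        .startAnd()
        .between("id", PredicateLeaf.Type.LONG, 100L, 200L)
        .end()
        .build();
    Reader reader = OrcFile.createReader(new Path("/tmp/example.orc"),
        OrcFile.readerOptions(new Configuration()));
    RecordReader rows = reader.rows(
        reader.options().searchArgument(sarg, new String[]{"id"}));
    // ... iterate with rows.nextBatch(...) as usual, then rows.close()
  }
}
```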
5. Reading an ORC file
```java
// org.apache.orc.impl.RecordReaderImpl
public class RecordReaderImpl implements RecordReader {
  @Override
  public boolean nextBatch(VectorizedRowBatch batch) throws IOException {
    try {
      if (rowInStripe >= rowCountInStripe) {
        currentStripe += 1;
        if (currentStripe >= stripes.size()) {
          batch.size = 0;
          return false;
        }
        readStripe();
      }
      int batchSize = computeBatchSize(batch.getMaxSize());
      rowInStripe += batchSize;
      reader.setVectorColumnCount(batch.getDataColumnCount());
      reader.nextBatch(batch, batchSize);
      batch.selectedInUse = false;
      batch.size = batchSize;
      advanceToNextRow(reader, rowInStripe + rowBaseInStripe, true);
      return batch.size != 0;
    } catch (IOException e) {
      // Rethrow exception with file name in log message
      throw new IOException("Error reading file: " + path, e);
    }
  }
}
```
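From the caller's side, `nextBatch` is driven through a simple loop. A minimal sketch using the `org.apache.orc` core API (the path is made up for illustration):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;

public class ReadSketch {
  public static void main(String[] args) throws Exception {
    Reader reader = OrcFile.createReader(new Path("/tmp/example.orc"),
        OrcFile.readerOptions(new Configuration()));
    RecordReader rows = reader.rows();
    VectorizedRowBatch batch = reader.getSchema().createRowBatch();
    while (rows.nextBatch(batch)) {           // each call fills up to batch.getMaxSize() rows
      for (int r = 0; r < batch.size; ++r) {
        // read column values from batch.cols[...], e.g. for a first bigint column:
        // long v = ((LongColumnVector) batch.cols[0]).vector[r];
      }
    }
    rows.close();
  }
}
```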
Reading a stripe
```java
// org.apache.orc.impl.RecordReaderImpl
private long rowInStripe = 0;
private long rowCountInStripe = 0; // number of rows in the stripe, read from StripeInformation
private boolean[] includedRowGroups = null;

private void readStripe() throws IOException {
  // initializes rowCountInStripe and resets rowInStripe to 0
  StripeInformation stripe = beginReadStripe();
  // determine which RowGroups actually need to be read
  includedRowGroups = pickRowGroups();

  // move forward to the first unskipped row
  if (includedRowGroups != null) {
    while (rowInStripe < rowCountInStripe &&
           !includedRowGroups[(int) (rowInStripe / rowIndexStride)]) {
      rowInStripe = Math.min(rowCountInStripe, rowInStripe + rowIndexStride);
    }
  }

  // if we haven't skipped the whole stripe, read the data
  if (rowInStripe < rowCountInStripe) {
    // if we aren't projecting columns or filtering rows, just read it all
    if (isFullRead() && includedRowGroups == null) {
      readAllDataStreams(stripe);
    } else {
      readPartialDataStreams(stripe);
    }
    reader.startStripe(streams, stripeFooter);
    // if we skipped the first row group, move the pointers forward
    if (rowInStripe != 0) {
      seekToRowEntry(reader, (int) (rowInStripe / rowIndexStride));
    }
  }
}
```
6. Writing an ORC file

The general flow of writing a file:
- before each stripe is written, the data is buffered in memory as long as possible
- only when the buffer has to be spilled is the data written out column by column (even then it is only written to the outstream; the data may still sit in operating-system buffers, i.e. it has merely moved from a user-space buffer to a kernel-space buffer)
- since ORC is a columnar format, the actual writing is done by `TreeWriter`

Initializing the streams:
```java
// org.apache.orc.impl.WriterImpl.java
public FSDataOutputStream getStream() throws IOException {
  if (rawWriter == null) {
    rawWriter = fs.create(path, false, HDFS_BUFFER_SIZE,
        fs.getDefaultReplication(path), blockSize);
    rawWriter.writeBytes(OrcFile.MAGIC);
    headerLength = rawWriter.getPos();
    writer = new OutStream("metadata", bufferSize, codec,
        new DirectStream(rawWriter));
    protobufWriter = CodedOutputStream.newInstance(writer);
  }
  return rawWriter;
}
```
This method is called from the following places:
- `flushStripe`
- `writeMetadata`
- `writeFooter`
- `appendStripe`

The three output streams above form a layered wrapping, `rawWriter` -> `OutStream` -> `CodedOutputStream`, and every write ultimately lands on `rawWriter`.
```java
// org.apache.orc.impl.WriterImpl
@Override
public void addRowBatch(VectorizedRowBatch batch) throws IOException {
  if (buildIndex) {
    // Batch the writes up to the rowIndexStride so that we can get the
    // right size indexes.
    int posn = 0;
    while (posn < batch.size) {
      int chunkSize = Math.min(batch.size - posn,
          rowIndexStride - rowsInIndex);
      // the actual write happens here
      treeWriter.writeRootBatch(batch, posn, chunkSize);
      posn += chunkSize;
      rowsInIndex += chunkSize;
      rowsInStripe += chunkSize;
      if (rowsInIndex >= rowIndexStride) {
        createRowIndexEntry();
      }
    }
  } else {
    rowsInStripe += batch.size;
    treeWriter.writeRootBatch(batch, 0, batch.size);
  }
  memoryManager.addedRow(batch.size);
}
```
`addRowBatch` is the public entry point for writing data; `VectorizedRowBatch` makes the writes vectorized and splits them by column (a usage sketch follows the class definition below):
```java
public class VectorizedRowBatch implements Writable {
  public int numCols;           // number of columns
  public ColumnVector[] cols;   // a vector for each column
  public int size;              // number of rows that qualify (i.e. haven't been filtered out)
  public int[] selected;        // array of positions of selected values
  public int[] projectedColumns;
  public int projectionSize;
  private int dataColumnCount;
  private int partitionColumnCount;
  // ...
}
```
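A minimal end-to-end sketch of filling a `VectorizedRowBatch` column by column and handing it to `addRowBatch` (the schema and path are made up for illustration):

```java
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

public class WriteSketch {
  public static void main(String[] args) throws Exception {
    TypeDescription schema = TypeDescription.fromString("struct<x:bigint,y:string>");
    Writer writer = OrcFile.createWriter(new Path("/tmp/example.orc"),
        OrcFile.writerOptions(new Configuration()).setSchema(schema));
    VectorizedRowBatch batch = schema.createRowBatch();
    LongColumnVector x = (LongColumnVector) batch.cols[0];
    BytesColumnVector y = (BytesColumnVector) batch.cols[1];
    for (long i = 0; i < 10_000; ++i) {
      int row = batch.size++;                 // values are filled in column by column
      x.vector[row] = i;
      y.setVal(row, ("row-" + i).getBytes(StandardCharsets.UTF_8));
      if (batch.size == batch.getMaxSize()) { // batch is full: hand it to the writer
        writer.addRowBatch(batch);
        batch.reset();
      }
    }
    if (batch.size != 0) {
      writer.addRowBatch(batch);              // flush the partial last batch
    }
    writer.close();
  }
}
```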
```java
public class WriterImpl implements Writer, MemoryManager.Callback {
  @Override
  public boolean checkMemory(double newScale) throws IOException {
    long limit = (long) Math.round(adjustedStripeSize * newScale);
    long size = estimateStripeSize();
    if (LOG.isDebugEnabled()) {
      LOG.debug("ORC writer " + path + " size = " + size + " limit = " +
          limit);
    }
    if (size > limit) {
      flushStripe();  // this is what actually flushes the current stripe
      return true;
    }
    return false;
  }
}
```
Tracking the in-memory footprint of buffered writes and deciding when to flush is done through the `MemoryManager.Callback` interface, which `WriterImpl` implements (the `checkMemory` method above): for example, with a 256 MB stripe size and `newScale = 0.5`, the stripe is flushed once its estimated size exceeds 128 MB. The interface is defined as:
```java
// org.apache.orc.impl.MemoryManager.java
public class MemoryManager {
  public interface Callback {
    /**
     * The writer needs to check its memory usage
     * @param newScale the current scale factor for memory allocations
     * @return true if the writer was over the limit
     * @throws IOException
     */
    boolean checkMemory(double newScale) throws IOException;
  }
}
```
ref:
- https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ORC
- https://orc.apache.org/specification/ORCv1/
- https://github.com/apache/hive/blob/trunk/ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto
- https://orc.apache.org/docs/indexes.html
- http://lxw1234.com/archives/2016/04/632.htm