ORC文件格式和读写的部分源码

OrcFileLayout.png

ORC文件的组成部分

部分 个数 存储内容 大小
stripes 若干个 若干行的数据 250 MB
file footer 1 见下文
postscript 1 见下文 小于255个字节

1. file footer

  1. 该文件中所有stripe的一个列表
  2. 每个stripe中的行数
  3. 每个列的数据类型
  4. 列级别的聚合信息, 包含 count, min, max, and sum
message Footer {
  optional uint64 headerLength = 1;
  optional uint64 contentLength = 2;
  repeated StripeInformation stripes = 3;
  repeated Type types = 4;
  repeated UserMetadataItem metadata = 5;
  optional uint64 numberOfRows = 6;
  repeated ColumnStatistics statistics = 7;
  optional uint32 rowIndexStride = 8;
}

message StripeInformation {
  optional uint64 offset = 1;
  optional uint64 indexLength = 2;
  optional uint64 dataLength = 3;
  optional uint64 footerLength = 4;
  optional uint64 numberOfRows = 5;
}

message Type {
  enum Kind {
    BOOLEAN = 0;
    BYTE = 1;
    SHORT = 2;
    INT = 3;
    LONG = 4;
    FLOAT = 5;
    DOUBLE = 6;
    STRING = 7;
    BINARY = 8;
    TIMESTAMP = 9;
    LIST = 10;
    MAP = 11;
    STRUCT = 12;
    UNION = 13;
    DECIMAL = 14;
    DATE = 15;
    VARCHAR = 16;
    CHAR = 17;
  }
  optional Kind kind = 1;
  repeated uint32 subtypes = 2 [packed=true];
  repeated string fieldNames = 3;
  optional uint32 maximumLength = 4;
  optional uint32 precision = 5;
  optional uint32 scale = 6;
}

message UserMetadataItem {
  optional string name = 1;
  optional bytes value = 2;
}

message ColumnStatistics {
  optional uint64 numberOfValues = 1;
  optional IntegerStatistics intStatistics = 2;
  optional DoubleStatistics doubleStatistics = 3;
  optional StringStatistics stringStatistics = 4;
  optional BucketStatistics bucketStatistics = 5;
  optional DecimalStatistics decimalStatistics = 6;
  optional DateStatistics dateStatistics = 7;
  optional BinaryStatistics binaryStatistics = 8;
  optional TimestampStatistics timestampStatistics = 9;
  optional bool hasNull = 10;
}
// 一共8种统计
message IntegerStatistics  {
  optional sint64 minimum = 1;
  optional sint64 maximum = 2;
  optional sint64 sum = 3;
}

message DoubleStatistics {
  optional double minimum = 1;
  optional double maximum = 2;
  optional double sum = 3;
}

message StringStatistics {
  optional string minimum = 1;
  optional string maximum = 2;
  // sum will store the total length of all strings in a stripe
  optional sint64 sum = 3;
}

message BucketStatistics {
  repeated uint64 count = 1 [packed=true];
}

message DecimalStatistics {
  optional string minimum = 1;
  optional string maximum = 2;
  optional string sum = 3;
}

message DateStatistics {
  // min,max values saved as days since epoch
  optional sint32 minimum = 1;
  optional sint32 maximum = 2;
}

message TimestampStatistics {
  // min,max values saved as milliseconds since epoch
  optional sint64 minimum = 1;
  optional sint64 maximum = 2;
}

message BinaryStatistics {
  // sum will store the total binary blob length in a stripe
  optional sint64 sum = 1;
}

2. postscript

message PostScript {
  optional uint64 footerLength = 1;
  optional CompressionKind compression = 2;
  optional uint64 compressionBlockSize = 3;
  // the version of the file format
  //   [0, 11] = Hive 0.11
  //   [0, 12] = Hive 0.12
  repeated uint32 version = 4 [packed = true];
  optional uint64 metadataLength = 5;
  // Version of the writer:
  //   0 (or missing) = original
  //   1 = HIVE-8732 fixed
  optional uint32 writerVersion = 6;
  // Leave this last in the record
  optional string magic = 8000;
}

enum CompressionKind {
  NONE = 0;
  ZLIB = 1;
  SNAPPY = 2;
  LZO = 3;
}

3. stripe

  1. index data: 包含每列的最大/最小值, 每列中行的位置
  2. Row data: 具体的行的数据
  3. stripe footer: contains a directory of stream locations

index data和row data 都被按照列来切分了

message StripeInformation {
 // the start of the stripe within the file
 optional uint64 offset = 1;
 // the length of the indexes in bytes
 optional uint64 indexLength = 2;
 // the length of the data in bytes
 optional uint64 dataLength = 3;
 // the length of the footer in bytes
 optional uint64 footerLength = 4;
 // the number of rows in the stripe
 optional uint64 numberOfRows = 5;
 // If this is present, the reader should use this value for the encryption
 // stripe id for setting the encryption IV. Otherwise, the reader should
 // use one larger than the previous stripe's encryptStripeId.
 // For unmerged ORC files, the first stripe will use 1 and the rest of the
 // stripes won't have it set. For merged files, the stripe information
 // will be copied from their original files and thus the first stripe of
 // each of the input files will reset it to 1.
 // Note that 1 was choosen, because protobuf v3 doesn't serialize
 // primitive types that are the default (eg. 0).
 optional uint64 encryptStripeId = 6;
 // For each encryption variant, the new encrypted local key to use until we
 // find a replacement.
 repeated bytes encryptedLocalKeys = 7;
}

4. Index

ORC中提供的索引包括了3种级别的:

level 位于ORC的哪块 数据内容
file level file footer 整个文件的列级别的统计
stripe level file footer 每个stripe的列级别的统计
row level stripe的最开始部分 每个RowGroup的列级别的统计和每个RowGroup的起始位置

这里提到的索引并不像是mysql那样的索引, 可以直接定位到具体的一样记录, 而是为了尽快的跳过文件/stripe/RowGroup

默认是每10000行的数据组成一个RowGroup, 并放置在stripe的最开始部分

message RowIndexEntry {
 repeated uint64 positions = 1 [packed=true];
 optional ColumnStatistics statistics = 2;
}

message RowIndex {
 repeated RowIndexEntry entry = 1;
}

5. 读ORC文件

// org.apache.orc.impl.RecordReaderImpl

public class RecordReaderImpl implements RecordReader {
    @Override
  public boolean nextBatch(VectorizedRowBatch batch) throws IOException {
    try {
      if (rowInStripe >= rowCountInStripe) {
        currentStripe += 1;
        if (currentStripe >= stripes.size()) {
          batch.size = 0;
          return false;
        }
        readStripe();
      }

      int batchSize = computeBatchSize(batch.getMaxSize());

      rowInStripe += batchSize;
      reader.setVectorColumnCount(batch.getDataColumnCount());
      reader.nextBatch(batch, batchSize);
      batch.selectedInUse = false;
      batch.size = batchSize;
      advanceToNextRow(reader, rowInStripe + rowBaseInStripe, true);
      return batch.size  != 0;
    } catch (IOException e) {
      // Rethrow exception with file name in log message
      throw new IOException("Error reading file: " + path, e);
    }
  }
}

读stripe

// org.apache.orc.impl.RecordReaderImpl
private long rowInStripe = 0;
private long rowCountInStripe = 0;  // StripeInformation 中读取的stripe中的行数
private boolean[] includedRowGroups = null;


private void readStripe() throws IOException {
  // 初始化 rowCountInStripe,rowInStripe=0
  StripeInformation stripe = beginReadStripe();
  // 获取要读取的RowGroup
  includedRowGroups = pickRowGroups();

  // move forward to the first unskipped row
  // 
  if (includedRowGroups != null) {
    while (rowInStripe < rowCountInStripe &&
        !includedRowGroups[(int) (rowInStripe / rowIndexStride)]) {
      rowInStripe = Math.min(rowCountInStripe, rowInStripe + rowIndexStride);
    }
  }

  // if we haven't skipped the whole stripe, read the data
  if (rowInStripe < rowCountInStripe) {
    // if we aren't projecting columns or filtering rows, just read it all
    if (isFullRead() && includedRowGroups == null) {
      readAllDataStreams(stripe);
    } else {
      readPartialDataStreams(stripe);
    }
    reader.startStripe(streams, stripeFooter);
    // if we skipped the first row group, move the pointers forward
    if (rowInStripe != 0) {
      seekToRowEntry(reader, (int) (rowInStripe / rowIndexStride));
    }
  }
}

6. 写ORC文件

写文件读大概套路

  1. 写每个stripe之前将数据尽量放在buffer里面
  2. 直到buffer中的数据要落磁盘的时候, 才将其按照列来写到磁盘 (这次其实也只是写到outstream中, 数据可能还会buffer在操作系统的buffer中, 可以理解为数据从用户态buffer写到了内核态buffer)
  3. ORC因为是列式存储, 写的时候具体使用TreeWriter来写数据

初始化流:

// org.apache.orc.impl.WriterImpl.java
public FSDataOutputStream getStream() throws IOException {
    if (rawWriter == null) {
      rawWriter = fs.create(path, false, HDFS_BUFFER_SIZE,
                            fs.getDefaultReplication(path), blockSize);
      rawWriter.writeBytes(OrcFile.MAGIC);
      headerLength = rawWriter.getPos();
      writer = new OutStream("metadata", bufferSize, codec,
                             new DirectStream(rawWriter));
      protobufWriter = CodedOutputStream.newInstance(writer);
    }
    return rawWriter;
}

该方法在以下几个地方会调用:

  1. flushStripe
  2. writeMetadata
  3. writeFooter
  4. appendStripe

上面的3种 OutputSteam 显示一个层层递进的关系:

rawWriter -> OutStream -> CodedOutputStream

最后的写操作都落到rawWriter的头上

@Override
public void addRowBatch(VectorizedRowBatch batch) throws IOException {
    if (buildIndex) {
      // Batch the writes up to the rowIndexStride so that we can get the
      // right size indexes.
      int posn = 0;
      while (posn < batch.size) {
        int chunkSize = Math.min(batch.size - posn,
            rowIndexStride - rowsInIndex);
        // 具体写入数据
        treeWriter.writeRootBatch(batch, posn, chunkSize);
        posn += chunkSize;
        rowsInIndex += chunkSize;
        rowsInStripe += chunkSize;
        if (rowsInIndex >= rowIndexStride) {
          createRowIndexEntry();
        }
      }
    } else {
        rowsInStripe += batch.size;
        treeWriter.writeRootBatch(batch, 0, batch.size);
    }
    memoryManager.addedRow(batch.size);
}

通过 addRowBatch 对外提供了数据写入的接口, VectorizedRowBatch 将数据的写入变成向量化,并且分列

public class VectorizedRowBatch implements Writable {
  public int numCols;           // number of columns
  public ColumnVector[] cols;   // a vector for each column
  public int size;              // number of rows that qualify (i.e. haven't been filtered out)
  public int[] selected;        // array of positions of selected values
  public int[] projectedColumns;
  public int projectionSize;

  private int dataColumnCount;
  private int partitionColumnCount;
  /// ....
}
public class WriterImpl implements Writer, MemoryManager.Callback {
  @Override
  public boolean checkMemory(double newScale) throws IOException {
    long limit = (long) Math.round(adjustedStripeSize * newScale);
    long size = estimateStripeSize();
    if (LOG.isDebugEnabled()) {
      LOG.debug("ORC writer " + path + " size = " + size + " limit = " +
                limit);
    }
    if (size > limit) {
      flushStripe();  // 这里会调用writer的flush方法
      return true;
    }
    return false;
  }
}

对写入数据在内存中的占用和是否采用flush 通过实现 MemoryManager.Callback 的方法回调来实现, 具体实现的方法为:

// org.apache.orc.impl.MemoryManager.java
public class MemoryManager {
  public interface Callback {
    /**
     * The writer needs to check its memory usage
     * @param newScale the current scale factor for memory allocations
     * @return true if the writer was over the limit
     * @throws IOException
     */
    boolean checkMemory(double newScale) throws IOException;
  }
}

ref:

  1. https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ORC
  2. https://orc.apache.org/specification/ORCv1/
  3. https://github.com/apache/hive/blob/trunk/ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto
  4. https://orc.apache.org/docs/indexes.html
  5. http://lxw1234.com/archives/2016/04/632.htm
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,186评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,858评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,620评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,888评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,009评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,149评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,204评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,956评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,385评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,698评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,863评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,544评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,185评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,899评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,141评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,684评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,750评论 2 351

推荐阅读更多精彩内容