PalDB 写数据过程

开篇


PalDB写数据Demo

StoreWriter writer = PalDB.createWriter(new File("store.paldb"));
writer.put("foo", "bar");
writer.put(1213, new int[] {1, 2, 3});
writer.close();


PalDB写数据流程

 PalDB写入过程主要分为2个阶段:kv的写入,PalDB文件的生成。PalDB在整个文件索引生成的整个过程包括:

  • 写入key到index的临时文件
  • 写入value到data的临时文件
  • 准备合并数据并生成meta元素的临时文件
  • 合并meta元素的临时文件、key的临时文件,value的临时文件到最终的PalDB文件。
PalDB写数据流程


PalDB写数据相关数据结构

 PalDB在写入过程中会把数据临时存在内存当中,存放的方式也挺新颖的。它在内部维护了一个数组,数组的大小由最大key的长度确定,譬如你的key的最大长度为6,那么数组的大小就是6,每个数组维护这个长度的key所有信息,如下标为5的数组元素维护着所有key长度为5的信息。下列的数据结构都是按照这个方法来进行存储的。

  • File[] indexFiles 维护key对应的index文件
  • DataOutputStream[] indexStreams 维护key对应文件流
  • private File[] dataFiles 维护value对应的data文件
  • DataOutputStream[] dataStreams 维护value对应的data文件流
  • int[] maxOffsetLengths 维护index指向data的位移的存储字段类型
  • int[] keyCounts 维护对应key的数量
PalDB临时内存数据存储
public class StorageWriter {

  // Configuration
  private final Configuration config;
  private final double loadFactor;
  // Output
  private final File tempFolder;
  private final OutputStream outputStream;
  // Index stream
  private File[] indexFiles;
  private DataOutputStream[] indexStreams;
  // Data stream
  private File[] dataFiles;
  private DataOutputStream[] dataStreams;
  // Cache last value
  private byte[][] lastValues;
  private int[] lastValuesLength;
  // Data length
  private long[] dataLengths;
  // Index length
  private long indexesLength;
  // Max offset length
  private int[] maxOffsetLengths;
  // Number of keys
  private int keyCount;
  private int[] keyCounts;
  // Number of values
  private int valueCount;
  // Number of collisions
  private int collisions;

  private HashUtils hashUtils;


PalDB写数据流程源码解析

写入kv过程

 kv的写入过程主要包括key的写入过程和value的写入过程,整体流程如下:

  • 通过getIndexStream获取长度为keyLength对应的key的输入流。
  • 通过indexStream.write(key)往index的输入流写入key。
  • 通过LongPacker.packLong(indexStream, dataLength)往index的输入流写入key对应data的偏移量,指向dataStream的位移。
  • 通过DataOutputStream dataStream = getDataStream(keyLength)获取长度为keyLength对应的value的输入流
  • 通过LongPacker.packInt(dataStream, value.length)写入value的长度
  • 通过dataStream.write(value);写入value的值

 在整个写入过程中会有一些辅助变量和一些技巧节省内存

  • dataLengths[keyLength]保存长度为keyLength的key下的data的长度
  • lastValues[keyLength]保存长度为keyLength的key上一个保存的value
  • lastValuesLength保存长度为keyLength的key对应value的保存位移
  • 如果keyLength下本次保存的值和lastValue相同,那么只保存上一个偏移量即可
  • 提供了一个节省内存的trick就是相同长度下的key的value按照顺序排列即可
public void put(byte[] key, byte[] value)
      throws IOException {
    int keyLength = key.length;

    //Get the Output stream for that keyLength, each key length has its own file
    // 获取指定长度的key对应的输出流
    DataOutputStream indexStream = getIndexStream(keyLength);

    // Write key
    // 写入key
    indexStream.write(key);

    // Check if the value is identical to the last inserted
    // 判断本次数据是否和上次写入数据一致
    byte[] lastValue = lastValues[keyLength];
    boolean sameValue = lastValue != null && Arrays.equals(value, lastValue);

    // Get data stream and length
    // 获取数据的偏移量,
    long dataLength = dataLengths[keyLength];
    if (sameValue) {
      //如果本次写入数据和上次一致,就把dataLength指向上一个value的起始位置
      dataLength -= lastValuesLength[keyLength];
    }

    // Write offset and record max offset length
    //写入key对应value的位移dataLength
    int offsetLength = LongPacker.packLong(indexStream, dataLength);
    maxOffsetLengths[keyLength] = Math.max(offsetLength, maxOffsetLengths[keyLength]);

    // Write if data is not the same
    // 本次数据和上次不一致的情况下开始写入value
    if (!sameValue) {
      // Get stream
      // 获取指定长度key对应的value的输出流
      DataOutputStream dataStream = getDataStream(keyLength);

      // Write size and value
      // 写入value的长度和value的值
      int valueSize = LongPacker.packInt(dataStream, value.length);
      dataStream.write(value);

      // Update data length
      // 更新数据偏移量,这里的偏移量是下一个写入value的偏移量
      dataLengths[keyLength] += valueSize + value.length;

      // Update last value
      //更新上一次写入的value值
      lastValues[keyLength] = value;
      //更新keyLength对应的key的保存的data的长度
      lastValuesLength[keyLength] = valueSize + value.length;

      // value的计数
      valueCount++;
    }

    //更新key的数量
    keyCount++;
    //更新长度为keyLength的key的个数
    keyCounts[keyLength]++;
  }



private DataOutputStream getIndexStream(int keyLength)
      throws IOException {
    // Resize array if necessary
    // 每次按照keyLength进行扩容
    if (indexStreams.length <= keyLength) {
      indexStreams = Arrays.copyOf(indexStreams, keyLength + 1);
      indexFiles = Arrays.copyOf(indexFiles, keyLength + 1);
      keyCounts = Arrays.copyOf(keyCounts, keyLength + 1);
      maxOffsetLengths = Arrays.copyOf(maxOffsetLengths, keyLength + 1);
      lastValues = Arrays.copyOf(lastValues, keyLength + 1);
      lastValuesLength = Arrays.copyOf(lastValuesLength, keyLength + 1);
      dataLengths = Arrays.copyOf(dataLengths, keyLength + 1);
    }

    // Get or create stream
    DataOutputStream dos = indexStreams[keyLength];
    if (dos == null) {
      File file = new File(tempFolder, "temp_index" + keyLength + ".dat");
      file.deleteOnExit();
      indexFiles[keyLength] = file;

      dos = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file)));
      indexStreams[keyLength] = dos;

      //这里之所以进行++是因为value写入的过程中会先写入1byte的数据,dos.writeByte(0);
      dataLengths[keyLength]++;
    }
    return dos;
  }



private DataOutputStream getDataStream(int keyLength)
      throws IOException {
    // Resize array if necessary
    if (dataStreams.length <= keyLength) {
      dataStreams = Arrays.copyOf(dataStreams, keyLength + 1);
      dataFiles = Arrays.copyOf(dataFiles, keyLength + 1);
    }

    DataOutputStream dos = dataStreams[keyLength];
    if (dos == null) {
      File file = new File(tempFolder, "data" + keyLength + ".dat");
      file.deleteOnExit();
      dataFiles[keyLength] = file;

      dos = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file)));
      dataStreams[keyLength] = dos;

      // Write one byte so the zero offset is reserved
      dos.writeByte(0);
    }
    return dos;
  }


合并key和value文件生成PalDB文件

 PalDB通过close动作来完成PalDB文件的生成,主要完成的事情包括:

  • 将value对应的输出流刷新到文件
  • 将key对应的输出流刷新到文件
  • 生成元数据文件metadata.dat文件 writeMetadata(metadataDataOutputStream);
  • 将不同长度对应的key对应的index文件进行重建 buildIndex(i)
  • 将metaData、不同长度的key对应的临时文件、不同长度的key下的value对应的临时文件进行合并生成PalDB最终文件
  • mergeFiles方法执行最终的合并动作,做的事情就是将文件按照key的长度的顺序写入一个文件当中
public void close()
      throws IOException {
    // Close the data and index streams
    for (DataOutputStream dos : dataStreams) {
      if (dos != null) {
        dos.close();
      }
    }
    for (DataOutputStream dos : indexStreams) {
      if (dos != null) {
        dos.close();
      }
    }

    // Stats
    LOGGER.log(Level.INFO, "Number of keys: {0}", keyCount);
    LOGGER.log(Level.INFO, "Number of values: {0}", valueCount);

    // Prepare files to merge
    List<File> filesToMerge = new ArrayList<File>();

    try {

      //Write metadata file
      File metadataFile = new File(tempFolder, "metadata.dat");
      metadataFile.deleteOnExit();
      FileOutputStream metadataOututStream = new FileOutputStream(metadataFile);
      DataOutputStream metadataDataOutputStream = new DataOutputStream(metadataOututStream);
      writeMetadata(metadataDataOutputStream);
      metadataDataOutputStream.close();
      metadataOututStream.close();
      filesToMerge.add(metadataFile);

      // Build index file
      for (int i = 0; i < indexFiles.length; i++) {
        if (indexFiles[i] != null) {
          filesToMerge.add(buildIndex(i));
        }
      }

      // Stats collisions
      LOGGER.log(Level.INFO, "Number of collisions: {0}", collisions);

      // Add data files
      for (File dataFile : dataFiles) {
        if (dataFile != null) {
          filesToMerge.add(dataFile);
        }
      }

      // Merge and write to output
      checkFreeDiskSpace(filesToMerge);
      mergeFiles(filesToMerge, outputStream);
    } finally {
      outputStream.close();
      cleanup(filesToMerge);
    }
  }



private void mergeFiles(List<File> inputFiles, OutputStream outputStream)
      throws IOException {
    long startTime = System.nanoTime();

    //Merge files
    for (File f : inputFiles) {
      if (f.exists()) {
        FileInputStream fileInputStream = new FileInputStream(f);
        BufferedInputStream bufferedInputStream = new BufferedInputStream(fileInputStream);
        try {
          LOGGER.log(Level.INFO, "Merging {0} size={1}", new Object[]{f.getName(), f.length()});

          byte[] buffer = new byte[8192];
          int length;
          while ((length = bufferedInputStream.read(buffer)) > 0) {
            outputStream.write(buffer, 0, length);
          }
        } finally {
          bufferedInputStream.close();
          fileInputStream.close();
        }
      } else {
        LOGGER.log(Level.INFO, "Skip merging file {0} because it doesn't exist", f.getName());
      }
    }
  }


元数据文件生成过程

 元数据的生成过程就把PalDB的索引文件的相关元信息进行写入,具体内容看代码的自解释。核心的包括:

  • 写入key相关的统计信息
  • 写入key的起始位移
  • 写入value的起始位移
private void writeMetadata(DataOutputStream dataOutputStream)
      throws IOException {
    //Write format version
    dataOutputStream.writeUTF(FormatVersion.getLatestVersion().name());

    //Write time
    dataOutputStream.writeLong(System.currentTimeMillis());

    //Prepare
    int keyLengthCount = getNumKeyCount();
    int maxKeyLength = keyCounts.length - 1;

    //Write size (number of keys)
    dataOutputStream.writeInt(keyCount);

    //Write the number of different key length
    dataOutputStream.writeInt(keyLengthCount);

    //Write the max value for keyLength
    dataOutputStream.writeInt(maxKeyLength);

    // For each keyLength
    long datasLength = 0l;
    for (int i = 0; i < keyCounts.length; i++) {
      if (keyCounts[i] > 0) {
        // Write the key length
        dataOutputStream.writeInt(i);

        // Write key count
        dataOutputStream.writeInt(keyCounts[i]);

        // Write slot count
        int slots = (int) Math.round(keyCounts[i] / loadFactor);
        dataOutputStream.writeInt(slots);

        // Write slot size
        int offsetLength = maxOffsetLengths[i];
        dataOutputStream.writeInt(i + offsetLength);

        // Write index offset
        dataOutputStream.writeInt((int) indexesLength);

        // Increment index length
        indexesLength += (i + offsetLength) * slots;

        // Write data length
        dataOutputStream.writeLong(datasLength);

        // Increment data length
        datasLength += dataLengths[i];
      }
    }

    //Write serializers
    try {
      Serializers.serialize(dataOutputStream, config.getSerializers());
    } catch (Exception e) {
      throw new RuntimeException();
    }

    //Write the position of the index and the data
    // 元数据文件+key文件+value文件的顺序,先写入key索引文件起始位移,再写入value的data文件的起始位移。
    int indexOffset = dataOutputStream.size() + (Integer.SIZE / Byte.SIZE) + (Long.SIZE / Byte.SIZE);
    dataOutputStream.writeInt(indexOffset);
    dataOutputStream.writeLong(indexOffset + indexesLength);
  }


key索引文件重建过程

 索引文件的重建过程做的事情就是把key从原来的文件重建到根据key数据和负载因子生成的索引文件中,做的事情包括:

  • 根据key的数量和负载因子重新计算新的索引文件Math.round(count / loadFactor)
  • 从旧的key的索引文件中读取key的数据并进行hash定位到slot位置
  • 在指定的slot位置写入key的值以及key对应value在data文件中偏移量
private File buildIndex(int keyLength)
      throws IOException {
    // 根据该长度下key的数目/负载因子计算存储的slot的格式
    long count = keyCounts[keyLength];
    int slots = (int) Math.round(count / loadFactor);
    int offsetLength = maxOffsetLengths[keyLength];

    //注意slotSize的计算方式,slot里面保存的内容包括key的长度以及指向data的偏移量占用的字节数
    int slotSize = keyLength + offsetLength;

    // Init index
    File indexFile = new File(tempFolder, "index" + keyLength + ".dat");
    RandomAccessFile indexAccessFile = new RandomAccessFile(indexFile, "rw");
    try {
      // 设置重建key的文件的长度
      indexAccessFile.setLength(slots * slotSize);
      FileChannel indexChannel = indexAccessFile.getChannel();
      MappedByteBuffer byteBuffer = indexChannel.map(FileChannel.MapMode.READ_WRITE, 0, indexAccessFile.length());

      // Init reading stream
      // 初始化输入流
      File tempIndexFile = indexFiles[keyLength];
      DataInputStream tempIndexStream = new DataInputStream(new BufferedInputStream(new FileInputStream(tempIndexFile)));
      try {
        byte[] keyBuffer = new byte[keyLength];
        byte[] slotBuffer = new byte[slotSize];
        byte[] offsetBuffer = new byte[offsetLength];

        // Read all keys
        // 遍历key的数量重新写入到新建的索引文件当中
        for (int i = 0; i < count; i++) {
          // Read key
          tempIndexStream.readFully(keyBuffer);

          // Read offset
          long offset = LongPacker.unpackLong(tempIndexStream);

          // Hash,根据key进行重hash后确定放置到具体的slot位置
          long hash = (long) hashUtils.hash(keyBuffer);

          boolean collision = false;
          for (int probe = 0; probe < count; probe++) {
            int slot = (int) ((hash + probe) % slots);
            byteBuffer.position(slot * slotSize);
            byteBuffer.get(slotBuffer);

            long found = LongPacker.unpackLong(slotBuffer, keyLength);
            if (found == 0) {
              // The spot is empty use it
              // 根据hash值写入key以及key对应value在data文件的偏移量
              byteBuffer.position(slot * slotSize);
              byteBuffer.put(keyBuffer);
              int pos = LongPacker.packLong(offsetBuffer, offset);
              byteBuffer.put(offsetBuffer, 0, pos);
              break;
            } else {
              collision = true;
              // PalDB不支持存在相同的key
              if (Arrays.equals(keyBuffer, Arrays.copyOf(slotBuffer, keyLength))) {
                throw new RuntimeException(
                        String.format("A duplicate key has been found for for key bytes %s", Arrays.toString(keyBuffer)));
              }
            }
          }

          if (collision) {
            collisions++;
          }
        }
      } 

    return indexFile;
  }


PalDB的文件存储格式

PalDB存储结构
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,772评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,458评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,610评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,640评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,657评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,590评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,962评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,631评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,870评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,611评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,704评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,386评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,969评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,944评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,179评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,742评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,440评论 2 342

推荐阅读更多精彩内容

  • 关于Mongodb的全面总结 MongoDB的内部构造《MongoDB The Definitive Guide》...
    中v中阅读 31,894评论 2 89
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,590评论 18 139
  • Swift1> Swift和OC的区别1.1> Swift没有地址/指针的概念1.2> 泛型1.3> 类型严谨 对...
    cosWriter阅读 11,082评论 1 32
  • pyspark.sql模块 模块上下文 Spark SQL和DataFrames的重要类: pyspark.sql...
    mpro阅读 9,442评论 0 13
  • 跟z老师聊了几句天,我就是突然想依偎在他怀里了
    Rites阅读 130评论 0 0