Overview
PalDB Write Demo
StoreWriter writer = PalDB.createWriter(new File("store.paldb"));
writer.put("foo", "bar");
writer.put(1213, new int[] {1, 2, 3});
writer.close();
PalDB Write Flow
The PalDB write path has two main phases: writing the key/value pairs, and generating the final PalDB file. Producing the file and its index involves:
- writing each key to a temporary index file
- writing each value to a temporary data file
- preparing the merge and generating a temporary metadata file
- merging the metadata file, the temporary key (index) files, and the temporary value (data) files into the final PalDB file (see the file listing below)
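For the demo above, the intermediate and final artifacts look roughly like this (the file names come from the source shown later; the actual keyLength values depend on how the keys serialize):

tempFolder/temp_index<keyLength>.dat   temporary key files, one per key length
tempFolder/data<keyLength>.dat         temporary value files, one per key length
tempFolder/metadata.dat                metadata, written during close()
tempFolder/index<keyLength>.dat        rebuilt hash-slot index files, written during close()
store.paldb                            metadata + index files + data files, concatenated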
PalDB Write Data Structures
During a write, PalDB streams keys and values out to temporary files, but keeps the per-key-length bookkeeping in memory, organized in a fairly unusual way. Internally it maintains a set of arrays indexed by key length: the arrays are sized to the maximum key length plus one, so if your longest key is 6 bytes the arrays have 7 elements, and element 5, for example, holds everything about keys that are exactly 5 bytes long. All of the structures below are organized this way (see the sketch after this list).
- File[] indexFiles: the temporary index file for each key length
- DataOutputStream[] indexStreams: the output stream of each key length's index file
- File[] dataFiles: the temporary data file holding the values for each key length
- DataOutputStream[] dataStreams: the output stream of each key length's data file
- int[] maxOffsetLengths: for each key length, the maximum number of bytes a packed index-to-data offset occupies
- int[] keyCounts: the number of keys of each length
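A minimal, self-contained sketch of this per-key-length bookkeeping (hypothetical class and method names, not PalDB code; the real StorageWriter fields follow):

import java.util.Arrays;

// Hypothetical illustration: arrays indexed by key length, grown on demand to keyLength + 1 elements.
class LengthBuckets {
  private int[] keyCounts = new int[0];

  void countKey(byte[] key) {
    int keyLength = key.length;
    if (keyCounts.length <= keyLength) {
      // Grow the array so that index keyLength is valid
      keyCounts = Arrays.copyOf(keyCounts, keyLength + 1);
    }
    keyCounts[keyLength]++; // e.g. a 5-byte key is counted at index 5
  }
}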
public class StorageWriter {
// Configuration
private final Configuration config;
private final double loadFactor;
// Output
private final File tempFolder;
private final OutputStream outputStream;
// Index stream
private File[] indexFiles;
private DataOutputStream[] indexStreams;
// Data stream
private File[] dataFiles;
private DataOutputStream[] dataStreams;
// Cache last value
private byte[][] lastValues;
private int[] lastValuesLength;
// Data length
private long[] dataLengths;
// Index length
private long indexesLength;
// Max offset length
private int[] maxOffsetLengths;
// Number of keys
private int keyCount;
private int[] keyCounts;
// Number of values
private int valueCount;
// Number of collisions
private int collisions;
private HashUtils hashUtils;
PalDB Write Flow: Source Walkthrough
Writing key/value pairs
Writing a key/value pair breaks down into writing the key and writing the value. The overall flow is:
- getIndexStream(keyLength) returns the index output stream for keys of length keyLength.
- indexStream.write(key) writes the key bytes into that index stream.
- LongPacker.packLong(indexStream, dataLength) writes, right after the key, the offset of its value inside the corresponding data stream.
- DataOutputStream dataStream = getDataStream(keyLength) returns the data output stream for values belonging to keys of length keyLength.
- LongPacker.packInt(dataStream, value.length) writes the length of the value.
- dataStream.write(value) writes the value bytes.
A few helper variables, plus one small trick, keep the output compact:
- dataLengths[keyLength] holds the current length of the data written for keys of length keyLength, i.e. the offset at which the next value will start.
- lastValues[keyLength] holds the last value written for keys of length keyLength.
- lastValuesLength[keyLength] holds the number of bytes (length prefix plus value bytes) that this last value occupies in the data file.
- If the value being written equals the last value for this key length, only the previous offset is written into the index stream; the value itself is not stored again.
- This gives a simple space-saving trick: insert entries so that, within each key length, identical values are adjacent, and duplicates are stored only once (see the sketch below).
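A quick sketch of that trick against the put(byte[], byte[]) method shown next (storageWriter and the keys/values are hypothetical): both keys are 4 bytes long and carry identical value bytes, so the value lands in the data file once and the second index entry simply reuses the previous offset.

// Hypothetical illustration: same key length, identical value bytes written back to back,
// so the second put stores only the previous offset instead of the value again.
byte[] sameValue = "same-value".getBytes();
storageWriter.put("key1".getBytes(), sameValue);
storageWriter.put("key2".getBytes(), sameValue);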
public void put(byte[] key, byte[] value)
throws IOException {
int keyLength = key.length;
// Get the output stream for this key length; each key length has its own index file
DataOutputStream indexStream = getIndexStream(keyLength);
// Write the key
indexStream.write(key);
// Check whether this value is identical to the last one inserted for this key length
byte[] lastValue = lastValues[keyLength];
boolean sameValue = lastValue != null && Arrays.equals(value, lastValue);
// Current data length for this key length, i.e. the offset at which the value would be written
long dataLength = dataLengths[keyLength];
if (sameValue) {
// Same value as last time: point dataLength back at the previous value's start
dataLength -= lastValuesLength[keyLength];
}
// Write the value's offset (dataLength) after the key, and record the max offset length
int offsetLength = LongPacker.packLong(indexStream, dataLength);
maxOffsetLengths[keyLength] = Math.max(offsetLength, maxOffsetLengths[keyLength]);
// Only write the value if it differs from the previous one
if (!sameValue) {
// Get the data output stream for this key length
DataOutputStream dataStream = getDataStream(keyLength);
// Write the value length followed by the value bytes
int valueSize = LongPacker.packInt(dataStream, value.length);
dataStream.write(value);
// Update the data length; this is the offset where the next value will start
dataLengths[keyLength] += valueSize + value.length;
// Remember this value as the last one written
lastValues[keyLength] = value;
// Remember how many bytes this value occupies (length prefix + value bytes)
lastValuesLength[keyLength] = valueSize + value.length;
// Count stored values
valueCount++;
}
// Total number of keys
keyCount++;
// Number of keys of this length
keyCounts[keyLength]++;
}
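A worked example with hypothetical numbers makes the bookkeeping concrete (assuming LongPacker packs small numbers into a single byte): on the first put of a 4-byte key with a 10-byte value, getIndexStream(4) creates temp_index4.dat and bumps dataLengths[4] to 1; the 4 key bytes plus a 1-byte packed offset of 1 are appended to temp_index4.dat; getDataStream(4) creates data4.dat and writes the reserved zero byte; then a 1-byte length prefix and the 10 value bytes follow, leaving dataLengths[4] at 12 and lastValuesLength[4] at 11.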
private DataOutputStream getIndexStream(int keyLength)
throws IOException {
// Resize the per-key-length arrays if necessary so that index keyLength is valid
if (indexStreams.length <= keyLength) {
indexStreams = Arrays.copyOf(indexStreams, keyLength + 1);
indexFiles = Arrays.copyOf(indexFiles, keyLength + 1);
keyCounts = Arrays.copyOf(keyCounts, keyLength + 1);
maxOffsetLengths = Arrays.copyOf(maxOffsetLengths, keyLength + 1);
lastValues = Arrays.copyOf(lastValues, keyLength + 1);
lastValuesLength = Arrays.copyOf(lastValuesLength, keyLength + 1);
dataLengths = Arrays.copyOf(dataLengths, keyLength + 1);
}
// Get or create stream
DataOutputStream dos = indexStreams[keyLength];
if (dos == null) {
File file = new File(tempFolder, "temp_index" + keyLength + ".dat");
file.deleteOnExit();
indexFiles[keyLength] = file;
dos = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file)));
indexStreams[keyLength] = dos;
// Start at 1: getDataStream writes one placeholder byte (dos.writeByte(0)) when the data file is
// created, so offset 0 is reserved and can later mark an empty slot in the rebuilt index
dataLengths[keyLength]++;
}
return dos;
}
private DataOutputStream getDataStream(int keyLength)
throws IOException {
// Resize array if necessary
if (dataStreams.length <= keyLength) {
dataStreams = Arrays.copyOf(dataStreams, keyLength + 1);
dataFiles = Arrays.copyOf(dataFiles, keyLength + 1);
}
DataOutputStream dos = dataStreams[keyLength];
if (dos == null) {
File file = new File(tempFolder, "data" + keyLength + ".dat");
file.deleteOnExit();
dataFiles[keyLength] = file;
dos = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file)));
dataStreams[keyLength] = dos;
// Write one byte so the zero offset is reserved
dos.writeByte(0);
}
return dos;
}
Merging the Key and Value Files into the Final PalDB File
The final PalDB file is produced by close(), which mainly does the following:
- flushes and closes the per-key-length data (value) output streams
- flushes and closes the per-key-length index (key) output streams
- generates the metadata file metadata.dat via writeMetadata(metadataDataOutputStream)
- rebuilds the index file for each key length via buildIndex(i)
- merges the metadata file, the per-key-length index files and the per-key-length data files into the final PalDB file
- mergeFiles performs that final merge: it simply concatenates the files into one output file, with the index and data files ordered by ascending key length (the resulting layout is shown below)
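The resulting layout of the final file is therefore simply:

store.paldb = [ metadata ][ index file per key length, ascending ][ data file per key length, ascending ]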
public void close()
throws IOException {
// Close the data and index streams
for (DataOutputStream dos : dataStreams) {
if (dos != null) {
dos.close();
}
}
for (DataOutputStream dos : indexStreams) {
if (dos != null) {
dos.close();
}
}
// Stats
LOGGER.log(Level.INFO, "Number of keys: {0}", keyCount);
LOGGER.log(Level.INFO, "Number of values: {0}", valueCount);
// Prepare files to merge
List<File> filesToMerge = new ArrayList<File>();
try {
//Write metadata file
File metadataFile = new File(tempFolder, "metadata.dat");
metadataFile.deleteOnExit();
FileOutputStream metadataOututStream = new FileOutputStream(metadataFile);
DataOutputStream metadataDataOutputStream = new DataOutputStream(metadataOututStream);
writeMetadata(metadataDataOutputStream);
metadataDataOutputStream.close();
metadataOututStream.close();
filesToMerge.add(metadataFile);
// Build index file
for (int i = 0; i < indexFiles.length; i++) {
if (indexFiles[i] != null) {
filesToMerge.add(buildIndex(i));
}
}
// Stats collisions
LOGGER.log(Level.INFO, "Number of collisions: {0}", collisions);
// Add data files
for (File dataFile : dataFiles) {
if (dataFile != null) {
filesToMerge.add(dataFile);
}
}
// Merge and write to output
checkFreeDiskSpace(filesToMerge);
mergeFiles(filesToMerge, outputStream);
} finally {
outputStream.close();
cleanup(filesToMerge);
}
}
private void mergeFiles(List<File> inputFiles, OutputStream outputStream)
throws IOException {
long startTime = System.nanoTime();
//Merge files
for (File f : inputFiles) {
if (f.exists()) {
FileInputStream fileInputStream = new FileInputStream(f);
BufferedInputStream bufferedInputStream = new BufferedInputStream(fileInputStream);
try {
LOGGER.log(Level.INFO, "Merging {0} size={1}", new Object[]{f.getName(), f.length()});
byte[] buffer = new byte[8192];
int length;
while ((length = bufferedInputStream.read(buffer)) > 0) {
outputStream.write(buffer, 0, length);
}
} finally {
bufferedInputStream.close();
fileInputStream.close();
}
} else {
LOGGER.log(Level.INFO, "Skip merging file {0} because it doesn't exist", f.getName());
}
}
}
Metadata File Generation
writeMetadata records the metadata describing the PalDB index; the code is largely self-explanatory. The core pieces are:
- key-related statistics (total key count, number of distinct key lengths, and per-length counts, slot counts and slot sizes)
- the starting offset of the index (key) section
- the starting offset of the data (value) section
private void writeMetadata(DataOutputStream dataOutputStream)
throws IOException {
//Write format version
dataOutputStream.writeUTF(FormatVersion.getLatestVersion().name());
//Write time
dataOutputStream.writeLong(System.currentTimeMillis());
//Prepare
int keyLengthCount = getNumKeyCount();
int maxKeyLength = keyCounts.length - 1;
//Write size (number of keys)
dataOutputStream.writeInt(keyCount);
//Write the number of different key length
dataOutputStream.writeInt(keyLengthCount);
//Write the max value for keyLength
dataOutputStream.writeInt(maxKeyLength);
// For each keyLength
long datasLength = 0l;
for (int i = 0; i < keyCounts.length; i++) {
if (keyCounts[i] > 0) {
// Write the key length
dataOutputStream.writeInt(i);
// Write key count
dataOutputStream.writeInt(keyCounts[i]);
// Write slot count
int slots = (int) Math.round(keyCounts[i] / loadFactor);
dataOutputStream.writeInt(slots);
// Write slot size
int offsetLength = maxOffsetLengths[i];
dataOutputStream.writeInt(i + offsetLength);
// Write index offset
dataOutputStream.writeInt((int) indexesLength);
// Increment index length
indexesLength += (i + offsetLength) * slots;
// Write data length
dataOutputStream.writeLong(datasLength);
// Increment data length
datasLength += dataLengths[i];
}
}
//Write serializers
try {
Serializers.serialize(dataOutputStream, config.getSerializers());
} catch (Exception e) {
throw new RuntimeException();
}
// The final file layout is metadata + index section + data section: write the absolute offset where
// the index section starts, then the offset where the data section starts
int indexOffset = dataOutputStream.size() + (Integer.SIZE / Byte.SIZE) + (Long.SIZE / Byte.SIZE);
dataOutputStream.writeInt(indexOffset);
dataOutputStream.writeLong(indexOffset + indexesLength);
}
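A concrete (hypothetical) example of the per-key-length record: with loadFactor = 0.75, 100 keys of length 4 and maxOffsetLengths[4] = 2, writeMetadata writes keyLength = 4, keyCount = 100, slots = round(100 / 0.75) = 133 and a slot size of 4 + 2 = 6 bytes, followed by the current indexesLength as this length's index offset and the current datasLength as its data offset; indexesLength then grows by 133 * 6 = 798 bytes and datasLength by dataLengths[4].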
Rebuilding the Key Index Files
Rebuilding an index file moves the keys from the temporary index file into a new, hash-addressed file whose size is derived from the key count and the load factor. The steps are:
- compute the slot count of the new index file from the key count and the load factor: Math.round(count / loadFactor)
- read each key (and its data offset) from the old temporary index file and hash it to locate its slot
- write the key bytes and the offset of its value in the data file into that slot, probing linearly past occupied slots (a worked example follows)
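For instance, with hypothetical numbers: 3 keys of length 4, loadFactor = 0.75 and offsetLength = 1 give round(3 / 0.75) = 4 slots of 5 bytes each. A key whose hash maps to slot 2 is written at byte offset 10 if that slot is empty; if slot 2 already holds a different key, probing continues at slot 3, then wraps around to slot 0, and so on. Finding the same key in an occupied slot aborts the build with a duplicate-key exception.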
private File buildIndex(int keyLength)
    throws IOException {
  // Number of slots = key count for this length / load factor
  long count = keyCounts[keyLength];
  int slots = (int) Math.round(count / loadFactor);
  int offsetLength = maxOffsetLengths[keyLength];
  // Each slot holds the key bytes plus the packed offset into the data file,
  // hence keyLength + offsetLength bytes
  int slotSize = keyLength + offsetLength;
  // Init index
  File indexFile = new File(tempFolder, "index" + keyLength + ".dat");
  RandomAccessFile indexAccessFile = new RandomAccessFile(indexFile, "rw");
  try {
    // Pre-size the rebuilt index file: slots * slotSize bytes
    indexAccessFile.setLength(slots * slotSize);
    FileChannel indexChannel = indexAccessFile.getChannel();
    MappedByteBuffer byteBuffer = indexChannel.map(FileChannel.MapMode.READ_WRITE, 0, indexAccessFile.length());
    // Open the temporary index file for reading
    File tempIndexFile = indexFiles[keyLength];
    DataInputStream tempIndexStream = new DataInputStream(new BufferedInputStream(new FileInputStream(tempIndexFile)));
    try {
      byte[] keyBuffer = new byte[keyLength];
      byte[] slotBuffer = new byte[slotSize];
      byte[] offsetBuffer = new byte[offsetLength];
      // Read every key and re-insert it into the new index file
      for (int i = 0; i < count; i++) {
        // Read key
        tempIndexStream.readFully(keyBuffer);
        // Read offset
        long offset = LongPacker.unpackLong(tempIndexStream);
        // Hash the key to pick its slot
        long hash = (long) hashUtils.hash(keyBuffer);
        boolean collision = false;
        for (int probe = 0; probe < count; probe++) {
          int slot = (int) ((hash + probe) % slots);
          byteBuffer.position(slot * slotSize);
          byteBuffer.get(slotBuffer);
          long found = LongPacker.unpackLong(slotBuffer, keyLength);
          if (found == 0) {
            // The slot is empty: write the key bytes followed by the packed offset
            // of its value in the data file
            byteBuffer.position(slot * slotSize);
            byteBuffer.put(keyBuffer);
            int pos = LongPacker.packLong(offsetBuffer, offset);
            byteBuffer.put(offsetBuffer, 0, pos);
            break;
          } else {
            collision = true;
            // PalDB does not allow duplicate keys
            if (Arrays.equals(keyBuffer, Arrays.copyOf(slotBuffer, keyLength))) {
              throw new RuntimeException(
                  String.format("A duplicate key has been found for for key bytes %s", Arrays.toString(keyBuffer)));
            }
          }
        }
        if (collision) {
          collisions++;
        }
      }
    } finally {
      // Close and remove the temporary index file once its keys have been re-inserted
      tempIndexStream.close();
      tempIndexFile.delete();
    }
  } finally {
    indexAccessFile.close();
  }
  return indexFile;
}