Overview
The PalDB reader's multi-level cache
Before walking through PalDB's read path, it is worth explaining why its reads perform so well: the reader can be thought of as having multiple levels of caching (a short usage sketch follows the list):
- The first level is the StorageCache object, an access-ordered cache built on LinkedHashMap.
- The second level is the StorageReader object, which maps the store file into memory via mmap.
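As a quick illustration, here is a minimal usage sketch assuming the standard PalDB API (the store file name is a placeholder); both cache levels are transparent to the caller:

import java.io.File;

import com.linkedin.paldb.api.Configuration;
import com.linkedin.paldb.api.PalDB;
import com.linkedin.paldb.api.StoreReader;

public class ReaderCacheExample {

  public static void main(String[] args) {
    // Turn on the first-level LRU cache; the mmap level is used by default.
    Configuration config = PalDB.newConfiguration();
    config.set(Configuration.CACHE_ENABLED, "true");

    // "store.paldb" is a placeholder for an existing store file.
    StoreReader reader = PalDB.createReader(new File("store.paldb"), config);
    try {
      String first = reader.get("foo");   // goes through mmap + deserialization, then cached
      String second = reader.get("foo");  // served from the StorageCache level
      System.out.println(first + " / " + second);
    } finally {
      reader.close();
    }
  }
}

Inside ReaderImpl these two levels show up as the storage and cache fields: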
public final class ReaderImpl implements StoreReader {
  // Logger
  private final static Logger LOGGER = Logger.getLogger(ReaderImpl.class.getName());
  // Configuration
  private final Configuration config;
  // Buffer
  private final DataInputOutput dataInputOutput = new DataInputOutput();
  // Storage
  private final StorageReader storage;
  // Serialization
  private final StorageSerialization serialization;
  // Cache
  private final StorageCache cache;
  // File
  private final File file;
  // Opened?
  private boolean opened;
private StorageCache(Configuration config) {
  cache = new LinkedHashMap(config.getInt(Configuration.CACHE_INITIAL_CAPACITY),
      config.getFloat(Configuration.CACHE_LOAD_FACTOR), true) {
    @Override
    protected boolean removeEldestEntry(Map.Entry eldest) {
      boolean res = currentWeight > maxWeight;
      if (res) {
        Object key = eldest.getKey();
        Object value = eldest.getValue();
        currentWeight -= getWeight(key) + getWeight(value) + OVERHEAD;
      }
      return res;
    }
  };
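The eviction logic above is the standard LinkedHashMap trick: construct the map in access order and override removeEldestEntry. Below is a small self-contained sketch of the same weight-bounded LRU pattern; the weight estimate is deliberately simplified and is not PalDB's accounting:

import java.util.LinkedHashMap;
import java.util.Map;

// Toy LRU cache bounded by an approximate byte weight rather than an entry count.
public class WeightedLruCache<K, V> extends LinkedHashMap<K, V> {

  private final long maxWeight;
  private long currentWeight;

  public WeightedLruCache(long maxWeight) {
    super(16, 0.75f, true);   // accessOrder = true keeps entries in LRU order
    this.maxWeight = maxWeight;
  }

  @Override
  public V put(K key, V value) {
    // Count the new entry before inserting so removeEldestEntry sees the updated weight.
    currentWeight += weightOf(key) + weightOf(value);
    V previous = super.put(key, value);
    if (previous != null) {
      currentWeight -= weightOf(key) + weightOf(previous);
    }
    return previous;
  }

  @Override
  protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
    // Evict the least recently used entry whenever the cache is over budget;
    // like the PalDB version above, at most one entry is dropped per insertion.
    boolean evict = currentWeight > maxWeight;
    if (evict) {
      currentWeight -= weightOf(eldest.getKey()) + weightOf(eldest.getValue());
    }
    return evict;
  }

  // Crude weight estimate: string length for strings, a flat cost otherwise.
  private long weightOf(Object o) {
    return (o instanceof String) ? ((String) o).length() : 16;
  }
}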
The PalDB read path
The high-level read flow
The read path is simple and straightforward: a standard read-through-cache flow, implemented by ReaderImpl.get below:
- Check whether the cache already holds the key; if it does, return the cached value directly.
- Otherwise read the value bytes from the mmap'd file via storage.get.
- Put the deserialized value into the cache and return it.
public <K> K get(Object key, K defaultValue) {
  checkOpen();
  if (key == null) {
    throw new NullPointerException("The key can't be null");
  }
  K value = cache.get(key);
  if (value == null) {
    try {
      byte[] valueBytes = storage.get(serialization.serializeKey(key));
      if (valueBytes != null) {
        Object v = serialization.deserialize(dataInputOutput.reset(valueBytes));
        cache.put(key, v);
        return (K) v;
      } else {
        return defaultValue;
      }
    } catch (Exception ex) {
      throw new RuntimeException(ex);
    }
  } else if (value == StorageCache.NULL_VALUE) {
    // NULL_VALUE is the cache's sentinel for a stored null, so a cached null is
    // returned as null rather than being treated as a miss.
    return null;
  }
  return value;
}
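From the caller's perspective the whole flow above collapses into a single call. A hypothetical usage sketch (the key, default value, and file name are made up):

import java.io.File;

import com.linkedin.paldb.api.PalDB;
import com.linkedin.paldb.api.StoreReader;

public class GetWithDefaultExample {

  public static void main(String[] args) {
    StoreReader reader = PalDB.createReader(new File("store.paldb"));   // placeholder file
    try {
      // First call: cache miss -> storage.get -> deserialize -> cache.put.
      Integer views = reader.get("page-views", 0);
      // Second call for the same key is answered directly from StorageCache.
      Integer cached = reader.get("page-views", 0);
      // Per ReaderImpl.get above, the default (0) is returned only when the key is
      // absent from the store; a value cached as NULL_VALUE comes back as null.
      System.out.println(views + " / " + cached);
    } finally {
      reader.close();
    }
  }
}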
StorageReader's read path
A StorageReader lookup hashes the key to locate its slot; the steps are:
- Hash the key and look up the number of slots allocated for keys of that length.
- Probe the index mmap to find the slot holding the key, and read the packed value offset stored next to it.
- Use that offset to locate the value in the data mmap and return it.
When reading from the data mmap, the absolute offset is divided by the segment size to select the mapped buffer, and the remainder gives the position inside that buffer (see getDataBuffer below).
public byte[] get(byte[] key)
    throws IOException {
  int keyLength = key.length;
  // Slots are partitioned by key length; no keys of this length means a guaranteed miss.
  if (keyLength >= slots.length || keyCounts[keyLength] == 0) {
    return null;
  }
  long hash = (long) hashUtils.hash(key);
  int numSlots = slots[keyLength];
  int slotSize = slotSizes[keyLength];
  int indexOffset = indexOffsets[keyLength];
  long dataOffset = dataOffsets[keyLength];
  // Linear probing over the slots for this key length.
  for (int probe = 0; probe < numSlots; probe++) {
    int slot = (int) ((hash + probe) % numSlots);
    indexBuffer.position(indexOffset + slot * slotSize);
    indexBuffer.get(slotBuffer, 0, slotSize);
    // Slot layout: [key bytes | packed offset into the data section].
    long offset = LongPacker.unpackLong(slotBuffer, keyLength);
    if (offset == 0) {
      // An empty slot: the key is not in the store.
      return null;
    }
    if (isKey(slotBuffer, key)) {
      byte[] value = mMapData ? getMMapBytes(dataOffset + offset) : getDiskBytes(dataOffset + offset);
      return value;
    }
  }
  return null;
}
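To make the probing concrete, here is a made-up walk-through of one lookup; the hash and slot count are arbitrary, and the probe order is exactly the one used in StorageReader.get above:

public class ProbeDemo {

  public static void main(String[] args) {
    long hash = 1_000_003L;   // hypothetical hash of the key
    int numSlots = 12;        // hypothetical slot count for this key length

    // Visits slots 7, 8, 9, ... until the key or an empty slot is found.
    for (int probe = 0; probe < numSlots; probe++) {
      int slot = (int) ((hash + probe) % numSlots);
      System.out.println("probe " + probe + " -> slot " + slot);
    }
  }
}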
//Read the data at the given offset, the data can be spread over multiple data buffers
private byte[] getMMapBytes(long offset)
    throws IOException {
  //Read the first 4 bytes to get the size of the data
  ByteBuffer buf = getDataBuffer(offset);
  // The size is stored as a LongPacker varint, which needs at most 5 bytes for an int.
  int maxLen = (int) Math.min(5, dataSize - offset);
  int size;
  if (buf.remaining() >= maxLen) {
    //Continuous read
    int pos = buf.position();
    size = LongPacker.unpackInt(buf);
    // Used in case of data is spread over multiple buffers
    offset += buf.position() - pos;
  } else {
    //The size of the data is spread over multiple buffers
    int len = maxLen;
    int off = 0;
    sizeBuffer.reset();
    while (len > 0) {
      buf = getDataBuffer(offset + off);
      int count = Math.min(len, buf.remaining());
      buf.get(sizeBuffer.getBuf(), off, count);
      off += count;
      len -= count;
    }
    size = LongPacker.unpackInt(sizeBuffer);
    offset += sizeBuffer.getPos();
    buf = getDataBuffer(offset);
  }
  //Create output bytes
  byte[] res = new byte[size];
  //Check if the data is one buffer
  if (buf.remaining() >= size) {
    //Continuous read
    buf.get(res, 0, size);
  } else {
    int len = size;
    int off = 0;
    while (len > 0) {
      buf = getDataBuffer(offset);
      int count = Math.min(len, buf.remaining());
      buf.get(res, off, count);
      offset += count;
      off += count;
      len -= count;
    }
  }
  return res;
}
private ByteBuffer getDataBuffer(long index) {
  // Divide the absolute offset by the segment size to pick the mapped segment;
  // the remainder is the position within that segment.
  ByteBuffer buf = dataBuffers[(int) (index / segmentSize)];
  buf.position((int) (index % segmentSize));
  return buf;
}
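A concrete (made-up) example of getDataBuffer's arithmetic: with 1 GB segments, an absolute data offset of 2.5 GB falls in the third mapped buffer, half a gigabyte in:

public class SegmentMathDemo {

  public static void main(String[] args) {
    long segmentSize = 1L << 30;                       // 1 GB segments (hypothetical)
    long offset = (2L << 30) + (1L << 29);             // 2.5 GB into the data section

    int bufferIndex = (int) (offset / segmentSize);    // 2 -> dataBuffers[2]
    long positionInBuffer = offset % segmentSize;      // 536870912 bytes (0.5 GB)

    System.out.println("buffer " + bufferIndex + ", position " + positionInBuffer);
  }
}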
StorageReader initialization
Initializing a StorageReader is essentially the write path in reverse: the metadata is read back in the same order it was written. The overall sequence is:
- Read the PalDB metadata, including the start offset of the key (index) section and of the value (data) section.
- Map the key section into memory with a single mmap buffer, which cannot exceed 2 GB.
- Map the value section into memory with mmap, split into segments of at most segmentSize each.
StorageReader(Configuration configuration, File file)
    throws IOException {
  path = file;
  config = configuration;
  //Config
  segmentSize = config.getLong(Configuration.MMAP_SEGMENT_SIZE);
  hashUtils = new HashUtils();
  // Check valid segmentSize
  if (segmentSize > Integer.MAX_VALUE) {
    throw new IllegalArgumentException(
        "The `" + Configuration.MMAP_SEGMENT_SIZE + "` setting can't be larger than 2GB");
  }
  //Open file and read metadata
  long createdAt = 0;
  FormatVersion formatVersion = null;
  FileInputStream inputStream = new FileInputStream(path);
  DataInputStream dataInputStream = new DataInputStream(new BufferedInputStream(inputStream));
  try {
    int ignoredBytes = -2;
    //Byte mark
    byte[] mark = FormatVersion.getPrefixBytes();
    int found = 0;
    while (found != mark.length) {
      byte b = dataInputStream.readByte();
      if (b == mark[found]) {
        found++;
      } else {
        ignoredBytes += found + 1;
        found = 0;
      }
    }
    //Version
    byte[] versionFound = Arrays.copyOf(mark, FormatVersion.getLatestVersion().getBytes().length);
    dataInputStream.readFully(versionFound, mark.length, versionFound.length - mark.length);
    formatVersion = FormatVersion.fromBytes(versionFound);
    if (formatVersion == null || !formatVersion.is(FormatVersion.getLatestVersion())) {
      throw new RuntimeException(
          "Version mismatch, expected was '" + FormatVersion.getLatestVersion() + "' and found '" + formatVersion
              + "'");
    }
    //Time
    createdAt = dataInputStream.readLong();
    //Metadata counters
    keyCount = dataInputStream.readInt();
    keyLengthCount = dataInputStream.readInt();
    maxKeyLength = dataInputStream.readInt();
    //Read offset counts and keys
    indexOffsets = new int[maxKeyLength + 1];
    dataOffsets = new long[maxKeyLength + 1];
    keyCounts = new int[maxKeyLength + 1];
    slots = new int[maxKeyLength + 1];
    slotSizes = new int[maxKeyLength + 1];
    int maxSlotSize = 0;
    for (int i = 0; i < keyLengthCount; i++) {
      int keyLength = dataInputStream.readInt();
      keyCounts[keyLength] = dataInputStream.readInt();
      slots[keyLength] = dataInputStream.readInt();
      slotSizes[keyLength] = dataInputStream.readInt();
      indexOffsets[keyLength] = dataInputStream.readInt();
      dataOffsets[keyLength] = dataInputStream.readLong();
      maxSlotSize = Math.max(maxSlotSize, slotSizes[keyLength]);
    }
    slotBuffer = new byte[maxSlotSize];
    //Read serializers
    try {
      Serializers.deserialize(dataInputStream, config.getSerializers());
    } catch (Exception e) {
      throw new RuntimeException();
    }
    //Read index and data offset
    indexOffset = dataInputStream.readInt() + ignoredBytes;
    dataOffset = dataInputStream.readLong() + ignoredBytes;
  } finally {
    //Close metadata
    dataInputStream.close();
    inputStream.close();
  }
  //Create Mapped file in read-only mode
  mappedFile = new RandomAccessFile(path, "r");
  channel = mappedFile.getChannel();
  long fileSize = path.length();
  //Create index buffer
  indexBuffer = channel.map(FileChannel.MapMode.READ_ONLY, indexOffset, dataOffset - indexOffset);
  //Create data buffers
  dataSize = fileSize - dataOffset;
  //Check if data size fits in memory map limit
  if (!config.getBoolean(Configuration.MMAP_DATA_ENABLED)) {
    //Use classical disk read
    mMapData = false;
    dataBuffers = null;
  } else {
    //Use Mmap
    mMapData = true;
    //Build data buffers
    int bufArraySize = (int) (dataSize / segmentSize) + ((dataSize % segmentSize != 0) ? 1 : 0);
    dataBuffers = new MappedByteBuffer[bufArraySize];
    int bufIdx = 0;
    for (long offset = 0; offset < dataSize; offset += segmentSize) {
      long remainingFileSize = dataSize - offset;
      long thisSegmentSize = Math.min(segmentSize, remainingFileSize);
      dataBuffers[bufIdx++] = channel.map(FileChannel.MapMode.READ_ONLY, dataOffset + offset, thisSegmentSize);
    }
  }
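Both mmap-related settings used in the constructor can be tuned through the Configuration passed to the reader. A hedged sketch (the values and file name are arbitrary); the segment size must not exceed Integer.MAX_VALUE (about 2 GB), and disabling data mmap switches value reads to the getDiskBytes path:

import java.io.File;

import com.linkedin.paldb.api.Configuration;
import com.linkedin.paldb.api.PalDB;
import com.linkedin.paldb.api.StoreReader;

public class MmapConfigExample {

  public static void main(String[] args) {
    Configuration config = PalDB.newConfiguration();
    // Size of each mapped data segment; must not exceed Integer.MAX_VALUE (~2 GB).
    config.set(Configuration.MMAP_SEGMENT_SIZE, String.valueOf(512L * 1024 * 1024));
    // Set to "false" to skip mapping the data section and read values from disk instead.
    config.set(Configuration.MMAP_DATA_ENABLED, "true");

    StoreReader reader = PalDB.createReader(new File("store.paldb"), config);   // placeholder file
    reader.close();
  }
}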