说明
这个类就是索引服务,提供查询key,根据key构建索引,加载IndexFile到内存,destroy过期的IndexFile等功能,UML图如下
属性
// Logger obtained under LoggerName.STORE_LOGGER_NAME.
private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
/**
* Maximum times to attempt index file creation.
*/
private static final int MAX_TRY_IDX_CREATE = 3;// at most 3 attempts to create an IndexFile
private final DefaultMessageStore defaultMessageStore;
private final int hashSlotNum;// number of hash slots per index file (author's note: default 5,000,000)
private final int indexNum;// maximum number of index entries per index file (author's note: default 20,000,000)
private final String storePath;// directory holding the index files (author's note: default user.home/store/index)
private final ArrayList<IndexFile> indexFileList = new ArrayList<IndexFile>();// loaded index files; the code treats the last element as the newest
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();// guards reads and writes of indexFileList
函数
destroy函数不介绍,start,shutdown是空函数
构造函数
/**
 * Creates the index service for the given message store.
 *
 * <p>The slot count, the per-file index capacity and the index directory are
 * all read once from the store's message store configuration.
 *
 * @param store the owning message store; its configuration supplies all settings
 */
public IndexService(final DefaultMessageStore store) {
    this.defaultMessageStore = store;
    // Index directory is derived from the store root directory.
    this.storePath = StorePathConfigHelper.getStorePathIndex(
        store.getMessageStoreConfig().getStorePathRootDir());
    this.indexNum = store.getMessageStoreConfig().getMaxIndexNum();
    this.hashSlotNum = store.getMessageStoreConfig().getMaxHashSlotNum();
}
默认值参考 属性中的讲解
load
加载函数,将持久化文件的相关信息,读到内存中
加载所有IndexFile,根据lastExitOK判断是否要destroy不合理的记录,更新indexFileList
/**
 * Loads the files found in the index directory into {@code indexFileList}.
 *
 * <p>Files are processed in ascending order of their {@link File} ordering.
 * When the previous shutdown was not clean, any file whose end timestamp is
 * newer than the checkpoint's index timestamp is destroyed instead of loaded.
 *
 * @param lastExitOK whether the previous shutdown was clean
 * @return {@code false} if an {@link IOException} occurred while loading a file;
 *         {@code true} otherwise (including when the directory cannot be listed)
 */
public boolean load(final boolean lastExitOK) {
    File[] files = new File(this.storePath).listFiles();
    if (files == null) {
        return true;
    }

    // ascending order
    Arrays.sort(files);
    for (File file : files) {
        try {
            IndexFile indexFile = new IndexFile(file.getPath(), this.hashSlotNum, this.indexNum, 0, 0);
            indexFile.load();

            // After an abnormal exit, a file newer than the checkpoint is not trusted.
            boolean newerThanCheckpoint = !lastExitOK
                && indexFile.getEndTimestamp()
                    > this.defaultMessageStore.getStoreCheckpoint().getIndexMsgTimestamp();
            if (newerThanCheckpoint) {
                indexFile.destroy(0);
                continue;
            }

            log.info("load index file OK, " + indexFile.getFileName());
            this.indexFileList.add(indexFile);
        } catch (IOException e) {
            log.error("load file {} error", file, e);
            return false;
        } catch (NumberFormatException e) {
            // Logged only; loading continues with the next file.
            log.error("load file {} error", file, e);
        }
    }
    return true;
}
deleteExpiredFile函数
分为deleteExpiredFile(long offset) 以及 deleteExpiredFile(List<IndexFile> files)
deleteExpiredFile(long offset):如果最旧的IndexFile的EndPhyOffset<offset,就从旧到新依次收集EndPhyOffset<offset的IndexFile(遇到第一个不满足的就停止,且最新的一个IndexFile永远不会被删除),然后调用deleteExpiredFile(fileList)删掉
deleteExpiredFile(List<IndexFile> files),就是destroy所有"过期"的IndexFile
/**
 * Destroys index files whose end physical offset is below {@code offset}.
 *
 * <p>Nothing happens unless the first (oldest) file is itself below
 * {@code offset}. Files are then collected from oldest to newest until the
 * first one that is not below {@code offset}; the last file in the list is
 * never considered. The collected files are handed to
 * {@code deleteExpiredFile(List)}.
 *
 * @param offset physical offset below which index files count as expired
 */
public void deleteExpiredFile(long offset) {
    List<IndexFile> snapshot = null;
    try {
        this.readWriteLock.readLock().lock();
        if (this.indexFileList.isEmpty()) {
            return;
        }
        IndexFile oldest = this.indexFileList.get(0);
        if (oldest.getEndPhyOffset() < offset) {
            // Copy under the read lock so the scan below runs without it.
            snapshot = new ArrayList<IndexFile>(this.indexFileList);
        }
    } catch (Exception e) {
        log.error("destroy exception", e);
    } finally {
        this.readWriteLock.readLock().unlock();
    }

    if (snapshot == null) {
        return;
    }

    List<IndexFile> expired = new ArrayList<IndexFile>();
    // The last (newest) file is excluded from the scan.
    int candidateCount = snapshot.size() - 1;
    for (int idx = 0; idx < candidateCount; idx++) {
        IndexFile candidate = snapshot.get(idx);
        if (candidate.getEndPhyOffset() >= offset) {
            break;
        }
        expired.add(candidate);
    }
    this.deleteExpiredFile(expired);
}
/**
 * Destroys the given index files and removes them from {@code indexFileList}.
 *
 * <p>Runs under the write lock. Processing stops at the first file that
 * cannot be destroyed or is not present in the list.
 *
 * @param files the index files to destroy; an empty list is a no-op
 */
private void deleteExpiredFile(List<IndexFile> files) {
    if (files.isEmpty()) {
        return;
    }
    try {
        this.readWriteLock.writeLock().lock();
        for (IndexFile expired : files) {
            // remove() is only attempted when destroy() succeeded.
            if (!expired.destroy(3000) || !this.indexFileList.remove(expired)) {
                log.error("deleteExpiredFile remove failed.");
                break;
            }
        }
    } catch (Exception e) {
        log.error("deleteExpiredFile has exception.", e);
    } finally {
        this.readWriteLock.writeLock().unlock();
    }
}
queryOffset
根据topic和key作为关键字,在所有IndexFile中找到匹配的记录,且存储时间在[begin,end]时间内,最多记录maxNum条
/**
 * Looks up physical offsets of messages indexed under {@code topic} and
 * {@code key} whose store time falls within {@code [begin, end]}.
 *
 * <p>Index files are scanned from the newest to the oldest. The scan stops
 * once a file starting before {@code begin} has been processed, or once
 * {@code maxNum} offsets have been collected.
 *
 * @param topic  message topic
 * @param key    message key
 * @param maxNum requested maximum number of offsets; capped by the configured
 *               {@code maxMsgsNumBatch}. A negative value still raises
 *               {@link IllegalArgumentException}, as before.
 * @param begin  start of the time range
 * @param end    end of the time range
 * @return the matching offsets together with the end timestamp and end physical
 *         offset of the newest index file (both 0 when no index file exists)
 */
public QueryOffsetResult queryOffset(String topic, String key, int maxNum, long begin, long end) {
    // Clamp BEFORE sizing the list: maxNum comes from the caller, and using the raw
    // value as the initial capacity would pre-allocate an arbitrarily large array.
    maxNum = Math.min(maxNum, this.defaultMessageStore.getMessageStoreConfig().getMaxMsgsNumBatch());
    List<Long> phyOffsets = new ArrayList<Long>(maxNum);
    long indexLastUpdateTimestamp = 0;
    long indexLastUpdatePhyoffset = 0;
    try {
        this.readWriteLock.readLock().lock();
        if (!this.indexFileList.isEmpty()) {
            // Search from the newest file towards the oldest.
            for (int i = this.indexFileList.size(); i > 0; i--) {
                IndexFile f = this.indexFileList.get(i - 1);
                boolean lastFile = i == this.indexFileList.size();
                if (lastFile) {
                    // Report the end timestamp / physical offset of the newest file.
                    indexLastUpdateTimestamp = f.getEndTimestamp();
                    indexLastUpdatePhyoffset = f.getEndPhyOffset();
                }
                if (f.isTimeMatched(begin, end)) {
                    // Matching entries for topic#key are appended to phyOffsets.
                    f.selectPhyOffset(phyOffsets, buildKey(topic, key), maxNum, begin, end, lastFile);
                }
                // This file starts before the requested range, so stop scanning older files.
                if (f.getBeginTimestamp() < begin) {
                    break;
                }
                if (phyOffsets.size() >= maxNum) {
                    break;
                }
            }
        }
    } catch (Exception e) {
        log.error("queryMsg exception", e);
    } finally {
        this.readWriteLock.readLock().unlock();
    }
    return new QueryOffsetResult(phyOffsets, indexLastUpdateTimestamp, indexLastUpdatePhyoffset);
}
buildKey
这个是个辅助的,就是把topic和key拼起来
/**
 * Builds the index key by joining topic and key with {@code '#'}.
 *
 * @param topic message topic
 * @param key   message key
 * @return {@code topic + "#" + key}
 */
private String buildKey(final String topic, final String key) {
    StringBuilder joined = new StringBuilder();
    joined.append(topic).append("#").append(key);
    return joined.toString();
}
构建索引相关
buildIndex
根据DispatchRequest构建索引
如果getSysFlag是 ROLLBACK的,就返回,不建立索引(也不会删除旧索引)
根据 UniqKey 以及 keys分别建立索引
/**
 * Builds index entries for a dispatched message.
 *
 * <p>One entry is written for the message's uniq key (if present) and one for
 * each non-empty key obtained by splitting {@code getKeys()} on
 * {@code MessageConst.KEY_SEPARATOR}. Nothing is indexed when:
 * <ul>
 *   <li>no writable index file could be obtained;</li>
 *   <li>the message's commit log offset is below the index file's end physical offset;</li>
 *   <li>the message is a transaction rollback.</li>
 * </ul>
 *
 * @param req the dispatch request describing the message to index
 */
public void buildIndex(DispatchRequest req) {
    // Up to MAX_TRY_IDX_CREATE attempts to get or create a writable index file.
    IndexFile indexFile = retryGetAndCreateIndexFile();
    if (indexFile == null) {
        log.error("build index error, stop building index");
        return;
    }

    // Skip offsets below the end physical offset already recorded in the index file.
    if (req.getCommitLogOffset() < indexFile.getEndPhyOffset()) {
        return;
    }

    // Rollback messages are not indexed; every other transaction type is.
    final int tranType = MessageSysFlag.getTransactionValue(req.getSysFlag());
    if (tranType == MessageSysFlag.TRANSACTION_ROLLBACK_TYPE) {
        return;
    }

    final String topic = req.getTopic();

    if (req.getUniqKey() != null) {
        indexFile = putKey(indexFile, req, buildKey(topic, req.getUniqKey()));
        if (indexFile == null) {
            log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
            return;
        }
    }

    final String keys = req.getKeys();
    if (keys != null && keys.length() > 0) {
        for (String key : keys.split(MessageConst.KEY_SEPARATOR)) {
            if (key.length() == 0) {
                continue;
            }
            indexFile = putKey(indexFile, req, buildKey(topic, key));
            if (indexFile == null) {
                // Report the key that actually failed, not only the uniq key.
                log.error("putKey error commitlog {} uniqkey {} key {}",
                    req.getCommitLogOffset(), req.getUniqKey(), key);
                return;
            }
        }
    }
}
里面调用了retryGetAndCreateIndexFile,以及putKey如下
retryGetAndCreateIndexFile
尝试3次获取或者创建一个最新的,可写的IndexFile
如果原来最新的没写满就返回原来最新的
否则创建最新的
/**
 * Obtains the current writable index file, retrying up to
 * {@code MAX_TRY_IDX_CREATE} times with a one-second pause after each failure.
 *
 * <p>If every attempt fails, the store is flagged via
 * {@code makeIndexFileError()} and {@code null} is returned.
 *
 * @return the writable index file, or {@code null} if none could be obtained
 */
public IndexFile retryGetAndCreateIndexFile() {
    for (int attempt = 0; attempt < MAX_TRY_IDX_CREATE; attempt++) {
        IndexFile candidate = this.getAndCreateLastIndexFile();
        if (candidate != null) {
            return candidate;
        }

        try {
            log.info("Tried to create index file " + attempt + " times");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Logged only; the retry loop keeps going.
            log.error("Interrupted", e);
        }
    }

    // All attempts failed.
    this.defaultMessageStore.getAccessRights().makeIndexFileError();
    log.error("Mark index file cannot build flag");
    return null;
}
里面调用了getAndCreateLastIndexFile,如下
getAndCreateLastIndexFile
获取,创建最新的,可写的IndexFile
IndexFileList里面最新的IndexFile没写满,就用最新的
如果写满了,就以时间戳作为filename创建一个新的IndexFile,并且flush上一个IndexFile,将最后的时间戳记录在storeCheckPoint中
/**
 * Returns the newest index file if it still has room, otherwise creates a new one.
 *
 * <p>A new file is named after the current time (formatted by
 * {@code UtilAll.timeMillisToHumanString}) and starts from the end physical
 * offset and end timestamp of the previous, full file (0/0 when the list is
 * empty). After a new file has been added to {@code indexFileList}, the
 * previous file is flushed on a daemon thread.
 *
 * @return the writable index file, or {@code null} if creating a new one failed
 */
public IndexFile getAndCreateLastIndexFile() {
    IndexFile indexFile = null;
    IndexFile prevIndexFile = null;
    long lastUpdateEndPhyOffset = 0;
    long lastUpdateIndexTimestamp = 0;

    this.readWriteLock.readLock().lock();
    try {
        if (!this.indexFileList.isEmpty()) {
            IndexFile tmp = this.indexFileList.get(this.indexFileList.size() - 1);
            if (!tmp.isWriteFull()) {
                // The newest file still has room: reuse it.
                indexFile = tmp;
            } else {
                // Remember where the full file ended so the new file continues from there.
                lastUpdateEndPhyOffset = tmp.getEndPhyOffset();
                lastUpdateIndexTimestamp = tmp.getEndTimestamp();
                prevIndexFile = tmp;
            }
        }
    } finally {
        // Released even if reading the last file's state throws.
        this.readWriteLock.readLock().unlock();
    }

    if (indexFile == null) {
        try {
            String fileName =
                this.storePath + File.separator
                    + UtilAll.timeMillisToHumanString(System.currentTimeMillis());
            indexFile =
                new IndexFile(fileName, this.hashSlotNum, this.indexNum, lastUpdateEndPhyOffset,
                    lastUpdateIndexTimestamp);

            // Take the write lock only after the file exists, and unlock only what was
            // locked: previously a failing constructor reached an unlock() on a lock that
            // was never acquired, which throws IllegalMonitorStateException.
            this.readWriteLock.writeLock().lock();
            try {
                this.indexFileList.add(indexFile);
            } finally {
                this.readWriteLock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("getLastIndexFile exception ", e);
        }

        if (indexFile != null) {
            // Flush the previous file in the background (flush() ignores null).
            final IndexFile flushThisFile = prevIndexFile;
            Thread flushThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    IndexService.this.flush(flushThisFile);
                }
            }, "FlushIndexFileThread");
            flushThread.setDaemon(true);
            flushThread.start();
        }
    }
    return indexFile;
}
里面调用了flush,如下
flush
flush一个文件,如果写满了,获取文件最后的存储时间,记录在StoreCheckpoint中
/**
 * Flushes the given index file and, if the file was full, records its end
 * timestamp in the store checkpoint and flushes the checkpoint.
 *
 * @param f the index file to flush; {@code null} is ignored
 */
public void flush(final IndexFile f) {
    if (f == null) {
        return;
    }

    // Read the end timestamp before flushing; 0 means "do not update the checkpoint".
    final long fullFileEndTimestamp = f.isWriteFull() ? f.getEndTimestamp() : 0;

    f.flush();

    if (fullFileEndTimestamp > 0) {
        this.defaultMessageStore.getStoreCheckpoint().setIndexMsgTimestamp(fullFileEndTimestamp);
        this.defaultMessageStore.getStoreCheckpoint().flush();
    }
}
putKey
建立索引 将key,物理偏移,存储时间放入IndexFile
如果一次放失败了,就重新获取最新的可写的IndexFile再放
/**
 * Writes one index entry (key, commit log offset, store timestamp) into an index file.
 *
 * <p>When the write is rejected, a writable index file is obtained again via
 * {@code retryGetAndCreateIndexFile()} and the write is repeated until it
 * succeeds or no file can be obtained.
 *
 * @param indexFile the index file to try first
 * @param msg       the dispatch request supplying offset and store timestamp
 * @param idxKey    the index key to store
 * @return the index file that accepted the entry, or {@code null} if no
 *         writable index file could be obtained
 */
private IndexFile putKey(IndexFile indexFile, DispatchRequest msg, String idxKey) {
    IndexFile target = indexFile;
    while (!target.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp())) {
        log.warn("Index file [" + target.getFileName() + "] is full, trying to create another one");
        target = retryGetAndCreateIndexFile();
        if (target == null) {
            return null;
        }
    }
    return target;
}
思考
IndexFile的文件名是什么
创建该文件时的时间戳(System.currentTimeMillis()经UtilAll.timeMillisToHumanString格式化后的字符串),存放在storePath目录下,见getAndCreateLastIndexFile函数
哪些消息可以进入IndexFile进行记录
设置了keys或者UniqKey的消息会进入IndexFile,但有两种例外:事务ROLLBACK类型的消息不建索引;CommitLogOffset小于当前IndexFile的EndPhyOffset的消息(已经建过索引的老记录)会被跳过
buildIndex函数说明了
重要函数
函数名 | 作用 |
---|---|
load(final boolean lastExitOK) | 加载所有IndexFile,根据lastExitOK判断是否要destroy不合理的记录,更新indexFileList |
deleteExpiredFile(long offset) | 从最旧的IndexFile开始,找到EndPhyOffset<offset的IndexFile进行destroy(最新的一个IndexFile始终保留) |
queryOffset | 根据topic和key作为关键字,在所有IndexFile中找到匹配的记录,且存储时间在[begin,end]时间内,最多记录maxNum条 |
buildIndex(DispatchRequest req) | 根据DispatchRequest 的 UniqKey 以及 keys分别建立索引 |
吐槽
构建索引的代码嵌套复杂了点,要反复保证,IndexFile是最新的,可写的