IndexReader/IndexWriter 的打开/关闭操作是一个非常损耗性能过程,在并发量大的情况下,甚至会影响到应用的稳定性。lucene-candy对IndexReader/IndexWriter的操作进行顶层封装,屏蔽开发者的直接调度,由组件自动进行初始化、销毁。
- Directory初始化
根据缓存的索引信息初始化索引目录:
....
Map<String, IndexBO> map = this.indexInfoCache.getAll();
map.forEach((k, v) -> this.initDirectory(v.getIndexName()));
log.debug("完成索引目录初始化,size:{}", CollUtil.size(DIRECTORY_MAP));
private void initDirectory(String index) {
if (DIRECTORY_MAP.containsKey(index)) {
return;
}
synchronized (DIRECTORY_MAP) {
if (DIRECTORY_MAP.containsKey(index)) {
return;
}
File file = FileUtil.file(this.luceneCandyConfig.getDataPath(), index);
FileUtil.mkdir(file);
try {
Directory directory = FSDirectory.open(Paths.get(file.getAbsolutePath()));
DIRECTORY_MAP.put(index, directory);
} catch (Exception e) {
log.error("init directory error", e);
throw new AppException(MessageEnum.SYSTEM_ERROR);
}
}
}
- IndexReader初始化
应用启动后组件根据索引目录自动初始化IndexReader,并添加到本地缓存。
private void initIndexReader(String index) {
Directory directory = this.directoryCache.get(index, false);
if (READER_MAP.containsKey(index)) {
return;
}
synchronized (READER_MAP) {
if (!READER_MAP.containsKey(index)) {
try {
READER_MAP.put(index, DirectoryReader.open(directory));
} catch (Exception e) {
log.error("init indexReader error", e);
}
}
}
}
- 获取IndexReader
在这里,调用的是DirectoryReader.openIfChanged 而不是DirectoryReader.open API。两者区别在于索引没发生变化的情况下openIfChanged返回的是null。前文有提到过IndexReader的open操作是一个开销非常大的操作,在没必要重新open的情况下,复用旧的IndexReader即可。
public IndexReader get(String key, Boolean deepCopy) {
if (!READER_MAP.containsKey(key)) {
return null;
}
IndexReader indexReader = READER_MAP.get(key);
IndexWriter indexWriter = this.indexWriterCache.get(key, false);
IndexReader newReader = null;
synchronized (READER_MAP) {
try {
newReader = DirectoryReader.openIfChanged((DirectoryReader) indexReader, indexWriter);
} catch (Exception e) {
log.error("获取 IndexReader 异常", e);
}
if (Objects.nonNull(newReader)) {
READER_MAP.put(key, newReader);
Map<IndexReader, Long> map = new HashMap<>(1);
map.put(indexReader, System.currentTimeMillis());
boolean flag = OLD_INDEXREADER_QUEUE.offer(map);
if (!flag) {
log.warn("回收队列可能已满,旧索引无法添加到回收队列");
}
}
}
return Objects.nonNull(newReader) ? newReader : indexReader;
}
IndexReaderCache 定义有两个队列
/**
* 可回收的IndexReader
*/
private static final LinkedBlockingQueue<IndexReader> RECYCLE_INDEXREADER_QUEUE = new LinkedBlockingQueue<>(NumberConstant.ONE_THOUSAND);
/**
* 整型最大值近似无界队列
*/
private static final LinkedBlockingQueue<Map<IndexReader, Long>> OLD_INDEXREADER_QUEUE = new LinkedBlockingQueue<>();
两个队列的作用在于区分等待回收和可回收的IndexReader,组件默认在OLD_INDEXREADER_QUEUE队列里面等待时长超过5秒将会进入RECYCLE_INDEXREADER_QUEUE等待组件关闭。
- 初始化IndexWriter
应用启动后组件根据索引目录自动初始化IndexWriter,并添加到本地缓存。
public void refresh() {
Map<String, IndexBO> map = this.indexInfoCache.getAll();
map.forEach((k, v) -> this.initIndexWriter(v.getIndexName()));
log.debug("完成IndexWriter初始化,size:{}", CollUtil.size(WRITER_MAP));
}
- 获取IndexWriter
lucene-candy给每个索引都维护一个IndexWriter,即所有线程共用一个索引的IndexWriter。
public IndexWriter get(String key, Boolean deepCopy) {
return WRITER_MAP.get(key);
}
IndexWriter的commit操作交由批量处理
public void commit() {
WRITER_MAP.forEach((k, v) -> {
try {
v.commit();
} catch (Exception e) {
log.error("index:{}, commit indexWriter error", k, e);
}
});
}