
Directory class hierarchy
In the createIndexService method of IndicesService, IndicesService creates an IndexModule instance, IndexModule creates an IndexStore instance, and IndexStore creates an FsDirectoryService instance.
IndicesService::createIndexService
private synchronized IndexService createIndexService(final String reason, IndexMetaData indexMetaData, IndicesQueryCache indicesQueryCache, IndicesFieldDataCache indicesFieldDataCache, List<IndexEventListener> builtInListeners, IndexingOperationListener... indexingOperationListeners) throws IOException {
final Index index = indexMetaData.getIndex();
final Predicate<String> indexNameMatcher = (indexExpression) -> indexNameExpressionResolver.matchesIndex(index.getName(), indexExpression, clusterService.state());
final IndexSettings idxSettings = new IndexSettings(indexMetaData, this.settings, indexNameMatcher, indexScopeSetting);
...
final IndexModule indexModule = new IndexModule(idxSettings, indexStoreConfig, analysisRegistry); // create the IndexModule
...
return indexModule.newIndexService(nodeEnv, xContentRegistry, this, circuitBreakerService, bigArrays, threadPool, scriptService,
clusterService, client, indicesQueryCache, mapperRegistry, indicesFieldDataCache); // delegate IndexService creation to the IndexModule
}
IndexModule::newIndexService
public IndexService newIndexService(NodeEnvironment environment, NamedXContentRegistry xContentRegistry,
IndexService.ShardStoreDeleter shardStoreDeleter, CircuitBreakerService circuitBreakerService, BigArrays bigArrays,
ThreadPool threadPool, ScriptService scriptService,
ClusterService clusterService, Client client, IndicesQueryCache indicesQueryCache, MapperRegistry mapperRegistry,
IndicesFieldDataCache indicesFieldDataCache) throws IOException {
...;
final String storeType = indexSettings.getValue(INDEX_STORE_TYPE_SETTING);
final IndexStore store;
if (Strings.isEmpty(storeType) || isBuiltinType(storeType)) { // IndexModule provides five built-in store types: NIOFS, MMAPFS, SIMPLEFS, FS, DEFAULT
store = new IndexStore(indexSettings, indexStoreConfig);
} else { // a custom IndexStore can also be used; register it by calling addIndexStore(String type, BiFunction<IndexSettings, IndexStoreConfig, IndexStore> provider)
BiFunction<IndexSettings, IndexStoreConfig, IndexStore> factory = storeTypes.get(storeType);
if (factory == null) {
throw new IllegalArgumentException("Unknown store type [" + storeType + "]");
}
store = factory.apply(indexSettings, indexStoreConfig);
if (store == null) {
throw new IllegalStateException("store must not be null");
}
}
indexSettings.getScopedSettings().addSettingsUpdateConsumer(IndexStore.INDEX_STORE_THROTTLE_TYPE_SETTING, store::setType); // index.store.throttle.type, throttling control
indexSettings.getScopedSettings().addSettingsUpdateConsumer(IndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING,
store::setMaxRate); // index.store.throttle.max_bytes_per_sec, throttling control
...;
return new IndexService(indexSettings, environment, xContentRegistry, new SimilarityService(indexSettings, similarities),
shardStoreDeleter, analysisRegistry, engineFactory.get(), circuitBreakerService, bigArrays, threadPool, scriptService,
clusterService, client, queryCache, store, eventListener, searcherWrapperFactory, mapperRegistry,
indicesFieldDataCache, searchOperationListeners, indexOperationListeners);
}
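The store-selection branch above amounts to a registry lookup with a built-in fallback. The following is a minimal sketch under stated assumptions, not Elasticsearch code: the class `StoreTypeRegistrySketch` is hypothetical, a plain `String` stands in for `IndexStore`, a `Function<String, String>` stands in for the `BiFunction<IndexSettings, IndexStoreConfig, IndexStore>` provider, and the duplicate-key check in `addIndexStore` is an assumption of the sketch.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/** Hypothetical stand-in for the store-selection branch of IndexModule::newIndexService. */
class StoreTypeRegistrySketch {
    // Names of the five built-in store types listed in the walkthrough.
    private static final Set<String> BUILTIN =
            new HashSet<>(Arrays.asList("niofs", "mmapfs", "simplefs", "fs", "default"));

    // Custom factories keyed by store type; the String result stands in for an IndexStore.
    private final Map<String, Function<String, String>> storeTypes = new HashMap<>();

    /** Registers a custom store factory (assumed to reject duplicate keys). */
    void addIndexStore(String type, Function<String, String> provider) {
        if (storeTypes.putIfAbsent(type, provider) != null) {
            throw new IllegalArgumentException("key [" + type + "] already registered");
        }
    }

    /** Mirrors the if/else: built-in or empty type uses the default store, otherwise the registry. */
    String newStore(String storeType, String indexName) {
        if (storeType == null || storeType.isEmpty() || BUILTIN.contains(storeType)) {
            return "IndexStore(" + indexName + ")";
        }
        Function<String, String> factory = storeTypes.get(storeType);
        if (factory == null) {
            throw new IllegalArgumentException("Unknown store type [" + storeType + "]");
        }
        String store = factory.apply(indexName);
        if (store == null) {
            throw new IllegalStateException("store must not be null");
        }
        return store;
    }

    public static void main(String[] args) {
        StoreTypeRegistrySketch registry = new StoreTypeRegistrySketch();
        registry.addIndexStore("my_store", name -> "MyStore(" + name + ")");
        System.out.println(registry.newStore("mmapfs", "logs"));
        System.out.println(registry.newStore("my_store", "logs"));
    }
}
```

An unknown, unregistered type fails fast with an `IllegalArgumentException`, matching the behaviour of the original branch.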
IndexStore::newDirectoryService
public DirectoryService newDirectoryService(ShardPath path) {
return new FsDirectoryService(indexSettings, this, path);
}
FsDirectoryService
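When creating a directory, FsDirectoryService mainly does two things: it obtains the file lock factory, and it creates the directory object that matches the configured store type.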
FsDirectoryService::newDirectory
@Override
public Directory newDirectory() throws IOException {
final Path location = path.resolveIndex(); // resolve the directory location
final LockFactory lockFactory = indexSettings.getValue(INDEX_LOCK_FACTOR_SETTING); // obtain the lock factory
Files.createDirectories(location);
Directory wrapped = newFSDirectory(location, lockFactory); // create the directory object for the configured type
Set<String> preLoadExtensions = new HashSet<>(
indexSettings.getValue(IndexModule.INDEX_STORE_PRE_LOAD_SETTING));
wrapped = setPreload(wrapped, location, lockFactory, preLoadExtensions); // for MMapDirectory, preload files with the configured extensions when they are mapped into memory (MappedByteBuffer.load())
if (indexSettings.isOnSharedFilesystem()) {
wrapped = new SleepingLockWrapper(wrapped, 5000);
}
return new RateLimitedFSDirectory(wrapped, this, this);
}
protected Directory newFSDirectory(Path location, LockFactory lockFactory) throws IOException {
final String storeType = indexSettings.getSettings().get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(),
IndexModule.Type.FS.getSettingsKey());
if (IndexModule.Type.FS.match(storeType)) {
return FSDirectory.open(location, lockFactory); // use lucene defaults
} else if (IndexModule.Type.DEFAULT.match(storeType)) {
deprecationLogger.deprecated("[default] store type is deprecated, use [fs] instead");
return FSDirectory.open(location, lockFactory); // use lucene defaults
} else if (IndexModule.Type.SIMPLEFS.match(storeType)) {
return new SimpleFSDirectory(location, lockFactory);
} else if (IndexModule.Type.NIOFS.match(storeType)) {
return new NIOFSDirectory(location, lockFactory);
} else if (IndexModule.Type.MMAPFS.match(storeType)) {
return new MMapDirectory(location, lockFactory);
}
throw new IllegalArgumentException("No directory found for type [" + storeType + "]");
}
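The dispatch in newFSDirectory can be exercised in isolation. The sketch below is illustrative only: `StoreTypeDispatchSketch` and `directoryFor` are hypothetical names, the method returns the name of the Lucene class instead of constructing it, and the assumption that a type's settings key is its lower-cased enum name, compared case-sensitively, is mine and should be checked against the Elasticsearch version in use.

```java
import java.util.Locale;

/** Hypothetical, dependency-free model of the store-type dispatch in newFSDirectory. */
class StoreTypeDispatchSketch {
    enum Type {
        NIOFS, MMAPFS, SIMPLEFS, FS, DEFAULT;

        // Assumption: the settings key is the lower-cased enum name.
        String getSettingsKey() {
            return name().toLowerCase(Locale.ROOT);
        }

        // Assumption: matching is an exact, case-sensitive comparison.
        boolean match(String setting) {
            return getSettingsKey().equals(setting);
        }
    }

    /** Returns the name of the Lucene class or factory method the real code would use. */
    static String directoryFor(String configured) {
        // An absent index.store.type falls back to "fs", as in the original code.
        String storeType = configured == null ? Type.FS.getSettingsKey() : configured;
        if (Type.FS.match(storeType) || Type.DEFAULT.match(storeType)) {
            return "FSDirectory.open"; // Lucene picks the implementation
        } else if (Type.SIMPLEFS.match(storeType)) {
            return "SimpleFSDirectory";
        } else if (Type.NIOFS.match(storeType)) {
            return "NIOFSDirectory";
        } else if (Type.MMAPFS.match(storeType)) {
            return "MMapDirectory";
        }
        throw new IllegalArgumentException("No directory found for type [" + storeType + "]");
    }

    public static void main(String[] args) {
        System.out.println(directoryFor(null));
        System.out.println(directoryFor("niofs"));
    }
}
```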
FsDirectoryService file locks
The lock factory is selected by the index.store.fs.fs_lock setting. Valid values are [native, simple]; the default is native.
| Lock factory | Creation | Validation | Unlock | Notes |
|---|---|---|---|---|
| NativeFSLockFactory | `Files.createFile(lockFile);` (the exception is ignored if the file already exists) `FileChannel channel = FileChannel.open(realPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE);` `FileLock lock = channel.tryLock();` `FileTime creationTime = Files.readAttributes(realPath, BasicFileAttributes.class).creationTime();` | `lock.isValid();` `channel.size() == 0;` `creationTime.equals(Files.readAttributes(path, BasicFileAttributes.class).creationTime());` | No explicit `lock.release()` call appears; the lock is released when `NativeFSLock.close()` closes the `FileLock` and `FileChannel` through try-with-resources | Built on the NIO file-lock API |
| SimpleFSLockFactory | `Files.createFile(lockFile);` (throws if the file already exists) `FileTime creationTime = Files.readAttributes(lockFile, BasicFileAttributes.class).creationTime();` | `creationTime.equals(Files.readAttributes(path, BasicFileAttributes.class).creationTime());` | `Files.delete(path);` | Validated by the file's creation-time attribute |
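The two locking strategies in the table can be tried with plain JDK calls. This is a minimal sketch rather than the Lucene implementation: `LockStrategiesSketch` and its method names are hypothetical, and the creation-time validation is left out to keep the example short.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical demo of the native and simple locking strategies. */
class LockStrategiesSketch {

    /** Native style: an existing lock file is fine; ownership comes from the OS-level FileLock. */
    static boolean nativeLockRoundTrip(Path lockFile) throws IOException {
        try {
            Files.createFile(lockFile);
        } catch (FileAlreadyExistsException ignored) {
            // A leftover lock file does not mean the lock is held.
        }
        // Closing the FileLock and the FileChannel releases the lock.
        try (FileChannel channel = FileChannel.open(lockFile,
                     StandardOpenOption.CREATE, StandardOpenOption.WRITE);
             FileLock lock = channel.tryLock()) {
            // tryLock() returns null when another process holds the lock.
            return lock != null && lock.isValid() && channel.size() == 0;
        }
    }

    /** Simple style: the lock is held by whoever managed to create the file. */
    static boolean tryAcquireSimple(Path lockFile) throws IOException {
        try {
            Files.createFile(lockFile);
            return true;
        } catch (FileAlreadyExistsException held) {
            return false; // the file exists, so the lock is considered taken
        }
    }

    /** Simple style: releasing the lock means deleting the file. */
    static void releaseSimple(Path lockFile) throws IOException {
        Files.delete(lockFile);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("lock-demo");
        System.out.println("native acquired: " + nativeLockRoundTrip(dir.resolve("native.lock")));
        Path simple = dir.resolve("simple.lock");
        System.out.println("simple acquired: " + tryAcquireSimple(simple));
        System.out.println("simple re-acquired while held: " + tryAcquireSimple(simple));
        releaseSimple(simple);
    }
}
```

The practical difference shows up after a crash: a stale file left by the simple strategy keeps blocking new acquisitions until it is deleted, while a stale file under the native strategy does not, because the operating system drops the lock when the process exits.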
FSDirectory types
| Directory class | type | openInput | read | createOutput and write | Notes |
|---|---|---|---|---|---|
| FSDirectory | Type.FS or Type.DEFAULT | FSDirectory.open selects an implementation: `if (Constants.JRE_IS_64BIT && MMapDirectory.UNMAP_SUPPORTED) { return new MMapDirectory(path, lockFactory); } else if (Constants.WINDOWS) { return new SimpleFSDirectory(path, lockFactory); } else { return new NIOFSDirectory(path, lockFactory); }` | Depends on the selected implementation | `OutputStream out = Files.newOutputStream(directory.resolve(name), options);` `out.write(b, offset, chunk);` | FSDirectory is the file-system directory class implemented by Lucene. Depending on whether the JRE is 64-bit, whether unmapping of memory-mapped files is supported, and whether the operating system is Windows, it uses one of SimpleFSDirectory, NIOFSDirectory or MMapDirectory |
| SimpleFSDirectory | Type.SIMPLEFS | `SeekableByteChannel channel = Files.newByteChannel(path, StandardOpenOption.READ);` | `ByteBuffer bb = ByteBuffer.wrap(b, offset, len);` `int i = channel.read(bb);` | Inherited from FSDirectory | Intended for Windows platforms |
| NIOFSDirectory | Type.NIOFS | `FileChannel fc = FileChannel.open(path, StandardOpenOption.READ);` | `ByteBuffer bb = ByteBuffer.wrap(b, offset, len);` `int i = channel.read(bb, pos);` | Inherited from FSDirectory | NIO-based, intended for Linux platforms |
| MMapDirectory | Type.MMAPFS | `FileChannel c = FileChannel.open(path, StandardOpenOption.READ);` | `MappedByteBuffer buffer = fc.map(MapMode.READ_ONLY, offset + bufferStart, bufSize);` `if (preload) { buffer.load(); }` | Inherited from FSDirectory | Memory-mapped, intended for Linux platforms that support mmap |
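
The three read paths in the table differ mainly in how the file position is handled. The sketch below reproduces them with plain JDK calls so they can be compared side by side. It is an illustration under stated assumptions: `ReadPathsSketch` and its method names are hypothetical, and Lucene's buffering, chunking and error handling are omitted.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import java.nio.channels.SeekableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical side-by-side demo of the three read styles. */
class ReadPathsSketch {

    /** SimpleFSDirectory style: seek, then read from the channel's shared position. */
    static byte[] readSimple(Path path, long pos, int len) throws IOException {
        try (SeekableByteChannel channel = Files.newByteChannel(path, StandardOpenOption.READ)) {
            channel.position(pos);
            ByteBuffer bb = ByteBuffer.allocate(len);
            while (bb.hasRemaining() && channel.read(bb) != -1) {
                // keep reading until the buffer is full or end of file
            }
            return drain(bb);
        }
    }

    /** NIOFSDirectory style: positional read that leaves the channel position untouched. */
    static byte[] readNio(Path path, long pos, int len) throws IOException {
        try (FileChannel fc = FileChannel.open(path, StandardOpenOption.READ)) {
            ByteBuffer bb = ByteBuffer.allocate(len);
            long p = pos;
            while (bb.hasRemaining()) {
                int n = fc.read(bb, p);
                if (n == -1) {
                    break; // end of file
                }
                p += n;
            }
            return drain(bb);
        }
    }

    /** MMapDirectory style: map the region; pos + len must not exceed the file size. */
    static byte[] readMmap(Path path, long pos, int len, boolean preload) throws IOException {
        try (FileChannel fc = FileChannel.open(path, StandardOpenOption.READ)) {
            MappedByteBuffer buffer = fc.map(MapMode.READ_ONLY, pos, len);
            if (preload) {
                buffer.load(); // ask the OS to bring the mapped pages into physical memory
            }
            byte[] out = new byte[len];
            buffer.get(out);
            return out;
        }
    }

    private static byte[] drain(ByteBuffer bb) {
        bb.flip();
        byte[] out = new byte[bb.remaining()];
        bb.get(out);
        return out;
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("segment", ".bin");
        Files.write(file, "hello lucene".getBytes(StandardCharsets.US_ASCII));
        System.out.println(new String(readSimple(file, 6, 6), StandardCharsets.US_ASCII));
        System.out.println(new String(readNio(file, 6, 6), StandardCharsets.US_ASCII));
        System.out.println(new String(readMmap(file, 6, 6, true), StandardCharsets.US_ASCII));
    }
}
```

Because the positional `read(bb, pos)` call does not move a shared file pointer, several threads can read through the same channel without coordinating on the position, which the seek-then-read style cannot do safely.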