elasticsearch-机制-文件系统

Directory类继承树

在IndicesService的createIndexService函数中,IndicesService创建IndexModule实例对象,IndexModule创建IndexStore实例对象,IndexStore创建FsDirectoryService实例对象

IndicesService::createIndexService
private synchronized IndexService createIndexService(final String reason, IndexMetaData indexMetaData, IndicesQueryCache indicesQueryCache, IndicesFieldDataCache indicesFieldDataCache, List<IndexEventListener> builtInListeners, IndexingOperationListener... indexingOperationListeners) throws IOException {
    final Index index = indexMetaData.getIndex();
    final Predicate<String> indexNameMatcher = (indexExpression) -> indexNameExpressionResolver.matchesIndex(index.getName(), indexExpression, clusterService.state());
    final IndexSettings idxSettings = new IndexSettings(indexMetaData, this.settings, indexNameMatcher, indexScopeSetting);
    ...
 
    final IndexModule indexModule = new IndexModule(idxSettings, indexStoreConfig, analysisRegistry);  // 创建IndexModule
    ...
 
    return indexModule.newIndexService(nodeEnv, xContentRegistry, this, circuitBreakerService, bigArrays, threadPool, scriptService,
            clusterService, client, indicesQueryCache, mapperRegistry, indicesFieldDataCache);  // 托管IndexModule创建IndexService
}
IndexModule::newIndexService
public IndexService newIndexService(NodeEnvironment environment, NamedXContentRegistry xContentRegistry,
        IndexService.ShardStoreDeleter shardStoreDeleter, CircuitBreakerService circuitBreakerService, BigArrays bigArrays,
        ThreadPool threadPool, ScriptService scriptService,
        ClusterService clusterService, Client client, IndicesQueryCache indicesQueryCache, MapperRegistry mapperRegistry,
        IndicesFieldDataCache indicesFieldDataCache) throws IOException {
 
    ...;
    final String storeType = indexSettings.getValue(INDEX_STORE_TYPE_SETTING);
    final IndexStore store;
    if (Strings.isEmpty(storeType) || isBuiltinType(storeType)) {  // IndexModule提供了5种类型的文件系统:NIOFS,MMAPFS,SIMPLEFS,FS,DEFAULT
        store = new IndexStore(indexSettings, indexStoreConfig);
    } else {  // 也可实现自定义的IndexStore,通过调用addIndexStore(String type, BiFunction<IndexSettings, IndexStoreConfig, IndexStore> provider)函数去注册自定义IndexStore
        BiFunction<IndexSettings, IndexStoreConfig, IndexStore> factory = storeTypes.get(storeType);
        if (factory == null) {
            throw new IllegalArgumentException("Unknown store type [" + storeType + "]");
        }
        store = factory.apply(indexSettings, indexStoreConfig);
        if (store == null) {
            throw new IllegalStateException("store must not be null");
        }
    }
    indexSettings.getScopedSettings().addSettingsUpdateConsumer(IndexStore.INDEX_STORE_THROTTLE_TYPE_SETTING, store::setType);  // index.store.throttle.type, 节流控制
    indexSettings.getScopedSettings().addSettingsUpdateConsumer(IndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING,
        store::setMaxRate);  // index.store.throttle.max_bytes_per_sec, 节流控制
    ...;
    return new IndexService(indexSettings, environment, xContentRegistry, new SimilarityService(indexSettings, similarities),
            shardStoreDeleter, analysisRegistry, engineFactory.get(), circuitBreakerService, bigArrays, threadPool, scriptService,
            clusterService, client, queryCache, store, eventListener, searcherWrapperFactory, mapperRegistry,
            indicesFieldDataCache, searchOperationListeners, indexOperationListeners);
}
IndexStore::newDirectoryService
public DirectoryService newDirectoryService(ShardPath path) {
    return new FsDirectoryService(indexSettings, this, path);
}

FsDirectoryService

FsDirectoryService创建目录时主要做两件事:获取文件锁,以及根据指定类型创建对应类型的目录对象

FsDirectoryService::newDirectory
    @Override
    public Directory newDirectory() throws IOException {
        final Path location = path.resolveIndex();  // 获取目录位置
        final LockFactory lockFactory = indexSettings.getValue(INDEX_LOCK_FACTOR_SETTING);  // 获取文件锁
        Files.createDirectories(location);
        Directory wrapped = newFSDirectory(location, lockFactory);  // 根据指定类型创建对应类型的目录对象
        Set<String> preLoadExtensions = new HashSet<>(
                indexSettings.getValue(IndexModule.INDEX_STORE_PRE_LOAD_SETTING));
        wrapped = setPreload(wrapped, location, lockFactory, preLoadExtensions);  // MMapFS文件系统在将文件映射到应用程序内存时进行预加载操作,将数据写进MappedByteBuffer
        if (indexSettings.isOnSharedFilesystem()) {
            wrapped = new SleepingLockWrapper(wrapped, 5000);
        }
        return new RateLimitedFSDirectory(wrapped, this, this) ;
    }
  
    protected Directory newFSDirectory(Path location, LockFactory lockFactory) throws IOException {
        final String storeType = indexSettings.getSettings().get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(),
            IndexModule.Type.FS.getSettingsKey());
        if (IndexModule.Type.FS.match(storeType)) {
            return FSDirectory.open(location, lockFactory); // use lucene defaults
        } else if (IndexModule.Type.DEFAULT.match(storeType)) {
            deprecationLogger.deprecated("[default] store type is deprecated, use [fs] instead");
            return FSDirectory.open(location, lockFactory); // use lucene defaults
        } else if (IndexModule.Type.SIMPLEFS.match(storeType)) {
            return new SimpleFSDirectory(location, lockFactory);
        } else if (IndexModule.Type.NIOFS.match(storeType)) {
            return new NIOFSDirectory(location, lockFactory);
        } else if (IndexModule.Type.MMAPFS.match(storeType)) {
            return new MMapDirectory(location, lockFactory);
        }
        throw new IllegalArgumentException("No directory found for type [" + storeType + "]");
    }
FSDirectoryService文件锁

由index.store.fs.fs_lock参数指定,可选[native, simple],默认native

创建方式 校验方式 解锁 备注
NativeFSLockFactory Files.createFile(lockFile); // 文件存在时忽略异常
FileChannel channel = FileChannel.open(realPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
FileLock lock = channel.tryLock();
FileTime creationTime = Files.readAttributes(realPath, BasicFileAttributes.class).creationTime();
lock.isValid();
channel.size() == 0;
creationTime.equals(Files.readAttributes(path, BasicFileAttributes.class).creationTime());
没有显示释放文件锁。可能是等垃圾回收时自动释放 依托NIO的文件锁API实现
SimpleFSLockFactory Files.createFile(lockFile); // 文件存在时抛异常
FileTime creationTime = Files.readAttributes(lockFile, BasicFileAttributes.class).creationTime();
creationTime.equals(Files.readAttributes(path, BasicFileAttributes.class).creationTime()); Files.delete(path); 文件创建时间属性校验
FSDirectory文件类型
type openInput read createOutput and write 备注
FSDirectory Type.FS 或 Type.DEFAULT if (Constants.JRE_IS_64BIT && MMapDirectory.UNMAP_SUPPORTED) {
return new MMapDirectory(path, lockFactory);
} else if (Constants.WINDOWS) {
return new SimpleFSDirectory(path, lockFactory);
} else {
return new NIOFSDirectory(path, lockFactory);
}
OutputStream out = Files.newOutputStream(directory.resolve(name), options);
out.write(b, offset, chunk);
FSDirectory是lucene库实现的文件系统类,根据操作系统是否为WINDOWS,操作系统是否为64位,操作系统的文件系统是否支持MMAP来决定使用SimpleFSDirectory、NIOFSDirectory、MMapDirectory中的一种方式
SimpleFSDirectory Type.SIMPLEFS SeekableByteChannel channel = Files.newByteChannel(path, StandardOpenOption.READ); ByteBuffer bb = ByteBuffer.wrap(b, offset, len);
int i = channel.read(bb);
面向WINDOWS平台的文件系统
NIOFSDirectory Type.NIOFS FileChannel fc = FileChannel.open(path, StandardOpenOption.READ); ByteBuffer bb = ByteBuffer.wrap(b, offset, len);
int i = channel.read(bb, pos);
面向LINUX平台的NIO库的文件系统
MMapDirectory Type.MMAPFS FileChannel c = FileChannel.open(path, StandardOpenOption.READ); MappedByteBuffer buffer = fc.map(MapMode.READ_ONLY, offset + bufferStart, bufSize);
if (preload) { buffer.load(); }
面向LINUX平台的支持MMap方式的文件系统
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。