一 简介
在上一篇Lucene学习笔记:Lucene检索全流程学习笔记 中,笔者描述了为什么学习Lucene源码,以及学习笔记的写作方式。本文写作方式类似,描述一篇文档经过处理,最终倒排数据写入磁盘的全过程。聚焦于全流程调用链是如何被贯穿起来的,1)从官方demo中的 indexDocs(writer, docDir); 语句出发,顺着调用链,一步一步走到生成内存中的倒排中间数据全流程;2)从官方demo中的 writer.close(); 出发,顺着调用链将倒排数据落盘的全流程贯穿起来。因此基于这个目的,本文在描述的过程中会略去相当多的内容:
Lucene生成的索引文件多种多样,有倒排方面的.tim .tip .doc .pos .pay文件,也有其他的如点数据、docValues、termVector等,由于本文的主要目的是倒排数据落盘全流程,因此主要围绕倒排文件展开,其他索引文件会简单提一句
在落盘的过程中,涉及到索引的合并、索引的删除等,它们笔记复杂,也足够重要,重要到完全应该单独成文。另外一方面,为了突出本文的重点:文档->倒排(内存)、倒排(内存) -> 落盘,也会舍弃这部分的介绍,或者在调用链中遇到的时候简单提一句
在创建索引的调用链中,有很多逻辑存在多个分支,本文会选择最简单的一个分支来介绍,以期降低学习难度和成本。例如在创建索引的时候,会面临2种情况,1) 当前目录已经存在索引文件 2) 当前目录没有索引文件。在这种情况下,本文会选择2)这种场景来分析介绍,舍弃1)的部分。
触发索引文件落盘有多种场景,例如自动落盘:索引阶段系统检测到索引文件的内存总消耗量超过了某一个阈值,便会自动触发落盘;主动落盘:用户调用了close接口,触发落盘逻辑执行,等等。本文基于官方demo来介绍,focus在close触发的落盘这一个路径,其他路径略去不介绍。
二 从官方demo开始
为了描述上、理解上的简洁,笔者将官方demo裁剪重整后,贴出来,如下:
位置:org/apache/lucene/demo/IndexFiles.java
public static void main(String[] args) {
Directory dir = FSDirectory.open(Paths.get(indexPath));
Analyzer analyzer = new StandardAnalyzer();
IndexWriterConfig iwc = new IndexWriterConfig(analyzer);
iwc.setOpenMode(OpenMode.CREATE);
IndexWriter writer = new IndexWriter(dir, iwc);
//filePath是dir目录中需要索引的一个原始文档,例如./lucene.txt
Path filePath = xxx;
try (InputStream stream = Files.newInputStream(filePath)) {
// make a new, empty document
Document doc = new Document();
Field pathField = new StringField("path", file.toString(), Field.Store.YES);
doc.add(pathField);
doc.add(new LongPoint("modified", lastModified));
doc.add(new TextField("contents", new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8))));
writer.addDocument(doc);
}
writer.close();
}
首先是打开需要索引的文件:
dir = FSDirectory.open(Paths.get(indexPath));dir目录下的文件都是需要索引的实例化分词器
Analyzer analyzer = new StandardAnalyzer();标准分词器是一个简单的以空格来分词的工具-
索引配置项
iwc = new IndexWriterConfig(analyzer),配置了很多重要的属性,简单罗列几个maxBufferedDocs 内存中索引的文件超过这个数触发落盘
delPolicy ,例如KeepOnlyLastCommitDeletionPolicy 删除策略,每次只保留上一个最新的commit
mergeScheduler 索引merge调度器
codec 描述了各种索引文件的版本,此处是lucene87, 据此能找到各种索引文件的生成工具类,参见 Lucene检索全流程学习笔记
mergePolicy 索引合并策略
实例化indexWriter
IndexWriter writer = new IndexWriter(dir, iwc);索引落盘的主要工具类生成中间索引数据,存在内存中
writer.addDocument(doc);落盘
writer.close();触发内存中的索引中间数据落盘
小结 5、6 是索引落盘的2个核心阶段,也是本文重点介绍对象
三 索引:从文档到内存
构建索引的第一个阶段是将磁盘上待索引的文件经过处理后,生成倒排索引的中间数据,并存储在内存中,供后续落盘阶段进一步处理。
IndexWriterConfig iwc = new IndexWriterConfig(analyzer);
IndexWriter writer = new IndexWriter(dir, iwc)
writer.addDocument(doc);
1 初始化:DefaultIndexingChain
在new IndexWriterConfig(analyzer) 的构造函数(基类)里,有一行关键代码如下:
indexingChain = DocumentsWriterPerThread.defaultIndexingChain;
static final IndexingChain defaultIndexingChain = new IndexingChain() {
@Override
DocConsumer getChain(int indexCreatedVersionMajor, SegmentInfo segmentInfo, Directory directory,
FieldInfos.Builder fieldInfos, LiveIndexWriterConfig indexWriterConfig,
Consumer<Throwable> abortingExceptionConsumer) {
return new DefaultIndexingChain(indexCreatedVersionMajor, segmentInfo, directory, fieldInfos, indexWriterConfig, abortingExceptionConsumer);
}
};
getChain 函数返回的 <mark>new DefaultIndexingChain</mark> 非常重要,生成倒排中间数据便是由它负责。后文再详细介绍之
2 初始化:构建IndexWriter
new IndexWriter(dir, iwc) 由上文可知iwc的成员IndexingChain是专门负责将文档转化为倒排中间数据,而它本身作为IndexWriter 构造函数的参数,因此不难看出IndexWriter是一个总控类,负责做一些生成倒排前的准备工作,以及在特定的时间点调用对应的索引生成工具产出对应的索引数据。IndexWriter构造函数里面由2条分支 Create模式:新建索引
Append模式:目录下已经由索引文件,通过Append的方式创建索引。由于1)比较简单,本文以此为例来介绍
conf.setIndexWriter(this);以IndexWriter对象自身为参数,set到conf中(前文提到的iwc变量) 目的是防止IndexWriter被复用软删除相关:
softDeletesEnabled = config.getSoftDeletesField() != null;软删除是对某些被删除的文档做一个标记,并不真正从物理层面删掉数据,而是检索阶段过滤掉被软删除的那些文档获取写锁、索引合并调度器的获取+初始化
接下来以Create模式的分支来继续
indexExists = DirectoryReader.indexExists(directory);判断当前目录下有没有索引文件(后续回滚使用)-
如果当前目录有索引文件,则读取,并创建回滚点,以防止意外情况下需要回滚当前操作
final SegmentInfos sis = new SegmentInfos(config.getIndexCreatedVersionMajor()); if (indexExists) { final SegmentInfos previous = SegmentInfos.readLatestCommit(directory); sis.updateGenerationVersionAndCounter(previous); } segmentInfos = sis; rollbackSegments = segmentInfos.createBackupSegmentInfos(); }其中readLatestCommit 底层真正调度的是
public static final SegmentInfos readCommit(Directory directory, ChecksumIndexInput input, long generation)这个函数,它的逻辑在Lucene检索全流程学习笔记 中已经详细介绍过了,主要是从读取当前目录下的索引到内存对象中。-
读取当前索引中的域相关信息:
globalFieldNumberMap = getFieldNumberMap();注:Lucene中域的编号是连续递增的,如果一个目录中已经有索引的情况下(假设域的最大编号是:N)再创建新的索引,新的索引中的域的编号要从N+1 开始,这也是这一块读取域相关信息的原因
-
接下来是很多工具类对象的创建和初始化,本文只介绍docWriter
3 初始化:DocumentsWriter
DocumentsWriter 是写倒排数据的总入口,下面看看它的初始化做了哪些事情
- 实例化DocumentsWriter 对象
docWriter = new DocumentsWriter(flushNotifications, segmentInfos.getIndexCreatedVersionMajor(), pendingNumDocs,
enableTestPoints, this::newSegmentName,
config, directoryOrig, directory, globalFieldNumberMap);
-
进入到DocumentsWriter 的构造函数
this.perThreadPool = new DocumentsWriterPerThreadPool(() -> { final FieldInfos.Builder infos = new FieldInfos.Builder(globalFieldNumberMap); return new DocumentsWriterPerThread(indexCreatedVersionMajor, segmentNameSupplier.get(), directoryOrig, directory, config, deleteQueue, infos, pendingNumDocs, enableTestPoints); }); flushControl = new DocumentsWriterFlushControl(this, config);perThreadPool是一个关键点,索引数据落盘时。上面的代码里this 指针被设置到flushControl里面,后面索引落盘的时候flushControl会从this(DocumentsWriter)里面找到perThreadPool并会从这个pool里面取出一个dwpt来执行相关任务。
-
接2,在DocumentsWriterPerThread 构造函数里面,有一个关键调用:
segmentInfo = new SegmentInfo(directoryOrig, Version.LATEST, Version.LATEST, segmentName, -1, false, codec, Collections.emptyMap(), StringHelper.randomId(), Collections.emptyMap(), indexWriterConfig.getIndexSort()); consumer = indexWriterConfig.getIndexingChain().getChain(indexVersionCreated, segmentInfo, this.directory, fieldInfos, indexWriterConfig, this::onAbortingException);segmentInfo 描述了一次索引的提交,是索引数据落盘前的中间数据结果。consumer 是最关键的一个对象,它负责 1)待索引的文档经过处理,形成内存中的中间状态的待落盘的格式 2)将1)中的数据落盘 2个最核心的功能。
接3,consumer 实际上被赋值的是前文提及的DefaultIndexingChain ,它里面有一个关键对象:
termsHash = new FreqProxTermsWriter(intBlockAllocator, byteBlockAllocator, bytesUsed, termVectorsWriter);termHash 负责3中提及的功能1)。-
接4,termsHash的构造函数里有2个关键成员变量
intPool = new IntBlockPool(intBlockAllocator); bytePool = new ByteBlockPool(byteBlockAllocator);bytePool的作用是存储term对应的文档id、词频、term在文档中的位置信息、偏移量信息、payload信息等。Lucene里面通过精巧的设计,使得可以流式地存储这些信息。intPool可以理解为存储的是特定term下指向bytePool中对应信息的位置。
4 生成倒排中间数据
再回到第二 章部分的demo,从writer.addDocument(doc); 调用开始,就进入到了将文档数据进行处理,形成中间状态的索引数据。
-
经过基层嵌套调用后,走到如下这个函数
private long updateDocuments(final DocumentsWriterDeleteQueue.Node<?> delNode, Iterable<? extends Iterable<? extends IndexableField>> docs) throws IOException { 1中的这个函数里面有一行关键调用:
docWriter.updateDocuments(docs, delNode)在4.1小节介绍
4.1 docWriter.updateDocuments(xx)
为了描述上的简介,聚焦于本文写作的目标,下面的函数经过笔者删减
long updateDocuments(final Iterable<? extends Iterable<? extends IndexableField>> docs,
final DocumentsWriterDeleteQueue.Node<?> delNode) throws IOException {
final DocumentsWriterPerThread dwpt = flushControl.obtainAndLock();
final DocumentsWriterPerThread flushingDWPT;
long seqNo;
final int dwptNumDocs = dwpt.getNumDocsInRAM();
seqNo = dwpt.updateDocuments(docs, delNode, flushNotifications);
final boolean isUpdate = delNode != null && delNode.isDelete();
flushingDWPT = flushControl.doAfterDocument(dwpt, isUpdate);
if (postUpdate(flushingDWPT, hasEvents)) {
seqNo = -seqNo;
}
return seqNo;
}
代码中的dwpt便是第3小节第2部分提及的用于生成中间索引数据的工具对象
flushingDWPT 是在文档操作完毕后,会检查是不是需要将当前的索引数据罗盘,前文提及lucene中落盘的触发点比较多,本文只会聚焦于demo中的writer.close()函数触发的落盘逻辑
-
dwpt.updateDocuments 是关键调用,负责前文提及的 生成索引中间数据
long updateDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs, DocumentsWriterDeleteQueue.Node<?> deleteNode, DocumentsWriter.FlushNotifications flushNotifications) throws IOException { try { testPoint("DocumentsWriterPerThread addDocuments start"); assert abortingException == null: "DWPT has hit aborting exception but is still indexing"; if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) { infoStream.message("DWPT", Thread.currentThread().getName() + " update delTerm=" + deleteNode + " docID=" + numDocsInRAM + " seg=" + segmentInfo.name); } final int docsInRamBefore = numDocsInRAM; boolean allDocsIndexed = false; try { for (Iterable<? extends IndexableField> doc : docs) { // Even on exception, the document is still added (but marked // deleted), so we don't need to un-reserve at that point. // Aborting exceptions will actually "lose" more than one // document, so the counter will be "wrong" in that case, but // it's very hard to fix (we can't easily distinguish aborting // vs non-aborting exceptions): reserveOneDoc(); consumer.processDocument(numDocsInRAM++, doc); } allDocsIndexed = true; return finishDocuments(deleteNode, docsInRamBefore); } finally { if (!allDocsIndexed && !aborted) { // the iterator threw an exception that is not aborting // go and mark all docs from this block as deleted deleteLastDocs(numDocsInRAM - docsInRamBefore); } } } finally { maybeAbort("updateDocuments", flushNotifications); } }这个函数里面,会遍历每一个代索引的文档,并调用
consumer.processDocument(numDocsInRAM++, doc);进行具体的操作。从第3 小节的第3部分可知,consumer的实际类型是DefaultIndexingChain 下面进入4.1 小节介绍4.1 DefaultIndexingChain::processDocument
for (IndexableField field : document) { fieldCount = processField(docID, field, fieldGen, fieldCount); }processDocument 函数里面的核心调用代码如上所示,遍历文档中的每一个域,然后调用processField(xx)函数进行详细的处理。下面按照业务逻辑的流转顺序介绍之
-
获取域名、域的类型. 在Lucene里面的一个文档对象包含多个域,每一个域对应具体的域值,域值的类型多种多样,例如字符串类型。在索引阶段也是按照域的维度来处理:全部文档相同的域对应的域值组织到一块来处理,fieldName、fieldType就是用来获取处理这个域名下的索引数据的工具类
String fieldName = field.name(); IndexableFieldType fieldType = field.fieldType(); -
获取1中提及的特定域的索引生成工具类对象,也就是下面代码片段中的 fp
PerField fp = null; fp = getOrAddField(fieldName, fieldType, true);getOrAddField 做的事情是,先看看是不是已经有这个域对应的索引生成工具了,有则直接返回,否则生成一个新的,下面在 4.2 小节会详细介绍之
fp.invert(docID, field, first);生成倒排索引数据的真正入口,逻辑非常复杂,下面在第5 小节专门介绍-
如果这个域是第一次出现,则将fp工具类对象存储起来,下次复用:也就是下次处理其他文档的时候,遇到相同域名,会再复用之
if (first) { fields[fieldCount++] = fp; fp.fieldGen = fieldGen; } -
存储域值。Lucene中生成文档对象时,对应的域可以添加是否存储的属性,如果设置存储的话,便会生成:fdt、fdx、等索引文件,它们记录了域值以及和docid相关的信息。可以看到和生成倒排数据类型,此处也会使用到fp小工具。同样写存储域的逻辑比较复杂storedFieldsConsumer.writeField 会在第6小节来介绍
if (fieldType.stored()) { fp = getOrAddField(fieldName, fieldType, false); String value = field.stringValue(); storedFieldsConsumer.writeField(fp.fieldInfo, field); } 下面还有点数据、docValue等索引数据的处理,本文不介绍这一块
4.2 getOrAddField
位置:org/apache/lucene/index/DefaultIndexingChain.java 进入到该函数的逻辑:
-
先从现有的链表里面看看能不能找到现成的. 下面代码片段中的fieldHash的每一个入口是一个链表,至于为啥有多个链表入口,在hashCode函数里面有介绍,为了减少冲突竞争,至于设计的原理不是本文的重点,按下不表。
final int hashPos = name.hashCode() & hashMask; PerField fp = fieldHash[hashPos]; while (fp != null && !fp.fieldInfo.name.equals(name)) { fp = fp.next; } -
如果1中没有找到,则开始生成新的fp的逻辑
FieldInfo fi = fieldInfos.getOrAdd(name); initIndexOptions(fi, fieldType.indexOptions());在上一篇关于Lucene检索流程的学习笔记里,介绍过FieldInfo,它记录了某个域的属性,例如这个域是否存储payload、termVector、norms等索引数据。同样也是看看有现成的复用,没有新生成返回。
initIndexOptions会将一些属性信息,设置到刚才的FieldInfo里,如果不一致则抛出异常:换句话说,不同的文档里面有相同的域名,其属性缺不同,是不行的。 -
以2中的数据为参数,构造fp工具类对象
fp = new PerField(indexCreatedVersionMajor, fi, invert, indexWriterConfig.getSimilarity(), indexWriterConfig.getInfoStream(), indexWriterConfig.getAnalyzer());PerField的构造函数里面会设置一些成员变量,如:FieldInfo类型的fi、similarity(例如BM25), 如果生成这个PerField传了invert = true,说明目的是生成倒排数据,会继续执行
setInvertState操作 -
接3,setInvertState主要做了如下事情
termsHashPerField = termsHash.addField(invertState, fieldInfo);从第3小节的第3步,可以知道termsHash的实际类型是FreqProxTermsWriter ,因此addField实际调用的是下面这个函数
public TermsHashPerField addField(FieldInvertState invertState, FieldInfo fieldInfo) { return new FreqProxTermsWriterPerField(invertState, this, fieldInfo, nextTermsHash.addField(invertState, fieldInfo)); }可见termsHashPerField实际上是FreqProxTermsWriterPerField类型的对象。
-
FreqProxTermsWriterPerField 里面持有一些关键数据结构,用于存储索引中间格式的数据。它的构造函数里面会调用基类的构造函数,代码如下:
TermsHashPerField(int streamCount, IntBlockPool intPool, ByteBlockPool bytePool, ByteBlockPool termBytePool, Counter bytesUsed, TermsHashPerField nextPerField, String fieldName, IndexOptions indexOptions) { this.intPool = intPool; this.bytePool = bytePool; this.streamCount = streamCount; this.fieldName = fieldName; this.nextPerField = nextPerField; assert indexOptions != IndexOptions.NONE; this.indexOptions = indexOptions; PostingsBytesStartArray byteStarts = new PostingsBytesStartArray(this, bytesUsed); bytesHash = new BytesRefHash(termBytePool, HASH_INIT_SIZE, byteStarts); }1)intPool和bytePool前文介绍过,直接存储文档号、term频率、term位置等信息
2)此处streamCount是1或者2;a) 如果只存储 文档号和词频则是1,b) 如果还额外存储位置、payload等信息则是2. 在bytePool里面分别有两条逻辑上的分片,每一个分片对应存储此处提及的a) b)
3)nextPerField 指的是termsVector,这里采用了职责链的设计模式
bytesHash管理了terms和其ID的映射,输入分词后的term, 返回一个编号ID
回到4中提及的
setInvertState函数,还有一些处理norms数据的逻辑
5 生成倒排索引
回顾前文提到的
fp.invert(docID, field, first)函数,它是索引中间数据生成的入口。函数的第一个参数是待处理文档的ID、第二个参数是这个文档中待处理的域、第三个参数表示是这个域是否第一次出现。5.1 主流程
进入fp.invert函数,看看总体上做了哪些事,然后在后续的小节再深入其中的关键点。
-
读出这个域对应的域值,然后分词为Term流,后续依次处理每一个term
TokenStream stream = tokenStream = field.tokenStream(analyzer, tokenStream)field对象内部有成员变量存储域名、域值、编号等信息,此处的tokenStream会将域值分词后形成一个数据流stream,后续可以遍历这个stream取出每一个term得到它的诸如term的位置、offset等信息.
invertState.setAttributeSource(stream);将stream的一些属性设置到invertState中,后续每一栋stream的指针到下一个term的时候,可以通过invertState获得term的值、在文档中的位置、payload数据等。invertState是对stream 的简单包装termsHashPerField.start(field, first);在第三章,第4.2小节我们知道 termsHashPerField 的实际类型是<mark>FreqProxTermsWriterPerField</mark>,接下来索引数据的处理便是由该函数接手。接3,start函数里面会把invertState 的涉及到term的成员变量赋值过来。那么FreqProxTermsWriterPerField 是如何跟invertState联系起来的呢,可以参见本章4.2 节第4步骤。
-
Term位置的处理: 获取当前term所在的位置(笔者注:例如同一个term多次出现,此次是该term的第几次出现 待确认) ,然后累加到全局变量invertState.position上,后面会据此做一些异常边界的判断,例如超过阈值抛出异常等等
int posIncr = invertState.posIncrAttribute.getPositionIncrement(); invertState.position += posIncr; -
获取term在文档中的起始位置
int startOffset = invertState.offset + invertState.offsetAttribute.startOffset(); int endOffset = invertState.offset + invertState.offsetAttribute.endOffset(); 接下来索引数据的生成会交给
termsHashPerField.add处理,在本章5.3小节详细介绍接下来是一些收尾性质的逻辑
5.2 流式存储
在介绍生成索引中间数据及它的存储格式前,先抛出几个问题,有利于理解Lucene中流式索引数据的存储格式。
5.2.1 层级关系
-

如上图所示,Lucene中处理文档的顺序是:域-> term -> 文档。所有文档的同一个域名组织在一起;相同域名下的同一个term组织在一起,term下面挂的是包含这个term的文档列表。当然图中省略了位置、offset、payload等信息。
5.2.2 一个例子
假设有一个文档, 是我们处理的第一个文档,因而文档号是docid=0, 它有一个名为content 的域,域值是 good good study , 经过分词后有3个term:good、good、study (忽略payload)

一个存储格式如上图所示:第一个4表示接下来有一个长度为4的term :good (真正存储的应该是二进制,如ASIIC码等,此处直接用可视化字符来描述)。接下来的红色的数字0表示这个term 出现在编号为0的文档中,0后面的红色数字2表示good 这个term在文档0里面出现的频率是2. 接下来的0表示这个term出现的位置是0,接下来的0,3表示它在域值里面的起始位置是从0开始到第3个字符,接下来的158分别表示第二个同名的term good,它出现的位置是2,从第5个字符到第8个字符。再接下来存储的是term strudy相关的信息。
5.2.3 缺陷
下面看一下5.2.2 中的缺陷:
假设现在开始处理文档1,它的content域里面也有good这个term,那么后面应该把文档号和词频写入上面的数据中,跟在good后面,但是从图片可以看出good后面已经没有空间来存相应的信息了。
针对1,一个解决办法是在good后面留足够的buffer,但问题是每一个term对应的文档号的数量不同,无法预知
针对1,专门申请一块单独的buffer用于写入文档号和词频,这样随着索引的数据越来越多,term的数量也越来越多,需要申请非常多的单独的buffer,且它们的大小无法预知,这样内存使用不紧凑。在落盘期间也难以还原域、term、文档、位置、offset等数据之间的关系
文档号和词频:在一个文档处理结束后,才知道词频,而term的位置信息在文档尚未处理结束之前,就随着处理不断地产生数据写入内存。
5.2.4 解决办法
- 紧凑式内存使用

如图所示,宏观上lucene分配若干等长的buffer,每一个固定大小:例如8M,然后每次使用的时候按顺序从这些buffer里面去分配
- 无法预知长度的解决办法

如图所示,Lucene里面有一个变长slice的概念,第一次使用的时候分配 Size1个字节,等到不够用的时候,再次重新分配,会找到下一个level应该分配的字节数Size2个字节。图中1.1 1.2分别对应了size1 、size2个字节的slice
-
衔接问题
1)2中的1.1 和1.2 如何衔接到一块呢?Lucene中每一个slice的末尾处会存储一些元信息,例如当我们在1.1里面移动指针不断地写入数据,某个时刻会遇到buffer的末尾的若干字节里面存储有数字,这个数字告诉我们下一次分配slice应该分配多长的size,然后分配后,这几个特殊的字节,会存入一个表示新分配的slice的偏移量的值。
2)这样顺着这些元信息,就可以将物理上不连续的slice在逻辑上连到一块,供我们遍历之。(当然图中的这个例子,两块slice是紧挨着的,后面会介绍不是紧挨着的情况)
3)因为存储下一个slice的偏移量的值可能比较大,我们可能需要多个字节,这样有可能把老的数据覆盖掉,因此老的若干字节,会被挪到新的slice里面。
- 写入频率不一致的解决办法

将文档词频 和 位置、offset、payload 从逻辑上分成2条线,各自在自己的slice里面处理,例如图中绿色的slice存储文档词频、橙色的slice存储 位置、offset、payload信息等。这个也显示了 3中提到的一个逻辑上连续的slice在物理上并不连续的情况,它们是靠着相应的元信息来保持一个逻辑线条的完整连贯性。
小结 由于上面的设计,使得Lucene在写倒排数据时不仅内存使用非常紧凑,还可以再次基础上,包装更高级的迭代器,使得我们遍历索引中间数据就像遍历vector那么容易。后文结合代码再详细分析,会有一个更全面的了解
5.2.5 相关数据结构
下面先来介绍下承载索引中间数据的数据结构,然后在 5.2.6 小节开始深入对应的业务逻辑中。
byteHash:
int termID = bytesHash.add(termBytes);可以根据term的值,得到一个对应该term的编号。ByteBlockPool里面有一个byte [][]类型的buffer。这个里面就是存储term、文档+词频、term位置、term offset、term payload信息的地方postingsArray.addressOffset 根据termID, 可以定位到对应的IntBlockPool
IntBlockPool里面有一个int[][]类型的buffer。里面有指针指向ByteBlockPool中特定term对应的slice的位置(一开始指向起点,随着数据的写入,指针也在移动,相当于指向对应逻辑slice的尾部)note 这里的指针实际是一个偏移量,为了描述的形象,直接使用之。postingsArray.byteStarts[termID]指向这个term对应在ByteBlocPool中的slice的起点,且这个值不会变化。
用一个图来描述上述各个变量之间的关系

5.3 termsHashPerField.add
经过5.2 小节的介绍,我们对Lucene中存储索引中间数据的格式有了初步了解。让我们再一次回到主流程的termsHashPerField.add 函数,来看一看索引生成的真正逻辑是啥样的。
int termID = bytesHash.add(termBytes);
//System.out.println("add term=" + termBytesRef.utf8ToString() + " doc=" + docState.docID + " termID=" + termID);
if (termID >= 0) { // New posting
// Init stream slices
initStreamSlices(termID, docID);
} else {
termID = positionStreamSlice(termID, docID);
}
如上面代码片段所示,termID如果小于0,说明第一次遇见这个term,否则之前已经出现过了。为了描述上的方便,我们选取if (termID >= 0) {} 这个分支来介绍。
initStreamSlices 会给该term在intBlockPool和byteBlockPool里面分配slice,然后写入
-
接1,进入initStreamSlices 函数内部
if (streamCount + intPool.intUpto > IntBlockPool.INT_BLOCK_SIZE) { // not enough space remaining in this buffer -- jump to next buffer and lose this remaining // piece intPool.nextBuffer(); }由前文可知,在Lucene里面,文档+词频、term信息(offset pos payload)是分成2个逻辑上的slice处理的,代码里面的streamCount描述的就是这个信息,因此它的数值是2. intPool.intUpto描述的是在当前那个buffer内部,截止intUpto偏移量的地方,已经被使用了。因此这个if 条件判断的是intPool里面当前这个buffer是否还有足够的空间存储新的term的指针信息,如果没有则移动到下一个buffer,然后再里面分配
-
接2,看下nextBuffer的逻辑
public void nextBuffer() { if (1+bufferUpto == buffers.length) { int[][] newBuffers = new int[(int) (buffers.length*1.5)][]; System.arraycopy(buffers, 0, newBuffers, 0, buffers.length); buffers = newBuffers; } buffer = buffers[1+bufferUpto] = allocator.getIntBlock(); bufferUpto++; intUpto = 0; intOffset += INT_BLOCK_SIZE; }上面代码的逻辑是,首先判断有没有空闲的buffer了,如果没有扩容1.5倍,然后把老的buffer的指针赋值过来,在1+bufferUpto下标处,分配一块block,然后调整内部的各种成员变量。note buffers是一个int[][]类型的二维数组,扩容的时候int[x][]对应的真正内存buffer并没有改变,后续只是单纯指针拷贝,成本不大。
回到主流程,继续判断byteBlockPool是否需要扩容,类似3,不再赘述
-
建立TermID到IntBlockPool的映射关系
termStreamAddressBuffer = intPool.buffer; streamAddressOffset = intPool.intUpto; intPool.intUpto += streamCount; // advance the pool to reserve the N streams for this term postingsArray.addressOffset[termID] = streamAddressOffset + intPool.intOffset;intPool.intOffset 变量是此前所有buffer的大小总和,例如当前是第6个buffer,intOffset = 5 * len(buffer); streamAddressOffset 是在第6个buffer中已经使用的偏移量。因此addressOffset[termID] 存储的便是新term对应在intBlockPool中的起始位置(逻辑上的连续偏移量)。
-
intBlockPool到byteBlockPool的映射关系
for (int i = 0; i < streamCount; i++) { // initialize each stream with a slice we start with ByteBlockPool.FIRST_LEVEL_SIZE) // and grow as we need more space. see ByteBlockPool.LEVEL_SIZE_ARRAY final int upto = bytePool.newSlice(ByteBlockPool.FIRST_LEVEL_SIZE); termStreamAddressBuffer[streamAddressOffset + i] = upto + bytePool.byteOffset; }循环内部分配对应的slice,由于此处是新的term第一次出现,分配的是FIRST_LEVEL_SIZE 大小的slice。upto返回的是新分配的slice在当前buffer的首地址偏移量。因此termStreamAddressBuffer[streamAddressOffset + i]记录的是每一个stream对应分配的slice的首地址偏移量。当然在索引写入的时候偏移量会随着更新,它是一个变化的值
-
记录第一个stream的第一个slice的首地址偏移量,这是一个不变的值,不会随着索引的写入而更新
postingsArray.byteStarts[termID] = termStreamAddressBuffer[streamAddressOffset]; 接下来索引生成的逻辑交给
newTerm(termID, docID);函数,在 5.4 小节介绍
5.3 newTerm 函数
由于派生类实现了newTerm函数,因此实际调用的是FreqProxTermsWriterPerField.java 中的newTerm函数,截取其中写term offset数据的逻辑如下:
if (hasOffsets) {
writeOffsets(termID, fieldState.offset);
}
-
深入writeOffsets
void writeOffsets(int termID, int offsetAccum) { final int startOffset = offsetAccum + offsetAttribute.startOffset(); final int endOffset = offsetAccum + offsetAttribute.endOffset(); assert startOffset - freqProxPostingsArray.lastOffsets[termID] >= 0; writeVInt(1, startOffset - freqProxPostingsArray.lastOffsets[termID]); writeVInt(1, endOffset - startOffset); freqProxPostingsArray.lastOffsets[termID] = startOffset; }第一个writeVInt函数将term的起始偏移写入byteBlockPool中,当然写的偏移量实际上是和上一个term起始位置的差值。差值存储可以节省内存空间,Lucene中有各种各样优化内存的压缩手段,值得单独学习,此处忽略之。
<mark>note</mark> writeVInt的第一个参数是1,表示的是第二个(从0开始计数)stream,在lucene里面第一个stream描述的是文档id和词频数据;第二个stream描述的是pos. offset payload数据。而此处即将写入的是offset,因此参数传递的是1
深入writeVInt函数,会将数据分成一个个byte然后调用
writeByte写入-
深入writeByte函数
final void writeByte(int stream, byte b) { int streamAddress = streamAddressOffset + stream; int upto = termStreamAddressBuffer[streamAddress]; byte[] bytes = bytePool.buffers[upto >> ByteBlockPool.BYTE_BLOCK_SHIFT]; assert bytes != null; int offset = upto & ByteBlockPool.BYTE_BLOCK_MASK; if (bytes[offset] != 0) { // End of slice; allocate a new one offset = bytePool.allocSlice(bytes, offset); bytes = bytePool.buffer; termStreamAddressBuffer[streamAddress] = offset + bytePool.byteOffset; } bytes[offset] = b; (termStreamAddressBuffer[streamAddress])++; }代码的一开始根据stream的值,去定位这个stream对应的slice当前已经写到的那个偏移量,5.3节中已经介绍过这些变量的作用和它们之间的关系。
if (bytes[offset] != 0)说明到达了这个slice的末尾,slice的末尾存储的是下一次分配slice的大小,它是逐次增大的一个值。前文提到过如何在各个物理上不连续的slice之间进行跳转,它们所依赖的关键信息可以从
bytePool.allocSlice中体现-
深入allocSlice
public int allocSlice(final byte[] slice, final int upto) { final int level = slice[upto] & 15; final int newLevel = NEXT_LEVEL_ARRAY[level]; final int newSize = LEVEL_SIZE_ARRAY[newLevel]; // Maybe allocate another block if (byteUpto > BYTE_BLOCK_SIZE-newSize) { nextBuffer(); } final int newUpto = byteUpto; final int offset = newUpto + byteOffset; byteUpto += newSize; // Copy forward the past 3 bytes (which we are about // to overwrite with the forwarding address): buffer[newUpto] = slice[upto-3]; buffer[newUpto+1] = slice[upto-2]; buffer[newUpto+2] = slice[upto-1]; // Write forwarding address at end of last slice: slice[upto-3] = (byte) (offset >>> 24); slice[upto-2] = (byte) (offset >>> 16); slice[upto-1] = (byte) (offset >>> 8); slice[upto] = (byte) offset; // Write new level: buffer[byteUpto-1] = (byte) (16|newLevel); return newUpto+3; }1)在前文提及,新分配slice后,会在上一个slice末尾写入新的slice的首地址偏移量,会占用多个字节,因此老的数据会覆盖,它们会被挪到新的slice前面几个字节里,上面代码中的
buffer[newUpto] = slice[upto-3];那3行做的就是这件事。2)
slice[upto-3] = (byte) (offset >>> 24);开始的4行代码,便是把新的slice偏移量写入老的slice的最后4个字节。3)
buffer[byteUpto-1] = (byte) (16|newLevel);这行代码是在新的slice的末尾写入下一次分配slice的level,这个level对应了一个更大的size。4)
return newUpto+3;因为新的slice前3个字节已经被占用了,返回的可以使用的偏移量应该 + 3 回到writeByte函数,此时可以将目标字节写入了
bytes[offset] = b;这个bytes就是从byteBlockPool里面取出的-
(termStreamAddressBuffer[streamAddress])++;体现出前文提及的随着索引数据的写入,指针偏移量也在跟着变化
小结 本文介绍了term的offset数据的写入,pos payload的数据的写入本质上笔记类似,不再赘述。文档号和词频在一篇文档处理完后才能写入,在5.4 小节简要介绍
5.4 写入文档号和词频
回到5.3小节一开始,那段代码等else 分支 termID = positionStreamSlice(termID, docID); 这个分支里面有触发文档号+词频写入内存的逻辑
最终会调用void addTerm(final int termID, final int docID)这个函数,同样调用的是派生类中的函数
这个函数里面有一个检测:
if (docID != postings.lastDocIDs[termID])如果当次处理的文档号和上一次处理的不一样,说明上一个文档处理完毕了,可以将文档号、词频等信息写入了-
写入文档号+词频
if (1 == postings.termFreqs[termID]) { writeVInt(0, postings.lastDocCodes[termID]|1); } else { writeVInt(0, postings.lastDocCodes[termID]); writeVInt(0, postings.termFreqs[termID]); }1)第一个分支里面,这个term只在这个文档里面出现了一次,则将文档号左移一位和1做或运算,然后写入 <mark>note</mark> writeVInt的参数是0
2)如果词频大于1,则文档号和词频分开写入
6 storedFieldsConsumer.writeField
这个部分涉及到域的写入,由于篇幅关系,放在下一篇学习笔记里面介绍
四 索引:从内存到磁盘
将内存中的索引中间数据写入磁盘,是另一块非常复杂的内容,鉴于本篇笔记已达到8000字了,这块内容放到下一块介绍