Lucene索引全流程学习笔记-之一

一 简介

在上一篇Lucene学习笔记:Lucene检索全流程学习笔记 中,笔者描述了为什么学习Lucene源码,以及学习笔记的写作方式。本文写作方式类似,描述一篇文档经过处理,最终倒排数据写入磁盘的全过程。聚焦于全流程调用链是如何被贯穿起来的,1)从官方demo中的 indexDocs(writer, docDir); 语句出发,顺着调用链,一步一步走到生成内存中的倒排中间数据全流程;2)从官方demo中的 writer.close(); 出发,顺着调用链将倒排数据落盘的全流程贯穿起来。因此基于这个目的,本文在描述的过程中会略去相当多的内容:

  • Lucene生成的索引文件多种多样,有倒排方面的.tim .tip .doc .pos .pay文件,也有其他的如点数据、docValues、termVector等,由于本文的主要目的是倒排数据落盘全流程,因此主要围绕倒排文件展开,其他索引文件会简单提一句

  • 在落盘的过程中,涉及到索引的合并、索引的删除等,它们笔记复杂,也足够重要,重要到完全应该单独成文。另外一方面,为了突出本文的重点:文档->倒排(内存)、倒排(内存) -> 落盘,也会舍弃这部分的介绍,或者在调用链中遇到的时候简单提一句

  • 在创建索引的调用链中,有很多逻辑存在多个分支,本文会选择最简单的一个分支来介绍,以期降低学习难度和成本。例如在创建索引的时候,会面临2种情况,1) 当前目录已经存在索引文件 2) 当前目录没有索引文件。在这种情况下,本文会选择2)这种场景来分析介绍,舍弃1)的部分。

  • 触发索引文件落盘有多种场景,例如自动落盘:索引阶段系统检测到索引文件的内存总消耗量超过了某一个阈值,便会自动触发落盘;主动落盘:用户调用了close接口,触发落盘逻辑执行,等等。本文基于官方demo来介绍,focus在close触发的落盘这一个路径,其他路径略去不介绍。

二 从官方demo开始

为了描述上、理解上的简洁,笔者将官方demo裁剪重整后,贴出来,如下:

位置:org/apache/lucene/demo/IndexFiles.java

public static void main(String[] args) {
      Directory dir = FSDirectory.open(Paths.get(indexPath));
      Analyzer analyzer = new StandardAnalyzer();
      IndexWriterConfig iwc = new IndexWriterConfig(analyzer);
      iwc.setOpenMode(OpenMode.CREATE);
      IndexWriter writer = new IndexWriter(dir, iwc);
      //filePath是dir目录中需要索引的一个原始文档,例如./lucene.txt
      Path filePath = xxx; 

      try (InputStream stream = Files.newInputStream(filePath)) {
          // make a new, empty document
          Document doc = new Document();

          Field pathField = new StringField("path", file.toString(), Field.Store.YES);
          doc.add(pathField);   
          doc.add(new LongPoint("modified", lastModified)); 
          doc.add(new TextField("contents", new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8))));
          writer.addDocument(doc);
    }
    writer.close();
}
  1. 首先是打开需要索引的文件:dir = FSDirectory.open(Paths.get(indexPath)); dir目录下的文件都是需要索引的

  2. 实例化分词器 Analyzer analyzer = new StandardAnalyzer(); 标准分词器是一个简单的以空格来分词的工具

  3. 索引配置项iwc = new IndexWriterConfig(analyzer),配置了很多重要的属性,简单罗列几个

    1. maxBufferedDocs 内存中索引的文件超过这个数触发落盘

    2. delPolicy ,例如KeepOnlyLastCommitDeletionPolicy 删除策略,每次只保留上一个最新的commit

    3. mergeScheduler 索引merge调度器

    4. codec 描述了各种索引文件的版本,此处是lucene87, 据此能找到各种索引文件的生成工具类,参见 Lucene检索全流程学习笔记

    5. mergePolicy 索引合并策略

  4. 实例化indexWriter IndexWriter writer = new IndexWriter(dir, iwc); 索引落盘的主要工具类

  5. 生成中间索引数据,存在内存中 writer.addDocument(doc);

  6. 落盘writer.close(); 触发内存中的索引中间数据落盘

小结 5、6 是索引落盘的2个核心阶段,也是本文重点介绍对象

三 索引:从文档到内存

构建索引的第一个阶段是将磁盘上待索引的文件经过处理后,生成倒排索引的中间数据,并存储在内存中,供后续落盘阶段进一步处理。

IndexWriterConfig iwc = new IndexWriterConfig(analyzer);
IndexWriter writer = new IndexWriter(dir, iwc)
writer.addDocument(doc);

1 初始化:DefaultIndexingChain

new IndexWriterConfig(analyzer) 的构造函数(基类)里,有一行关键代码如下:

indexingChain = DocumentsWriterPerThread.defaultIndexingChain;


static final IndexingChain defaultIndexingChain = new IndexingChain() {

    @Override
    DocConsumer getChain(int indexCreatedVersionMajor, SegmentInfo segmentInfo, Directory directory,
                         FieldInfos.Builder fieldInfos, LiveIndexWriterConfig indexWriterConfig,
                         Consumer<Throwable> abortingExceptionConsumer) {
      return new DefaultIndexingChain(indexCreatedVersionMajor, segmentInfo, directory, fieldInfos, indexWriterConfig, abortingExceptionConsumer);
    }
  };

getChain 函数返回的 <mark>new DefaultIndexingChain</mark> 非常重要,生成倒排中间数据便是由它负责。后文再详细介绍之

2 初始化:构建IndexWriter

new IndexWriter(dir, iwc) 由上文可知iwc的成员IndexingChain是专门负责将文档转化为倒排中间数据,而它本身作为IndexWriter 构造函数的参数,因此不难看出IndexWriter是一个总控类,负责做一些生成倒排前的准备工作,以及在特定的时间点调用对应的索引生成工具产出对应的索引数据。IndexWriter构造函数里面由2条分支 1) Create模式:新建索引 2) Append模式:目录下已经由索引文件,通过Append的方式创建索引。由于1)比较简单,本文以此为例来介绍

  1. conf.setIndexWriter(this); 以IndexWriter对象自身为参数,set到conf中(前文提到的iwc变量) 目的是防止IndexWriter被复用

  2. 软删除相关:softDeletesEnabled = config.getSoftDeletesField() != null; 软删除是对某些被删除的文档做一个标记,并不真正从物理层面删掉数据,而是检索阶段过滤掉被软删除的那些文档

  3. 获取写锁、索引合并调度器的获取+初始化

  4. 接下来以Create模式的分支来继续

  5. indexExists = DirectoryReader.indexExists(directory); 判断当前目录下有没有索引文件(后续回滚使用)

  6. 如果当前目录有索引文件,则读取,并创建回滚点,以防止意外情况下需要回滚当前操作

    final SegmentInfos sis = new SegmentInfos(config.getIndexCreatedVersionMajor());
            if (indexExists) {
              final SegmentInfos previous = SegmentInfos.readLatestCommit(directory);
              sis.updateGenerationVersionAndCounter(previous);
            }
            segmentInfos = sis;
            rollbackSegments = segmentInfos.createBackupSegmentInfos();
    }
    

    其中readLatestCommit 底层真正调度的是 public static final SegmentInfos readCommit(Directory directory, ChecksumIndexInput input, long generation) 这个函数,它的逻辑在Lucene检索全流程学习笔记 中已经详细介绍过了,主要是从读取当前目录下的索引到内存对象中。

    1. 读取当前索引中的域相关信息:globalFieldNumberMap = getFieldNumberMap();

      注:Lucene中域的编号是连续递增的,如果一个目录中已经有索引的情况下(假设域的最大编号是:N)再创建新的索引,新的索引中的域的编号要从N+1 开始,这也是这一块读取域相关信息的原因

  7. 接下来是很多工具类对象的创建和初始化,本文只介绍docWriter

3 初始化:DocumentsWriter

DocumentsWriter 是写倒排数据的总入口,下面看看它的初始化做了哪些事情

  1. 实例化DocumentsWriter 对象
docWriter = new DocumentsWriter(flushNotifications, segmentInfos.getIndexCreatedVersionMajor(), pendingNumDocs,
          enableTestPoints, this::newSegmentName,
          config, directoryOrig, directory, globalFieldNumberMap);
  1. 进入到DocumentsWriter 的构造函数

    this.perThreadPool = new DocumentsWriterPerThreadPool(() -> {
          final FieldInfos.Builder infos = new FieldInfos.Builder(globalFieldNumberMap);
          return new DocumentsWriterPerThread(indexCreatedVersionMajor,
              segmentNameSupplier.get(), directoryOrig,
              directory, config, deleteQueue, infos,
              pendingNumDocs, enableTestPoints);
        });
    flushControl = new DocumentsWriterFlushControl(this, config);
    

    perThreadPool是一个关键点,索引数据落盘时。上面的代码里this 指针被设置到flushControl里面,后面索引落盘的时候flushControl会从this(DocumentsWriter)里面找到perThreadPool并会从这个pool里面取出一个dwpt来执行相关任务。

  2. 接2,在DocumentsWriterPerThread 构造函数里面,有一个关键调用:

    segmentInfo = new SegmentInfo(directoryOrig, Version.LATEST, Version.LATEST, segmentName, -1, false, codec, Collections.emptyMap(), StringHelper.randomId(), Collections.emptyMap(), indexWriterConfig.getIndexSort());
    consumer = indexWriterConfig.getIndexingChain().getChain(indexVersionCreated, segmentInfo, this.directory, fieldInfos, indexWriterConfig, this::onAbortingException);
    

    segmentInfo 描述了一次索引的提交,是索引数据落盘前的中间数据结果。consumer 是最关键的一个对象,它负责 1)待索引的文档经过处理,形成内存中的中间状态的待落盘的格式 2)将1)中的数据落盘 2个最核心的功能。

  3. 接3,consumer 实际上被赋值的是前文提及的DefaultIndexingChain ,它里面有一个关键对象:termsHash = new FreqProxTermsWriter(intBlockAllocator, byteBlockAllocator, bytesUsed, termVectorsWriter); termHash 负责3中提及的功能1)。

  4. 接4,termsHash的构造函数里有2个关键成员变量

    intPool = new IntBlockPool(intBlockAllocator);
    bytePool = new ByteBlockPool(byteBlockAllocator);
    

    bytePool的作用是存储term对应的文档id、词频、term在文档中的位置信息、偏移量信息、payload信息等。Lucene里面通过精巧的设计,使得可以流式地存储这些信息。intPool可以理解为存储的是特定term下指向bytePool中对应信息的位置。

4 生成倒排中间数据

再回到第 章部分的demo,从writer.addDocument(doc); 调用开始,就进入到了将文档数据进行处理,形成中间状态的索引数据。

  1. 经过基层嵌套调用后,走到如下这个函数

    private long updateDocuments(final DocumentsWriterDeleteQueue.Node<?> delNode, Iterable<? extends Iterable<? extends IndexableField>> docs) throws IOException {
    
  2. 1中的这个函数里面有一行关键调用:docWriter.updateDocuments(docs, delNode)4.1小节介绍

4.1 docWriter.updateDocuments(xx)

为了描述上的简介,聚焦于本文写作的目标,下面的函数经过笔者删减

long updateDocuments(final Iterable<? extends Iterable<? extends IndexableField>> docs,
                       final DocumentsWriterDeleteQueue.Node<?> delNode) throws IOException {

    final DocumentsWriterPerThread dwpt = flushControl.obtainAndLock();
    final DocumentsWriterPerThread flushingDWPT;
    long seqNo;
    final int dwptNumDocs = dwpt.getNumDocsInRAM();
    seqNo = dwpt.updateDocuments(docs, delNode, flushNotifications);
    final boolean isUpdate = delNode != null && delNode.isDelete();
    flushingDWPT = flushControl.doAfterDocument(dwpt, isUpdate);

    if (postUpdate(flushingDWPT, hasEvents)) {
      seqNo = -seqNo;
    }
    return seqNo;
  }
  • 代码中的dwpt便是第3小节第2部分提及的用于生成中间索引数据的工具对象

  • flushingDWPT 是在文档操作完毕后,会检查是不是需要将当前的索引数据罗盘,前文提及lucene中落盘的触发点比较多,本文只会聚焦于demo中的writer.close()函数触发的落盘逻辑

  • dwpt.updateDocuments 是关键调用,负责前文提及的 生成索引中间数据

     long updateDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs, DocumentsWriterDeleteQueue.Node<?> deleteNode, DocumentsWriter.FlushNotifications flushNotifications) throws IOException {
        try {
          testPoint("DocumentsWriterPerThread addDocuments start");
          assert abortingException == null: "DWPT has hit aborting exception but is still indexing";
          if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
            infoStream.message("DWPT", Thread.currentThread().getName() + " update delTerm=" + deleteNode + " docID=" + numDocsInRAM + " seg=" + segmentInfo.name);
          }
          final int docsInRamBefore = numDocsInRAM;
          boolean allDocsIndexed = false;
          try {
            for (Iterable<? extends IndexableField> doc : docs) {
              // Even on exception, the document is still added (but marked
              // deleted), so we don't need to un-reserve at that point.
              // Aborting exceptions will actually "lose" more than one
              // document, so the counter will be "wrong" in that case, but
              // it's very hard to fix (we can't easily distinguish aborting
              // vs non-aborting exceptions):
              reserveOneDoc();
              consumer.processDocument(numDocsInRAM++, doc);
            }
            allDocsIndexed = true;
            return finishDocuments(deleteNode, docsInRamBefore);
          } finally {
            if (!allDocsIndexed && !aborted) {
              // the iterator threw an exception that is not aborting
              // go and mark all docs from this block as deleted
              deleteLastDocs(numDocsInRAM - docsInRamBefore);
            }
          }
        } finally {
          maybeAbort("updateDocuments", flushNotifications);
        }
      }
    

    这个函数里面,会遍历每一个代索引的文档,并调用consumer.processDocument(numDocsInRAM++, doc); 进行具体的操作。从第3 小节的第3部分可知,consumer的实际类型是DefaultIndexingChain 下面进入4.1 小节介绍

    4.1 DefaultIndexingChain::processDocument

    for (IndexableField field : document) {
       fieldCount = processField(docID, field, fieldGen, fieldCount);
    }
    

    processDocument 函数里面的核心调用代码如上所示,遍历文档中的每一个域,然后调用processField(xx)函数进行详细的处理。下面按照业务逻辑的流转顺序介绍之

    1. 获取域名、域的类型. 在Lucene里面的一个文档对象包含多个域,每一个域对应具体的域值,域值的类型多种多样,例如字符串类型。在索引阶段也是按照域的维度来处理:全部文档相同的域对应的域值组织到一块来处理,fieldName、fieldType就是用来获取处理这个域名下的索引数据的工具类

      String fieldName = field.name();
      IndexableFieldType fieldType = field.fieldType();
      
    2. 获取1中提及的特定域的索引生成工具类对象,也就是下面代码片段中的 fp

      PerField fp = null;
      fp = getOrAddField(fieldName, fieldType, true);
      

      getOrAddField 做的事情是,先看看是不是已经有这个域对应的索引生成工具了,有则直接返回,否则生成一个新的,下面在 4.2 小节会详细介绍之

    3. fp.invert(docID, field, first); 生成倒排索引数据的真正入口,逻辑非常复杂,下面在第5 小节专门介绍

    4. 如果这个域是第一次出现,则将fp工具类对象存储起来,下次复用:也就是下次处理其他文档的时候,遇到相同域名,会再复用之

      if (first) {
          fields[fieldCount++] = fp;
          fp.fieldGen = fieldGen;
      }
      
    5. 存储域值。Lucene中生成文档对象时,对应的域可以添加是否存储的属性,如果设置存储的话,便会生成:fdt、fdx、等索引文件,它们记录了域值以及和docid相关的信息。可以看到和生成倒排数据类型,此处也会使用到fp小工具。同样写存储域的逻辑比较复杂storedFieldsConsumer.writeField 会在第6小节来介绍

      if (fieldType.stored()) {
           fp = getOrAddField(fieldName, fieldType, false);
           String value = field.stringValue();
           storedFieldsConsumer.writeField(fp.fieldInfo, field);
      }
      
    6. 下面还有点数据、docValue等索引数据的处理,本文不介绍这一块

    4.2 getOrAddField

    位置:org/apache/lucene/index/DefaultIndexingChain.java 进入到该函数的逻辑:

    1. 先从现有的链表里面看看能不能找到现成的. 下面代码片段中的fieldHash的每一个入口是一个链表,至于为啥有多个链表入口,在hashCode函数里面有介绍,为了减少冲突竞争,至于设计的原理不是本文的重点,按下不表。

       final int hashPos = name.hashCode() & hashMask;
          PerField fp = fieldHash[hashPos];
          while (fp != null && !fp.fieldInfo.name.equals(name)) {
            fp = fp.next;
       }
      
    2. 如果1中没有找到,则开始生成新的fp的逻辑

      FieldInfo fi = fieldInfos.getOrAdd(name);
      initIndexOptions(fi, fieldType.indexOptions());
      

      在上一篇关于Lucene检索流程的学习笔记里,介绍过FieldInfo,它记录了某个域的属性,例如这个域是否存储payload、termVector、norms等索引数据。同样也是看看有现成的复用,没有新生成返回。initIndexOptions 会将一些属性信息,设置到刚才的FieldInfo里,如果不一致则抛出异常:换句话说,不同的文档里面有相同的域名,其属性缺不同,是不行的。

    3. 以2中的数据为参数,构造fp工具类对象

      fp = new PerField(indexCreatedVersionMajor, fi, invert,
                indexWriterConfig.getSimilarity(), indexWriterConfig.getInfoStream(), indexWriterConfig.getAnalyzer());
      

      PerField的构造函数里面会设置一些成员变量,如:FieldInfo类型的fi、similarity(例如BM25), 如果生成这个PerField传了invert = true,说明目的是生成倒排数据,会继续执行setInvertState 操作

    4. 接3,setInvertState主要做了如下事情

      termsHashPerField = termsHash.addField(invertState, fieldInfo);
      

      从第3小节的第3步,可以知道termsHash的实际类型是FreqProxTermsWriter ,因此addField实际调用的是下面这个函数

        public TermsHashPerField addField(FieldInvertState invertState, FieldInfo fieldInfo) {
          return new FreqProxTermsWriterPerField(invertState, this, fieldInfo, nextTermsHash.addField(invertState, fieldInfo));
        }
      

      可见termsHashPerField实际上是FreqProxTermsWriterPerField类型的对象。

    5. FreqProxTermsWriterPerField 里面持有一些关键数据结构,用于存储索引中间格式的数据。它的构造函数里面会调用基类的构造函数,代码如下:

        TermsHashPerField(int streamCount, IntBlockPool intPool, ByteBlockPool bytePool, ByteBlockPool termBytePool,
                          Counter bytesUsed, TermsHashPerField nextPerField, String fieldName, IndexOptions indexOptions) {
          this.intPool = intPool;
          this.bytePool = bytePool;
          this.streamCount = streamCount;
          this.fieldName = fieldName;
          this.nextPerField = nextPerField;
          assert indexOptions != IndexOptions.NONE;
          this.indexOptions = indexOptions;
          PostingsBytesStartArray byteStarts = new PostingsBytesStartArray(this, bytesUsed);
          bytesHash = new BytesRefHash(termBytePool, HASH_INIT_SIZE, byteStarts);
        }
      

      1)intPool和bytePool前文介绍过,直接存储文档号、term频率、term位置等信息

      2)此处streamCount是1或者2;a) 如果只存储 文档号和词频则是1,b) 如果还额外存储位置、payload等信息则是2. 在bytePool里面分别有两条逻辑上的分片,每一个分片对应存储此处提及的a) b)

      3)nextPerField 指的是termsVector,这里采用了职责链的设计模式

    6. bytesHash管理了terms和其ID的映射,输入分词后的term, 返回一个编号ID

    7. 回到4中提及的setInvertState 函数,还有一些处理norms数据的逻辑

    5 生成倒排索引

    回顾前文提到的fp.invert(docID, field, first) 函数,它是索引中间数据生成的入口。函数的第一个参数是待处理文档的ID、第二个参数是这个文档中待处理的域、第三个参数表示是这个域是否第一次出现。

    5.1 主流程

    进入fp.invert函数,看看总体上做了哪些事,然后在后续的小节再深入其中的关键点。

    1. 读出这个域对应的域值,然后分词为Term流,后续依次处理每一个term

      TokenStream stream = tokenStream = field.tokenStream(analyzer, tokenStream)
      

      field对象内部有成员变量存储域名、域值、编号等信息,此处的tokenStream会将域值分词后形成一个数据流stream,后续可以遍历这个stream取出每一个term得到它的诸如term的位置、offset等信息.

    2. invertState.setAttributeSource(stream); 将stream的一些属性设置到invertState中,后续每一栋stream的指针到下一个term的时候,可以通过invertState获得term的值、在文档中的位置、payload数据等。invertState是对stream 的简单包装

    3. termsHashPerField.start(field, first); 在第章,第4.2小节我们知道 termsHashPerField 的实际类型是<mark>FreqProxTermsWriterPerField</mark>,接下来索引数据的处理便是由该函数接手。

    4. 接3,start函数里面会把invertState 的涉及到term的成员变量赋值过来。那么FreqProxTermsWriterPerField 是如何跟invertState联系起来的呢,可以参见本章4.2 节第4步骤。

    5. Term位置的处理: 获取当前term所在的位置(笔者注:例如同一个term多次出现,此次是该term的第几次出现 待确认) ,然后累加到全局变量invertState.position上,后面会据此做一些异常边界的判断,例如超过阈值抛出异常等等

      int posIncr = invertState.posIncrAttribute.getPositionIncrement();
      invertState.position += posIncr;
      
    6. 获取term在文档中的起始位置

      int startOffset = invertState.offset + invertState.offsetAttribute.startOffset();
      int endOffset = invertState.offset + invertState.offsetAttribute.endOffset();
      
    7. 接下来索引数据的生成会交给termsHashPerField.add处理,在本章5.3小节详细介绍

    8. 接下来是一些收尾性质的逻辑

    5.2 流式存储

    在介绍生成索引中间数据及它的存储格式前,先抛出几个问题,有利于理解Lucene中流式索引数据的存储格式。

    5.2.1 层级关系

如上图所示,Lucene中处理文档的顺序是:域-> term -> 文档。所有文档的同一个域名组织在一起;相同域名下的同一个term组织在一起,term下面挂的是包含这个term的文档列表。当然图中省略了位置、offset、payload等信息。

5.2.2 一个例子

假设有一个文档, 是我们处理的第一个文档,因而文档号是docid=0, 它有一个名为content 的域,域值是 good good study , 经过分词后有3个term:goodgoodstudy (忽略payload)

一个存储格式如上图所示:第一个4表示接下来有一个长度为4的term :good (真正存储的应该是二进制,如ASIIC码等,此处直接用可视化字符来描述)。接下来的红色的数字0表示这个term 出现在编号为0的文档中,0后面的红色数字2表示good 这个term在文档0里面出现的频率是2. 接下来的0表示这个term出现的位置是0,接下来的0,3表示它在域值里面的起始位置是从0开始到第3个字符,接下来的158分别表示第二个同名的term good,它出现的位置是2,从第5个字符到第8个字符。再接下来存储的是term strudy相关的信息。

5.2.3 缺陷

下面看一下5.2.2 中的缺陷:

  1. 假设现在开始处理文档1,它的content域里面也有good这个term,那么后面应该把文档号和词频写入上面的数据中,跟在good后面,但是从图片可以看出good后面已经没有空间来存相应的信息了。

  2. 针对1,一个解决办法是在good后面留足够的buffer,但问题是每一个term对应的文档号的数量不同,无法预知

  3. 针对1,专门申请一块单独的buffer用于写入文档号和词频,这样随着索引的数据越来越多,term的数量也越来越多,需要申请非常多的单独的buffer,且它们的大小无法预知,这样内存使用不紧凑。在落盘期间也难以还原域、term、文档、位置、offset等数据之间的关系

  4. 文档号和词频:在一个文档处理结束后,才知道词频,而term的位置信息在文档尚未处理结束之前,就随着处理不断地产生数据写入内存。

5.2.4 解决办法

  1. 紧凑式内存使用

如图所示,宏观上lucene分配若干等长的buffer,每一个固定大小:例如8M,然后每次使用的时候按顺序从这些buffer里面去分配

  1. 无法预知长度的解决办法

如图所示,Lucene里面有一个变长slice的概念,第一次使用的时候分配 Size1个字节,等到不够用的时候,再次重新分配,会找到下一个level应该分配的字节数Size2个字节。图中1.1 1.2分别对应了size1 、size2个字节的slice

  1. 衔接问题

    1)2中的1.1 和1.2 如何衔接到一块呢?Lucene中每一个slice的末尾处会存储一些元信息,例如当我们在1.1里面移动指针不断地写入数据,某个时刻会遇到buffer的末尾的若干字节里面存储有数字,这个数字告诉我们下一次分配slice应该分配多长的size,然后分配后,这几个特殊的字节,会存入一个表示新分配的slice的偏移量的值。

2)这样顺着这些元信息,就可以将物理上不连续的slice在逻辑上连到一块,供我们遍历之。(当然图中的这个例子,两块slice是紧挨着的,后面会介绍不是紧挨着的情况)

3)因为存储下一个slice的偏移量的值可能比较大,我们可能需要多个字节,这样有可能把老的数据覆盖掉,因此老的若干字节,会被挪到新的slice里面。

  1. 写入频率不一致的解决办法

文档词频位置、offset、payload 从逻辑上分成2条线,各自在自己的slice里面处理,例如图中绿色的slice存储文档词频、橙色的slice存储 位置、offset、payload信息等。这个也显示了 3中提到的一个逻辑上连续的slice在物理上并不连续的情况,它们是靠着相应的元信息来保持一个逻辑线条的完整连贯性。

小结 由于上面的设计,使得Lucene在写倒排数据时不仅内存使用非常紧凑,还可以再次基础上,包装更高级的迭代器,使得我们遍历索引中间数据就像遍历vector那么容易。后文结合代码再详细分析,会有一个更全面的了解

5.2.5 相关数据结构

下面先来介绍下承载索引中间数据的数据结构,然后在 5.2.6 小节开始深入对应的业务逻辑中。

  • byteHash:int termID = bytesHash.add(termBytes);可以根据term的值,得到一个对应该term的编号。

  • ByteBlockPool 里面有一个byte [][]类型的buffer。这个里面就是存储term、文档+词频、term位置、term offset、term payload信息的地方

  • postingsArray.addressOffset 根据termID, 可以定位到对应的IntBlockPool

  • IntBlockPool 里面有一个int[][]类型的buffer。里面有指针指向ByteBlockPool中特定term对应的slice的位置(一开始指向起点,随着数据的写入,指针也在移动,相当于指向对应逻辑slice的尾部)note 这里的指针实际是一个偏移量,为了描述的形象,直接使用之。

  • postingsArray.byteStarts[termID] 指向这个term对应在ByteBlocPool中的slice的起点,且这个值不会变化。

用一个图来描述上述各个变量之间的关系

5.3 termsHashPerField.add

经过5.2 小节的介绍,我们对Lucene中存储索引中间数据的格式有了初步了解。让我们再一次回到主流程的termsHashPerField.add 函数,来看一看索引生成的真正逻辑是啥样的。

int termID = bytesHash.add(termBytes);
    //System.out.println("add term=" + termBytesRef.utf8ToString() + " doc=" + docState.docID + " termID=" + termID);
    if (termID >= 0) { // New posting
      // Init stream slices
      initStreamSlices(termID, docID);
    } else {
      termID = positionStreamSlice(termID, docID);
    }

如上面代码片段所示,termID如果小于0,说明第一次遇见这个term,否则之前已经出现过了。为了描述上的方便,我们选取if (termID >= 0) {} 这个分支来介绍。

  1. initStreamSlices 会给该term在intBlockPool和byteBlockPool里面分配slice,然后写入

  2. 接1,进入initStreamSlices 函数内部

    if (streamCount + intPool.intUpto > IntBlockPool.INT_BLOCK_SIZE) {
          // not enough space remaining in this buffer -- jump to next buffer and lose this remaining
          // piece
          intPool.nextBuffer();
    }
    

    由前文可知,在Lucene里面,文档+词频、term信息(offset pos payload)是分成2个逻辑上的slice处理的,代码里面的streamCount描述的就是这个信息,因此它的数值是2. intPool.intUpto描述的是在当前那个buffer内部,截止intUpto偏移量的地方,已经被使用了。因此这个if 条件判断的是intPool里面当前这个buffer是否还有足够的空间存储新的term的指针信息,如果没有则移动到下一个buffer,然后再里面分配

  3. 接2,看下nextBuffer的逻辑

      public void nextBuffer() {
        if (1+bufferUpto == buffers.length) {
          int[][] newBuffers = new int[(int) (buffers.length*1.5)][];
          System.arraycopy(buffers, 0, newBuffers, 0, buffers.length);
          buffers = newBuffers;
        }
        buffer = buffers[1+bufferUpto] = allocator.getIntBlock();
        bufferUpto++;
    
        intUpto = 0;
        intOffset += INT_BLOCK_SIZE;
      }
    

    上面代码的逻辑是,首先判断有没有空闲的buffer了,如果没有扩容1.5倍,然后把老的buffer的指针赋值过来,在1+bufferUpto下标处,分配一块block,然后调整内部的各种成员变量。note buffers是一个int[][]类型的二维数组,扩容的时候int[x][]对应的真正内存buffer并没有改变,后续只是单纯指针拷贝,成本不大。

  4. 回到主流程,继续判断byteBlockPool是否需要扩容,类似3,不再赘述

  5. 建立TermID到IntBlockPool的映射关系

        termStreamAddressBuffer = intPool.buffer;
        streamAddressOffset = intPool.intUpto;
        intPool.intUpto += streamCount; // advance the pool to reserve the N streams for this term
        postingsArray.addressOffset[termID] = streamAddressOffset + intPool.intOffset;
    

    intPool.intOffset 变量是此前所有buffer的大小总和,例如当前是第6个buffer,intOffset = 5 * len(buffer); streamAddressOffset 是在第6个buffer中已经使用的偏移量。因此addressOffset[termID] 存储的便是新term对应在intBlockPool中的起始位置(逻辑上的连续偏移量)。

  6. intBlockPool到byteBlockPool的映射关系

    for (int i = 0; i < streamCount; i++) {
          // initialize each stream with a slice we start with ByteBlockPool.FIRST_LEVEL_SIZE)
          // and grow as we need more space. see ByteBlockPool.LEVEL_SIZE_ARRAY
          final int upto = bytePool.newSlice(ByteBlockPool.FIRST_LEVEL_SIZE);
          termStreamAddressBuffer[streamAddressOffset + i] = upto + bytePool.byteOffset;
    }
    

    循环内部分配对应的slice,由于此处是新的term第一次出现,分配的是FIRST_LEVEL_SIZE 大小的slice。upto返回的是新分配的slice在当前buffer的首地址偏移量。因此termStreamAddressBuffer[streamAddressOffset + i]记录的是每一个stream对应分配的slice的首地址偏移量。当然在索引写入的时候偏移量会随着更新,它是一个变化的值

  7. 记录第一个stream的第一个slice的首地址偏移量,这是一个不变的值,不会随着索引的写入而更新

    postingsArray.byteStarts[termID] = termStreamAddressBuffer[streamAddressOffset];
    
  8. 接下来索引生成的逻辑交给newTerm(termID, docID); 函数,在 5.4 小节介绍

5.3 newTerm 函数

由于派生类实现了newTerm函数,因此实际调用的是FreqProxTermsWriterPerField.java 中的newTerm函数,截取其中写term offset数据的逻辑如下:

if (hasOffsets) {
    writeOffsets(termID, fieldState.offset);
}
  1. 深入writeOffsets

      void writeOffsets(int termID, int offsetAccum) {
        final int startOffset = offsetAccum + offsetAttribute.startOffset();
        final int endOffset = offsetAccum + offsetAttribute.endOffset();
        assert startOffset - freqProxPostingsArray.lastOffsets[termID] >= 0;
        writeVInt(1, startOffset - freqProxPostingsArray.lastOffsets[termID]);
        writeVInt(1, endOffset - startOffset);
        freqProxPostingsArray.lastOffsets[termID] = startOffset;
      }
    

    第一个writeVInt函数将term的起始偏移写入byteBlockPool中,当然写的偏移量实际上是和上一个term起始位置的差值。差值存储可以节省内存空间,Lucene中有各种各样优化内存的压缩手段,值得单独学习,此处忽略之。

    <mark>note</mark> writeVInt的第一个参数是1,表示的是第二个(从0开始计数)stream,在lucene里面第一个stream描述的是文档id和词频数据;第二个stream描述的是pos. offset payload数据。而此处即将写入的是offset,因此参数传递的是1

  2. 深入writeVInt函数,会将数据分成一个个byte然后调用writeByte 写入

  3. 深入writeByte函数

      final void writeByte(int stream, byte b) {
        int streamAddress = streamAddressOffset + stream;
        int upto = termStreamAddressBuffer[streamAddress];
        byte[] bytes = bytePool.buffers[upto >> ByteBlockPool.BYTE_BLOCK_SHIFT];
        assert bytes != null;
        int offset = upto & ByteBlockPool.BYTE_BLOCK_MASK;
        if (bytes[offset] != 0) {
          // End of slice; allocate a new one
          offset = bytePool.allocSlice(bytes, offset);
          bytes = bytePool.buffer;
          termStreamAddressBuffer[streamAddress] = offset + bytePool.byteOffset;
        }
        bytes[offset] = b;
        (termStreamAddressBuffer[streamAddress])++;
      }
    

    代码的一开始根据stream的值,去定位这个stream对应的slice当前已经写到的那个偏移量,5.3节中已经介绍过这些变量的作用和它们之间的关系。

  4. if (bytes[offset] != 0) 说明到达了这个slice的末尾,slice的末尾存储的是下一次分配slice的大小,它是逐次增大的一个值。

  5. 前文提到过如何在各个物理上不连续的slice之间进行跳转,它们所依赖的关键信息可以从bytePool.allocSlice 中体现

  6. 深入allocSlice

    public int allocSlice(final byte[] slice, final int upto) {
        final int level = slice[upto] & 15;
        final int newLevel = NEXT_LEVEL_ARRAY[level];
        final int newSize = LEVEL_SIZE_ARRAY[newLevel];
    
        // Maybe allocate another block
        if (byteUpto > BYTE_BLOCK_SIZE-newSize) {
          nextBuffer();
        }
    
        final int newUpto = byteUpto;
        final int offset = newUpto + byteOffset;
        byteUpto += newSize;
    
        // Copy forward the past 3 bytes (which we are about
        // to overwrite with the forwarding address):
        buffer[newUpto] = slice[upto-3];
        buffer[newUpto+1] = slice[upto-2];
        buffer[newUpto+2] = slice[upto-1];
    
        // Write forwarding address at end of last slice:
        slice[upto-3] = (byte) (offset >>> 24);
        slice[upto-2] = (byte) (offset >>> 16);
        slice[upto-1] = (byte) (offset >>> 8);
        slice[upto] = (byte) offset;
            
        // Write new level:
        buffer[byteUpto-1] = (byte) (16|newLevel);
    
        return newUpto+3;
      }
    

    1)在前文提及,新分配slice后,会在上一个slice末尾写入新的slice的首地址偏移量,会占用多个字节,因此老的数据会覆盖,它们会被挪到新的slice前面几个字节里,上面代码中的buffer[newUpto] = slice[upto-3]; 那3行做的就是这件事。

    2)slice[upto-3] = (byte) (offset >>> 24); 开始的4行代码,便是把新的slice偏移量写入老的slice的最后4个字节。

    3)buffer[byteUpto-1] = (byte) (16|newLevel); 这行代码是在新的slice的末尾写入下一次分配slice的level,这个level对应了一个更大的size。

    4)return newUpto+3; 因为新的slice前3个字节已经被占用了,返回的可以使用的偏移量应该 + 3

  7. 回到writeByte函数,此时可以将目标字节写入了 bytes[offset] = b; 这个bytes就是从byteBlockPool里面取出的

  8. (termStreamAddressBuffer[streamAddress])++; 体现出前文提及的

    随着索引数据的写入,指针偏移量也在跟着变化

小结 本文介绍了term的offset数据的写入,pos payload的数据的写入本质上笔记类似,不再赘述。文档号和词频在一篇文档处理完后才能写入,在5.4 小节简要介绍

5.4 写入文档号和词频

回到5.3小节一开始,那段代码等else 分支 termID = positionStreamSlice(termID, docID); 这个分支里面有触发文档号+词频写入内存的逻辑

  1. 最终会调用void addTerm(final int termID, final int docID)这个函数,同样调用的是派生类中的函数

  2. 这个函数里面有一个检测:if (docID != postings.lastDocIDs[termID]) 如果当次处理的文档号和上一次处理的不一样,说明上一个文档处理完毕了,可以将文档号、词频等信息写入了

  3. 写入文档号+词频

    if (1 == postings.termFreqs[termID]) {
            writeVInt(0, postings.lastDocCodes[termID]|1);
    } else {
            writeVInt(0, postings.lastDocCodes[termID]);
            writeVInt(0, postings.termFreqs[termID]);
    }
    

    1)第一个分支里面,这个term只在这个文档里面出现了一次,则将文档号左移一位和1做或运算,然后写入 <mark>note</mark> writeVInt的参数是0

    2)如果词频大于1,则文档号和词频分开写入

6 storedFieldsConsumer.writeField

这个部分涉及到域的写入,由于篇幅关系,放在下一篇学习笔记里面介绍

四 索引:从内存到磁盘

将内存中的索引中间数据写入磁盘,是另一块非常复杂的内容,鉴于本篇笔记已达到8000字了,这块内容放到下一块介绍

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容