Elasticsearch源码分析-搜索分析(三)

0. 前言

上一篇文章中,我们主要讲解了elasticsearch搜索过程的Query部分,包括初始化参数、query解析和lucene search操作。
本篇文章将继续分析搜索,主要是QUERY_THEN_FETCH搜索的Fetch阶段,包括文档排序和数据合并,具体的操作流程如下:
(1)首先对query结果进行排序,获取要fetch的doc id
(2)按shard把这些id填充到docIdsToLoad对象中
(3)按shard去elasticsearch获取文档详情
(4)将第一步中已经排好序的文档与fetch到的结果按照顺序合并
(5)如果请求中有scroll,重新构建scroll id
(6)向调用方返回响应数据
(7)如果不是scroll请求,则释放search context

1. Query结果排序

  1. 数据准备
    在Query阶段获取到shard的query结果后,将会调用processFirstPhaseResult()方法将结果放入firstResults对象中,用于一阶段全局排序。
protected abstract class BaseAsyncAction<FirstResult extends SearchPhaseResult> extends AbstractAsyncAction {
    protected final void processFirstPhaseResult(int shardIndex, ShardRouting shard, FirstResult result) {
        firstResults.set(shardIndex, result);
        AtomicArray<ShardSearchFailure> shardFailures = this.shardFailures;
        if (shardFailures != null) {
            shardFailures.set(shardIndex, null);
        }
    }
}
  1. query结果排序和截断
    在获取第一阶段Query结果后,开始调用SearchPhaseController.sortDocs()方法对文档进行排序。
public class TransportSearchQueryThenFetchAction extends TransportSearchTypeAction {
    @Override
    protected void moveToSecondPhase() throws Exception {
        boolean useScroll = !useSlowScroll && request.scroll() != null;
        sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults);
        searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);
        if (docIdsToLoad.asList().isEmpty()) {
            finishHim();
            return;
        }
        ...
    }
}

假设请求的size为N,那么每个shard都会返回N条文档,那么需要进行全局排序获取topN条文档,elasticsearch采用的是使用lucene的

TopDocs mergedTopDocs = TopDocs.merge(sort, from, topN, shardTopDocs)

进行数据的截断,只获取score比较高的文档。

3. 准备fetch doc

在获取merge后的doc id后,按shard将doc信息放入docsIdsToLoad对象中,用于fetch。

public class SearchPhaseController extends AbstractComponent {
    public void fillDocIdsToLoad(AtomicArray<IntArrayList> docsIdsToLoad, ScoreDoc[] shardDocs) {
        for (ScoreDoc shardDoc : shardDocs) {
            IntArrayList list = docsIdsToLoad.get(shardDoc.shardIndex);
            if (list == null) {
                list = new IntArrayList();
                docsIdsToLoad.set(shardDoc.shardIndex, list);
            }
            list.add(shardDoc.doc);
        }
    }
}

在获取到要fetch的doc id集合后,按shard进行遍历:
(1)获取shard所在的节点信息,然后创建fetch request
(2)调用executeFetch()方法fetch文档,入参包括shard信息、node信息和fetch request对象

public class TransportSearchQueryThenFetchAction extends TransportSearchTypeAction {
    @Override
    protected void moveToSecondPhase() throws Exception {
        ...
        for (AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
            QuerySearchResultProvider queryResult = firstResults.get(entry.index);
            DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId());
            ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult(), entry, lastEmittedDocPerShard);
            executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
        }
    }
}

4. 执行fetch请求

执行fetch的过程,即将fetch请求发送到指定的节点执行,在获取执行结果后执行finishHim()进行数据的merge()

void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter, final ShardFetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
    searchService.sendExecuteFetch(node, fetchSearchRequest, new SearchServiceListener<FetchSearchResult>() {
        @Override
        public void onResult(FetchSearchResult result) {
            result.shardTarget(shardTarget);
            fetchResults.set(shardIndex, result);
            if (counter.decrementAndGet() == 0) {
                finishHim();
            }
        }
        @Override
        public void onFailure(Throwable t) {
            docIdsToLoad.set(shardIndex, null);
            onFetchFailure(t, fetchSearchRequest, shardIndex, shardTarget, counter);
        }
    });
}
  1. 发送fetch请求到指定节点
    FETCH_ID_ACTION_NAME变量值为"indices:data/read/search[phase/fetch/id]",elasticsearch将该ACTION和request发送到指定的node上
public void sendExecuteFetch(DiscoveryNode node, final ShardFetchSearchRequest request, final SearchServiceListener<FetchSearchResult> listener) {
    sendExecuteFetch(node, FETCH_ID_ACTION_NAME, request, listener);
}
  1. 判断要执行的节点
    如果要执行的节点是当前节点,那么直接执行execute()方法,具体的步骤:
    (1)先执行callable的call()方法,即执行SearchService的executeFetchPhase()方法
    (2)根据执行结果,如果为null则执行listener.onFailure()方法,否则执行listener.onResult()
    如果要执行的节点不是当前节点,需要将ACTION和request发送到指定的node节点上
private void sendExecuteFetch(DiscoveryNode node, String action, final ShardFetchRequest request, final SearchServiceListener<FetchSearchResult> listener) {
    if (clusterService.state().nodes().localNodeId().equals(node.id())) {
        execute(new Callable<FetchSearchResult>() {
            @Override
            public FetchSearchResult call() throws Exception {
                return searchService.executeFetchPhase(request);
            }
        }, listener);
    } else {
        transportService.sendRequest(node, action, request, new BaseTransportResponseHandler<FetchSearchResult>() {
            @Override
            public FetchSearchResult newInstance() {
                return new FetchSearchResult();
            }
            @Override
            public void handleResponse(FetchSearchResult response) {
                listener.onResult(response);
            }
            @Override
            public void handleException(TransportException exp) {
                listener.onFailure(exp);
            }
            @Override
            public String executor() {
                return ThreadPool.Names.SAME;
            }
        });
    }
}
  1. 远程节点handler接收消息
    在elasticsearch启动时会注入SearchServiceTransportAction对象,并且会将SearchFetchByIdTransportHandler注入FETCH_ID_ACTION_NAME中
public class SearchServiceTransportAction extends AbstractComponent {
    @Inject
    public SearchServiceTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService, SearchService searchService) {
        super(settings);
        transportService.registerHandler(FETCH_ID_ACTION_NAME, new SearchFetchByIdTransportHandler());
        ...
    }
}

FetchByIdTransportHandler类仅重写了父类FetchByIdTransportHandler的newInstance()方法,具体的类图如下:


SearchFetchByIdTransportHandler.png
private class SearchFetchByIdTransportHandler extends FetchByIdTransportHandler<ShardFetchSearchRequest> {
    @Override
    public ShardFetchSearchRequest newInstance() {
        return new ShardFetchSearchRequest();
    }
}

FetchByIdTransportHandler使用messageReceived()方法接收其他节点请求,然后执行SearchService的executeFetchPhase()方法

private abstract class FetchByIdTransportHandler<Request extends ShardFetchRequest> extends BaseTransportRequestHandler<Request> {
    public abstract Request newInstance();
    @Override
    public void messageReceived(Request request, TransportChannel channel) throws Exception {
        FetchSearchResult result = searchService.executeFetchPhase(request);
        channel.sendResponse(result);
    }
    @Override
    public String executor() {
        return ThreadPool.Names.SEARCH;
    }
}

5. fetch逻辑

SearchService的executeFetchPhase()方法主要逻辑如下:
(1)根据context id查找search context
(2)将要fetch的doc id放入context中
(3)fetch预处理
(4)调用FetchPhase.execute()方法执行Fetch
(5)如果不是scroll请求,释放search context
(6)记录慢fetch日志

public class SearchService extends AbstractLifecycleComponent<SearchService> {
    public FetchSearchResult executeFetchPhase(ShardFetchRequest request) throws ElasticsearchException {
        final SearchContext context = findContext(request.id());
        contextProcessing(context);
        try {
            if (request.lastEmittedDoc() != null) {
                context.lastEmittedDoc(request.lastEmittedDoc());
            }
            context.docIdsToLoad(request.docIds(), 0, request.docIdsSize());
            context.indexShard().searchService().onPreFetchPhase(context);
            long time = System.nanoTime();
            fetchPhase.execute(context);
            if (context.scroll() == null) {
                freeContext(request.id());
            } else {
                contextProcessedSuccessfully(context);
            }
            context.indexShard().searchService().onFetchPhase(context, System.nanoTime() - time);
            return context.fetchResult();
        } catch (Throwable e) {
            context.indexShard().searchService().onFailedFetchPhase(context);
            logger.trace("Fetch phase failed", e);
            processFailure(context, e);
            throw ExceptionsHelper.convertToRuntime(e);
        } finally {
            cleanContext(context);
        }
    }
}

在fetch阶段,具体逻辑为:
(1)首先创建FieldsVisitor对象,根据是否要fetch source,创建UidAndSourceFieldsVisitor或JustUidFieldsVisitor
(2)遍历每个要fetch的文档,判断文档是否为Nested结构,然后分别调用createNestedSearchHit()或者createSearchHit()得到SearchHit
(3)获取SearchHit后再补充如下阶段的结果:ScriptFieldsPhase, PartialFieldsPhase, MatchedQueriesPhase, ExplainPhase, HighlightPhase, FetchSourceSubPhase, VersionPhase, FieldDataFieldsFetchSubPhase, InnerHitsFetchSubPhase
(4)将hit结果集hits、全部命中结果数totalHits和最大得分maxScore放入context.fetchResult().hits对象中

public void execute(SearchContext context) {
    FieldsVisitor fieldsVisitor;
    // 实例化fieldsVisitor 及判断是否要fetch source
    ...
    InternalSearchHit[] hits = new InternalSearchHit[context.docIdsToLoadSize()]; 
    FetchSubPhase.HitContext hitContext = new FetchSubPhase.HitContext();
    for (int index = 0; index < context.docIdsToLoadSize(); index++) {
        int docId = context.docIdsToLoad()[context.docIdsToLoadFrom() + index];
        int readerIndex = ReaderUtil.subIndex(docId, context.searcher().getIndexReader().leaves());
        AtomicReaderContext subReaderContext = context.searcher().getIndexReader().leaves().get(readerIndex);
        int subDocId = docId - subReaderContext.docBase;

        final InternalSearchHit searchHit;
        try {
            int rootDocId = findRootDocumentIfNested(context, subReaderContext, subDocId);
            if (rootDocId != -1) {
                searchHit = createNestedSearchHit(context, docId, subDocId, rootDocId, extractFieldNames, loadAllStored, fieldNames, subReaderContext);
            } else {
                searchHit = createSearchHit(context, fieldsVisitor, docId, subDocId, extractFieldNames, subReaderContext);
            }
        } catch (IOException e) {
            throw ExceptionsHelper.convertToElastic(e);
        }
        hits[index] = searchHit;
        hitContext.reset(searchHit, subReaderContext, subDocId, context.searcher().getIndexReader());
        for (FetchSubPhase fetchSubPhase : fetchSubPhases) {
            if (fetchSubPhase.hitExecutionNeeded(context)) {
                fetchSubPhase.hitExecute(context, hitContext);
            }
        }
    }

    for (FetchSubPhase fetchSubPhase : fetchSubPhases) {
        if (fetchSubPhase.hitsExecutionNeeded(context)) {
            fetchSubPhase.hitsExecute(context, hits);
        }
    }
    context.fetchResult().hits(new InternalSearchHits(hits, context.queryResult().topDocs().totalHits, context.queryResult().topDocs().getMaxScore()));
}

在创建SearchHit时,使用loadStoredFields从lucene中获取已经存储的字段信息,代码如下:

public class FetchPhase implements SearchPhase {
    private InternalSearchHit createSearchHit(SearchContext context, FieldsVisitor fieldsVisitor, int docId, int subDocId, List<String> extractFieldNames, AtomicReaderContext subReaderContext) {
        // fetch subDocId
        loadStoredFields(context, subReaderContext, fieldsVisitor, subDocId);
        ...
    }
    private void loadStoredFields(SearchContext searchContext, AtomicReaderContext readerContext, FieldsVisitor fieldVisitor, int docId) {
        fieldVisitor.reset();
        try {
            readerContext.reader().document(docId, fieldVisitor); 
        } catch (IOException e) {
            throw new FetchPhaseExecutionException(searchContext, "Failed to fetch doc id [" + docId + "]", e);
        }
    }
}
public final class SegmentReader extends AtomicReader implements Accountable {
  @Override
  public void document(int docID, StoredFieldVisitor visitor) throws IOException {
    checkBounds(docID);
    getFieldsReader().visitDocument(docID, visitor);
  }
}

读取的字段信息,是从lucene中解压获取获取的,这里仅能获取store为true的field数据

public final class CompressingStoredFieldsReader extends StoredFieldsReader {
  @Override
  public void visitDocument(int docID, StoredFieldVisitor visitor)
      throws IOException {
    fieldsStream.seek(indexReader.getStartPointer(docID));

    final int docBase = fieldsStream.readVInt();
    final int chunkDocs = fieldsStream.readVInt();
    // 具体代码略...
  }
}

6. 合并结果

在成功获取到fetch数据后,将调用listener.onResult() 进行设置fetch result及调用finishHim()完成数据合并并返回响应数据,具体逻辑如下:
(1)elasticsearch调用SearchPhaseController的merge()方法进行数据合并
(2)如果请求中包含scroll,则需要重写生成scroll id,用于后面继续scroll
(3)调用listener.onResponse()向调用放返回响应数据
(4)如果当前不是scroll请求,则需要释放search context

private void finishHim() {
    threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
        @Override
        public void doRun() throws IOException {
            final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, fetchResults);
            String scrollId = null;

            if (request.scroll() != null) {
                scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);
            }
            listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures()));

            releaseIrrelevantSearchContexts(firstResults, docIdsToLoad);
        }

        @Override
        public void onFailure(Throwable t) {
            ...
        }
    });
}

合并数据的逻辑主要如下:
(1)获取文档的最大得分maxScore和命中总的文档数totalHits
(2)按照fetch开始时已经排好序的sortedDocs顺序,填充shard、score和fetch结果SearchHit
(3)merge其他信息,如suggest、facets、aggregation等等
(4)返回merge后的结果集

public InternalSearchResponse merge(ScoreDoc[] sortedDocs,
              AtomicArray<? extends QuerySearchResultProvider> queryResultsArr,
              AtomicArray<? extends FetchSearchResultProvider> fetchResultsArr) {
       
    long totalHits = 0;
    float maxScore = Float.NEGATIVE_INFINITY;
    for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults) {
        QuerySearchResult result = entry.value.queryResult();
        totalHits += result.topDocs().totalHits;
        if (!Float.isNaN(result.topDocs().getMaxScore())) {
            maxScore = Math.max(maxScore, result.topDocs().getMaxScore());
        }
    }
    if (Float.isInfinite(maxScore)) {
        maxScore = Float.NaN;
    }
    for (AtomicArray.Entry<? extends FetchSearchResultProvider> entry : fetchResults) {
        entry.value.fetchResult().initCounter();
    }
    List<InternalSearchHit> hits = new ArrayList<>();
    if (!fetchResults.isEmpty()) {
        for (ScoreDoc shardDoc : sortedDocs) {
            FetchSearchResultProvider fetchResultProvider = fetchResultsArr.get(shardDoc.shardIndex);
            if (fetchResultProvider == null) {
                continue;
            }
            FetchSearchResult fetchResult = fetchResultProvider.fetchResult();
            int index = fetchResult.counterGetAndIncrement();
            if (index < fetchResult.hits().internalHits().length) {
                InternalSearchHit searchHit = fetchResult.hits().internalHits()[index];
                searchHit.score(shardDoc.score);
                searchHit.shard(fetchResult.shardTarget());
                if (sorted) {
                    FieldDoc fieldDoc = (FieldDoc) shardDoc;
                    searchHit.sortValues(fieldDoc.fields);
                    if (sortScoreIndex != -1) {
                        searchHit.score(((Number) fieldDoc.fields[sortScoreIndex]).floatValue());
                    }
                }
                hits.add(searchHit);
            }
        }
    }
    InternalSearchHits searchHits = new InternalSearchHits(hits.toArray(new InternalSearchHit[hits.size()]), totalHits, maxScore);

    return new InternalSearchResponse(searchHits, facets, aggregations, suggest, timedOut, terminatedEarly);
}

至此,完成了elasticsearch搜索的全过程

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,923评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,154评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,775评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,960评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,976评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,972评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,893评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,709评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,159评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,400评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,552评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,265评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,876评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,528评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,701评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,552评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,451评论 2 352

推荐阅读更多精彩内容