ES5.6 search流程与scroll

ES search大致流程

请求转化,由RestSearchAction转为TransportSearchAction,执行其doExecute()方法

1、如果查询请求的索引含有正则表达式和别名,找出具体的索引

indices = indexNameExpressionResolver.concreteIndices(clusterState, searchRequest.indicesOptions(),
            timeProvider.getAbsoluteStartMillis(), localIndices.indices());

2、找出routing,并根据routing找出shard

Map<String, Set<String>> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(),
      searchRequest.indices());
GroupShardsIterator<ShardIterator> localShardsIterator = clusterService.operationRouting().searchShards(clusterState,
        concreteIndices, routingMap, searchRequest.preference());

3、解析请求中的权重boost

Map<String, Float> concreteIndexBoosts = resolveIndexBoosts(searchRequest, clusterState);

4、根据请求的类型选择不同的query类

switch(searchRequest.searchType()) {
    case DFS_QUERY_THEN_FETCH:
        searchAsyncAction = new SearchDfsQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup,
            aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators,
            timeProvider, clusterStateVersion, task);
        break;
    case QUERY_AND_FETCH:
    case QUERY_THEN_FETCH:
        searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup,
            aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators,
            timeProvider, clusterStateVersion, task);
        break;
    default:
        throw new IllegalStateException("Unknown search type: [" + searchRequest.searchType() + "]");
}

插播一条DFS_QUERY_THEN_FETCH与QUERY_THEN_FETCH的区别。

  • QUERY_THEN_FETCH
    请求执行有两个阶段。第一个阶段就是查询所有相关的shards。所有的shards执行请求并根据form和size返回一个排好序的结果。协调节点获取到所有的信息后merge并resort,然后根据form和size取出结果。第二个阶段则是根据第一阶段的结果在相应的shard上取出数据。
  • DFS_QUERY_THEN_FETCH
    与QUERY_THEN_FETCH几乎一样。只有在第一阶段为了更精确的打分,计算的是分布式的term frequencies。

5、SearchQueryThenFetchAsyncAction中则是执行标准的查询流程:

  • query
  • fetch
  • merge

SearchQueryThenFetchAsyncAction标准查询流程

1、Query

首先遍历shard,执行shard查询请求。

for (final SearchShardIterator shardIt : shardsIts) {
    shardIndex++;
    final ShardRouting shard = shardIt.nextOrNull();
    if (shard != null) {
        performPhaseOnShard(shardIndex, shardIt, shard);
    } else {
        // really, no shards active in this group
        onShardFailure(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
    }
}

每个shard通过SearchTransportService发送query请求:

getSearchTransport().sendExecuteQuery(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()),
        buildShardSearchRequest(shardIt), getTask(), listener);

本地则是执行TransportService的sendLocalRequest方法.最终调用的是

handler.messageReceived(request, channel);

此处的channel是DirectResponseChannel,而此处的handler则是TransportSearchAction 初始化的时候写入SearchTransportService中的。

transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH,
  new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
    @Override
    public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
        SearchPhaseResult result = searchService.executeQueryPhase(request, (SearchTask)task);
        channel.sendResponse(result);
    }
});

因此,最终执行的是searchService的executeQueryPhase方法。

真正的query本地查询:

1)生成一个searchContext,然后将request写入context:

final DefaultSearchContext searchContext = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget,
        engineSearcher, indexService, indexShard, bigArrays, threadPool.estimatedTimeInMillisCounter(), timeout, fetchPhase);

parseSource(context, request.source());

2)queryPhase预处理。比如说如果请求没有query,默认为match_all,如果没有boost,采用默认的值等等

queryPhase.preProcess(context);

3)执行查询

loadOrExecuteQueryPhase(request, context);

loadOrExecuteQueryPhase中则是调用queryPhase执行查询:

queryPhase.execute(context);

在queryPhase中执行真正的query查询:
1)重写query,比如说 讲一个prefixQuery转化为多个包含termQuery的BoolQuery
assert query == searcher.rewrite(query); // already rewritten

2)将各个子查询求用Collector包裹,Collector传给lucene,进行真正的lucene查询

if (terminateAfterSet) {
    final Collector child = collector;
    // throws Lucene.EarlyTerminationException when given count is reached
    collector = Lucene.wrapCountBasedEarlyTerminatingCollector(collector, searchContext.terminateAfter());
    if (doProfile) {
        collector = new InternalProfileCollector(collector, CollectorResult.REASON_SEARCH_TERMINATE_AFTER_COUNT,
                Collections.singletonList((InternalProfileCollector) child));
    }
}

if (searchContext.parsedPostFilter() != null) {
    final Collector child = collector;
    // this will only get applied to the actual search collector and not
    // to any scoped collectors, also, it will only be applied to the main collector
    // since that is where the filter should only work
    final Weight filterWeight = searcher.createNormalizedWeight(searchContext.parsedPostFilter().query(), false);
    collector = new FilteredCollector(collector, filterWeight);
    if (doProfile) {
        collector = new InternalProfileCollector(collector, CollectorResult.REASON_SEARCH_POST_FILTER,
                Collections.singletonList((InternalProfileCollector) child));
    }
}
// plug in additional collectors, like aggregations
final List<Collector> subCollectors = new ArrayList<>();
subCollectors.add(collector);
subCollectors.addAll(searchContext.queryCollectors().values());
collector = MultiCollector.wrap(subCollectors);

//最后包裹一个Cancellable
if (collector != null) {
    final Collector child = collector;
    collector = new CancellableCollector(searchContext.getTask()::isCancelled, searchContext.lowLevelCancellation(), collector);
    if (doProfile) {
        collector = new InternalProfileCollector(collector, CollectorResult.REASON_SEARCH_CANCELLED,
            Collections.singletonList((InternalProfileCollector) child));
    }
}

3)执行lucene的查询

searcher.search(query, collector);

4)将查询结果以TopDocs的方式返回

queryResult.topDocs(topDocsCallable.call(), sortValueFormats);

topDocsCallable是根据不同的查询封装的。举个例子:

if (query.getClass() == MatchAllDocsQuery.class) {
    collector = null;
    topDocsCallable = new Callable<TopDocs>() {
        @Override
        public TopDocs call() throws Exception {
            int count = searcher.getIndexReader().numDocs();
            return new TopDocs(count, Lucene.EMPTY_SCORE_DOCS, 0);
        }
    };
} else if (query.getClass() == TermQuery.class && searcher.getIndexReader().hasDeletions() == false) {
    final Term term = ((TermQuery) query).getTerm();
    collector = null;
    topDocsCallable = new Callable<TopDocs>() {
        @Override
        public TopDocs call() throws Exception {
            int count = 0;
            for (LeafReaderContext context : searcher.getIndexReader().leaves()) {
                count += context.reader().docFreq(term);
            }
            return new TopDocs(count, Lucene.EMPTY_SCORE_DOCS, 0);
        }
    };
}

2、fetch阶段(+merge阶段):

执行的是FetchSearchPhase的dorun方法。
1)首先reduce

final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = resultConsumer.reduce();

resultConsumer 是在构造SearchQueryThenFetchAsyncAction的时候创建的,执行的是SearchPhaseController的reducedQueryPhase方法。

searchPhaseController.newSearchPhaseResults(request, shardsIts.size())

2)查找reduce出的文档id,按照shardId存放

final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(numShards, reducedQueryPhase.scoreDocs);

如果是scroll查询,将lastEmittedDocPerShard存入ShardFetchSearchRequest:

final ScoreDoc[] lastEmittedDocPerShard = isScrollSearch ?
               searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, numShards)
               : null;
ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult().getRequestId(), i, entry,
                            lastEmittedDocPerShard, searchShardTarget.getOriginalIndices());

3)针对于每个shard,创建fetch请求,并执行

ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult().getRequestId(), i, entry,
    lastEmittedDocPerShard, searchShardTarget.getOriginalIndices());
executeFetch(i, searchShardTarget, counter, fetchSearchRequest, queryResult.queryResult(),
    connection);

本地节点收到请求后执行的是SearchService的executeFetchPhase方法。这主要涉及些lucene的东西,不在详细解析。
4)当从所有的shard完获取完结果之后执行:

final Runnable finishPhase = ()
-> moveToNextPhase(searchPhaseController, scrollId, reducedQueryPhase, queryAndFetchOptimization ?
queryResults : fetchResults);

moveToNextPhase中开始执行merge:

final InternalSearchResponse internalResponse = searchPhaseController.merge(context.getRequest().scroll() != null,
reducedQueryPhase, fetchResultsArr.asList(), fetchResultsArr::get);

merge结束后,通过nextPhaseFactory将结果发送出去:

    (response, scrollId) -> new ExpandSearchPhase(context, response,
                (finalResponse) -> sendResponsePhase(finalResponse, scrollId, context))

  private static SearchPhase sendResponsePhase(InternalSearchResponse response, String scrollId, SearchPhaseContext context) {
    return new SearchPhase("response") {
        @Override
        public void run() throws IOException {
            context.onResponse(context.buildSearchResponse(response, scrollId));
        }
    };
}

context则是SearchQueryThenFetchAsyncAction的父类AbstractSearchAsyncAction,调用buildSearchResponse方法构造response,并回调listener。

public final void onResponse(SearchResponse response) {
    listener.onResponse(response);
}

scroll查询

scroll查询原理

在第一次查询时,记录上一次查询的位置,在接下来的查询中获取到上次查询的位置,接着查询。

比如说将查询order by time offset 0 limit 100,改写成order by time where time>0 limit 100,记录最后一个$time_max,接下来的查询order by time offset 100 limit 100,改写成order by time where time>$time_max limit 100。如此往复,可以看出scroll是一个常量查询延迟和开销。
这个从源码中也可以看出:

if (returnsDocsInOrder(query, searchContext.sort())) {
    if (scrollContext.totalHits == -1) {
        // first round
        assert scrollContext.lastEmittedDoc == null;
        // there is not much that we can optimize here since we want to collect all
        // documents in order to get the total number of hits
    } else {
        // now this gets interesting: since we sort in index-order, we can directly
        // skip to the desired doc and stop collecting after ${size} matches
        if (scrollContext.lastEmittedDoc != null) {
        if (scrollContext.lastEmittedDoc != null) {
            BooleanQuery bq = new BooleanQuery.Builder()
                .add(query, BooleanClause.Occur.MUST)
                .add(new MinDocQuery(after.doc + 1), BooleanClause.Occur.FILTER)
                .build();
            query = bq;
        }
        searchContext.terminateAfter(numDocs);
    }
}

当scrollContext.lastEmittedDoc不为空也就是查询scroll的下一页时,将查询变为一个带偏移量的bool查询。
在本地查出数据后,更新scrollContext的值:

if (scrollContext != null) {
    if (scrollContext.totalHits == -1) {
        // first round
        scrollContext.totalHits = topDocs.totalHits;
        scrollContext.maxScore = topDocs.getMaxScore();
    } else {
        // subsequent round: the total number of hits and
        // the maximum score were computed on the first round
        topDocs.totalHits = scrollContext.totalHits;
        topDocs.setMaxScore(scrollContext.maxScore);
    }
    if (searchContext.request().numberOfShards() == 1) {
        // if we fetch the document in the same roundtrip, we already know the last emitted doc
        if (topDocs.scoreDocs.length > 0) {
            // set the last emitted doc
            scrollContext.lastEmittedDoc = topDocs.scoreDocs[topDocs.scoreDocs.length - 1];
        }
    }
}

我们看到上述只更新了scrollContext的totalHits和maxScore,而lastEmittedDoc则是在fetch阶段中更新的:
先将lastEmittedDocPerShard存入ShardFetchSearchRequest:

final ScoreDoc[] lastEmittedDocPerShard = isScrollSearch ?
               searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, numShards)
               : null;
ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult().getRequestId(), i, entry,
                            lastEmittedDocPerShard, searchShardTarget.getOriginalIndices());

然后每个节点在接收到fetch信息后,执行SearchService的executeFetchPhase方法:

 if (request.lastEmittedDoc() != null) {
                context.scrollContext().lastEmittedDoc = request.lastEmittedDoc();
 }

当使用scroll请求时,scrollID是不变的,从SearchScrollAsyncAction代码中可以看出:

protected final void sendResponse(SearchPhaseController.ReducedQueryPhase queryPhase,
                                  final AtomicArray<? extends SearchPhaseResult> fetchResults) {
    try {
        final InternalSearchResponse internalResponse = searchPhaseController.merge(true, queryPhase, fetchResults.asList(),
            fetchResults::get);
        // the scroll ID never changes we always return the same ID. This ID contains all the shards and their context ids
        // such that we can talk to them abgain in the next roundtrip.
        String scrollId = null;
        if (request.scroll() != null) {
            scrollId = request.scrollId();
        }
        listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.getContext().length, successfulOps.get(),
            buildTookInMillis(), buildShardFailures()));
    } catch (Exception e) {
        listener.onFailure(new ReduceSearchPhaseException("fetch", "inner finish failed", e, buildShardFailures()));
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,172评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,346评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,788评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,299评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,409评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,467评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,476评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,262评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,699评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,994评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,167评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,827评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,499评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,149评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,387评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,028评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,055评论 2 352

推荐阅读更多精彩内容

  • 前两天突然被业务部的同事问了一句:“我现在要做搜索结果全量导,该用哪个接口,性能要好的?”之前虽然是知道这三种方法...
    华安火车迷阅读 24,819评论 27 35
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,644评论 18 139
  • 走在风里,安静而自由,任袖口兜满向往,衣袂处飘着浓浓的思念,混着滚烫的泪,熬一锅难以入味的醒青春汤。 推开...
    隔壁人未眠阅读 179评论 0 0
  • 华为军团与蒙古帝国 华为越来越像一家大公司了, 中央集权与中层不当责:其实很多时候表现的是高层不当责,自我批判少了...
    高原_微风阅读 630评论 0 0
  • 2017.08.19. 星期六 晴天 今天早上,我们八点就要出发去东山,我非常高兴。我们昨天就收拾好东西了,今...
    王紫杨阅读 155评论 4 10