Elasticsearch原理解析--scroll原理

scroll是ES用来解决全量遍历数据的功能。具体使用文档见:

https://www.elastic.co/guide/en/elasticsearch/reference/current/paginate-search-results.html#scroll-search-results

本篇文章将为大家分享scroll的原理。通过本篇文章,大家将会明白,为什么scroll是怎么做到全量遍历功能,为什么不推荐使用scroll用作深翻页。

先从基本功能,看看scroll是如何使用的。

scroll是通过search接口触发的,search接口默认最多只能返回size=10000条记录,通过scroll,可以继续往后面遍历数据。这个是如何做大的呢?

首先search接口带上scroll参数,本次search就会返回一个scrollId,比如如下请求:

POST /my-index-000001/_search?scroll=1m
{
  "query": {
    "match": {
      "message": "foo"
    }
  }
}

返回结果如下:

{
  "_scroll_id" : "FGluY2x1ZGVfY29udGV4dF91dWlkDXF1ZXJ5QW5kRmV0Y2gBFjY0MlkyRFFLVFdDc3RJd0UwT0VKcUEAAAAAAAAblxZfWjZMamlqb1JXeXJlcWtlN0xfUHNR",
  "took" : 11,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 10372087,
      "relation" : "eq"
    },
    ......
  }
}

可以看到返回多了一个_scroll_id,然后使用scroll接口,传递scroll_id,就能不断遍历之前search DSL命中的全部数据。

POST /_search/scroll                                                               
{
  "scroll" : "1m",                                                                
  "scroll_id" : "FGluY2x1ZGVfY29udGV4dF91dWlkDXF1ZXJ5QW5kRmV0Y2gBFjY0MlkyRFFLVFdDc3RJd0UwT0VKcUEAAAAAAAAblxZfWjZMamlqb1JXeXJlcWtlN0xfUHNR" 
}

这里scroll设置为1m,说明scrollId只保存1分钟,一分钟后,scrollid就消失了,再使用就会报如下错误:

{
  "error" : {
    "root_cause" : [
      {
        "type" : "search_context_missing_exception",
        "reason" : "No search context found for id [7063]"
      }
    ],
    "type" : "search_phase_execution_exception",
    "reason" : "all shards failed",
    "phase" : "query",
    "grouped" : true,
    "failed_shards" : [
      {
        "shard" : -1,
        "index" : null,
        "reason" : {
          "type" : "search_context_missing_exception",
          "reason" : "No search context found for id [7063]"
        }
      }
    ],
    "caused_by" : {
      "type" : "search_context_missing_exception",
      "reason" : "No search context found for id [7063]"
    }
  },
  "status" : 404
}

接下来看看scroll是如何实现的。

scroll是以shard为单位保存scrollId的,

在search query流程中,如果search请求传递了scroll参数,在创建ReaderContext时,就会创建的是LegacyReaderContext,LegacyReaderContext中会生成一个ScrollContext。我们来看下ScrollContext的内容:

public final class ScrollContext {
    public TotalHits totalHits = null;
    public float maxScore = Float.NaN;
    public ScoreDoc lastEmittedDoc;
    public Scroll scroll;
}

其中有个关键变量lastEmittedDoc,这个记录了上次scroll遍历到的docId位置。

以一次遍历10000天记录,遍历索引全部数据为例,之前遍历到了10000条doc,那么lastEmittedDoc就是10000,再执行一次scroll,就能继续往后遍历10000条,lastEmittedDoc就设置为20000。所以scroll就能做到不断往后遍历,直到遍历全量数据。

这里lastEmittedDoc还需要注意,在多shard时,实际lastEmittedDoc的值,要以shard实际fetch到的数据为准,所以lastEmittedDoc是在SearchService.executeFetchPhase方法中设置:

public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, ActionListener<FetchSearchResult> listener) {
        final ReaderContext readerContext = findReaderContext(request.contextId(), request);
        final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.getShardSearchRequest());
        final Releasable markAsUsed = readerContext.markAsUsed(getKeepAlive(shardSearchRequest));
        runAsync(getExecutor(readerContext.indexShard()), () -> {
            try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false)) {
                if (request.lastEmittedDoc() != null) {
                    searchContext.scrollContext().lastEmittedDoc = request.lastEmittedDoc();
                }
                searchContext.assignRescoreDocIds(readerContext.getRescoreDocIds(request.getRescoreDocIds()));
                searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(request.getAggregatedDfs()));
                searchContext.docIdsToLoad(request.docIds());
......
                return searchContext.fetchResult();
            } catch (Exception e) {
                assert TransportActions.isShardNotAvailableException(e) == false : new AssertionError(e);
                // we handle the failure in the failure listener below
                throw e;
            }
        }, wrapFailureListener(listener, readerContext, markAsUsed));
    }

然后scroll的是保存在SearchService的activeReaders对象中,activeReaders是一个Map对象,有个Reaper会定时检查过期的ReaderContext,将它从activeReaders清除:

//默认一分钟检查一次
public static final Setting<TimeValue> KEEPALIVE_INTERVAL_SETTING = Setting.positiveTimeSetting(
        "search.keep_alive_interval",
        timeValueMinutes(1),
        Property.NodeScope
    );

// 启动清理定时任务
TimeValue keepAliveInterval = KEEPALIVE_INTERVAL_SETTING.get(settings);
this.keepAliveReaper = threadPool.scheduleWithFixedDelay(new Reaper(), keepAliveInterval, Names.SAME);

    //清理任务
    class Reaper implements Runnable {
        @Override
        public void run() {
            for (ReaderContext context : activeReaders.values()) {
                if (context.isExpired()) {
                    logger.debug("freeing search context [{}]", context.id());
                    freeReaderContext(context.id());
                }
            }
        }
    }

所以手动删除scrollId的接口,实现方式也很简单,就是清理掉ReaderContext对应的scrollId。

接下来再看下scrollId的编码。scrollId在ReaderContext中是以Long类型的id保存的。

然后在协调节点,会将全部shard的scrollId进行编码成一个字符串返回给客户端。

在TransportSearchHelper内中,有buildScrollId和parseScrollId两个方法用来进行编解码:

static String buildScrollId(AtomicArray<? extends SearchPhaseResult> searchPhaseResults) {
        try {
            BytesStreamOutput out = new BytesStreamOutput();
            out.writeString(INCLUDE_CONTEXT_UUID);
            out.writeString(searchPhaseResults.length() == 1 ? ParsedScrollId.QUERY_AND_FETCH_TYPE : ParsedScrollId.QUERY_THEN_FETCH_TYPE);
            out.writeCollection(searchPhaseResults.asList(), (o, searchPhaseResult) -> {
                o.writeString(searchPhaseResult.getContextId().getSessionId());
                o.writeLong(searchPhaseResult.getContextId().getId());
                SearchShardTarget searchShardTarget = searchPhaseResult.getSearchShardTarget();
                if (searchShardTarget.getClusterAlias() != null) {
                    o.writeString(
                        RemoteClusterAware.buildRemoteIndexName(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId())
                    );
                } else {
                    o.writeString(searchShardTarget.getNodeId());
                }
            });
            return Base64.getUrlEncoder().encodeToString(out.copyBytes().array());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static ParsedScrollId parseScrollId(String scrollId) {
        try {
            byte[] bytes = Base64.getUrlDecoder().decode(scrollId);
            ByteArrayStreamInput in = new ByteArrayStreamInput(bytes);
            final boolean includeContextUUID;
            final String type;
            final String firstChunk = in.readString();
            if (INCLUDE_CONTEXT_UUID.equals(firstChunk)) {
                includeContextUUID = true;
                type = in.readString();
            } else {
                includeContextUUID = false;
                type = firstChunk;
            }
            SearchContextIdForNode[] context = new SearchContextIdForNode[in.readVInt()];
            for (int i = 0; i < context.length; ++i) {
                final String contextUUID = includeContextUUID ? in.readString() : "";
                long id = in.readLong();
                String target = in.readString();
                String clusterAlias;
                final int index = target.indexOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR);
                if (index == -1) {
                    clusterAlias = null;
                } else {
                    clusterAlias = target.substring(0, index);
                    target = target.substring(index + 1);
                }
                context[i] = new SearchContextIdForNode(clusterAlias, target, new ShardSearchContextId(contextUUID, id));
            }
            if (in.getPosition() != bytes.length) {
                throw new IllegalArgumentException("Not all bytes were read");
            }
            return new ParsedScrollId(scrollId, type, context);
        } catch (Exception e) {
            throw new IllegalArgumentException("Cannot parse scroll id", e);
        }
    }

Scroll在5.x版本还增加另一个Sliced scroll功能。这是可以支持对一个DSL进行并发scroll,提高拉取数据的性能。

Sliced scroll的原理是在scroll基础上增加了分片功能。建议分片是按照shard数量的倍数来。在一个shard中的sliced还能进一步切分。

实现原理如下:

在DSL传递slice参数后,生成的Query,会wrapper上slice的query,具体代码在SliceBuilder.createSliceQuery中:

    private Query createSliceQuery(int id, int max, SearchExecutionContext context, boolean isScroll) {
        if (field == null) {
            return isScroll ? new TermsSliceQuery(IdFieldMapper.NAME, id, max) : new DocIdSliceQuery(id, max);
        } else if (IdFieldMapper.NAME.equals(field)) {
            if (isScroll == false) {
                throw new IllegalArgumentException("cannot slice on [_id] when using [point-in-time]");
            }
            return new TermsSliceQuery(IdFieldMapper.NAME, id, max);
        } else {
            MappedFieldType type = context.getFieldType(field);
            if (type == null) {
                throw new IllegalArgumentException("field " + field + " not found");
            }
            if (type.hasDocValues() == false) {
                throw new IllegalArgumentException("cannot load numeric doc values on " + field);
            } else {
                IndexFieldData<?> ifm = context.getForField(type, MappedFieldType.FielddataOperation.SEARCH);
                if (ifm instanceof IndexNumericFieldData == false) {
                    throw new IllegalArgumentException("cannot load numeric doc values on " + field);
                }
                return new DocValuesSliceQuery(field, id, max);
            }
        }
    }

wrapper的slice query根据field的类型包括TermsSliceQuery、DocIdSliceQuery、DocValuesSliceQuery。

他们的功能都是根据传递的id、max值,确定在命中的docid集合中只返回slice id对应的docid。这样原DSL就只能返回slice query对应的docId列表。

以上就是scroll的原理实现,所以可以看到scroll只能往后面遍历,而且设置了有效期,所以如果频繁设置scroll用来进行深翻页,会导致生成过多的scrollId,而且scroll也不能跳页,所以从功能定位上scroll是用来遍历全量数据使用的。如果要进行翻页,ES推荐的使用Search After功能。

使用scroll还需要注意一个问题,如果访问出现了超时,由于不确定本次scroll是否已经进行,所以可能导致获取的数据缺失,所以要进行全部重试,这里也可以用一些技巧,通过修改DSL,定位到最新获取的数据,再重新使用scroll查询也是可以的。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,110评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,443评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,474评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,881评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,902评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,698评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,418评论 3 419
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,332评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,796评论 1 316
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,968评论 3 337
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,110评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,792评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,455评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,003评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,130评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,348评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,047评论 2 355

推荐阅读更多精彩内容