es源码笔记-Routing

ES 建索引时默认是根据文档标识符 _id 将文档均分至多个分片。当搜索数据时,默认查询所有分片结果然后汇总,而并不必须知道数据到底存在哪个分片上。

路由算法就是根据routing和文档id计算目标shardid的过程。
一般情况下,路由计算方式为下面的公式:

shard_num = hash(_routing) % num_primary_shards

默认情况下,_routing值就是文档id。
ES使用随机id和Hash算法来确保文档均匀地分配给分片。当使用自定义id或routing时, id 或 routing 值可能不够随机,造成数据倾斜,部分分片过大。在这种情况下,可以使用index.routing_partition_size 配置来减少倾斜的风险。routing_partition_size越大,数据的分布越均匀。
在设置了index.routing_partition_size的情况下,计算公式为:

shard_num = (hash(_routing) + hash(_id) % routing_partition_size) % num_primary_shards

也就是说,对于同一个routing值,hash(_routing)的结果固定的,hash(_id) % routing_partition_size的结果有 routing_partition_size 个可能的值,两个组合在一起,对于同一个routing值的多个doc,也就能计算出 routing_partition_size 可能的shard了,即一个shard集合。
index.routing_partition_size取值应具有大于1且小于index.number_of_shards的值

计算过程的实现如下

    private static int calculateScaledShardId(IndexMetaData indexMetaData, String effectiveRouting, int partitionOffset) {
        final int hash = Murmur3HashFunction.hash(effectiveRouting) + partitionOffset;

        // we don't use IMD#getNumberOfShards since the index might have been shrunk such that we need to use the size
        // of original index to hash document
        return Math.floorMod(hash, indexMetaData.getRoutingNumShards()) / indexMetaData.getRoutingFactor();
    }

Search时如何根据routing找到指定的分片?

例子

GET /{index}/{type}/_search?routing=beijing

通过发送search请求查询数据。指定了routing是beijing

1、解析流程

当es接收到上面的请求时,交给org.elasticsearch.rest.action.search.RestSearchAction处理,repareRequest方法中将请求体解析为SearchRequest数据结构

public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
        SearchRequest searchRequest = new SearchRequest();      
        IntConsumer setSize = size -> searchRequest.source().size(size);
        request.withContentOrSourceParamParserOrNull(parser ->
            parseSearchRequest(searchRequest, request, parser, setSize));

        return channel -> client.search(searchRequest, new RestStatusToXContentListener<>(channel));
    }

    public static void parseSearchRequest(SearchRequest searchRequest, RestRequest request,
                                          XContentParser requestContentParser,
                                          IntConsumer setSize) throws IOException {      
        String searchType = request.param("search_type");
        parseSearchSource(searchRequest.source(), request, setSize);        searchRequest.requestCache(request.paramAsBoolean("request_cache", null));
        String scroll = request.param("scroll");
        searchRequest.routing(request.param("routing")); //接收到routing参数,并封装到searchRequest中
        searchRequest.preference(request.param("preference"));
        searchRequest.indicesOptions(IndicesOptions.fromRequest(request, searchRequest.indicesOptions()));        searchRequest.setCcsMinimizeRoundtrips(request.paramAsBoolean("ccs_minimize_roundtrips", true));
        checkRestTotalHits(request, searchRequest);
    }
构造目的shard列表

prepareRequest方法构造请求后通过transport模块发送给org.elasticsearch.action.search.TransportSearchAction处理

    private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest,
                               OriginalIndices localIndices, String[] concreteIndices, Map<String, Set<String>> routingMap,
                               Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
                               List<SearchShardIterator> remoteShardIterators, BiFunction<String, String, DiscoveryNode> remoteConnections,
                               ClusterState clusterState, ActionListener<SearchResponse> listener, SearchResponse.Clusters clusters) {

        Map<String, Long> nodeSearchCounts = searchTransportService.getPendingSearchRequests();
        GroupShardsIterator<ShardIterator> localShardsIterator = clusterService.operationRouting().searchShards(clusterState,
                concreteIndices, routingMap, searchRequest.preference(), searchService.getResponseCollectorService(), nodeSearchCounts);
        GroupShardsIterator<SearchShardIterator> shardIterators = mergeShardsIterators(localShardsIterator, localIndices,
            searchRequest.getLocalClusterAlias(), remoteShardIterators);

       。。。。
    }

将请求涉及的本集群shard列表和远程集群的shard列表(远程集群用于跨集群访问)合并

其中routing查找指定分片的流程就在org.elasticsearch.cluster.routing.OperationRouting.searchShards(ClusterState, String[], Map<String, Set<String>>, String, ResponseCollectorService, Map<String, Long>)方法中

    public GroupShardsIterator<ShardIterator> searchShards(ClusterState clusterState,
                                                           String[] concreteIndices,
                                                           @Nullable Map<String, Set<String>> routing,
                                                           @Nullable String preference,
                                                           @Nullable ResponseCollectorService collectorService,
                                                           @Nullable Map<String, Long> nodeCounts) {
        final Set<IndexShardRoutingTable> shards = computeTargetedShards(clusterState, concreteIndices, routing);
····
    }

    private Set<IndexShardRoutingTable> computeTargetedShards(ClusterState clusterState, String[] concreteIndices,
                                                              @Nullable Map<String, Set<String>> routing) {
   
        for (String index : concreteIndices) {
           ····
            if (effectiveRouting != null) {
                for (String r : effectiveRouting) {
                 
                        set.add(RoutingTable.shardRoutingTable(indexRouting, calculateScaledShardId(indexMetaData, r, partitionOffset)));
                   
                }
        
        }
        return set;
    }

最终调用calculateScaledShardId方法计算出指定的分片

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容