Elasticsearch源码分析-更新索引分析

0.前言

如果想更新索引的某条文档数据,可以通过如下几种方式:
(1)构造document的完整field和数据,然后使用Elasticsearch的Index API重新创建索引

curl -XPUT 'http://localhost:9200/twitter/tweet/1' -d '{
    "user" : "kimchy",
    "post_date" : "2009-11-15T14:12:12",
    "message" : "trying out Elasticsearch"
}'

(2)构造document的部分field和数据,然后使用Elasticsearch 的Update API 更新索引

curl -XPOST 'localhost:9200/twitter/tweet/1/_update' -d '{
    "doc" : {
        "user" : "new_name"
    },
    "doc_as_upsert" : true
}' 

本篇文章讨论的是第二种方式,该请求对应的url pattern为 /{index}/{type}/{id}/_update,且仅支持Post 请求

1 请求Handler

update url对应的handler为RestUpdateAction,支持的参数如下:
retry_on_conflict: 控制在最终抛出异常之前重试update的次数。
routing: routing用于将更新请求路由到正确的分片上,并在更新的文档不存在时为upsert请求设置routing。不能用于更新已经存在文档的routing。
parent: parent用于将更新请求路由到正确的分片上,并在更新的文档不存在时为upsert请求设置parent。不能用于更新已经存在文档的parent。
timeout: 等待shard变为可用的超时时间。
consistency: 索引/删除操作的写入一致性。
refresh: 在操作发生后立即刷新相关的主分片和副本分片(而不是整个索引),以便更新的文档立即显示在搜索结果中。
fields: 返回更新文档中的相关字段。指定_source返回完整更新的source。
version & version_type: update API在内部使用Elasticsearch的版本控制支持,以确保在更新期间文档不会更改。

解析完请求参数后,会调用client.update()更新索引,传入的action参数值为UpdateAction.INSTANCE,它绑定的action为TransportUpdateAction类。

public class ActionModule extends AbstractModule {
    @Override
    protected void configure() {
        registerAction(UpdateAction.INSTANCE,TransportUpdateAction.class);
    }
}

因此,在执行TransportAction.execute() 时,会执行TransportUpdateAction.doExecute()方法,即为处理请求的Action入口

TransportUpdateAction 类图

2 处理 Action

在更新索引前,使用shouldAutoCreate()方法判断是否需要创建索引

public boolean shouldAutoCreate(String index, ClusterState state) {
        // action.auto_create_index 是 false
        // 不再继续检查, 不创建索引
        if (!needToCheck) {
            return false;
        }
        // 如果索引或者别名中已经包含了index
        if (state.metaData().hasConcreteIndex(index)) {
            return false;
        }
        // action.auto_create_index 是 false
        if (globallyDisabled) {
            return false;
        }
        // matches not set, default value of "true"
        // action.auto_create_index 是 null 或者是 true 或者是 false
        if (matches == null) {
            return true;
        }
        // 正则条件判断index是否满足action.auto_create_index
        for (int i = 0; i < matches.length; i++) {
            char c = matches[i].charAt(0);
            if (c == '-') {
                if (Regex.simpleMatch(matches2[i], index)) {
                    return false;
                }
            } else if (c == '+') {
                if (Regex.simpleMatch(matches2[i], index)) {
                    return true;
                }
            } else {
                if (Regex.simpleMatch(matches[i], index)) {
                    return true;
                }
            }
        }
        return false;
    }

①如果需要创建索引,则,调用TransportCreateIndexAction.execute()方法创建索引,然后执行innerExecute()方法更新文档
②否则直接调用innerExecute()更新文档

在更新文档前,需要获取文档索引所在的主分片信息,然后请请求发送到对应分片的节点上,执行shardOperation()

class AsyncSingleAction {
        protected boolean doStart() throws ElasticsearchException {
            // 集群的节点信息
            nodes = observer.observedState().nodes();
            try {
                // 获取要操作的primary shard
                shardIt = shards(observer.observedState(), internalRequest);
            } catch (Throwable e) {
               
            }

            // this transport only make sense with an iterator that returns a single shard routing (like primary)
            // 由于一个存在的文档, 必然最多只需要一个主分片
            assert shardIt.size() == 1;

            internalRequest.request().shardId = shardIt.shardId().id();

            // 如果shard 所在的节点为当前节点
            if (shard.currentNodeId().equals(nodes.localNodeId())) {
                internalRequest.request().beforeLocalFork();
                try {
                    // 使用线程池方式执行操作
                    threadPool.executor(executor).execute(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                // 抽象方法, 需要子类实现
                                shardOperation(internalRequest, listener);
                            } catch (Throwable e) {
                                // ...
                            }
                        }
                    });
                } catch (Throwable e) {
                    // ...
                }
            } else {
                // 如果是远程节点, 需要将请求发送到对应节点
                DiscoveryNode node = nodes.get(shard.currentNodeId());
                transportService.sendRequest(node, actionName, internalRequest.request(), transportOptions(), new BaseTransportResponseHandler<Response>() {
                    @Override
                    public void handleResponse(Response response) {
                        listener.onResponse(response);
                    }
        }
}

3 执行请求

执行索引operation的过程,主要是
①先构造索引请求,即先获取已经存在的文档信息
②merge已有文档和待更新的数据,或者执行请求中的脚本,获取完整的doc信息
③执行TransportIndexAction.execute() 创建索引重新创建文档,或者TransportDeleteAction.execute() 删除文档

/**
     * shard 操作逻辑
     * @param request   InternalRequest
     * @param listener  ActionListener
     * @param retryCount    retryCount 重试次数
     * @throws ElasticsearchException   Elasticsearch 异常
     */
    protected void shardOperation(final InternalRequest request, final ActionListener<UpdateResponse> listener, final int retryCount) throws ElasticsearchException {
        // 获取index 对应的service
        IndexService indexService = indicesService.indexServiceSafe(request.concreteIndex());
        // 根据shard id 获取对应的IndexShard
        IndexShard indexShard = indexService.shardSafe(request.request().shardId());
        // 对更新请求进行转换, 获取最终要更新的索引信息
        final UpdateHelper.Result result = updateHelper.prepare(request.request(), indexShard);
        switch (result.operation()) {
            case UPSERT:
                // 构造索引请求, 将result.action()中的type id routing 和source 拷贝到index request 对象中
                IndexRequest upsertRequest = new IndexRequest((IndexRequest)result.action(), request.request());
                // we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
                final BytesReference upsertSourceBytes = upsertRequest.source();
                // 执行TransportIndexAction.execute(),  创建索引文档
                indexAction.execute(upsertRequest, new ActionListener<IndexResponse>() {
                    @Override
                    public void onResponse(IndexResponse response) {
                        // 处理请求
                });
                break;
            case INDEX:
                IndexRequest indexRequest = new IndexRequest((IndexRequest)result.action(), request.request());
                // we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
                final BytesReference indexSourceBytes = indexRequest.source();
                indexAction.execute(indexRequest, new ActionListener<IndexResponse>() {
                    // 处理请求
                });
                break;
            case DELETE:
                DeleteRequest deleteRequest = new DeleteRequest((DeleteRequest)result.action(), request.request());
                deleteAction.execute(deleteRequest, new ActionListener<DeleteResponse>() {
                    // 执行请求
                });
                break;
            case NONE:
                // ...
                break;
            default:
                throw new ElasticsearchIllegalStateException("Illegal operation " + result.operation());
        }
    }

在上面代码中,final UpdateHelper.Result result = updateHelper.prepare(request.request(), indexShard); 主要用来对更新请求进行转换, 获取最终要更新的索引信息
合并待更新数据和已经存在的数据的策略,主要是覆盖和补充更新的方式:

/**
    将提供的数据变更(changes)更新到source中
      1 已经存在source的字段将会被提供的变更数据(chanes)覆盖掉
      2 不存source在的字段会被提供的变更数据(changes)补充更新
**/
public static boolean update(Map<String, Object> source, Map<String, Object> changes, boolean checkUpdatesAreUnequal) {
        boolean modified = false;
        for (Map.Entry<String, Object> changesEntry : changes.entrySet()) {
            if (!source.containsKey(changesEntry.getKey())) {
                // safe to copy, change does not exist in source
                source.put(changesEntry.getKey(), changesEntry.getValue());
                modified = true;
                continue;
            }
            Object old = source.get(changesEntry.getKey());
            if (old instanceof Map && changesEntry.getValue() instanceof Map) {
                // recursive merge maps
                modified |= update((Map<String, Object>) source.get(changesEntry.getKey()),
                        (Map<String, Object>) changesEntry.getValue(), checkUpdatesAreUnequal && !modified);
                continue;
            }
            // update the field
            source.put(changesEntry.getKey(), changesEntry.getValue());
            if (modified) {
                continue;
            }
            if (!checkUpdatesAreUnequal) {
                modified = true;
                continue;
            }
            modified = !Objects.equal(old, changesEntry.getValue());
        }
        return modified;
    }

4 处理异常

在更新lucene索引时,会先检查获取的文档版本和索引中当前文档版本是否冲突,
版本类型分为如下4种:
INTERNAL
EXTERNAL
EXTERNAL_GTE
FORCE
对于每种类型,都有不同的判断冲突的标准

private void innerIndex(Index index) throws IOException {
        synchronized (dirtyLock(index.uid())) {
            // 获取当前版本号 currentVersion
            final long currentVersion;
            VersionValue versionValue = versionMap.getUnderLock(index.uid().bytes());
            // 如果version map 中没有拿到当前version, 则需要从reader 中获取当前version
            if (versionValue == null) {
                currentVersion = loadCurrentVersionFromIndex(index.uid());
            } else {
                // 判断版本是否待删除
                if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > engineConfig.getGcDeletesInMillis()) {
                    currentVersion = Versions.NOT_FOUND; // deleted, and GC
                } else {
                    currentVersion = versionValue.version();
                }
            }

            // 更新后的Version
            long updatedVersion;
            // 待更新索引的版本号
            long expectedVersion = index.version();
            // 使用当前version 和待更新索引version 判断是否存在版本冲突
            // 判断条件为:
            // INTERNAL 为当前版本和待更新版本不一致
            // EXTERNAL 为当前版本大于等于待更新版本
            // EXTERNAL_GTE 为当前版本大于待更新版本
            // FORCE 为待更新版本未指定
            if (index.versionType().isVersionConflictForWrites(currentVersion, expectedVersion)) {
                if (index.origin() == Operation.Origin.RECOVERY) {
                    return;
                } else {
                    throw new VersionConflictEngineException(shardId, index.type(), index.id(), currentVersion, expectedVersion);
                }
            }

            // INTERNAL 如果当前版本为未找到或者未设置, 则为1, 否则为当前版本+1
            // EXTERNAL 待更新索引版本号
            // EXTERNAL_GTE 待更新索引版本号
            // FORCE 待更新索引版本号
            updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);

            index.updateVersion(updatedVersion);
            // 当前不存在文档版本, 则为create
            if (currentVersion == Versions.NOT_FOUND) {
                // document does not exists, we can optimize for create
                index.created(true);
                if (index.docs().size() > 1) {
                    indexWriter.addDocuments(index.docs(), index.analyzer());
                } else {
                    indexWriter.addDocument(index.docs().get(0), index.analyzer());
                }
            } else {
                // 已经存在文档版本, 则update
                if (versionValue != null) {
                    index.created(versionValue.delete()); // we have a delete which is not GC'ed...
                }
                if (index.docs().size() > 1) {
                    indexWriter.updateDocuments(index.uid(), index.docs(), index.analyzer());
                } else {
                    indexWriter.updateDocument(index.uid(), index.docs().get(0), index.analyzer());
                }
            }
            // 增加translog
            Translog.Location translogLocation = translog.add(new Translog.Index(index));

            versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion, translogLocation));

            indexingService.postIndexUnderLock(index);
        }
    }

在更新文档时,如果发生VersionConflictEngineException或者DocumentAlreadyExistsException,则会重新执行shardOperation进行重试,最大默认重试次数为3次

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,335评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,895评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,766评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,918评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,042评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,169评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,219评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,976评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,393评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,711评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,876评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,562评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,193评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,903评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,142评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,699评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,764评论 2 351

推荐阅读更多精彩内容