StarRocks Elasticsearch Catalog原理简析

前言

Elasticsearch不仅是强大的全文搜索引擎,在很多场景下(特别是TiDB、ShardingSphere等框架成熟之前)也被当做分布式HTAP数据库使用,在存储、更新海量数据的同时,提供高效的点查和部分聚合查询能力。StarRocks从3.1版本开始支持Elasticsearch Catalog,极大方便了ES数据的联邦查询。本文简要分析其原理,并提出一个小问题和对应的临时解决方法。

元数据获取阶段

当用户创建一个ES Catalog时,本质是创建了ElasticsearchConnector和它对应的ElasticsearchMetadata,后者持有该Catalog的全部配置信息和访问ES集群的EsRestClient。这点和2.x版本中旧有的ES外表不同,每张ES外表都会对应一个EsRestClient,会导致目标ES集群的HTTP连接数比正常偏多,ES Catalog则基本不存在这个问题。

每个ES Catalog只有一个默认数据库default_db,以下则是ES实例中的索引,在FE中称为EsTable,相当于复用了原ES外表的实现(当然ES Catalog会自动获取并推断字段,无需自己建表)。每个EsTable对象都持有一个EsMetaStateTracker用于同步元数据,其中又分为3个阶段(phase),按顺序分别为:

  • VersionPhase:通过GET /请求获取ES集群的版本号;
  • MappingPhase:通过GET /indexName/_mapping请求获取索引的Mapping信息,同时解析keyword类型字段(包括text内嵌的keyword)和存在doc_values的字段(即允许排序、聚合的字段),并存入上下文;
  • PartitionPhase:通过GET /indexName/_search_shards请求获取索引的分片信息,再通过GET /_nodes/http请求获取ES集群数据节点的地址,将分片ID和所在节点的映射关系存入EsShardPartitions容器。

FE计划阶段

ES Catalog查询对应的物理节点是EsScanNode,在生成Fragment的过程中除了维护Catalog的信息外,还会负责计算ScanRangeLocation,即每个BE节点负责请求的ES分片的对应关系,同时会尽量做colocate分配,使得BE节点和请求的ES分片所在节点是同一个(当然实际部署中这种情况不多见)。另外执行EXPLAIN语句时,会打印查询谓词翻译出来的ES DSL,如下所示。注意这个DSL只是示意作用,实际执行时BE会重新生成一次。

MySQL [default_db]> EXPLAIN SELECT id,waybillCode,orderTime FROM realtimewaybillmonitor_202409 WHERE yn <= 0 AND orderTime >= hours_sub(now(), 1) AND waybillCode LIKE 'JDX%' AND length(sku) > 3 LIMIT 1000;
+-------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Explain String                                                                                                                                              |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------+
| PLAN FRAGMENT 0                                                                                                                                             |
|  OUTPUT EXPRS:13: id | 130: waybillCode | 71: orderTime                                                                                                     |
|   PARTITION: UNPARTITIONED                                                                                                                                  |
|                                                                                                                                                             |
|   RESULT SINK                                                                                                                                               |
|                                                                                                                                                             |
|   2:EXCHANGE                                                                                                                                                |
|      limit: 1000                                                                                                                                            |
|                                                                                                                                                             |
| PLAN FRAGMENT 1                                                                                                                                             |
|  OUTPUT EXPRS:                                                                                                                                              |
|   PARTITION: RANDOM                                                                                                                                         |
|                                                                                                                                                             |
|   STREAM DATA SINK                                                                                                                                          |
|     EXCHANGE ID: 02                                                                                                                                         |
|     UNPARTITIONED                                                                                                                                           |
|                                                                                                                                                             |
|   1:Project                                                                                                                                                 |
|   |  <slot 13> : 13: id                                                                                                                                     |
|   |  <slot 71> : 71: orderTime                                                                                                                              |
|   |  <slot 130> : 130: waybillCode                                                                                                                          |
|   |  limit: 1000                                                                                                                                            |
|   |                                                                                                                                                         |
|   0:EsScanNode                                                                                                                                              |
|      TABLE: realtimewaybillmonitor_202409                                                                                                                   |
|      PREDICATES: 9: yn <= 0, 71: orderTime >= '2024-09-26 15:46:17', 130: waybillCode LIKE 'JDX%', length(14: sku) > 3                                      |
|      LOCAL_PREDICATES: length(14: sku) > 3                                                                                                                  |
|      REMOTE_PREDICATES: 9: yn <= 0, 71: orderTime >= '2024-09-26 15:46:17', 130: waybillCode LIKE 'JDX%'                                                    |
|      ES_QUERY_DSL: {"bool":{"must":[{"range":{"yn":{"lte":0}}},{"range":{"orderTime":{"gte":"2024-09-26 15:46:17"}}},{"wildcard":{"waybillCode":"JDX*"}}]}} |
|      ES index/type: realtimewaybillmonitor_202409/realtimewaybillmonitor                                                                                    |
|      limit: 1000                                                                                                                                            |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------+

可见上述查询的前三个谓词都可以下推到ES,但是第四个谓词无法下推,需要将结果拉取到SR端再进行过滤。

BE执行阶段

BE接收到前述EsScanNode后,将能够下推到ES的谓词封装为EsPredicate,分为几种情况:

  • 二元谓词,且一侧需为字面量,形如yn <= 0orderTime >= hours_sub(now(), 1)(右侧可以做常量折叠)都符合条件;
  • 函数调用谓词,支持esquery()(直接透传DSL的SR内置函数)、IS NULLIS NOT NULLLIKE,其他的均无法下推。即如果把上述示例的waybillCode LIKE 'JDX%'改成starts_with(waybillCode, 'JDX') = 1,这个条件就不能下推了;
  • INNOT IN谓词,对应terms query,简单直接;
  • 复合的AND谓词,实际上是对以上三种情况的组合做分解。

下推到ES的谓词会从谓词列表中删除。接下来每个BE会分别创建ESScanReader以扫描ES数据,这里需要注意,如果不是所有谓词都下推到了ES(即谓词列表中还有剩余),那么为了保证结果准确,原始查询中的LIMIT子句也不能下推。

上一节中的查询实际生成的DSL JSON如下所示。如果无法命中doc_values,则会改用source查询。

{
    "query": {
        "bool": {
            "filter": [{
                "bool": {
                    "should": [{
                        "range": {
                            "yn": {
                                "lte": "0"
                            }
                        }
                    }]
                }
            }, {
                "bool": {
                    "should": [{
                        "range": {
                            "orderTime": {
                                "gte": "1727336859000"
                            }
                        }
                    }]
                }
            }, {
                "bool": {
                    "should": [{
                        "wildcard": {
                            "waybillCode": "JDX*"
                        }
                    }]
                }
            }]
        }
    },
    "stored_fields": "_none_",
    "docvalue_fields": ["waybillCode", "orderTime", "yn", "id", "sku"],
    "sort": ["_doc"],
    "size": 4096
}

正式执行查询时,又分为两种情况。

  • LIMIT子句下推到了ES,那么BE会认为这是一个"exactly-once"的查询(代码中如此),可以类比流式处理引擎中exactly-once的含义,即“只查询一次就可以了”。此时组装的搜索请求URL形如{target}/{index}/{type}/_search?terminate_after={limit}&preference=_shards:{shards}&{filter_path}
  • 若没有LIMIT子句下推到ES,则需要执行Scroll查询,分页获取结果。此时组装的搜索请求URL形如{target}/{index}/{type}/_search?scroll={keep_alive}&preference=_shards:{shards}&{filter_path}。Scroll上下文的TTL由BE参数es_scroll_keepalive设定,默认是5m

接下来ESScanReader每次请求上述URL获取一批数据,调用超时由BE参数es_http_timeout_ms设定,默认是5000(即5秒),在网络环境欠佳时,应适当调大。获取到的数据经过JSON解析,获取到doc_values或者_source,逐行填充到Chunk中(没有值的则填充默认值)。这里实际上可以优化为按列填充,代码中也有相应的TODO标记。

ES数组类型的问题

ES没有显式的数组类型,当某字段插入了多个值时,它会自然地变为数组类型,但在索引Mapping中无法直接区分该字段是否为数组。在我们的历史ES集群中,有大量ES索引含有实际为数组的字段,使用SR ES Catalog查询时则会抛出异常或只返回第一个值,影响体验。这里提出一个不优雅的临时解决方案,在Catalog参数中增加array_fields配置项,让用户创建ES Catalog时手动指定数组字段。

// Fields that should be treated as arrays when building Elasticsearch external table.                      
// Since Elasticsearch makes no distinction between scalar and array types, we should manually specify them.
// The format is: `field1,index2:field2...`                                                                 
// which means `field1` in all indices and `field2` in `index2` are arrays.                                 
@Config(key = KEY_ARRAY_FIELDS,                                                                             
        desc = "Fields that should be treated as arrays when building Elasticsearch external table. " +     
                "The format is: `field1,index2:field2,...`.",                                               
        defaultValue = "")                                                                                  
private String arrayFields;                                                                                 

然后在ElasticsearchMetadata中获取并缓存每个索引中的数组字段名。

private Map<String, Set<String>> indicesWithArrayFields;                                                     
                                                                                                             
public ElasticsearchMetadata(EsRestClient esRestClient, Map<String, String> properties, String catalogName) {
    this.esRestClient = esRestClient;                                                                        
    this.properties = properties;                                                                            
    this.catalogName = catalogName;                                                                          
                                                                                                             
    this.indicesWithArrayFields = Arrays.stream(StringUtils.split(properties.get(KEY_ARRAY_FIELDS), ","))     
            .map(s -> StringUtils.split(s, ":"))                                                              
            .filter(kv -> kv.length <= 2)                                                                    
            .collect(                                                                                        
                    Collectors.toMap(                                                                        
                            kv -> kv.length == 2 ? kv[0] : "",                                               
                            kv -> new HashSet<>(Collections.singletonList(kv.length == 2 ? kv[1] : kv[0])),  
                            (v1, v2) -> {                                                                    
                                v1.addAll(v2);                                                               
                                return v1;                                                                   
                            }                                                                                
                    )                                                                                        
            );                                                                                               
}                                                                                                            

构建EsTable时,会调用EsUtil.convertColumnSchema()方法创建ES表的Schema,将对应索引的arrayFields参数传递给它,并将数组字段重新用ArrayType包装起来即可。

public static List<Column> convertColumnSchema(EsRestClient client, String index, Set<String> arrayFields)
        throws AnalysisException {                                                                        
    List<Column> columns = new ArrayList<>();                                                             
    String mappings = client.getMapping(index);                                                           
    JSONObject properties = parseProperties(index, mappings);                                             
    if (null == properties) {                                                                             
        return columns;                                                                                   
    }                                                                                                     
    for (String columnName : properties.keySet()) {                                                       
        JSONObject columnAttr = (JSONObject) properties.get(columnName);                                  
        // default set json.                                                                              
        Type type = Type.JSON;                                                                            
        if (columnAttr.has("type")) {                                                                     
            type = convertType(columnAttr.get("type").toString());                                        
            if (arrayFields.contains(columnName)) {                                                       
                type = new ArrayType(type);                                                               
            }                                                                                             
        }                                                                                                 
        Column column = new Column(columnName, type, true);                                               
        columns.add(column);                                                                              
    }                                                                                                     
    return columns;                                                                                       
}                                                                                                         

The End

大家晚安。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,884评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,347评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,435评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,509评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,611评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,837评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,987评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,730评论 0 267
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,194评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,525评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,664评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,334评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,944评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,764评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,997评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,389评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,554评论 2 349

推荐阅读更多精彩内容