Doris系列11-数据导入之Stream load

一. Stream load概述

Stream load 是一个同步的导入方式,用户通过发送 HTTP 协议发送请求将本地文件或数据流导入到 Doris 中。Stream load 同步执行导入并返回导入结果。用户可直接通过请求的返回体判断本次导入是否成功。

Stream load 主要适用于导入本地文件,或通过程序导入数据流中的数据。

1.1 基本原理

下图展示了 Stream load 的主要流程,省略了一些导入细节。

                         ^      +
                         |      |
                         |      | 1A. User submit load to FE
                         |      |
                         |   +--v-----------+
                         |   | FE           |
5. Return result to user |   +--+-----------+
                         |      |
                         |      | 2. Redirect to BE
                         |      |
                         |   +--v-----------+
                         +---+Coordinator BE| 1B. User submit load to BE
                             +-+-----+----+-+
                               |     |    |
                         +-----+     |    +-----+
                         |           |          | 3. Distrbute data
                         |           |          |
                       +-v-+       +-v-+      +-v-+
                       |BE |       |BE |      |BE |
                       +---+       +---+      +---+

Stream load 中,Doris 会选定一个节点作为 Coordinator 节点。该节点负责接数据并分发数据到其他数据节点。

用户通过 HTTP 协议提交导入命令。如果提交到 FE,则 FE 会通过 HTTP redirect 指令将请求转发给某一个 BE。用户也可以直接提交导入命令给某一指定 BE。

导入的最终结果由 Coordinator BE 返回给用户。

1.2 支持数据格式

目前 Stream Load 支持两个数据格式:CSV(文本) 和 JSON

1.3 基本操作

1.3.1 创建导入

Stream load 通过 HTTP 协议提交和传输数据。这里通过 curl 命令展示如何提交导入。

用户也可以通过其他 HTTP client 进行操作。

curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load

Header 中支持属性见下面的 ‘导入任务参数’ 说明 
格式为: -H "key1:value1"

示例:

curl --location-trusted -u root -T date -H "label:123" http://abc.com:8030/api/test/date/_stream_load

1.3.1.1 签名参数

user/passwd
Stream load 由于创建导入的协议使用的是 HTTP 协议,通过 Basic access authentication 进行签名。Doris 系统会根据签名验证用户身份和导入权限。

1.3.1.2 导入任务参数

Stream load 由于使用的是 HTTP 协议,所以所有导入任务有关的参数均设置在 Header 中。下面主要介绍了 Stream load 导入任务参数的部分参数意义。

  1. label
    导入任务的标识。每个导入任务,都有一个在单 database 内部唯一的 label。label 是用户在导入命令中自定义的名称。通过这个 label,用户可以查看对应导入任务的执行情况。
    label 的另一个作用,是防止用户重复导入相同的数据。强烈推荐用户同一批次数据使用相同的 label。这样同一批次数据的重复请求只会被接受一次,保证了 At-Most-Once
    当 label 对应的导入作业状态为 CANCELLED 时,该 label 可以再次被使用。

  2. column_separator
    用于指定导入文件中的列分隔符,默认为\t。如果是不可见字符,则需要加\x作为前缀,使用十六进制来表示分隔符。
    如hive文件的分隔符\x01,需要指定为-H "column_separator:\x01"。
    可以使用多个字符的组合作为列分隔符。

  3. line_delimiter
    用于指定导入文件中的换行符,默认为\n。
    可以使用做多个字符的组合作为换行符。

  4. max_filter_ratio
    导入任务的最大容忍率,默认为0容忍,取值范围是0~1。当导入的错误率超过该值,则导入失败。
    如果用户希望忽略错误的行,可以通过设置这个参数大于 0,来保证导入可以成功。
    计算公式为:
    (dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL ) ) > max_filter_ratio
    dpp.abnorm.ALL 表示数据质量不合格的行数。如类型不匹配,列数不匹配,长度不匹配等等。
    dpp.norm.ALL 指的是导入过程中正确数据的条数。可以通过 SHOW LOAD 命令查询导入任务的正确数据量。
    原始文件的行数 = dpp.abnorm.ALL + dpp.norm.ALL

  5. where
    导入任务指定的过滤条件。Stream load 支持对原始数据指定 where 语句进行过滤。被过滤的数据将不会被导入,也不会参与 filter ratio 的计算,但会被计入num_rows_unselected。

  6. partition
    待导入表的 Partition 信息,如果待导入数据不属于指定的 Partition 则不会被导入。这些数据将计入 dpp.abnorm.ALL

  7. columns
    待导入数据的函数变换配置,目前 Stream load 支持的函数变换方法包含列的顺序变化以及表达式变换,其中表达式变换的方法与查询语句的一致。

列顺序变换例子:原始数据有三列(src_c1,src_c2,src_c3), 目前doris表也有三列(dst_c1,dst_c2,dst_c3)

如果原始表的src_c1列对应目标表dst_c1列,原始表的src_c2列对应目标表dst_c2列,原始表的src_c3列对应目标表dst_c3列,则写法如下:
columns: dst_c1, dst_c2, dst_c3

如果原始表的src_c1列对应目标表dst_c2列,原始表的src_c2列对应目标表dst_c3列,原始表的src_c3列对应目标表dst_c1列,则写法如下:
columns: dst_c2, dst_c3, dst_c1

表达式变换例子:原始文件有两列,目标表也有两列(c1,c2)但是原始文件的两列均需要经过函数变换才能对应目标表的两列,则写法如下:
columns: tmp_c1, tmp_c2, c1 = year(tmp_c1), c2 = month(tmp_c2)
其中 tmp_*是一个占位符,代表的是原始文件中的两个原始列。
  1. exec_mem_limit
    导入内存限制。默认为 2GB,单位为字节。

  2. strict_mode
    Stream load 导入可以开启 strict mode 模式。开启方式为在 HEADER 中声明 strict_mode=true 。默认的 strict mode 为关闭。
    strict mode 模式的意思是:对于导入过程中的列类型转换进行严格过滤。严格过滤的策略如下:
    9.1) 对于列类型转换来说,如果 strict mode 为true,则错误的数据将被 filter。这里的错误数据是指:原始数据并不为空值,在参与列类型转换后结果为空值的这一类数据。
    9.2) 对于导入的某列由函数变换生成时,strict mode 对其不产生影响。
    9.3) 对于导入的某列类型包含范围限制的,如果原始数据能正常通过类型转换,但无法通过范围限制的,strict mode 对其也不产生影响。例如:如果类型是 decimal(1,0), 原始数据为 10,则属于可以通过类型转换但不在列声明的范围内。这种数据 strict 对其不产生影响。

  3. merge_type
    数据的合并类型,一共支持三种类型APPEND、DELETE、MERGE 其中,APPEND是默认值,表示这批数据全部需要追加到现有数据中,DELETE 表示删除与这批数据key相同的所有行,MERGE 语义 需要与delete 条件联合使用,表示满足delete 条件的数据按照DELETE 语义处理其余的按照APPEND 语义处理

1.3.1.3 strict mode 与 source data 的导入关系

这里以列类型为 TinyInt 来举例:
(注:当表中的列允许导入空值时)


image.png

这里以列类型为 Decimal(1,0) 举例:
(注:当表中的列允许导入空值时)


image.png

注意:
10 虽然是一个超过范围的值,但是因为其类型符合 decimal的要求,所以 strict mode对其不产生影响。10 最后会在其他 ETL 处理流程中被过滤。但不会被 strict mode 过滤。

1.3.2 返回结果

由于 Stream load 是一种同步的导入方式,所以导入的结果会通过创建导入的返回值直接返回给用户。

示例:

{
    "TxnId": 1003,
    "Label": "b6f3bc78-0d2c-45d9-9e4c-faa0a0149bee",
    "Status": "Success",
    "ExistingJobStatus": "FINISHED", // optional
    "Message": "OK",
    "NumberTotalRows": 1000000,
    "NumberLoadedRows": 1000000,
    "NumberFilteredRows": 1,
    "NumberUnselectedRows": 0,
    "LoadBytes": 40888898,
    "LoadTimeMs": 2144,
    "BeginTxnTimeMs": 1,
    "StreamLoadPutTimeMs": 2,
    "ReadDataTimeMs": 325,
    "WriteDataTimeMs": 1933,
    "CommitAndPublishTimeMs": 106,
    "ErrorURL": "http://192.168.1.1:8042/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bdf005"
}

下面主要解释了 Stream load 导入结果参数:

  1. TxnId
    导入的事务ID。用户可不感知。

  2. Label
    导入 Label。由用户指定或系统自动生成。

  3. Status
    导入完成状态:
    3.1) "Success":表示导入成功。
    3.2) "Publish Timeout":该状态也表示导入已经完成,只是数据可能会延迟可见,无需重试。
    3.3) "Label Already Exists":Label 重复,需更换 Label。
    3.4) "Fail":导入失败。

  4. ExistingJobStatus
    已存在的 Label 对应的导入作业的状态。
    这个字段只有在当 Status 为 "Label Already Exists" 时才会显示。用户可以通过这个状态,知晓已存在 Label 对应的导入作业的状态。"RUNNING" 表示作业还在执行,"FINISHED" 表示作业成功。

  5. Message
    导入错误信息。

  6. NumberTotalRows
    导入总处理的行数。

  7. NumberLoadedRows
    成功导入的行数。

  8. NumberFilteredRows
    数据质量不合格的行数。

  9. NumberUnselectedRows
    被 where 条件过滤的行数。

  10. LoadBytes
    导入的字节数。

  11. LoadTimeMs
    导入完成时间。单位毫秒。

  12. BeginTxnTimeMs
    向Fe请求开始一个事务所花费的时间,单位毫秒。

  13. StreamLoadPutTimeMs
    向Fe请求获取导入数据执行计划所花费的时间,单位毫秒。

  14. ReadDataTimeMs
    读取数据所花费的时间,单位毫秒。

  15. WriteDataTimeMs
    执行写入数据操作所花费的时间,单位毫秒。

  16. CommitAndPublishTimeMs
    向Fe请求提交并且发布事务所花费的时间,单位毫秒。

  17. ErrorURL
    如果有数据质量问题,通过访问这个 URL 查看具体错误行。

1.3.3 取消导入

用户无法手动取消 Stream load,Stream load 在超时或者导入错误后会被系统自动取消。

1.4 相关系统配置

1.4.1 FE 配置

stream_load_default_timeout_second
导入任务的超时时间(以秒为单位),导入任务在设定的 timeout 时间内未完成则会被系统取消,变成 CANCELLED。

默认的 timeout 时间为 600 秒。如果导入的源文件无法在规定时间内完成导入,用户可以在 stream load 请求中设置单独的超时时间。

或者调整 FE 的参数stream_load_default_timeout_second 来设置全局的默认超时时间。

1.4.2 BE 配置

streaming_load_max_mb
Stream load 的最大导入大小,默认为 10G,单位是 MB。如果用户的原始文件超过这个值,则需要调整 BE 的参数 streaming_load_max_mb。

二. 导入实例

示例1:
以 "table1_20211207" 为 Label,使用本地文件 table1_data 导入 table1 表。

-- FE_HOST 是任一 FE 所在节点 IP,8030 为 fe.conf 中的 http_port。
-- 可以使用任一 BE 的 IP,以及 be.conf 中的 webserver_port 进行导入。如:BE_HOST:8040
curl --location-trusted -u test:test_passwd -H "label:table1_20211207" -H "column_separator:," -T table1_data http://FE_HOST:8030/api/example_db/table1/_stream_load

本地文件 table1_data 以 , 作为数据之间的分隔,具体内容如下:

1,1,jim,2
2,1,grace,2
3,2,tom,2
4,3,bush,3
5,3,helen,3

最终导入命令:

curl --location-trusted -u doris_user:abc123 -H "label:table1_20211207" -H "column_separator:," -T /tmp/table1_data http://10.31.1.119:8030/api/example_db/table1/_stream_load
image.png

示例2:
以 "table2_20211207" 为 Label,使用本地文件 table2_data 导入 table2 表。

curl --location-trusted -u doris_user:abc123 -H "label:table2_20211207" -H "column_separator:|" -T /tmp/table2_data http://127.0.0.1:8030/api/example_db/table2/_stream_load

本地文件 table2_data 以 | 作为数据之间的分隔,具体内容如下:

2017-07-03|1|1|jim|2
2017-07-05|2|1|grace|2
2017-07-12|3|2|tom|2
2017-07-15|4|3|bush|3
2017-07-12|5|3|helen|3
image.png
image.png

参考:

  1. https://doris.apache.org/master/zh-CN/administrator-guide/load-data/stream-load-manual.html
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,128评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,316评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,737评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,283评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,384评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,458评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,467评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,251评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,688评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,980评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,155评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,818评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,492评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,142评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,382评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,020评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,044评论 2 352

推荐阅读更多精彩内容