一. Broker Load 概述
Broker load 是一个异步的导入方式,支持的数据源取决于 Broker 进程支持的数据源。
用户需要通过 MySQL 协议 创建 Broker load 导入,并通过查看导入命令检查导入结果。
适用场景:
源数据在 Broker 可以访问的存储系统中,如 HDFS。
数据量在 几十到百GB 级别。
名词解释:
- Frontend(FE):Doris 系统的元数据和调度节点。在导入流程中主要负责导入 plan 生成和导入任务的调度工作。
- Backend(BE):Doris 系统的计算和存储节点。在导入流程中主要负责数据的 ETL 和存储。
- Broker:Broker 为一个独立的无状态进程。封装了文件系统接口,提供 Doris 读取远端存储系统中文件的能力。
- Plan:导入执行计划,BE 会执行导入执行计划将数据导入到 Doris 系统中。
基本原理:
用户在提交导入任务后,FE 会生成对应的 Plan 并根据目前 BE 的个数和文件的大小,将 Plan 分给 多个 BE 执行,每个 BE 执行一部分导入数据。
BE 在执行的过程中会从 Broker 拉取数据,在对数据 transform 之后将数据导入系统。所有 BE 均完成导入,由 FE 最终决定导入是否成功。
+
| 1. user create broker load
v
+----+----+
| |
| FE |
| |
+----+----+
|
| 2. BE etl and load the data
+--------------------------+
| | |
+---v---+ +--v----+ +---v---+
| | | | | |
| BE | | BE | | BE |
| | | | | |
+---+-^-+ +---+-^-+ +--+-^--+
| | | | | |
| | | | | | 3. pull data from broker
+---v-+-+ +---v-+-+ +--v-+--+
| | | | | |
|Broker | |Broker | |Broker |
| | | | | |
+---+-^-+ +---+-^-+ +---+-^-+
| | | | | |
+---v-+-----------v-+----------v-+-+
| HDFS/BOS/AFS cluster |
| |
+----------------------------------+
二. 基本操作
2.1 创建导入
语法:
LOAD LABEL db_name.label_name
(data_desc, ...)
WITH BROKER broker_name broker_properties
[PROPERTIES (key1=value1, ... )]
* data_desc:
DATA INFILE ('file_path', ...)
[NEGATIVE]
INTO TABLE tbl_name
[PARTITION (p1, p2)]
[COLUMNS TERMINATED BY separator ]
[(col1, ...)]
[PRECEDING FILTER predicate]
[SET (k1=f1(xx), k2=f2(xx))]
[WHERE predicate]
* broker_properties:
(key1=value1, ...)
示例:
LOAD LABEL db1.label1
(
DATA INFILE("hdfs://abc.com:8888/user/palo/test/ml/file1")
INTO TABLE tbl1
COLUMNS TERMINATED BY ","
(tmp_c1,tmp_c2)
SET
(
id=tmp_c2,
name=tmp_c1
),
DATA INFILE("hdfs://abc.com:8888/user/palo/test/ml/file2")
INTO TABLE tbl2
COLUMNS TERMINATED BY ","
(col1, col2)
where col1 > 1
)
WITH BROKER 'broker'
(
"username"="user",
"password"="pass"
)
PROPERTIES
(
"timeout" = "3600"
);
创建导入的详细语法执行 HELP BROKER LOAD 查看语法帮助。这里主要介绍 Broker load 的创建导入语法中参数意义和注意事项。
2.1.1 Label
导入任务的标识。每个导入任务,都有一个在单 database 内部唯一的 Label。Label 是用户在导入命令中自定义的名称。通过这个 Label,用户可以查看对应导入任务的执行情况。
Label 的另一个作用,是防止用户重复导入相同的数据。强烈推荐用户同一批次数据使用相同的label。这样同一批次数据的重复请求只会被接受一次,保证了 At-Most-Once 语义
当 Label 对应的导入作业状态为 CANCELLED 时,可以再次使用该 Label 提交导入作业。
2.1.2 数据描述类参数
数据描述类参数主要指的是 Broker load 创建导入语句中的属于 data_desc 部分的参数。每组 data_desc 主要表述了本次导入涉及到的数据源地址,ETL 函数,目标表及分区等信息。
下面主要对数据描述类的部分参数详细解释:
多表导入
Broker load 支持一次导入任务涉及多张表,每个 Broker load 导入任务可在多个 data_desc 声明多张表来实现多表导入。每个单独的 data_desc 还可以指定属于该表的数据源地址。Broker load 保证了单次导入的多张表之间原子性成功或失败。negative
data_desc中还可以设置数据取反导入。这个功能主要用于,当数据表中聚合列的类型都为 SUM 类型时。如果希望撤销某一批导入的数据。则可以通过 negative 参数导入同一批数据。Doris 会自动为这一批数据在聚合列上数据取反,以达到消除同一批数据的功能。partition
在 data_desc 中可以指定待导入表的 partition 信息,如果待导入数据不属于指定的 partition 则不会被导入。同时,不在指定 Partition 的数据会被认为是错误数据。set column mapping
在 data_desc 中的 SET 语句负责设置列函数变换,这里的列函数变换支持所有查询的等值表达式变换。如果原始数据的列和表中的列不一一对应,就需要用到这个属性。preceding filter predicate
用于过滤原始数据。原始数据是未经列映射、转换的数据。用户可以在对转换前的数据前进行一次过滤,选取期望的数据,再进行转换。where predicate
在 data_desc 中的 WHERE 语句中负责过滤已经完成 transform 的数据,被 filter 的数据不会进入容忍率的统计中。如果多个 data_desc 中声明了同一张表的多个条件的话,则会 merge 同一张表的多个条件,merge 策略是 AND 。
2.1.3 导入作业参数
导入作业参数主要指的是 Broker load 创建导入语句中的属于 opt_properties部分的参数。导入作业参数是作用于整个导入作业的。
下面主要对导入作业参数的部分参数详细解释:
timeout
导入作业的超时时间(以秒为单位),用户可以在 opt_properties 中自行设置每个导入的超时时间。导入任务在设定的 timeout 时间内未完成则会被系统取消,变成 CANCELLED。Broker load 的默认导入超时时间为4小时。
通常情况下,用户不需要手动设置导入任务的超时时间。当在默认超时时间内无法完成导入时,可以手动设置任务的超时时间。max_filter_ratio
导入任务的最大容忍率,默认为0容忍,取值范围是0~1。当导入的错误率超过该值,则导入失败。
如果用户希望忽略错误的行,可以通过设置这个参数大于 0,来保证导入可以成功。
计算公式为:
max_filter_ratio = (dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL ) )
dpp.abnorm.ALL 表示数据质量不合格的行数。如类型不匹配,列数不匹配,长度不匹配等等。
dpp.norm.ALL 指的是导入过程中正确数据的条数。可以通过 SHOW LOAD 命令查询导入任务的正确数据量。
原始文件的行数 = dpp.abnorm.ALL + dpp.norm.ALLexec_mem_limit
导入内存限制。默认是 2GB。单位为字节。strict_mode
Broker load 导入可以开启 strict mode 模式。开启方式为 properties ("strict_mode" = "true") 。默认的 strict mode 为关闭。
strict mode 模式的意思是:对于导入过程中的列类型转换进行严格过滤。严格过滤的策略如下:
4.1) 对于列类型转换来说,如果 strict mode 为true,则错误的数据将被 filter。这里的错误数据是指:原始数据并不为空值,在参与列类型转换后结果为空值的这一类数据。
4.2) 对于导入的某列由函数变换生成时,strict mode 对其不产生影响。
4.3) 对于导入的某列类型包含范围限制的,如果原始数据能正常通过类型转换,但无法通过范围限制的,strict mode 对其也不产生影响。例如:如果类型是 decimal(1,0), 原始数据为 10,则属于可以通过类型转换但不在列声明的范围内。这种数据 strict 对其不产生影响。merge_type
数据的合并类型,一共支持三种类型APPEND、DELETE、MERGE 其中,APPEND是默认值,表示这批数据全部需要追加到现有数据中,DELETE 表示删除与这批数据key相同的所有行,MERGE 语义 需要与delete 条件联合使用,表示满足delete 条件的数据按照DELETE 语义处理其余的按照APPEND 语义处理。
2.1.4 strict mode 与 source data 的导入关系
这里以列类型为 TinyInt 来举例:
这里以列类型为 Decimal(1,0) 举例:
2.2 查看导入
Broker load 导入方式由于是异步的,所以用户必须将创建导入的 Label 记录,并且在查看导入命令中使用 Label 来查看导入结果。查看导入命令在所有导入方式中是通用的,具体语法可执行 HELP SHOW LOAD 查看。
示例:
mysql> show load order by createtime desc limit 1\G
*************************** 1. row ***************************
JobId: 76391
Label: label1
State: FINISHED
Progress: ETL:N/A; LOAD:100%
Type: BROKER
EtlInfo: unselected.rows=4; dpp.abnorm.ALL=15; dpp.norm.ALL=28133376
TaskInfo: cluster:N/A; timeout(s):10800; max_filter_ratio:5.0E-5
ErrorMsg: N/A
CreateTime: 2019-07-27 11:46:42
EtlStartTime: 2019-07-27 11:46:44
EtlFinishTime: 2019-07-27 11:46:44
LoadStartTime: 2019-07-27 11:46:44
LoadFinishTime: 2019-07-27 11:50:16
URL: http://192.168.1.1:8040/api/_load_error_log?file=__shard_4/error_log_insert_stmt_4bb00753932c491a-a6da6e2725415317_4bb00753932c491a_a6da6e2725415317
JobDetails: {"Unfinished backends":{"9c3441027ff948a0-8287923329a2b6a7":[10002]},"ScannedRows":2390016,"TaskNumber":1,"All backends":{"9c3441027ff948a0-8287923329a2b6a7":[10002]},"FileNumber":1,"FileSize":1073741824}
下面主要介绍了查看导入命令返回结果集中参数意义:
JobId
导入任务的唯一ID,每个导入任务的 JobId 都不同,由系统自动生成。与 Label 不同的是,JobId永远不会相同,而 Label 则可以在导入任务失败后被复用。Label
导入任务的标识。State
导入任务当前所处的阶段。在 Broker load 导入过程中主要会出现 PENDING 和 LOADING 这两个导入中的状态。如果 Broker load 处于 PENDING 状态,则说明当前导入任务正在等待被执行;LOADING 状态则表示正在执行中。
导入任务的最终阶段有两个:CANCELLED 和 FINISHED,当 Load job 处于这两个阶段时,导入完成。其中 CANCELLED 为导入失败,FINISHED 为导入成功。Progress
导入任务的进度描述。分为两种进度:ETL 和 LOAD,对应了导入流程的两个阶段 ETL 和 LOADING。目前 Broker load 由于只有 LOADING 阶段,所以 ETL 则会永远显示为 N/A
LOAD 的进度范围为:0~100%。
LOAD 进度 = 当前完成导入的表个数 / 本次导入任务设计的总表个数 * 100%
如果所有导入表均完成导入,此时 LOAD 的进度为 99% 导入进入到最后生效阶段,整个导入完成后,LOAD 的进度才会改为 100%。
导入进度并不是线性的。所以如果一段时间内进度没有变化,并不代表导入没有在执行。Type
导入任务的类型。Broker load 的 type 取值只有 BROKER。EtlInfo
主要显示了导入的数据量指标 unselected.rows , dpp.norm.ALL 和 dpp.abnorm.ALL。用户可以根据第一个数值判断 where 条件过滤了多少行,后两个指标验证当前导入任务的错误率是否超过 max_filter_ratio。
三个指标之和就是原始数据量的总行数。TaskInfo
主要显示了当前导入任务参数,也就是创建 Broker load 导入任务时用户指定的导入任务参数,包括:cluster,timeout 和max_filter_ratio。ErrorMsg
在导入任务状态为CANCELLED,会显示失败的原因,显示分两部分:type 和 msg,如果导入任务成功则显示 N/A。
type的取值意义:
8.1) USER_CANCEL: 用户取消的任务
8.2) ETL_RUN_FAIL:在ETL阶段失败的导入任务
8.3) ETL_QUALITY_UNSATISFIED:数据质量不合格,也就是错误数据率超过了 max_filter_ratio
8.4) LOAD_RUN_FAIL:在LOADING阶段失败的导入任务
8.5) TIMEOUT:导入任务没在超时时间内完成
8.6) UNKNOWN:未知的导入错误CreateTime/EtlStartTime/EtlFinishTime/LoadStartTime/LoadFinishTime
这几个值分别代表导入创建的时间,ETL阶段开始的时间,ETL阶段完成的时间,Loading阶段开始的时间和整个导入任务完成的时间。
Broker load 导入由于没有 ETL 阶段,所以其 EtlStartTime, EtlFinishTime, LoadStartTime 被设置为同一个值。
导入任务长时间停留在 CreateTime,而 LoadStartTime 为 N/A 则说明目前导入任务堆积严重。用户可减少导入提交的频率。
LoadFinishTime - CreateTime = 整个导入任务所消耗时间
LoadFinishTime - LoadStartTime = 整个 Broker load 导入任务执行时间 = 整个导入任务所消耗时间 - 导入任务等待的时间
URL
导入任务的错误数据样例,访问 URL 地址既可获取本次导入的错误数据样例。当本次导入不存在错误数据时,URL 字段则为 N/A。JobDetails
显示一些作业的详细运行状态。包括导入文件的个数、总大小(字节)、子任务个数、已处理的原始行数,运行子任务的 BE 节点 Id,未完成的 BE 节点 Id。
其中已处理的原始行数,每 5 秒更新一次。该行数仅用于展示当前的进度,不代表最终实际的处理行数。实际处理行数以 EtlInfo 中显示的为准。
2.3 取消导入
当 Broker load 作业状态不为 CANCELLED 或 FINISHED 时,可以被用户手动取消。取消时需要指定待取消导入任务的 Label 。取消导入命令语法可执行 HELP CANCEL LOAD查看。
mysql> HELP CANCEL LOAD;
Name: 'CANCEL LOAD'
Description:
该语句用于撤销指定 load label 的批次的导入作业。
这是一个异步操作,任务提交成功则返回。执行后可使用 SHOW LOAD 命令查看进度。
语法:
CANCEL LOAD
[FROM db_name]
WHERE LABEL = "load_label";
Examples:
1. 撤销数据库 example_db 上, label 为 example_db_test_load_label 的导入作业
CANCEL LOAD
FROM example_db
WHERE LABEL = "example_db_test_load_label";
mysql>
三. 相关系统配置
3.1 FE 配置
下面几个配置属于 Broker load 的系统级别配置,也就是作用于所有 Broker load 导入任务的配置。主要通过修改 fe.conf来调整配置值。
min_bytes_per_broker_scanner/max_bytes_per_broker_scanner/max_broker_concurrency
前两个配置限制了单个 BE 处理的数据量的最小和最大值。第三个配置限制了一个作业的最大的导入并发数。最小处理的数据量,最大并发数,源文件的大小和当前集群 BE 的个数 共同决定了本次导入的并发数。
本次导入并发数 = Math.min(源文件大小/最小处理量,最大并发数,当前BE节点个数)
本次导入单个BE的处理量 = 源文件大小/本次导入的并发数
通常一个导入作业支持的最大数据量为 max_bytes_per_broker_scanner * BE 节点数。如果需要导入更大数据量,则需要适当调整 max_bytes_per_broker_scanner 参数的大小。
默认配置:
参数名:min_bytes_per_broker_scanner, 默认 64MB,单位bytes。
参数名:max_broker_concurrency, 默认 10。
参数名:max_bytes_per_broker_scanner,默认 3G,单位bytes。
四. 案例
4.1 导入Hive orc格式表
doris端创建表
CREATE TABLE broker_test1
(
id bigint,
sale_date varchar(100),
prod_name varchar(32),
sale_nums BIGINT SUM DEFAULT '0'
) ENGINE=OLAP
AGGREGATE KEY(id,sale_date,prod_name)
DISTRIBUTED BY HASH(id) BUCKETS 10
PROPERTIES("replication_num" = "3");
准备load 命令
LOAD LABEL broker_test1_20211215_1
(
DATA INFILE("hdfs://10.31.1.123:8020/user/hive/warehouse/test.db/ods_fact_sale_orc/*")
INTO TABLE broker_test1
FORMAT AS 'orc'
(id,sale_date,prod_name,sale_nums)
SET
(
id=id,
sale_date=sale_date,
prod_name=prod_name,
sale_nums=sale_nums
)
)
WITH BROKER broker_name
(
"username"="doris_user",
"password"="abc123"
)
PROPERTIES
(
"timeout"="3600",
"max_filter_ratio"="1"
);
查看导入的进度
SHOW LOAD WHERE LABEL = "broker_test1_20211215_1"\G
等待执行完成:
4.2 导入Hive text格式表
doris端创建表
CREATE TABLE broker_test2
(
id bigint,
sale_date varchar(100),
prod_name varchar(32),
sale_nums BIGINT SUM DEFAULT '0'
) ENGINE=OLAP
AGGREGATE KEY(id,sale_date,prod_name)
DISTRIBUTED BY HASH(id) BUCKETS 10
PROPERTIES("replication_num" = "3");
准备load 命令
LOAD LABEL broker_test2_20211215_4
(
DATA INFILE("hdfs://10.31.1.123:8020/user/hive/warehouse/ods_fact_sale/*")
INTO TABLE broker_test2
COLUMNS TERMINATED BY "\\x01"
(id,sale_date,prod_name,sale_nums)
SET
(
id=id,
sale_date=sale_date,
prod_name=prod_name,
sale_nums=sale_nums
)
)
WITH BROKER broker_name
(
"username"="doris_user",
"password"="abc123"
)
PROPERTIES
(
"timeout"="3600",
"max_filter_ratio"="1"
);
查看导入的进度
SHOW LOAD WHERE LABEL = "broker_test2_20211215_4"\G