概述 #
如果为表定义了主键,那么您就可以在该表中插入、更新或删除记录。
主键由一组包含每个记录唯一值的列组成。Paimon 通过在每个桶内对主键进行排序来强制数据排序,这使得用户能够通过在主键上应用筛选条件来实现高性能操作。请参阅 CREATE TABLE.
。
Bucket
未分区表或分区表中的分区会被进一步划分为桶,以提供额外的数据结构,从而能够用于更高效的查询操作。
每个桶目录都包含一个 LSM 树及其 变更日志文件。
桶的范围由记录中一个或多个列的哈希值所决定。用户可以通过提供 bucket-key 选项来指定分桶列。如果未指定 bucket-key 选项,则会使用主键(如果已定义)或完整记录作为桶键。
一个桶是用于读写操作的最小存储单元,因此桶的数量限制了最大并行处理能力。不过,这个数量不宜过大,因为过大则会导致大量小文件以及读取性能低下。一般来说,每个桶中推荐的数据大小约为 200MB 至 1GB。
另外,如果您希望在创建表之后调整桶的数量,请查看“重新调整桶大小”选项。
LSM Trees #
Paimon采用了 LSM 树(日志结构合并树)作为文件存储的数据结构。本文档简要介绍了有关 LSM 树的概念。
Sorted Runs #
LSM 树将文件组织成若干个有序的运行块。一个有序运行块由一个或多个数据文件组成,且每个数据文件恰好属于一个有序运行块。
数据文件内的记录是按照其主键进行排序的。在排序过程中,数据文件的主键范围不会出现重叠的情况。

如您所见,不同的排序运行可能会存在重叠的主键范围,并且甚至可能包含相同的主键。在查询 LSM 树时,所有排序运行都必须合并在一起,并且具有相同主键的所有记录都必须根据用户指定的合并引擎以及每条记录的时间戳进行合并。
写入 LSM 树的新记录首先会暂存在内存中进行缓冲。当内存缓冲区已满时,内存中的所有记录都会被排序并写入磁盘。此时会创建一个新的排序运行块。
数据分发 #
桶是用于读写操作的最小存储单元,每个桶目录都包含一个 LSM 树。
Fixed Bucket #
使用固定桶模式配置一个大于 0 的桶大小,并根据 Math.abs(key_hashcode % numBuckets) 计算记录所在的桶。
重新调整存储桶大小只能通过离线方式进行,详情请参阅“调整存储桶大小”。存储桶数量过多会导致文件数量过多,而数量过少则会导致写入性能不佳。
Dynamic Bucket #
主键表的默认模式,或配置'bucket' = '-1'.
最先到达的键将落入旧的桶中,新键将落入新桶中,桶和键的分布取决于数据到达的顺序。Paimon维护一个索引来确定哪个键对应于哪个bucket。
Paimon将自动扩展桶的数量。
- Option 1: 'dynamic-bucket.target-row-num': 控制一个bucket的目标行数。
- Option 2: 'dynamic-bucket.initial-buckets': 控制初始化桶的数量。
动态桶只支持单个写作业。请不要启动多个作业来写入同一个分区(这可能导致重复数据)。即使启用“只写”并启动专用的压缩作业,它也不会起作用。
标准动态捅模式 #
当您的更新没有跨分区(没有分区,或者主键包含所有分区字段)时,动态桶模式使用HASH索引来维护从键到桶的映射,它比固定桶模式需要更多的内存。
性能:
- 一般来说,没有性能损失,但会有一些额外的内存消耗,一个分区中的1亿个条目会多占用1 GB的内存,不再活动的分区不会占用内存。
- 对于更新率较低的表,建议使用该模式,以显著提高性能。
Normal Dynamic Bucket Mode 支持排序压缩以加快查询速度。 参见排序压缩.
跨分区Upsert动态桶模式 #
当需要跨分区upsert(主键不包含所有分区字段)时,Dynamic Bucket模式直接维护键到分区和桶的映射,使用本地磁盘,并在开始流写作业时通过读取表中所有现有键来初始化索引。不同的合并引擎有不同的行为:
- Deduplicate: 删除旧分区中的数据,并将新数据插入新分区。
- PartialUpdate & Aggregation: 将新数据插入旧分区。
- FirstRow: 如果有旧值,则忽略新数据
性能:对于具有大量数据的表,将会有很大的性能损失。而且,初始化需要很长时间。
如果您的upsert不依赖于太旧的数据,您可以考虑配置索引TTL来减少索引和初始化时间:
'cross-partition-upsert.index-ttl': 在rocksdb索引和初始化中的TTL,这样可以避免维护过多的索引而导致性能越来越差。
但请注意,这也可能导致数据重复。
选择分区字段 #
以下三种类型的字段可以定义为仓库中的分区字段:
- Creation Time (推荐): 创建时间通常是不可变的,因此可以放心地将其视为分区字段并将其添加到主键中。
- Event Time: 事件时间是原始表中的一个字段。对于CDC数据,例如从MySQL CDC同步的表或者由Paimon生成的changelog,它们都是完整的CDC数据,包括UPDATE_BEFORE记录,即使声明主键包含分区字段,也可以达到唯一的效果(要求'changelog-producer'='input')。
- CDC op_ts: 它不能被定义为分区字段,不能知道以前的记录时间戳。所以你需要使用跨分区upsert,它会消耗更多的资源。
表模式 #

主键表的文件结构大致如图所示。表或分区包含多个桶,每个桶是一个单独的LSM树结构,其中包含多个文件。
LSM的写入过程大致如下:Flink检查点刷新L0文件,并根据需要触发压缩以合并数据。根据书写过程中处理方式的不同,有三种模式:
- MOR (Merge On Read): 默认模式,只执行较小的压缩,并且在读取时需要合并。
- COW (Copy On Write): 使用'full-compact .delta-commits' = '1',完全压缩将被同步,这意味着合并在写入时完成。
- MOW (Merge On Write): Using 'deletion-vectors.enabled' = 'true', 在写入阶段,查询LSM生成数据文件的删除向量文件,读取时直接过滤掉不需要的行。
对于一般的主键表,建议使用Merge On Write模式(Merge -engine是默认的重复数据删除)。
Merge On Read #
MOR是主键表的默认模式。

当模式为MOR时,需要合并所有文件进行读取,因为所有文件都是有序的,并且进行了多路合并,其中包括主键的比较计算。
这里有一个明显的问题,单个LSM树只能有一个线程进行读取,因此读取并行性是有限的。如果桶中的数据量太大,可能会导致读性能差。因此,为了提高读性能,建议分析查询需求表,将桶中的数据量设置在200MB到1GB之间。但如果桶太小,则会有大量的小文件读写,对文件系统造成压力。
另外,由于合并过程的原因,不能对非主键列进行基于Filter的数据跳转,否则会过滤掉新数据,导致旧数据不正确。
- 写性能:很好。
- 读性能:不太好。
Copy On Write #
ALTER TABLE orders SET ('full-compaction.delta-commits' = '1');
将full-compact .delta-commits设置为1,这意味着每次写都将被完全合并,并且所有数据都将合并到最高级别。读取时,此时不需要合并,读取性能最高。但是每次写入都需要完全合并,并且写入放大非常严重。

- 写性能:很差。
- 阅读性能:非常好。
Merge On Write #
ALTER TABLE orders SET ('deletion-vectors.enabled' = 'true');
由于Paimon的LSM结构,它具有按主键查询的能力。我们可以在写入时生成删除向量文件,表示文件中的哪些数据已被删除。这直接过滤掉读取过程中不需要的行,相当于合并,不影响读取性能。

一个简单的例子:

通过先删除旧记录然后添加新记录来更新数据。
- 写性能:良好。
- 读性能:良好。
可见性保证:表在删除向量模式下,级别为0的文件只有在压缩后才可见。因此,默认情况下,压缩是同步的,如果打开异步,则可能存在数据延迟。
MOR Read Optimized #
如果你不想使用删除向量模式,你想查询足够快的MOR模式,但只能找到旧的数据,你也可以:
- 写数据的时候配置'compaction.optimization-interval'
- 查询 read-optimized system table.从优化文件的结果中读取可以避免合并具有相同键的记录,从而提高读取性能。
您可以在读取时灵活地平衡查询性能和数据延迟。
Merge Engine #
Overview #
当Paimon sink接收到具有相同主键的两条或多条记录时,它会将它们合并到一条记录中,以保持主键的唯一性。通过指定merge-engine表属性,用户可以选择如何将记录合并在一起。
在Flink SQL TableConfig中总是将`table.exec.sink.upsert-materialize`设置为NONE, upsert-materialize可能会导致奇怪的行为。当输入是无序的,我们建议您使用[Sequence Field](https://paimon.apache.org/docs/0.9/primary-key-table/sequence-rowkind/#sequence-field) 来纠正混乱。
Deduplicate #
重复数据删除合并引擎是默认的合并引擎。Paimon将只保留最新的记录,并丢弃具有相同主键的其他记录。
具体来说,如果最新的记录是一条DELETE记录,那么所有具有相同主键的记录都将被删除。可以通过配置ignore-delete来忽略它。
Partial Update #
通过指定'merge-engine' = 'partial-update',用户可以通过多次更新来更新记录的列,直到记录完成。这是通过使用相同主键下的最新数据逐个更新值字段来实现的。但是,在此过程中不会覆盖空值。
例如,假设Paimon接收到三条记录:
- <1, 23.0, 10, NULL>-
- <1, NULL, NULL, 'This is a book'>
- <1, 25.2, NULL, NULL>
假设第一列是主键,那么最终结果将是< 1,25.2,10,'This is a book'>。
对于流查询,部分更新合并引擎必须与lookup 或 full-compaction changelog producer
一起使用。(' input ' changelog生产者也被支持,但只返回输入记录。)
默认情况下,部分更新不能接受删除记录,您可以选择以下解决方案之一:
- 配置‘ ignore-delete ’来忽略删除记录。
- 配置的局部迭代。remove-record-on-delete '用于在接收删除记录时删除整行。
- 配置“sequence-group”来收回部分列。
Sequence Group #
序列字段可能无法解决具有多个流更新的部分更新表的无序问题,因为在多流更新期间,序列字段可能被另一个流的最新数据覆盖。
因此,我们引入了部分更新表的序列组机制。它可以解决:
- 多流更新过程中的混乱。每个流定义自己的序列组。
- 真正的部分更新,而不仅仅是非空更新。
看例子:
CREATE TABLE t
(
k INT,
a INT,
b INT,
g_1 INT,
c INT,
d INT,
g_2 INT,
PRIMARY KEY (k) NOT ENFORCED
) WITH (
'merge-engine' = 'partial-update',
'fields.g_1.sequence-group' = 'a,b',
'fields.g_2.sequence-group' = 'c,d'
);
INSERT INTO t
VALUES (1, 1, 1, 1, 1, 1, 1);
-- g_2 is null, c, d should not be updated
INSERT INTO t
VALUES (1, 2, 2, 2, 2, 2, CAST(NULL AS INT));
SELECT *
FROM t;
-- output 1, 2, 2, 2, 1, 1, 1
-- g_1 is smaller, a, b should not be updated
INSERT INTO t
VALUES (1, 3, 3, 1, 3, 3, 3);
SELECT *
FROM t; -- output 1, 2, 2, 2, 3, 3, 3
对于fields.<field-name>.sequence-group,有效可比较的数据类型包括:DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP, and TIMESTAMP_LTZ.
也可以在sequence-group中配置多个排序字段,如字段<field-name1>,<field-name2>.sequence-group,,多个字段将按顺序进行比较。
看例子:
CREATE TABLE SG
(
k INT,
a INT,
b INT,
g_1 INT,
c INT,
d INT,
g_2 INT,
g_3 INT,
PRIMARY KEY (k) NOT ENFORCED
) WITH (
'merge-engine' = 'partial-update',
'fields.g_1.sequence-group' = 'a,b',
'fields.g_2,g_3.sequence-group' = 'c,d'
);
INSERT INTO SG
VALUES (1, 1, 1, 1, 1, 1, 1, 1);
-- g_2, g_3 should not be updated
INSERT INTO SG
VALUES (1, 2, 2, 2, 2, 2, 1, CAST(NULL AS INT));
SELECT *
FROM SG;
-- output 1, 2, 2, 2, 1, 1, 1, 1
-- g_1 should not be updated
INSERT INTO SG
VALUES (1, 3, 3, 1, 3, 3, 3, 1);
SELECT *
FROM SG;
-- output 1, 2, 2, 2, 3, 3, 3, 1
Aggregation For Partial Update #
您可以为输入字段指定聚合函数,aggregation中的所有函数都支持。
看例子:
CREATE TABLE t
(
k INT,
a INT,
b INT,
c INT,
d INT,
PRIMARY KEY (k) NOT ENFORCED
) WITH (
'merge-engine' = 'partial-update',
'fields.a.sequence-group' = 'b',
'fields.b.aggregate-function' = 'first_value',
'fields.c.sequence-group' = 'd',
'fields.d.aggregate-function' = 'sum'
);
INSERT INTO t
VALUES (1, 1, 1, CAST(NULL AS INT), CAST(NULL AS INT));
INSERT INTO t
VALUES (1, CAST(NULL AS INT), CAST(NULL AS INT), 1, 1);
INSERT INTO t
VALUES (1, 2, 2, CAST(NULL AS INT), CAST(NULL AS INT));
INSERT INTO t
VALUES (1, CAST(NULL AS INT), CAST(NULL AS INT), 2, 2);
SELECT *
FROM t; -- output 1, 2, 1, 2, 3
还可以为多个排序字段中的sequence-group配置聚合函数。
看例子:
CREATE TABLE AGG
(
k INT,
a INT,
b INT,
g_1 INT,
c VARCHAR,
g_2 INT,
g_3 INT,
PRIMARY KEY (k) NOT ENFORCED
) WITH (
'merge-engine' = 'partial-update',
'fields.a.aggregate-function' = 'sum',
'fields.g_1,g_3.sequence-group' = 'a',
'fields.g_2.sequence-group' = 'c');
-- a in sequence-group g_1, g_3 with sum agg
-- b not in sequence-group
-- c in sequence-group g_2 without agg
INSERT INTO AGG
VALUES (1, 1, 1, 1, '1', 1, 1);
-- g_2 should not be updated
INSERT INTO AGG
VALUES (1, 2, 2, 2, '2', CAST(NULL AS INT), 2);
SELECT *
FROM AGG;
-- output 1, 3, 2, 2, "1", 1, 2
-- g_1, g_3 should not be updated
INSERT INTO AGG
VALUES (1, 3, 3, 2, '3', 3, 1);
SELECT *
FROM AGG;
-- output 1, 6, 3, 2, "3", 3, 2
你可以使用fields.default-aggregate-function为所有输入字段指定一个默认的聚合函数,示例如下:
CREATE TABLE t
(
k INT,
a INT,
b INT,
c INT,
d INT,
PRIMARY KEY (k) NOT ENFORCED
) WITH (
'merge-engine' = 'partial-update',
'fields.a.sequence-group' = 'b',
'fields.c.sequence-group' = 'd',
'fields.default-aggregate-function' = 'last_non_null_value',
'fields.d.aggregate-function' = 'sum'
);
INSERT INTO t
VALUES (1, 1, 1, CAST(NULL AS INT), CAST(NULL AS INT));
INSERT INTO t
VALUES (1, CAST(NULL AS INT), CAST(NULL AS INT), 1, 1);
INSERT INTO t
VALUES (1, 2, 2, CAST(NULL AS INT), CAST(NULL AS INT));
INSERT INTO t
VALUES (1, CAST(NULL AS INT), CAST(NULL AS INT), 2, 2);
SELECT *
FROM t; -- output 1, 2, 2, 2, 3
Aggregation #
NOTE: Always set table.exec.sink.upsert-materialize to NONE in Flink SQL TableConfig.
有时用户只关心聚合的结果。聚合合并引擎根据聚合函数,在相同的主键下,将具有最新数据的每个值字段逐一聚合。
每个不属于主键的字段都可以被赋予一个聚合函数,由fields.<field-name>.aggregate-function 表属性指定,否则将默认使用last_non_null_value聚合。例如,考虑下面的表定义。
CREATE TABLE my_table (
product_id BIGINT,
price DOUBLE,
sales BIGINT,
PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
'merge-engine' = 'aggregation',
'fields.price.aggregate-function' = 'max',
'fields.sales.aggregate-function' = 'sum'
);
字段price将通过max函数进行汇总,字段sales将通过sum函数进行汇总。给定两个输入记录< 1,23,15 >和< 1,30.2,20>,最终结果将是< 1,30.2,35>。
Aggregation Functions #
当前支持的聚合函数和数据类型有:
sum #
sum函数将跨多行的值聚合起来。它支持DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT和DOUBLE数据类型。
product #
product函数可以跨多个行计算乘积值。它支持DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT和DOUBLE数据类型。
count #
在需要对匹配特定条件的行进行计数的场景中,可以使用SUM函数来实现这一点。通过将条件表示为布尔值(TRUE或FALSE)并将其转换为数值,可以有效地计算行数。在这种方法中,TRUE被转换为1,FALSE被转换为0。
例如,如果您有一个表orders,并且想要计算满足特定条件的行数,您可以使用以下查询:
SELECT SUM(CASE WHEN condition THEN 1 ELSE 0 END) AS count
FROM orders;
max #
max函数识别并保留最大值。它支持CHAR、VARCHAR、DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、DOUBLE、DATE、TIME、TIMESTAMP和TIMESTAMP_LTZ数据类型。
min #
min函数识别并保留最小值。它支持CHAR、VARCHAR、DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、DOUBLE、DATE、TIME、TIMESTAMP和TIMESTAMP_LTZ数据类型。
last_value #
last_value函数用最近导入的值替换之前的值。它支持所有数据类型。
last_non_null_value #
last_non_null_value函数用最近的非空值替换之前的值。它支持所有数据类型。
listagg #
listagg函数将多个字符串值连接成一个字符串。它支持STRING数据类型。每个不属于主键的字段都可以指定一个由字段指定的列表聚合分隔符。List-agg-delimiter表属性,否则它将使用“,”作为默认值。
略......
Retraction #
只有sum、product、collect、merge_map、nested_update、last_value和last_non_null_value支持回缩(UPDATE_BEFORE和DELETE),其他聚合函数不支持回缩。如果您允许某些函数忽略撤回消息,您可以配置:'fields.${field_name}.ignore- withdraw '='true'。
last_value和last_non_null_value只是在接受撤回消息时将字段设置为空。
collect和merge_map尽最大努力处理撤回消息,但不能保证结果是准确的。在处理撤回消息时可能会发生以下行为:
- 如果记录是无序的,它可能无法处理撤回消息。例如,表使用collect,上游分别发送+I['A', 'B']和-U['A']。如果表先收到-U['A'],它就什么也做不了;然后它接收到+I['A', 'B'],合并结果将是+I['A', 'B']而不是+I['B']。
- 来自一个上游的收回消息将收回来自多个上游合并的结果。例如,表使用merge_map,一个上游发送+I[1->A],另一个上游发送+I[1->B],之后发送- d [1->B]。表首先将两个插入值合并为+I[1- b> B],然后- d [1->B]将收回整个结果,因此最终结果是一个空映射而不是+I[1->A]
First Row #
通过指定'merge-engine' = 'first-row',用户可以保留相同主键的第一行。它与重复数据删除合并引擎的不同之处在于,在第一行合并引擎中,它将生成只插入的更改日志。
- 您不能指定sequence.field。
- 不接受DELETE和UPDATE_BEFORE消息。通过配置ignore-delete,可以忽略这两种记录。
- 可见性保证:具有First Row引擎的表,级别为0的文件只有在压缩后才可见。因此,默认情况下,压缩是同步的,如果打开异步,则可能存在数据延迟。
这对于取代流计算中的日志重复数据删除有很大的帮助。
Changelog Producer #
流写可以持续不断地为流读产生最新的变化。
通过在创建表时指定changelog-producer表属性,用户可以选择从表文件生成的更改模式。
Changelog-producer可能会显著降低压缩性能,除非必要,否则请不要启用它。
None #
默认值,不会向表的写入器应用额外的变更日志生成器。Paimon源只能看到快照之间合并的更改,例如删除了哪些键以及某些键的新值是什么。
然而,这些合并的更改不能形成一个完整的更改日志,因为我们不能直接从中读取键的旧值。合并后的更改要求使用者“记住”每个键的值,并在不看到旧值的情况下重写这些值。然而,一些消费者需要旧的值来确保正确性或效率。
考虑一个消费者,它计算一些分组键的总和(可能不等于主键)。如果消费者只看到一个新值5,它就无法确定应该向求和结果添加哪些值。例如,如果旧值是4,它应该在结果上加1。但是如果旧的值是6,它应该反过来从结果中减去1。对于这类消费者来说,旧的价值观很重要。
综上所述,无更改日志生成器最适合于数据库系统这样的消费者。Flink还有一个内置的“normalize”操作符,用于保存状态中每个键的值。很容易看出,这种操作成本非常高,应该避免使用。(你可以通过‘scan.remove-normalize’强制删除“ normalize ”操作符。)

Input #
通过指定' changlog -producer' = 'input', Paimon编写者依赖于他们的输入作为完整更改日志的来源。所有输入记录将保存在单独的变更日志文件中,并将由Paimon源提供给消费者。
当Paimon编写器的输入是完整的更改日志(例如来自数据库CDC)或由Flink状态计算生成时,可以使用input changelog producer。

Lookup #
如果您的输入不能生成完整的更改日志,但您仍然希望摆脱昂贵的规范化操作符,则可以考虑使用‘lookup’更改日志生成器。
通过指定' changlog -producer' = ‘lookup‘, Paimon将在提交数据写入之前通过’lookup’生成更改日志。

查找将缓存内存和本地磁盘上的数据,您可以使用以下选项来调优性能:
| Option | Default | Type | Description |
|---|---|---|---|
| lookup.cache-file-retention | 1 h | Duration | 缓存文件的查找保留时间。文件过期后,如果需要访问,将从DFS重新读取它,以便在本地磁盘上建立索引。 |
| lookup.cache-max-disk-size | unlimited | MemorySize | 查找缓存的最大磁盘大小,您可以使用此选项来限制本地磁盘的使用。 |
| lookup.cache-max-memory-size | 256 mb | MemorySize | 查找缓存的最大内存大小。 |
lookup changelog-producer支持changelog-producer.row-deduplicate以避免为同一条记录生成-U, +U更改日志。
(注意:请增加'execution.checkpointing.max-concurrent-checkpoints'的Flink配置(这对性能非常重要)。
Full Compaction #
如果你认为“查找”的资源消耗太大,你可以考虑使用“全压缩”变更日志生成器,它可以将数据写入和变更日志生成分离,并且更适合高延迟的场景(例如,10分钟)。
通过指定' changlog -producer' = 'full-compaction', Paimon将比较完全压缩的结果,并将差异生成为changelog。更改日志的延迟受到完全压缩频率的影响。
通过指定full-compact .delta-commits表属性,将在delta提交(检查点)之后不断触发完全压缩。默认设置为1,因此每个检查点都有一个完整的压缩并生成一个更改日志。

完全压缩变更日志生成器可以为任何类型的源生成完整的变更日志。但是,它不如输入更改日志生成器高效,并且生成更改日志的延迟可能很高。
全压缩changelog-producer支持changelog-producer.row-deduplicate以避免为同一条记录生成-U, +U更改日志。
(注意:请增加execution.checkpointing.max-concurrent-checkpoints的Flink配置(这对性能非常重要)。
Sequence and Rowkind #
在创建表时,您可以通过指定字段'sequence.field'来确定更新的顺序,或者您可以指定'rowkind.field'来确定更改日志记录的类型。
Sequence Field #
默认情况下,主键表根据输入顺序确定合并顺序(最后一个输入记录将是最后一个合并)。然而,在分布式计算中,会出现一些导致数据混乱的情况。此时,您可以使用时间字段作为sequence.field,例如:
CREATE TABLE my_table (
pk BIGINT PRIMARY KEY NOT ENFORCED,
v1 DOUBLE,
v2 BIGINT,
update_time TIMESTAMP
) WITH (
'sequence.field' = 'update_time'
);
sequence.field 最大记录值将是最后一个要合并的,如果值相同,将使用输入顺序来确定哪个是最后一个。sequence.field 支持所有数据类型的字段。
您可以为sequence.field定义多个字段,例如‘update_time,flag’,多个字段将按顺序进行比较。
用户定义的序列字段与first_row和first_value等特性冲突,这可能会导致意想不到的结果。
Row Kind Field #
默认情况下,主键表根据输入行确定行类型。您还可以定义“rowkind.field”来使用字段提取行类型。
有效的行类型字符串应该是‘+I’、‘-U’、‘+U’或‘-D’。
Compaction #
当越来越多的记录写入LSM树时,排序的runs将会增加。因为查询LSM树需要将所有排序的runs组合在一起,太多的排序runs将导致查询性能差,甚至导致内存不足。
为了限制排序runs,我们必须偶尔将几个排序runs合并为一个大的排序runs。这个过程称为压缩。
然而,压缩是一个资源密集型过程,它会消耗一定的CPU时间和磁盘IO,因此过于频繁的压缩可能会导致写速度变慢。这是查询性能和写性能之间的权衡。目前,Paimon采用了一种与Rocksdb的universal compaction类似的压实策略。
压缩解决:
- 减少0级文件,避免查询性能下降。
- 通过changelog-producer生成变更日志。
- 生成MOW 模式的删除向量。
- 快照过期、标签过期、分区过期。
限制:
- 同一个分区的压缩只能有一个作业,否则会导致冲突,一方会抛出异常失败。
写入性能几乎总是受到压缩的影响,因此其调优至关重要。
Asynchronous Compaction #
压缩本质上是异步的,但是如果您希望它是完全异步的,并且不阻塞写入,期望一个模式具有最大的写入吞吐量,压缩可以缓慢地完成,而不是匆忙地完成。您可以对您的表使用以下策略:
num-sorted-run.stop-trigger = 2147483647
sort-spill-threshold = 10
lookup-wait = false
这种配置可以在写高峰时生成更多的文件,在写低时逐渐融合成最佳的读性能。
Dedicated compaction job #
通常,如果您希望将多个作业写入同一个表,则需要分离压缩。您可以使用专用的压缩作业.
Record-Level expire #
在压缩中,您可以配置记录级别过期时间来过期记录,您应该配置:
- 'record-level.expire-time': 记录保留数据
- 'record-level.time-field': 记录级别过期的时间字段
- 'record-level.time-field-type': 记录级别过期的时间字段类型,可以是int类型的秒或long类型的毫秒。
过期是在压缩过程中发生的,并且不能保证及时过期记录。
Full Compaction #
Paimon压缩使用通用压缩。默认情况下,当增量数据过多时,将自动执行全压缩。你通常不用担心这个。
Paimon还提供了一个允许定期执行全压缩的配置。
- ‘compaction.optimization-interval’:该配置表示执行优化完全压缩的频率,用于确保读优化系统表的查询时效性。
- ‘full-compaction.delta-commits’:在增量提交之后,将不断触发完全压缩。它的缺点是只能同步执行压缩,这会影响写入效率。
Compaction Options #
Number of Sorted Runs to Pause Writing #
当排序runs很少时,Paimon写入器将在分开的线程中异步执行压缩,因此可以连续地将记录写入表中。然而,为了避免排序runs的无界增长,写入器将在排序runs的数量达到阈值时暂停写入。下表属性确定阈值。
| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| num-sorted-run.stop-trigger | No | (none) | Integer | 触发写入停止的排序运行次数,默认值为'num-sorted-run.compact -trigger' + 3。 |
当num-sorted-run.stop-trigger变大时,写入停顿的频率会降低,从而提高写入性能。但是,如果这个值变得太大,在查询该表时将需要更多的内存和CPU时间。如果您担心OOM问题,请配置以下选项。它的值取决于你的内存大小。
| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| sort-spill-threshold | No | (none) | Integer | 如果排序读取器的最大数量超过此值,将尝试溢出。这可以防止过多的读取器消耗过多的内存并导致OOM。 |
Number of Sorted Runs to Trigger Compaction #
Paimon使用支持大量更新的LSM tree。LSM在多个排序runs中组织文件。在从LSM树查询记录时,必须将所有排序的runs组合起来,以生成所有记录的完整视图。
可以很容易地看到,太多的排序runs将导致查询性能低下。为了将排序的runs数量保持在合理的范围内,Paimon编写器将自动执行压缩。下表属性确定触发压缩的最小排序运行数。
| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| num-sorted-run.compaction-trigger | No | 5 | Integer | 触发压缩的排序runs数。包括0级文件(一个文件一个排序runs)和高级运行(一个第一级一个排序runs)。 |
当num-sorted-run.compact -trigger变大时,压缩将变得不那么频繁,从而提高写入性能。但是,如果这个值变得太大,在查询该表时将需要更多的内存和CPU时间。这是编写性能和查询性能之间的一种权衡。