Overview #
如果一个表没有定义主键,它就是一个追加表。与主键表相比,它不具有直接接收更改日志的能力。它不能通过upsert直接更新数据。它只能接收来自追加数据的传入数据。
CREATE TABLE my_table (
product_id BIGINT,
price DOUBLE,
sales BIGINT
) WITH (
-- 'target-file-size' = '256 MB',
-- 'file.format' = 'parquet',
-- 'file.compression' = 'zstd',
-- 'file.compression.zstd-level' = '3'
);
典型应用场景下的批量写、批量读,类似于普通的Hive分区表,但与Hive表相比,可以带来:
- 对象存储(S3、OSS)友好
- 时间旅行和回滚
- 低成本的DELETE / UPDATE
- 流式插入自动合并小文件
- 像队列一样流式读写
- 借助排序和索引高性能查询
Streaming #
您可以通过Flink以一种非常灵活的方式流式写入Append表,或者通过Flink读取Append表,将其像队列一样使用。唯一的区别是它的延迟是以分钟为单位的。它的优点是成本非常低,并且能够向下推过滤器和投影。
Automatic small file merging #
在流写作业中,如果没有桶定义,writer就没有压缩,而是使用Compact Coordinator扫描小文件,然后将压缩任务传递给Compact Worker。在流模式下,如果你在flink中运行insert sql,拓扑将是这样的:

不用担心背压,压缩从不背压。
如果设置 write-only = true,则将在拓扑中删除 Compact Coordinator 和 Compact Worker.
自动压缩仅在Flink引擎流模式下支持。您还可以在paimon中通过flink动作启动一个压缩作业,并通过set write-only禁用所有其他压缩作业。
Streaming Query #
您可以流式Append表,并像使用消息队列一样使用它。与主键表一样,流式读取有两种选择:
- 默认情况下,流式读取在第一次启动时生成表上的最新快照,并继续读取最新的增量记录。
- 可以指定scan.mode or scan.snapshot-id 或 scan.timestamp-millis 或 scan.file-creation-time-millis ,只允许流式读增量。
与flink-kafka类似,默认情况下顺序是不保证的,如果你的数据有某种顺序要求,你也需要考虑定义一个桶键,参见 Bucketed Append
Query #
Data Skipping By Order #
默认情况下,Paimon记录清单文件中每个字段的最大值和最小值。
在查询中,根据查询的WHERE条件,根据manifest中的统计信息做文件过滤,如果过滤效果好,将查询本来分钟的查询加速到毫秒级完成执行。
通常数据分布并不总是有效的过滤,那么如果我们能在WHERE条件下按字段对数据进行排序呢?您可以查看Flink COMPACT Action 或 Flink COMPACT Procedure 或 Spark COMPACT Procedure.
Data Skipping By File Index #
你也可以使用文件索引,它在读取端通过索引过滤文件。
CREATE TABLE <PAIMON_TABLE> (<COLUMN> <COLUMN_TYPE> , ...) WITH (
'file-index.bloom-filter.columns' = 'c1,c2',
'file-index.bloom-filter.c1.items' = '200'
);
定义file-index.bloom-filter.columns,Paimon将为每个文件创建相应的索引文件。如果索引文件太小,它将直接存储在清单中,或者存储在数据文件的目录中。每个数据文件对应一个索引文件,索引文件有一个单独的文件定义,可以包含不同类型的索引和多个列。
数据文件索引是某个数据文件对应的外部索引文件。如果索引文件太小,它将直接存储在清单中,否则存储在数据文件的目录中。每个数据文件对应一个索引文件,索引文件有一个单独的文件定义,可以包含不同类型的索引和多个列。
不同的文件索引在不同的场景下可能是有效的。例如,在点查找场景中,bloom过滤器可以加快查询速度。使用位图可能会占用更多空间,但可以获得更高的准确性。
目前,文件索引仅支持在仅追加表中。
布隆过滤器:
- file-index.bloom-filter.columns: 指定需要布隆过滤器索引的列。
- file-index.bloom-filter.<column_name>.fpp 配置误报概率。
- file-index.bloom-filter.<column_name>.items 在一个数据文件中配置所期望的不同项。
位图:
file-index.bitmap.columns: 指定需要位图索引的列。
将支持更多的过滤器类型…
如果您想在现有表中添加文件索引,而不需要重写,您可以使用rewrite_file_index过程。在使用该过程之前,您应该在目标表中配置适当的配置。您可以使用ALTER子句配置file-index.<filter-type>.columns到表。
如何调用:参见 flink procedures
Update #
现在,只有Spark SQL支持DELETE和UPDATE,你可以看看Spark Write.
举例:
DELETE FROM my_table WHERE currency = 'UNKNOWN';
更新追加表有两种模式:
- COW (Copy on Write):搜索命中的文件,然后重写每个文件,从文件中删除需要删除的数据。这个操作代价很大。
- MOW(写合并):通过指定 'deletion-vectors.enabled' = 'true',则可以启用删除向量模式。只标记相应文件的某些记录进行删除,并写入删除文件,不重写整个文件。
Bucketed Append #
您可以定义bucket和bucket-key以获得bucket附加表。
创建分桶追加表的示例:
CREATE TABLE my_table (
product_id BIGINT,
price DOUBLE,
sales BIGINT
) WITH (
'bucket' = '8',
'bucket-key' = 'product_id'
);
Streaming #
一个普通的追加表对它的流写和读没有严格的顺序保证,但在某些情况下,你需要定义一个类似于Kafka的键。
同一bucket中的每条记录都是严格排序的,流式读取将按照写入的顺序将记录传输到下游。要使用这种模式,您不需要配置特殊配置,所有数据都将作为队列放入一个bucket中。

Compaction in Bucket #
默认情况下,汇聚节点将自动执行压缩以控制文件数量。以下选项控制压缩策略:
| Key | Default | Type | Description |
|---|---|---|---|
| write-only | false | Boolean | 如果设置为true,将跳过压缩和快照过期。此选项与专用紧凑作业一起使用。 |
| compaction.min.file-num | 5 | Integer | 对于文件集[f_0,…],f_N],满足sum(size(f_i)) >= targetFileSize触发追加表压缩的最小文件数。这个值避免了几乎整个文件被压缩,因为压缩是不划算的。 |
| compaction.max.file-num | 5 | Integer | 对于文件集[f_0,…],f_N],触发追加表压缩的最大文件数,即使sum(size(f_i)) < targetFileSize。此值可避免挂起过多的小文件,以免降低性能。 |
| full-compaction.delta-commits | (none) | Integer | 在增量提交之后,将不断触发完全压缩。 |
Streaming Read Order #
对于流式读取,记录按以下顺序生成:
- 对于来自两个不同分区的任意两条记录
- 如果scan.plan-sort-partition设置为true,将首先生成分区值较小的记录。
- 否则,将首先生成分区创建时间较早的记录。
- 对于来自同一分区和同一桶的任意两条记录,先写的记录将先被生产。
- 对于来自同一分区但不同桶的任意两条记录,不同桶由不同的任务处理,它们之间没有顺序保证。
Watermark Definition #
您可以为读取Paimon表定义水印:
CREATE TABLE t (
`user` BIGINT,
product STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (...);
-- launch a bounded streaming job to read paimon_table
SELECT window_start, window_end, COUNT(`user`) FROM TABLE(
TUMBLE(TABLE t, DESCRIPTOR(order_time), INTERVAL '10' MINUTES)) GROUP BY window_start, window_end;
你也可以启用Flink Watermark alignment,这将确保没有源/分割/碎片/分区将其水印增加得太远。
| Key | Default | Type | Description |
|---|---|---|---|
| scan.watermark.alignment.group | (none) | String | 一组用于对齐水印的源。 |
| scan.watermark.alignment.max-drift | (none) | Duration | 在我们暂停从源/任务/分区消费之前,对齐水印的最大漂移。 |
Bounded Stream #
流源也可以有界,您可以指定‘ scan.bounded.watermark ’来定义有界流模式的结束条件,流读取将结束,直到遇到更大的水印快照。
快照中的水印是由writer生成的,例如可以指定一个kafka源,并声明水印的定义。当使用这个kafka源对Paimon表进行写操作时,Paimon表的快照会生成相应的水印,这样流式读取这个Paimon表时就可以使用有界水印的特性。
CREATE TABLE kafka_table (
`user` BIGINT,
product STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka'...);
-- launch a streaming insert job
INSERT INTO paimon_table SELECT * FROM kakfa_table;
-- launch a bounded streaming job to read paimon_table
SELECT * FROM paimon_table /*+ OPTIONS('scan.bounded.watermark'='...') */;
Batch #
在批量查询中,如果有必要,可以使用桶状表来避免shuffle,例如,您可以使用以下Spark SQL来读取Paimon表:
SET spark.sql.sources.v2.bucketing.enabled = true;
CREATE TABLE FACT_TABLE (order_id INT, f1 STRING) TBLPROPERTIES ('bucket'='10', 'bucket-key' = 'order_id');
CREATE TABLE DIM_TABLE (order_id INT, f2 STRING) TBLPROPERTIES ('bucket'='10', 'primary-key' = 'order_id');
SELECT * FROM FACT_TABLE JOIN DIM_TABLE on t1.order_id = t4.order_id;
‘spark.sql.sources.v2.bucketing.enabled’配置用于启用V2数据源。打开后,Spark将通过V2数据源的SupportsReportPartitioning识别特定的分布式报告,并在必要时尝试避免shuffle。
如果两个表具有相同的存储桶策略和相同数量的存储桶,则可以避免代价高昂的联接shuffle。