非主键表

Overview #

如果一个表没有定义主键,它就是一个追加表。与主键表相比,它不具有直接接收更改日志的能力。它不能通过upsert直接更新数据。它只能接收来自追加数据的传入数据。

CREATE TABLE my_table (
    product_id BIGINT,
    price DOUBLE,
    sales BIGINT
) WITH (
    -- 'target-file-size' = '256 MB',
    -- 'file.format' = 'parquet',
    -- 'file.compression' = 'zstd',
    -- 'file.compression.zstd-level' = '3'
);

典型应用场景下的批量写、批量读,类似于普通的Hive分区表,但与Hive表相比,可以带来:

  1. 对象存储(S3、OSS)友好
  2. 时间旅行和回滚
  3. 低成本的DELETE / UPDATE
  4. 流式插入自动合并小文件
  5. 像队列一样流式读写
  6. 借助排序和索引高性能查询

Streaming #

您可以通过Flink以一种非常灵活的方式流式写入Append表,或者通过Flink读取Append表,将其像队列一样使用。唯一的区别是它的延迟是以分钟为单位的。它的优点是成本非常低,并且能够向下推过滤器和投影。

Automatic small file merging #

在流写作业中,如果没有桶定义,writer就没有压缩,而是使用Compact Coordinator扫描小文件,然后将压缩任务传递给Compact Worker。在流模式下,如果你在flink中运行insert sql,拓扑将是这样的:


image.png

不用担心背压,压缩从不背压。

如果设置 write-only = true,则将在拓扑中删除 Compact Coordinator 和 Compact Worker.

自动压缩仅在Flink引擎流模式下支持。您还可以在paimon中通过flink动作启动一个压缩作业,并通过set write-only禁用所有其他压缩作业。

Streaming Query #

您可以流式Append表,并像使用消息队列一样使用它。与主键表一样,流式读取有两种选择:

  1. 默认情况下,流式读取在第一次启动时生成表上的最新快照,并继续读取最新的增量记录。
  2. 可以指定scan.mode or scan.snapshot-id 或 scan.timestamp-millis 或 scan.file-creation-time-millis ,只允许流式读增量。

与flink-kafka类似,默认情况下顺序是不保证的,如果你的数据有某种顺序要求,你也需要考虑定义一个桶键,参见 Bucketed Append


Query #

Data Skipping By Order #

默认情况下,Paimon记录清单文件中每个字段的最大值和最小值。

在查询中,根据查询的WHERE条件,根据manifest中的统计信息做文件过滤,如果过滤效果好,将查询本来分钟的查询加速到毫秒级完成执行。

通常数据分布并不总是有效的过滤,那么如果我们能在WHERE条件下按字段对数据进行排序呢?您可以查看Flink COMPACT ActionFlink COMPACT ProcedureSpark COMPACT Procedure.

Data Skipping By File Index #

你也可以使用文件索引,它在读取端通过索引过滤文件。

CREATE TABLE <PAIMON_TABLE> (<COLUMN> <COLUMN_TYPE> , ...) WITH (
    'file-index.bloom-filter.columns' = 'c1,c2',
    'file-index.bloom-filter.c1.items' = '200'
);

定义file-index.bloom-filter.columns,Paimon将为每个文件创建相应的索引文件。如果索引文件太小,它将直接存储在清单中,或者存储在数据文件的目录中。每个数据文件对应一个索引文件,索引文件有一个单独的文件定义,可以包含不同类型的索引和多个列。

数据文件索引是某个数据文件对应的外部索引文件。如果索引文件太小,它将直接存储在清单中,否则存储在数据文件的目录中。每个数据文件对应一个索引文件,索引文件有一个单独的文件定义,可以包含不同类型的索引和多个列。

不同的文件索引在不同的场景下可能是有效的。例如,在点查找场景中,bloom过滤器可以加快查询速度。使用位图可能会占用更多空间,但可以获得更高的准确性。

目前,文件索引仅支持在仅追加表中。

布隆过滤器:

  • file-index.bloom-filter.columns: 指定需要布隆过滤器索引的列。
  • file-index.bloom-filter.<column_name>.fpp 配置误报概率。
  • file-index.bloom-filter.<column_name>.items 在一个数据文件中配置所期望的不同项。

位图:
file-index.bitmap.columns: 指定需要位图索引的列。

将支持更多的过滤器类型…

如果您想在现有表中添加文件索引,而不需要重写,您可以使用rewrite_file_index过程。在使用该过程之前,您应该在目标表中配置适当的配置。您可以使用ALTER子句配置file-index.<filter-type>.columns到表。
如何调用:参见 flink procedures


Update #

现在,只有Spark SQL支持DELETE和UPDATE,你可以看看Spark Write.
举例:

DELETE FROM my_table WHERE currency = 'UNKNOWN';

更新追加表有两种模式:

  1. COW (Copy on Write):搜索命中的文件,然后重写每个文件,从文件中删除需要删除的数据。这个操作代价很大。
  2. MOW(写合并):通过指定 'deletion-vectors.enabled' = 'true',则可以启用删除向量模式。只标记相应文件的某些记录进行删除,并写入删除文件,不重写整个文件。

Bucketed Append #

您可以定义bucket和bucket-key以获得bucket附加表。
创建分桶追加表的示例:

CREATE TABLE my_table (
    product_id BIGINT,
    price DOUBLE,
    sales BIGINT
) WITH (
    'bucket' = '8',
    'bucket-key' = 'product_id'
);

Streaming #

一个普通的追加表对它的流写和读没有严格的顺序保证,但在某些情况下,你需要定义一个类似于Kafka的键。

同一bucket中的每条记录都是严格排序的,流式读取将按照写入的顺序将记录传输到下游。要使用这种模式,您不需要配置特殊配置,所有数据都将作为队列放入一个bucket中。

image.png

Compaction in Bucket #

默认情况下,汇聚节点将自动执行压缩以控制文件数量。以下选项控制压缩策略:

Key Default Type Description
write-only false Boolean 如果设置为true,将跳过压缩和快照过期。此选项与专用紧凑作业一起使用。
compaction.min.file-num 5 Integer 对于文件集[f_0,…],f_N],满足sum(size(f_i)) >= targetFileSize触发追加表压缩的最小文件数。这个值避免了几乎整个文件被压缩,因为压缩是不划算的。
compaction.max.file-num 5 Integer 对于文件集[f_0,…],f_N],触发追加表压缩的最大文件数,即使sum(size(f_i)) < targetFileSize。此值可避免挂起过多的小文件,以免降低性能。
full-compaction.delta-commits (none) Integer 在增量提交之后,将不断触发完全压缩。

Streaming Read Order #

对于流式读取,记录按以下顺序生成:

  • 对于来自两个不同分区的任意两条记录
    • 如果scan.plan-sort-partition设置为true,将首先生成分区值较小的记录。
    • 否则,将首先生成分区创建时间较早的记录。
  • 对于来自同一分区和同一桶的任意两条记录,先写的记录将先被生产。
  • 对于来自同一分区但不同桶的任意两条记录,不同桶由不同的任务处理,它们之间没有顺序保证。

Watermark Definition #

您可以为读取Paimon表定义水印:

CREATE TABLE t (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (...);

-- launch a bounded streaming job to read paimon_table
SELECT window_start, window_end, COUNT(`user`) FROM TABLE(
 TUMBLE(TABLE t, DESCRIPTOR(order_time), INTERVAL '10' MINUTES)) GROUP BY window_start, window_end;

你也可以启用Flink Watermark alignment,这将确保没有源/分割/碎片/分区将其水印增加得太远。

Key Default Type Description
scan.watermark.alignment.group (none) String 一组用于对齐水印的源。
scan.watermark.alignment.max-drift (none) Duration 在我们暂停从源/任务/分区消费之前,对齐水印的最大漂移。

Bounded Stream #

流源也可以有界,您可以指定‘ scan.bounded.watermark ’来定义有界流模式的结束条件,流读取将结束,直到遇到更大的水印快照。

快照中的水印是由writer生成的,例如可以指定一个kafka源,并声明水印的定义。当使用这个kafka源对Paimon表进行写操作时,Paimon表的快照会生成相应的水印,这样流式读取这个Paimon表时就可以使用有界水印的特性。

CREATE TABLE kafka_table (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka'...);

-- launch a streaming insert job
INSERT INTO paimon_table SELECT * FROM kakfa_table;

-- launch a bounded streaming job to read paimon_table
SELECT * FROM paimon_table /*+ OPTIONS('scan.bounded.watermark'='...') */;

Batch #

在批量查询中,如果有必要,可以使用桶状表来避免shuffle,例如,您可以使用以下Spark SQL来读取Paimon表:

SET spark.sql.sources.v2.bucketing.enabled = true;

CREATE TABLE FACT_TABLE (order_id INT, f1 STRING) TBLPROPERTIES ('bucket'='10', 'bucket-key' = 'order_id');

CREATE TABLE DIM_TABLE (order_id INT, f2 STRING) TBLPROPERTIES ('bucket'='10', 'primary-key' = 'order_id');

SELECT * FROM FACT_TABLE JOIN DIM_TABLE on t1.order_id = t4.order_id;

‘spark.sql.sources.v2.bucketing.enabled’配置用于启用V2数据源。打开后,Spark将通过V2数据源的SupportsReportPartitioning识别特定的分布式报告,并在必要时尝试避免shuffle。

如果两个表具有相同的存储桶策略和相同数量的存储桶,则可以避免代价高昂的联接shuffle。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容