Spark引擎

Quick Start #

Preparation #

Paimon目前支持Spark 3.5、3.4、3.3、3.2和3.1。为了获得更好的体验,我们建议使用最新的Spark版本。

下载对应版本的jar文件。

Version Jar
Spark 3.5 paimon-spark-3.5-0.9.0.jar
Spark 3.4 paimon-spark-3.4-0.9.0.jar
Spark 3.3 paimon-spark-3.3-0.9.0.jar
Spark 3.2 paimon-spark-3.2-0.9.0.jar
Spark 3.1 paimon-spark-3.1-0.9.0.jar

您也可以从源代码手动构建绑定的jar。

要从源代码构建,请克隆git存储库.

使用以下命令构建绑定的jar。

mvn clean install -DskipTests

对于Spark 3.3,你可以在。/paimon-spark/paimon-spark-3.3/target/paimon-spark-3.3-0.9.0.jar中找到捆绑的jar包。

Setup #

如果您使用的是HDFS,请确保设置了环境变量HADOOP_HOME或HADOOP_CONF_DIR。

Step 1: Specify Paimon Jar File
在启动spark-sql时,将paimon jar文件的路径附加到——jars参数中。

spark-sql ... --jars /path/to/paimon-spark-3.3-0.9.0.jar

或者使用——packages选项。

spark-sql ... --packages org.apache.paimon:paimon-spark-3.3:0.9.0

或者,您可以复制paimon-spark-3.3-0.9.0.jar到spark安装目录下的spark/jars下。

Step 2: Specify Paimon Catalog

Catalog:
在启动Spark -sql时,使用以下命令以Paimon的名称注册Paimon的Spark目录。仓库的表文件存储在/tmp/paimon下。

spark-sql ... \
    --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
    --conf spark.sql.catalog.paimon.warehouse=file:/tmp/paimon \
    --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions

使用spark.sql.catalog.(catalog_name)下的属性配置目录。在上面的例子中,‘ paimon ’是目录名,您可以将其更改为您自己喜欢的目录名。

在spark-sql命令行启动后,运行以下SQL创建并切换到数据库default。

USE paimon;
USE default;

切换到目录('USE paimon')后,Spark的现有表将不能直接访问,您可以使用spark_catalog.{database_name}。{table_name}访问Spark表。

Create Table #

Catalog:

create table my_table (
    k int,
    v string
) tblproperties (
    'primary-key' = 'k'
);

Insert Table #

Paimon currently supports Spark 3.2+ for SQL write.

INSERT INTO my_table VALUES (1, 'Hi'), (2, 'Hello');

Query Table #

SQL:

SELECT * FROM my_table;

/*
1   Hi
2   Hello
*/

Spark Type Conversion #

本节列出了Spark和Paimon之间所有支持的类型转换。所有Spark的数据类型都可以在org.apache.spark.sql.types包中找到。

Spark Data Type Paimon Data Type Atomic Type
StructType RowType false
MapType MapType false
ArrayType ArrayType false
BooleanType BooleanType true
ByteType TinyIntType true
ShortType SmallIntType true
IntegerType IntType true
LongType BigIntType true
FloatType FloatType true
DoubleType DoubleType true
StringType VarCharType(Integer.MAX_VALUE) true
VarCharType (length) VarCharType(length) true
CharType (length) CharType(length) true
DateType DateType true
TimestampType LocalZonedTimestamp true
TimestampNTZType (Spark3.4+) TimestampType true
DecimalType (precision, scale) DecimalType(precision, scale) true
BinaryType VarBinaryType, BinaryType true
由于之前的设计,在Spark3.3及以下版本中,Paimon会将Paimon的TimestampType和LocalZonedTimestamp映射到Spark的TimestampType,并且只正确处理TimestampType。 
 
因此,当使用Spark3.3及以下版本时,读取其他引擎(如Flink)编写的LocalZonedTimestamp类型的Paimon表时,LocalZonedTimestamp类型的查询结果会有时区偏移,需要手动调整。

当使用Spark3.4及以上版本时,所有时间戳类型都可以正确解析。

SQL DDL #

Create Catalog #

Paimon catalog目前支持三种类型的元数据存储:

  • filesystem metastore (default), 它在文件系统中存储元数据和表文件。
  • hive metastore, 它还将元数据存储在Hive metastore中。用户可以直接从Hive访问这些表。
  • jdbc metastore, 它额外地将元数据存储在关系数据库中,如MySQL, Postgres等。

有关创建编目时的详细选项,请参阅CatalogOptions

Create Filesystem Catalog #

下面的Spark SQL注册并使用一个名为my_catalog的Paimon编目。元数据和表文件存放在hdfs:///path/to/warehouse下。

下面的shell命令注册一个名为paimon的paimon目录。元数据和表文件存放在hdfs:///path/to/warehouse下。

spark-sql ... \
    --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
    --conf spark.sql.catalog.paimon.warehouse=hdfs:///path/to/warehouse

对于在catalog中创建的表,您可以使用前缀spark.sql.catalog.paimon.table-default定义任何默认的表选项。

启动spark-sql之后,可以使用下面的SQL切换到paimon编目的default数据库。

USE paimon.default;

Creating Hive Catalog #

通过使用Paimon Hive catalog,对catalog的更改将直接影响到相应的Hive metastore。在这样的目录中创建的表也可以直接从Hive访问。

要使用Hive catalog, Database name, Table name和Field name应该是小写的。

您的Spark安装应该能够检测或已经包含Hive依赖项。更多信息请看这里

下面的shell命令注册一个名为Paimon的Paimon Hive目录。元数据和表文件存放在hdfs:///path/to/warehouse下。此外,元数据也存储在Hive metastore中。

spark-sql ... \
    --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
    --conf spark.sql.catalog.paimon.warehouse=hdfs:///path/to/warehouse \
    --conf spark.sql.catalog.paimon.metastore=hive \
    --conf spark.sql.catalog.paimon.uri=thrift://<hive-metastore-host-name>:<port>

您可以使用前缀spark.sql.catalog.paimon.table-default定义任何默认的表选项。对于在目录中创建的表。

启动spark-sql之后,可以使用下面的SQL切换到paimon编目的默认数据库。

USE paimon.default;

另外,您可以创建SparkGenericCatalog.。

Synchronizing Partitions into Hive Metastore #

默认情况下,Paimon不会将新创建的分区同步到Hive metastore。用户将在Hive中看到一个未分区的表。分区下推将改为过滤器下推。

如果您想在Hive中看到一个分区表,并将新创建的分区同步到Hive metastore中,请设置表属性metastore.partitioned-table为true。参见CoreOptions

Creating JDBC Catalog #

通过使用Paimon JDBC编目,对编目的更改将直接存储在SQLite、MySQL、postgres等关系数据库中。

目前,锁配置只支持MySQL和SQLite。如果您使用不同类型的数据库进行目录存储,请不要配置lock.enabled。

Spark中的Paimon JDBC Catalog需要正确添加相应的连接数据库的jar包。您应该首先下载JDBC连接器绑定的jar并将其添加到类路径中。例如MySQL, postgres

database type Bundle Name SQL Client JAR
mysql mysql-connector-java Download
postgres postgresql Download
spark-sql ... \
    --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
    --conf spark.sql.catalog.paimon.warehouse=hdfs:///path/to/warehouse \
    --conf spark.sql.catalog.paimon.metastore=jdbc \
    --conf spark.sql.catalog.paimon.uri=jdbc:mysql://<host>:<port>/<databaseName> \
    --conf spark.sql.catalog.paimon.jdbc.user=... \
    --conf spark.sql.catalog.paimon.jdbc.password=...
USE paimon.default;

Create Table #

在使用Paimon catalog之后,您可以创建和删除表。在Paimon Catalogs中创建的表由编目管理。当表从目录中删除时,它的表文件也将被删除。

下面的SQL假设您已经注册并正在使用Paimon编目。它在目录的默认数据库中创建一个名为my_table的托管表,其中有五列,其中dt、hh和user_id是主键。

CREATE TABLE my_table (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING
) TBLPROPERTIES (
    'primary-key' = 'dt,hh,user_id'
);

您可以创建分区表:

CREATE TABLE my_table (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING
) PARTITIONED BY (dt, hh) TBLPROPERTIES (
    'primary-key' = 'dt,hh,user_id'
);

Create Table As Select #

表可以通过查询的结果来创建和填充,例如,我们有这样一个sql: CREATE Table table_b AS SELECT id, name FORM table_a,结果表table_b相当于用下面的语句创建表并插入数据:INSERT INTO table_b FROM table_a

当使用CREATE TABLE作为SELECT时,我们可以指定主键或分区,语法请参考下面的sql。

CREATE TABLE my_table (
     user_id BIGINT,
     item_id BIGINT
);
CREATE TABLE my_table_as AS SELECT * FROM my_table;

/* partitioned table*/
CREATE TABLE my_table_partition (
      user_id BIGINT,
      item_id BIGINT,
      behavior STRING,
      dt STRING,
      hh STRING
) PARTITIONED BY (dt, hh);
CREATE TABLE my_table_partition_as PARTITIONED BY (dt) AS SELECT * FROM my_table_partition;

/* change TBLPROPERTIES */
CREATE TABLE my_table_options (
       user_id BIGINT,
       item_id BIGINT
) TBLPROPERTIES ('file.format' = 'orc');
CREATE TABLE my_table_options_as TBLPROPERTIES ('file.format' = 'parquet') AS SELECT * FROM my_table_options;


/* primary key */
CREATE TABLE my_table_pk (
     user_id BIGINT,
     item_id BIGINT,
     behavior STRING,
     dt STRING,
     hh STRING
) TBLPROPERTIES (
    'primary-key' = 'dt,hh,user_id'
);
CREATE TABLE my_table_pk_as TBLPROPERTIES ('primary-key' = 'dt') AS SELECT * FROM my_table_pk;

/* primary key + partition */
CREATE TABLE my_table_all (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING
) PARTITIONED BY (dt, hh) TBLPROPERTIES (
    'primary-key' = 'dt,hh,user_id'
);
CREATE TABLE my_table_all_as PARTITIONED BY (dt) TBLPROPERTIES ('primary-key' = 'dt,hh') AS SELECT * FROM my_table_all;

SQL Write #

Syntax #

INSERT { INTO | OVERWRITE } table_identifier [ part_spec ] [ column_list ] { value_expr | query };

有关更多信息,请查看语法文档:
Spark INSERT Statement

INSERT INTO #

使用INSERT INTO将记录和更改应用到表中。

INSERT INTO my_table SELECT ...

Overwriting the Whole Table #

使用INSERT OVERWRITE覆盖整个未分区表。

INSERT OVERWRITE my_table SELECT ...

Overwriting a Partition #

使用INSERT OVERWRITE覆盖分区。

INSERT OVERWRITE my_table PARTITION (key1 = value1, key2 = value2, ...) SELECT ...

Dynamic Overwrite #

Spark默认的覆盖模式是静态分区覆盖。要启用动态覆盖,需要将Spark会话配置spark.sql.sources. partitionoverwritemode设置为dynamic
举例:

CREATE TABLE my_table (id INT, pt STRING) PARTITIONED BY (pt);
INSERT INTO my_table VALUES (1, 'p1'), (2, 'p2');

-- Static overwrite (Overwrite the whole table)
INSERT OVERWRITE my_table VALUES (3, 'p1');

SELECT * FROM my_table;
/*
+---+---+
| id| pt|
+---+---+
|  3| p1|
+---+---+
*/

-- Dynamic overwrite (Only overwrite pt='p1')
SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE my_table VALUES (3, 'p1');

SELECT * FROM my_table;
/*
+---+---+
| id| pt|
+---+---+
|  2| p2|
|  3| p1|
+---+---+
*/

Truncate tables #

TRUNCATE TABLE my_table;

Updating tables #

spark支持更新PrimitiveType和StructType,例如:

-- Syntax
UPDATE table_identifier SET column1 = value1, column2 = value2, ... WHERE condition;

CREATE TABLE t (
  id INT, 
  s STRUCT<c1: INT, c2: STRING>, 
  name STRING)
TBLPROPERTIES (
  'primary-key' = 'id', 
  'merge-engine' = 'deduplicate'
);

-- you can use
UPDATE t SET name = 'a_new' WHERE id = 1;
UPDATE t SET s.c2 = 'a_new' WHERE s.c1 = 1;

Deleting from table #

DELETE FROM my_table WHERE currency = 'UNKNOWN';

Merging into table #

Paimon目前支持Spark 3+中的Merge Into语法,它允许在一次提交中基于源表进行一系列更新、插入和删除。

  1. 这只适用于主键表。
  2. 在update子句中不支持更新主键列。
  3. 不支持NOT MATCHED BY SOURCE语法。

Example: One
这是一个简单的演示,如果目标表中存在一行,则更新它,否则插入它。


-- Here both source and target tables have the same schema: (a INT, b INT, c STRING), and a is a primary key.

MERGE INTO target
USING source
ON target.a = source.a
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *

Example: Two
这是一个包含多个条件子句的示例。

-- Here both source and target tables have the same schema: (a INT, b INT, c STRING), and a is a primary key.

MERGE INTO target
USING source
ON target.a = source.a
WHEN MATCHED AND target.a = 5 THEN
   UPDATE SET b = source.b + target.b      -- when matched and meet the condition 1, then update b;
WHEN MATCHED AND source.c > 'c2' THEN
   UPDATE SET *    -- when matched and meet the condition 2, then update all the columns;
WHEN MATCHED THEN
   DELETE      -- when matched, delete this row in target table;
WHEN NOT MATCHED AND c > 'c9' THEN
   INSERT (a, b, c) VALUES (a, b * 1.1, c)      -- when not matched but meet the condition 3, then transform and insert this row;
WHEN NOT MATCHED THEN
INSERT *      -- when not matched, insert this row without any transformation;

Streaming Write #

Paimon目前支持Spark 3+流写入。
Paimon结构化流只支持追加和完成两种模式。

// Create a paimon table if not exists.
spark.sql(s"""
           |CREATE TABLE T (k INT, v STRING)
           |TBLPROPERTIES ('primary-key'='a', 'bucket'='3')
           |""".stripMargin)

// Here we use MemoryStream to fake a streaming source.
val inputData = MemoryStream[(Int, String)]
val df = inputData.toDS().toDF("k", "v")

// Streaming Write to paimon table.
val stream = df
  .writeStream
  .outputMode("append")
  .option("checkpointLocation", "/path/to/checkpoint")
  .format("paimon")
  .start("/path/to/paimon/sink/table")

Schema Evolution #

模式演化是一种允许用户轻松修改表的当前模式以适应现有数据或随时间变化的新数据的特性,同时保持数据的完整性和一致性。

Paimon支持在写入数据时自动合并源数据和当前表数据的模式,并将合并后的模式用作表的最新模式,只需要配置write.merge-schema即可。

data.write
  .format("paimon")
  .mode("append")
  .option("write.merge-schema", "true")
  .save(location)

当启用write.merge-schema时,Paimon默认允许用户对表模式执行以下操作:

  1. 添加列
  2. 扩大(提升)转换类型(例如:Int ->Long)

Paimon还支持特定类型之间的显式类型转换(例如String -> Date, Long -> Int),它需要显式配置write.merge-schema.explicit-cast。

模式演化可以同时用于流模式。

val inputData = MemoryStream[(Int, String)]
inputData
  .toDS()
  .toDF("col1", "col2")
  .writeStream
  .format("paimon")
  .option("checkpointLocation", "/path/to/checkpoint")
  .option("write.merge-schema", "true")
  .option("write.merge-schema.explicit-cast", "true")
  .start(location)

下面列出了配置。
|Scan Mode | Description |
|write.merge-schema |如果为true,则在写入数据前自动合并数据模式和表模式。 |
|write.merge-schema.explicit-cast |如果为true,则允许合并数据类型,如果两种类型满足显式强制转换的规则。 |


SQL Query #

与所有其他表一样,可以使用SELECT语句查询Paimon表。

Batch Query #

Paimon的批处理读取返回表快照中的所有数据。默认情况下,批处理读取返回最新的快照。

Batch Time Travel #

带时间旅行的Paimon批读可以指定一个快照或一个标签,并读取相应的数据。

需要Spark 3.3+。

你可以在查询中使用VERSION AS OF和TIMESTAMP AS OF来进行时间旅行:

-- read the snapshot with id 1L (use snapshot id as version)
SELECT * FROM t VERSION AS OF 1;

-- read the snapshot from specified timestamp 
SELECT * FROM t TIMESTAMP AS OF '2023-06-01 00:00:00.123';

-- read the snapshot from specified timestamp in unix seconds
SELECT * FROM t TIMESTAMP AS OF 1678883047;

-- read tag 'my-tag'
SELECT * FROM t VERSION AS OF 'my-tag';

-- read the snapshot from specified watermark. will match the first snapshot after the watermark
SELECT * FROM t VERSION AS OF 'watermark-1678883047356';

如果标签的名称是一个数字,并且等于快照id,则VERSION AS OF语法将首先考虑标签。例如,如果基于快照2有一个名为“1”的标签,语句SELECT * FROM t VERSION AS OF ‘1’实际上查询快照2而不是快照1。

Batch Incremental #

读取开始快照(不包含)和结束快照之间的增量变化。
举例:

  • “5,10”表示快照5和快照10之间的变化。
  • ‘ TAG1,TAG3 ’表示TAG1和TAG3之间的变化。

默认情况下,将扫描changelog文件以查找生成changelog文件的表。否则,扫描新修改的文件。你也可以强制指定“incremental-between-scan-mode”。

需要Spark 3.2+。

Paimon支持使用Spark SQL进行增量查询,而增量查询是由Spark表值函数实现的。

你可以使用query中的paimon_incremental_query来提取增量数据:

-- read the incremental data between snapshot id 12 and snapshot id 20.
SELECT * FROM paimon_incremental_query('tableName', 12, 20);

在批处理SQL中,不允许返回DELETE记录,因此将删除-D的记录。如果要查看DELETE记录,可以查询audit_log表。

Streaming Query #

Paimon目前支持Spark 3.3+流式读取。
Paimon支持流式读取的富扫描模式。这里有一个列表:

Scan Mode Description
latest 对于流源,连续地读取最新的更改,而不是在开始时生成快照。
latest-full 对于流源,在第一次启动时在表上生成最新的快照,并继续读取最新的更改。
from-timestamp 对于流源,从指定为“scan.timestamp-millis”的时间戳开始连续读取更改,而不会在开始时生成快照。
from-snapshot 对于流源,从“scan.snapshot-id”指定的快照开始连续读取更改,不开始生成快照。
from-snapshot-full 对于流源,在第一次启动时从表上指定的“scan.snapshot-id”快照生成,并持续读取更改。
default 如果指定了“scan.snapshot-id”,则相当于from-snapshot。如果指定了timestamp-millis,则相当于from-timestamp。或者,它相当于latest-full。

一个具有默认扫描模式的简单示例:

// no any scan-related configs are provided, that will use latest-full scan mode.
val query = spark.readStream
  .format("paimon")
  .load("/path/to/paimon/source/table")
  .writeStream
  .format("console")
  .start()

Paimon结构化流还支持多种流读模式,它可以支持许多触发器和许多读限制。
支持以下读限制:

Key Default Type Description
read.stream.maxFilesPerTrigger (none) Integer 在单个批处理中返回的最大文件数。
read.stream.maxBytesPerTrigger (none) Long 在单个批处理中返回的最大字节数。
read.stream.maxRowsPerTrigger (none) Long 在单个批处理中返回的最大行数。
read.stream.minRowsPerTrigger (none) Long 单个批处理中返回的最小行数,用于与read.stream.maxTriggerDelayMs一起创建MinRowsReadLimit。
read.stream.maxTriggerDelayMs (none) Long 两个相邻批之间的最大延迟,用于与read.stream.minRowsPerTrigger一起创建MinRowsReadLimit。

举例: 1
使用paimon定义的org.apache.spark.sql.streaming.Trigger.AvailableNow()和maxBytesPerTrigger

// Trigger.AvailableNow()) processes all available data at the start
// of the query in one or multiple batches, then terminates the query.
// That set read.stream.maxBytesPerTrigger to 128M means that each
// batch processes a maximum of 128 MB of data.
val query = spark.readStream
  .format("paimon")
  .option("read.stream.maxBytesPerTrigger", "134217728")
  .load("/path/to/paimon/source/table")
  .writeStream
  .format("console")
  .trigger(Trigger.AvailableNow())
  .start()

举例: 2
使用 org.apache.spark.sql.connector.read.streaming.ReadMinRows.

// It will not trigger a batch until there are more than 5,000 pieces of data,
// unless the interval between the two batches is more than 300 seconds.
val query = spark.readStream
  .format("paimon")
  .option("read.stream.minRowsPerTrigger", "5000")
  .option("read.stream.maxTriggerDelayMs", "300000")
  .load("/path/to/paimon/source/table")
  .writeStream
  .format("console")
  .start()

Paimon结构化流支持以更改日志的形式读取行(在行中添加rowkind列来表示其更改类型),有两种方式:

  • 直接流式读取系统audit_log表
  • 将 read.changelog设置为true(默认为false),然后流式读取表位置
    举例:
// Option 1
val query1 = spark.readStream
  .format("paimon")
  .table("`table_name$audit_log`")
  .writeStream
  .format("console")
  .start()

// Option 2
val query2 = spark.readStream
  .format("paimon")
  .option("read.changelog", "true")
  .load("/path/to/paimon/source/table")
  .writeStream
  .format("console")
  .start()

/*
+I   1  Hi
+I   2  Hello
*/

Query Optimization #

强烈建议与查询一起指定分区和主键过滤器,这将加快查询的数据跳过。
可以加速数据跳转的过滤函数有:

  • =
  • <
  • <=
  • >
  • >=
  • IN (...)
  • LIKE 'abc%'
  • IS NULL

Paimon将按主键对数据进行排序,这加快了点查询和范围查询的速度。当使用复合主键时,查询过滤器最好在主键的最左边形成一个前缀,以获得良好的加速。

假设一个表具有以下规格:

CREATE TABLE orders (
    catalog_id BIGINT,
    order_id BIGINT,
    .....,
) TBLPROPERTIES (
    'primary-key' = 'catalog_id,order_id'
);

通过为主键的最左边的前缀指定一个范围过滤器,查询可以获得很好的加速。

SELECT * FROM orders WHERE catalog_id=1025;

SELECT * FROM orders WHERE catalog_id=1025 AND order_id=29495;

SELECT * FROM orders
  WHERE catalog_id=1025
  AND order_id>2035 AND order_id<6000;

但是,下面的过滤器不能很好地加速查询。

SELECT * FROM orders WHERE order_id=29495;

SELECT * FROM orders WHERE catalog_id=1025 OR order_id=29495;

Altering Tables #

Changing/Adding Table Properties #

下面的SQL将写缓冲区大小的表属性设置为256 MB。

ALTER TABLE my_table SET TBLPROPERTIES (
    'write-buffer-size' = '256 MB'
);

Removing Table Properties #

下面的SQL删除了write-buffer-size的表属性。

ALTER TABLE my_table UNSET TBLPROPERTIES ('write-buffer-size');

Changing/Adding Table Comment #

下面的SQL将表my_table的注释更改为表注释。

ALTER TABLE my_table SET TBLPROPERTIES (
    'comment' = 'table comment'
    );

Removing Table Comment #

下面的SQL删除表注释。

ALTER TABLE my_table UNSET TBLPROPERTIES ('comment');

Rename Table Name #

下面的SQL将表名重命名为新名称。
最简单的sql调用是:

ALTER TABLE my_table RENAME TO my_table_new;

注意:我们可以在spark中这样重命名paimon表:

ALTER TABLE [catalog.[database.]]test1 RENAME to [database.]test2;

但是我们不能把目录名放在重命名表之前,如果我们这样写sql,它会抛出一个错误:

ALTER TABLE catalog.database.test1 RENAME to catalog.database.test2;

如果您使用对象存储,如S3或OSS,请谨慎使用此语法,因为对象存储的重命名不是原子性的,在失败的情况下可能只会移动部分文件。

Adding New Columns #

下面的SQL将两列c1和c2添加到表my_table中。

ALTER TABLE my_table ADD COLUMNS (
    c1 INT,
    c2 STRING
);

Renaming Column Name #

下面的SQL将表my_table中的列c0重命名为c1。

ALTER TABLE my_table RENAME COLUMN c0 TO c1;

Dropping Columns #

下面的SQL从表my_table中删除两列c1和c2。

ALTER TABLE my_table DROP COLUMNS (c1, c2);

Dropping Partitions #

下面的SQL删除paimon表的分区。对于spark sql,您需要指定所有分区列。

ALTER TABLE my_table DROP PARTITION (`id` = 1, `name` = 'paimon');

Changing Column Comment #

下面的SQL将列buy_count的注释更改为购买计数。

ALTER TABLE my_table ALTER COLUMN buy_count COMMENT 'buy count';

Adding Column Position #

ALTER TABLE my_table ADD COLUMN c INT FIRST;
ALTER TABLE my_table ADD COLUMN c INT AFTER b;

Changing Column Position #

ALTER TABLE my_table ALTER COLUMN col_a FIRST;

ALTER TABLE my_table ALTER COLUMN col_a AFTER col_b;

Changing Column Type #

ALTER TABLE my_table ALTER COLUMN col_a TYPE DOUBLE;

Auxiliary Statements #

Set / Reset #

SET命令设置属性,返回现有属性的值或返回所有具有值和含义的SQLConf属性。RESET命令将特定于当前会话的运行时配置重置为通过set命令设置的默认值。要专门设置paimon配置,需要添加spark.paimon。前缀。

-- set spark conf
SET spark.sql.sources.partitionOverwriteMode=dynamic;
 
-- set paimon conf
SET spark.paimon.file.block-size=512M;

-- reset conf
RESET spark.paimon.file.block-size;

Describe table #

description TABLE语句返回表的基本元数据信息。元数据信息包括列名、列类型和列注释。

-- describe table
DESCRIBE TABLE my_table;

-- describe table with additional metadata
DESCRIBE TABLE EXTENDED my_table;

Show create table #

SHOW CREATE TABLE返回用于创建给定表的CREATE TABLE语句。

SHOW CREATE TABLE my_table;

Show columns #

返回表中列的列表。如果表不存在,则抛出异常。

SHOW COLUMNS FROM my_table;

Show partitions #

SHOW PARTITIONS语句用于列出表的分区。可以指定一个可选的分区规格来返回与提供的分区规格匹配的分区。

-- Lists all partitions for my_table
SHOW PARTITIONS my_table;

-- Lists partitions matching the supplied partition spec for my_table
SHOW PARTITIONS my_table PARTITION (dt=20230817);

Analyze table #

ANALYZE TABLE语句收集关于表的统计信息,查询优化器将使用这些信息来找到更好的查询执行计划。Paimon支持通过analyze收集表级统计信息和列级统计信息。

-- collect table-level statistics
ANALYZE TABLE my_table COMPUTE STATISTICS;

-- collect table-level statistics and column statistics for col1
ANALYZE TABLE my_table COMPUTE STATISTICS FOR COLUMNS col1;

-- collect table-level statistics and column statistics for all columns
ANALYZE TABLE my_table COMPUTE STATISTICS FOR ALL COLUMNS;

Procedures #

本节介绍关于paimon的所有可用的spark过程。

Procedure Name Explanation Example
compact 压缩文件。参数:
- Table:目标表标识符。不能是空的。
- 分区:分区过滤器。“,”表示“AND”。
“;”表示“或”。如果要压缩一个date=01和day=01的分区,则需要写入‘date=01,day=01’。所有分区都为空。(不能与where连用)
where:分区谓词。所有分区都为空。(不能和“partitions”一起使用)
Order_strategy: ‘order’或‘zorder’或‘hilbert’或‘none’。空为“none”。
Order_columns:需要排序的列。如果‘order_strategy’为‘none’,则为空。
Partition_idle_time:这用于对没有接收到‘ Partition_idle_time ’新数据的分区进行完全压缩。只有这些分区会被压缩。这个论证不能用于紧序。
SET spark.sql.shuffle.partitions=10; --set the compact parallelism

CALL sys.compact(table => 'T', partitions => 'p=0;p=1', order_strategy => 'zorder', order_by => 'a,b')

CALL sys.compact(table => 'T', where => 'p>0 and p<3', order_strategy => 'zorder', order_by => 'a,b')

CALL sys.compact(table => 'T', partition_idle_time => '60s')
expire_snapshots 使快照过期。参数:
- Table:目标表标识符。不能是空的。
- Retain_max:保留已完成快照的最大数量。
- Retain_min:保留已完成快照的最小数量。
- Older_than:快照被删除的时间戳。
- Max_deletes:一次可以删除的最大快照数量。
CALL sys.expire_snapshots(table => 'default.T', retain_max => 10)
expire_partitions 使分区过期。参数:
- Table:目标表标识符。不能是空的。
- Expiration_time:分区的过期时间。如果分区的生存期超过此值,则分区将过期。分区时间从分区值中提取。
- Timestamp_formatter:从字符串格式化时间戳的格式化程序。
- Timestamp_pattern:从分区获取时间戳的模式。
- Expire_strategy:分区过期策略,取值范围:‘values-time’或‘update-time’,默认为‘values-time’。
CALL sys.expire_partitions(table => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd', timestamp_pattern => '$dt', expire_strategy => 'values-time')
create_tag 创建基于给定快照的标记。参数:
- Table:目标表标识符。不能是空的。
- 标签:新标签的名称。不能是空的。
- snapshot(长):新标签所基于的快照id。
- time_retained:新创建标签的最大保留时间。
-- based on snapshot 10 with 1d
CALL sys.create_tag(table => 'default.T', tag => 'my_tag', snapshot => 10, time_retained => '1 d')

-- based on the latest snapshot
CALL sys.create_tag(table => 'default.T', tag => 'my_tag')
---- ---- ----
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容