Snowpark Scala Example

1.版本要求

Scala:2.12 (不支持2.13)
JVM for Scala: 11.x

2.Setting Up IntelliJ IDEA CE for Snowpark Scala

(1) 创建项目


image.png

(2) 在SBT文件中加入如下依赖

libraryDependencies += "com.snowflake" % "snowpark" % "1.8.0"

(3) 编写如下代码并运行

import com.snowflake.snowpark._
import com.snowflake.snowpark.functions._

object Main {
  def main(args: Array[String]): Unit = {
    val configs = Map(
      "URL" -> "freewheelmediadev.us-east-1.snowflakecomputing.com",
      "USER" -> "pgao@freewheel.tv",
      "PASSWORD" -> "Gp5859561@",
      "ROLE" -> "USER_PGAO_ROLE",
      "WAREHOUSE" -> "WH_USER_PGAO_XS",
      "DB" -> "DEMO_DB",
      "SCHEMA" -> "LAKE_HOUSE_HUDI"
    )
    val session = Session.builder.configs(configs).create
    session.sql("show tables").show()
  }
}

3.Working with DataFrames

(1) 建立数据表

CREATE OR REPLACE TABLE sample_product_data (id INT, parent_id INT, category_id INT, name VARCHAR, serial_number VARCHAR, key INT, "3rd" INT);
INSERT INTO sample_product_data VALUES
    (1, 0, 5, 'Product 1', 'prod-1', 1, 10),
    (2, 1, 5, 'Product 1A', 'prod-1-A', 1, 20),
    (3, 1, 5, 'Product 1B', 'prod-1-B', 1, 30),
    (4, 0, 10, 'Product 2', 'prod-2', 2, 40),
    (5, 4, 10, 'Product 2A', 'prod-2-A', 2, 50),
    (6, 4, 10, 'Product 2B', 'prod-2-B', 2, 60),
    (7, 0, 20, 'Product 3', 'prod-3', 3, 70),
    (8, 7, 20, 'Product 3A', 'prod-3-A', 3, 80),
    (9, 7, 20, 'Product 3B', 'prod-3-B', 3, 90),
    (10, 0, 50, 'Product 4', 'prod-4', 4, 100),
    (11, 10, 50, 'Product 4A', 'prod-4-A', 4, 100),
    (12, 10, 50, 'Product 4B', 'prod-4-B', 4, 100);

(2) 构建Dataframe

  • 要从表、视图或流中的数据创建一个DataFrame,可以调用table方法:
// Create a DataFrame from the data in the "sample_product_data" table.
val dfTable = session.table("sample_product_data")

// To print out the first 10 rows, call:
//   dfTable.show()

session.table方法返回一个Updatable对象。Updatable继承自DataFrame,并提供了额外的方法来处理表中的数据(例如,用于更新和删除数据的方法)。

  • 要从值的序列创建一个DataFrame,可以调用createDataFrame方法:
// Create a DataFrame containing a sequence of values.
// In the DataFrame, name the columns "i" and "s".
val dfSeq = session.createDataFrame(Seq((1, "one"), (2, "two"))).toDF("i", "s")
  • 要创建一个包含一定范围值的DataFrame,可以调用range方法:
// Create a DataFrame from a range
val dfRange = session.range(1, 10, 2)
  • 要为Stage中的文件创建一个DataFrame,可以调用read方法来获取一个DataFrameReader对象。在DataFrameReader对象中,调用与文件中数据格式相对应的方法:
// Create a DataFrame from data in a stage.
val dfJson = session.read.json("@mystage2/data1.json")
  • 要创建一个用于保存 SQL 查询结果的 DataFrame,可以调用 sql 方法:
// Create a DataFrame from a SQL query
val dfSql = session.sql("SELECT name from products")

(3) 对Dataframe进行transformation

  • 要指定应返回哪些行,请调用 filter 方法:
// Import the col function from the functions object.
import com.snowflake.snowpark.functions._

// Create a DataFrame for the rows with the ID 1
// in the "sample_product_data" table.
//
// This example uses the === operator of the Column object to perform an
// equality check.
val df = session.table("sample_product_data").filter(col("id") === 1)
df.show()
  • 要指定应选择哪些列,请调用 select 方法:
// Import the col function from the functions object.
import com.snowflake.snowpark.functions._

// Create a DataFrame that contains the id, name, and serial_number
// columns in te "sample_product_data" table.
val df = session.table("sample_product_data").select(col("id"), col("name"), col("serial_number"))
df.show()

每个方法返回一个经过转换的新的 DataFrame 对象(不会影响原始的 DataFrame 对象)。这意味着如果你想应用多个转换操作,你可以链式调用方法,将每个后续的转换方法调用应用在前一个方法返回的新 DataFrame 对象上。
请注意,这些transformation方法并不从Snowflake数据库中检索数据。(在执行DataFrame的action方法时,会执行数据检索。)transformation方法仅仅指定了如何构造SQL语句。

  • 处理不同Dataframe中的相同Column
    当在两个具有相同列名的不同 DataFrame 对象中引用列时(例如,在该列上进行连接操作),您可以在一个 DataFrame 对象中使用 DataFrame.col 方法来引用该对象中的列(例如,df1.col("name") 和 df2.col("name"))。
// Create a DataFrame that joins two other DataFrames (dfLhs and dfRhs).
// Use the DataFrame.col method to refer to the columns used in the join.
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("key") === dfRhs.col("key")).select(dfLhs.col("value").as("L"), dfRhs.col("value").as("R"))
  • 将列对象转换为特定类型
// Import for the lit function.
import com.snowflake.snowpark.functions._
// Import for the DecimalType class..
import com.snowflake.snowpark.types._

val decimalValue = lit(0.05).cast(new DecimalType(5,2))
  • 链式调用方法
val dfProductInfo = session.table("sample_product_data").filter(col("id") === 1).select(col("name"), col("serial_number"))
dfProductInfo.show()
  • 获取列定义
    要检索DataFrame数据集中列的定义,请调用schema方法。此方法返回一个StructType对象,其中包含StructField对象的数组。每个StructField对象包含列的定义。
// Get the StructType object that describes the columns in the
// underlying rowset.
val tableSchema = session.table("sample_product_data").schema
println("Schema for sample_product_data: " + tableSchema);

4.Dataframe Join

(1) 示例数据

create or replace table sample_a (
  id_a integer,
  name_a varchar,
  value integer
);
insert into sample_a (id_a, name_a, value) values
  (10, 'A1', 5),
  (40, 'A2', 10),
  (80, 'A3', 15),
  (90, 'A4', 20)
;
create or replace table sample_b (
  id_b integer,
  name_b varchar,
  id_a integer,
  value integer
);
insert into sample_b (id_b, name_b, id_a, value) values
  (4000, 'B1', 40, 10),
  (4001, 'B2', 10, 5),
  (9000, 'B3', 80, 15),
  (9099, 'B4', null, 200)
;
create or replace table sample_c (
  id_c integer,
  name_c varchar,
  id_a integer,
  id_b integer
);
insert into sample_c (id_c, name_c, id_a, id_b) values
  (1012, 'C1', 10, null),
  (1040, 'C2', 40, 4000),
  (1041, 'C3', 40, 4001)
;

(2)指定要Join的Column

val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a") === dfRhs.col("id_a"))
val dfSelected = dfJoined.select(dfLhs.col("value").as("LeftValue"), dfRhs.col("value").as("RightValue"))
dfSelected.show()

(3) 指定Join类型
Join类型有以下几种:

image.png

// Create a DataFrame that performs a left outer join on
// "sample_a" and "sample_b" on the column named "id_a".
val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfLeftOuterJoin = dfLhs.join(dfRhs, dfLhs.col("id_a") === dfRhs.col("id_a"), "left")
dfLeftOuterJoin.show()

5.对DataFrame进行Action操作

如前所述,DataFrame是惰性执行的,这意味着直到执行某个操作之前,SQL语句不会被发送到服务器执行。

执行同步Action操作

image.png

for example:

// Create a DataFrame for the "sample_product_data" table.
val dfProducts = session.table("sample_product_data")

// Send the query to the server for execution and
// print the count of rows in the table.
println("Rows returned: " + dfProducts.count())

执行异步Action操作

要异步执行操作,调用async方法返回一个"async actor"对象(例如DataFrameAsyncActor),然后在该对象中调用异步操作方法。
这些异步 actor 对象的操作方法会返回一个 TypedAsyncJob 对象,你可以使用该对象来检查异步操作的状态并获取操作的结果。

image.png

从返回的 TypedAsyncJob 对象中,你可以进行以下操作:

  • 要确定操作是否已完成,调用 isDone 方法。
  • 要获取与操作对应的查询 ID,请调用 getQueryId 方法。
  • 要返回操作的结果(例如 collect 方法的 Row 对象数组或 count 方法的行数),请调用 getResult 方法。请注意,getResult 是一个阻塞调用。
  • 要取消操作,请调用 cancel 方法。

example:

// Create a DataFrame with the "id" and "name" columns from the "sample_product_data" table.
// This does not execute the query.
val df = session.table("sample_product_data").select(col("id"), col("name"))

// Execute the query asynchronously.
// This call does not block.
val asyncJob = df.async.collect()
// Check if the query has completed execution.
println(s"Is query ${asyncJob.getQueryId()} done? ${asyncJob.isDone()}")
// Get an Array of Rows containing the results, and print the results.
// Note that getResult is a blocking call.
val results = asyncJob.getResult()
results.foreach(println)

指定最长等待时间

// Wait a maximum of 10 seconds for the query to complete before retrieving the results.
val results = asyncJob.getResult(10)

通过ID获取异步query

val asyncJob = session.createAsyncJob(myQueryId)
// Check if the query has completed execution.
println(s"Is query ${asyncJob.getQueryId()} done? ${asyncJob.isDone()}")
// If you need to retrieve the results, call getRows to return an Array of Rows containing the results.
// Note that getRows is a blocking call.
val rows = asyncJob.getRows()
rows.foreach(println)

6.在Table中Updating/Deleting/Merging Rows

当你调用 Session.table 来为一个表创建一个 DataFrame 对象时,该方法会返回一个 Updatable 对象,它扩展了 DataFrame,并提供了额外的方法用于更新和删除表中的数据。
如果你需要更新或删除表中的行,可以使用 Updatable 类的以下方法:

  • 调用 update 来更新表中的现有行。
  • 调用 delete 来从表中删除行。
  • 调用 merge 来根据第二个表或子查询中的数据,向一个表中插入、更新和删除行(这相当于 SQL 中的 MERGE 命令)。

Updating rows
对于 update 方法,传入一个 Map,将要更新的列与对应的值关联起来。update 方法会返回一个 UpdateResult 对象,其中包含被更新的行数。
update 是一个Action方法,这意味着调用该方法会发送 SQL 语句到服务器执行。

val updatableDf = session.table("sample_product_data")
val updateResult = updatableDf.update(Map("count" -> lit(1)))
println(s"Number of rows updated: ${updateResult.rowsUpdated}")

Deleting rows
对于 delete 方法,你可以指定一个条件来标识要删除的行,并且可以基于与另一个 DataFrame 的连接来构建该条件。delete 方法会返回一个 DeleteResult 对象,其中包含被删除的行数。
delete 是一个操作方法,这意味着调用该方法会发送 SQL 语句到服务器执行。

val updatableDf = session.table("sample_product_data")
val deleteResult = updatableDf.delete(updatableDf("category_id") === 1)
println(s"Number of rows deleted: ${deleteResult.rowsDeleted}")

Merging rows

val mergeResult = target.merge(source, target("id") === source("id"))
                      .whenNotMatched.insert(Seq(source("id"), source("value")))
                      .collect()

val mergeResult = target.merge(source, target("id") === source("id"))
                      .whenMatched.update(Map("value" -> source("value")))
                      .collect()

7.将数据保存到Table中

df.write.mode(SaveMode.Overwrite).saveAsTable(tableName)

默认情况下,columnOrder 选项被设置为 "index",这意味着 DataFrameWriter 按照列的出现顺序插入值。例如,DataFrameWriter 将从 DataFrame 的第一列插入值到表中的第一列,从 DataFrame 的第二列插入值到表中的第二列,依此类推。
如果你将行插入到现有的表中(SaveMode.Append),并且 DataFrame 中的列名与表中的列名匹配,可以调用 DataFrameWriter.option 方法,传入 "columnOrder" 和 "name" 作为参数。

8.通过DataFrame创建一个View

// 创建persistent view
df.createOrReplaceView("db.schema.viewName")

// 创建tmp view
df.createOrReplaceTempView("db.schema.viewName")

9.Cache DataFrame

在某些情况下,你可能需要执行复杂的查询并保留结果以供后续操作使用(而不是再次执行相同的查询)。在这种情况下,你可以通过调用 DataFrame.cacheResult 方法来缓存 DataFrame 的内容。
该方法的工作方式如下:

  • 运行查询。
    在调用 cacheResult 之前,你无需调用单独的操作方法来检索结果。cacheResult 是一个执行查询的操作方法。
  • 将结果保存在临时表中。
    由于 cacheResult 创建了一个临时表,因此你必须对正在使用的模式具有 CREATE TABLE 权限。
  • 返回一个 HasCachedResult 对象,该对象提供对临时表中结果的访问。
    由于 HasCachedResult 扩展了 DataFrame,你可以对这些缓存数据执行与 DataFrame 上相同的一些操作。
import com.snowflake.snowpark.functions_

// Set up a DataFrame to query a table.
val df = session.table("sample_product_data").filter(col("category_id") > 10)
// Retrieve the results and cache the data.
val cachedDf = df.cacheResult()
// Create a DataFrame containing a subset of the cached data.
val dfSubset = cachedDf.filter(col("category_id") === lit(20)).select(col("name"), col("category_id"))
dfSubset.show()
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容