Snowpark Scala Example

1.版本要求

Scala:2.12 (不支持2.13)
JVM for Scala: 11.x

2.Setting Up IntelliJ IDEA CE for Snowpark Scala

(1) 创建项目


image.png

(2) 在SBT文件中加入如下依赖

libraryDependencies += "com.snowflake" % "snowpark" % "1.8.0"

(3) 编写如下代码并运行

import com.snowflake.snowpark._
import com.snowflake.snowpark.functions._

object Main {
  def main(args: Array[String]): Unit = {
    val configs = Map(
      "URL" -> "freewheelmediadev.us-east-1.snowflakecomputing.com",
      "USER" -> "pgao@freewheel.tv",
      "PASSWORD" -> "Gp5859561@",
      "ROLE" -> "USER_PGAO_ROLE",
      "WAREHOUSE" -> "WH_USER_PGAO_XS",
      "DB" -> "DEMO_DB",
      "SCHEMA" -> "LAKE_HOUSE_HUDI"
    )
    val session = Session.builder.configs(configs).create
    session.sql("show tables").show()
  }
}

3.Working with DataFrames

(1) 建立数据表

CREATE OR REPLACE TABLE sample_product_data (id INT, parent_id INT, category_id INT, name VARCHAR, serial_number VARCHAR, key INT, "3rd" INT);
INSERT INTO sample_product_data VALUES
    (1, 0, 5, 'Product 1', 'prod-1', 1, 10),
    (2, 1, 5, 'Product 1A', 'prod-1-A', 1, 20),
    (3, 1, 5, 'Product 1B', 'prod-1-B', 1, 30),
    (4, 0, 10, 'Product 2', 'prod-2', 2, 40),
    (5, 4, 10, 'Product 2A', 'prod-2-A', 2, 50),
    (6, 4, 10, 'Product 2B', 'prod-2-B', 2, 60),
    (7, 0, 20, 'Product 3', 'prod-3', 3, 70),
    (8, 7, 20, 'Product 3A', 'prod-3-A', 3, 80),
    (9, 7, 20, 'Product 3B', 'prod-3-B', 3, 90),
    (10, 0, 50, 'Product 4', 'prod-4', 4, 100),
    (11, 10, 50, 'Product 4A', 'prod-4-A', 4, 100),
    (12, 10, 50, 'Product 4B', 'prod-4-B', 4, 100);

(2) 构建Dataframe

  • 要从表、视图或流中的数据创建一个DataFrame,可以调用table方法:
// Create a DataFrame from the data in the "sample_product_data" table.
val dfTable = session.table("sample_product_data")

// To print out the first 10 rows, call:
//   dfTable.show()

session.table方法返回一个Updatable对象。Updatable继承自DataFrame,并提供了额外的方法来处理表中的数据(例如,用于更新和删除数据的方法)。

  • 要从值的序列创建一个DataFrame,可以调用createDataFrame方法:
// Create a DataFrame containing a sequence of values.
// In the DataFrame, name the columns "i" and "s".
val dfSeq = session.createDataFrame(Seq((1, "one"), (2, "two"))).toDF("i", "s")
  • 要创建一个包含一定范围值的DataFrame,可以调用range方法:
// Create a DataFrame from a range
val dfRange = session.range(1, 10, 2)
  • 要为Stage中的文件创建一个DataFrame,可以调用read方法来获取一个DataFrameReader对象。在DataFrameReader对象中,调用与文件中数据格式相对应的方法:
// Create a DataFrame from data in a stage.
val dfJson = session.read.json("@mystage2/data1.json")
  • 要创建一个用于保存 SQL 查询结果的 DataFrame,可以调用 sql 方法:
// Create a DataFrame from a SQL query
val dfSql = session.sql("SELECT name from products")

(3) 对Dataframe进行transformation

  • 要指定应返回哪些行,请调用 filter 方法:
// Import the col function from the functions object.
import com.snowflake.snowpark.functions._

// Create a DataFrame for the rows with the ID 1
// in the "sample_product_data" table.
//
// This example uses the === operator of the Column object to perform an
// equality check.
val df = session.table("sample_product_data").filter(col("id") === 1)
df.show()
  • 要指定应选择哪些列,请调用 select 方法:
// Import the col function from the functions object.
import com.snowflake.snowpark.functions._

// Create a DataFrame that contains the id, name, and serial_number
// columns in te "sample_product_data" table.
val df = session.table("sample_product_data").select(col("id"), col("name"), col("serial_number"))
df.show()

每个方法返回一个经过转换的新的 DataFrame 对象(不会影响原始的 DataFrame 对象)。这意味着如果你想应用多个转换操作,你可以链式调用方法,将每个后续的转换方法调用应用在前一个方法返回的新 DataFrame 对象上。
请注意,这些transformation方法并不从Snowflake数据库中检索数据。(在执行DataFrame的action方法时,会执行数据检索。)transformation方法仅仅指定了如何构造SQL语句。

  • 处理不同Dataframe中的相同Column
    当在两个具有相同列名的不同 DataFrame 对象中引用列时(例如,在该列上进行连接操作),您可以在一个 DataFrame 对象中使用 DataFrame.col 方法来引用该对象中的列(例如,df1.col("name") 和 df2.col("name"))。
// Create a DataFrame that joins two other DataFrames (dfLhs and dfRhs).
// Use the DataFrame.col method to refer to the columns used in the join.
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("key") === dfRhs.col("key")).select(dfLhs.col("value").as("L"), dfRhs.col("value").as("R"))
  • 将列对象转换为特定类型
// Import for the lit function.
import com.snowflake.snowpark.functions._
// Import for the DecimalType class..
import com.snowflake.snowpark.types._

val decimalValue = lit(0.05).cast(new DecimalType(5,2))
  • 链式调用方法
val dfProductInfo = session.table("sample_product_data").filter(col("id") === 1).select(col("name"), col("serial_number"))
dfProductInfo.show()
  • 获取列定义
    要检索DataFrame数据集中列的定义,请调用schema方法。此方法返回一个StructType对象,其中包含StructField对象的数组。每个StructField对象包含列的定义。
// Get the StructType object that describes the columns in the
// underlying rowset.
val tableSchema = session.table("sample_product_data").schema
println("Schema for sample_product_data: " + tableSchema);

4.Dataframe Join

(1) 示例数据

create or replace table sample_a (
  id_a integer,
  name_a varchar,
  value integer
);
insert into sample_a (id_a, name_a, value) values
  (10, 'A1', 5),
  (40, 'A2', 10),
  (80, 'A3', 15),
  (90, 'A4', 20)
;
create or replace table sample_b (
  id_b integer,
  name_b varchar,
  id_a integer,
  value integer
);
insert into sample_b (id_b, name_b, id_a, value) values
  (4000, 'B1', 40, 10),
  (4001, 'B2', 10, 5),
  (9000, 'B3', 80, 15),
  (9099, 'B4', null, 200)
;
create or replace table sample_c (
  id_c integer,
  name_c varchar,
  id_a integer,
  id_b integer
);
insert into sample_c (id_c, name_c, id_a, id_b) values
  (1012, 'C1', 10, null),
  (1040, 'C2', 40, 4000),
  (1041, 'C3', 40, 4001)
;

(2)指定要Join的Column

val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a") === dfRhs.col("id_a"))
val dfSelected = dfJoined.select(dfLhs.col("value").as("LeftValue"), dfRhs.col("value").as("RightValue"))
dfSelected.show()

(3) 指定Join类型
Join类型有以下几种:

image.png

// Create a DataFrame that performs a left outer join on
// "sample_a" and "sample_b" on the column named "id_a".
val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfLeftOuterJoin = dfLhs.join(dfRhs, dfLhs.col("id_a") === dfRhs.col("id_a"), "left")
dfLeftOuterJoin.show()

5.对DataFrame进行Action操作

如前所述,DataFrame是惰性执行的,这意味着直到执行某个操作之前,SQL语句不会被发送到服务器执行。

执行同步Action操作

image.png

for example:

// Create a DataFrame for the "sample_product_data" table.
val dfProducts = session.table("sample_product_data")

// Send the query to the server for execution and
// print the count of rows in the table.
println("Rows returned: " + dfProducts.count())

执行异步Action操作

要异步执行操作,调用async方法返回一个"async actor"对象(例如DataFrameAsyncActor),然后在该对象中调用异步操作方法。
这些异步 actor 对象的操作方法会返回一个 TypedAsyncJob 对象,你可以使用该对象来检查异步操作的状态并获取操作的结果。

image.png

从返回的 TypedAsyncJob 对象中,你可以进行以下操作:

  • 要确定操作是否已完成,调用 isDone 方法。
  • 要获取与操作对应的查询 ID,请调用 getQueryId 方法。
  • 要返回操作的结果(例如 collect 方法的 Row 对象数组或 count 方法的行数),请调用 getResult 方法。请注意,getResult 是一个阻塞调用。
  • 要取消操作,请调用 cancel 方法。

example:

// Create a DataFrame with the "id" and "name" columns from the "sample_product_data" table.
// This does not execute the query.
val df = session.table("sample_product_data").select(col("id"), col("name"))

// Execute the query asynchronously.
// This call does not block.
val asyncJob = df.async.collect()
// Check if the query has completed execution.
println(s"Is query ${asyncJob.getQueryId()} done? ${asyncJob.isDone()}")
// Get an Array of Rows containing the results, and print the results.
// Note that getResult is a blocking call.
val results = asyncJob.getResult()
results.foreach(println)

指定最长等待时间

// Wait a maximum of 10 seconds for the query to complete before retrieving the results.
val results = asyncJob.getResult(10)

通过ID获取异步query

val asyncJob = session.createAsyncJob(myQueryId)
// Check if the query has completed execution.
println(s"Is query ${asyncJob.getQueryId()} done? ${asyncJob.isDone()}")
// If you need to retrieve the results, call getRows to return an Array of Rows containing the results.
// Note that getRows is a blocking call.
val rows = asyncJob.getRows()
rows.foreach(println)

6.在Table中Updating/Deleting/Merging Rows

当你调用 Session.table 来为一个表创建一个 DataFrame 对象时,该方法会返回一个 Updatable 对象,它扩展了 DataFrame,并提供了额外的方法用于更新和删除表中的数据。
如果你需要更新或删除表中的行,可以使用 Updatable 类的以下方法:

  • 调用 update 来更新表中的现有行。
  • 调用 delete 来从表中删除行。
  • 调用 merge 来根据第二个表或子查询中的数据,向一个表中插入、更新和删除行(这相当于 SQL 中的 MERGE 命令)。

Updating rows
对于 update 方法,传入一个 Map,将要更新的列与对应的值关联起来。update 方法会返回一个 UpdateResult 对象,其中包含被更新的行数。
update 是一个Action方法,这意味着调用该方法会发送 SQL 语句到服务器执行。

val updatableDf = session.table("sample_product_data")
val updateResult = updatableDf.update(Map("count" -> lit(1)))
println(s"Number of rows updated: ${updateResult.rowsUpdated}")

Deleting rows
对于 delete 方法,你可以指定一个条件来标识要删除的行,并且可以基于与另一个 DataFrame 的连接来构建该条件。delete 方法会返回一个 DeleteResult 对象,其中包含被删除的行数。
delete 是一个操作方法,这意味着调用该方法会发送 SQL 语句到服务器执行。

val updatableDf = session.table("sample_product_data")
val deleteResult = updatableDf.delete(updatableDf("category_id") === 1)
println(s"Number of rows deleted: ${deleteResult.rowsDeleted}")

Merging rows

val mergeResult = target.merge(source, target("id") === source("id"))
                      .whenNotMatched.insert(Seq(source("id"), source("value")))
                      .collect()

val mergeResult = target.merge(source, target("id") === source("id"))
                      .whenMatched.update(Map("value" -> source("value")))
                      .collect()

7.将数据保存到Table中

df.write.mode(SaveMode.Overwrite).saveAsTable(tableName)

默认情况下,columnOrder 选项被设置为 "index",这意味着 DataFrameWriter 按照列的出现顺序插入值。例如,DataFrameWriter 将从 DataFrame 的第一列插入值到表中的第一列,从 DataFrame 的第二列插入值到表中的第二列,依此类推。
如果你将行插入到现有的表中(SaveMode.Append),并且 DataFrame 中的列名与表中的列名匹配,可以调用 DataFrameWriter.option 方法,传入 "columnOrder" 和 "name" 作为参数。

8.通过DataFrame创建一个View

// 创建persistent view
df.createOrReplaceView("db.schema.viewName")

// 创建tmp view
df.createOrReplaceTempView("db.schema.viewName")

9.Cache DataFrame

在某些情况下,你可能需要执行复杂的查询并保留结果以供后续操作使用(而不是再次执行相同的查询)。在这种情况下,你可以通过调用 DataFrame.cacheResult 方法来缓存 DataFrame 的内容。
该方法的工作方式如下:

  • 运行查询。
    在调用 cacheResult 之前,你无需调用单独的操作方法来检索结果。cacheResult 是一个执行查询的操作方法。
  • 将结果保存在临时表中。
    由于 cacheResult 创建了一个临时表,因此你必须对正在使用的模式具有 CREATE TABLE 权限。
  • 返回一个 HasCachedResult 对象,该对象提供对临时表中结果的访问。
    由于 HasCachedResult 扩展了 DataFrame,你可以对这些缓存数据执行与 DataFrame 上相同的一些操作。
import com.snowflake.snowpark.functions_

// Set up a DataFrame to query a table.
val df = session.table("sample_product_data").filter(col("category_id") > 10)
// Retrieve the results and cache the data.
val cachedDf = df.cacheResult()
// Create a DataFrame containing a subset of the cached data.
val dfSubset = cachedDf.filter(col("category_id") === lit(20)).select(col("name"), col("category_id"))
dfSubset.show()
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,185评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,652评论 3 393
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,524评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,339评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,387评论 6 391
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,287评论 1 301
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,130评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,985评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,420评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,617评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,779评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,477评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,088评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,716评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,857评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,876评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,700评论 2 354

推荐阅读更多精彩内容