Pyspark(最新)

1.文件挂载和文件操作

1.1 挂载mount blob

  • source里的内容source = 'wasbs://<container名称>@<blob的名称>.blob.core.windows.net;
  • extra_configs的内容是extra_configs = {'fs.azure.account.key.<blob的名称>.blob.core.windows.net':'<container里的access key>'}
dbutils.fs.mount(source = 'wasbs://input@dl108lg.blob.core.windows.net',
                          mount_point= '/mnt/blob108_input',
                          extra_configs = {'fs.azure.account.key.dl108lg.blob.core.windows.net':'fuDFNS1ziD9Lw4aeH/N6gw7+4'}
)
**注意:如果使用的时sas当作extra_configs的话,改成`extra_configs = {'fs.azure.<container-name>.<blolb-name>.blob.core....`**

1.2查看挂载点的信息

  1. 查看所有的mount点
dbutils.fs.mounts()

1.3 按照指定格式读取文件

  • 读取csv
path = '/mnt/blob108_bronze/Sales/emp.csv
df = spark.read.format('csv').load(path,header = True)
  • 读取parquet
path = '/mnt/blob108_bronze/Sales/emp.parquet
df = spark.read.format('parquet').load(path,header = True)
  • 读取delta
path = '/mnt/blob108_bronze/Sales/emp.parquet
df = spark.read.format('delta').load(path,header = True)

1.4 查看df

display(df)
  • sql:
select * from df

2.select和filter

2.1 select()选择需要的数据

  1. 选择单独的列
#方法1:
df.select('SalesYTD').show(n=5)

#方法2:
from pyspark.sql.functions import col

df.select(col('SalesYTD')).show(n=5)

#方法3:
df.select(df.SalesYTD).show()

#方法4:
df.select(df['SalesYTD']).show()
  • sql:
select SalesYTD from df
  1. 选择多列
df.select(['SalesYTD','Bonus'])

2.2 selectExpr()可以像sql一样处理column

1.使用aggregate

df.selectExpr('sum(SalesYTD)').show()
  • sql:
select sum(SalesYTD) from df
  1. 字符串处理
df.selectExpr("CONCAT(month,year) AS MonthYear","UPPER(month) AS Capital_Months").show()
  1. 条件处理
df.selectExpr("State","CASE WHEN State='CA' THEN 'California' WHEN State='NY' THEN 'New York' END AS State_Name").show(n=5)

3.filter() 过滤

  1. 大于4000的
df.filter(df.SalesYTD>4000000).show()
  • sql:
select SalesYTD from df where SalesYTD>4000000
  1. 多条件过滤
df.filter(df.SalesYTD>4000000 & df.Bonus<55000).show()
  • sql:
select * from df where SalesYTD>4000000 and Bonus<55000
  1. 过滤字符串是否包含
df.filter(col('education').contain('degree').show()

4.等于和不等于

df.filter(df.region=='CA').show()
df.filter(~df.region=='CA').show()
  • sql
select * from df where region != 'CA'

4. 处理Null,重复和聚合

4.1空值

4.1 查找空值

df.filter(df['SalesYTD'].isNull()).show()

4.2 删除/填充 空值

  1. 删除空值所在一行
df.dropna().show()
  1. 使用指定的值,填充空值的行
filled_df = df.fillna({"column_name": "value"})
filled_df.show()

4.2 重复

  1. 查看表的重复情况
duplicate_columns = df.groupBy("name", "dep_id").count().filter("count > 1").show()
  1. 根据分组删除重复;不加入上面的分组,会直接删除所有相同的行,留下一行
df_no_duplicates = df.dropDuplicates(["name", "dep_id"])
df_no_duplicates.orderBy('emp_id').show()

3.根据id排序,显示删除后的表

df_no_duplicates.orderBy('emp_id').show()

4.写入到文件夹

path = '/mnt/blob108_bronze/no_duplicate_emp/'
df_no_duplicates.write.format('parquet').mode('overwrite').save(path)

5. withcolumn() 创建新的列

1.创建一个新的列

from pyspark.sql.functions import concat

df.withColumn('new_info',concat(df.name , df.country)).show()
  • sql :
select CONCAT(name,country) as new from df
image.png
  1. 将每个单词的开头变成大写
df.withColumn(df.name,initcap('df.name')).show()
  1. 使用when 来判断
df_with_category = df.withColumn("category", when(df.age < 25, "Young").when(df.age > 25,"new").otherwise("Old"))

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容