1. File Mounting and File Operations
1.1 Mount a blob
- What goes in source:
source = 'wasbs://<container-name>@<storage-account-name>.blob.core.windows.net'
- What goes in extra_configs:
extra_configs = {'fs.azure.account.key.<storage-account-name>.blob.core.windows.net': '<storage-account-access-key>'}
dbutils.fs.mount(
    source = 'wasbs://input@dl108lg.blob.core.windows.net',
    mount_point = '/mnt/blob108_input',
    extra_configs = {'fs.azure.account.key.dl108lg.blob.core.windows.net': 'fuDFNS1ziD9Lw4aeH/N6gw7+4'}
)
**Note: if you authenticate with a SAS token instead of the account key, the extra_configs key changes to `fs.azure.sas.<container-name>.<storage-account-name>.blob.core.windows.net`.**
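- A minimal sketch of the SAS variant (the container, account name, mount name and token below are placeholders, not real values):
dbutils.fs.mount(
    source = 'wasbs://<container-name>@<storage-account-name>.blob.core.windows.net',
    mount_point = '/mnt/<mount-name>',
    extra_configs = {'fs.azure.sas.<container-name>.<storage-account-name>.blob.core.windows.net': '<sas-token>'}
)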
1.2 View mount point information
- List all mount points
dbutils.fs.mounts()
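- Related helpers, as a quick sketch using the mount point created above:
# List the files under a mount point
display(dbutils.fs.ls('/mnt/blob108_input'))
# Remove a mount point once it is no longer needed
dbutils.fs.unmount('/mnt/blob108_input')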
1.3 Read files in a given format
- Read CSV
path = '/mnt/blob108_bronze/Sales/emp.csv'
df = spark.read.format('csv').load(path, header=True)
- Read Parquet (Parquet carries its own schema, so no header option is needed)
path = '/mnt/blob108_bronze/Sales/emp.parquet'
df = spark.read.format('parquet').load(path)
- Read Delta (the path must point at a Delta table directory, not a single .parquet file)
path = '/mnt/blob108_bronze/Sales/emp_delta'
df = spark.read.format('delta').load(path)
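- An alternative sketch that passes options and an explicit schema instead of keyword arguments (the column names and types here are assumptions, not taken from the actual file):
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
schema = StructType([
    StructField('EmployeeName', StringType(), True),
    StructField('SalesYTD', DoubleType(), True)
])
df = spark.read.format('csv').option('header', True).schema(schema).load('/mnt/blob108_bronze/Sales/emp.csv')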
1.4 View the DataFrame
display(df)
- sql:
select * from df
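- The SQL snippets in these notes treat df as a table name; to actually run them, first register the DataFrame as a temp view:
df.createOrReplaceTempView('df')
spark.sql('select * from df').show()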
2. select and filter
2.1 select(): pick the columns you need
- Select a single column
# Method 1:
df.select('SalesYTD').show(n=5)
# Method 2:
from pyspark.sql.functions import col
df.select(col('SalesYTD')).show(n=5)
# Method 3:
df.select(df.SalesYTD).show()
# Method 4:
df.select(df['SalesYTD']).show()
- sql:
select SalesYTD from df
- Select multiple columns
df.select(['SalesYTD','Bonus'])
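- A small sketch of selecting columns with aliases (the new names are arbitrary examples):
from pyspark.sql.functions import col
df.select(col('SalesYTD').alias('sales_ytd'), col('Bonus').alias('bonus_amount')).show(n=5)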
2.2 selectExpr(): handle columns with SQL-style expressions
- Aggregate functions
df.selectExpr('sum(SalesYTD)').show()
- sql:
select sum(SalesYTD) from df
- String handling
df.selectExpr("CONCAT(month,year) AS MonthYear","UPPER(month) AS Capital_Months").show()
- Conditional logic
df.selectExpr("State","CASE WHEN State='CA' THEN 'California' WHEN State='NY' THEN 'New York' END AS State_Name").show(n=5)
3. filter(): filter rows
- SalesYTD greater than 4,000,000
df.filter(df.SalesYTD>4000000).show()
- sql:
select SalesYTD from df where SalesYTD>4000000
- Filter on multiple conditions (each condition needs its own parentheses because & binds tighter than the comparisons)
df.filter((df.SalesYTD > 4000000) & (df.Bonus < 55000)).show()
- sql:
select * from df where SalesYTD>4000000 and Bonus<55000
- Filter on whether a string column contains a substring
df.filter(col('education').contains('degree')).show()
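- A few related string filters, as a quick sketch (the example values are assumptions):
df.filter(col('education').startswith('Bach')).show()            # starts with a prefix
df.filter(col('education').like('%degree%')).show()              # SQL LIKE pattern
df.filter(col('education').isin('Bachelors', 'Masters')).show()  # value in a list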
3.1 Equal and not equal
df.filter(df.region == 'CA').show()
df.filter(~(df.region == 'CA')).show()   # ~ negates, and the comparison must be wrapped in parentheses
- sql
select * from df where region != 'CA'
4. Handling Nulls, Duplicates and Aggregation
4.1 Nulls
- Find nulls
df.filter(df['SalesYTD'].isNull()).show()
- Drop the rows that contain nulls
df.dropna().show()
- Fill nulls with a specified value (a dict maps column names to fill values)
filled_df = df.fillna({"column_name": "value"})
filled_df.show()
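- Both dropna() and fillna() accept a subset argument to limit them to specific columns; a sketch using the columns from earlier examples:
# Drop rows only when SalesYTD is null
df.dropna(subset=['SalesYTD']).show()
# Fill nulls with 0 only in the numeric columns
df.fillna(0, subset=['SalesYTD', 'Bonus']).show()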
4.2 Duplicates
- Check the table for duplicates
df.groupBy("name", "dep_id").count().filter("count > 1").show()
- Drop duplicates based on those grouping columns; calling dropDuplicates() without a column list removes only rows that are identical in every column, keeping one copy
df_no_duplicates = df.dropDuplicates(["name", "dep_id"])
- Order by id to inspect the deduplicated table
df_no_duplicates.orderBy('emp_id').show()
- Write the deduplicated result to a folder
path = '/mnt/blob108_bronze/no_duplicate_emp/'
df_no_duplicates.write.format('parquet').mode('overwrite').save(path)
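- A sketch of a partitioned write plus a read-back check (the partition column dep_id comes from the examples above):
df_no_duplicates.write.format('parquet').mode('overwrite').partitionBy('dep_id').save(path)
check_df = spark.read.format('parquet').load(path)
check_df.show()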
5. withColumn(): create a new column
- Create a new column
from pyspark.sql.functions import concat
df.withColumn('new_info',concat(df.name , df.country)).show()
- sql:
select *, CONCAT(name, country) as new_info from df

- Capitalize the first letter of each word
from pyspark.sql.functions import initcap
df.withColumn('name', initcap(df.name)).show()
- Use when() for conditional values
from pyspark.sql.functions import when
df_with_category = df.withColumn("category", when(df.age < 25, "Young").when(df.age > 25, "new").otherwise("Old"))
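- Related column operations, as a quick sketch (the literal value and the new names are arbitrary examples):
from pyspark.sql.functions import lit
df.withColumn('source', lit('bronze')).show()      # add a constant column
df.withColumn('age', df.age.cast('int')).show()    # change a column's type
df.withColumnRenamed('name', 'full_name').show()   # rename a column
df.drop('country').show()                          # drop a column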