Spark数据清洗合集(最新)

1.根据blolb路径读取数据

  1. 文件路径:都是位于blob里
#csv
path = '/mnt/blob108_bronze/Sales/emp.csv
#parquet
path = '/mnt/blob108_bronze/Sales/emp.parquet
#delta
path = '/mnt/blob108_bronze/Sales/emp.parquet
  1. 读取数据,添加配置,推荐这种方法读取,其他的有可能不能读取delta数据
path_employee_csv = '/mnt/blob108_bronze/employee.csv'

df = spark.read.format('csv').options(inferSchema = 'True',header = True).load(path_employee_csv)
df.show(10)

2. 将df框架改为临时表,使用sql处理数据

  • 转为临时表
temp_table_name = 'employee'
df.creat
  • 使用sql
%sql 
select * from employee

3. 处理Na的值

3.1 删除某列含有空值的该行

cleaned_df = df.na.drop(subset=["gender"])

3.2 填充某列的空值/填充所有的空值

  • 填充gender列的空值
df.fillna({'gender':1})
  • 填充所有的空值为0
df.fillna(0)

4. 删除重复的行

  • 根据分组删除重复的行,且将最大的id最大的删除
df.sorted_df = df.orderBy(("id"))
unique_df= df.dropDuplicates(['column1', 'column2', 'column3'])

5. 使用withColumn对列进行修改

5.1 添加新的列

new_df = df.withColumn("new_column", df["old_column"] + 1)

5.2 修改原来的列

  • 修改sold_date列的时间格式
formatted_data = df.withColumn("sold_date", to_date(df["sold_date"], "yyyy-MM-dd"))

6. 写入数据到blob里

7. 使用sas/access key/key-value 对blob的container进行mount

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容