1.根据blolb路径读取数据
- 文件路径:都是位于blob里
#csv
path = '/mnt/blob108_bronze/Sales/emp.csv
#parquet
path = '/mnt/blob108_bronze/Sales/emp.parquet
#delta
path = '/mnt/blob108_bronze/Sales/emp.parquet
- 读取数据,添加配置,推荐这种方法读取,其他的有可能不能读取delta数据
path_employee_csv = '/mnt/blob108_bronze/employee.csv'
df = spark.read.format('csv').options(inferSchema = 'True',header = True).load(path_employee_csv)
df.show(10)
2. 将df框架改为临时表,使用sql处理数据
temp_table_name = 'employee'
df.creat
%sql
select * from employee
3. 处理Na的值
3.1 删除某列含有空值的该行
cleaned_df = df.na.drop(subset=["gender"])
3.2 填充某列的空值/填充所有的空值
df.fillna({'gender':1})
df.fillna(0)
4. 删除重复的行
df.sorted_df = df.orderBy(("id"))
unique_df= df.dropDuplicates(['column1', 'column2', 'column3'])
5. 使用withColumn对列进行修改
5.1 添加新的列
new_df = df.withColumn("new_column", df["old_column"] + 1)
5.2 修改原来的列
formatted_data = df.withColumn("sold_date", to_date(df["sold_date"], "yyyy-MM-dd"))
6. 写入数据到blob里
7. 使用sas/access key/key-value 对blob的container进行mount