远程传文件
- 从本地复制到远程
scp local_file remote_username@remote_ip:remote_folder
或者
scp local_file remote_username@remote_ip:remote_file
或者
scp local_file remote_ip:remote_folder
或者
scp local_file remote_ip:remote_file
- 从远程复制到本地
scp root@www.runoob.com:/home/root/others/music /home/space/music/1.mp3
scp -r www.runoob.com:/home/root/others/ /home/space/music/
读文件
- spark.read.csv()
可以读取csv、tsv、snappy压缩文件等
from pyspark.sql import types
# 设置字段schema
schema = types.StructType([
types.StructField('id', types.LongType()),
types.StructField('tag', types.StringType())])
df = spark.read.csv(results_path, sep='\t', schema=schema)
# 或者自带header
df = spark.read.csv(results_path, header=False, inferSchema=False, sep='\t')
写文件
- df.write.csv()
# 合并为单个分区后写文件(coalesce(1) 不触发全量 shuffle,同样只输出一个文件)
df.coalesce(1).write.csv(results_path, sep='\t', header=True, compression='none', mode='overwrite')
# 不分区写成单个文件
df.repartition(1).write.csv(results_path, sep='\t', header=False, compression='none', mode='overwrite')
列操作
- 保留列
df = df.select(['a', 'b'])
- 增加一个新列
# withColumn只能添加 df 已有列的变换
df = df.withColumn('a', col('b'))
- 删除dataframe某些列
df = df.drop('a', 'b', 'c')
- 更改列名
from pyspark.sql.functions import col
# 方法一
# withColumnRenamed(existing, new) takes two plain column-name strings;
# passing a Column object as the new name is an error.
df = df.withColumnRenamed('a', 'b')
# 方法二
mapping = dict(zip(['_c0'], ['gid']))
test = test.select([col(c).alias(mapping.get(c, c)) for c in test.columns])
- 替换列的字符串并生成新列
new_df = df.withColumn("vv_new", regexp_replace(col("vv"), "na", "nan")).drop('vv')
- 按列filter过滤
df = df.filter(df.tag_name.like('%文化%')) # 保留某个匹配值
groupby
- 聚合统计操作
df_count = df.groupby('id').count() # groupby数量
df2 = df_count.filter(df_count['count']>=50)
- 按某列groupby
id_list = vb_res.select('id').distinct().rdd.flatMap(lambda x: x).collect()
data = [vb_res.where(vb_res['id'] == id) for id in id_list]
print('id 数量:' + str(len(data)))
拼接两个dataframe
- 直接append,前提是schema相同
a = spark.createDataFrame([(1, 'xxx'), (2, 'xxx')], ['photo_id', 'caption'])
b = spark.createDataFrame([(1, 'xxx'), (2, 'xxx')], ['photo_id', 'caption'])
# union() supersedes unionAll() (deprecated since Spark 2.0). Like unionAll,
# it matches columns by position and does not remove duplicate rows.
c = a.union(b)
c.show()
- 按字段join
spark.conf.set("spark.sql.crossJoin.enabled", "true")
new_df = df.join(filter_data, df['gid']==filter_data['gid_1'])
new_df = new_df.drop('gid_1')
new_df
类型转换
- sql CAST函数
CAST (movie_score AS int)
- spark cast
from pyspark.sql import types
df_after = df.select(col("movie_score"), col("movie_score").cast(types.LongType()))
写hive表
- 不追加,直接覆盖xx.yy表
# Pass overwrite=True to insertInto() itself: on PySpark 2.x its default
# overwrite=False resets the writer mode to "append", silently discarding a
# preceding .mode("overwrite"). This form behaves the same on 2.x and 3.x.
data.write.insertInto("xx.yy", overwrite=True)
- sql insert 写hive表
# 使用普通的hive-sql写入分区表
spark.sql("""
insert overwrite table ai.da_aipurchase_dailysale_hive
partition (saledate)
select productid, propertyid, processcenterid, saleplatform, sku, poa, salecount, saledate
from szy_aipurchase_tmp_szy_dailysale distribute by saledate
""")
# 先将dataframe注册成临时表,然后通过sql的方式插入
df.createOrReplaceTempView("temp_tab")
spark.sql("insert into zz_table select * from temp_tab")
- saveAsTable()
# 不写分区表,只是简单的导入到hive表
df.write.saveAsTable("xx.yy", None, "overwrite", None)
# 在hive表已有的表xx.yy中追加记录,按date分区
df.write.format("Hive").mode("append").saveAsTable("xx.yy", partitionBy='date')
- insertInto()
如果想要在不影响其他分区的情况下覆盖某个指定分区的数据,可以用insertInto()
# 1.首先在SparkSession设置config
config("spark.sql.sources.partitionOverwriteMode", "DYNAMIC")
# 2.将列对齐之后(insertInto 按列的位置写入,DataFrame 的列顺序必须与目标表一致)
# Pass overwrite=True to insertInto() itself: on PySpark 2.x its default
# overwrite=False resets the writer mode to "append", silently discarding a
# preceding .mode("overwrite"). This form behaves the same on 2.x and 3.x.
data.write.format("Hive").insertInto(table_name, overwrite=True)
saveAsTable与insertInto的区别:
- saveAsTable——当hive中已经存在目标表,无论SaveMode是append还是overwrite,不需要schema一样,只要列名存在就会根据列名进行匹配覆盖数据
- insertInto——当hive中存在目标表时,无论SaveMode是append还是overwrite,需要当前DF的schema与目标表的schema必须一致,因为insertInto插入的时候,是根据列的位置插入,而不是根据列的名字
UDF
- 解析json数据并生成一列新列 - by line
@udf('array<struct<a:bigint, b:float>>') # 注册udf函数
def parse_dict_udf(s):
    """Parse a JSON object string into a list of ``{'a': ..., 'b': ...}`` records.

    Each key of the JSON object becomes ``a`` (converted to ``int``) and its
    value becomes ``b`` (converted to ``float``), matching the
    ``array<struct<a:bigint, b:float>>`` type declared on the decorator.

    Args:
        s: JSON text encoding an object whose keys are integer-like strings.

    Returns:
        A list with one dict per key, in the JSON object's key order.

    Raises:
        ValueError: if ``s`` is not valid JSON or a key is not integer-like.
    """
    dic = json.loads(s)
    # The original looked values up through an undefined name (`d`); iterate
    # over items() so key and value always come from the parsed object.
    # Values are coerced to float because the declared field type is float;
    # JSON null is passed through as None.
    return [
        {'a': int(key), 'b': None if value is None else float(value)}
        for key, value in dic.items()
    ]
df_transform = df.withColumn('b', explode(parse_dict_udf('a')))
- json数据解析 - by row
def convert_result_emb(row):
    """Replace a row's JSON ``result`` field with an ``embedding`` field.

    Args:
        row: a Row whose ``result`` field holds a JSON object as text.

    Returns:
        A new Row with the same fields minus ``result``, plus ``embedding``
        taken from the parsed JSON object.

    Raises:
        KeyError: if the row has no ``result`` field or the parsed JSON has
            no ``embeding`` key.
    """
    item = row.asDict()
    # json.loads accepts text directly; encoding to bytes first is unnecessary
    # and fails on Python 3.5, where json.loads rejects bytes.
    result_dic = json.loads(item.pop('result'))
    # NOTE(review): the JSON key is spelled 'embeding' (single 'd') -
    # presumably this matches the upstream payload; confirm against the data.
    item['embedding'] = result_dic['embeding']
    return Row(**item)
filter_data = spark.createDataFrame(data.rdd.map(convert_result_emb))
- 传入多个参数
def get_text(title, name):
    """Concatenate ``title`` and ``name``, treating missing values as ''.

    Args:
        title: the title text, or a falsy value (e.g. None) when absent.
        name: the name text, or a falsy value (e.g. None) when absent.

    Returns:
        ``title`` followed by ``name``, with any falsy input replaced by ''.
    """
    return (title or '') + (name or '')
func = udf(get_text, types.StringType()) # 注册udf函数
df = df.withColumn("text", func(df.title, df.name))
- 传入字典/tuple等特殊数据类型
## 方法一
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
def translate(mapping):
    """Build a string-typed UDF that looks column values up in ``mapping``.

    The dictionary is captured by the closure, which is how a non-column
    Python object is handed to a UDF.

    Args:
        mapping: dict from input column value to output string.

    Returns:
        A UDF returning ``mapping.get(value)``, i.e. None for unmapped values.
    """
    # The parameter is named `key` rather than `col` so it does not shadow
    # pyspark.sql.functions.col, which this file imports elsewhere.
    def translate_(key):
        return mapping.get(key)
    return udf(translate_, StringType())
df = sc.parallelize([('DS', ), ('G', ), ('INVALID', )]).toDF(['key'])
mapping = {
'A': 'S', 'B': 'S', 'C': 'S', 'DS': 'S', 'DNS': 'S',
'E': 'NS', 'F': 'NS', 'G': 'NS', 'H': 'NS'}
df.withColumn("value", translate(mapping)("key"))
## 方法二 (Spark >= 2.0, Spark < 3.0)
from pyspark.sql.functions import col, create_map, lit
from itertools import chain
mapping_expr = create_map([lit(x) for x in chain(*mapping.items())])
df.withColumn("value", mapping_expr.getItem(col("key")))
## 方法二 (Spark >= 3.0)
from pyspark.sql.functions import col, create_map, lit
from itertools import chain
mapping_expr = create_map([lit(x) for x in chain(*mapping.items())])
df.withColumn("value", mapping_expr[col("key")]).show()
参考:
PySpark create new column with mapping from a dict
Pyspark-UDF函数的使用、UDF传入多个参数、UDF传出多个参数、传入特殊数据类型
- pyspark去重
# distinct
# reduceByKey
rdd.reduceByKey(lambda x,y:x)
# drop_duplicates
df.dropDuplicates([col_name1, col_name2])
df.drop_duplicates([col_name1, col_name2])
- pyspark排序
spark.createDataFrame(dfall).orderBy(desc('datestr')) # 降序
spark.createDataFrame(dfall).orderBy(df.datestr.desc()) # 降序
- 排序+去重
# 方法一:sort+drop_duplicates
df = df.sort(['movie_score'], ascending=False)
new_df = df.drop_duplicates(['item_id'])
# 方法二:开窗函数
from pyspark.sql import Window
from pyspark.sql.functions import col, row_number
# row_number() numbers rows 1..n within each partition with no repeats, so
# keeping position 1 leaves exactly one row per key. rank() gives tied rows
# the same rank and would keep every row tied for first place.
# Order descending so position 1 is the highest movie_score, as in method 1.
window = Window.partitionBy(['item_id']).orderBy(col('movie_score').desc())
df_1 = df.withColumn('rank', row_number().over(window))
window = Window.partitionBy("col1").orderBy("datestr")
df_1 = df.withColumn('rank', row_number().over(window))  # keep every row, annotated with its position
df_1 = df.withColumn('rank', row_number().over(window)).filter(col('rank') == 1).drop('rank')  # keep only the first row per partition
- 对某列加和
# 方法一
df.groupBy().sum().collect()[0][0]
# 方法二
sum_number = df.agg({"a":"sum"}).collect()[0]
result = sum_number["sum(a)"]
- 生成libsvm格式文件
def to_libsvm(row):
    """Render one row as a libsvm-format line held in a single ``_c0`` column.

    The output text is ``<label> <index>:<value> ...``. Every field except
    ``label`` is a feature, numbered from 1 in the row's field order.

    Args:
        row: a Row with a ``label`` field plus one field per feature.

    Returns:
        A Row with one field, ``_c0``, containing the libsvm line.

    Raises:
        KeyError: if the row has no ``label`` field.
    """
    item = row.asDict()
    # Leave the label out of the features: the original enumerated every
    # field, so the label leaked into the feature list.
    features = [value for name, value in item.items() if name != 'label']
    # libsvm feature indices are one-based.
    feat_txt = ' '.join(
        '{}:{}'.format(index, value)
        for index, value in enumerate(features, start=1)
    )
    libsvm_txt = '{} {}'.format(item['label'], feat_txt)
    return Row(_c0=libsvm_txt)
final_df = spark.createDataFrame(new_df.rdd.map(to_libsvm))
OneHotEncoder
- 不使用pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer
df = spark.createDataFrame([
(0, "a"),
(1, "b"),
(2, "c"),
(3, "a"),
(4, "a"),
(5, "c")
], ["id", "value"])
stringIndexer = StringIndexer(inputCol="value", outputCol="valueIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(inputCol="valueIndex", outputCol="valueIndexVec")
# Spark >= 3.0: OneHotEncoder is an Estimator and must be fitted before it can
# transform. Spark 2.x: it is a plain Transformer with no fit(). Handle both.
encoder_model = encoder.fit(indexed) if hasattr(encoder, "fit") else encoder
encoded = encoder_model.transform(indexed)
encoded.show()
- 使用pipeline
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.sql.functions import col
# setHandleInvalid("keep") 防止未知新数据报错
stringIndexer = StringIndexer(inputCol="_c112", outputCol="_c112_indexed").setHandleInvalid("keep")
encoder = OneHotEncoder(inputCol="_c112_indexed", outputCol="poi_type_vec")
pipeline = Pipeline(stages=[stringIndexer, encoder])
model = pipeline.fit(geo_df)
transformed = model.transform(geo_df)
transformed.select('poi_type_vec').show(1, False)
Tips:
- row_number() | rank() | dense_rank() 的区别(以排序值 A、A、B 为例,前两行并列)
row_number 1 2 3
rank 1 1 3
dense_rank 1 1 2