Analyzing an HDFS fsimage with Spark
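
This post loads an HDFS fsimage into Hive with PySpark so it can be analyzed with SQL. The input is assumed to be a text dump of the fsimage with one record per file and fields separated by the '[<' delimiter used in the script (for example, an export produced with the HDFS offline image viewer). Each path is split into its first five levels (path0..path4) and the result is written into a month-partitioned ORC table, created with the following DDL: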

"""
create table dev.bj5_hadoop_fsimage(
path string,
replication int,
modificationtime date,
accesstime date,
preferredblocksize int,
blockscount int,
filesize int,
nsquota int,
dsquota int,
permission string,
username string,
groupname string,
path0 string,
path1 string,
path2 string,
path3 string,
path4 string
)
PARTITIONED BY (pt string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat';
"""

import time

from pyspark import SparkContext
from pyspark.sql import HiveContext, Row, SparkSession
from pyspark.sql.functions import split, concat_ws

def load_file_to_spark(x):
    # Keep data lines only; the header line contains the column name "Replication".
    return 'Replication' not in x


if __name__ == '__main__':
    # Target Hive table, e.g. the dev.bj5_hadoop_fsimage table created above.
    table_name = ''
    sc = SparkContext.getOrCreate()
    # Partition value: the current year-month, e.g. "2021-02".
    s_time = time.strftime("%Y-%m", time.localtime())
    spark = SparkSession.builder.enableHiveSupport().appName('pyspark').getOrCreate()
    sqlContext = HiveContext(sc)
    # lines = sc.textFile("/Users/pausky/Downloads/xml_file1")
    lines = sc.textFile("/tmp/xml_file")
    # Drop the header line, then split each record into its fields.
    filt_lines = lines.filter(load_file_to_spark)
    parts = filt_lines.map(lambda x: x.split('[<'))
    hdfs_file_imager = parts.map(lambda p: Row(
        path=p[0], replication=p[1], modificationtime=p[2], accesstime=p[3],
        preferredblocksize=p[4], blockscount=p[5], filesize=p[6], nsquota=p[7],
        dsquota=p[8], permission=p[9], username=p[10], groupname=p[11]))
    df_fsimage = spark.createDataFrame(hdfs_file_imager)
    # Split the path into its first five levels (path0..path4) for easier grouping.
    split_col = split(df_fsimage['path'], "/")
    df_fsimage_with_muti_c = (df_fsimage
        .withColumn('path0', split_col.getItem(1))
        .withColumn('path1', split_col.getItem(2))
        .withColumn('path2', split_col.getItem(3))
        .withColumn('path3', split_col.getItem(4))
        .withColumn('path4', split_col.getItem(5)))
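    # Example (hypothetical path): "/user/hive/warehouse/dev.db/t1".split("/") gives
    # ["", "user", "hive", "warehouse", "dev.db", "t1"], so getItem(1)..getItem(5)
    # become path0="user", path1="hive", path2="warehouse", path3="dev.db", path4="t1".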
    # Alternative write paths tried during development (kept for reference):
    # df_fsimage_with_c = df_fsimage_with_muti_c.withColumn('same_c', concat_ws('/', df_fsimage_with_muti_c['path0'],
    #     df_fsimage_with_muti_c['path1'], df_fsimage_with_muti_c['path2'],
    #     df_fsimage_with_muti_c['path3'], df_fsimage_with_muti_c['path4']))
    # df_fsimage_delete_c = df_fsimage_with_c.drop('path0').drop('path1').drop('path2').drop('path3').drop('path4')
    # df_fsimage_delete_c.write.saveAsTable("dev.bj5_hadoop_fsimage", mode='overwrite')
    # df_fsimage_delete_c.write.orc("/user/hive/warehouse/dev.db/bj5_hadoop_fsimage/pt=2021-02/")
    # df_fsimage_delete_c.write.option("path", "/user/hive/warehouse/dev.db/bj5_hadoop_fsimage/pt=2021-02/").saveAsTable('dev.bj5_hadoop_fsimage')
    df_fsimage_with_muti_c.createOrReplaceTempView("bj5_hdfs_fsimage_table")
    # Overwrite the current month's partition of the target table.
    sqlContext.sql(
        'insert overwrite table ' + table_name + ' partition (pt = "' + s_time + '") '
        'select path,replication,modificationtime,accesstime,preferredblocksize,blockscount,'
        'filesize,nsquota,dsquota,permission,username,groupname,path0,path1,path2,path3,path4 '
        'from bj5_hdfs_fsimage_table')
    # t = spark.sql('select path,replication,modificationtime,accesstime,preferredblocksize,blockscount,filesize,nsquota,dsquota,permission,username,groupname,same_c from bj5_hdfs_fsimage_table')
    # df_fsimage_with_c.write.parquet('')
    # t = spark.sql('select same_c,sum(FileSize)/1024/1024 as size from bj5_hdfs_fsimage_table where same_c="user/hive/123" group by same_c order by size')
    # print(t.show(10))
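
Once the partition is loaded, the table can be queried for capacity reports, for example to find the directories that take up the most space. A minimal sketch, assuming the dev.bj5_hadoop_fsimage table from the DDL above and the 2021-02 partition used in the commented-out examples:

top_dirs = spark.sql(
    'select path0, path1, sum(filesize)/1024/1024 as size_mb '
    'from dev.bj5_hadoop_fsimage where pt = "2021-02" '
    'group by path0, path1 order by size_mb desc')
top_dirs.show(20)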
