从 HDFS 读写 parquet、textfile、csv、xlsx 四种文件格式的代码,简写后如下:
import io
import os
import posixpath
import uuid

import hdfs
import pandas as pd
from hdfs import HdfsError
from hdfs.ext.kerberos import KerberosClient
from krbcontext import krbcontext
class Myhdfs:
    """Read and write pandas DataFrames on HDFS through a Kerberos client.

    Supported formats are parquet, delimited text ("textfile"), csv and
    Excel.  Every write creates a new file named with a random UUID inside
    the target directory.
    """

    # Encodings tried, in order, when reading csv files.  UTF-8 comes first
    # because it is the strictest: bytes that are not UTF-8 almost always
    # fail to decode, whereas GBK happily decodes many UTF-8 byte sequences
    # into garbage.
    _CSV_ENCODINGS = ('utf-8', 'gbk', 'gb18030')

    def __init__(self, url, keytab_file='keytab文件地址', principal='用户名',
                 ccache_file='缓存文件'):
        """Create the wrapper and connect to the HDFS endpoint at *url*.

        Args:
            url: Address passed to ``KerberosClient``.
            keytab_file: Keytab path handed to ``krbcontext``.
            principal: Kerberos principal handed to ``krbcontext``.
            ccache_file: Credential cache path handed to ``krbcontext``.
        """
        self.client = self.get_client(
            url,
            keytab_file=keytab_file,
            principal=principal,
            ccache_file=ccache_file,
        )

    @staticmethod
    def get_client(url, keytab_file='keytab文件地址', principal='用户名',
                   ccache_file='缓存文件'):
        """Build a ``KerberosClient`` for *url* inside a keytab-based context.

        Declared as a static method so that both ``self.get_client(url)`` and
        ``Myhdfs.get_client(url)`` work.

        NOTE(review): the returned client is used after the ``krbcontext``
        block has exited -- confirm the credential cache stays valid for
        later requests.
        """
        with krbcontext(using_keytab=True, keytab_file=keytab_file,
                        principal=principal, ccache_file=ccache_file):
            # session='' is kept from the original call; presumably a falsy
            # value lets the client create its own session -- TODO confirm.
            hclient = KerberosClient(url, session='')
        return hclient

    @staticmethod
    def _new_part_path(path):
        """Return ``path/<random uuid4>`` joined with forward slashes."""
        # HDFS paths always use '/', so posixpath is used instead of os.path,
        # which would insert backslashes on Windows.
        return posixpath.join(path, str(uuid.uuid4()))

    def df_writeto_parquet(self, df, path):
        """Write *df* as one parquet file under the directory *path*.

        Returns:
            False when *df* is empty (nothing is written), True otherwise.
        """
        if df.empty:
            return False
        target = self._new_part_path(path)
        buffer = io.BytesIO()
        df.to_parquet(buffer)
        self.client.write(target, buffer.getvalue(), overwrite=True)
        return True

    def df_writeto_textfile(self, df, path, sep=','):
        """Write *df* as one delimited text file under the directory *path*.

        Rows are joined with ``'\\n'`` and cells with *sep*; no header line is
        written.  Cells are converted with ``str`` so non-string columns do
        not break the join.

        Note that the default separator here (``','``) differs from the
        default used by ``df_readfrom_hdfs`` (``'\\001'``); pass the same
        *sep* to both for a round trip.

        Returns:
            True once the file has been written.
        """
        target = self._new_part_path(path)
        lines = (
            sep.join(map(str, row))
            for row in df.itertuples(index=False, name=None)
        )
        # Encode explicitly so non-ASCII text is written as UTF-8, matching
        # the decoding done when reading textfiles back.
        self.client.write(target, '\n'.join(lines), encoding='utf-8')
        return True

    def df_readfrom_hdfs(self, path, format='textfile', columns=None,
                         sep='\001'):
        """Load the HDFS file at *path* into a DataFrame.

        Args:
            path: HDFS path of the file to read.
            format: One of ``'textfile'``, ``'parquet'``, ``'csv'``,
                ``'xls'`` or ``'xlsx'``.
            columns: Column names; required for ``'textfile'`` because the
                file carries no header.
            sep: Field separator used for ``'textfile'`` only.

        Returns:
            The DataFrame, or None when a csv file could not be parsed with
            any of the attempted encodings.

        Raises:
            ValueError: If *columns* is missing for ``'textfile'`` or
                *format* is not supported.
        """
        if format == 'textfile':
            return self._read_textfile(path, columns, sep)
        if format == 'parquet':
            return self._read_parquet(path)
        if format == 'csv':
            return self._read_csv(path)
        if format in ('xls', 'xlsx'):
            return self._read_excel(path)
        raise ValueError('unsupported format: %r' % (format,))

    def _read_textfile(self, path, columns, sep):
        """Parse a UTF-8, newline-separated, *sep*-delimited file."""
        # Explicit None/length test: truthiness of array-like column
        # containers is ambiguous.
        if columns is None or len(columns) == 0:
            raise ValueError('columns 不能为空')
        with self.client.read(path) as reader:
            text = str(reader.read(), 'utf-8')
        lines = text.split('\n')
        # A trailing newline (or an empty file) leaves one empty element that
        # would otherwise become a bogus extra row.
        if lines and lines[-1] == '':
            lines.pop()
        rows = [line.split(sep) for line in lines]
        return pd.DataFrame(data=rows, columns=columns)

    def _read_parquet(self, path):
        """Read the whole file into memory and parse it as parquet."""
        with self.client.read(path) as reader:
            buffer = io.BytesIO(reader.read())
        return pd.read_parquet(buffer)

    def _read_csv(self, path):
        """Parse a csv file, trying each candidate encoding in turn."""
        for encoding in self._CSV_ENCODINGS:
            try:
                with self.client.read(path, encoding=encoding) as reader:
                    return pd.read_csv(reader)
            except UnicodeDecodeError:
                # Wrong encoding guess; try the next candidate.
                continue
            except Exception as exc:
                # Best effort, as before: report the problem and move on to
                # the next encoding.
                print(exc)
        return None

    def _read_excel(self, path):
        """Read the whole file into memory and parse it as an Excel sheet."""
        with self.client.read(path) as reader:
            # Wrap the bytes in a file-like object; passing raw bytes to
            # read_excel is deprecated in recent pandas versions.
            return pd.read_excel(io.BytesIO(reader.read()))

    def show_files(self, path, filelist=None):
        """Recursively collect the paths of all files below *path*.

        Args:
            path: HDFS directory to walk.
            filelist: Optional list to extend; a new list is created when it
                is omitted.

        Returns:
            *filelist*, with one entry appended per file found.
        """
        if filelist is None:
            filelist = []
        # Ask for statuses so the entry type tells files from directories,
        # instead of listing every entry and treating an error as "is a file".
        for name, status in self.client.list(path, status=True):
            curpath = posixpath.join(path, name)
            if status.get('type') == 'DIRECTORY':
                try:
                    self.show_files(curpath, filelist)
                except Exception as exc:
                    # Keep walking when one sub-directory cannot be listed.
                    print(exc)
            else:
                filelist.append(curpath)
        return filelist