项目背景
之前项目中经常使用paramiko模块SFTP下载远端文件到本地,再读取本地文件进行相关操作后入库,最终再删除该本地文件。
思考优化
但是最近开始思考到,其实很多情况下是不需要保留SFTP下载到本地的文件的,只需一次性使用其中的数据而已。
查看了一下paramiko模块的API,paramiko
模块中下载数据的方法SFTPClient
类有get
和getfo
两种。前者则是我常用的下载文件到本地的方法,后者是将文件一次读取为数据流再写入到文件对象的方法。但都不符合我的预期要求。
我希望能安全的读取SFTP远端文件的数据对象到内存中使用,便有了这次尝试。既然要安全的读取远端文件,便不能把文件内容一次全部加载到内存,这里就需要考虑使用生成器。
代码实现
我的整体测试代码如下,借助sftp的open方法,实现一个按行读取远端大文件的生成器方法。
更新:参考源码的getfo方法,使用prefeach
预加载全部数据,否则实际使用过程中遇到大型文佳效率将会非常低。
import paramiko
class SftpTest(object):
def __init__(self):
self.host = 'Your sftp host'
self.port = 22
self.username = 'Your username'
self.password = 'Your password'
self.ssh = None
self.sftp = None
self.connect()
def __get_ssh(self):
try:
self.ssh = paramiko.Transport((self.host, self.port))
self.ssh.connect(username=self.username, password=self.password)
except Exception as e:
print(e)
def __get_sftp(self):
try:
self.sftp = paramiko.SFTPClient.from_transport(self.ssh)
except Exception as e:
print(e)
def connect(self):
self.__get_ssh()
self.__get_sftp()
def close(self):
if hasattr(self.sftp, "close") and callable(self.sftp.close):
self.sftp.close()
self.sftp = None
if hasattr(self.ssh, "close") and callable(self.ssh.close):
self.ssh.close()
self.ssh = None
# 借助sftp的open方法自定义实现一个生成器方法,返回一个按行读取远端文件的生成器对象
def generator_readfile(self, remotepath):
file_size = self.sftp.stat(remotepath).st_size
with self.sftp.open(remotepath, "r") as f:
f.prefetch(file_size)
while True:
line = f.readline()
if not line:
break
yield line
# 使用示例
if __name__ == '__main__':
sftp = SftpTest()
gen_obj = sftp.generator_readfile(remotepath="/root/download/test_dat2.DAT")
for i in gen_obj:
print(i, end="")
# do something
print("insert to MongoDB")
sftp.close()
这样就可以按行读取到远端文件数据,并对数据进行自定义的操作。输出示例如下:
2020-02-04;上海贸易有限公司0;2004;1423307;
insert to MongoDB
2020-02-04;上海贸易有限公司1;2004;4889789;
insert to MongoDB
2020-02-04;上海贸易有限公司2;2004;1577178;
insert to MongoDB
2020-02-04;上海贸易有限公司3;2001;7205864;
insert to MongoDB
2020-02-04;上海贸易有限公司4;2004;7202767;
insert to MongoDB
...
也可以利用生成器的特性,使用next()
方法对生成器进行读取操作:
if __name__ == '__main__':
sftp = SftpTest()
gen_obj = sftp.generator_readfile(remotepath="/root/download/test_dat2.DAT")
res = []
for i in range(1000):
line = next(gen_obj)
res.append(line)
# do something
print(res)
sftp.close()
输出如下:
['2020-02-04;上海贸易有限公司0;2004;1423307;\n', '2020-02-04;上海贸易有限公司1;2004;4889789;\n', '2020-02-04;上海贸易有限公司2;2004;1577178;\n', '2020-02-04;上海贸易有限公司3;2001;7205864;\n', '2020-02-04;上海贸易有限公司4;2004;7202767;\n', '2020-02-04;上海贸易有限公司5;2004;9012832;\n', '2020-02-04;上海贸易有限公司6;2001;2525190;\n', '2020-02-04;上海贸易有限公司7;2001;8152497;\n', '2020-02-04;上海贸易有限公司8;2004;2008408;\n', '2020-02-04;上海贸易有限公司9;2004;6432318;\n', '2020-02-04;上海贸易有限公司10;2004;8080234;\n', '2020-02-04;上海贸易有限公司11;2001;1511492;\n', '2020-02-04;上海贸易有限公司12;2001;5644331;\n', '2020-02-04;上海贸易有限公司13;2001;5638310;\n', '2020-02-04;上海贸易有限公司14;2001;2499542;\n', '2020-02-04;上海贸易有限公司15;2004;3782315;\n', '2020-02-04;上海贸易有限公司16;2001;5187995;\n', '2020-02-04;上海贸易有限公司17;2004;2797247;\n', '2020-02-04;上海贸易有限公司18;2004;7077140;\n', '2020-02-04;上海贸易有限公司19;2004;5229405;\n', '2020-02-04;上海贸易有限公司20;2001;3052820;\n', '2020-02-04;上海贸易有限公司21;2001;2084866;\n', '2020-02-04;上海贸易有限公司22;2001;6490240;\n', '2020-02-04;上海贸易有限公司23;2001;5979480;\n', '2020-02-04;上海贸易有限公司24;2004;9088521;\n',
...
]
总结一下
有了远端文件的生成器对象,不必担心文件太大内存不足的问题,也不再需要先下载文件到本地,对于大型文件可以节约很长的时间以及本地储存空间。
还可以做一系列的其余操作,比如对每行数据进行数据预处理后再yield,比如指定读取行数,比如按行将远端文件拆分到本地等等。
这些都可借助next()
方法调用生成器对象进行灵活的操作。