Menu
1.创建FTP服务器
2.递归删除FTP服务器文件
3.递归下载FTP服务器文件
4.递归上传FTP服务器文件
1.创建FTP服务器
from pyftpdlib.authorizers import DummyAuthorizer
from pyftpdlib.handlers import FTPHandler
from pyftpdlib.servers import FTPServer
"""
Read permissions:
- "e" = change directory (CWD command)
- "l" = list files (LIST, NLST, STAT, MLSD, MLST, SIZE, MDTM commands)
- "r" = retrieve file from the server (RETR command)
Write permissions:
- "a" = append data to an existing file (APPE command)
- "d" = delete file or directory (DELE, RMD commands)
- "f" = rename file or directory (RNFR, RNTO commands)
- "m" = create directory (MKD command)
- "w" = store a file to the server (STOR, STOU commands)
- "M" = change file mode (SITE CHMOD command)
- "T" = update file last modified time (MFMT command)
"""
#新建一个用户组
authorizer = DummyAuthorizer()
#将用户名,密码,指定目录,权限 添加到里面
authorizer.add_user("lion", "asdf666", r"D:/Python", perm="elrd") #perm(权限)d=delete权限
print(help(authorizer.add_user))
#↓这个是添加匿名用户,任何人都可以访问,如果去掉的话,需要输入用户名和密码,可以自己尝试
#authorizer.add_anonymous(r"D:/Python")
handler = FTPHandler
handler.authorizer = authorizer
#开启服务器
server = FTPServer(("127.0.0.1", 21), handler)
server.serve_forever()
2.递归删除FTP服务器文件 (需要先开服务器,连到服务器ip和port)
import ftplib, getpass, sys, os
class FtpTools():
def config(self, site="127.0.0.1", port=21, username="lion", remortedir="remove"):
self.site=site
self.port = port
self.username = username
self.remortedir = remortedir
self.localdir = self.getlocaldir()
self.askyesnodel = self.askyesnodel()
self.remotepass = self.getpassword()
def getpassword(self):
try:
return getpass.getpass("input password")
except Exception as e:
print(e)
def getlocaldir(self):
return sys.argv[1] if len(sys.argv) > 1 else "."
def askyesnodel(self):
return input("Did you won't delete?") in ["y", "Y"]
def run(self, cleanfunc=lambda:None, transferact=lambda:None):
self.connectftp()
cleanfunc()
transferact()
self.ftpobj.quit()
def connectftp(self):
ftpobj = ftplib.FTP()
ftpobj.connect(self.site, self.port)
ftpobj.encoding = "utf-8"
ftpobj.login(self.username, self.remotepass)
ftpobj.cwd(self.remortedir)
print(help(ftpobj.dir))
self.ftpobj = ftpobj
class MyCleanAll(FtpTools):
def __init__(self):
self.fcount = self.dcount = 0
def cleanall(self):
longfiledir = []
shortfiledir = self.ftpobj.nlst()
self.ftpobj.dir(longfiledir.append)
for n in range(len(shortfiledir)):
if shortfiledir[n] in [".", ".."]:
continue
elif longfiledir[n][0] != "d":
print("delete file: %s" % shortfiledir[n])
self.ftpobj.delete(shortfiledir[n])
self.fcount += 1
else:
self.ftpobj.cwd(shortfiledir[n])
self.cleanall()
self.ftpobj.cwd("..")
self.ftpobj.rmd(shortfiledir[n])
self.dcount += 1
print("delete all over...")
if __name__ == "__main__":
ftp = MyCleanAll()
ftp.config()
ftp.run(ftp.cleanall)
print("del file %s, del dir %s" % (ftp.fcount, ftp.dcount))
3.递归下载ftp服务器文件 (需要先开FTPServer)
import ftplib, getpass, sys, os
class FtpTools():
"""
站点:默认站点
RemoteDir:默认给出
用户名:默认给出
LocalDir:getlocaldir方法Return,默认return“.”
password:getpassword方法里调用getpass,Return用户输入密码
"""
def config(self, site="127.0.0.1", port=21, username="lion", remortedir="received"):
self.site=site
self.port = port
self.username = username
self.remortedir = remortedir
self.localdir = self.getlocaldir()
self.askyesnodel = self.askyesnodel()
self.remotepass = self.getpassword()
def getpassword(self):
try:
return getpass.getpass("input password")
except Exception as e:
print(e)
def getlocaldir(self):
localdir = sys.argv[1] if len(sys.argv) > 1 else "Download"
self.mymkd(localdir)
return localdir
def mymkd(self, fname):
if not os.path.exists(fname):
os.mkdir(fname)
os.chdir(fname)
self.nowcwd = os.getcwd()
def askyesnodel(self):
return False
def run(self, cleanfunc=lambda:None, uploadfunc=lambda:None, downloadfunc=lambda:None):
self.connectftp()
cleanfunc()
uploadfunc()
downloadfunc()
self.ftpobj.quit()
def connectftp(self):
ftpobj = ftplib.FTP()
ftpobj.connect(self.site, self.port)
ftpobj.encoding = "utf-8"
ftpobj.login(self.username, self.remotepass)
ftpobj.cwd(self.remortedir)
self.ftpobj = ftpobj
def downloadone(self, filepath, filename):
localfileobj = open(filepath, "wb")
self.ftpobj.retrbinary("RETR " + filename, localfileobj.write)
self.fcount += 1
localfileobj.close()
class MyDownloadAll(FtpTools):
def __init__(self):
self.fcount = self.dcount = 0
def downloadall(self):
"""
retrbinary: retrieve(取回)
"""
longfilelist = []
self.ftpobj.dir(longfilelist.append)
shortfile = self.ftpobj.nlst()
for n in range(len(shortfile)):
filename = shortfile[n]
if filename in [".", ".."]:
continue
elif longfilelist[n][0] != "d":
filepath = os.path.join(self.nowcwd, filename)
self.downloadone(filepath, filename)
else:
self.ftpobj.cwd(filename) # server cwd
self.subdirpath = os.path.join(self.nowcwd, filename) # subdir path 1
self.mymkd(self.subdirpath)
os.chdir(self.subdirpath) # cwd subdir
self.downloadall()
self.ftpobj.cwd("..")
os.chdir("..")
self.dcount += 1
self.nowcwd = os.getcwd()
if __name__ == "__main__":
ftp = MyDownloadAll()
ftp.config()
ftp.run(downloadfunc=ftp.downloadall)
print("Download dir: %s, file: %s" % (ftp.dcount, ftp.fcount))
4.递归上传FTP服务器文件 (需要先开FTPServer)
import ftplib, getpass, sys, os
class FtpTools():
def config(self, site="127.0.0.1", port=21, username="lion", remotedir="received"):
self.site=site
self.port = port
self.username = username
self.remotedir = remotedir
self.localdir = self.getlocaldir()
self.askyesnodel = self.askyesnodel()
self.remotepass = self.getpassword()
def getpassword(self):
try:
return getpass.getpass("input password")
except Exception as e:
print(e)
def getlocaldir(self):
return sys.argv[1] if len(sys.argv) > 1 else "FTPUpload"
def askyesnodel(self):
return input("Did you won't delete?") in ["y", "Y"]
def run(self, downloadfunc=lambda:None, transferact=lambda:None):
self.connectftp()
downloadfunc()
transferact()
self.ftpobj.quit()
def connectftp(self):
ftpobj = ftplib.FTP()
ftpobj.connect(self.site, self.port)
ftpobj.encoding = "utf-8"
ftpobj.login(self.username, self.remotepass)
if self.remotedir not in ftpobj.nlst():
ftpobj.mkd(self.remotedir)
ftpobj.cwd(self.remotedir)
self.ftpobj = ftpobj
def uploadOne(self, localname, localpath, remotename):
localfile = open(localpath, 'rb')
self.ftpobj.storbinary('STOR ' + remotename, localfile)
localfile.close()
class MyUpLoadall(FtpTools):
def __init__(self):
self.fcount = self.dcount = 0
def askyesnodel(self):
return False # don't even ask
def uploadDir(self, localdir):
"""
for each directory in an entire tree
upload simple files, recur into subdirectories
"""
localfiles = os.listdir(localdir)
for localname in localfiles:
localpath = os.path.join(localdir, localname)
print('uploading', localpath, 'to', localname, end=' ')
if not os.path.isdir(localpath):
self.uploadOne(localname, localpath, localname)
self.fcount += 1
else:
try:
self.ftpobj.mkd(localname)
print('directory created')
except:
print('directory not created')
self.ftpobj.cwd(localname) # change remote dir
self.uploadDir(localpath) # upload local subdir
self.ftpobj.cwd('..') # change back up
self.dcount += 1
print('directory exited')
if __name__ == "__main__":
ftp = MyUpLoadall()
ftp.config()
ftp.run(transferact = lambda: ftp.uploadDir(ftp.localdir))
print("upload file %s, upload dir %s" % (ftp.fcount, ftp.dcount))