#!coding=utf-8
import os
import logging
from time import time
from boto3.session import Session
from multiprocessing import Pool
# SECURITY: never commit real credentials to source control.  The literal
# values below are placeholders only; real credentials are taken from the
# environment when present, falling back to the hard-coded defaults for
# backward compatibility.
AWS_ACCESS_KEY_ID = os.environ.get('AWS_ACCESS_KEY_ID', '9E2B670F8631B13D')
AWS_SECRET_ACCESS_KEY = os.environ.get(
    'AWS_SECRET_ACCESS_KEY', 'f8f6988f2fbf54b42b9ed3c74098c1194c29a651a')
ENDPOINT_URL = 'http://xxxxx.com'
LOGGING_FILE = '/Users/xxxx/Desktop/example.log'
# Write INFO-level records to LOGGING_FILE; the parent directory must
# already exist and be writable, or basicConfig raises at import time.
logging.basicConfig(filename=LOGGING_FILE, level=logging.INFO)
class S3UploadFile():
    """Upload a local directory tree to an S3 bucket with a process pool.

    Parameters:
        upload_file_dir: local directory to upload (walked recursively).
        bucket_name: target S3 bucket name.
        bucket_dir: prefix of the local path stripped when building the
            object key.  Example: upload_file_dir='/root/images/aaa' with
            bucket_dir='root/' (the trailing '/' is required) stores
            objects under 'images/aaa/...'.
        processes_number: number of worker processes (default 10).  All
            files are split round-robin into this many groups, e.g. 115
            files with 10 processes become 10 groups.
    """
    def __init__(self, upload_file_dir, bucket_name, bucket_dir, processes_number=10):
        self.aws_access_key_id = AWS_ACCESS_KEY_ID
        self.aws_secret_access_key = AWS_SECRET_ACCESS_KEY
        self.endpoint_url = ENDPOINT_URL
        self.processes_number = processes_number
        self.bucket_name = bucket_name
        self.upload_file_dir = upload_file_dir
        self.bucket_dir = bucket_dir
        # Flat list of every file path found under upload_file_dir.
        self.all_obj_path = []
        self.listdir(self.upload_file_dir)
    def client_connection(self):
        """Return a fresh S3 client.

        Called inside each worker process -- boto3 clients are created
        per-process rather than shared across the pool.
        """
        session = Session(aws_access_key_id=self.aws_access_key_id,
                          aws_secret_access_key=self.aws_secret_access_key)
        return session.client('s3', endpoint_url=self.endpoint_url)
    def listdir(self, upload_file_dir):
        """Recursively collect all file paths into self.all_obj_path."""
        for file in os.listdir(upload_file_dir):
            file_path = os.path.join(upload_file_dir, file)
            if os.path.isdir(file_path):
                self.listdir(file_path)
            else:
                self.all_obj_path.append(file_path)
    def upload_file(self, file_paths):
        """Worker entry point: upload one group of files, logging each result."""
        s3_client = self.client_connection()
        pid_start_time = time()
        pid = os.getpid()
        logging.info('PID: %s; All Files: %s', pid, file_paths)
        for file in file_paths:
            start_time = time()
            # Object key = local path with everything up to (and
            # including) bucket_dir stripped off.
            file_name = file.split(self.bucket_dir)[1]
            try:
                s3_client.upload_file(file, self.bucket_name, file_name,
                                      ExtraArgs={'ACL': 'public-read'})
                logging.info('PID: %s File upload success: %s: time: %.2f',
                             pid, file_name, time() - start_time)
            except Exception as error:
                # BUG FIX: the original format string had one %s but two
                # arguments, which raised TypeError instead of logging.
                logging.error('PID: %s File upload failed: %s; error: %s',
                              pid, file, error)
        logging.info('Upload End PID: %s; Total Time: %.2f',
                     pid, time() - pid_start_time)
    def split_tasks(self):
        """Split self.all_obj_path into processes_number round-robin groups."""
        obj_total = len(self.all_obj_path)
        if obj_total < self.processes_number:
            # Never start more worker processes than there are files.
            self.processes_number = obj_total
        divide_list = [[] for _ in range(self.processes_number)]
        for i, path in enumerate(self.all_obj_path):
            # BUG FIX: the original hard-coded `i % 10`, which raises
            # IndexError whenever processes_number != 10 (e.g. after the
            # shrink above when there are fewer than 10 files).
            divide_list[i % self.processes_number].append(path)
        return divide_list
    def run(self):
        """Distribute the file groups over the pool and wait for completion."""
        if not self.all_obj_path:
            # Robustness: Pool(0) raises ValueError on an empty directory.
            print("Finish uploading !!")
            return
        task_list = self.split_tasks()
        po = Pool(self.processes_number)
        po.map(self.upload_file, task_list)
        po.close()
        po.join()
        print("Finish uploading !!")
if __name__ == '__main__':
    # Example usage -- replace the placeholder paths/names before running.
    source_dir = '/Users/xxxx/Desktop/xxxx'
    target_bucket = 'xxxxx'
    strip_prefix = 'Desktop/'
    uploader = S3UploadFile(source_dir, target_bucket, strip_prefix)
    uploader.run()
# ----------------------------------------------------------------------
# Article footer scraped from the original web page (non-code residue,
# commented out so this file stays valid Python):
#
# Title: 使用 boto3 多进程批量上传文件
#        (Batch-upload files with boto3 using multiple processes)
# Copyright belongs to the original author; contact the author before
# reprinting or reusing this content.
#
# The remaining scraped text -- unrelated fiction excerpts and a
# "recommended reading" list (PHPExcel install notes, another boto3 S3
# upload write-up, an el-upload multi-file note, etc.) -- was page
# boilerplate, not part of the article, and carries no code content.
# ----------------------------------------------------------------------