For large objects, S3 provides multipart upload/download interfaces; on top of these, features such as parallel multi-threaded transfer and resumable transfer can be built.
This implementation uses Amazon's boto library (https://pypi.python.org/pypi/boto) and the filechunkio library (https://pypi.python.org/pypi/filechunkio/).
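Both are published on PyPI, so (assuming pip is available) they can be installed with:
    pip install boto filechunkio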
1. Multipart upload
To upload a large file in parts, first split it into segments, then upload each segment through the Multipart interface provided by the store; the store then merges all segments into a single object on the backend.
The example below uses FileChunkIO to read the file one segment at a time (filesize is the size of the file to upload, e.g. from os.stat):
chunksize = 4096 * 1024
chunkcnt = int(math.ceil(filesize * 1.0 / chunksize))
mp = bucket.initiate_multipart_upload("object-1")       # create the Multipart upload
for i in range(0, chunkcnt):
    offset = chunksize * i
    length = min(chunksize, filesize - offset)          # the last segment may be shorter
    fp = FileChunkIO("/path/to/file", 'r', offset=offset, bytes=length)  # file-like view of one segment
    mp.upload_part_from_file(fp, part_num=i + 1)        # upload this segment; part numbers start at 1
mp.complete_upload()
After all parts are uploaded, the multipart upload must be finished with complete_upload(), or aborted with cancel_upload(), to release the resources held by the Multipart upload.
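The snippet above omits error handling: if the process dies between initiate_multipart_upload() and complete_upload(), the parts already uploaded keep occupying storage on the server. A minimal sketch of pairing the two calls, under the same assumptions as above (upload_with_cleanup is my own helper name, not part of the original code):

import math
from filechunkio import FileChunkIO

def upload_with_cleanup(bucket, keyname, filepath, filesize, chunksize=4096 * 1024):
    mp = bucket.initiate_multipart_upload(keyname)
    try:
        chunkcnt = int(math.ceil(filesize * 1.0 / chunksize))
        for i in range(0, chunkcnt):
            offset = chunksize * i
            length = min(chunksize, filesize - offset)
            fp = FileChunkIO(filepath, 'r', offset=offset, bytes=length)
            try:
                mp.upload_part_from_file(fp, part_num=i + 1)
            finally:
                fp.close()
        mp.complete_upload()
    except Exception:
        mp.cancel_upload()      # abort and free the parts already stored server-side
        raise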
2. Ranged download
To download in segments, compute each segment's starting and ending byte offsets within the object, then issue an HTTP GET request carrying a Range header to fetch that segment. Note that the end offset in a Range header is inclusive.
Example (here filename serves as both the object key and the local file name):
chunksize = 4096 * 1024
chunkcnt = int(math.ceil(filesize * 1.0 / chunksize))
fp = open(filename, "wb")               # local target file
for i in range(0, chunkcnt):
    offset = chunksize * i
    length = min(chunksize, filesize - offset)
    resp = conn.make_request("GET", bucket.name, filename,
                             headers={"Range": "bytes=%d-%d" % (offset, offset + length - 1)})
    data = resp.read(length)
    if data == "":
        break
    fp.write(data)
fp.close()
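Because every segment is fetched by an independent Range request, resuming an interrupted download only requires knowing how many bytes are already on disk. A minimal sketch under the same assumptions (conn and bucket as above; resume_download is my own helper name, and it assumes the object does not change between attempts):

import os

def resume_download(conn, bucket, keyname, localpath, chunksize=4096 * 1024):
    key = bucket.get_key(keyname)
    done = os.path.getsize(localpath) if os.path.exists(localpath) else 0
    fp = open(localpath, "ab")          # append mode keeps the bytes we already have
    while done < key.size:
        length = min(chunksize, key.size - done)
        resp = conn.make_request("GET", bucket.name, keyname,
                                 headers={"Range": "bytes=%d-%d" % (done, done + length - 1)})
        data = resp.read(length)
        if data == "":
            break
        fp.write(data)
        done += len(data)
    fp.close()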
3. A complete multithreaded implementation
import math
import os
from os import path
import threading
import time
import Queue

import boto
import boto.s3.connection
from filechunkio import FileChunkIO
class Chunk:
    def __init__(self, n, o, l):
        self.num = n        # part number, 1-based
        self.offset = o     # byte offset of the segment within the file
        self.len = l        # segment length in bytes

chunksize = 8 << 20         # 8 MB per segment
def init_queue(filesize):
    chunkcnt = int(math.ceil(filesize * 1.0 / chunksize))
    q = Queue.Queue(maxsize=chunkcnt)
    for i in range(0, chunkcnt):                # enqueue one Chunk per segment
        offset = chunksize * i
        length = min(chunksize, filesize - offset)
        q.put(Chunk(i + 1, offset, length))
    return q
def upload_chunk(filepath, mp, q, id):
    while True:
        try:
            chunk = q.get_nowait()              # empty() followed by get() would race between threads
        except Queue.Empty:
            break
        fp = FileChunkIO(filepath, 'r', offset=chunk.offset, bytes=chunk.len)
        mp.upload_part_from_file(fp, part_num=chunk.num)
        fp.close()
        q.task_done()
def upload_file_multipart(filepath, keyname, bucket, threadcnt=8):
    filesize = os.stat(filepath).st_size
    mp = bucket.initiate_multipart_upload(keyname)
    q = init_queue(filesize)
    for i in range(0, threadcnt):
        t = threading.Thread(target=upload_chunk, args=(filepath, mp, q, i))
        t.setDaemon(True)                       # don't block interpreter exit
        t.start()
    q.join()                                    # wait until every segment is uploaded
    mp.complete_upload()
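One caveat: if upload_part_from_file() raises inside a worker, task_done() is never called and q.join() blocks forever; even worse, complete_upload() would seal an object with missing parts. A minimal sketch of a safer worker under the assumptions above (upload_failed and upload_chunk_safe are my own names, not part of the original code):

upload_failed = threading.Event()

def upload_chunk_safe(filepath, mp, q, id):
    while True:
        try:
            chunk = q.get_nowait()
        except Queue.Empty:
            break
        try:
            if not upload_failed.is_set():      # stop doing work after the first error
                fp = FileChunkIO(filepath, 'r', offset=chunk.offset, bytes=chunk.len)
                try:
                    mp.upload_part_from_file(fp, part_num=chunk.num)
                finally:
                    fp.close()
        except Exception:
            upload_failed.set()
        finally:
            q.task_done()                       # always signal, so q.join() returns

After q.join(), the caller would check upload_failed and call mp.cancel_upload() instead of mp.complete_upload() when it is set.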
def download_chunk(filepath, bucket, key, q, id):
    while True:
        try:
            chunk = q.get_nowait()
        except Queue.Empty:
            break
        resp = bucket.connection.make_request(
            "GET", bucket.name, key.name,
            headers={"Range": "bytes=%d-%d" % (chunk.offset, chunk.offset + chunk.len - 1)})
        data = resp.read(chunk.len)
        fp = FileChunkIO(filepath, 'r+', offset=chunk.offset, bytes=chunk.len)
        fp.write(data)                          # write the segment at its own offset
        fp.close()
        q.task_done()
def download_file_multipart(key, bucket, filepath, threadcnt=8):
    if type(key) == str:
        key = bucket.get_key(key)
    filesize = key.size
    if os.path.exists(filepath):
        os.remove(filepath)
    open(filepath, "w").close()                 # create an empty target file (os.mknod is not portable)
    q = init_queue(filesize)
    for i in range(0, threadcnt):
        t = threading.Thread(target=download_chunk, args=(filepath, bucket, key, q, i))
        t.setDaemon(True)
        t.start()
    q.join()
access_key = "test"
secret_key = "123456"
host = "*****"
filepath = "/search/2G.file"
keyname = "2G.file"
threadcnt = 8

conn = boto.connect_s3(
    aws_access_key_id=access_key,
    aws_secret_access_key=secret_key,
    host=host,
    is_secure=False,
    calling_format=boto.s3.connection.OrdinaryCallingFormat(),
)
bucket = conn.get_bucket("test")

time1 = time.time()
upload_file_multipart(filepath, keyname, bucket, threadcnt)
time2 = time.time()
print "upload %s with %d threads took %d seconds" % (keyname, threadcnt, time2 - time1)

key = bucket.get_key(keyname)
download_filepath = path.join(".", keyname)
time1 = time.time()
download_file_multipart(key, bucket, download_filepath, threadcnt)
time2 = time.time()
print "download %s with %d threads took %d seconds" % (keyname, threadcnt, time2 - time1)