S3上传下载

S3下载

# coding=utf-8
import os
import logging
import re

import boto3

# NOTE(review): a `global` statement at module scope has no effect, and TEST is
# never assigned or read anywhere in the visible code -- presumably a leftover;
# confirm nothing else relies on it before removing.
global TEST

# Static settings for the S3 helper below and for the manual demo under the
# `__main__` guard.
# NOTE(review): the credentials are hard-coded (masked here); consider loading
# them from the environment or a secrets store instead of source control.
S3_FILE_CONF = {
    # Connection settings read by S3Helper.__init__. The "-test" variants are
    # selected when the helper is constructed with test=True.
    "ACCESS_KEY": "******",
    "SECRET_KEY": "*******",
    "END_POINT": "http://storage.***.com",
    # NOTE(review): the masked host below contains "..", which looks like a
    # masking typo -- verify the real test endpoint.
    "END_POINT-test": "http://test-storage.***..com",
    "BUCKET_NAME-test": "utopia-ai-card-asr",
    "BUCKET_NAME": "ke-cangjie-ai-card",
    "REGION_NAME": "cn-north-1",

    # Arguments for the demo download call: bucket, object key, local path.
    "DOWN_BUCKET_NAME": "utopia-ai-card-asr",
    "DOWN_FILE_NAME": "asr/original/text/C02E212009095_V2.1.15_20220624095937_AD000801C_MIX.json",
    "DOWN_LOCAL_FILE": "download",

    # Arguments for the demo listing call: bucket and key prefix.
    "GET_BUCKET_NAME": "utopia-ai-card-asr",
    "GET_FILE_NAME": "asr/original/text/",

    # Arguments for the demo upload call: local file, bucket, target prefix.
    "UPLOAD_FILE_NAME": "/Users/meichaoyang/PycharmProjects/server_log400/utils/s3.py",
    "UPLOAD_BUCKET_NAME": "utopia-ai-card-asr",
    "UPLOAD_S3_DIR": "asr/original/text/",
}


class S3Helper(object):
    """Thin convenience wrapper around a boto3 S3 client.

    Connection settings (credentials, endpoint, region, default bucket) are
    read from the module-level ``S3_FILE_CONF`` dict. Requires the ``boto3``
    package to be installed.
    """

    # Extracts "<name><.ext>" from the part of an object key that follows a
    # "/". Keys without a "/" or without a "." after it do not match and are
    # therefore left out of listings. Compiled once instead of per key.
    _FILE_NAME_RE = re.compile(r'.*/(.*)(\..*)')

    def __init__(self, test=False):
        """Create the underlying boto3 client.

        :param test: when truthy, use the "-test" endpoint and bucket entries
            of ``S3_FILE_CONF`` instead of the regular ones.
        """
        suffix = "-test" if test else ""
        self.bucket_name = S3_FILE_CONF.get("BUCKET_NAME" + suffix)
        self.access_key = S3_FILE_CONF.get("ACCESS_KEY")
        self.secret_key = S3_FILE_CONF.get("SECRET_KEY")
        self.endpoint = S3_FILE_CONF.get("END_POINT" + suffix)
        self.region_name = S3_FILE_CONF.get("REGION_NAME")

        self.client = boto3.client(
            service_name='s3',
            endpoint_url=self.endpoint,
            region_name=self.region_name,
            aws_access_key_id=self.access_key,
            aws_secret_access_key=self.secret_key,
        )

    def download_file_s3(self, file_name, local_file, bucket_name=None):
        """Download one object from S3 to a local path.

        The directory that is to contain ``local_file`` must already exist;
        this method does not create it.

        :param file_name: key of the object to download.
        :param local_file: local path the object is written to.
        :param bucket_name: bucket to read from; defaults to the helper's
            configured bucket.
        :return: True when the download succeeded, False when it failed
            (the error is logged, not raised).
        """
        if bucket_name is None:
            bucket_name = self.bucket_name
        try:
            # download_file() returns None, so there is no result to log.
            self.client.download_file(bucket_name, file_name, local_file)
        except Exception as e:
            # Deliberately broad: the contract is "return False on any failure".
            logging.error(
                'download from s3 Error endpoint:%s, bucket_name:%s,'
                ' file_name:%s,local_file:%s, Exception:%s',
                self.endpoint, bucket_name, file_name, local_file, e,
            )
            return False
        logging.info('downloaded %s from bucket %s to %s', file_name, bucket_name, local_file)
        return True

    def get_list_s3(self, file_name, bucket_name=None):
        """List the file names found directly under a key prefix.

        Uses ``Delimiter='/'``, so objects in nested "sub-directories" are not
        returned. All result pages are followed, and a prefix with no objects
        yields an empty list.

        :param file_name: key prefix ("directory") to list.
        :param bucket_name: bucket to list; defaults to the helper's
            configured bucket.
        :return: list of base names (name plus extension) of the matching
            objects; keys that carry no extension are skipped.
        """
        if bucket_name is None:
            bucket_name = self.bucket_name

        request = {
            'Bucket': bucket_name,
            'Delimiter': '/',
            'Prefix': file_name,
        }
        file_list = []
        while True:
            response = self.client.list_objects_v2(**request)
            # 'Contents' is absent from the response when nothing matches.
            for entry in response.get('Contents', []):
                match = self._FILE_NAME_RE.search(str(entry['Key']))
                if match:
                    file_list.append(match.group(1) + match.group(2))

            # A single call returns a bounded page; keep going while the
            # service reports more results.
            next_token = response.get('NextContinuationToken')
            if not response.get('IsTruncated') or not next_token:
                break
            request['ContinuationToken'] = next_token
        return file_list

    def upload_file_s3(self, file_name, s3_dir, bucket_name=None):
        """Upload a local file to S3.

        :param file_name: path of the local file to upload.
        :param s3_dir: target key. When it ends with "/" (or is empty) it is
            treated as a folder and the local file's base name is appended;
            otherwise it is used as the full object key.
        :param bucket_name: bucket to write to; defaults to the helper's
            configured bucket.
        :return: the object key the file was uploaded to.
        :raises Exception: any error raised by the client is logged and
            re-raised unchanged.
        """
        if bucket_name is None:
            bucket_name = self.bucket_name

        if os.path.basename(s3_dir):
            s3_file = s3_dir
        else:
            s3_file = s3_dir + os.path.basename(file_name)
        logging.debug(f"s3_file:{s3_file}")

        try:
            # NOTE(review): every upload is made world-readable via the
            # 'public-read' ACL -- confirm this is intended for all callers.
            self.client.upload_file(file_name, bucket_name, s3_file, ExtraArgs={'ACL': 'public-read'})
        except Exception as e:
            logging.error('出错了:%s', e)
            # Bare raise keeps the original traceback intact.
            raise
        return s3_file


if __name__ == '__main__':
    # Manual smoke run against the configured endpoint: download one object,
    # list a prefix, upload a local file, then list a second prefix. Each
    # result is written to the log at INFO level.
    logging.basicConfig(
        format="%(asctime)s - %(filename)s[line:%(lineno)d]%(funcName)s - %(levelname)s: %(message)s",
        level=logging.INFO,
    )
    conf = S3_FILE_CONF
    helper = S3Helper()

    # 1. Download a single object to the local path.
    download_ok = helper.download_file_s3(
        conf["DOWN_FILE_NAME"],
        conf["DOWN_LOCAL_FILE"],
        conf["DOWN_BUCKET_NAME"],
    )
    logging.info(download_ok)

    # 2. List the configured prefix.
    listing_bucket = conf["GET_BUCKET_NAME"]
    logging.info(helper.get_list_s3(conf["GET_FILE_NAME"], listing_bucket))

    # 3. Upload a local file into the configured S3 folder.
    uploaded_key = helper.upload_file_s3(
        conf["UPLOAD_FILE_NAME"],
        conf["UPLOAD_S3_DIR"],
        conf["UPLOAD_BUCKET_NAME"],
    )
    logging.info(uploaded_key)

    # 4. List a second, hard-coded prefix in the same bucket.
    logging.info(helper.get_list_s3("asr/correct/model/text/", listing_bucket))

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容