拉取镜像

拉取镜像脚本

#!/bin/bash
# =============================================================================
# 脚本名称: docker_pull_images.sh
# 脚本功能: 批量拉取容器镜像(从指定文件读取镜像列表)
# 容器工具: 支持 nerdctl / docker,自动检测,或通过 -c 指定
# 参数解析: 使用 getopts 处理命令行参数
# =============================================================================

set -u

# ---------- 颜色定义 ----------
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m'

# ---------- 默认参数 ----------
IMAGE_FILE=""
RETRY_COUNT=0
TIMEOUT=0
VERBOSE=false
CLI_TOOL=""           # 容器 CLI 工具,空值表示自动检测
NAMESPACE=""          # nerdctl 命名空间 (如 k8s.io),仅 nerdctl 生效

# ---------- 日志函数 ----------
log_info()    { echo -e "${BLUE}[INFO]${NC}    $(date '+%Y-%m-%d %H:%M:%S') $*"; }
log_success() { echo -e "${GREEN}[SUCCESS]${NC} $(date '+%Y-%m-%d %H:%M:%S') $*"; }
log_warn()    { echo -e "${YELLOW}[WARN]${NC}    $(date '+%Y-%m-%d %H:%M:%S') $*"; }
log_error()   { echo -e "${RED}[ERROR]${NC}   $(date '+%Y-%m-%d %H:%M:%S') $*"; }
log_debug()   { [[ "$VERBOSE" == "true" ]] && echo -e "${BLUE}[DEBUG]${NC}   $(date '+%Y-%m-%d %H:%M:%S') $*"; }

# ---------- 显示用法 ----------
usage() {
    cat <<EOF
用法: $(basename "$0") -f <镜像列表文件> [-c <cli>] [-n <namespace>] [-r <重试次数>] [-t <超时秒数>] [-v] [-h]

选项:
    -f <file>        镜像列表文件路径 (必填)
    -c <cli>         指定容器 CLI 工具: nerdctl 或 docker (默认: 自动检测,优先 nerdctl)
    -n <namespace>   nerdctl 命名空间 (如 k8s.io),仅当 cli 为 nerdctl 时生效
    -r <num>         单个镜像拉取失败后的重试次数 (默认: 0)
    -t <sec>         单个镜像拉取超时时间,单位秒 (默认: 0,不限制)
    -v               开启详细日志 (verbose)
    -h               显示此帮助信息

文件格式:
    每行一个镜像名称,支持 # 开头的注释行和空行

示例:
    $(basename "$0") -f images.txt
    $(basename "$0") -f images.txt -c nerdctl -n k8s.io
    $(basename "$0") -f images.txt -c nerdctl -r 3 -t 600 -v

EOF
}

# ---------- 使用 getopts 解析参数 ----------
while getopts ":f:c:n:r:t:vh" opt; do
    case "$opt" in
        f)
            IMAGE_FILE="$OPTARG"
            ;;
        c)
            if [[ "$OPTARG" != "nerdctl" && "$OPTARG" != "docker" ]]; then
                log_error "-c 参数只支持: nerdctl 或 docker,当前值: $OPTARG"
                exit 1
            fi
            CLI_TOOL="$OPTARG"
            ;;
        n)
            NAMESPACE="$OPTARG"
            ;;
        r)
            if ! [[ "$OPTARG" =~ ^[0-9]+$ ]]; then
                log_error "-r 参数必须是非负整数: $OPTARG"
                exit 1
            fi
            RETRY_COUNT="$OPTARG"
            ;;
        t)
            if ! [[ "$OPTARG" =~ ^[0-9]+$ ]]; then
                log_error "-t 参数必须是非负整数: $OPTARG"
                exit 1
            fi
            TIMEOUT="$OPTARG"
            ;;
        v)
            VERBOSE=true
            ;;
        h)
            usage
            exit 0
            ;;
        \?)
            log_error "未知选项: -$OPTARG"
            usage
            exit 1
            ;;
        :)
            log_error "选项 -$OPTARG 需要一个参数"
            usage
            exit 1
            ;;
    esac
done
shift $((OPTIND - 1))

# ---------- 参数校验 ----------
if [[ -z "$IMAGE_FILE" ]]; then
    log_error "缺少必填参数: -f <镜像列表文件>"
    usage
    exit 1
fi

if [[ ! -f "$IMAGE_FILE" ]]; then
    log_error "镜像列表文件不存在: $IMAGE_FILE"
    exit 1
fi

if [[ ! -r "$IMAGE_FILE" ]]; then
    log_error "镜像列表文件不可读: $IMAGE_FILE"
    exit 1
fi

if [[ -n "$CLI_TOOL" ]]; then
    if ! command -v "$CLI_TOOL" >/dev/null 2>&1; then
        log_error "未找到指定的容器 CLI 工具: $CLI_TOOL"
        exit 1
    fi
else
    # 自动检测:优先 nerdctl,其次 docker
    if command -v nerdctl >/dev/null 2>&1; then
        CLI_TOOL="nerdctl"
    elif command -v docker >/dev/null 2>&1; then
        CLI_TOOL="docker"
    else
        log_error "未找到 nerdctl 或 docker,请先安装其中之一,或使用 -c 指定"
        exit 1
    fi
fi

# namespace 仅对 nerdctl 生效
if [[ -n "$NAMESPACE" && "$CLI_TOOL" != "nerdctl" ]]; then
    log_warn "-n 命名空间参数仅在 nerdctl 下生效,当前 CLI 为 $CLI_TOOL,已忽略"
    NAMESPACE=""
fi

log_info "使用容器 CLI 工具: $CLI_TOOL${NAMESPACE:+ (namespace=$NAMESPACE)}"

# ---------- 拉取单个镜像(含重试/超时) ----------
pull_one_image() {
    local image="$1"
    local attempt=0
    local max_attempts=$((RETRY_COUNT + 1))
    local -a cli_cmd
    local -a cmd

    # 构建基础 CLI 命令(含 nerdctl 的 namespace)
    cli_cmd=("$CLI_TOOL")
    if [[ "$CLI_TOOL" == "nerdctl" && -n "$NAMESPACE" ]]; then
        cli_cmd+=("--namespace" "$NAMESPACE")
    fi

    while (( attempt < max_attempts )); do
        attempt=$((attempt + 1))
        log_debug "第 $attempt/$max_attempts 次尝试拉取: $image"

        if (( TIMEOUT > 0 )) && command -v timeout >/dev/null 2>&1; then
            cmd=(timeout "$TIMEOUT" "${cli_cmd[@]}" pull "$image")
        else
            cmd=("${cli_cmd[@]}" pull "$image")
        fi

        if "${cmd[@]}"; then
            return 0
        fi

        log_warn "第 $attempt 次拉取失败: $image"
        (( attempt < max_attempts )) && sleep 2
    done

    return 1
}

# ---------- 主流程 ----------
log_info "开始读取镜像列表文件: $IMAGE_FILE"
log_debug "参数: cli=$CLI_TOOL, namespace=${NAMESPACE:-<none>}, retry=$RETRY_COUNT, timeout=$TIMEOUT, verbose=$VERBOSE"

total=0
success=0
failed=0
failed_list=()
start_time=$(date +%s)

while IFS= read -r line || [[ -n "$line" ]]; do
    image=$(echo "$line" | sed 's/^[[:space:]]*//;s/[[:space:]]*$//')
    [[ -z "$image" ]] && continue
    [[ "$image" =~ ^# ]] && continue

    total=$((total + 1))
    log_info "[$total] 正在拉取镜像: $image"

    if pull_one_image "$image"; then
        log_success "[$total] 拉取成功: $image"
        success=$((success + 1))
    else
        log_error "[$total] 拉取失败: $image"
        failed=$((failed + 1))
        failed_list+=("$image")
    fi
    echo "----------------------------------------"
done < "$IMAGE_FILE"

elapsed=$(( $(date +%s) - start_time ))

# ---------- 结果汇总 ----------
echo
log_info "========== 拉取结果汇总 =========="
log_info "镜像总数: $total"
log_success "成功数量: $success"
if (( failed > 0 )); then
    log_error "失败数量: $failed"
    log_warn "失败镜像列表:"
    for img in "${failed_list[@]}"; do
        echo "  - $img"
    done
else
    log_info "失败数量: 0"
fi
log_info "总耗时: ${elapsed} 秒"
log_info "=================================="

(( failed == 0 )) && exit 0 || exit 1

拉取镜像脚本1

#!/bin/bash

# 镜像列表文件路径
IMAGE_FILE="${1:-images.txt}"

# 自动检测容器运行时
detect_runtime() {
    if command -v docker &> /dev/null; then
        echo "docker"
    elif command -v nerdctl &> /dev/null; then
        echo "nerdctl"
    elif command -v crictl &> /dev/null; then
        echo "crictl"
    elif command -v ctr &> /dev/null; then
        echo "ctr"
    else
        echo "none"
    fi
}

# 拉取镜像函数
pull_image() {
    local runtime=$1
    local image=$2

    case $runtime in
        docker)
            docker pull "$image"
            ;;
        nerdctl)
            nerdctl pull "$image"
            ;;
        crictl)
            crictl pull "$image"
            ;;
        ctr)
            ctr image pull "$image"
            ;;
        *)
            return 1
            ;;
    esac
}

# 检查文件是否存在
if [ ! -f "$IMAGE_FILE" ]; then
    echo "错误: 镜像列表文件 $IMAGE_FILE 不存在!"
    exit 1
fi

# 检测容器运行时
RUNTIME=$(detect_runtime)
if [ "$RUNTIME" = "none" ]; then
    echo "错误: 未找到任何容器运行时 (docker, nerdctl, crictl, ctr)"
    echo "请安装 Docker 或使用其他容器运行时"
    exit 1
fi

echo "检测到容器运行时: $RUNTIME"
echo "开始拉取镜像..."
echo "================================"

# 统计变量
total=0
success=0
failed=0

# 读取文件中的每一行
while IFS= read -r image || [ -n "$image" ]; do
    # 跳过空行和注释行
    if [[ -z "$image" ]] || [[ "$image" =~ ^[[:space:]]*# ]]; then
        continue
    fi

    # 去除行首行尾的空格
    image=$(echo "$image" | sed -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//')

    total=$((total + 1))
    echo "[$total] 正在拉取镜像: $image"

    # 拉取镜像
    if pull_image "$RUNTIME" "$image"; then
        echo "✓ 成功拉取: $image"
        success=$((success + 1))
    else
        echo "✗ 拉取失败: $image"
        failed=$((failed + 1))
    fi

    echo "--------------------------------"

done < "$IMAGE_FILE"

# 输出统计结果
echo "================================"
echo "拉取完成!"
echo "总计: $total 个镜像"
echo "成功: $success 个"
echo "失败: $failed 个"

if [ $failed -gt 0 ]; then
    exit 1
else
    exit 0
fi

拉取镜像更新版本

#!/bin/bash

# 镜像列表文件路径
IMAGE_FILE="${1:-images.txt}"

# 目标镜像仓库前缀
TARGET_REGISTRY="hub.docker.io"

# 自动检测容器运行时
detect_runtime() {
    if command -v docker &> /dev/null; then
        echo "docker"
    elif command -v nerdctl &> /dev/null; then
        echo "nerdctl"
    elif command -v crictl &> /dev/null; then
        echo "crictl"
    elif command -v ctr &> /dev/null; then
        echo "ctr"
    else
        echo "none"
    fi
}

# 拉取镜像函数
pull_image() {
    local runtime=$1
    local image=$2

    case $runtime in
        docker)
            docker pull "$image"
            ;;
        nerdctl)
            nerdctl pull "$image"
            ;;
        crictl)
            crictl pull "$image"
            ;;
        ctr)
            ctr image pull "$image"
            ;;
        *)
            return 1
            ;;
    esac
}

# 替换镜像域名(只替换仓库域名,保留完整路径)
replace_image_registry() {
    local original_image=$1

    # 提取协议头(如果有)
    local protocol=""
    if [[ "$original_image" =~ ^https?:// ]]; then
        protocol=$(echo "$original_image" | grep -oE '^https?://')
        original_image=$(echo "$original_image" | sed -E 's|^https?://||')
    fi

    # 提取第一个 / 之前的部分(原仓库域名)
    # 剩余部分是路径(命名空间/镜像名:标签)
    local registry=$(echo "$original_image" | cut -d'/' -f1)
    local rest_path=$(echo "$original_image" | cut -d'/' -f2-)

    # 如果 rest_path 为空(即原镜像没有路径),则 rest_path 就是镜像名
    if [ -z "$rest_path" ]; then
        rest_path="$registry"
        registry=""
    fi

    # 组合新的镜像名:目标仓库域名 + 原路径
    echo "${protocol}${TARGET_REGISTRY}/${rest_path}"
}

# 检查文件是否存在
if [ ! -f "$IMAGE_FILE" ]; then
    echo "错误: 镜像列表文件 $IMAGE_FILE 不存在!"
    exit 1
fi

# 检测容器运行时
RUNTIME=$(detect_runtime)
if [ "$RUNTIME" = "none" ]; then
    echo "错误: 未找到任何容器运行时 (docker, nerdctl, crictl, ctr)"
    echo "请安装 Docker 或使用其他容器运行时"
    exit 1
fi

echo "检测到容器运行时: $RUNTIME"
echo "目标镜像仓库: $TARGET_REGISTRY"
echo "开始拉取镜像..."
echo "================================"

# 统计变量
total=0
success=0
failed=0

# 读取文件中的每一行
while IFS= read -r image || [ -n "$image" ]; do
    # 跳过空行和注释行
    if [[ -z "$image" ]] || [[ "$image" =~ ^[[:space:]]*# ]]; then
        continue
    fi

    # 去除行首行尾的空格
    original_image=$(echo "$image" | sed -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//')

    # 替换镜像域名
    new_image=$(replace_image_registry "$original_image")

    total=$((total + 1))
    echo "[$total] 原始镜像: $original_image"
    echo "      替换为: $new_image"

    # 拉取镜像
    if pull_image "$RUNTIME" "$new_image"; then
        echo "✓ 成功拉取: $new_image"
        success=$((success + 1))
    else
        echo "✗ 拉取失败: $new_image"
        failed=$((failed + 1))
    fi

    echo "--------------------------------"

done < "$IMAGE_FILE"

# 输出统计结果
echo "================================"
echo "拉取完成!"
echo "总计: $total 个镜像"
echo "成功: $success 个"
echo "失败: $failed 个"

if [ $failed -gt 0 ]; then
    exit 1
else
    exit 0
fi

python版本

#!/usr/bin/env python3
"""
Docker 镜像迁移脚本
功能:从源仓库拉取镜像并推送到目标仓库
适用于生产环境,包含完整的错误处理、日志记录和资源清理
"""

import logging
import sys
import os
from pathlib import Path
from typing import List, Optional, Tuple
import subprocess
import argparse
from datetime import datetime

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler(f'docker_migration_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log')
    ]
)
logger = logging.getLogger(__name__)


class DockerMigrationError(Exception):
    """自定义 Docker 迁移异常"""
    pass


class DockerImageMigrator:
    """Docker 镜像迁移工具类"""
    
    def __init__(self, image_list_file: str, source_domain: str, target_domain: str):
        """
        初始化镜像迁移器
        
        Args:
            image_list_file: 镜像列表文件路径
            source_domain: 源仓库域名
            target_domain: 目标仓库域名
        """
        self.image_list_file = Path(image_list_file)
        self.source_domain = source_domain
        self.target_domain = target_domain
        self.unified_file = None
        self.pulled_images = []
        self.tagged_images = []
        
    def validate_input_file(self) -> None:
        """验证输入文件是否存在且可读"""
        if not self.image_list_file.exists():
            raise DockerMigrationError(f"错误: 文件 {self.image_list_file} 不存在")
        if not self.image_list_file.is_file():
            raise DockerMigrationError(f"错误: {self.image_list_file} 不是一个文件")
        if not os.access(self.image_list_file, os.R_OK):
            raise DockerMigrationError(f"错误: 没有读取文件 {self.image_list_file} 的权限")
        logger.info(f"输入文件验证通过: {self.image_list_file}")
    
    def parse_image_list(self) -> List[str]:
        """
        解析镜像列表文件,过滤注释和空行,统一域名
        
        Returns:
            List[str]: 处理后的镜像列表
        """
        images = []
        try:
            with open(self.image_list_file, 'r', encoding='utf-8') as f:
                for line_num, line in enumerate(f, 1):
                    line = line.strip()
                    
                    # 跳过空行和注释行
                    if not line or line.startswith('#'):
                        continue
                    
                    # 统一域名: 将所有镜像域名替换为源域名
                    # 处理格式: registry/image:tag -> hub-vpc.jdcloud.com/image:tag
                    if '/' in line:
                        # 获取域名后的部分
                        parts = line.split('/', 1)
                        if len(parts) == 2:
                            image_path = parts[1]
                            unified_image = f"{self.source_domain}/{image_path}"
                        else:
                            logger.warning(f"第 {line_num} 行格式异常,跳过: {line}")
                            continue
                    else:
                        # 如果没有域名,直接添加源域名
                        unified_image = f"{self.source_domain}/{line}"
                    
                    images.append(unified_image)
                    
        except IOError as e:
            raise DockerMigrationError(f"读取文件失败: {e}")
        
        if not images:
            raise DockerMigrationError("镜像列表为空,没有可处理的镜像")
        
        logger.info(f"成功解析 {len(images)} 个镜像")
        return images
    
    def run_docker_command(self, command: List[str], error_msg: str = "Docker 命令执行失败") -> Tuple[bool, str]:
        """
        执行 Docker 命令并处理结果
        
        Args:
            command: Docker 命令列表
            error_msg: 失败时的错误消息
            
        Returns:
            Tuple[bool, str]: (是否成功, 输出信息)
        """
        try:
            result = subprocess.run(
                command,
                capture_output=True,
                text=True,
                check=False,
                timeout=300  # 5分钟超时
            )
            
            if result.returncode == 0:
                return True, result.stdout.strip()
            else:
                error_output = result.stderr.strip()
                logger.error(f"{error_msg}: {error_output}")
                return False, error_output
                
        except subprocess.TimeoutExpired:
            raise DockerMigrationError(f"Docker 命令超时: {' '.join(command)}")
        except subprocess.SubprocessError as e:
            raise DockerMigrationError(f"Docker 命令执行异常: {e}")
        except Exception as e:
            raise DockerMigrationError(f"未知错误: {e}")
    
    def pull_image(self, image: str) -> bool:
        """
        拉取单个镜像
        
        Args:
            image: 镜像名称
            
        Returns:
            bool: 是否成功
        """
        logger.info(f"正在拉取: {image}")
        success, output = self.run_docker_command(
            ["docker", "pull", image],
            f"拉取失败: {image}"
        )
        
        if success:
            logger.info(f"✓ 成功拉取: {image}")
            self.pulled_images.append(image)
        else:
            logger.error(f"✗ 拉取失败: {image}")
            
        return success
    
    def pull_all_images(self, images: List[str]) -> None:
        """
        拉取所有镜像
        
        Args:
            images: 镜像列表
        """
        logger.info("开始拉取镜像...")
        failed_images = []
        
        for i, image in enumerate(images, 1):
            logger.info(f"进度: {i}/{len(images)}")
            if not self.pull_image(image):
                failed_images.append(image)
        
        if failed_images:
            raise DockerMigrationError(f"以下镜像拉取失败: {', '.join(failed_images)}")
        
        logger.info("所有镜像拉取完成!")
    
    def tag_and_push_image(self, source_image: str) -> bool:
        """
        标记并推送单个镜像
        
        Args:
            source_image: 源镜像名称
            
        Returns:
            bool: 是否成功
        """
        # 生成目标镜像名
        target_image = source_image.replace(self.source_domain, self.target_domain, 1)
        
        # 标记镜像
        logger.info(f"正在标记: {source_image} -> {target_image}")
        success, output = self.run_docker_command(
            ["docker", "tag", source_image, target_image],
            f"标记失败: {source_image}"
        )
        
        if not success:
            logger.error(f"✗ 标记失败: {source_image}")
            return False
        
        logger.info("✓ 标记成功")
        self.tagged_images.append(target_image)
        
        # 推送镜像
        logger.info(f"正在推送: {target_image}")
        success, output = self.run_docker_command(
            ["docker", "push", target_image],
            f"推送失败: {target_image}"
        )
        
        if success:
            logger.info(f"✓ 推送成功: {target_image}")
        else:
            logger.error(f"✗ 推送失败: {target_image}")
            
        return success
    
    def tag_and_push_all_images(self, images: List[str]) -> None:
        """
        标记并推送所有镜像
        
        Args:
            images: 源镜像列表
        """
        logger.info(f"开始重新标记并推送到 {self.target_domain}")
        failed_images = []
        
        for i, image in enumerate(images, 1):
            logger.info(f"进度: {i}/{len(images)}")
            if not self.tag_and_push_image(image):
                failed_images.append(image)
        
        if failed_images:
            raise DockerMigrationError(f"以下镜像处理失败: {', '.join(failed_images)}")
        
        logger.info("所有镜像推送完成!")
    
    def cleanup(self) -> None:
        """清理本地标记的镜像(可选)"""
        logger.info("清理本地镜像...")
        cleanup_errors = []
        
        for image in self.tagged_images:
            success, output = self.run_docker_command(
                ["docker", "rmi", "-f", image],
                f"清理镜像失败: {image}"
            )
            if not success:
                cleanup_errors.append(image)
        
        if cleanup_errors:
            logger.warning(f"部分镜像清理失败: {', '.join(cleanup_errors)}")
        else:
            logger.info("本地镜像清理完成")
    
    def run(self, cleanup_local: bool = False) -> None:
        """
        执行完整的镜像迁移流程
        
        Args:
            cleanup_local: 是否清理本地镜像
        """
        try:
            logger.info("=" * 60)
            logger.info("Docker 镜像迁移工具启动")
            logger.info(f"源仓库: {self.source_domain}")
            logger.info(f"目标仓库: {self.target_domain}")
            logger.info("=" * 60)
            
            # 1. 验证输入文件
            self.validate_input_file()
            
            # 2. 解析镜像列表
            images = self.parse_image_list()
            logger.info("\n待处理镜像列表:")
            for img in images:
                logger.info(f"  - {img}")
            
            # 3. 拉取镜像
            self.pull_all_images(images)
            
            # 4. 标记并推送镜像
            self.tag_and_push_all_images(images)
            
            # 5. 清理本地镜像(可选)
            if cleanup_local:
                self.cleanup()
            
            logger.info("=" * 60)
            logger.info("所有操作完成!")
            logger.info(f"成功处理 {len(images)} 个镜像")
            logger.info("=" * 60)
            
        except DockerMigrationError as e:
            logger.error(f"迁移过程出错: {e}")
            sys.exit(1)
        except KeyboardInterrupt:
            logger.warning("\n用户中断操作")
            sys.exit(130)
        except Exception as e:
            logger.error(f"未预期的错误: {e}", exc_info=True)
            sys.exit(1)


def main():
    """主函数"""
    parser = argparse.ArgumentParser(
        description="Docker 镜像迁移工具 - 从源仓库拉取镜像并推送到目标仓库",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
使用示例:
  python docker_migrate.py                              # 使用默认配置
  python docker_migrate.py -f myimages.txt              # 指定镜像列表文件
  python docker_migrate.py -s hub-vpc.jdcloud.com -t hub.dev86.cvessel.jdcloud.com
  python docker_migrate.py --cleanup                    # 执行后清理本地镜像
        """
    )
    
    parser.add_argument(
        '-f', '--file',
        default='imagelist',
        help='镜像列表文件路径 (默认: imagelist)'
    )
    
    parser.add_argument(
        '-s', '--source',
        default='hub-vpc.jdcloud.com',
        help='源仓库域名 (默认: hub-vpc.jdcloud.com)'
    )
    
    parser.add_argument(
        '-t', '--target',
        default='hub.dev86.cvessel.jdcloud.com',
        help='目标仓库域名 (默认: hub.dev86.cvessel.jdcloud.com)'
    )
    
    parser.add_argument(
        '--cleanup',
        action='store_true',
        help='执行完成后清理本地缓存的镜像'
    )
    
    parser.add_argument(
        '-v', '--verbose',
        action='store_true',
        help='显示详细日志'
    )
    
    args = parser.parse_args()
    
    # 设置日志级别
    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)
    
    # 检查 Docker 是否可用
    try:
        subprocess.run(["docker", "version"], capture_output=True, check=True, timeout=10)
    except (subprocess.SubprocessError, FileNotFoundError):
        logger.error("Docker 未安装或不可用,请先安装 Docker")
        sys.exit(1)
    
    # 创建迁移器实例并执行
    migrator = DockerImageMigrator(
        image_list_file=args.file,
        source_domain=args.source,
        target_domain=args.target
    )
    
    migrator.run(cleanup_local=args.cleanup)


if __name__ == "__main__":
    main()

拉取镜像python脚本2

#!/usr/bin/env python3
# =============================================================================
# 脚本名称: docker_pull_images.py
# 脚本功能: 批量拉取容器镜像(从指定文件读取镜像列表)
# 容器工具: 支持 nerdctl / docker,自动检测,或通过 -c 指定
# 适用环境: 生产环境
# =============================================================================

import argparse
import shutil
import subprocess
import sys
import time
from datetime import datetime
from typing import List, Optional


# ---------- 颜色定义 ----------
class Color:
    RED    = "\033[0;31m"
    GREEN  = "\033[0;32m"
    YELLOW = "\033[1;33m"
    BLUE   = "\033[0;34m"
    NC     = "\033[0m"


# ---------- 日志函数 ----------
def _ts() -> str:
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

def log_info(msg: str) -> None:
    print(f"{Color.BLUE}[INFO]{Color.NC}    {_ts()} {msg}")

def log_success(msg: str) -> None:
    print(f"{Color.GREEN}[SUCCESS]{Color.NC} {_ts()} {msg}")

def log_warn(msg: str) -> None:
    print(f"{Color.YELLOW}[WARN]{Color.NC}    {_ts()} {msg}")

def log_error(msg: str) -> None:
    print(f"{Color.RED}[ERROR]{Color.NC}   {_ts()} {msg}", file=sys.stderr)

def log_debug(msg: str, verbose: bool = False) -> None:
    if verbose:
        print(f"{Color.BLUE}[DEBUG]{Color.NC}   {_ts()} {msg}")


# ---------- 参数解析 ----------
def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        prog="docker_pull_images.py",
        description="批量拉取容器镜像(从指定文件读取镜像列表)",
        formatter_class=argparse.RawTextHelpFormatter,
        epilog=(
            "文件格式:\n"
            "  每行一个镜像名称,支持 # 开头的注释行和空行\n\n"
            "示例:\n"
            "  %(prog)s -f images.txt\n"
            "  %(prog)s -f images.txt -c nerdctl -n k8s.io\n"
            "  %(prog)s -f images.txt -c docker -r 3 -t 600 -v\n"
        ),
    )
    parser.add_argument(
        "-f", "--file",
        required=True,
        metavar="<file>",
        help="镜像列表文件路径(必填)",
    )
    parser.add_argument(
        "-c", "--cli",
        choices=["nerdctl", "docker"],
        default=None,
        metavar="<cli>",
        help="指定容器 CLI 工具: nerdctl 或 docker(默认: 自动检测,优先 nerdctl)",
    )
    parser.add_argument(
        "-n", "--namespace",
        default="",
        metavar="<namespace>",
        help="nerdctl 命名空间(如 k8s.io),仅当 cli 为 nerdctl 时生效",
    )
    parser.add_argument(
        "-r", "--retry",
        type=int,
        default=0,
        metavar="<num>",
        help="单个镜像拉取失败后的重试次数(默认: 0)",
    )
    parser.add_argument(
        "-t", "--timeout",
        type=int,
        default=0,
        metavar="<sec>",
        help="单个镜像拉取超时时间,单位秒(默认: 0,不限制)",
    )
    parser.add_argument(
        "-v", "--verbose",
        action="store_true",
        help="开启详细日志",
    )
    return parser.parse_args()


# ---------- 自动检测 CLI 工具 ----------
def detect_cli(cli_arg: Optional[str]) -> str:
    if cli_arg:
        if not shutil.which(cli_arg):
            log_error(f"未找到指定的容器 CLI 工具: {cli_arg}")
            sys.exit(1)
        return cli_arg

    for tool in ("nerdctl", "docker"):
        if shutil.which(tool):
            return tool

    log_error("未找到 nerdctl 或 docker,请先安装其中之一,或使用 -c 指定")
    sys.exit(1)


# ---------- 读取镜像列表 ----------
def load_images(filepath: str) -> List[str]:
    images: List[str] = []
    try:
        with open(filepath, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line or line.startswith("#"):
                    continue
                images.append(line)
    except PermissionError:
        log_error(f"镜像列表文件不可读: {filepath}")
        sys.exit(1)
    except FileNotFoundError:
        log_error(f"镜像列表文件不存在: {filepath}")
        sys.exit(1)
    return images


# ---------- 拉取单个镜像(含重试 / 超时) ----------
def pull_one_image(
    image: str,
    cli: str,
    namespace: str,
    retry: int,
    timeout: int,
    verbose: bool,
) -> bool:
    base_cmd: List[str] = [cli]
    if cli == "nerdctl" and namespace:
        base_cmd += ["--namespace", namespace]
    base_cmd += ["pull", image]

    max_attempts = retry + 1
    for attempt in range(1, max_attempts + 1):
        log_debug(f"第 {attempt}/{max_attempts} 次尝试拉取: {image}", verbose)
        try:
            kwargs = {"timeout": timeout if timeout > 0 else None}
            result = subprocess.run(
                base_cmd,
                **kwargs,
            )
            if result.returncode == 0:
                return True
        except subprocess.TimeoutExpired:
            log_warn(f"第 {attempt} 次拉取超时({timeout}s): {image}")
        except Exception as e:
            log_warn(f"第 {attempt} 次拉取异常: {image} -> {e}")
        else:
            log_warn(f"第 {attempt} 次拉取失败(返回码非 0): {image}")

        if attempt < max_attempts:
            time.sleep(2)

    return False


# ---------- 主流程 ----------
def main() -> None:
    args = parse_args()

    # 参数校验
    if args.retry < 0:
        log_error("-r/--retry 参数必须是非负整数")
        sys.exit(1)
    if args.timeout < 0:
        log_error("-t/--timeout 参数必须是非负整数")
        sys.exit(1)

    cli = detect_cli(args.cli)
    namespace = args.namespace

    # namespace 仅对 nerdctl 生效
    if namespace and cli != "nerdctl":
        log_warn(f"-n 命名空间参数仅在 nerdctl 下生效,当前 CLI 为 {cli},已忽略")
        namespace = ""

    log_info(f"使用容器 CLI 工具: {cli}" + (f" (namespace={namespace})" if namespace else ""))
    log_info(f"开始读取镜像列表文件: {args.file}")
    log_debug(
        f"参数: cli={cli}, namespace={namespace or '<none>'}, "
        f"retry={args.retry}, timeout={args.timeout}, verbose={args.verbose}",
        args.verbose,
    )

    images = load_images(args.file)
    if not images:
        log_warn("镜像列表为空,无需拉取")
        sys.exit(0)

    total = len(images)
    success = 0
    failed = 0
    failed_list: List[str] = []
    start_time = time.time()

    for idx, image in enumerate(images, start=1):
        log_info(f"[{idx}/{total}] 正在拉取镜像: {image}")
        if pull_one_image(image, cli, namespace, args.retry, args.timeout, args.verbose):
            log_success(f"[{idx}/{total}] 拉取成功: {image}")
            success += 1
        else:
            log_error(f"[{idx}/{total}] 拉取失败: {image}")
            failed += 1
            failed_list.append(image)
        print("-" * 40)

    elapsed = int(time.time() - start_time)

    # ---------- 结果汇总 ----------
    print()
    log_info("========== 拉取结果汇总 ==========")
    log_info(f"镜像总数: {total}")
    log_success(f"成功数量: {success}")
    if failed > 0:
        log_error(f"失败数量: {failed}")
        log_warn("失败镜像列表:")
        for img in failed_list:
            print(f"  - {img}")
    else:
        log_info("失败数量: 0")
    log_info(f"总耗时: {elapsed} 秒")
    log_info("==================================")

    sys.exit(0 if failed == 0 else 1)


if __name__ == "__main__":
    main()
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容