拉取镜像脚本
#!/bin/bash
# =============================================================================
# 脚本名称: docker_pull_images.sh
# 脚本功能: 批量拉取容器镜像(从指定文件读取镜像列表)
# 容器工具: 支持 nerdctl / docker,自动检测,或通过 -c 指定
# 参数解析: 使用 getopts 处理命令行参数
# =============================================================================
set -u
# ---------- 颜色定义 ----------
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m'
# ---------- 默认参数 ----------
IMAGE_FILE=""
RETRY_COUNT=0
TIMEOUT=0
VERBOSE=false
CLI_TOOL="" # 容器 CLI 工具,空值表示自动检测
NAMESPACE="" # nerdctl 命名空间 (如 k8s.io),仅 nerdctl 生效
# ---------- 日志函数 ----------
log_info() { echo -e "${BLUE}[INFO]${NC} $(date '+%Y-%m-%d %H:%M:%S') $*"; }
log_success() { echo -e "${GREEN}[SUCCESS]${NC} $(date '+%Y-%m-%d %H:%M:%S') $*"; }
log_warn() { echo -e "${YELLOW}[WARN]${NC} $(date '+%Y-%m-%d %H:%M:%S') $*"; }
log_error() { echo -e "${RED}[ERROR]${NC} $(date '+%Y-%m-%d %H:%M:%S') $*"; }
log_debug() { [[ "$VERBOSE" == "true" ]] && echo -e "${BLUE}[DEBUG]${NC} $(date '+%Y-%m-%d %H:%M:%S') $*"; }
# ---------- 显示用法 ----------
usage() {
cat <<EOF
用法: $(basename "$0") -f <镜像列表文件> [-c <cli>] [-n <namespace>] [-r <重试次数>] [-t <超时秒数>] [-v] [-h]
选项:
-f <file> 镜像列表文件路径 (必填)
-c <cli> 指定容器 CLI 工具: nerdctl 或 docker (默认: 自动检测,优先 nerdctl)
-n <namespace> nerdctl 命名空间 (如 k8s.io),仅当 cli 为 nerdctl 时生效
-r <num> 单个镜像拉取失败后的重试次数 (默认: 0)
-t <sec> 单个镜像拉取超时时间,单位秒 (默认: 0,不限制)
-v 开启详细日志 (verbose)
-h 显示此帮助信息
文件格式:
每行一个镜像名称,支持 # 开头的注释行和空行
示例:
$(basename "$0") -f images.txt
$(basename "$0") -f images.txt -c nerdctl -n k8s.io
$(basename "$0") -f images.txt -c nerdctl -r 3 -t 600 -v
EOF
}
# ---------- 使用 getopts 解析参数 ----------
while getopts ":f:c:n:r:t:vh" opt; do
case "$opt" in
f)
IMAGE_FILE="$OPTARG"
;;
c)
if [[ "$OPTARG" != "nerdctl" && "$OPTARG" != "docker" ]]; then
log_error "-c 参数只支持: nerdctl 或 docker,当前值: $OPTARG"
exit 1
fi
CLI_TOOL="$OPTARG"
;;
n)
NAMESPACE="$OPTARG"
;;
r)
if ! [[ "$OPTARG" =~ ^[0-9]+$ ]]; then
log_error "-r 参数必须是非负整数: $OPTARG"
exit 1
fi
RETRY_COUNT="$OPTARG"
;;
t)
if ! [[ "$OPTARG" =~ ^[0-9]+$ ]]; then
log_error "-t 参数必须是非负整数: $OPTARG"
exit 1
fi
TIMEOUT="$OPTARG"
;;
v)
VERBOSE=true
;;
h)
usage
exit 0
;;
\?)
log_error "未知选项: -$OPTARG"
usage
exit 1
;;
:)
log_error "选项 -$OPTARG 需要一个参数"
usage
exit 1
;;
esac
done
shift $((OPTIND - 1))
# ---------- 参数校验 ----------
if [[ -z "$IMAGE_FILE" ]]; then
log_error "缺少必填参数: -f <镜像列表文件>"
usage
exit 1
fi
if [[ ! -f "$IMAGE_FILE" ]]; then
log_error "镜像列表文件不存在: $IMAGE_FILE"
exit 1
fi
if [[ ! -r "$IMAGE_FILE" ]]; then
log_error "镜像列表文件不可读: $IMAGE_FILE"
exit 1
fi
if [[ -n "$CLI_TOOL" ]]; then
if ! command -v "$CLI_TOOL" >/dev/null 2>&1; then
log_error "未找到指定的容器 CLI 工具: $CLI_TOOL"
exit 1
fi
else
# 自动检测:优先 nerdctl,其次 docker
if command -v nerdctl >/dev/null 2>&1; then
CLI_TOOL="nerdctl"
elif command -v docker >/dev/null 2>&1; then
CLI_TOOL="docker"
else
log_error "未找到 nerdctl 或 docker,请先安装其中之一,或使用 -c 指定"
exit 1
fi
fi
# namespace 仅对 nerdctl 生效
if [[ -n "$NAMESPACE" && "$CLI_TOOL" != "nerdctl" ]]; then
log_warn "-n 命名空间参数仅在 nerdctl 下生效,当前 CLI 为 $CLI_TOOL,已忽略"
NAMESPACE=""
fi
log_info "使用容器 CLI 工具: $CLI_TOOL${NAMESPACE:+ (namespace=$NAMESPACE)}"
# ---------- 拉取单个镜像(含重试/超时) ----------
pull_one_image() {
local image="$1"
local attempt=0
local max_attempts=$((RETRY_COUNT + 1))
local -a cli_cmd
local -a cmd
# 构建基础 CLI 命令(含 nerdctl 的 namespace)
cli_cmd=("$CLI_TOOL")
if [[ "$CLI_TOOL" == "nerdctl" && -n "$NAMESPACE" ]]; then
cli_cmd+=("--namespace" "$NAMESPACE")
fi
while (( attempt < max_attempts )); do
attempt=$((attempt + 1))
log_debug "第 $attempt/$max_attempts 次尝试拉取: $image"
if (( TIMEOUT > 0 )) && command -v timeout >/dev/null 2>&1; then
cmd=(timeout "$TIMEOUT" "${cli_cmd[@]}" pull "$image")
else
cmd=("${cli_cmd[@]}" pull "$image")
fi
if "${cmd[@]}"; then
return 0
fi
log_warn "第 $attempt 次拉取失败: $image"
(( attempt < max_attempts )) && sleep 2
done
return 1
}
# ---------- 主流程 ----------
log_info "开始读取镜像列表文件: $IMAGE_FILE"
log_debug "参数: cli=$CLI_TOOL, namespace=${NAMESPACE:-<none>}, retry=$RETRY_COUNT, timeout=$TIMEOUT, verbose=$VERBOSE"
total=0
success=0
failed=0
failed_list=()
start_time=$(date +%s)
while IFS= read -r line || [[ -n "$line" ]]; do
image=$(echo "$line" | sed 's/^[[:space:]]*//;s/[[:space:]]*$//')
[[ -z "$image" ]] && continue
[[ "$image" =~ ^# ]] && continue
total=$((total + 1))
log_info "[$total] 正在拉取镜像: $image"
if pull_one_image "$image"; then
log_success "[$total] 拉取成功: $image"
success=$((success + 1))
else
log_error "[$total] 拉取失败: $image"
failed=$((failed + 1))
failed_list+=("$image")
fi
echo "----------------------------------------"
done < "$IMAGE_FILE"
elapsed=$(( $(date +%s) - start_time ))
# ---------- 结果汇总 ----------
echo
log_info "========== 拉取结果汇总 =========="
log_info "镜像总数: $total"
log_success "成功数量: $success"
if (( failed > 0 )); then
log_error "失败数量: $failed"
log_warn "失败镜像列表:"
for img in "${failed_list[@]}"; do
echo " - $img"
done
else
log_info "失败数量: 0"
fi
log_info "总耗时: ${elapsed} 秒"
log_info "=================================="
(( failed == 0 )) && exit 0 || exit 1
拉取镜像脚本1
#!/bin/bash
# 镜像列表文件路径
IMAGE_FILE="${1:-images.txt}"
# 自动检测容器运行时
detect_runtime() {
if command -v docker &> /dev/null; then
echo "docker"
elif command -v nerdctl &> /dev/null; then
echo "nerdctl"
elif command -v crictl &> /dev/null; then
echo "crictl"
elif command -v ctr &> /dev/null; then
echo "ctr"
else
echo "none"
fi
}
# 拉取镜像函数
pull_image() {
local runtime=$1
local image=$2
case $runtime in
docker)
docker pull "$image"
;;
nerdctl)
nerdctl pull "$image"
;;
crictl)
crictl pull "$image"
;;
ctr)
ctr image pull "$image"
;;
*)
return 1
;;
esac
}
# 检查文件是否存在
if [ ! -f "$IMAGE_FILE" ]; then
echo "错误: 镜像列表文件 $IMAGE_FILE 不存在!"
exit 1
fi
# 检测容器运行时
RUNTIME=$(detect_runtime)
if [ "$RUNTIME" = "none" ]; then
echo "错误: 未找到任何容器运行时 (docker, nerdctl, crictl, ctr)"
echo "请安装 Docker 或使用其他容器运行时"
exit 1
fi
echo "检测到容器运行时: $RUNTIME"
echo "开始拉取镜像..."
echo "================================"
# 统计变量
total=0
success=0
failed=0
# 读取文件中的每一行
while IFS= read -r image || [ -n "$image" ]; do
# 跳过空行和注释行
if [[ -z "$image" ]] || [[ "$image" =~ ^[[:space:]]*# ]]; then
continue
fi
# 去除行首行尾的空格
image=$(echo "$image" | sed -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//')
total=$((total + 1))
echo "[$total] 正在拉取镜像: $image"
# 拉取镜像
if pull_image "$RUNTIME" "$image"; then
echo "✓ 成功拉取: $image"
success=$((success + 1))
else
echo "✗ 拉取失败: $image"
failed=$((failed + 1))
fi
echo "--------------------------------"
done < "$IMAGE_FILE"
# 输出统计结果
echo "================================"
echo "拉取完成!"
echo "总计: $total 个镜像"
echo "成功: $success 个"
echo "失败: $failed 个"
if [ $failed -gt 0 ]; then
exit 1
else
exit 0
fi
拉取镜像更新版本
#!/bin/bash
# 镜像列表文件路径
IMAGE_FILE="${1:-images.txt}"
# 目标镜像仓库前缀
TARGET_REGISTRY="hub.docker.io"
# 自动检测容器运行时
detect_runtime() {
if command -v docker &> /dev/null; then
echo "docker"
elif command -v nerdctl &> /dev/null; then
echo "nerdctl"
elif command -v crictl &> /dev/null; then
echo "crictl"
elif command -v ctr &> /dev/null; then
echo "ctr"
else
echo "none"
fi
}
# 拉取镜像函数
pull_image() {
local runtime=$1
local image=$2
case $runtime in
docker)
docker pull "$image"
;;
nerdctl)
nerdctl pull "$image"
;;
crictl)
crictl pull "$image"
;;
ctr)
ctr image pull "$image"
;;
*)
return 1
;;
esac
}
# 替换镜像域名(只替换仓库域名,保留完整路径)
replace_image_registry() {
local original_image=$1
# 提取协议头(如果有)
local protocol=""
if [[ "$original_image" =~ ^https?:// ]]; then
protocol=$(echo "$original_image" | grep -oE '^https?://')
original_image=$(echo "$original_image" | sed -E 's|^https?://||')
fi
# 提取第一个 / 之前的部分(原仓库域名)
# 剩余部分是路径(命名空间/镜像名:标签)
local registry=$(echo "$original_image" | cut -d'/' -f1)
local rest_path=$(echo "$original_image" | cut -d'/' -f2-)
# 如果 rest_path 为空(即原镜像没有路径),则 rest_path 就是镜像名
if [ -z "$rest_path" ]; then
rest_path="$registry"
registry=""
fi
# 组合新的镜像名:目标仓库域名 + 原路径
echo "${protocol}${TARGET_REGISTRY}/${rest_path}"
}
# 检查文件是否存在
if [ ! -f "$IMAGE_FILE" ]; then
echo "错误: 镜像列表文件 $IMAGE_FILE 不存在!"
exit 1
fi
# 检测容器运行时
RUNTIME=$(detect_runtime)
if [ "$RUNTIME" = "none" ]; then
echo "错误: 未找到任何容器运行时 (docker, nerdctl, crictl, ctr)"
echo "请安装 Docker 或使用其他容器运行时"
exit 1
fi
echo "检测到容器运行时: $RUNTIME"
echo "目标镜像仓库: $TARGET_REGISTRY"
echo "开始拉取镜像..."
echo "================================"
# 统计变量
total=0
success=0
failed=0
# 读取文件中的每一行
while IFS= read -r image || [ -n "$image" ]; do
# 跳过空行和注释行
if [[ -z "$image" ]] || [[ "$image" =~ ^[[:space:]]*# ]]; then
continue
fi
# 去除行首行尾的空格
original_image=$(echo "$image" | sed -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//')
# 替换镜像域名
new_image=$(replace_image_registry "$original_image")
total=$((total + 1))
echo "[$total] 原始镜像: $original_image"
echo " 替换为: $new_image"
# 拉取镜像
if pull_image "$RUNTIME" "$new_image"; then
echo "✓ 成功拉取: $new_image"
success=$((success + 1))
else
echo "✗ 拉取失败: $new_image"
failed=$((failed + 1))
fi
echo "--------------------------------"
done < "$IMAGE_FILE"
# 输出统计结果
echo "================================"
echo "拉取完成!"
echo "总计: $total 个镜像"
echo "成功: $success 个"
echo "失败: $failed 个"
if [ $failed -gt 0 ]; then
exit 1
else
exit 0
fi
python版本
#!/usr/bin/env python3
"""
Docker 镜像迁移脚本
功能:从源仓库拉取镜像并推送到目标仓库
适用于生产环境,包含完整的错误处理、日志记录和资源清理
"""
import logging
import sys
import os
from pathlib import Path
from typing import List, Optional, Tuple
import subprocess
import argparse
from datetime import datetime
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler(f'docker_migration_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log')
]
)
logger = logging.getLogger(__name__)
class DockerMigrationError(Exception):
"""自定义 Docker 迁移异常"""
pass
class DockerImageMigrator:
"""Docker 镜像迁移工具类"""
def __init__(self, image_list_file: str, source_domain: str, target_domain: str):
"""
初始化镜像迁移器
Args:
image_list_file: 镜像列表文件路径
source_domain: 源仓库域名
target_domain: 目标仓库域名
"""
self.image_list_file = Path(image_list_file)
self.source_domain = source_domain
self.target_domain = target_domain
self.unified_file = None
self.pulled_images = []
self.tagged_images = []
def validate_input_file(self) -> None:
"""验证输入文件是否存在且可读"""
if not self.image_list_file.exists():
raise DockerMigrationError(f"错误: 文件 {self.image_list_file} 不存在")
if not self.image_list_file.is_file():
raise DockerMigrationError(f"错误: {self.image_list_file} 不是一个文件")
if not os.access(self.image_list_file, os.R_OK):
raise DockerMigrationError(f"错误: 没有读取文件 {self.image_list_file} 的权限")
logger.info(f"输入文件验证通过: {self.image_list_file}")
def parse_image_list(self) -> List[str]:
"""
解析镜像列表文件,过滤注释和空行,统一域名
Returns:
List[str]: 处理后的镜像列表
"""
images = []
try:
with open(self.image_list_file, 'r', encoding='utf-8') as f:
for line_num, line in enumerate(f, 1):
line = line.strip()
# 跳过空行和注释行
if not line or line.startswith('#'):
continue
# 统一域名: 将所有镜像域名替换为源域名
# 处理格式: registry/image:tag -> hub-vpc.jdcloud.com/image:tag
if '/' in line:
# 获取域名后的部分
parts = line.split('/', 1)
if len(parts) == 2:
image_path = parts[1]
unified_image = f"{self.source_domain}/{image_path}"
else:
logger.warning(f"第 {line_num} 行格式异常,跳过: {line}")
continue
else:
# 如果没有域名,直接添加源域名
unified_image = f"{self.source_domain}/{line}"
images.append(unified_image)
except IOError as e:
raise DockerMigrationError(f"读取文件失败: {e}")
if not images:
raise DockerMigrationError("镜像列表为空,没有可处理的镜像")
logger.info(f"成功解析 {len(images)} 个镜像")
return images
def run_docker_command(self, command: List[str], error_msg: str = "Docker 命令执行失败") -> Tuple[bool, str]:
"""
执行 Docker 命令并处理结果
Args:
command: Docker 命令列表
error_msg: 失败时的错误消息
Returns:
Tuple[bool, str]: (是否成功, 输出信息)
"""
try:
result = subprocess.run(
command,
capture_output=True,
text=True,
check=False,
timeout=300 # 5分钟超时
)
if result.returncode == 0:
return True, result.stdout.strip()
else:
error_output = result.stderr.strip()
logger.error(f"{error_msg}: {error_output}")
return False, error_output
except subprocess.TimeoutExpired:
raise DockerMigrationError(f"Docker 命令超时: {' '.join(command)}")
except subprocess.SubprocessError as e:
raise DockerMigrationError(f"Docker 命令执行异常: {e}")
except Exception as e:
raise DockerMigrationError(f"未知错误: {e}")
def pull_image(self, image: str) -> bool:
"""
拉取单个镜像
Args:
image: 镜像名称
Returns:
bool: 是否成功
"""
logger.info(f"正在拉取: {image}")
success, output = self.run_docker_command(
["docker", "pull", image],
f"拉取失败: {image}"
)
if success:
logger.info(f"✓ 成功拉取: {image}")
self.pulled_images.append(image)
else:
logger.error(f"✗ 拉取失败: {image}")
return success
def pull_all_images(self, images: List[str]) -> None:
"""
拉取所有镜像
Args:
images: 镜像列表
"""
logger.info("开始拉取镜像...")
failed_images = []
for i, image in enumerate(images, 1):
logger.info(f"进度: {i}/{len(images)}")
if not self.pull_image(image):
failed_images.append(image)
if failed_images:
raise DockerMigrationError(f"以下镜像拉取失败: {', '.join(failed_images)}")
logger.info("所有镜像拉取完成!")
def tag_and_push_image(self, source_image: str) -> bool:
"""
标记并推送单个镜像
Args:
source_image: 源镜像名称
Returns:
bool: 是否成功
"""
# 生成目标镜像名
target_image = source_image.replace(self.source_domain, self.target_domain, 1)
# 标记镜像
logger.info(f"正在标记: {source_image} -> {target_image}")
success, output = self.run_docker_command(
["docker", "tag", source_image, target_image],
f"标记失败: {source_image}"
)
if not success:
logger.error(f"✗ 标记失败: {source_image}")
return False
logger.info("✓ 标记成功")
self.tagged_images.append(target_image)
# 推送镜像
logger.info(f"正在推送: {target_image}")
success, output = self.run_docker_command(
["docker", "push", target_image],
f"推送失败: {target_image}"
)
if success:
logger.info(f"✓ 推送成功: {target_image}")
else:
logger.error(f"✗ 推送失败: {target_image}")
return success
def tag_and_push_all_images(self, images: List[str]) -> None:
"""
标记并推送所有镜像
Args:
images: 源镜像列表
"""
logger.info(f"开始重新标记并推送到 {self.target_domain}")
failed_images = []
for i, image in enumerate(images, 1):
logger.info(f"进度: {i}/{len(images)}")
if not self.tag_and_push_image(image):
failed_images.append(image)
if failed_images:
raise DockerMigrationError(f"以下镜像处理失败: {', '.join(failed_images)}")
logger.info("所有镜像推送完成!")
def cleanup(self) -> None:
"""清理本地标记的镜像(可选)"""
logger.info("清理本地镜像...")
cleanup_errors = []
for image in self.tagged_images:
success, output = self.run_docker_command(
["docker", "rmi", "-f", image],
f"清理镜像失败: {image}"
)
if not success:
cleanup_errors.append(image)
if cleanup_errors:
logger.warning(f"部分镜像清理失败: {', '.join(cleanup_errors)}")
else:
logger.info("本地镜像清理完成")
def run(self, cleanup_local: bool = False) -> None:
"""
执行完整的镜像迁移流程
Args:
cleanup_local: 是否清理本地镜像
"""
try:
logger.info("=" * 60)
logger.info("Docker 镜像迁移工具启动")
logger.info(f"源仓库: {self.source_domain}")
logger.info(f"目标仓库: {self.target_domain}")
logger.info("=" * 60)
# 1. 验证输入文件
self.validate_input_file()
# 2. 解析镜像列表
images = self.parse_image_list()
logger.info("\n待处理镜像列表:")
for img in images:
logger.info(f" - {img}")
# 3. 拉取镜像
self.pull_all_images(images)
# 4. 标记并推送镜像
self.tag_and_push_all_images(images)
# 5. 清理本地镜像(可选)
if cleanup_local:
self.cleanup()
logger.info("=" * 60)
logger.info("所有操作完成!")
logger.info(f"成功处理 {len(images)} 个镜像")
logger.info("=" * 60)
except DockerMigrationError as e:
logger.error(f"迁移过程出错: {e}")
sys.exit(1)
except KeyboardInterrupt:
logger.warning("\n用户中断操作")
sys.exit(130)
except Exception as e:
logger.error(f"未预期的错误: {e}", exc_info=True)
sys.exit(1)
def main():
"""主函数"""
parser = argparse.ArgumentParser(
description="Docker 镜像迁移工具 - 从源仓库拉取镜像并推送到目标仓库",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
使用示例:
python docker_migrate.py # 使用默认配置
python docker_migrate.py -f myimages.txt # 指定镜像列表文件
python docker_migrate.py -s hub-vpc.jdcloud.com -t hub.dev86.cvessel.jdcloud.com
python docker_migrate.py --cleanup # 执行后清理本地镜像
"""
)
parser.add_argument(
'-f', '--file',
default='imagelist',
help='镜像列表文件路径 (默认: imagelist)'
)
parser.add_argument(
'-s', '--source',
default='hub-vpc.jdcloud.com',
help='源仓库域名 (默认: hub-vpc.jdcloud.com)'
)
parser.add_argument(
'-t', '--target',
default='hub.dev86.cvessel.jdcloud.com',
help='目标仓库域名 (默认: hub.dev86.cvessel.jdcloud.com)'
)
parser.add_argument(
'--cleanup',
action='store_true',
help='执行完成后清理本地缓存的镜像'
)
parser.add_argument(
'-v', '--verbose',
action='store_true',
help='显示详细日志'
)
args = parser.parse_args()
# 设置日志级别
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
# 检查 Docker 是否可用
try:
subprocess.run(["docker", "version"], capture_output=True, check=True, timeout=10)
except (subprocess.SubprocessError, FileNotFoundError):
logger.error("Docker 未安装或不可用,请先安装 Docker")
sys.exit(1)
# 创建迁移器实例并执行
migrator = DockerImageMigrator(
image_list_file=args.file,
source_domain=args.source,
target_domain=args.target
)
migrator.run(cleanup_local=args.cleanup)
if __name__ == "__main__":
main()
拉取镜像python脚本2
#!/usr/bin/env python3
# =============================================================================
# 脚本名称: docker_pull_images.py
# 脚本功能: 批量拉取容器镜像(从指定文件读取镜像列表)
# 容器工具: 支持 nerdctl / docker,自动检测,或通过 -c 指定
# 适用环境: 生产环境
# =============================================================================
import argparse
import shutil
import subprocess
import sys
import time
from datetime import datetime
from typing import List, Optional
# ---------- 颜色定义 ----------
class Color:
RED = "\033[0;31m"
GREEN = "\033[0;32m"
YELLOW = "\033[1;33m"
BLUE = "\033[0;34m"
NC = "\033[0m"
# ---------- 日志函数 ----------
def _ts() -> str:
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def log_info(msg: str) -> None:
print(f"{Color.BLUE}[INFO]{Color.NC} {_ts()} {msg}")
def log_success(msg: str) -> None:
print(f"{Color.GREEN}[SUCCESS]{Color.NC} {_ts()} {msg}")
def log_warn(msg: str) -> None:
print(f"{Color.YELLOW}[WARN]{Color.NC} {_ts()} {msg}")
def log_error(msg: str) -> None:
print(f"{Color.RED}[ERROR]{Color.NC} {_ts()} {msg}", file=sys.stderr)
def log_debug(msg: str, verbose: bool = False) -> None:
if verbose:
print(f"{Color.BLUE}[DEBUG]{Color.NC} {_ts()} {msg}")
# ---------- 参数解析 ----------
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
prog="docker_pull_images.py",
description="批量拉取容器镜像(从指定文件读取镜像列表)",
formatter_class=argparse.RawTextHelpFormatter,
epilog=(
"文件格式:\n"
" 每行一个镜像名称,支持 # 开头的注释行和空行\n\n"
"示例:\n"
" %(prog)s -f images.txt\n"
" %(prog)s -f images.txt -c nerdctl -n k8s.io\n"
" %(prog)s -f images.txt -c docker -r 3 -t 600 -v\n"
),
)
parser.add_argument(
"-f", "--file",
required=True,
metavar="<file>",
help="镜像列表文件路径(必填)",
)
parser.add_argument(
"-c", "--cli",
choices=["nerdctl", "docker"],
default=None,
metavar="<cli>",
help="指定容器 CLI 工具: nerdctl 或 docker(默认: 自动检测,优先 nerdctl)",
)
parser.add_argument(
"-n", "--namespace",
default="",
metavar="<namespace>",
help="nerdctl 命名空间(如 k8s.io),仅当 cli 为 nerdctl 时生效",
)
parser.add_argument(
"-r", "--retry",
type=int,
default=0,
metavar="<num>",
help="单个镜像拉取失败后的重试次数(默认: 0)",
)
parser.add_argument(
"-t", "--timeout",
type=int,
default=0,
metavar="<sec>",
help="单个镜像拉取超时时间,单位秒(默认: 0,不限制)",
)
parser.add_argument(
"-v", "--verbose",
action="store_true",
help="开启详细日志",
)
return parser.parse_args()
# ---------- 自动检测 CLI 工具 ----------
def detect_cli(cli_arg: Optional[str]) -> str:
if cli_arg:
if not shutil.which(cli_arg):
log_error(f"未找到指定的容器 CLI 工具: {cli_arg}")
sys.exit(1)
return cli_arg
for tool in ("nerdctl", "docker"):
if shutil.which(tool):
return tool
log_error("未找到 nerdctl 或 docker,请先安装其中之一,或使用 -c 指定")
sys.exit(1)
# ---------- 读取镜像列表 ----------
def load_images(filepath: str) -> List[str]:
images: List[str] = []
try:
with open(filepath, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
images.append(line)
except PermissionError:
log_error(f"镜像列表文件不可读: {filepath}")
sys.exit(1)
except FileNotFoundError:
log_error(f"镜像列表文件不存在: {filepath}")
sys.exit(1)
return images
# ---------- 拉取单个镜像(含重试 / 超时) ----------
def pull_one_image(
image: str,
cli: str,
namespace: str,
retry: int,
timeout: int,
verbose: bool,
) -> bool:
base_cmd: List[str] = [cli]
if cli == "nerdctl" and namespace:
base_cmd += ["--namespace", namespace]
base_cmd += ["pull", image]
max_attempts = retry + 1
for attempt in range(1, max_attempts + 1):
log_debug(f"第 {attempt}/{max_attempts} 次尝试拉取: {image}", verbose)
try:
kwargs = {"timeout": timeout if timeout > 0 else None}
result = subprocess.run(
base_cmd,
**kwargs,
)
if result.returncode == 0:
return True
except subprocess.TimeoutExpired:
log_warn(f"第 {attempt} 次拉取超时({timeout}s): {image}")
except Exception as e:
log_warn(f"第 {attempt} 次拉取异常: {image} -> {e}")
else:
log_warn(f"第 {attempt} 次拉取失败(返回码非 0): {image}")
if attempt < max_attempts:
time.sleep(2)
return False
# ---------- 主流程 ----------
def main() -> None:
args = parse_args()
# 参数校验
if args.retry < 0:
log_error("-r/--retry 参数必须是非负整数")
sys.exit(1)
if args.timeout < 0:
log_error("-t/--timeout 参数必须是非负整数")
sys.exit(1)
cli = detect_cli(args.cli)
namespace = args.namespace
# namespace 仅对 nerdctl 生效
if namespace and cli != "nerdctl":
log_warn(f"-n 命名空间参数仅在 nerdctl 下生效,当前 CLI 为 {cli},已忽略")
namespace = ""
log_info(f"使用容器 CLI 工具: {cli}" + (f" (namespace={namespace})" if namespace else ""))
log_info(f"开始读取镜像列表文件: {args.file}")
log_debug(
f"参数: cli={cli}, namespace={namespace or '<none>'}, "
f"retry={args.retry}, timeout={args.timeout}, verbose={args.verbose}",
args.verbose,
)
images = load_images(args.file)
if not images:
log_warn("镜像列表为空,无需拉取")
sys.exit(0)
total = len(images)
success = 0
failed = 0
failed_list: List[str] = []
start_time = time.time()
for idx, image in enumerate(images, start=1):
log_info(f"[{idx}/{total}] 正在拉取镜像: {image}")
if pull_one_image(image, cli, namespace, args.retry, args.timeout, args.verbose):
log_success(f"[{idx}/{total}] 拉取成功: {image}")
success += 1
else:
log_error(f"[{idx}/{total}] 拉取失败: {image}")
failed += 1
failed_list.append(image)
print("-" * 40)
elapsed = int(time.time() - start_time)
# ---------- 结果汇总 ----------
print()
log_info("========== 拉取结果汇总 ==========")
log_info(f"镜像总数: {total}")
log_success(f"成功数量: {success}")
if failed > 0:
log_error(f"失败数量: {failed}")
log_warn("失败镜像列表:")
for img in failed_list:
print(f" - {img}")
else:
log_info("失败数量: 0")
log_info(f"总耗时: {elapsed} 秒")
log_info("==================================")
sys.exit(0 if failed == 0 else 1)
if __name__ == "__main__":
main()