MySQL 数据库二进制版本安装操作步骤:
1. 首先先上传数据库安装包到linux操作系统
2. 添加一块新的硬盘, 用来存储数据库数据信息
3. 格式化硬盘
mkfs.xfs /dev/sdb
4. 创建数据库存放数据的路径
mkdir -p /data
5. 挂载并检查
mount /dev/sdb /data
df -h
6. 添加磁盘为永久挂载
vim /etc/fstab
/dev/sdb /data/ xfs defaults 0 0
umount -lf /data
mount -a
7. 加压上传的数据库安装包
[root@shell~]#cd /usr/local/
[root@shell/usr/local]#tar xf mysql-5.7.26-linux-glibc2.12-x86_64.tar.gz
8. 解压完成, 创建连接文件 或者使用mv命令修改文件名称
[root@shell/usr/local]#ln -s mysql-5.7.26-linux-glibc2.12-x86_64 mysql
9. 创建数据库数据存放目录
mkdir -p /data/mysql/data
10 创建MySQL数据库虚拟用户用来管理MySQL数据库
useradd -M -s /sbin/nologin mysql
11. 进行授权路径
[root@shell/usr/local]#chown -R mysql.mysql /usr/local/mysql /data/mysql/data/
12. 定义软连接 让系统可以直接调用数据库命令
ln -s /usr/local/mysql/bin/* /usr/local/bin
13 . 编写系统环境变量配置文件, 让系统可以直接调用数据库命令 并刷新让其生效
vim /etc/profile
PATH=/usr/local/mysql/bin:$PATH
source /etc/profile
14. 复制mysql数据库的启动脚本到 /etc/init.d/目录下 添加执行权限, 添加到开机自启动
[root@shell/usr/local]#cp /usr/local/mysql/support-files/mysql.server [root@shell/usr/local]#/etc/init.d/mysqld
[root@shell/usr/local]# chmod +x /etc/init.d/mysqld
[root@shell/usr/local]# chkconfig --add mysqld
15. 编写mysql 数据库主配置文件
vim /etc/my.cnf
[mysqld]
user=mysql
mysqlx=0
basedir=/usr/local/mysql
datadir=/data/mysql/data
port=3306
server_id=13
socket=/tmp/mysql.sock
[mysql]
socket=/tmp/mysql.sock
16. 初始化数据库
mysqld --initialize-insecure --user=mysql --basedir=/usr/local/mysql --datadir=/data/mysql/data
17. 启动数据库
systemctl start mysqld
18. 检查数据库服务是否启动成功
ps -ef |grep mysqld && netstat -anpt |grep mysqld
19.登录数据库测试
mysql 直接登录
脚本化
#!/bin/bash
# MySQL生产环境安装脚本
# 版本: 1.0
# 作者: 系统管理员
# 描述: 用于在生产环境安装MySQL 9.4.0
set -euo pipefail # 严格模式: 遇到错误退出,使用未定义变量退出,管道错误退出
# 颜色定义
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color
# 日志函数
log() {
echo -e "${GREEN}[$(date +'%Y-%m-%d %H:%M:%S')] $*${NC}"
}
warn() {
echo -e "${YELLOW}[$(date +'%Y-%m-%d %H:%M:%S')] WARNING: $*${NC}"
}
error() {
echo -e "${RED}[$(date +'%Y-%m-%d %H:%M:%S')] ERROR: $*${NC}" >&2
exit 1
}
# 检查是否以root用户运行
check_root() {
if [[ $EUID -ne 0 ]]; then
error "此脚本必须以root用户运行"
fi
}
# 定义变量
MYSQL_TAR="mysql-9.4.0-linux-glibc2.28-x86_64.tar.xz"
MYSQL_DIR="/usr/local/mysql"
DATA_DIR="/data/mysql/data"
MYSQL_USER="mysql"
MYSQL_GROUP="mysql"
SERVICE_FILE="/etc/init.d/mysqld"
CONFIG_FILE="/etc/my.cnf"
# 检查文件是否存在
check_files() {
log "检查必要的文件..."
if [[ ! -f "$MYSQL_TAR" ]]; then
error "MySQL安装包 $MYSQL_TAR 不存在"
fi
if [[ ! -d "/usr/local" ]]; then
error "/usr/local 目录不存在"
fi
}
# 备份原有配置
backup_existing() {
log "备份现有MySQL配置..."
# 备份my.cnf
if [[ -f "$CONFIG_FILE" ]]; then
cp -p "$CONFIG_FILE" "${CONFIG_FILE}.bak.$(date +%Y%m%d%H%M%S)"
log "已备份 $CONFIG_FILE"
fi
# 备份服务脚本
if [[ -f "$SERVICE_FILE" ]]; then
cp -p "$SERVICE_FILE" "${SERVICE_FILE}.bak.$(date +%Y%m%d%H%M%S)"
log "已备份 $SERVICE_FILE"
fi
}
# 创建MySQL用户和组
create_mysql_user() {
log "创建MySQL用户和组..."
if ! id "$MYSQL_USER" &>/dev/null; then
groupadd -r "$MYSQL_GROUP" || warn "用户组 $MYSQL_GROUP 可能已存在"
useradd -r -g "$MYSQL_GROUP" -s /bin/false "$MYSQL_USER" || warn "用户 $MYSQL_USER 可能已存在"
log "MySQL用户和组创建完成"
else
log "MySQL用户已存在"
fi
}
# 创建数据目录
create_data_dir() {
log "创建数据目录..."
if [[ ! -d "$DATA_DIR" ]]; then
mkdir -p "$DATA_DIR"
log "数据目录 $DATA_DIR 创建完成"
else
warn "数据目录 $DATA_DIR 已存在"
fi
}
# 安装MySQL
install_mysql() {
log "开始安装MySQL..."
# 解压安装包
log "解压MySQL安装包..."
if ! tar xf "$MYSQL_TAR" -C /usr/local/; then
error "解压MySQL安装包失败"
fi
# 重命名目录
log "重命名MySQL目录..."
local extracted_dir="/usr/local/mysql-9.4.0-linux-glibc2.28-x86_64"
if [[ -d "$extracted_dir" ]]; then
if [[ -d "$MYSQL_DIR" ]]; then
mv "$MYSQL_DIR" "${MYSQL_DIR}.bak.$(date +%Y%m%d%H%M%S)"
warn "原有MySQL目录已备份"
fi
mv "$extracted_dir" "$MYSQL_DIR"
else
error "解压后的目录不存在: $extracted_dir"
fi
# 设置权限
log "设置目录权限..."
chown -R "${MYSQL_USER}.${MYSQL_GROUP}" "$MYSQL_DIR" "/data"
# 创建符号链接
log "创建二进制文件符号链接..."
for bin_file in "$MYSQL_DIR"/bin/*; do
if [[ -x "$bin_file" ]]; then
ln -sf "$bin_file" "/usr/local/bin/"
fi
done
}
# 配置MySQL服务
configure_service() {
log "配置MySQL服务..."
# 复制服务脚本
if [[ -f "$MYSQL_DIR/support-files/mysql.server" ]]; then
cp -a "$MYSQL_DIR/support-files/mysql.server" "$SERVICE_FILE"
chmod +x "$SERVICE_FILE"
log "服务脚本已复制"
else
warn "MySQL服务脚本不存在,跳过复制"
fi
# 配置chkconfig
if command -v chkconfig &>/dev/null; then
chkconfig --add mysqld
log "MySQL服务已添加到chkconfig"
else
warn "chkconfig命令不存在,跳过服务配置"
fi
}
# 创建配置文件
create_config() {
log "创建MySQL配置文件..."
cat > "$CONFIG_FILE" << EOF
[mysqld]
basedir=$MYSQL_DIR
datadir=$DATA_DIR
port=3306
socket=/tmp/mysql.sock
user=$MYSQL_USER
log-error=$DATA_DIR/mysql.error.log
pid-file=$DATA_DIR/mysql.pid
# 性能调优参数 (根据实际情况调整)
character-set-server=utf8mb4
collation-server=utf8mb4_unicode_ci
max_connections=1000
innodb_buffer_pool_size=1G
innodb_log_file_size=256M
[mysql]
socket=/tmp/mysql.sock
default-character-set=utf8mb4
[client]
socket=/tmp/mysql.sock
EOF
log "MySQL配置文件已创建: $CONFIG_FILE"
}
# 移除冲突的MariaDB包
remove_mariadb() {
log "检查并移除冲突的MariaDB包..."
local mariadb_packages
mariadb_packages=$(rpm -qa | grep -E '^mariadb' || true)
if [[ -n "$mariadb_packages" ]]; then
log "发现以下MariaDB包:"
echo "$mariadb_packages"
for package in $mariadb_packages; do
warn "移除包: $package"
rpm -e --nodeps "$package" || warn "移除包 $package 失败"
done
else
log "未发现冲突的MariaDB包"
fi
}
# 初始化MySQL
initialize_mysql() {
log "初始化MySQL..."
# 检查数据目录是否为空
if [[ "$(ls -A $DATA_DIR 2>/dev/null)" ]]; then
warn "数据目录 $DATA_DIR 不为空,跳过初始化"
return 0
fi
log "执行MySQL初始化..."
if ! mysqld --initialize-insecure \
--user="$MYSQL_USER" \
--basedir="$MYSQL_DIR" \
--datadir="$DATA_DIR" 2>&1 | tee "/tmp/mysql_initialize.log"; then
error "MySQL初始化失败,请检查日志: /tmp/mysql_initialize.log"
fi
log "MySQL初始化完成"
}
# 启动MySQL服务
start_mysql() {
log "启动MySQL服务..."
# 尝试使用systemctl
if command -v systemctl &>/dev/null; then
if systemctl start mysqld; then
log "使用systemctl启动MySQL成功"
else
warn "systemctl启动失败,尝试使用service命令"
service mysqld start || error "启动MySQL服务失败"
fi
elif command -v service &>/dev/null; then
service mysqld start || error "启动MySQL服务失败"
log "使用service启动MySQL成功"
else
warn "无法找到systemctl或service命令,请手动启动MySQL"
return 1
fi
# 检查服务状态
sleep 3
if pgrep mysqld >/dev/null; then
log "MySQL进程运行正常"
else
error "MySQL进程未运行"
fi
# 检查端口监听
if netstat -anpt 2>/dev/null | grep -q ':3306.*LISTEN'; then
log "MySQL正在监听3306端口"
else
warn "MySQL未在3306端口监听,请检查日志"
fi
}
# 安全设置
security_setup() {
log "执行MySQL安全设置..."
# 等待MySQL完全启动
sleep 5
# 设置root密码(生产环境应该从安全的地方获取密码)
local mysql_root_password
mysql_root_password=$(openssl rand -base64 16)
log "生成的MySQL root密码: $mysql_root_password"
echo "MySQL root密码: $mysql_root_password" > /root/mysql_root_password.txt
chmod 600 /root/mysql_root_password.txt
# 使用mysql_secure_installation的简化版本
mysql -u root --skip-password << EOF
ALTER USER 'root'@'localhost' IDENTIFIED BY '${mysql_root_password}';
DELETE FROM mysql.user WHERE User='';
DELETE FROM mysql.user WHERE User='root' AND Host NOT IN ('localhost', '127.0.0.1', '::1');
DROP DATABASE IF EXISTS test;
DELETE FROM mysql.db WHERE Db='test' OR Db='test\\_%';
FLUSH PRIVILEGES;
EOF
log "MySQL安全设置完成,root密码已保存到 /root/mysql_root_password.txt"
}
# 主函数
main() {
log "开始MySQL生产环境安装..."
check_root
check_files
backup_existing
create_mysql_user
create_data_dir
remove_mariadb
install_mysql
configure_service
create_config
initialize_mysql
start_mysql
security_setup
log "MySQL安装完成!"
log "请立即修改root密码并执行以下命令验证安装:"
echo " mysql -u root -p"
echo " systemctl status mysqld"
echo " netstat -anpt | grep 3306"
}
# 脚本执行入口
if [[ "${BASH_SOURCE[0]}" == "${0}" ]]; then
main "$@"
fi
#!/usr/bin/env python3
"""
MySQL生产环境安装脚本
版本: 1.0
描述: 用于在生产环境安装MySQL 9.4.0
"""
import os
import sys
import subprocess
import shutil
import time
import logging
import tarfile
import getpass
from pathlib import Path
from datetime import datetime
import secrets
import string
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='\033[92m[%(asctime)s] %(message)s\033[0m',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
# 颜色代码
class Colors:
RED = '\033[0;31m'
GREEN = '\033[0;32m'
YELLOW = '\033[1;33m'
BLUE = '\033[0;34m'
NC = '\033[0m' # No Color
# 自定义日志级别
def log_info(message):
logger.info(message)
def log_warning(message):
print(f"{Colors.YELLOW}[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] WARNING: {message}{Colors.NC}")
def log_error(message, exit_script=True):
print(f"{Colors.RED}[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] ERROR: {message}{Colors.NC}")
if exit_script:
sys.exit(1)
def run_command(cmd, check=True, capture_output=False):
"""执行shell命令"""
try:
log_info(f"执行命令: {cmd}")
if capture_output:
result = subprocess.run(cmd, shell=True, check=check,
capture_output=True, text=True)
return result
else:
result = subprocess.run(cmd, shell=True, check=check)
return result
except subprocess.CalledProcessError as e:
if check:
log_error(f"命令执行失败: {cmd}\n错误: {e}")
else:
log_warning(f"命令执行失败: {cmd}\n错误: {e}")
return None
except Exception as e:
if check:
log_error(f"执行命令时发生异常: {cmd}\n异常: {e}")
else:
log_warning(f"执行命令时发生异常: {cmd}\n异常: {e}")
return None
class MySQLInstaller:
def __init__(self):
self.mysql_tar = "mysql-9.4.0-linux-glibc2.28-x86_64.tar.xz"
self.mysql_dir = Path("/usr/local/mysql")
self.data_dir = Path("/data/mysql/data")
self.mysql_user = "mysql"
self.mysql_group = "mysql"
self.service_file = Path("/etc/init.d/mysqld")
self.config_file = Path("/etc/my.cnf")
def check_root(self):
"""检查是否以root用户运行"""
if os.geteuid() != 0:
log_error("此脚本必须以root用户运行")
def check_files(self):
"""检查必要的文件"""
log_info("检查必要的文件...")
if not os.path.isfile(self.mysql_tar):
log_error(f"MySQL安装包 {self.mysql_tar} 不存在")
if not os.path.isdir("/usr/local"):
log_error("/usr/local 目录不存在")
log_info("文件检查完成")
def backup_existing(self):
"""备份原有配置"""
log_info("备份现有MySQL配置...")
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
# 备份my.cnf
if self.config_file.exists():
backup_file = f"{self.config_file}.bak.{timestamp}"
shutil.copy2(self.config_file, backup_file)
log_info(f"已备份 {self.config_file} 到 {backup_file}")
# 备份服务脚本
if self.service_file.exists():
backup_file = f"{self.service_file}.bak.{timestamp}"
shutil.copy2(self.service_file, backup_file)
log_info(f"已备份 {self.service_file} 到 {backup_file}")
# 备份原有MySQL目录
if self.mysql_dir.exists():
backup_dir = f"{self.mysql_dir}.bak.{timestamp}"
shutil.move(self.mysql_dir, backup_dir)
log_info(f"已备份原有MySQL目录到 {backup_dir}")
def create_mysql_user(self):
"""创建MySQL用户和组"""
log_info("创建MySQL用户和组...")
try:
# 检查用户组是否存在
run_command(f"getent group {self.mysql_group}", check=False)
run_command(f"groupadd -r {self.mysql_group}", check=False)
log_info(f"用户组 {self.mysql_group} 创建完成")
except Exception as e:
log_warning(f"创建用户组时发生异常: {e}")
try:
# 检查用户是否存在
run_command(f"id {self.mysql_user}", check=False)
run_command(f"useradd -r -g {self.mysql_group} -s /bin/false {self.mysql_user}", check=False)
log_info(f"用户 {self.mysql_user} 创建完成")
except Exception as e:
log_warning(f"创建用户时发生异常: {e}")
def create_data_dir(self):
"""创建数据目录"""
log_info("创建数据目录...")
try:
self.data_dir.mkdir(parents=True, exist_ok=True)
log_info(f"数据目录 {self.data_dir} 创建完成")
except Exception as e:
log_error(f"创建数据目录失败: {e}")
def install_mysql(self):
"""安装MySQL"""
log_info("开始安装MySQL...")
# 解压安装包
log_info("解压MySQL安装包...")
try:
with tarfile.open(self.mysql_tar, 'r:xz') as tar:
tar.extractall(path='/usr/local/')
log_info("MySQL安装包解压完成")
except Exception as e:
log_error(f"解压MySQL安装包失败: {e}")
# 重命名目录
log_info("重命名MySQL目录...")
extracted_dir = Path("/usr/local/mysql-9.4.0-linux-glibc2.28-x86_64")
if not extracted_dir.exists():
log_error(f"解压后的目录不存在: {extracted_dir}")
if self.mysql_dir.exists():
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
backup_dir = Path(f"{self.mysql_dir}.bak.{timestamp}")
shutil.move(self.mysql_dir, backup_dir)
log_warning(f"原有MySQL目录已备份到 {backup_dir}")
shutil.move(str(extracted_dir), str(self.mysql_dir))
log_info("MySQL目录重命名完成")
# 设置权限
log_info("设置目录权限...")
run_command(f"chown -R {self.mysql_user}.{self.mysql_group} {self.mysql_dir}")
run_command(f"chown -R {self.mysql_user}.{self.mysql_group} /data")
# 创建符号链接
log_info("创建二进制文件符号链接...")
bin_dir = self.mysql_dir / "bin"
if bin_dir.exists():
for bin_file in bin_dir.iterdir():
if bin_file.is_file() and os.access(bin_file, os.X_OK):
target_link = Path("/usr/local/bin") / bin_file.name
if target_link.exists() or target_link.is_symlink():
target_link.unlink()
target_link.symlink_to(bin_file)
log_info("二进制文件符号链接创建完成")
else:
log_warning(f"MySQL bin目录不存在: {bin_dir}")
def configure_service(self):
"""配置MySQL服务"""
log_info("配置MySQL服务...")
# 复制服务脚本
service_source = self.mysql_dir / "support-files" / "mysql.server"
if service_source.exists():
shutil.copy2(service_source, self.service_file)
self.service_file.chmod(0o755)
log_info("服务脚本已复制并设置权限")
else:
log_warning(f"MySQL服务脚本不存在: {service_source}")
# 配置chkconfig
if shutil.which("chkconfig"):
run_command("chkconfig --add mysqld")
log_info("MySQL服务已添加到chkconfig")
else:
log_warning("chkconfig命令不存在,跳过服务配置")
def create_config(self):
"""创建MySQL配置文件"""
log_info("创建MySQL配置文件...")
config_content = f"""
[mysqld]
basedir={self.mysql_dir}
datadir={self.data_dir}
port=3306
socket=/tmp/mysql.sock
user={self.mysql_user}
log-error={self.data_dir}/mysql.error.log
pid-file={self.data_dir}/mysql.pid
# 性能调优参数 (根据实际情况调整)
character-set-server=utf8mb4
collation-server=utf8mb4_unicode_ci
max_connections=1000
innodb_buffer_pool_size=1G
innodb_log_file_size=256M
[mysql]
socket=/tmp/mysql.sock
default-character-set=utf8mb4
[client]
socket=/tmp/mysql.sock
"""
try:
with open(self.config_file, 'w') as f:
f.write(config_content)
log_info(f"MySQL配置文件已创建: {self.config_file}")
except Exception as e:
log_error(f"创建配置文件失败: {e}")
def remove_mariadb(self):
"""移除冲突的MariaDB包"""
log_info("检查并移除冲突的MariaDB包...")
try:
result = run_command("rpm -qa | grep -E '^mariadb'", check=False, capture_output=True)
if result and result.returncode == 0 and result.stdout.strip():
mariadb_packages = result.stdout.strip().split('\n')
log_info("发现以下MariaDB包:")
for package in mariadb_packages:
print(f" - {package}")
for package in mariadb_packages:
log_warning(f"移除包: {package}")
run_command(f"rpm -e --nodeps {package}", check=False)
else:
log_info("未发现冲突的MariaDB包")
except Exception as e:
log_warning(f"检查MariaDB包时发生异常: {e}")
def initialize_mysql(self):
"""初始化MySQL"""
log_info("初始化MySQL...")
# 检查数据目录是否为空
try:
if any(self.data_dir.iterdir()):
log_warning(f"数据目录 {self.data_dir} 不为空,跳过初始化")
return
except Exception as e:
log_error(f"检查数据目录失败: {e}")
log_info("执行MySQL初始化...")
init_cmd = (
f"mysqld --initialize-insecure "
f"--user={self.mysql_user} "
f"--basedir={self.mysql_dir} "
f"--datadir={self.data_dir}"
)
try:
with open("/tmp/mysql_initialize.log", "w") as log_file:
process = subprocess.Popen(
init_cmd,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True
)
# 实时输出日志
for line in process.stdout:
log_file.write(line)
log_file.flush()
process.wait()
if process.returncode != 0:
log_error(f"MySQL初始化失败,请检查日志: /tmp/mysql_initialize.log")
else:
log_info("MySQL初始化完成")
except Exception as e:
log_error(f"初始化过程中发生异常: {e}")
def generate_password(self, length=16):
"""生成随机密码"""
alphabet = string.ascii_letters + string.digits + "!@#$%^&*"
return ''.join(secrets.choice(alphabet) for _ in range(length))
def start_mysql(self):
"""启动MySQL服务"""
log_info("启动MySQL服务...")
# 尝试使用systemctl
if shutil.which("systemctl"):
if run_command("systemctl start mysqld", check=False):
log_info("使用systemctl启动MySQL成功")
else:
log_warning("systemctl启动失败,尝试使用service命令")
if not run_command("service mysqld start", check=False):
log_error("启动MySQL服务失败")
elif shutil.which("service"):
if run_command("service mysqld start", check=False):
log_info("使用service启动MySQL成功")
else:
log_error("启动MySQL服务失败")
else:
log_warning("无法找到systemctl或service命令,请手动启动MySQL")
return False
# 检查服务状态
log_info("检查MySQL服务状态...")
time.sleep(3)
# 检查进程
if run_command("pgrep mysqld", check=False, capture_output=True):
log_info("MySQL进程运行正常")
else:
log_error("MySQL进程未运行")
# 检查端口监听
result = run_command("netstat -anpt 2>/dev/null | grep ':3306' | grep LISTEN",
check=False, capture_output=True)
if result and result.returncode == 0 and result.stdout.strip():
log_info("MySQL正在监听3306端口")
else:
log_warning("MySQL未在3306端口监听,请检查日志")
return True
def security_setup(self):
"""安全设置"""
log_info("执行MySQL安全设置...")
# 等待MySQL完全启动
time.sleep(5)
# 生成随机密码
mysql_root_password = self.generate_password()
log_info(f"生成的MySQL root密码: {mysql_root_password}")
# 保存密码到文件
password_file = Path("/root/mysql_root_password.txt")
try:
with open(password_file, 'w') as f:
f.write(f"MySQL root密码: {mysql_root_password}\n")
f.write(f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
password_file.chmod(0o600)
log_info(f"密码已保存到 {password_file}")
except Exception as e:
log_error(f"保存密码文件失败: {e}")
# 执行安全设置
security_commands = [
f"ALTER USER 'root'@'localhost' IDENTIFIED BY '{mysql_root_password}';",
"DELETE FROM mysql.user WHERE User='';",
"DELETE FROM mysql.user WHERE User='root' AND Host NOT IN ('localhost', '127.0.0.1', '::1');",
"DROP DATABASE IF EXISTS test;",
"DELETE FROM mysql.db WHERE Db='test' OR Db='test\\\\_%';",
"FLUSH PRIVILEGES;"
]
# 由于初始化时使用了--initialize-insecure,首次连接不需要密码
try:
for cmd in security_commands:
mysql_cmd = f"mysql -u root --skip-password -e \"{cmd}\""
run_command(mysql_cmd, check=False)
log_info("MySQL安全设置完成")
except Exception as e:
log_warning(f"执行安全设置时发生异常: {e}")
def verify_installation(self):
"""验证安装"""
log_info("验证MySQL安装...")
checks = [
("检查MySQL进程", "ps -ef | grep mysqld | grep -v grep"),
("检查端口监听", "netstat -anpt | grep 3306"),
("检查服务状态", "systemctl status mysqld 2>/dev/null || service mysqld status 2>/dev/null"),
]
for check_name, cmd in checks:
log_info(check_name)
result = run_command(cmd, check=False, capture_output=True)
if result and result.returncode == 0:
print(f" {Colors.GREEN}✓{Colors.NC} {check_name} - 成功")
else:
print(f" {Colors.RED}✗{Colors.NC} {check_name} - 失败")
def install(self):
"""执行安装流程"""
log_info("开始MySQL生产环境安装...")
steps = [
("检查root权限", self.check_root),
("检查文件", self.check_files),
("备份现有配置", self.backup_existing),
("创建MySQL用户", self.create_mysql_user),
("创建数据目录", self.create_data_dir),
("移除MariaDB冲突包", self.remove_mariadb),
("安装MySQL", self.install_mysql),
("配置服务", self.configure_service),
("创建配置文件", self.create_config),
("初始化MySQL", self.initialize_mysql),
("启动MySQL服务", self.start_mysql),
("安全设置", self.security_setup),
("验证安装", self.verify_installation),
]
for step_name, step_func in steps:
log_info(f"{Colors.BLUE}=== {step_name} ==={Colors.NC}")
try:
step_func()
except Exception as e:
log_error(f"步骤 '{step_name}' 执行失败: {e}")
log_info(f"{Colors.GREEN}MySQL安装完成!{Colors.NC}")
log_info("请立即修改root密码并执行以下命令验证安装:")
print(" mysql -u root -p")
print(" systemctl status mysqld")
print(" netstat -anpt | grep 3306")
print(f" 初始root密码保存在: /root/mysql_root_password.txt")
def main():
"""主函数"""
installer = MySQLInstaller()
installer.install()
if __name__ == "__main__":
main()
批量拉取镜像
#!/usr/bin/env python
"""
Docker镜像批量拉取工具
支持并行拉取、错误重试、进度显示等功能
"""
import argparse
import logging
import os
import subprocess
import sys
import multiprocessing
from typing import List, Set, Tuple
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler(sys.stdout)]
)
logger = logging.getLogger(__name__)
class DockerPullManager:
def __init__(self, max_workers: int = None, retry_count: int = 2):
self.max_workers = max_workers or min(4, multiprocessing.cpu_count())
self.retry_count = retry_count
def run_cmd(self, cmd: str) -> Tuple[int, str, str]:
"""执行shell命令"""
try:
process = subprocess.Popen(
cmd, shell=True,
stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)
out, error = process.communicate(timeout=1800)
return process.returncode, out.strip(), error.strip()
except subprocess.TimeoutExpired:
logger.error(f"Command timed out: {cmd}")
process.kill()
return 1, "", "Timeout"
except Exception as e:
logger.error(f"Command error: {cmd}, {e}")
return 1, "", str(e)
def get_image_list(self, file_path: str) -> Set[str]:
"""读取镜像列表"""
if not os.path.exists(file_path):
raise FileNotFoundError(f"Image file not found: {file_path}")
images = set()
try:
with open(file_path, 'r', encoding='utf-8') as f:
for line_num, line in enumerate(f, 1):
line = line.strip()
if line and not line.startswith("#"):
images.add(line)
except IOError as e:
raise IOError(f"Failed to read {file_path}: {e}")
return images
def pull_single_image(self, image: str) -> Tuple[bool, str]:
"""拉取单个镜像,支持重试"""
for attempt in range(self.retry_count + 1):
logger.info(f"Pulling {image} (attempt {attempt + 1})")
r, out, error = self.run_cmd(f"docker pull {image}")
if r == 0:
logger.info(f"✓ Success: {image}")
return True, out
else:
logger.warning(f"Attempt {attempt + 1} failed for {image}: {error}")
logger.error(f"✗ Failed: {image} after {self.retry_count + 1} attempts")
return False, error
def batch_pull(self, file_path: str) -> Tuple[int, int]:
"""批量拉取镜像"""
try:
images = self.get_image_list(file_path)
except Exception as e:
logger.error(str(e))
return 0, 0
if not images:
logger.warning("No images found to pull")
return 0, 0
logger.info(f"Found {len(images)} unique images, starting pull with {self.max_workers} workers...")
success_count = 0
with multiprocessing.Pool(processes=self.max_workers) as pool:
results = [pool.apply_async(self.pull_single_image, (img,)) for img in images]
for i, (image, result) in enumerate(zip(images, results)):
try:
success, msg = result.get(timeout=1800)
if success:
success_count += 1
logger.info(f"Progress: {i+1}/{len(images)}")
except multiprocessing.TimeoutError:
logger.error(f"Timeout: {image}")
failed_count = len(images) - success_count
logger.info(f"Completed: {success_count} successful, {failed_count} failed")
return success_count, failed_count
def main():
parser = argparse.ArgumentParser(description='批量拉取Docker镜像')
parser.add_argument('--file', '-f', default='images.txt',
help='镜像列表文件路径 (默认: images.txt)')
parser.add_argument('--workers', '-w', type=int,
help=f'并行工作进程数 (默认: CPU核心数,最大4)')
parser.add_argument('--retry', '-r', type=int, default=2,
help='失败重试次数 (默认: 2)')
parser.add_argument('--verbose', '-v', action='store_true',
help='详细输出')
args = parser.parse_args()
if args.verbose:
logger.setLevel(logging.DEBUG)
manager = DockerPullManager(max_workers=args.workers, retry_count=args.retry)
success, failed = manager.batch_pull(args.file)
sys.exit(1 if failed > 0 else 0)
if __name__ == '__main__':
main()
加强版本
#!/usr/bin/env python
"""
Docker镜像批量拉取工具
支持并行拉取、错误重试、进度显示、镜像验证等功能
增强版本包含更完善的错误处理和进度展示
"""
import argparse # 导入命令行参数解析模块
import logging # 导入日志记录模块
import os # 导入操作系统功能模块
import subprocess # 导入子进程管理模块
import sys # 导入系统相关功能模块
import time # 导入时间相关功能模块
import multiprocessing # 导入多进程处理模块
from typing import List, Set, Tuple, Dict, Optional # 导入类型提示功能
from datetime import datetime # 导入日期时间处理模块
# 配置日志系统,设置格式和输出目标
logging.basicConfig(
level=logging.INFO, # 设置默认日志级别为INFO
format='%(asctime)s - %(levelname)s - %(message)s', # 设置日志格式:时间-级别-消息
handlers=[logging.StreamHandler(sys.stdout)] # 设置日志输出到标准输出
)
logger = logging.getLogger(__name__) # 获取当前模块的日志记录器
class DockerPullManager:
"""Docker镜像拉取管理器,处理批量镜像拉取任务"""
def __init__(self, max_workers: int = None, retry_count: int = 2, timeout: int = 1800):
"""
初始化Docker拉取管理器
Args:
max_workers: 最大并行工作进程数,默认为min(4, CPU核心数)
retry_count: 失败重试次数,默认为2次
timeout: 命令执行超时时间(秒),默认30分钟
"""
# 设置最大工作进程数,取4和CPU核心数的较小值作为默认值
self.max_workers = max_workers or min(4, multiprocessing.cpu_count())
self.retry_count = retry_count # 设置重试次数
self.timeout = timeout # 设置超时时间
self.start_time = time.time() # 记录开始时间,用于计算总耗时
self.results: Dict[str, bool] = {} # 存储每个镜像的拉取结果
def run_cmd(self, cmd: str) -> Tuple[int, str, str]:
"""
执行shell命令并返回结果
Args:
cmd: 要执行的shell命令
Returns:
返回值元组: (返回码, 标准输出, 标准错误)
"""
try:
# 创建子进程执行命令,并捕获标准输出和标准错误
process = subprocess.Popen(
cmd, shell=True,
stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)
# 等待进程完成并获取输出,设置超时时间
out, error = process.communicate(timeout=self.timeout)
# 返回进程返回码和输出结果
return process.returncode, out.strip(), error.strip()
except subprocess.TimeoutExpired:
# 处理命令执行超时情况
logger.error(f"命令执行超时: {cmd}")
process.kill() # 终止超时进程
return 1, "", "执行超时"
except Exception as e:
# 处理其他异常情况
logger.error(f"命令执行错误: {cmd}, {e}")
return 1, "", str(e)
def get_image_list(self, file_path: str) -> Set[str]:
"""
从文件中读取Docker镜像列表
Args:
file_path: 包含镜像列表的文件路径
Returns:
唯一镜像名称集合
Raises:
FileNotFoundError: 文件不存在时抛出
IOError: 文件读取错误时抛出
"""
# 检查文件是否存在
if not os.path.exists(file_path):
raise FileNotFoundError(f"镜像列表文件未找到: {file_path}")
images = set() # 使用集合存储唯一的镜像名称
try:
# 打开文件并读取内容
with open(file_path, 'r', encoding='utf-8') as f:
for line_num, line in enumerate(f, 1):
line = line.strip() # 去除行首尾空白字符
# 忽略空行和注释行
if line and not line.startswith("#"):
images.add(line) # 添加到镜像集合
except IOError as e:
# 处理文件读取错误
raise IOError(f"读取文件 {file_path} 失败: {e}")
return images
def verify_image(self, image: str) -> bool:
"""
验证镜像是否成功拉取
Args:
image: 镜像名称
Returns:
验证成功返回True,否则返回False
"""
# 执行docker images命令检查镜像是否存在
cmd = f"docker images --format '{{{{.Repository}}}}:{{{{.Tag}}}}' | grep '{image}'"
r, out, _ = self.run_cmd(cmd)
return r == 0 and out.strip() != ""
def pull_single_image(self, image: str) -> Tuple[bool, str]:
"""
拉取单个Docker镜像,支持多次重试
Args:
image: 要拉取的镜像名称
Returns:
元组: (是否成功, 输出信息)
"""
# 记录开始时间
start_time = time.time()
# 尝试多次拉取镜像
for attempt in range(self.retry_count + 1):
logger.info(f"正在拉取 {image} (第 {attempt + 1} 次尝试)")
# 执行docker pull命令
r, out, error = self.run_cmd(f"docker pull {image}")
if r == 0:
# 拉取成功
elapsed = time.time() - start_time
logger.info(f"✓ 成功: {image} (耗时: {elapsed:.2f}秒)")
# 验证镜像是否真的拉取成功
if self.verify_image(image):
return True, out
else:
logger.warning(f"镜像验证失败: {image}")
else:
# 拉取失败,记录错误信息
logger.warning(f"第 {attempt + 1} 次尝试失败: {image}: {error}")
# 如果不是最后一次尝试,等待一段时间后重试
if attempt < self.retry_count:
time.sleep(2) # 失败后等待2秒再重试
# 所有尝试都失败
elapsed = time.time() - start_time
logger.error(f"✗ 失败: {image} 经过 {self.retry_count + 1} 次尝试 (耗时: {elapsed:.2f}秒)")
return False, error
def format_time(self, seconds: float) -> str:
"""
格式化时间,将秒数转换为更易读的格式
Args:
seconds: 秒数
Returns:
格式化后的时间字符串
"""
# 将秒数转换为更易读的时间格式
if seconds < 60:
return f"{seconds:.1f}秒"
elif seconds < 3600:
return f"{seconds/60:.1f}分钟"
else:
return f"{seconds/3600:.1f}小时"
def display_progress(self, completed: int, total: int, success: int, failed: int) -> None:
"""
显示当前进度信息
Args:
completed: 已完成的任务数
total: 总任务数
success: 成功任务数
failed: 失败任务数
"""
# 计算已用时间和预估剩余时间
elapsed = time.time() - self.start_time
progress = completed / total if total > 0 else 0
# 计算预估剩余时间
remaining = (elapsed / progress) * (1 - progress) if progress > 0 else 0
# 构建进度条
bar_length = 30
filled_length = int(bar_length * progress)
bar = '█' * filled_length + '░' * (bar_length - filled_length)
# 显示进度信息
logger.info(
f"进度: [{bar}] {completed}/{total} ({progress*100:.1f}%) "
f"成功: {success} 失败: {failed} "
f"已用时间: {self.format_time(elapsed)} "
f"预计剩余: {self.format_time(remaining)}"
)
def batch_pull(self, file_path: str) -> Tuple[int, int]:
"""
批量拉取Docker镜像
Args:
file_path: 包含镜像列表的文件路径
Returns:
元组: (成功数量, 失败数量)
"""
try:
# 获取镜像列表
images = self.get_image_list(file_path)
except Exception as e:
# 处理获取镜像列表时的错误
logger.error(str(e))
return 0, 0
# 检查镜像列表是否为空
if not images:
logger.warning("未找到要拉取的镜像")
return 0, 0
# 记录开始时间和镜像数量
self.start_time = time.time()
image_list = list(images) # 将集合转换为列表,便于索引访问
total_images = len(image_list)
logger.info(f"找到 {total_images} 个唯一镜像,使用 {self.max_workers} 个工作进程开始拉取...")
# 初始化计数器
success_count = 0
failed_count = 0
completed_count = 0
# 创建进程池进行并行处理
with multiprocessing.Pool(processes=self.max_workers) as pool:
# 异步提交所有拉取任务
results = [pool.apply_async(self.pull_single_image, (img,)) for img in image_list]
# 处理每个任务的结果
for i, (image, result) in enumerate(zip(image_list, results)):
try:
# 获取任务结果,设置超时时间
success, msg = result.get(timeout=self.timeout)
# 更新计数器
if success:
success_count += 1
else:
failed_count += 1
completed_count += 1
# 存储结果
self.results[image] = success
# 每完成一个任务显示一次进度
self.display_progress(completed_count, total_images, success_count, failed_count)
except multiprocessing.TimeoutError:
# 处理任务超时
logger.error(f"超时: {image}")
failed_count += 1
completed_count += 1
self.results[image] = False
except Exception as e:
# 处理其他异常
logger.error(f"处理 {image} 时出错: {e}")
failed_count += 1
completed_count += 1
self.results[image] = False
# 计算总耗时
total_time = time.time() - self.start_time
# 显示最终结果
logger.info(f"任务完成: 成功 {success_count} 个, 失败 {failed_count} 个, 总耗时: {self.format_time(total_time)}")
# 如果有失败的镜像,显示失败列表
if failed_count > 0:
failed_images = [img for img, success in self.results.items() if not success]
logger.info(f"失败镜像列表: {', '.join(failed_images)}")
# 将失败的镜像写入文件,方便后续重试
failed_file = f"failed_images_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
try:
with open(failed_file, 'w', encoding='utf-8') as f:
for img in failed_images:
f.write(f"{img}\n")
logger.info(f"已将失败镜像列表写入文件: {failed_file}")
except Exception as e:
logger.error(f"写入失败镜像列表时出错: {e}")
return success_count, failed_count
def main():
"""主函数,处理命令行参数并执行批量拉取操作"""
# 创建命令行参数解析器
parser = argparse.ArgumentParser(description='批量拉取Docker镜像工具')
# 添加命令行参数
parser.add_argument('--file', '-f', default='image.txt',
help='镜像列表文件路径 (默认: image.txt)')
parser.add_argument('--workers', '-w', type=int,
help=f'并行工作进程数 (默认: CPU核心数,最大4)')
parser.add_argument('--retry', '-r', type=int, default=2,
help='失败重试次数 (默认: 2)')
parser.add_argument('--timeout', '-t', type=int, default=1800,
help='单个镜像拉取超时时间(秒) (默认: 1800)')
parser.add_argument('--verbose', '-v', action='store_true',
help='显示详细日志')
parser.add_argument('--verify', action='store_true',
help='拉取后验证镜像是否存在')
# 解析命令行参数
args = parser.parse_args()
# 如果指定了详细模式,设置日志级别为DEBUG
if args.verbose:
logger.setLevel(logging.DEBUG)
logger.debug("已启用详细日志模式")
# 创建Docker拉取管理器实例
manager = DockerPullManager(
max_workers=args.workers, # 设置工作进程数
retry_count=args.retry, # 设置重试次数
timeout=args.timeout # 设置超时时间
)
# 执行批量拉取操作
success, failed = manager.batch_pull(args.file)
# 根据失败数量设置退出码
sys.exit(1 if failed > 0 else 0)
# 当脚本直接运行时执行main函数
if __name__ == '__main__':
main()
批量拉取镜像 shell版本
#!/bin/bash
# 批量Docker镜像拉取工具
# 按批次顺序拉取镜像,每次拉取10个,成功后继续下一批
set -euo pipefail
# 默认参数
BATCH_SIZE=10
RETRY_COUNT=2
TIMEOUT=1800
VERBOSE=false
SCRIPT_NAME=$(basename "$0")
# 颜色定义
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
# 全局变量
TOTAL_SUCCESS=0
TOTAL_FAILED=0
FAILED_IMAGES=()
START_TIME=$(date +%s)
# 日志函数
log_info() {
echo -e "${BLUE}[$(date '+%Y-%m-%d %H:%M:%S')] [INFO]${NC} $1"
}
log_success() {
echo -e "${GREEN}[$(date '+%Y-%m-%d %H:%M:%S')] [SUCCESS]${NC} $1"
}
log_warning() {
echo -e "${YELLOW}[$(date '+%Y-%m-%d %H:%M:%S')] [WARNING]${NC} $1"
}
log_error() {
echo -e "${RED}[$(date '+%Y-%m-%d %H:%M:%S')] [ERROR]${NC} $1"
}
log_debug() {
if [[ "$VERBOSE" == "true" ]]; then
echo -e "${BLUE}[$(date '+%Y-%m-%d %H:%M:%S')] [DEBUG]${NC} $1"
fi
}
# 显示帮助信息
show_help() {
cat << EOF
批量Docker镜像拉取工具
用法: $SCRIPT_NAME [选项] 镜像列表文件
参数:
镜像列表文件 包含Docker镜像列表的文件路径
选项:
-b, --batch-size 每批拉取的镜像数量 (默认: 10)
-r, --retry 失败重试次数 (默认: 2)
-t, --timeout 单个镜像拉取超时时间(秒) (默认: 1800)
-v, --verbose 显示详细日志
-h, --help 显示此帮助信息
镜像列表文件格式:
每行一个镜像名称,支持注释行(以#开头)和空行
示例:
$SCRIPT_NAME images.txt
$SCRIPT_NAME -b 5 -r 3 images.txt
$SCRIPT_NAME -v images.txt
EOF
}
# 格式化时间
format_time() {
local seconds=$1
if [[ $seconds -lt 60 ]]; then
echo "${seconds}秒"
elif [[ $seconds -lt 3600 ]]; then
echo "$(echo "scale=1; $seconds/60" | bc)分钟"
else
echo "$(echo "scale=1; $seconds/3600" | bc)小时"
fi
}
# 检查文件是否存在
check_file_exists() {
local file_path=$1
if [[ ! -f "$file_path" ]]; then
log_error "镜像列表文件未找到: $file_path"
exit 1
fi
}
# 读取镜像列表
read_image_list() {
local file_path=$1
check_file_exists "$file_path"
log_debug "开始读取文件: $file_path"
# 使用grep和sed过滤非空行和非注释行,然后输出去重后的结果
grep -v '^[[:space:]]*$' "$file_path" | grep -v '^[[:space:]]*#' | sed 's/^[[:space:]]*//;s/[[:space:]]*$//' | sort -u
log_info "从 $file_path 读取镜像列表完成"
}
# 验证镜像是否成功拉取
verify_image() {
local image=$1
local result
result=$(docker images --format '{{.Repository}}:{{.Tag}}' | grep -E "^${image}$" || true)
if [[ -n "$result" ]]; then
return 0
else
return 1
fi
}
# 拉取单个镜像
pull_single_image() {
local image=$1
local start_time=$(date +%s)
local attempt=0
local max_attempts=$((RETRY_COUNT + 1))
while [[ $attempt -lt $max_attempts ]]; do
((attempt++))
log_info "正在拉取 ${image} (第 ${attempt} 次尝试)"
# 使用timeout命令设置超时
local pull_output
local pull_error
local pull_exit_code
if pull_output=$(timeout "${TIMEOUT}" docker pull "$image" 2>&1); then
pull_exit_code=0
pull_error=""
else
pull_exit_code=$?
pull_error="$pull_output"
pull_output=""
fi
if [[ $pull_exit_code -eq 0 ]]; then
local elapsed=$(($(date +%s) - start_time))
log_success "成功拉取 ${image} (耗时: ${elapsed}秒)"
# 验证镜像是否真的拉取成功
if verify_image "$image"; then
return 0
else
log_warning "镜像验证失败: $image"
fi
else
log_warning "第 ${attempt} 次尝试失败: $image"
if [[ -n "$pull_error" ]]; then
log_warning "错误信息: $pull_error"
fi
fi
# 如果不是最后一次尝试,等待后重试
if [[ $attempt -lt $max_attempts ]]; then
log_info "等待2秒后重试..."
sleep 2
fi
done
# 所有尝试都失败
local elapsed=$(($(date +%s) - start_time))
log_error "失败: $image 经过 ${RETRY_COUNT} 次尝试 (耗时: ${elapsed}秒)"
return 1
}
# 拉取一批镜像
pull_batch() {
local -n batch_images_ref=$1
local batch_num=$2
local total_batches=$3
local batch_start_time=$(date +%s)
local success_count=0
local failed_count=0
local batch_size=${#batch_images_ref[@]}
echo
log_info "========================================"
log_info "开始第 ${batch_num}/${total_batches} 批次拉取 (${batch_size} 个镜像)"
log_info "========================================"
echo
for i in "${!batch_images_ref[@]}"; do
local image="${batch_images_ref[$i]}"
local image_num=$((i + 1))
echo
log_info "[${batch_num}/${total_batches}] 镜像 ${image_num}/${batch_size}: ${image}"
if pull_single_image "$image"; then
((success_count++))
else
((failed_count++))
FAILED_IMAGES+=("$image")
fi
done
local batch_elapsed=$(($(date +%s) - batch_start_time))
echo
log_info "第 ${batch_num} 批次完成:"
log_info " 成功: ${success_count} 个"
log_info " 失败: ${failed_count} 个"
log_info " 耗时: ${batch_elapsed} 秒"
echo "$success_count $failed_count"
}
# 显示最终结果
show_final_results() {
local total_images=$1
local total_time=$(($(date +%s) - START_TIME))
echo
log_info "========================================"
log_info "批量拉取任务完成!"
log_info "========================================"
log_info "总镜像数: $total_images"
log_info "成功拉取: $TOTAL_SUCCESS"
log_info "拉取失败: $TOTAL_FAILED"
local success_rate
if [[ $total_images -gt 0 ]]; then
success_rate=$(echo "scale=1; $TOTAL_SUCCESS * 100 / $total_images" | bc)
else
success_rate="0.0"
fi
log_info "成功率: ${success_rate}%"
log_info "总耗时: $(format_time $total_time)"
log_info "结束时间: $(date '+%Y-%m-%d %H:%M:%S')"
if [[ ${#FAILED_IMAGES[@]} -gt 0 ]]; then
echo
log_info "失败的镜像列表:"
for i in "${!FAILED_IMAGES[@]}"; do
log_info " $((i + 1)). ${FAILED_IMAGES[$i]}"
done
fi
}
# 主函数
main() {
local file_path=""
# 解析命令行参数
while [[ $# -gt 0 ]]; do
case $1 in
-b|--batch-size)
BATCH_SIZE="$2"
if ! [[ "$BATCH_SIZE" =~ ^[1-9][0-9]*$ ]]; then
log_error "批次大小必须是正整数"
exit 1
fi
shift 2
;;
-r|--retry)
RETRY_COUNT="$2"
if ! [[ "$RETRY_COUNT" =~ ^[0-9]+$ ]]; then
log_error "重试次数必须是非负整数"
exit 1
fi
shift 2
;;
-t|--timeout)
TIMEOUT="$2"
if ! [[ "$TIMEOUT" =~ ^[1-9][0-9]*$ ]]; then
log_error "超时时间必须是正整数"
exit 1
fi
shift 2
;;
-v|--verbose)
VERBOSE=true
shift
;;
-h|--help)
show_help
exit 0
;;
*)
if [[ -z "$file_path" ]]; then
file_path="$1"
else
log_error "未知参数: $1"
show_help
exit 1
fi
shift
;;
esac
done
if [[ -z "$file_path" ]]; then
log_error "请提供镜像列表文件路径"
show_help
exit 1
fi
# 检查依赖
if ! command -v docker &> /dev/null; then
log_error "Docker 未安装或未在PATH中"
exit 1
fi
if ! command -v bc &> /dev/null; then
log_error "bc 计算器未安装,请安装 bc 工具"
exit 1
fi
# 读取镜像列表
local images=()
# 直接从文件读取镜像列表
while IFS= read -r line; do
# 去除前后空格
line=$(echo "$line" | sed 's/^[[:space:]]*//;s/[[:space:]]*$//')
# 忽略空行和注释行
if [[ -n "$line" && ! "$line" =~ ^[[:space:]]*# ]]; then
images+=("$line")
fi
done < "$file_path"
if [[ ${#images[@]} -eq 0 ]]; then
log_warning "没有找到要拉取的镜像"
exit 0
fi
log_info "从 $file_path 读取到 ${#images[@]} 个镜像"
# 计算批次数量
local total_images=${#images[@]}
local total_batches=$(( (total_images + BATCH_SIZE - 1) / BATCH_SIZE ))
echo
log_info "开始批量拉取任务:"
log_info " 总镜像数: $total_images"
log_info " 批次大小: $BATCH_SIZE"
log_info " 总批次数: $total_batches"
log_info " 开始时间: $(date '+%Y-%m-%d %H:%M:%S')"
# 按批次处理镜像
for ((batch_num = 1; batch_num <= total_batches; batch_num++)); do
local start_idx=$(((batch_num - 1) * BATCH_SIZE))
local end_idx=$((start_idx + BATCH_SIZE))
if [[ $end_idx -gt $total_images ]]; then
end_idx=$total_images
fi
# 提取当前批次的镜像
local batch_images=("${images[@]:$start_idx:$((end_idx - start_idx))}")
# 拉取当前批次
local result
result=$(pull_batch batch_images "$batch_num" "$total_batches")
read -r success failed <<< "$result"
TOTAL_SUCCESS=$((TOTAL_SUCCESS + success))
TOTAL_FAILED=$((TOTAL_FAILED + failed))
# 如果不是最后一批,显示进度
if [[ $batch_num -lt $total_batches ]]; then
local completed_images=$((batch_num * BATCH_SIZE))
local progress
progress=$(echo "scale=1; $completed_images * 100 / $total_images" | bc)
local elapsed=$(($(date +%s) - START_TIME))
echo
log_info "当前进度: ${completed_images}/${total_images} (${progress}%)"
log_info "累计成功: $TOTAL_SUCCESS, 累计失败: $TOTAL_FAILED"
log_info "已用时间: $(format_time $elapsed)"
log_info "准备开始下一批次..."
sleep 1 # 批次间短暂休息
fi
done
# 显示最终结果
show_final_results "$total_images"
}
# 捕获中断信号
trap 'log_info "用户中断操作"; exit 1' INT TERM
# 运行主函数
main "$@"