在 Web 开发中,在线视频播放是教育平台、自媒体网站、企业培训系统的核心功能之一。Python 凭借其简洁的语法、丰富的生态库,能快速实现视频上传、转码、权限控制、流式播放等核心需求。本文将基于 FastAPI(高性能异步框架)+ FFmpeg + Video.js 技术栈,从架构设计到代码落地,带你搭建一套稳定、高效的 Python 在线视频播放系统。
一、技术选型
1. 核心技术栈
后端框架:FastAPI 0.100+(异步高性能、自动生成 API 文档、类型提示友好)
视频存储:
本地存储(开发 / 测试环境)
阿里云 OSS / 腾讯云 COS(生产环境,高可用、高并发、CDN 兼容)
视频转码:FFmpeg(工业级视频处理工具,支持转码为 HLS 流式格式)
前端播放:Video.js(跨浏览器兼容,支持 HLS 协议,自定义程度高)
权限控制:JWT(JSON Web Token,无状态身份验证,防止视频盗播)
流式传输协议:HLS(HTTP Live Streaming,基于切片的流式传输,适配移动端 / 浏览器)
辅助工具:
Redis(缓存视频状态、JWT 黑名单、访问频率限制)
Celery(分布式任务队列,处理异步视频转码)
RabbitMQ/Redis(Celery 消息代理,存储转码任务队列)
2. 技术选型说明
为什么选 FastAPI?
FastAPI 是 Python 生态中性能最优的异步 Web 框架,支持并发请求处理,适合视频流式传输等 I/O 密集型场景;同时自动生成 Swagger 文档,开发调试更高效。
为什么选 HLS 协议?
HLS 协议将视频切分为 5-10 秒的 .ts 切片文件,通过 .m3u8 索引文件管理,支持断点续传、自适应码率(根据网络状况切换清晰度),且基于 HTTP 传输,无需额外端口,兼容性覆盖 99% 以上的浏览器和移动端设备。
为什么用 Celery + FFmpeg?
视频转码是 CPU 密集型操作,同步处理会阻塞接口。Celery 作为分布式任务队列,可将转码任务异步化,配合 FFmpeg 实现视频格式转换、切片、水印等功能,是 Python 生态中最成熟的视频处理组合。
存储方案选择:
开发环境用本地存储简化配置,生产环境必须使用云存储(OSS/COS),避免本地磁盘容量限制和单点故障,且云存储可直接对接 CDN 加速,降低视频加载延迟。
二、系统架构设计
在线视频播放的核心流程为:视频上传 → 异步转码 → 存储 → 权限验证 → 流式传输 → 前端播放,架构如下:
plaintext
用户 → 前端(Video.js)→ CDN → 后端(FastAPI)→ 云存储/本地存储
↓
消息代理(RabbitMQ/Redis)
↓
异步任务(Celery)→ 转码服务(FFmpeg)
↓
视频切片(.m3u8 + .ts)
关键模块说明:
上传模块:接收前端上传的原始视频,保存到临时存储,触发异步转码任务。
转码模块:Celery worker 调用 FFmpeg,将视频转码为 HLS 格式(生成 .m3u8 索引 + .ts 切片)。
存储模块:转码完成后,将切片文件存储到目标存储(本地 / 云存储),更新视频状态。
权限模块:通过 JWT 验证用户身份,仅授权用户可获取视频切片。
传输模块:响应前端切片请求,支持流式传输和断点续传。
前端模块:用 Video.js 解析 .m3u8 文件,实现视频播放。
三、实战实现(分步骤)
步骤 1:环境准备
安装基础环境:
Python 3.9+(推荐 3.10)
FFmpeg:
Windows:下载 FFmpeg 安装包,配置环境变量(将 bin 目录添加到 PATH)。
Linux(Ubuntu/Debian):sudo apt update && sudo apt install ffmpeg。
验证:终端输入 ffmpeg -version,显示版本信息则安装成功。
消息代理:RabbitMQ(推荐)或 Redis(本文以 RabbitMQ 为例)
Ubuntu:sudo apt install rabbitmq-server,启动服务 sudo systemctl start rabbitmq-server。
Redis(缓存 + 可选消息代理):sudo apt install redis-server。
创建项目并安装依赖:
bash
运行
# 创建项目目录mkdirpython-video-player&&cdpython-video-player# 创建虚拟环境python-mvenv venv# 激活虚拟环境(Windows:venv\Scripts\activate;Linux/Mac:source venv/bin/activate)# 安装核心依赖pipinstallfastapi uvicorn celery[redis]redis python-multipart pyjwt python-dotenv aliyun-oss-python-sdk requests
步骤 2:项目结构设计
plaintext
python-video-player/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI 入口文件
│ ├── config.py # 配置文件
│ ├── models/ # 数据模型(Pydantic + 数据库模型)
│ │ ├── __init__.py
│ │ ├── video.py # 视频相关模型
│ │ └── user.py # 用户相关模型
│ ├── api/ # API 路由
│ │ ├── __init__.py
│ │ ├── auth.py # 认证接口(登录/注册)
│ │ ├── video.py # 视频上传/播放/状态接口
│ │ └── dependencies.py # 依赖项(JWT 验证)
│ ├── services/ # 业务逻辑层
│ │ ├── __init__.py
│ │ ├── video_service.py # 视频上传/转码/存储逻辑
│ │ └── auth_service.py # 认证逻辑
│ ├── tasks/ # Celery 异步任务
│ │ ├── __init__.py
│ │ └── transcode_task.py # 视频转码任务
│ ├── utils/ # 工具类
│ │ ├── __init__.py
│ │ ├── jwt_util.py # JWT 生成/验证工具
│ │ └── oss_util.py # 云存储工具(阿里云 OSS)
│ └── storage/ # 本地存储目录(开发环境)
│ ├── original/ # 原始视频
│ └── transcode/ # 转码后切片
├── .env # 环境变量配置
├── celery_worker.py # Celery worker 启动文件
└── requirements.txt # 依赖清单
步骤 3:配置文件
3.1 环境变量配置(.env)
env
# 应用配置
APP_HOST=0.0.0.0
APP_PORT=8000
APP_DEBUG=True
SECRET_KEY=your-secret-key-32bytes-long-12345678 # 密钥(至少 32 位)
# 数据库配置(可选,存储视频元信息,本文用 SQLite 简化)
DATABASE_URL=sqlite:///./video.db
# 本地存储配置(开发环境)
LOCAL_UPLOAD_PATH=./app/storage/original/
LOCAL_TRANSCODE_PATH=./app/storage/transcode/
LOCAL_BASE_URL=http://localhost:8000/video/
# 阿里云 OSS 配置(生产环境)
OSS_ENDPOINT=oss-cn-beijing.aliyuncs.com
OSS_ACCESS_KEY_ID=你的AccessKeyId
OSS_ACCESS_KEY_SECRET=你的AccessKeySecret
OSS_BUCKET_NAME=你的BucketName
OSS_BASE_URL=https://你的BucketName.oss-cn-beijing.aliyuncs.com/video/
# JWT 配置
JWT_SECRET_KEY=${SECRET_KEY}
JWT_ACCESS_TOKEN_EXPIRE_MINUTES=60 # 令牌有效期 1 小时
# Celery 配置
CELERY_BROKER_URL=amqp://guest:guest@localhost:5672// # RabbitMQ 地址
CELERY_RESULT_BACKEND=redis://localhost:6379/0 # Redis 存储任务结果
# 转码配置
TRANSCODE_SLICE_DURATION=10 # 切片时长(秒)
TRANSCODE_RESOLUTIONS=720p:1280x720,480p:854x480,360p:640x360 # 多码率分辨率
TRANSCODE_BITRATE=720p:2000k,480p:1000k,360p:500k # 对应码率
3.2 配置文件(app/config.py)
python
运行
importosfromdotenvimportload_dotenvfromtypingimportDict,Optional# 加载环境变量load_dotenv()classSettings:# 应用配置APP_HOST:str=os.getenv("APP_HOST","0.0.0.0")APP_PORT:int=int(os.getenv("APP_PORT",8000))APP_DEBUG:bool=os.getenv("APP_DEBUG","True").lower()=="true"SECRET_KEY:str=os.getenv("SECRET_KEY")# 数据库配置DATABASE_URL:str=os.getenv("DATABASE_URL","sqlite:///./video.db")# 本地存储配置LOCAL_UPLOAD_PATH:str=os.getenv("LOCAL_UPLOAD_PATH")LOCAL_TRANSCODE_PATH:str=os.getenv("LOCAL_TRANSCODE_PATH")LOCAL_BASE_URL:str=os.getenv("LOCAL_BASE_URL")# 阿里云 OSS 配置OSS_ENDPOINT:Optional[str]=os.getenv("OSS_ENDPOINT")OSS_ACCESS_KEY_ID:Optional[str]=os.getenv("OSS_ACCESS_KEY_ID")OSS_ACCESS_KEY_SECRET:Optional[str]=os.getenv("OSS_ACCESS_KEY_SECRET")OSS_BUCKET_NAME:Optional[str]=os.getenv("OSS_BUCKET_NAME")OSS_BASE_URL:Optional[str]=os.getenv("OSS_BASE_URL")# JWT 配置JWT_SECRET_KEY:str=os.getenv("JWT_SECRET_KEY")JWT_ACCESS_TOKEN_EXPIRE_MINUTES:int=int(os.getenv("JWT_ACCESS_TOKEN_EXPIRE_MINUTES",60))# Celery 配置CELERY_BROKER_URL:str=os.getenv("CELERY_BROKER_URL")CELERY_RESULT_BACKEND:str=os.getenv("CELERY_RESULT_BACKEND")# 转码配置TRANSCODE_SLICE_DURATION:int=int(os.getenv("TRANSCODE_SLICE_DURATION",10))TRANSCODE_RESOLUTIONS:Dict[str,str]=dict(item.split(":")foriteminos.getenv("TRANSCODE_RESOLUTIONS").split(","))TRANSCODE_BITRATE:Dict[str,str]=dict(item.split(":")foriteminos.getenv("TRANSCODE_BITRATE").split(","))# 初始化存储目录def__init__(self):os.makedirs(self.LOCAL_UPLOAD_PATH,exist_ok=True)os.makedirs(self.LOCAL_TRANSCODE_PATH,exist_ok=True)# 实例化配置settings=Settings()
步骤 4:数据模型与数据库初始化
4.1 数据模型(app/models/video.py)
使用 SQLAlchemy 作为 ORM 工具,存储视频元信息:
python
运行
fromsqlalchemyimportColumn,Integer,String,ForeignKey,Enum,DateTimefromsqlalchemy.ext.declarativeimportdeclarative_basefromsqlalchemy.ormimportrelationshipfromdatetimeimportdatetimeimportenumimportuuidBase=declarative_base()# 视频状态枚举classVideoStatus(str,enum.Enum):PENDING="pending"# 待转码TRANSCODING="transcoding"# 转码中COMPLETED="completed"# 转码完成FAILED="failed"# 转码失败classVideo(Base):__tablename__="videos"id=Column(Integer,primary_key=True,index=True)video_id=Column(String(36),unique=True,index=True,default=lambda:str(uuid.uuid4()))# 视频唯一标识user_id=Column(Integer,ForeignKey("users.id"),nullable=False)# 上传用户IDtitle=Column(String(255),nullable=False)# 视频标题original_filename=Column(String(255),nullable=False)# 原始文件名file_size=Column(Integer,nullable=False)# 文件大小(字节)duration=< href="https://zhiq.zhaopin.com/moment/86380511">< href="https://zq.zhaopin.com/moment/86380398">< href="https://zq-mobile.zhaopin.com/moment/86380398">< href="https://zhiq.zhaopin.com/moment/86380398">Column(Integer,nullable=True)# 视频时长(秒)m3u8_url=Column(String(512),nullable=True)# HLS 索引文件地址status=Column(Enum(VideoStatus),default=VideoStatus.PENDING)# 视频状态created_at=Column(DateTime,default=datetime.utcnow)updated_at=Column(DateTime,default=datetime.utcnow,onupdate=datetime.utcnow)# 关联用户user=relationship("User",back_populates="videos")classUser(Base):__tablename__="users"id=Column(Integer,primary_key=True,index=True)username=Column(String(50),unique=True,index=True,nullable=False)email=Column(String(100),unique=True,index=True,nullable=False)hashed_password=Column(String(255),nullable=False)created_at=Column(DateTime,default=datetime.utcnow)# 关联视频videos=relationship("Video",back_populates="user")
4.2 数据库初始化(app/init.py)
python
运行
fromsqlalchemyimportcreate_enginefromsqlalchemy.ormimportsessionmakerfromapp.configimportsettingsfromapp.models.videoimportBase# 创建数据库引擎engine=create_engine(settings.DATABASE_URL,connect_args={"check_same_thread":False}if"sqlite"insettings.DATABASE_URLelse{})# 创建会话工厂SessionLocal=sessionmaker(autocommit=False,autoflush=False,bind=engine)# 创建数据库表defcreate_db_tables():Base.metadata.create_all(bind=engine)# 依赖项:获取数据库会话defget_db():db=SessionLocal()try:yielddbfinally:db.close()
步骤 5:核心工具类实现
5.1 JWT 工具类(app/utils/jwt_util.py)
python
运行
fromdatetimeimportdatetime,timedeltafromjoseimportJWTError,jwtfrompasslib.contextimportCryptContextfromapp.configimportsettings# 密码加密上下文pwd_context=CryptContext(schemes=["bcrypt"],deprecated="auto")defverify_password(plain_password:str,hashed_password:str)->bool:"""验证密码"""returnpwd_context.verify(plain_password,hashed_password)defget_password_hash(password:str)->str:"""生成密码哈希"""returnpwd_context.hash(password)defcreate_access_token(data:dict)->< href="https://zq.zhaopin.com/moment/86380339">< href="https://zq-mobile.zhaopin.com/moment/86380339">< href="https://zhiq.zhaopin.com/moment/86380339">< href="https://zq.zhaopin.com/moment/86380297">< href="https://zq-mobile.zhaopin.com/moment/86380297">str:"""生成 JWT 访问令牌"""to_encode=data.copy()expire=datetime.utcnow()+timedelta(minutes=settings.JWT_ACCESS_TOKEN_EXPIRE_MINUTES)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,settings.JWT_SECRET_KEY,algorithm="HS256")returnencoded_jwtdefdecode_access_token(token:str)->dict:"""解码 JWT 令牌"""try:payload=jwt.decode(token,settings.JWT_SECRET_KEY,algorithms=["HS256"])returnpayloadexceptJWTError:returnNone
5.2 云存储工具类(app/utils/oss_util.py)
以阿里云 OSS 为例,实现文件上传功能:
python
运行
fromaliyunsdkcore.clientimportAcsClientfromaliyunsdkcore.exceptionsimportClientException,ServerExceptionfromaliyunsdkoss.request.v20190517importPutObjectRequestimportosfromapp.configimportsettingsclassOSSUtil:def__init__(self):self.client=AcsClient(settings.OSS_ACCESS_KEY_ID,settings.OSS_ACCESS_KEY_SECRET,settings.OSS_ENDPOINT.split(".")[0].split("-")[-1]# 提取区域(如 beijing))self.bucket_name=settings.OSS_BUCKET_NAME self.base_url=settings.OSS_BASE_URLdefupload_file(self,local_file_path:str,oss_object_key:str)->str:"""
上传文件到 OSS
:param local_file_path: 本地文件路径
:param oss_object_key: OSS 存储路径(如 video/xxx/720p/index.m3u8)
:return: OSS 文件访问 URL
"""try:request=PutObjectRequest.PutObjectRequest()request.set_Bucket(self.bucket_name)request.set_Key(oss_object_key)# 读取文件内容withopen(local_file_path,"rb")asf:request.set_Content(f.read())# 发送请求self.client.do_action_with_exception(request)returnf"{self.base_url}{oss_object_key}"except(ClientException,ServerException)ase:raiseException(f"OSS 上传失败:{str(e)}")
步骤 6:异步转码任务(Celery)
6.1 Celery 初始化(app/tasks/init.py)
python
运行
fromceleryimportCeleryfromapp.configimportsettings# 初始化 Celerycelery_app=Celery("video_tasks",broker=settings.CELERY_BROKER_URL,backend=settings.CELERY_RESULT_BACKEND,include=["app.tasks.transcode_task"])# 配置 Celerycelery_app.conf.update(task_serializer="json",result_serializer="json",accept_content=["json"],timezone="Asia/Shanghai",enable_utc=True,)
6.2 转码任务实现(app/tasks/transcode_task.py)
python
运行
importosimportsubprocessfrompathlibimportPathfromapp.tasksimportcelery_appfromapp.configimportsettingsfromapp.models.videoimportVideo,VideoStatusfromappimportSessionLocalfromapp.utils.oss_utilimportOSSUtil# 初始化 OSS 工具(生产环境)oss_util=OSSUtil()ifsettings.OSS_ACCESS_KEY_IDelseNonedefget_video_duration(video_path:str)->int:"""获取视频时长(秒)"""cmd=["ffprobe","-v","error","-show_entries","format=duration","-of","default=noprint_wrappers=1:nokey=1",video_path]result=subprocess.run(cmd,capture_output=True,text=True)ifresult.returncode!=0:raiseException(f"获取视频时长失败:{result.stderr}")returnint(round(float(result.stdout.strip())))@celery_app.task(bind=True,max_retries=3)deftranscode_video(self,video_id:str):"""
视频转码任务:将原始视频转码为 HLS 格式(多码率)
:param video_id: 视频唯一标识
"""db=SessionLocal()try:# 1. 查询视频信息video=db.query(Video).filter(Video.video_id==video_id).first()ifnotvideo:raiseException(f"视频不存在:{video_id}")# 2. 更新视频状态为转码中video.status=VideoStatus.TRANSCODING db.commit()# 3. 构建路径original_ext=Path(video.original_filename).suffix original_video_path=os.path.join(settings.LOCAL_UPLOAD_PATH,f"{video_id}{original_ext}")transcode_root_path=os.path.join(settings.LOCAL_TRANSCODE_PATH,video_id)os.makedirs(transcode_root_path,exist_ok=True)# 4. 多码率转码(生成不同清晰度的切片)resolutions=< href="https://zhiq.zhaopin.com/moment/86380297">< href="https://zq.zhaopin.com/moment/86380246">< href="https://zq-mobile.zhaopin.com/moment/86380246">< href="https://zhiq.zhaopin.com/moment/86380246">< href="https://zq.zhaopin.com/moment/86380205">< href="https://zq-mobile.zhaopin.com/moment/86380205">settings.TRANSCODE_RESOLUTIONS bitrates=settings.TRANSCODE_BITRATE slice_duration=settings.TRANSCODE_SLICE_DURATIONforquality,resolutioninresolutions.items():quality_path=os.path.join(transcode_root_path,quality)os.makedirs(quality_path,exist_ok=True)# FFmpeg 转码命令cmd=["ffmpeg","-i",original_video_path,"-c:v","h264",# 视频编码(H.264 兼容性最好)"-c:a","aac",# 音频编码"-s",resolution,# 分辨率"-b:v",bitrates[quality],# 视频码率"-hls_time",str(slice_duration),# 切片时长"-hls_list_size","0",# 保留所有切片"-hls_segment_filename",os.path.join(quality_path,"%03d.ts"),# 切片文件名os.path.join(quality_path,"index.m3u8")# 索引文件]# 执行转码命令result=subprocess.run(cmd,capture_output=True,text=True)ifresult.returncode!=0:raiseException(f"转码失败({quality}):{result.stderr}")# 5. 生成主索引文件(包含所有码率,支持自适应切换)master_m3u8_path=os.path.join(transcode_root_path,"master.m3u8")withopen(master_m3u8_path,"w")asf:f.write("#EXTM3U\n#EXT-X-VERSION:3\n")forquality,resolutioninresolutions.items():bandwidth=int(bitrates[quality].replace("k",""))*1000# 带宽(bps)f.write(f"#EXT-X-STREAM-INF:BANDWIDTH={bandwidth},RESOLUTION={resolution}\n")f.write(f"{quality}/index.m3u8\n")# 6. 上传到云存储(生产环境)ifoss_util:# 递归上传所有转码文件forroot,dirs,filesinos.walk(transcode_root_path):forfileinfiles:local_file=os.path.join(root,file)# 构建 OSS 存储路径(相对路径)relative_path=os.path.relpath(local_file,settings.LOCAL_TRANSCODE_PATH)oss_object_key=relative_path.replace("\\","/")# 统一路径分隔符oss_util.upload_file(local_file,oss_object_key)# 更新 m3u8 地址(云存储)m3u8_url=f"{settings.OSS_BASE_URL}{video_id}/master.m3u8"else:# 本地存储地址m3u8_url=f"{settings.LOCAL_BASE_URL}{video_id}/master.m3u8"# 7. 更新视频信息(时长、状态、m3u8 地址)video.duration=get_video_duration(original_video_path)video.status=VideoStatus.COMPLETED video.m3u8_url=m3u8_url db.commit()return{"status":"success","m3u8_url":m3u8_url}exceptExceptionase:# 转码失败,更新状态并重试ifvideo:video.status=VideoStatus.FAILED db.commit()self.retry(exc=e,countdown=60)# 60 秒后重试finally:db.close()
6.3 Celery Worker 启动文件(celery_worker.py)
python
运行
fromapp.tasksimportcelery_appif__name__=="__main__":celery_app.worker_main()
步骤 7:FastAPI 接口实现
7.1 认证依赖(app/api/dependencies.py)
python
运行
fromfastapiimportDepends,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearerfromjoseimportJWTErrorfromapp.utils.jwt_utilimportdecode_access_tokenfromappimportget_dbfromsqlalchemy.ormimportSessionfromapp.models.videoimportUser# 定义 OAuth2 令牌提取器oauth2_scheme=OAuth2PasswordBearer(tokenUrl="/api/auth/login")defget_current_user(token:str=Depends(oauth2_scheme),db:Session=Depends(get_db))->User:"""获取当前登录用户(依赖项)"""credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="无效的令牌或令牌已过期",headers={"WWW-Authenticate":"Bearer"},)# 解码令牌payload=decode_access_token(token)ifnotpayload:raisecredentials_exception# 提取用户名username:str=payload.get("sub")ifnotusername:raisecredentials_exception# 查询用户user=db.query(User).filter(User.username==username).first()ifnotuser:raisecredentials_exceptionreturnuser
7.2 认证接口(app/api/auth.py)
python
运行
fromfastapiimportAPIRouter,Depends,HTTPException,statusfromfastapi.securityimportOAuth2PasswordRequestFormfromsqlalchemy.ormimportSessionfromappimportget_dbfromapp.models.videoimportUserfromapp.utils.jwt_utilimportverify_password,get_password_hash,create_access_tokenrouter=APIRouter(prefix="/api/auth",tags=["认证"])@router.post("/register",summary="用户注册")defregister(username:str,email:str,password:str,db:Session=Depends(get_db)):# 检查用户名/邮箱是否已存在ifdb.query(User).filter(User.username==username).first():raiseHTTPException(status_code=400,detail="用户名已存在")ifdb.query(User).filter(User.email==email).first():raiseHTTPException(status_code=400,detail="邮箱已存在")# 创建用户hashed_password=get_password_hash(password)user=User(username=username,email=email,hashed_password=hashed_password)db.add(user)db.commit()db.refresh(user)return{"message":"注册成功","user_id":user.id}@router.post("/login",summary="用户登录(获取 JWT 令牌)")deflogin(form_data:OAuth2PasswordRequestForm=Depends(),db:Session=Depends(get_db)):# 查询用户user=db.query(User).filter(User.username==form_data.username).first()# 验证密码ifnotuserornotverify_password(form_data.password,user.hashed_password):raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="用户名或密码错误",headers={"WWW-Authenticate":"Bearer"},)# 生成令牌access_token=create_access_token(data={"sub":user.username})return{"access_token":access_token,"token_type":"bearer","user_id":user.id,"username":user.username}
7.3 视频接口(app/api/video.py)
python
运行
fromfastapiimportAPIRouter,Depends,File,UploadFile,HTTPException,Requestfromfastapi.responsesimportFileResponse,StreamingResponsefromsqlalchemy.ormimportSessionimportosfrompathlibimportPathfromappimportget_dbfromapp.models.videoimportVideo,VideoStatusfromapp.api.dependenciesimportget_current_userfromapp.models.videoimportUserfromapp.tasks.transcode_taskimporttranscode_videofromapp.configimportsettingsrouter=APIRouter(prefix="/api/video",tags=["视频"])@router.post("/upload",summary="视频上传")defupload_video(title:str,file:UploadFile=File(...),current_user:User=Depends(get_current_user),db:Session=Depends(get_db)):# 验证文件类型allowed_extensions={".mp4",".mov",".avi",".mkv"}file_ext=Path(file.filename).suffix.lower()iffile_extnotinallowed_extensions:raiseHTTPException(status_code=400,detail="仅支持 MP4、MOV、AVI、MKV 格式")# 验证文件大小(最大 1GB)file_size=file.file.seek(0,2)file.file.seek(0)# 重置文件指针iffile_size>1024*1024*1024:raiseHTTPException(status_code=400,detail="文件大小不能超过 1GB")# 保存原始视频到本地video_id=str(Path(file.filename).stem)+"_"+str(os.urandom(4).hex())# 生成唯一标识original_video_path=os.path.join(settings.LOCAL_UPLOAD_PATH,f"{video_id}{file_ext}")withopen(original_video_path,"wb")asf:f.write(file.file.read())# 存入数据库video=Video(user_id=current_user.id,title=title,original_filename=file.filename,file_size=file_size,video_id=video_id)db.add(video)db.commit()db.refresh(video)# 触发异步转码任务transcode_video.delay(video_id)return{"message":"视频上传成功,转码中...","video_id":video_id,"status":video.status.value}@router.get("/status/{video_id}",summary="查询视频状态")defget_video_status(video_id:str,current_user:User=Depends(get_current_user),db:Session=Depends(get_db)):video=db.query(Video).filter(Video.video_id==video_id).first()ifnotvideo:raiseHTTPException(status_code=404,detail="视频不存在")# 验证权限(仅上传者可查询)ifvideo.user_id!=current_user.id:raiseHTTPException(status_code=403,detail="无权限访问该视频")return{"video_id":video.video_id,"title":video.title,"status":video.status.value,"duration":video.duration,"m3u8_url":video.m3u8_url,"created_at":video.created_at}@router.get("/play/{video_id}/{quality}/{filename}",summary="播放视频切片(.m3u8 或 .ts)")defplay_video_slice(video_id:str,quality:str,filename:str,request:Request,current_user:User=Depends(get_current_user),db:Session=Depends(get_db)):# 1. 验证视频存在性和权限video=db.query(Video).filter(Video.video_id==video_id).first()ifnotvideo:raiseHTTPException(status_code=404,detail="视频不存在")ifvideo.user_id!=current_user.id:raiseHTTPException(status_code=403,detail="无权限播放该视频")ifvideo.status!=VideoStatus.COMPLETED:raiseHTTPException(status_code=400,detail="视频转码中,暂无法播放")# 2. 验证清晰度是否支持ifqualitynotinsettings.TRANSCODE_RESOLUTIONS:raiseHTTPException(status_code=400,detail="不支持该清晰度")# 3. 本地存储读取(开发环境)ifnotsettings.OSS_ACCESS_KEY_ID:file_path=os.path.join(settings.LOCAL_TRANSCODE_PATH,video_id,quality,filename)ifnotos.path.exists(file_path):raiseHTTPException(status_code=404,detail="切片文件不存在")# 流式响应配置(支持断点续传)defstream_file():withopen(file_path,"rb")asf:whilechunk:=f.read(1024*1024):# 1MB 分块传输yieldchunk# 设置响应头headers={"Accept-Ranges":"bytes","Content-Length":str(os.path.getsize(file_path))}iffilename.endswith(".m3u8"):headers["Content-Type"]="application/x-mpegURL"eliffilename.endswith(".ts"):headers["Content-Type"]="video/MP2T"returnStreamingResponse(stream_file(),headers=headers)# 4. 生产环境:重定向到云存储 CDN 地址cdn_url=f"{settings.OSS_BASE_URL}{video_id}/{quality}/{filename}"return{"redirect_url":cdn_url}
步骤 8:FastAPI 入口文件(app/main.py)
python
运行
fromfastapiimportFastAPIfromfastapi.middleware.corsimportCORSMiddlewarefromappimportcreate_db_tablesfromapp.api.authimportrouterasauth_routerfromapp.api.videoimportrouterasvideo_routerfromapp.configimportsettings# 创建数据库表create_db_tables()# 初始化 FastAPI 应用app=FastAPI(title="Python 在线视频播放系统",description="基于 FastAPI + FFmpeg + Celery 实现的视频上传、转码、流式播放系统",version="1.0.0")# 配置 CORS(跨域资源共享)app.add_middleware(CORSMiddleware,allow_origins=["*"],# 生产环境替换为具体域名allow_credentials=True,allow_methods=["*"],allow_headers=["*"],)# 注册路由app.include_router(auth_router)app.include_router(video_router)# 根路由@app.get("/",summary="健康检查")defroot():return{"message":"Python 在线视频播放系统运行中","version":"1.0.0"}
步骤 9:前端播放实现(Video.js + HLS)
创建 index.html 文件,实现前端视频播放:
html
预览
<!DOCTYPEhtml><htmllang="zh-CN"><head><metacharset="UTF-8"><title>Python 在线视频播放</title><!-- 引入 Video.js 样式和脚本 --><linkhref="https://vjs.zencdn.net/8.6.1/video-js.css"rel="stylesheet"><scriptsrc="https://vjs.zencdn.net/8.6.1/video.min.js"></script><!-- 引入 HLS 插件(支持 HLS 协议) --><scriptsrc="https://cdn.jsdelivr.net/npm/videojs-contrib-hls@5.15.0/dist/videojs-contrib-hls.min.js"></script><style>.video-container{width:1280px;margin:50pxauto;}.controls{width:1280px;margin:20pxauto;text-align:center;}button{padding:10px20px;margin:010px;cursor:pointer;}</style></head><body><divclass="controls"><inputtype="text"id="videoId"placeholder="输入视频 ID"/><buttononclick="loadVideo()">加载视频</button></div><divclass="video-container"><videoid="video-player"class="video-js vjs-big-play-centered vjs-fluid"controlspreload="auto"poster="https://via.placeholder.com/1280x720?text=视频封面"><sourceid="video-source"src=""type="application/x-mpegURL">您的浏览器不支持 HTML5 视频播放</video></div><script>// 初始化 Video.jsconstplayer=videojs('video-player',{autoplay:false,controls:true,responsive:true,fluid:true,});// 加载视频functionloadVideo(){constvideoId=document.getElementById('videoId').value.trim();if(!videoId){alert('请输入视频 ID');return;}// 替换为实际的 m3u8 地址(从后端接口获取)consttoken="你的 JWT 令牌";// 登录后获取constm3u8Url=`http://localhost:8000/api/video/play/${videoId}/master.m3u8`;// 设置视频源player.src({src:m3u8Url,type:'application/x-mpegURL'});// 加载并播放player.load();player.play().catch(err=>{console.error('播放失败:',err);alert('播放失败,请检查视频 ID 或网络状态');});}// 监听播放错误player.on('error',function(){console.error('视频播放失败:',player.error());alert('视频播放失败,请联系管理员');});// 监听视频加载完成player.on('loadedmetadata',function(){console.log('视频时长:',player.duration());});</script></body></html>
步骤 10:启动系统
启动 RabbitMQ/Redis(确保消息代理已运行)。
启动 Celery Worker(处理转码任务):
bash
运行
celery-Acelery_worker worker--loglevel=info--queue=transcode
启动 FastAPI 应用:
bash
运行
uvicorn app.main:app--host0.0.0.0--port8000--reload
访问系统:
API 文档:http://localhost:8000/docs
前端播放页面:直接打开 index.html 文件,输入视频 ID 即可播放。
四、生产环境优化
1. 转码优化
多线程转码:通过 FFmpeg 的 -threads 参数设置转码线程数(如 -threads 4),利用多核 CPU 提升效率。
转码进度监控:在转码命令中添加 -progress pipe:1 参数,实时获取转码进度,存入 Redis,前端通过接口查询展示。
任务优先级:给 Celery 任务设置优先级(如 transcode_video.apply_async(args=[video_id], priority=10)),VIP 用户任务优先执行。
2. 存储与加速
CDN 全面集成:将云存储的视频文件接入 CDN,配置缓存策略(.m3u8 缓存 1 分钟,.ts 缓存 1 天),降低源站压力。
视频封面生成:转码时通过 FFmpeg 截取首帧作为封面(ffmpeg -i input.mp4 -ss 00:00:01 -vframes 1 cover.jpg),提升用户体验。
过期文件清理:使用 Celery 定时任务(celery beat)定期清理 30 天未访问的原始视频和转码文件,节省存储成本。
3. 防盗播增强
URL 签名:对视频访问 URL 添加签名和过期时间,防止恶意分享:
python
运行
importhashlibimporttimedefgenerate_signed_url(video_id:str,quality:str,filename:str,expire=3600):"""生成带签名的视频访问 URL"""timestamp=int(time.time())+expire secret=settings.SECRET_KEY sign_str=f"{video_id}_{quality}_{filename}_{timestamp}_{secret}"sign=hashlib.md5(sign_str.encode()).hexdigest()returnf"/api/video/play/{video_id}/{quality}/{filename}?t={timestamp}&sign={sign}"defverify_sign(video_id:str,quality:str,filename:str,timestamp:int,sign:str):"""验证签名"""iftime.time()>timestamp:returnFalsesecret=settings.SECRET_KEY sign_str=f"{video_id}_{quality}_{filename}_{timestamp}_{secret}"returnhashlib.md5(sign_str.encode()).hexdigest()==sign
Referer 白名单:通过 Nginx 配置限制仅指定域名可访问视频,阻止盗链:
nginx
location~* \.(m3u8|ts)${valid_referersnone blocked *.yourdomain.com;if($invalid_referer){return403;}}
水印添加:转码时通过 FFmpeg 添加文字 / 图片水印(ffmpeg -i input.mp4 -i watermark.png -filter_complex "overlay=10:10" output.mp4)。
4. 高可用设计
集群部署:FastAPI 应用集群部署,通过 Nginx 负载均衡;Celery Worker 多节点部署,提高转码并发能力。
熔断降级:使用 tenacity 库实现重试机制,结合 Redis 限流(如 slowapi),避免流量峰值压垮系统。
监控告警:通过 Prometheus + Grafana 监控系统资源、转码成功率、播放成功率;使用 Sentry 捕获异常并告警。
五、常见问题排查
视频上传失败:
检查 Python 配置(MAX_CONTENT_LENGTH)、Nginx 配置(client_max_body_size)是否支持大文件上传;确保存储目录有写入权限。
转码失败:
检查 FFmpeg 环境变量配置,终端执行转码命令调试;确认视频文件格式支持,文件路径无中文 / 特殊字符。
播放卡顿:
检查切片时长(建议 5-10 秒),CDN 是否生效;降低视频码率,增加低清晰度选项适配弱网络。
JWT 验证失败:
检查令牌是否过期,密钥是否一致;前端请求头是否正确携带 Authorization: Bearer {token}。
Celery 任务不执行:
检查消息代理(RabbitMQ/Redis)是否运行;确认 Celery Worker 已启动且监听正确队列。
六、总结
本文基于 Python + FastAPI + Celery + FFmpeg 实现了一套完整的在线视频播放系统,核心是通过异步转码生成 HLS 格式视频,结合 Video.js 实现跨端流式播放,并通过 JWT 保障权限安全。开发环境可快速验证功能,生产环境通过云存储、CDN、集群部署等方案优化性能和可用性。
该方案适用于中小型视频平台(如教育机构、企业培训系统、自媒体网站),如需支持超大规模并发(如千万级用户),可进一步接入专业视频云服务(如阿里云视频点播、腾讯云 VOD),简化转码、存储、分发等复杂流程,聚焦业务逻辑开发。