Python 实现在线视频播放完整方案:从基础到生产环境

在 Web 开发中,在线视频播放是教育平台、自媒体网站、企业培训系统的核心功能之一。Python 凭借其简洁的语法、丰富的生态库,能快速实现视频上传、转码、权限控制、流式播放等核心需求。本文将基于 FastAPI(高性能异步框架)+ FFmpeg + Video.js 技术栈,从架构设计到代码落地,带你搭建一套稳定、高效的 Python 在线视频播放系统。

一、技术选型

1. 核心技术栈

后端框架:FastAPI 0.100+(异步高性能、自动生成 API 文档、类型提示友好)

视频存储

本地存储(开发 / 测试环境)

阿里云 OSS / 腾讯云 COS(生产环境,高可用、高并发、CDN 兼容)

视频转码:FFmpeg(工业级视频处理工具,支持转码为 HLS 流式格式)

前端播放:Video.js(跨浏览器兼容,支持 HLS 协议,自定义程度高)

权限控制:JWT(JSON Web Token,无状态身份验证,防止视频盗播)

流式传输协议:HLS(HTTP Live Streaming,基于切片的流式传输,适配移动端 / 浏览器)

辅助工具

Redis(缓存视频状态、JWT 黑名单、访问频率限制)

Celery(分布式任务队列,处理异步视频转码)

RabbitMQ/Redis(Celery 消息代理,存储转码任务队列)

2. 技术选型说明

为什么选 FastAPI?

FastAPI 是 Python 生态中性能最优的异步 Web 框架,支持并发请求处理,适合视频流式传输等 I/O 密集型场景;同时自动生成 Swagger 文档,开发调试更高效。

为什么选 HLS 协议?

HLS 协议将视频切分为 5-10 秒的 .ts 切片文件,通过 .m3u8 索引文件管理,支持断点续传、自适应码率(根据网络状况切换清晰度),且基于 HTTP 传输,无需额外端口,兼容性覆盖 99% 以上的浏览器和移动端设备。

为什么用 Celery + FFmpeg?

视频转码是 CPU 密集型操作,同步处理会阻塞接口。Celery 作为分布式任务队列,可将转码任务异步化,配合 FFmpeg 实现视频格式转换、切片、水印等功能,是 Python 生态中最成熟的视频处理组合。

存储方案选择

开发环境用本地存储简化配置,生产环境必须使用云存储(OSS/COS),避免本地磁盘容量限制和单点故障,且云存储可直接对接 CDN 加速,降低视频加载延迟。

二、系统架构设计

在线视频播放的核心流程为:视频上传 → 异步转码 → 存储 → 权限验证 → 流式传输 → 前端播放,架构如下:

plaintext

用户 → 前端(Video.js)→ CDN → 后端(FastAPI)→ 云存储/本地存储

                                  ↓

                              消息代理(RabbitMQ/Redis)

                                  ↓

                              异步任务(Celery)→ 转码服务(FFmpeg)

                                  ↓

                              视频切片(.m3u8 + .ts)

关键模块说明:

上传模块:接收前端上传的原始视频,保存到临时存储,触发异步转码任务。

转码模块:Celery worker 调用 FFmpeg,将视频转码为 HLS 格式(生成 .m3u8 索引 + .ts 切片)。

存储模块:转码完成后,将切片文件存储到目标存储(本地 / 云存储),更新视频状态。

权限模块:通过 JWT 验证用户身份,仅授权用户可获取视频切片。

传输模块:响应前端切片请求,支持流式传输和断点续传。

前端模块:用 Video.js 解析 .m3u8 文件,实现视频播放。

三、实战实现(分步骤)

步骤 1:环境准备

安装基础环境

Python 3.9+(推荐 3.10)

FFmpeg:

Windows:下载 FFmpeg 安装包,配置环境变量(将 bin 目录添加到 PATH)。

Linux(Ubuntu/Debian):sudo apt update && sudo apt install ffmpeg。

验证:终端输入 ffmpeg -version,显示版本信息则安装成功。

消息代理:RabbitMQ(推荐)或 Redis(本文以 RabbitMQ 为例)

Ubuntu:sudo apt install rabbitmq-server,启动服务 sudo systemctl start rabbitmq-server。

Redis(缓存 + 可选消息代理):sudo apt install redis-server。

创建项目并安装依赖

bash

运行

# 创建项目目录mkdirpython-video-player&&cdpython-video-player# 创建虚拟环境python-mvenv venv# 激活虚拟环境(Windows:venv\Scripts\activate;Linux/Mac:source venv/bin/activate)# 安装核心依赖pipinstallfastapi uvicorn celery[redis]redis python-multipart pyjwt python-dotenv aliyun-oss-python-sdk requests

步骤 2:项目结构设计

plaintext

python-video-player/

├── app/

│  ├── __init__.py

│  ├── main.py          # FastAPI 入口文件

│  ├── config.py        # 配置文件

│  ├── models/          # 数据模型(Pydantic + 数据库模型)

│  │  ├── __init__.py

│  │  ├── video.py    # 视频相关模型

│  │  └── user.py      # 用户相关模型

│  ├── api/            # API 路由

│  │  ├── __init__.py

│  │  ├── auth.py      # 认证接口(登录/注册)

│  │  ├── video.py    # 视频上传/播放/状态接口

│  │  └── dependencies.py # 依赖项(JWT 验证)

│  ├── services/        # 业务逻辑层

│  │  ├── __init__.py

│  │  ├── video_service.py # 视频上传/转码/存储逻辑

│  │  └── auth_service.py  # 认证逻辑

│  ├── tasks/          # Celery 异步任务

│  │  ├── __init__.py

│  │  └── transcode_task.py # 视频转码任务

│  ├── utils/          # 工具类

│  │  ├── __init__.py

│  │  ├── jwt_util.py  # JWT 生成/验证工具

│  │  └── oss_util.py  # 云存储工具(阿里云 OSS)

│  └── storage/        # 本地存储目录(开发环境)

│      ├── original/    # 原始视频

│      └── transcode/  # 转码后切片

├── .env                # 环境变量配置

├── celery_worker.py    # Celery worker 启动文件

└── requirements.txt    # 依赖清单

步骤 3:配置文件

3.1 环境变量配置(.env)

env

# 应用配置

APP_HOST=0.0.0.0

APP_PORT=8000

APP_DEBUG=True

SECRET_KEY=your-secret-key-32bytes-long-12345678 # 密钥(至少 32 位)

# 数据库配置(可选,存储视频元信息,本文用 SQLite 简化)

DATABASE_URL=sqlite:///./video.db

# 本地存储配置(开发环境)

LOCAL_UPLOAD_PATH=./app/storage/original/

LOCAL_TRANSCODE_PATH=./app/storage/transcode/

LOCAL_BASE_URL=http://localhost:8000/video/

# 阿里云 OSS 配置(生产环境)

OSS_ENDPOINT=oss-cn-beijing.aliyuncs.com

OSS_ACCESS_KEY_ID=你的AccessKeyId

OSS_ACCESS_KEY_SECRET=你的AccessKeySecret

OSS_BUCKET_NAME=你的BucketName

OSS_BASE_URL=https://你的BucketName.oss-cn-beijing.aliyuncs.com/video/

# JWT 配置

JWT_SECRET_KEY=${SECRET_KEY}

JWT_ACCESS_TOKEN_EXPIRE_MINUTES=60 # 令牌有效期 1 小时

# Celery 配置

CELERY_BROKER_URL=amqp://guest:guest@localhost:5672// # RabbitMQ 地址

CELERY_RESULT_BACKEND=redis://localhost:6379/0 # Redis 存储任务结果

# 转码配置

TRANSCODE_SLICE_DURATION=10 # 切片时长(秒)

TRANSCODE_RESOLUTIONS=720p:1280x720,480p:854x480,360p:640x360 # 多码率分辨率

TRANSCODE_BITRATE=720p:2000k,480p:1000k,360p:500k # 对应码率

3.2 配置文件(app/config.py)

python

运行

importosfromdotenvimportload_dotenvfromtypingimportDict,Optional# 加载环境变量load_dotenv()classSettings:# 应用配置APP_HOST:str=os.getenv("APP_HOST","0.0.0.0")APP_PORT:int=int(os.getenv("APP_PORT",8000))APP_DEBUG:bool=os.getenv("APP_DEBUG","True").lower()=="true"SECRET_KEY:str=os.getenv("SECRET_KEY")# 数据库配置DATABASE_URL:str=os.getenv("DATABASE_URL","sqlite:///./video.db")# 本地存储配置LOCAL_UPLOAD_PATH:str=os.getenv("LOCAL_UPLOAD_PATH")LOCAL_TRANSCODE_PATH:str=os.getenv("LOCAL_TRANSCODE_PATH")LOCAL_BASE_URL:str=os.getenv("LOCAL_BASE_URL")# 阿里云 OSS 配置OSS_ENDPOINT:Optional[str]=os.getenv("OSS_ENDPOINT")OSS_ACCESS_KEY_ID:Optional[str]=os.getenv("OSS_ACCESS_KEY_ID")OSS_ACCESS_KEY_SECRET:Optional[str]=os.getenv("OSS_ACCESS_KEY_SECRET")OSS_BUCKET_NAME:Optional[str]=os.getenv("OSS_BUCKET_NAME")OSS_BASE_URL:Optional[str]=os.getenv("OSS_BASE_URL")# JWT 配置JWT_SECRET_KEY:str=os.getenv("JWT_SECRET_KEY")JWT_ACCESS_TOKEN_EXPIRE_MINUTES:int=int(os.getenv("JWT_ACCESS_TOKEN_EXPIRE_MINUTES",60))# Celery 配置CELERY_BROKER_URL:str=os.getenv("CELERY_BROKER_URL")CELERY_RESULT_BACKEND:str=os.getenv("CELERY_RESULT_BACKEND")# 转码配置TRANSCODE_SLICE_DURATION:int=int(os.getenv("TRANSCODE_SLICE_DURATION",10))TRANSCODE_RESOLUTIONS:Dict[str,str]=dict(item.split(":")foriteminos.getenv("TRANSCODE_RESOLUTIONS").split(","))TRANSCODE_BITRATE:Dict[str,str]=dict(item.split(":")foriteminos.getenv("TRANSCODE_BITRATE").split(","))# 初始化存储目录def__init__(self):os.makedirs(self.LOCAL_UPLOAD_PATH,exist_ok=True)os.makedirs(self.LOCAL_TRANSCODE_PATH,exist_ok=True)# 实例化配置settings=Settings()

步骤 4:数据模型与数据库初始化

4.1 数据模型(app/models/video.py)

使用 SQLAlchemy 作为 ORM 工具,存储视频元信息:

python

运行

fromsqlalchemyimportColumn,Integer,String,ForeignKey,Enum,DateTimefromsqlalchemy.ext.declarativeimportdeclarative_basefromsqlalchemy.ormimportrelationshipfromdatetimeimportdatetimeimportenumimportuuidBase=declarative_base()# 视频状态枚举classVideoStatus(str,enum.Enum):PENDING="pending"# 待转码TRANSCODING="transcoding"# 转码中COMPLETED="completed"# 转码完成FAILED="failed"# 转码失败classVideo(Base):__tablename__="videos"id=Column(Integer,primary_key=True,index=True)video_id=Column(String(36),unique=True,index=True,default=lambda:str(uuid.uuid4()))# 视频唯一标识user_id=Column(Integer,ForeignKey("users.id"),nullable=False)# 上传用户IDtitle=Column(String(255),nullable=False)# 视频标题original_filename=Column(String(255),nullable=False)# 原始文件名file_size=Column(Integer,nullable=False)# 文件大小(字节)duration=< href="https://zhiq.zhaopin.com/moment/86380511">< href="https://zq.zhaopin.com/moment/86380398">< href="https://zq-mobile.zhaopin.com/moment/86380398">< href="https://zhiq.zhaopin.com/moment/86380398">Column(Integer,nullable=True)# 视频时长(秒)m3u8_url=Column(String(512),nullable=True)# HLS 索引文件地址status=Column(Enum(VideoStatus),default=VideoStatus.PENDING)# 视频状态created_at=Column(DateTime,default=datetime.utcnow)updated_at=Column(DateTime,default=datetime.utcnow,onupdate=datetime.utcnow)# 关联用户user=relationship("User",back_populates="videos")classUser(Base):__tablename__="users"id=Column(Integer,primary_key=True,index=True)username=Column(String(50),unique=True,index=True,nullable=False)email=Column(String(100),unique=True,index=True,nullable=False)hashed_password=Column(String(255),nullable=False)created_at=Column(DateTime,default=datetime.utcnow)# 关联视频videos=relationship("Video",back_populates="user")

4.2 数据库初始化(app/init.py)

python

运行

fromsqlalchemyimportcreate_enginefromsqlalchemy.ormimportsessionmakerfromapp.configimportsettingsfromapp.models.videoimportBase# 创建数据库引擎engine=create_engine(settings.DATABASE_URL,connect_args={"check_same_thread":False}if"sqlite"insettings.DATABASE_URLelse{})# 创建会话工厂SessionLocal=sessionmaker(autocommit=False,autoflush=False,bind=engine)# 创建数据库表defcreate_db_tables():Base.metadata.create_all(bind=engine)# 依赖项:获取数据库会话defget_db():db=SessionLocal()try:yielddbfinally:db.close()

步骤 5:核心工具类实现

5.1 JWT 工具类(app/utils/jwt_util.py)

python

运行

fromdatetimeimportdatetime,timedeltafromjoseimportJWTError,jwtfrompasslib.contextimportCryptContextfromapp.configimportsettings# 密码加密上下文pwd_context=CryptContext(schemes=["bcrypt"],deprecated="auto")defverify_password(plain_password:str,hashed_password:str)->bool:"""验证密码"""returnpwd_context.verify(plain_password,hashed_password)defget_password_hash(password:str)->str:"""生成密码哈希"""returnpwd_context.hash(password)defcreate_access_token(data:dict)->< href="https://zq.zhaopin.com/moment/86380339">< href="https://zq-mobile.zhaopin.com/moment/86380339">< href="https://zhiq.zhaopin.com/moment/86380339">< href="https://zq.zhaopin.com/moment/86380297">< href="https://zq-mobile.zhaopin.com/moment/86380297">str:"""生成 JWT 访问令牌"""to_encode=data.copy()expire=datetime.utcnow()+timedelta(minutes=settings.JWT_ACCESS_TOKEN_EXPIRE_MINUTES)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,settings.JWT_SECRET_KEY,algorithm="HS256")returnencoded_jwtdefdecode_access_token(token:str)->dict:"""解码 JWT 令牌"""try:payload=jwt.decode(token,settings.JWT_SECRET_KEY,algorithms=["HS256"])returnpayloadexceptJWTError:returnNone

5.2 云存储工具类(app/utils/oss_util.py)

以阿里云 OSS 为例,实现文件上传功能:

python

运行

fromaliyunsdkcore.clientimportAcsClientfromaliyunsdkcore.exceptionsimportClientException,ServerExceptionfromaliyunsdkoss.request.v20190517importPutObjectRequestimportosfromapp.configimportsettingsclassOSSUtil:def__init__(self):self.client=AcsClient(settings.OSS_ACCESS_KEY_ID,settings.OSS_ACCESS_KEY_SECRET,settings.OSS_ENDPOINT.split(".")[0].split("-")[-1]# 提取区域(如 beijing))self.bucket_name=settings.OSS_BUCKET_NAME        self.base_url=settings.OSS_BASE_URLdefupload_file(self,local_file_path:str,oss_object_key:str)->str:"""

        上传文件到 OSS

        :param local_file_path: 本地文件路径

        :param oss_object_key: OSS 存储路径(如 video/xxx/720p/index.m3u8)

        :return: OSS 文件访问 URL

        """try:request=PutObjectRequest.PutObjectRequest()request.set_Bucket(self.bucket_name)request.set_Key(oss_object_key)# 读取文件内容withopen(local_file_path,"rb")asf:request.set_Content(f.read())# 发送请求self.client.do_action_with_exception(request)returnf"{self.base_url}{oss_object_key}"except(ClientException,ServerException)ase:raiseException(f"OSS 上传失败:{str(e)}")

步骤 6:异步转码任务(Celery)

6.1 Celery 初始化(app/tasks/init.py)

python

运行

fromceleryimportCeleryfromapp.configimportsettings# 初始化 Celerycelery_app=Celery("video_tasks",broker=settings.CELERY_BROKER_URL,backend=settings.CELERY_RESULT_BACKEND,include=["app.tasks.transcode_task"])# 配置 Celerycelery_app.conf.update(task_serializer="json",result_serializer="json",accept_content=["json"],timezone="Asia/Shanghai",enable_utc=True,)

6.2 转码任务实现(app/tasks/transcode_task.py)

python

运行

importosimportsubprocessfrompathlibimportPathfromapp.tasksimportcelery_appfromapp.configimportsettingsfromapp.models.videoimportVideo,VideoStatusfromappimportSessionLocalfromapp.utils.oss_utilimportOSSUtil# 初始化 OSS 工具(生产环境)oss_util=OSSUtil()ifsettings.OSS_ACCESS_KEY_IDelseNonedefget_video_duration(video_path:str)->int:"""获取视频时长(秒)"""cmd=["ffprobe","-v","error","-show_entries","format=duration","-of","default=noprint_wrappers=1:nokey=1",video_path]result=subprocess.run(cmd,capture_output=True,text=True)ifresult.returncode!=0:raiseException(f"获取视频时长失败:{result.stderr}")returnint(round(float(result.stdout.strip())))@celery_app.task(bind=True,max_retries=3)deftranscode_video(self,video_id:str):"""

    视频转码任务:将原始视频转码为 HLS 格式(多码率)

    :param video_id: 视频唯一标识

    """db=SessionLocal()try:# 1. 查询视频信息video=db.query(Video).filter(Video.video_id==video_id).first()ifnotvideo:raiseException(f"视频不存在:{video_id}")# 2. 更新视频状态为转码中video.status=VideoStatus.TRANSCODING        db.commit()# 3. 构建路径original_ext=Path(video.original_filename).suffix        original_video_path=os.path.join(settings.LOCAL_UPLOAD_PATH,f"{video_id}{original_ext}")transcode_root_path=os.path.join(settings.LOCAL_TRANSCODE_PATH,video_id)os.makedirs(transcode_root_path,exist_ok=True)# 4. 多码率转码(生成不同清晰度的切片)resolutions=< href="https://zhiq.zhaopin.com/moment/86380297">< href="https://zq.zhaopin.com/moment/86380246">< href="https://zq-mobile.zhaopin.com/moment/86380246">< href="https://zhiq.zhaopin.com/moment/86380246">< href="https://zq.zhaopin.com/moment/86380205">< href="https://zq-mobile.zhaopin.com/moment/86380205">settings.TRANSCODE_RESOLUTIONS        bitrates=settings.TRANSCODE_BITRATE        slice_duration=settings.TRANSCODE_SLICE_DURATIONforquality,resolutioninresolutions.items():quality_path=os.path.join(transcode_root_path,quality)os.makedirs(quality_path,exist_ok=True)# FFmpeg 转码命令cmd=["ffmpeg","-i",original_video_path,"-c:v","h264",# 视频编码(H.264 兼容性最好)"-c:a","aac",# 音频编码"-s",resolution,# 分辨率"-b:v",bitrates[quality],# 视频码率"-hls_time",str(slice_duration),# 切片时长"-hls_list_size","0",# 保留所有切片"-hls_segment_filename",os.path.join(quality_path,"%03d.ts"),# 切片文件名os.path.join(quality_path,"index.m3u8")# 索引文件]# 执行转码命令result=subprocess.run(cmd,capture_output=True,text=True)ifresult.returncode!=0:raiseException(f"转码失败({quality}):{result.stderr}")# 5. 生成主索引文件(包含所有码率,支持自适应切换)master_m3u8_path=os.path.join(transcode_root_path,"master.m3u8")withopen(master_m3u8_path,"w")asf:f.write("#EXTM3U\n#EXT-X-VERSION:3\n")forquality,resolutioninresolutions.items():bandwidth=int(bitrates[quality].replace("k",""))*1000# 带宽(bps)f.write(f"#EXT-X-STREAM-INF:BANDWIDTH={bandwidth},RESOLUTION={resolution}\n")f.write(f"{quality}/index.m3u8\n")# 6. 上传到云存储(生产环境)ifoss_util:# 递归上传所有转码文件forroot,dirs,filesinos.walk(transcode_root_path):forfileinfiles:local_file=os.path.join(root,file)# 构建 OSS 存储路径(相对路径)relative_path=os.path.relpath(local_file,settings.LOCAL_TRANSCODE_PATH)oss_object_key=relative_path.replace("\\","/")# 统一路径分隔符oss_util.upload_file(local_file,oss_object_key)# 更新 m3u8 地址(云存储)m3u8_url=f"{settings.OSS_BASE_URL}{video_id}/master.m3u8"else:# 本地存储地址m3u8_url=f"{settings.LOCAL_BASE_URL}{video_id}/master.m3u8"# 7. 更新视频信息(时长、状态、m3u8 地址)video.duration=get_video_duration(original_video_path)video.status=VideoStatus.COMPLETED        video.m3u8_url=m3u8_url        db.commit()return{"status":"success","m3u8_url":m3u8_url}exceptExceptionase:# 转码失败,更新状态并重试ifvideo:video.status=VideoStatus.FAILED            db.commit()self.retry(exc=e,countdown=60)# 60 秒后重试finally:db.close()

6.3 Celery Worker 启动文件(celery_worker.py)

python

运行

fromapp.tasksimportcelery_appif__name__=="__main__":celery_app.worker_main()

步骤 7:FastAPI 接口实现

7.1 认证依赖(app/api/dependencies.py)

python

运行

fromfastapiimportDepends,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearerfromjoseimportJWTErrorfromapp.utils.jwt_utilimportdecode_access_tokenfromappimportget_dbfromsqlalchemy.ormimportSessionfromapp.models.videoimportUser# 定义 OAuth2 令牌提取器oauth2_scheme=OAuth2PasswordBearer(tokenUrl="/api/auth/login")defget_current_user(token:str=Depends(oauth2_scheme),db:Session=Depends(get_db))->User:"""获取当前登录用户(依赖项)"""credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="无效的令牌或令牌已过期",headers={"WWW-Authenticate":"Bearer"},)# 解码令牌payload=decode_access_token(token)ifnotpayload:raisecredentials_exception# 提取用户名username:str=payload.get("sub")ifnotusername:raisecredentials_exception# 查询用户user=db.query(User).filter(User.username==username).first()ifnotuser:raisecredentials_exceptionreturnuser

7.2 认证接口(app/api/auth.py)

python

运行

fromfastapiimportAPIRouter,Depends,HTTPException,statusfromfastapi.securityimportOAuth2PasswordRequestFormfromsqlalchemy.ormimportSessionfromappimportget_dbfromapp.models.videoimportUserfromapp.utils.jwt_utilimportverify_password,get_password_hash,create_access_tokenrouter=APIRouter(prefix="/api/auth",tags=["认证"])@router.post("/register",summary="用户注册")defregister(username:str,email:str,password:str,db:Session=Depends(get_db)):# 检查用户名/邮箱是否已存在ifdb.query(User).filter(User.username==username).first():raiseHTTPException(status_code=400,detail="用户名已存在")ifdb.query(User).filter(User.email==email).first():raiseHTTPException(status_code=400,detail="邮箱已存在")# 创建用户hashed_password=get_password_hash(password)user=User(username=username,email=email,hashed_password=hashed_password)db.add(user)db.commit()db.refresh(user)return{"message":"注册成功","user_id":user.id}@router.post("/login",summary="用户登录(获取 JWT 令牌)")deflogin(form_data:OAuth2PasswordRequestForm=Depends(),db:Session=Depends(get_db)):# 查询用户user=db.query(User).filter(User.username==form_data.username).first()# 验证密码ifnotuserornotverify_password(form_data.password,user.hashed_password):raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="用户名或密码错误",headers={"WWW-Authenticate":"Bearer"},)# 生成令牌access_token=create_access_token(data={"sub":user.username})return{"access_token":access_token,"token_type":"bearer","user_id":user.id,"username":user.username}

7.3 视频接口(app/api/video.py)

python

运行

fromfastapiimportAPIRouter,Depends,File,UploadFile,HTTPException,Requestfromfastapi.responsesimportFileResponse,StreamingResponsefromsqlalchemy.ormimportSessionimportosfrompathlibimportPathfromappimportget_dbfromapp.models.videoimportVideo,VideoStatusfromapp.api.dependenciesimportget_current_userfromapp.models.videoimportUserfromapp.tasks.transcode_taskimporttranscode_videofromapp.configimportsettingsrouter=APIRouter(prefix="/api/video",tags=["视频"])@router.post("/upload",summary="视频上传")defupload_video(title:str,file:UploadFile=File(...),current_user:User=Depends(get_current_user),db:Session=Depends(get_db)):# 验证文件类型allowed_extensions={".mp4",".mov",".avi",".mkv"}file_ext=Path(file.filename).suffix.lower()iffile_extnotinallowed_extensions:raiseHTTPException(status_code=400,detail="仅支持 MP4、MOV、AVI、MKV 格式")# 验证文件大小(最大 1GB)file_size=file.file.seek(0,2)file.file.seek(0)# 重置文件指针iffile_size>1024*1024*1024:raiseHTTPException(status_code=400,detail="文件大小不能超过 1GB")# 保存原始视频到本地video_id=str(Path(file.filename).stem)+"_"+str(os.urandom(4).hex())# 生成唯一标识original_video_path=os.path.join(settings.LOCAL_UPLOAD_PATH,f"{video_id}{file_ext}")withopen(original_video_path,"wb")asf:f.write(file.file.read())# 存入数据库video=Video(user_id=current_user.id,title=title,original_filename=file.filename,file_size=file_size,video_id=video_id)db.add(video)db.commit()db.refresh(video)# 触发异步转码任务transcode_video.delay(video_id)return{"message":"视频上传成功,转码中...","video_id":video_id,"status":video.status.value}@router.get("/status/{video_id}",summary="查询视频状态")defget_video_status(video_id:str,current_user:User=Depends(get_current_user),db:Session=Depends(get_db)):video=db.query(Video).filter(Video.video_id==video_id).first()ifnotvideo:raiseHTTPException(status_code=404,detail="视频不存在")# 验证权限(仅上传者可查询)ifvideo.user_id!=current_user.id:raiseHTTPException(status_code=403,detail="无权限访问该视频")return{"video_id":video.video_id,"title":video.title,"status":video.status.value,"duration":video.duration,"m3u8_url":video.m3u8_url,"created_at":video.created_at}@router.get("/play/{video_id}/{quality}/{filename}",summary="播放视频切片(.m3u8 或 .ts)")defplay_video_slice(video_id:str,quality:str,filename:str,request:Request,current_user:User=Depends(get_current_user),db:Session=Depends(get_db)):# 1. 验证视频存在性和权限video=db.query(Video).filter(Video.video_id==video_id).first()ifnotvideo:raiseHTTPException(status_code=404,detail="视频不存在")ifvideo.user_id!=current_user.id:raiseHTTPException(status_code=403,detail="无权限播放该视频")ifvideo.status!=VideoStatus.COMPLETED:raiseHTTPException(status_code=400,detail="视频转码中,暂无法播放")# 2. 验证清晰度是否支持ifqualitynotinsettings.TRANSCODE_RESOLUTIONS:raiseHTTPException(status_code=400,detail="不支持该清晰度")# 3. 本地存储读取(开发环境)ifnotsettings.OSS_ACCESS_KEY_ID:file_path=os.path.join(settings.LOCAL_TRANSCODE_PATH,video_id,quality,filename)ifnotos.path.exists(file_path):raiseHTTPException(status_code=404,detail="切片文件不存在")# 流式响应配置(支持断点续传)defstream_file():withopen(file_path,"rb")asf:whilechunk:=f.read(1024*1024):# 1MB 分块传输yieldchunk# 设置响应头headers={"Accept-Ranges":"bytes","Content-Length":str(os.path.getsize(file_path))}iffilename.endswith(".m3u8"):headers["Content-Type"]="application/x-mpegURL"eliffilename.endswith(".ts"):headers["Content-Type"]="video/MP2T"returnStreamingResponse(stream_file(),headers=headers)# 4. 生产环境:重定向到云存储 CDN 地址cdn_url=f"{settings.OSS_BASE_URL}{video_id}/{quality}/{filename}"return{"redirect_url":cdn_url}

步骤 8:FastAPI 入口文件(app/main.py)

python

运行

fromfastapiimportFastAPIfromfastapi.middleware.corsimportCORSMiddlewarefromappimportcreate_db_tablesfromapp.api.authimportrouterasauth_routerfromapp.api.videoimportrouterasvideo_routerfromapp.configimportsettings# 创建数据库表create_db_tables()# 初始化 FastAPI 应用app=FastAPI(title="Python 在线视频播放系统",description="基于 FastAPI + FFmpeg + Celery 实现的视频上传、转码、流式播放系统",version="1.0.0")# 配置 CORS(跨域资源共享)app.add_middleware(CORSMiddleware,allow_origins=["*"],# 生产环境替换为具体域名allow_credentials=True,allow_methods=["*"],allow_headers=["*"],)# 注册路由app.include_router(auth_router)app.include_router(video_router)# 根路由@app.get("/",summary="健康检查")defroot():return{"message":"Python 在线视频播放系统运行中","version":"1.0.0"}

步骤 9:前端播放实现(Video.js + HLS)

创建 index.html 文件,实现前端视频播放:

html

预览

<!DOCTYPEhtml><htmllang="zh-CN"><head><metacharset="UTF-8"><title>Python 在线视频播放</title><!-- 引入 Video.js 样式和脚本 --><linkhref="https://vjs.zencdn.net/8.6.1/video-js.css"rel="stylesheet"><scriptsrc="https://vjs.zencdn.net/8.6.1/video.min.js"></script><!-- 引入 HLS 插件(支持 HLS 协议) --><scriptsrc="https://cdn.jsdelivr.net/npm/videojs-contrib-hls@5.15.0/dist/videojs-contrib-hls.min.js"></script><style>.video-container{width:1280px;margin:50pxauto;}.controls{width:1280px;margin:20pxauto;text-align:center;}button{padding:10px20px;margin:010px;cursor:pointer;}</style></head><body><divclass="controls"><inputtype="text"id="videoId"placeholder="输入视频 ID"/><buttononclick="loadVideo()">加载视频</button></div><divclass="video-container"><videoid="video-player"class="video-js vjs-big-play-centered vjs-fluid"controlspreload="auto"poster="https://via.placeholder.com/1280x720?text=视频封面"><sourceid="video-source"src=""type="application/x-mpegURL">您的浏览器不支持 HTML5 视频播放</video></div><script>// 初始化 Video.jsconstplayer=videojs('video-player',{autoplay:false,controls:true,responsive:true,fluid:true,});// 加载视频functionloadVideo(){constvideoId=document.getElementById('videoId').value.trim();if(!videoId){alert('请输入视频 ID');return;}// 替换为实际的 m3u8 地址(从后端接口获取)consttoken="你的 JWT 令牌";// 登录后获取constm3u8Url=`http://localhost:8000/api/video/play/${videoId}/master.m3u8`;// 设置视频源player.src({src:m3u8Url,type:'application/x-mpegURL'});// 加载并播放player.load();player.play().catch(err=>{console.error('播放失败:',err);alert('播放失败,请检查视频 ID 或网络状态');});}// 监听播放错误player.on('error',function(){console.error('视频播放失败:',player.error());alert('视频播放失败,请联系管理员');});// 监听视频加载完成player.on('loadedmetadata',function(){console.log('视频时长:',player.duration());});</script></body></html>

步骤 10:启动系统

启动 RabbitMQ/Redis(确保消息代理已运行)。

启动 Celery Worker(处理转码任务):

bash

运行

celery-Acelery_worker worker--loglevel=info--queue=transcode

启动 FastAPI 应用

bash

运行

uvicorn app.main:app--host0.0.0.0--port8000--reload

访问系统

API 文档:http://localhost:8000/docs

前端播放页面:直接打开 index.html 文件,输入视频 ID 即可播放。

四、生产环境优化

1. 转码优化

多线程转码:通过 FFmpeg 的 -threads 参数设置转码线程数(如 -threads 4),利用多核 CPU 提升效率。

转码进度监控:在转码命令中添加 -progress pipe:1 参数,实时获取转码进度,存入 Redis,前端通过接口查询展示。

任务优先级:给 Celery 任务设置优先级(如 transcode_video.apply_async(args=[video_id], priority=10)),VIP 用户任务优先执行。

2. 存储与加速

CDN 全面集成:将云存储的视频文件接入 CDN,配置缓存策略(.m3u8 缓存 1 分钟,.ts 缓存 1 天),降低源站压力。

视频封面生成:转码时通过 FFmpeg 截取首帧作为封面(ffmpeg -i input.mp4 -ss 00:00:01 -vframes 1 cover.jpg),提升用户体验。

过期文件清理:使用 Celery 定时任务(celery beat)定期清理 30 天未访问的原始视频和转码文件,节省存储成本。

3. 防盗播增强

URL 签名:对视频访问 URL 添加签名和过期时间,防止恶意分享:

python

运行

importhashlibimporttimedefgenerate_signed_url(video_id:str,quality:str,filename:str,expire=3600):"""生成带签名的视频访问 URL"""timestamp=int(time.time())+expire    secret=settings.SECRET_KEY    sign_str=f"{video_id}_{quality}_{filename}_{timestamp}_{secret}"sign=hashlib.md5(sign_str.encode()).hexdigest()returnf"/api/video/play/{video_id}/{quality}/{filename}?t={timestamp}&sign={sign}"defverify_sign(video_id:str,quality:str,filename:str,timestamp:int,sign:str):"""验证签名"""iftime.time()>timestamp:returnFalsesecret=settings.SECRET_KEY    sign_str=f"{video_id}_{quality}_{filename}_{timestamp}_{secret}"returnhashlib.md5(sign_str.encode()).hexdigest()==sign

Referer 白名单:通过 Nginx 配置限制仅指定域名可访问视频,阻止盗链:

nginx

location~* \.(m3u8|ts)${valid_referersnone blocked *.yourdomain.com;if($invalid_referer){return403;}}

水印添加:转码时通过 FFmpeg 添加文字 / 图片水印(ffmpeg -i input.mp4 -i watermark.png -filter_complex "overlay=10:10" output.mp4)。

4. 高可用设计

集群部署:FastAPI 应用集群部署,通过 Nginx 负载均衡;Celery Worker 多节点部署,提高转码并发能力。

熔断降级:使用 tenacity 库实现重试机制,结合 Redis 限流(如 slowapi),避免流量峰值压垮系统。

监控告警:通过 Prometheus + Grafana 监控系统资源、转码成功率、播放成功率;使用 Sentry 捕获异常并告警。

五、常见问题排查

视频上传失败

检查 Python 配置(MAX_CONTENT_LENGTH)、Nginx 配置(client_max_body_size)是否支持大文件上传;确保存储目录有写入权限。

转码失败

检查 FFmpeg 环境变量配置,终端执行转码命令调试;确认视频文件格式支持,文件路径无中文 / 特殊字符。

播放卡顿

检查切片时长(建议 5-10 秒),CDN 是否生效;降低视频码率,增加低清晰度选项适配弱网络。

JWT 验证失败

检查令牌是否过期,密钥是否一致;前端请求头是否正确携带 Authorization: Bearer {token}。

Celery 任务不执行

检查消息代理(RabbitMQ/Redis)是否运行;确认 Celery Worker 已启动且监听正确队列。

六、总结

本文基于 Python + FastAPI + Celery + FFmpeg 实现了一套完整的在线视频播放系统,核心是通过异步转码生成 HLS 格式视频,结合 Video.js 实现跨端流式播放,并通过 JWT 保障权限安全。开发环境可快速验证功能,生产环境通过云存储、CDN、集群部署等方案优化性能和可用性。

该方案适用于中小型视频平台(如教育机构、企业培训系统、自媒体网站),如需支持超大规模并发(如千万级用户),可进一步接入专业视频云服务(如阿里云视频点播、腾讯云 VOD),简化转码、存储、分发等复杂流程,聚焦业务逻辑开发。

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

友情链接更多精彩内容