随着AIGC和数字人技术的成熟,实时数字人也迎来了广泛使用,目前在娱乐、社交、教育、直播、客服领域都有很好地落地场景。
如果想实现数字人的实时性,除了数字人模型本身,工程上还要解决两个关键点:一个是如何将服务端生成的数字人展现到客户/用户端,另一个是在此基础上实现实时性,尽量减少用户的等待时间。
数字人展现给前端用户一般有两种技术方案。一种是传递音频流和嘴部数据,由前端进行渲染,渲染一般用到Live2D、Unreal Engine或者Unity等,通常适用于卡通形象的数字人;另一种是直接由服务端完成渲染,通过视频推流的方式推送给前端用户观看,通常适用于真人复刻形式的数字人,本文主要讲解的是第二种技术方案。
音视频推流一般使用RTC技术,但目前常见的RTC技术,或者服务商提供的SDK,均只实现了客户端之间的通信,即采集本地摄像头或传递某个视频文件,通过RTC/RTMP协议发送给接收端。通过调研发现FFmpeg具备推流能力,我们就可以在服务端调用FFmpeg,将数字人的音视频流推送到RTC服务。FFmpeg是一款强大的多媒体处理软件,支持各种视频处理操作,包括混流、推流等。
使用FFmpeg推流很简单,以下命令就实现了一个mp4文件的推流
ffmpeg -re -i input.mp4 -c copy -f flv rtmp://server/live/streamName
如果想将视频和音频混流推送,可以使用以下命令:
ffmpeg -re -y -an -i input.mp4 -i input.wav -c copy -f flv rtmp://server/live/streamName
那如何结合FFmpeg实现实时推流呢?这就需要在数字人模块生成每一帧图像的同时,同步将该帧图像和对应的音频采样数据,通过FFmpeg推流到RTMP。我们可以设置两个管道,一个接收视频帧数据,一个接收音频采样数据,分别有两个线程向两个管道匀速写入,以实现模拟直播推流的效果,具体实现代码如下:
import cv2
import subprocess
import time
import numpy as np
import librosa
import threading
import os
import hashlib
import asyncio
# 将视频流写入管道
def write_video_stream(cap, fps, pipe_name):
fd_pipe = os.open(pipe_name, os.O_WRONLY)
while True:
ret, frame = cap.read()
if not ret:
break
os.write(fd_pipe, frame.tobytes())
os.close(fd_pipe)
# 将音频流写入管道;
def write_audio_stream(cap, speech_array, fps, pipe_name):
fd_pipe = os.open(pipe_name, os.O_WRONLY)
wav_frame_num = int(44100 / fps)
while True:
# 由于音频流的采样率是44100, 而视频流的帧率是30, 因此需要对音频流进行分帧
speech = speech_array[frame_counter * wav_frame_num : (frame_counter+1) * wav_frame_num]
os.write(fd_pipe, speech.tostring())
frame_counter += 1
# 根据视频帧数决定音频写入次数
if frame_counter == int(cap.get(cv2.CAP_PROP_FRAME_COUNT)):
break
os.close(fd_pipe)
def push():
# 模拟数字人生成的视频流和音频流
# 使用OpenCV读取视频流
cap = cv2.VideoCapture("input.mp4")
# 使用librosa读取音频流
speech_array, sr = librosa.load("input.wav", sr=44100) # 对于rtmp, 音频速率是有要求的,这里采用了44100
speech_array = (speech_array*32767).astype(np.int16) # 转为整型
push_url = 'rtmp://xxxx.com/live/stream_name'
# 获取视频流的帧率、宽度和高度
fps = float(cap.get(5))
width = int(cap.get(3))
height = int(cap.get(4))
# 创建两个"named pipes",用于存放视频流和音频流
# 判断如果管道存在,则先unlink
if os.path.exists('video_pipe'):
os.unlink('video_pipe')
if os.path.exists('audio_pipe'):
os.unlink('audio_pipe')
os.mkfifo('video_pipe')
os.mkfifo('audio_pipe')
# ffmpeg命令,不做详解,可以参考ffmpeg文档
command = ['ffmpeg',
'-loglevel', 'info',
'-y', '-an',
'-f', 'rawvideo',
'-vcodec', 'rawvideo',
'-pix_fmt', 'bgr24',
'-s', "{}x{}".format(width, height),
'-r', str(fps),
'-i', 'video_pipe', # 视频流管道作为输入
'-f', 's16le',
'-acodec', 'pcm_s16le',
'-i', 'audio_pipe', # 音频流管道作为输入
'-c:v', "libx264",
'-pix_fmt', 'yuv420p',
'-s', "960x540",
'-preset', 'ultrafast',
'-profile:v', 'baseline',
'-tune', 'zerolatency',
'-g', '2',
'-b:v', "1000k",
'-ac', '1',
'-ar', '44100',
'-acodec', 'aac',
'-shortest',
'-f', 'flv',
push_url]
# 启动进程运行ffmpeg命令
proc = subprocess.Popen(command, shell=False, stdin=subprocess.PIPE)
# 创建两个线程,分别将视频流和音频流写入"named pipes"
video_thread = threading.Thread(target=write_video_stream, args=(cap, fps, 'video_pipe'))
audio_thread = threading.Thread(target=write_audio_stream, args=(cap, speech_array, fps, 'audio_pipe'))
video_thread.start()
audio_thread.start()
video_thread.join()
audio_thread.join()
proc.wait()
# Remove the "named pipes".
os.unlink('video_pipe')
os.unlink('audio_pipe')
if __name__ == "__main__":
push()
这里并未真正接入数字人模块,通过cv2读取视频文件每一帧模拟。正式接入方式,可以开启两个队列,由数字人模块分别写入音频帧和视频帧,两个线程分别从队列中读取音频帧和视频帧数据,再写入pipe管道即可。
参考文献:
包包凯:通过python实时生成音视频数据并通过ffmpeg推送和混流
https://stackoverflow.com/questions/74256808/how-to-merge-audio-and-video-in-bytes-using-ffmpeg