Tube - AI background jobs

  • background jobs
    • 后台工作能很好地帮助我们处理超时的问题
    • 即使任务失败了也可以立即重试
  • upstash workflow
    • 点击查看 How to use and deploy Upstash Workflow with Next.js?,在当前项目中之前使用过upstash redis的rate limit服务
    • 更新 .env 文件中的 QStash 时,一定要按照文档提示的位置找到对应变量
    • install 版本:"@upstash/workflow": "^0.2.18"
    • 运行工作流节点,使用http请求curl -X POST http://localhost:3000/api/videos/workflows/title,正常请求成功的话会拿到一个 workflowRunId,当前运行这个节点只是简单的做个测试,更重要的是下一步保护节点的安全性
    • 保护工作流节点的安全性,防止任何人都可以触发工作流,配置QStash内置的验证请求,最终环境变量内容如下:(配置完成这一步后再运行工作流节点会提示验证失败)
    // .env
    
    # Upstash workflow
    UPSTASH_WORKFLOW_URL=...
    QSTASH_TOKEN=...  
    QSTASH_CURRENT_SIGNING_KEY="..."
    QSTASH_NEXT_SIGNING_KEY="..."
    
  • 在业务中触发工作流节点
    import { Client } from "@upstash/workflow";
    
    export const workflow = new Client({ token: process.env.QSTASH_TOKEN! });   
    
    • 以“AI生成视频标题”这个功能为例,先创建一个trpc路由 generateTitle,在该路由中发起一个workflow,然后在workflow中处理业务逻辑
    // src/modules/videos/server/procedure.ts
    
    import { workflow } from '@/lib/workflow'
    
    export const videosRouter = createTRPCRouter({
      ...
      generateTitle: protectedProcedure
        .input(z.object({videoId: z.uuid()}))
        .mutation(async ({ ctx, input }) => {
          const { id: userId } = ctx.user
    
          const { workflowRunId } = await workflow.trigger({
            url: `${process.env.UPSTASH_WORKFLOW_URL}/api/videos/workflows/title`, // workflow的地址
            body: { userId, videoId: input.videoId }, 
          }) 
    
          return workflowRunId
        })
    })
    
    // src/app/api/videos/workflows/title/route.ts
    
    import { serve } from "@upstash/workflow/nextjs"
    
    import { db } from '@/db'
    import { videos } from '@/db/schema'
    import { and, eq } from 'drizzle-orm'
    
    interface InputType {
      userId: string,
      videoId: string
    }
    
    export const { POST } = serve(
      async (context) => {
        const input = context.requestPayload as InputType
        const { userId, videoId } = input
    
        const video = await context.run('get-video', async () => {...})
        ...
      }
    )
    
  • 关于mux
    • muxPlaybackId: 公开给前端使用的ID,可用于播放视频、视频预览、封面图,是在视频资产Assets准备就绪 video.asset.ready 时生成的,mux自动生成的
    • muxTrackId: Asset里某一个视频媒体轨道的ID,比如视频轨道、音频轨道、文本字幕轨道
    • generated_subtitles 表示在Mux完成视频转码后,会自动生成一条字幕轨道Subtitle Track,语言是en,名字叫English,字幕轨道生成完成时,发送对应的Webhook,也就是 video.asset.track.ready 事件
    // src/modules/videos/server/procedure.ts
    
    export const videosRouter = createTRPCRouter({
      create: protectedProcedure
        .mutation(async ({ ctx }) => {
           const { id: userId } = ctx.user
        
           const upload = await mux.video.uploads.create({
             new_asset_settings: {
                passthrough: userId, // 传递用户ID到Mux
                playback_policy: ['public'], // 设置播放策略为公开
                input: [
                  {
                    generated_subtitles: [
                      {
                        language_code: 'en',
                        name: 'English'
                      }
                    ]
                  }
                ]
             }
             cors_origin: '*', // 允许所有CORS来源
           })
           ...
        })
    })
    
  • fetch 请求
    • Node.js 18+内置的api,发起HTTP请求,向远程服务器获取数据或发送数据
    • 基本语法:response.text() 读取返回的纯文本
    const response = await fetch(url, {
      method: 'GET' | 'POST' | 'PUT' | 'DELETE',
      headers: { ... },
      body: JSON.stringify(data)
    })
    
    const json = await response.json() // 解析响应体
    
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容