Eino中的面向切片编程实现

在Eino 中的callpacks包为组件的扩展提供回调(callback)的机制, 它允许用户在组件执行的不同阶段注入回调处理器(handlers), 比如开始, 结束或者错误发生时, 通过切片注入的方式常用于记录日志, 监控, 和统计分析等功能

示例:

//  func (t *testChatModel) Generate(ctx context.Context, input []*schema.Message, opts ...model.Option) (resp *schema.Message, err error) {
//      defer func() {
//          if err != nil {
//              callbacks.OnEnd(ctx, err)
//          }
//      }()
//
//      ctx = callbacks.OnStart(ctx, &model.CallbackInput{
//          Messages: input,
//          Tools:    nil,
//          Extra:    nil,
//      })
//
//      // do smt
//
//      ctx = callbacks.OnEnd(ctx, &model.CallbackOutput{
//          Message: resp,
//          Extra:   nil,
//      })
//
//      return resp, nil
//  }

定义Handler 接口, 及运行时信息RunInfo 类型

package callbacks

import "context"

type RunInfo struct {
    Name      string
    Type      string
    Component Component
}

type CallbackInput any

type CallbackOutput any

type Handler interface {
    OnStart(ctx context.Context, info *RunInfo, input CallbackInput) context.Context
    OnEnd(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context
    OnError(ctx context.Context, info *RunInfo, err error) context.Context
}

Handler 接口定义了组件在运行开始(OnStart), 结束(OnEnd)以及错误发生时的需要处理的声明(OnError)

下面通过切面编程实现自动日志的记录的Handler, loggerCallbacks 实现 Handler 接口, 分别定义运行开始(OnStart), 结束(OnEnd)及错误发生时(OnError)的日志记录逻辑实现:

package callbacks

import (
    "context"
    "github.com/sirupsen/logrus"
)

type Handle[T any] func(context.Context, T, *RunInfo, []Handler) (context.Context, T)

// 日志handler 实现
type loggerCallbacks struct{}

func (l *loggerCallbacks) OnStart(ctx context.Context, info *RunInfo, input CallbackInput) context.Context {
    logrus.Infof("name: %v, type: %v, component: %v, input: %v", info.Name, info.Type, info.Component, input)
    return ctx
}

func (l *loggerCallbacks) OnEnd(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context {
    logrus.Infof("name: %v, type: %v, component: %v, output: %v", info.Name, info.Type, info.Component, output)
    return ctx
}

func (l *loggerCallbacks) OnError(ctx context.Context, info *RunInfo, err error) context.Context {
    logrus.Errorf("name: %v, type: %v, component: %v, error: %v", info.Name, info.Type, info.Component, err)
    return ctx
}

使用示例:

package callbacks

import (
    "context"
    "fmt"
    "strconv"
    "testing"
)

func TestAspectInject(t *testing.T) {
    t.Run("log_handler", func(t *testing.T) {
        ctx := context.Background()
        hb := &loggerCallbacks{}
        runInfo := &RunInfo{
            Name:      "test_log",
            Type:      "sdf",
            Component: "ChatTemplate",
        }
        ctx = InitCallbacks(ctx, runInfo, hb)
        ctx = OnStart(ctx, 1)

       //业务逻辑处理此处省略......

        ctx = OnEnd(ctx, 2)
        ctx = OnError(ctx, fmt.Errorf("3"))

    })

}

示例输出结果如下:


image.png
  1. 示例中, 首先需要 InitCallbacks(ctx, runInfo, hb)函数来通过初始化回调的运行时信息(RunInfo),处理器信息 (Handler), 以及上下文信息和Manager, 具体实现如下:
func InitCallbacks(ctx context.Context, info *RunInfo, handlers ...Handler) context.Context {
    mgr, ok := newManager(info, handlers...)
    if ok {
        return ctxWithManager(ctx, mgr)
    }

    return ctxWithManager(ctx, nil)
}



var GlobalHandlers []Handler

func newManager(runInfo *RunInfo, handlers ...Handler) (*manager, bool) {
    if len(handlers)+len(GlobalHandlers) == 0 {
        return nil, false
    }

    hs := make([]Handler, len(GlobalHandlers))
    copy(hs, GlobalHandlers)

    return &manager{
        globalHandlers: hs,
        handlers:       handlers,
        runInfo:        runInfo,
    }, true
}


func ctxWithManager(ctx context.Context, manager *manager) context.Context {
    return context.WithValue(ctx, CtxManagerKey, manager)
}

func managerFromCtx(ctx context.Context) (*manager, bool) {
    v := ctx.Value(CtxManagerKey)
    m, ok := v.(*manager)
    if ok && m != nil {
        n := *m
        return &n, true
    }

    return nil, false
}

manager 主要用于管理处理器, 并通过上下文context 在调用链上进行传递, 类型定义如下:

type manager struct {
    globalHandlers []Handler
    handlers       []Handler
    runInfo        *RunInfo
}
  1. 示例中通过 ctx = OnStart(ctx, 1) 来声明业务逻辑处理开始时的切面操作, 并传入业务逻辑数据, OnStart实现如下:
// OnStart invokes the OnStart logic for the particular context, ensuring that all registered
// handlers are executed in reverse order (compared to add order) when a process begins.
func OnStart[T any](ctx context.Context, input T) context.Context {
    ctx, _ = On(ctx, input, OnStartHandle[T], true)
    return ctx
}


// 遍历handler,  并调用handler的OnStart函数
func OnStartHandle[T any](ctx context.Context, input T,
    runInfo *RunInfo, handlers []Handler) (context.Context, T) {

    for i := len(handlers) - 1; i >= 0; i-- {
        ctx = handlers[i].OnStart(ctx, runInfo, input)
    }

    return ctx, input
}



type Handle[T any] func(context.Context, T, *RunInfo, []Handler) (context.Context, T)

func On[T any](ctx context.Context, inOut T, handle Handle[T], start bool) (context.Context, T) {
    mgr, ok := managerFromCtx(ctx)
    if !ok {
        return ctx, inOut
    }
    nMgr := *mgr

    var info *RunInfo
    if start {
        info = nMgr.runInfo
        nMgr.runInfo = nil
        ctx = context.WithValue(ctx, CtxRunInfoKey, info)
    } else {
        if nMgr.runInfo != nil {
            info = nMgr.runInfo
        } else {
            info, _ = ctx.Value(CtxRunInfoKey).(*RunInfo)
        }
    }

    hs := make([]Handler, 0, len(nMgr.handlers)+len(nMgr.globalHandlers))
    for _, handler := range append(nMgr.handlers, nMgr.globalHandlers...) {
        hs = append(hs, handler)
    }

    var out T
    // 从ctx中的manager中获取处理器, 并调用handler的OnStart函数
    ctx, out = handle(ctx, inOut, info, hs)
    return ctxWithManager(ctx, &nMgr), out
}
  1. OnEnd 与 OnError 实现类似:
// OnEnd invokes the OnEnd logic of the particular context, allowing for proper cleanup
// and finalization when a process ends.
// handlers are executed in normal order (compared to add order).
func OnEnd[T any](ctx context.Context, output T) context.Context {
    ctx, _ = On(ctx, output, OnEndHandle[T], false)

    return ctx
}

// OnError invokes the OnError logic of the particular, notice that error in stream will not represent here.
// handlers are executed in normal order (compared to add order).
func OnError(ctx context.Context, err error) context.Context {
    ctx, _ = On(ctx, err, OnErrorHandle, false)

    return ctx
}


func OnErrorHandle(ctx context.Context, err error,
    runInfo *RunInfo, handlers []Handler) (context.Context, error) {

    for _, handler := range handlers {
        ctx = handler.OnError(ctx, runInfo, err)
    }

    return ctx, err
}

func OnEndHandle[T any](ctx context.Context, output T,
    runInfo *RunInfo, handlers []Handler) (context.Context, T) {

    for _, handler := range handlers {
        ctx = handler.OnEnd(ctx, runInfo, output)
    }

    return ctx, output
}

完整代码:
https://github.com/lbbwyt/eino_test/blob/master/callbacks/aspect_inject_test.go

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容