在Eino 中的callpacks包为组件的扩展提供回调(callback)的机制, 它允许用户在组件执行的不同阶段注入回调处理器(handlers), 比如开始, 结束或者错误发生时, 通过切片注入的方式常用于记录日志, 监控, 和统计分析等功能
示例:
// func (t *testChatModel) Generate(ctx context.Context, input []*schema.Message, opts ...model.Option) (resp *schema.Message, err error) {
// defer func() {
// if err != nil {
// callbacks.OnEnd(ctx, err)
// }
// }()
//
// ctx = callbacks.OnStart(ctx, &model.CallbackInput{
// Messages: input,
// Tools: nil,
// Extra: nil,
// })
//
// // do smt
//
// ctx = callbacks.OnEnd(ctx, &model.CallbackOutput{
// Message: resp,
// Extra: nil,
// })
//
// return resp, nil
// }
定义Handler 接口, 及运行时信息RunInfo 类型
package callbacks
import "context"
type RunInfo struct {
Name string
Type string
Component Component
}
type CallbackInput any
type CallbackOutput any
type Handler interface {
OnStart(ctx context.Context, info *RunInfo, input CallbackInput) context.Context
OnEnd(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context
OnError(ctx context.Context, info *RunInfo, err error) context.Context
}
Handler 接口定义了组件在运行开始(OnStart), 结束(OnEnd)以及错误发生时的需要处理的声明(OnError)
下面通过切面编程实现自动日志的记录的Handler, loggerCallbacks 实现 Handler 接口, 分别定义运行开始(OnStart), 结束(OnEnd)及错误发生时(OnError)的日志记录逻辑实现:
package callbacks
import (
"context"
"github.com/sirupsen/logrus"
)
type Handle[T any] func(context.Context, T, *RunInfo, []Handler) (context.Context, T)
// 日志handler 实现
type loggerCallbacks struct{}
func (l *loggerCallbacks) OnStart(ctx context.Context, info *RunInfo, input CallbackInput) context.Context {
logrus.Infof("name: %v, type: %v, component: %v, input: %v", info.Name, info.Type, info.Component, input)
return ctx
}
func (l *loggerCallbacks) OnEnd(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context {
logrus.Infof("name: %v, type: %v, component: %v, output: %v", info.Name, info.Type, info.Component, output)
return ctx
}
func (l *loggerCallbacks) OnError(ctx context.Context, info *RunInfo, err error) context.Context {
logrus.Errorf("name: %v, type: %v, component: %v, error: %v", info.Name, info.Type, info.Component, err)
return ctx
}
使用示例:
package callbacks
import (
"context"
"fmt"
"strconv"
"testing"
)
func TestAspectInject(t *testing.T) {
t.Run("log_handler", func(t *testing.T) {
ctx := context.Background()
hb := &loggerCallbacks{}
runInfo := &RunInfo{
Name: "test_log",
Type: "sdf",
Component: "ChatTemplate",
}
ctx = InitCallbacks(ctx, runInfo, hb)
ctx = OnStart(ctx, 1)
//业务逻辑处理此处省略......
ctx = OnEnd(ctx, 2)
ctx = OnError(ctx, fmt.Errorf("3"))
})
}
示例输出结果如下:

image.png
- 示例中, 首先需要 InitCallbacks(ctx, runInfo, hb)函数来通过初始化回调的运行时信息(RunInfo),处理器信息 (Handler), 以及上下文信息和Manager, 具体实现如下:
func InitCallbacks(ctx context.Context, info *RunInfo, handlers ...Handler) context.Context {
mgr, ok := newManager(info, handlers...)
if ok {
return ctxWithManager(ctx, mgr)
}
return ctxWithManager(ctx, nil)
}
var GlobalHandlers []Handler
func newManager(runInfo *RunInfo, handlers ...Handler) (*manager, bool) {
if len(handlers)+len(GlobalHandlers) == 0 {
return nil, false
}
hs := make([]Handler, len(GlobalHandlers))
copy(hs, GlobalHandlers)
return &manager{
globalHandlers: hs,
handlers: handlers,
runInfo: runInfo,
}, true
}
func ctxWithManager(ctx context.Context, manager *manager) context.Context {
return context.WithValue(ctx, CtxManagerKey, manager)
}
func managerFromCtx(ctx context.Context) (*manager, bool) {
v := ctx.Value(CtxManagerKey)
m, ok := v.(*manager)
if ok && m != nil {
n := *m
return &n, true
}
return nil, false
}
manager 主要用于管理处理器, 并通过上下文context 在调用链上进行传递, 类型定义如下:
type manager struct {
globalHandlers []Handler
handlers []Handler
runInfo *RunInfo
}
- 示例中通过 ctx = OnStart(ctx, 1) 来声明业务逻辑处理开始时的切面操作, 并传入业务逻辑数据, OnStart实现如下:
// OnStart invokes the OnStart logic for the particular context, ensuring that all registered
// handlers are executed in reverse order (compared to add order) when a process begins.
func OnStart[T any](ctx context.Context, input T) context.Context {
ctx, _ = On(ctx, input, OnStartHandle[T], true)
return ctx
}
// 遍历handler, 并调用handler的OnStart函数
func OnStartHandle[T any](ctx context.Context, input T,
runInfo *RunInfo, handlers []Handler) (context.Context, T) {
for i := len(handlers) - 1; i >= 0; i-- {
ctx = handlers[i].OnStart(ctx, runInfo, input)
}
return ctx, input
}
type Handle[T any] func(context.Context, T, *RunInfo, []Handler) (context.Context, T)
func On[T any](ctx context.Context, inOut T, handle Handle[T], start bool) (context.Context, T) {
mgr, ok := managerFromCtx(ctx)
if !ok {
return ctx, inOut
}
nMgr := *mgr
var info *RunInfo
if start {
info = nMgr.runInfo
nMgr.runInfo = nil
ctx = context.WithValue(ctx, CtxRunInfoKey, info)
} else {
if nMgr.runInfo != nil {
info = nMgr.runInfo
} else {
info, _ = ctx.Value(CtxRunInfoKey).(*RunInfo)
}
}
hs := make([]Handler, 0, len(nMgr.handlers)+len(nMgr.globalHandlers))
for _, handler := range append(nMgr.handlers, nMgr.globalHandlers...) {
hs = append(hs, handler)
}
var out T
// 从ctx中的manager中获取处理器, 并调用handler的OnStart函数
ctx, out = handle(ctx, inOut, info, hs)
return ctxWithManager(ctx, &nMgr), out
}
- OnEnd 与 OnError 实现类似:
// OnEnd invokes the OnEnd logic of the particular context, allowing for proper cleanup
// and finalization when a process ends.
// handlers are executed in normal order (compared to add order).
func OnEnd[T any](ctx context.Context, output T) context.Context {
ctx, _ = On(ctx, output, OnEndHandle[T], false)
return ctx
}
// OnError invokes the OnError logic of the particular, notice that error in stream will not represent here.
// handlers are executed in normal order (compared to add order).
func OnError(ctx context.Context, err error) context.Context {
ctx, _ = On(ctx, err, OnErrorHandle, false)
return ctx
}
func OnErrorHandle(ctx context.Context, err error,
runInfo *RunInfo, handlers []Handler) (context.Context, error) {
for _, handler := range handlers {
ctx = handler.OnError(ctx, runInfo, err)
}
return ctx, err
}
func OnEndHandle[T any](ctx context.Context, output T,
runInfo *RunInfo, handlers []Handler) (context.Context, T) {
for _, handler := range handlers {
ctx = handler.OnEnd(ctx, runInfo, output)
}
return ctx, output
}
完整代码:
https://github.com/lbbwyt/eino_test/blob/master/callbacks/aspect_inject_test.go