Golang框架实战-KisFlow流式计算框架专栏
Golang框架实战-KisFlow流式计算框架(1)-概述
Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)
Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)
Golang框架实战-KisFlow流式计算框架(4)-数据流
Golang框架实战-KisFlow流式计算框架(5)-Function调度
Golang框架实战-KisFlow流式计算框架(6)-Connector
Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出
Golang框架实战-KisFlow流式计算框架(8)-KisFlow Action
Golang框架实战-KisFlow流式计算框架(9)-Cache/Params 数据缓存与数据参数
Golang框架实战-KisFlow流式计算框架(10)-Flow多副本
接下来我们来增强KisFlow中Function对业务数据处理的聚焦,将之前Function的写法:
func FuncDemo3Handler(ctx context.Context, flow kis.Flow) error {
fmt.Println("---> Call funcName3Handler ----")
fmt.Printf("Params = %+v\n", flow.GetFuncParamAll())
for _, row := range flow.Input() {
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
fmt.Println(str)
}
return nil
}
是从flow.Input()
中 获取到原始数据,改成可以直接获取到业务想要的具体数据结构类型,而无需断言等类型判断和转换。改成的Function扩展参数用法大致如下:
proto
type StuScores struct {
StuId int `json:"stu_id"`
Score1 int `json:"score_1"`
Score2 int `json:"score_2"`
Score3 int `json:"score_3"`
}
type StuAvgScore struct {
StuId int `json:"stu_id"`
AvgScore float64 `json:"avg_score"`
}
FaaS
type AvgStuScoreIn struct {
serialize.DefaultSerialize
proto.StuScores
}
type AvgStuScoreOut struct {
serialize.DefaultSerialize
proto.StuAvgScore
}
// AvgStuScore(FaaS) 计算学生平均分
func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {
for _, row := range rows {
avgScore := proto.StuAvgScore{
StuId: row.StuId,
AvgScore: float64(row.Score1+row.Score2+row.Score3) / 3,
}
// 提交结果数据
_ = flow.CommitRow(AvgStuScoreOut{StuAvgScore: avgScore})
}
return nil
}
这样,我们可以通过第三个形式参数rows
直接拿到我们期待的目标输出结构体数据,不需要断言和转换,更加关注业务方的开发效率。
当然,如果你希望获取到原始的数据,依然可以从flow.Input()
中获取到。
本章将实现KisFlow上述功能。
11.1 FaaS业务回调函数自描述
本节将完成FaaS的自描述概念改造,我们知道之前的FaaS回调如下:
type FaaS func(context.Context, Flow) error
那么我们需要一个结构体,来描述这个函数属性,包括他的函数名称、函数地址、形参数量、相残类型、返回值类型等等。
11.1.1 FaaSDesc 回调自描述类型
在kis-flow/kis/
下,新创建一个文件faas.go
,定义如下结构体:
kis-flow/kis/faas.go
// FaaS Function as a Service
// 将
// type FaaS func(context.Context, Flow) error
// 改为
// type FaaS func(context.Context, Flow, ...interface{}) error
// 可以通过可变参数的任意输入类型进行数据传递
type FaaS interface{}
// FaaSDesc FaaS 回调计算业务函数 描述
type FaaSDesc struct {
FnName string // Function名称
f interface{} // FaaS 函数
fName string // 函数名称
ArgsType []reflect.Type // 函数参数类型(集合)
ArgNum int // 函数参数个数
FuncType reflect.Type // 函数类型
FuncValue reflect.Value // 函数值(函数地址)
}
将之前的FaaS改进成interface{}
,而FaaSDesc
具备了一些属性。
- FnName: 表示当前Function的名称,例如我们之前例子的"funcDemo1" 等,这个是用来KisFlow给Function标识的FunctionName。
- f:表示定义的FaaS函数。
- fName: 定义f函数的函数名称。
- ArgsType:定义的f函数的形参类型列表,这是一个slice。
- ArgNum:定义的f函数的输入形参个数。
- FuncType:定义的f函数的数据类型。
- FuncValue:定义的f函数的函数值(可以被调度的函数地址)。
11.1.2 新建一个FaaSDesc对象
下面,提供一个新建FaaSDesc
的构造函数,形参的类型就是KisFlow的FunctionName和定义的FaaS函数,如下:
kis-flow/kis/faas.go
// NewFaaSDesc 根据用户注册的FnName 和FaaS 回调函数,创建 FaaSDesc 描述实例
func NewFaaSDesc(fnName string, f FaaS) (*FaaSDesc, error) {
// 传入的回调函数FaaS,函数值(函数地址)
funcValue := reflect.ValueOf(f)
// 传入的回调函数FaaS 类型
funcType := funcValue.Type()
// 判断传递的FaaS指针是否是函数类型
if !isFuncType(funcType) {
return nil, fmt.Errorf("provided FaaS type is %s, not a function", funcType.Name())
}
// 判断传递的FaaS函数是否有返回值类型是只包括(error)
if funcType.NumOut() != 1 || funcType.Out(0) != reflect.TypeOf((*error)(nil)).Elem() {
return nil, errors.New("function must have exactly one return value of type error")
}
// FaaS函数的参数类型集合
argsType := make([]reflect.Type, funcType.NumIn())
// 获取FaaS的函数名称
fullName := runtime.FuncForPC(funcValue.Pointer()).Name()
// 确保 FaaS func(context.Context, Flow, ...interface{}) error 形参列表,存在context.Context 和 kis.Flow
// 是否包含kis.Flow类型的形参
containsKisFlow := false
// 是否包含context.Context类型的形参
containsCtx := false
// 遍历FaaS的形参类型
for i := 0; i < funcType.NumIn(); i++ {
// 取出第i个形式参数类型
paramType := funcType.In(i)
if isFlowType(paramType) {
// 判断是否包含kis.Flow类型的形参
containsKisFlow = true
} else if isContextType(paramType) {
// 判断是否包含context.Context类型的形参
containsCtx = true
} else if isSliceType(paramType) {
// 判断是否包含Slice类型的形参
// 获取当前参数Slice的元素类型
itemType := paramType.Elem()
// 如果当前参数是一个指针类型,则获取指针指向的结构体类型
if itemType.Kind() == reflect.Ptr {
itemType = itemType.Elem() // 获取指针指向的结构体类型
}
} else {
// Other types are not supported...
}
// 将当前形参类型追加到argsType集合中
argsType[i] = paramType
}
if !containsKisFlow {
// 不包含kis.Flow类型的形参,返回错误
return nil, errors.New("function parameters must have kis.Flow param, please use FaaS type like: [type FaaS func(context.Context, Flow, ...interface{}) error]")
}
if !containsCtx {
// 不包含context.Context类型的形参,返回错误
return nil, errors.New("function parameters must have context, please use FaaS type like: [type FaaS func(context.Context, Flow, ...interface{}) error]")
}
// 返回FaaSDesc描述实例
return &FaaSDesc{
FnName: fnName,
f: f,
fName: fullName,
ArgsType: argsType,
ArgNum: len(argsType),
FuncType: funcType,
FuncValue: funcValue,
}, nil
}
这里面通过用reflect反射能力,依次从f函数中获取相关的属性值,存放在FaaSDesc
中。
这里面为了确保开发者传递的FaaS原因满足如下格式:
type FaaS func(context.Context, Flow, ...interface{}) error
所以对形参context.Context
和形参Flow
做了严格的形参类型校验,其中的校验方法如下:
kis-flow/kis/faas.go
// isFuncType 判断传递进来的 paramType 是否是函数类型
func isFuncType(paramType reflect.Type) bool {
return paramType.Kind() == reflect.Func
}
// isFlowType 判断传递进来的 paramType 是否是 kis.Flow 类型
func isFlowType(paramType reflect.Type) bool {
var flowInterfaceType = reflect.TypeOf((*Flow)(nil)).Elem()
return paramType.Implements(flowInterfaceType)
}
// isContextType 判断传递进来的 paramType 是否是 context.Context 类型
func isContextType(paramType reflect.Type) bool {
typeName := paramType.Name()
return strings.Contains(typeName, "Context")
}
// isSliceType 判断传递进来的 paramType 是否是切片类型
func isSliceType(paramType reflect.Type) bool {
return paramType.Kind() == reflect.Slice
}
在NewFaaSDesc()
用containsKisFlow
和containsCtx
两个bool类型的变量来判断是否包括Context和Flow类型。
下面这段代码是为了兼容传递的形参类型是结构体指针时候的兼容:
// ... ...
// 获得当前形参类型
itemType := paramType.Elem()
// 如果当前参数是一个指针类型,则获取指针指向的结构体类型
if itemType.Kind() == reflect.Ptr {
itemType = itemType.Elem() // 获取指针指向的结构体类型
}
// ... ...
比如开发者传递的FaaS函数原型如下:
func MyFaaSDemo(ctx context.Context, flow Flow, []*A) error
和:
func MyFaaSDemo(ctx context.Context, flow Flow, []A) error
11.1.3 注册FaaS函数
那么接下来,我们将kisPool
模块,的注册FaaS函数的方法修改成注册一个FaaSDesc描述,修改后的注册方法如下:
kis-flow/kis/pool.go
// FaaS 注册 Function 计算业务逻辑, 通过Function Name 索引及注册
func (pool *kisPool) FaaS(fnName string, f FaaS) {
// 当注册FaaS计算逻辑回调时,创建一个FaaSDesc描述对象
faaSDesc, err := NewFaaSDesc(fnName, f)
if err != nil {
panic(err)
}
pool.fnLock.Lock() // 写锁
defer pool.fnLock.Unlock()
if _, ok := pool.fnRouter[fnName]; !ok {
// 将FaaSDesc描述对象注册到fnRouter中
pool.fnRouter[fnName] = faaSDesc
} else {
errString := fmt.Sprintf("KisPoll FaaS Repeat FuncName=%s", fnName)
panic(errString)
}
log.Logger().InfoF("Add KisPool FuncName=%s", fnName)
}
那么现在的fnRouter
中保存的key依然是FunctionName,但是value则为当前FaaS函数的描述对象FaaSDesc.
11.1.4 kisPool调度FaaSDesc
最后再调度Function的时候,通过FaaSDesc取出调度函数地址和函数形参列表进行函数的调度。
修改的后的CallFunction()
如下:
kis-flow/kis/pool.go
// CallFunction 调度 Function
func (pool *kisPool) CallFunction(ctx context.Context, fnName string, flow Flow) error {
if funcDesc, ok := pool.fnRouter[fnName]; ok {
// 被调度Function的形参列表
params := make([]reflect.Value, 0, funcDesc.ArgNum)
for _, argType := range funcDesc.ArgsType {
// 如果是Flow类型形参,则将 flow的值传入
if isFlowType(argType) {
params = append(params, reflect.ValueOf(flow))
continue
}
// 如果是Context类型形参,则将 ctx的值传入
if isContextType(argType) {
params = append(params, reflect.ValueOf(ctx))
continue
}
// 如果是Slice类型形参,则将 flow.Input()的值传入
if isSliceType(argType) {
params = append(params, value)
continue
}
// 传递的参数,既不是Flow类型,也不是Context类型,也不是Slice类型,则默认给到零值
params = append(params, reflect.Zero(argType))
}
// 调用当前Function 的计算逻辑
retValues := funcDesc.FuncValue.Call(params)
// 取出第一个返回值,如果是nil,则返回nil
ret := retValues[0].Interface()
if ret == nil {
return nil
}
// 如果返回值是error类型,则返回error
return retValues[0].Interface().(error)
}
log.Logger().ErrorFX(ctx, "FuncName: %s Can not find in KisPool, Not Added.\n", fnName)
return errors.New("FuncName: " + fnName + " Can not find in NsPool, Not Added.")
}
函数的整体调度逻辑大致如下:
首选通过fnName进行从fnRouter
路由到对应的FaaSDesc。遍历FaaSDesc的形参列表:
将Context和Flow对象依次取出来,将额外传递的自定义切片形参取出来,如果传递的参数,既不是Flow类型,也不是Context类型,也不是Slice类型,则默认给到零值,如下:
params = append(params, reflect.Zero(argType))
最后执行函数的调度:
retValues := funcDesc.FuncValue.Call(params)
得到第一个返回值error的数值,为nil则返回nil,否则返回error类型。
这样我们的FaaS自描述的调度模式就建立成功了,那么有了这套功能KisFlow可以做什么事情呢,下一节我们可以再调度FaaSDesc的时候将传递的自定义形参的数据类型进行序列化,得到开发者期待的数据类型。
11.2 FaaS形参的自定义数据类型序列化
11.2.1 Serialize序列化接口
首先,我们定义一个数据序列化接口,在kis-flow/kis/
下创建serialize.go 文件,如下:
kis-flow/kis/serialize.go
// Serialize 数据序列化接口
type Serialize interface {
// UnMarshal 用于将 KisRowArr 反序列化为指定类型的值。
UnMarshal(common.KisRowArr, reflect.Type) (reflect.Value, error)
// Marshal 用于将指定类型的值序列化为 KisRowArr。
Marshal(interface{}) (common.KisRowArr, error)
}
其中KisRowArr是我们KisFlow中传递每个Function的数据切片,之前我们定义在了kis-flow/common/data_type.go
中:
package common
// KisRow 一行数据
type KisRow interface{}
// KisRowArr 一次业务的批量数据
type KisRowArr []KisRow
/*
KisDataMap 当前Flow承载的全部数据
key : 数据所在的Function ID
value: 对应的KisRow
*/
type KisDataMap map[string]KisRowArr
Serialize
提供了两个接口:
- UnMarshal:用于将 KisRowArr 反序列化为指定类型的值。
- Marshal:用于将指定类型的值序列化为 KisRowArr。
KisFlow会提供一个默认的Serialize
给每个FaaS函数,开发者也可以自定义自己的Serialize
来对FaaS传递的形参进行自定义的数据序列化动作。
11.2.2 KisFlow默认的Serialize序列化
KisFlow提供一个默认的Serialize序列化实例,主要以Json格式为主,在kis-flow/
下创建serialize/
文件夹,在kis-flow/serialize/
下创建serialize_default.go
文件,实现的序列化和反序列化代码如下:
kis-flow/serialize/serialize_default.go
package serialize
import (
"encoding/json"
"fmt"
"kis-flow/common"
"reflect"
)
type DefaultSerialize struct{}
// UnMarshal 用于将 KisRowArr 反序列化为指定类型的值。
func (f *DefaultSerialize) UnMarshal(arr common.KisRowArr, r reflect.Type) (reflect.Value, error) {
// 确保传入的类型是一个切片
if r.Kind() != reflect.Slice {
return reflect.Value{}, fmt.Errorf("r must be a slice")
}
slice := reflect.MakeSlice(r, 0, len(arr))
// 遍历每个元素并尝试反序列化
for _, row := range arr {
var elem reflect.Value
var err error
// 尝试断言为结构体或指针
elem, err = unMarshalStruct(row, r.Elem())
if err == nil {
slice = reflect.Append(slice, elem)
continue
}
// 尝试直接反序列化字符串
elem, err = unMarshalJsonString(row, r.Elem())
if err == nil {
slice = reflect.Append(slice, elem)
continue
}
// 尝试先序列化为 JSON 再反序列化
elem, err = unMarshalJsonStruct(row, r.Elem())
if err == nil {
slice = reflect.Append(slice, elem)
} else {
return reflect.Value{}, fmt.Errorf("failed to decode row: %v", err)
}
}
return slice, nil
}
// Marshal 用于将指定类型的值序列化为 KisRowArr(json 序列化)。
func (f *DefaultSerialize) Marshal(i interface{}) (common.KisRowArr, error) {
var arr common.KisRowArr
switch reflect.TypeOf(i).Kind() {
case reflect.Slice, reflect.Array:
slice := reflect.ValueOf(i)
for i := 0; i < slice.Len(); i++ {
// 序列化每个元素为 JSON 字符串,并将其添加到切片中。
jsonBytes, err := json.Marshal(slice.Index(i).Interface())
if err != nil {
return nil, fmt.Errorf("failed to marshal element to JSON: %v ", err)
}
arr = append(arr, string(jsonBytes))
}
default:
// 如果不是切片或数组类型,则直接序列化整个结构体为 JSON 字符串。
jsonBytes, err := json.Marshal(i)
if err != nil {
return nil, fmt.Errorf("failed to marshal element to JSON: %v ", err)
}
arr = append(arr, string(jsonBytes))
}
return arr, nil
}
其中一些函数定义如下:
kis-flow/serialize/serialize_default.go
// 尝试断言为结构体或指针
func unMarshalStruct(row common.KisRow, elemType reflect.Type) (reflect.Value, error) {
// 检查 row 是否为结构体或结构体指针类型
rowType := reflect.TypeOf(row)
if rowType == nil {
return reflect.Value{}, fmt.Errorf("row is nil pointer")
}
if rowType.Kind() != reflect.Struct && rowType.Kind() != reflect.Ptr {
return reflect.Value{}, fmt.Errorf("row must be a struct or struct pointer type")
}
// 如果 row 是指针类型,则获取它指向的类型
if rowType.Kind() == reflect.Ptr {
// 空指针
if reflect.ValueOf(row).IsNil() {
return reflect.Value{}, fmt.Errorf("row is nil pointer")
}
// 解引用
row = reflect.ValueOf(row).Elem().Interface()
// 拿到解引用后的类型
rowType = reflect.TypeOf(row)
}
// 检查是否可以将 row 断言为 elemType(目标类型)
if !rowType.AssignableTo(elemType) {
return reflect.Value{}, fmt.Errorf("row type cannot be asserted to elemType")
}
// 将 row 转换为 reflect.Value 并返回
return reflect.ValueOf(row), nil
}
// 尝试直接反序列化字符串(将Json字符串 反序列化为 结构体)
func unMarshalJsonString(row common.KisRow, elemType reflect.Type) (reflect.Value, error) {
// 判断源数据是否可以断言成string
str, ok := row.(string)
if !ok {
return reflect.Value{}, fmt.Errorf("not a string")
}
// 创建一个新的结构体实例,用于存储反序列化后的值
elem := reflect.New(elemType).Elem()
// 尝试将json字符串反序列化为结构体。
if err := json.Unmarshal([]byte(str), elem.Addr().Interface()); err != nil {
return reflect.Value{}, fmt.Errorf("failed to unmarshal string to struct: %v", err)
}
return elem, nil
}
// 尝试先序列化为 JSON 再反序列化(将结构体转换成Json字符串,再将Json字符串 反序列化为 结构体)
func unMarshalJsonStruct(row common.KisRow, elemType reflect.Type) (reflect.Value, error) {
// 将 row 序列化为 JSON 字符串
jsonBytes, err := json.Marshal(row)
if err != nil {
return reflect.Value{}, fmt.Errorf("failed to marshal row to JSON: %v ", err)
}
// 创建一个新的结构体实例,用于存储反序列化后的值
elem := reflect.New(elemType).Interface()
// 将 JSON 字符串反序列化为结构体
if err := json.Unmarshal(jsonBytes, elem); err != nil {
return reflect.Value{}, fmt.Errorf("failed to unmarshal JSON to element: %v ", err)
}
return reflect.ValueOf(elem).Elem(), nil
}
- UnMarshal(): 的实现 首先判断形参是否是一个Slice,如果是的话,那么切片中的每个元素的数据进行序列化,优先尝试
unMarshalStruct()
结构体反序列化,其次尝试json字符串的反序列化unMarshalJsonString()
,最后再尝试具备相同属性的结构体但是名称不同的反序列化unMarshalJsonStruct()
。 - Marshal(): 则是将任意类型序列化为json二进制字符串存储在KisRowArr中。
注意:KisFlow目前的默认序列化只实现了json格式的序列化,开发者可以参考DefaultSerialize{} 来实现自己其他格式的数据序列化和反序列化动作。
11.2.3 默认的默认的Serialize实例
在serialize的接口定义中,定义一个全局默认的序列化实例,defaultSerialize。
kis-flow/kis/serialize.go
// defaultSerialize KisFlow提供的默认序列化实现(开发者可以自定义)
var defaultSerialize = &serialize.DefaultSerialize{}
同时提供一个判断一个数据类型是否实现了抽象接口Serialize
的校验方法,如下:
kis-flow/kis/serialize.go
// isSerialize 判断传递进来的 paramType 是否实现了 Serialize 接口
func isSerialize(paramType reflect.Type) bool {
return paramType.Implements(reflect.TypeOf((*Serialize)(nil)).Elem())
}
11.2.4 FaaSDesc实现Serialize序列化接口
现在将FaaSDesc去继承且实现Serialize接口,在调度FaaSDesc的时候将传递的输入参数进行序列化得到相对应的具体类型形参,定义如下:
kis-flow/kis/faas.go
// FaaSDesc FaaS 回调计算业务函数 描述
type FaaSDesc struct {
// +++++++
Serialize // 当前Function的数据输入输出序列化实现
// +++++++
FnName string // Function名称
f interface{} // FaaS 函数
fName string // 函数名称
ArgsType []reflect.Type // 函数参数类型(集合)
ArgNum int // 函数参数个数
FuncType reflect.Type // 函数类型
FuncValue reflect.Value // 函数值(函数地址)
}
然后,在构造方法NewFaaSDesc()
加上对自定义形参的判断,判断传递的自定义形参是否实现了Serialize
的两个序列化接口,如果实现了,则使用自定义的序列化接口,如果没有实现,则使用默认的DefaultSerialize{}
实例。
kis-flow/kis/faas.go
// NewFaaSDesc 根据用户注册的FnName 和FaaS 回调函数,创建 FaaSDesc 描述实例
func NewFaaSDesc(fnName string, f FaaS) (*FaaSDesc, error) {
// ++++++++++
// 输入输出序列化实例
var serializeImpl Serialize
// ++++++++++
// ... ...
// ... ...
// 遍历FaaS的形参类型
for i := 0; i < funcType.NumIn(); i++ {
// 取出第i个形式参数类型
paramType := funcType.In(i)
if isFlowType(paramType) {
// 判断是否包含kis.Flow类型的形参
containsKisFlow = true
} else if isContextType(paramType) {
// 判断是否包含context.Context类型的形参
containsCtx = true
} else if isSliceType(paramType) {
// 获取当前参数Slice的元素类型
itemType := paramType.Elem()
// 如果当前参数是一个指针类型,则获取指针指向的结构体类型
if itemType.Kind() == reflect.Ptr {
itemType = itemType.Elem() // 获取指针指向的结构体类型
}
// +++++++++++++++++++++++++++++
// Check if f implements Serialize interface
// (检测传递的FaaS函数是否实现了Serialize接口)
if isSerialize(itemType) {
// 如果当前形参实现了Serialize接口,则使用当前形参的序列化实现
serializeImpl = reflect.New(itemType).Interface().(Serialize)
} else {
// 如果当前形参没有实现Serialize接口,则使用默认的序列化实现
serializeImpl = defaultSerialize // Use global default implementation
}
// +++++++++++++++++++++++++++++++
} else {
// Other types are not supported
}
// 将当前形参类型追加到argsType集合中
argsType[i] = paramType
}
// ... ...
// ... ...
// 返回FaaSDesc描述实例
return &FaaSDesc{
Serialize: serializeImpl,
FnName: fnName,
f: f,
fName: fullName,
ArgsType: argsType,
ArgNum: len(argsType),
FuncType: funcType,
FuncValue: funcValue,
}, nil
}
11.2.5 完成调度FaaS数据序列化
最后在调度FaaSDesc的时候,解析形参的时候,如果是自定义的Slice参数,则对齐进行反序列化操作,将flow.Input()
的原数据反序列化成为开发者需要的结构体数据,进行调度FaaS,实现如下:
kis-flow/kis/pool.go
// CallFunction 调度 Function
func (pool *kisPool) CallFunction(ctx context.Context, fnName string, flow Flow) error {
if funcDesc, ok := pool.fnRouter[fnName]; ok {
// 被调度Function的形参列表
params := make([]reflect.Value, 0, funcDesc.ArgNum)
for _, argType := range funcDesc.ArgsType {
// 如果是Flow类型形参,则将 flow的值传入
if isFlowType(argType) {
params = append(params, reflect.ValueOf(flow))
continue
}
// 如果是Context类型形参,则将 ctx的值传入
if isContextType(argType) {
params = append(params, reflect.ValueOf(ctx))
continue
}
// 如果是Slice类型形参,则将 flow.Input()的值传入
if isSliceType(argType) {
// +++++++++++++++++++
// 将flow.Input()中的原始数据,反序列化为argType类型的数据
value, err := funcDesc.Serialize.UnMarshal(flow.Input(), argType)
if err != nil {
log.Logger().ErrorFX(ctx, "funcDesc.Serialize.DecodeParam err=%v", err)
} else {
params = append(params, value)
continue
}
// +++++++++++++++++++
}
// 传递的参数,既不是Flow类型,也不是Context类型,也不是Slice类型,则默认给到零值
params = append(params, reflect.Zero(argType))
}
// 调用当前Function 的计算逻辑
retValues := funcDesc.FuncValue.Call(params)
// 取出第一个返回值,如果是nil,则返回nil
ret := retValues[0].Interface()
if ret == nil {
return nil
}
// 如果返回值是error类型,则返回error
return retValues[0].Interface().(error)
}
log.Logger().ErrorFX(ctx, "FuncName: %s Can not find in KisPool, Not Added.\n", fnName)
return errors.New("FuncName: " + fnName + " Can not find in NsPool, Not Added.")
}
这样我们就将数据序列化的动作和FaaSDesc模块结合起来了,接下来,我们写一个单元测试来测试这部分的能力。
11.3 自定义形参序列化单元测试
11.3.1 Flow与Function的配置文件定义
单元测试,我们新建两个Function配置如下:
kis-flow/test/load_conf/func/func-avgStuScore.yml
kistype: func
fname: AvgStuScore
fmode: Calculate
source:
name: 学生平均分
must:
- stu_id
kis-flow/test/load_conf/func/func-PrintStuAvgScore.yml
kistype: func
fname: PrintStuAvgScore
fmode: Expand
source:
name: 学生平均分
must:
- stu_id
然后我们来定义一个Flow将上述的两个Function链接起来
kis-flow/test/load_conf/flow/flow-StuAvg.yml
kistype: flow
status: 1
flow_name: StuAvg
flows:
- fname: AvgStuScore
- fname: PrintStuAvgScore
11.3.2 自定义基础数据proto定义
在kis-flow/test/
下创建proto/
文件夹,创建一个自定义的基础数据proto,为了今后数据协议的复用,如下:
kis-flow/test/proto/stu_score.go
package proto
// 学生学习分数
type StuScores struct {
StuId int `json:"stu_id"`
Score1 int `json:"score_1"`
Score2 int `json:"score_2"`
Score3 int `json:"score_3"`
}
// 学生的平均分
type StuAvgScore struct {
StuId int `json:"stu_id"`
AvgScore float64 `json:"avg_score"`
}
11.3.3 定义两个FaaS计算回调函数
定义两个FaaS计算函数,一个为计算一个Student的平均分,一个打印Student的平均分,如下:
kis-flow/test/faas/faas_stu_score_avg.go
package faas
import (
"context"
"kis-flow/kis"
"kis-flow/serialize"
"kis-flow/test/proto"
)
type AvgStuScoreIn struct {
serialize.DefaultSerialize
proto.StuScores
}
type AvgStuScoreOut struct {
serialize.DefaultSerialize
proto.StuAvgScore
}
// AvgStuScore(FaaS) 计算学生平均分
func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {
for _, row := range rows {
avgScore := proto.StuAvgScore{
StuId: row.StuId,
AvgScore: float64(row.Score1+row.Score2+row.Score3) / 3,
}
// 提交结果数据
_ = flow.CommitRow(AvgStuScoreOut{StuAvgScore: avgScore})
}
return nil
}
AvgStuScore()
方法为我们改进之后的FaaS函数,其中第三个形参rows []*AvgStuScoreIn
为我们自定义序列化的形参,之前我们通过flow.Input()
来拿到原始的数据,然后进行遍历,其实现在依然可以这么处理,但是每次可能需要开发者在FaaS中自行断言判断,对开发的效率有些成本,那么现在开发者完全可以通过AvgStuScoreIn
来描述一个形参的数据,然后在AvgStuScore
的业务中,通过遍历rows
得到已经序列化好的结构体,增加的代码的可读性也降低的写业务的开发成本,提高了效率。
打印平均分的FaaS实现如下:
kis-flow/test/faas/faas_stu_score_avg_print.go
package faas
import (
"context"
"fmt"
"kis-flow/kis"
"kis-flow/serialize"
"kis-flow/test/proto"
)
type PrintStuAvgScoreIn struct {
serialize.DefaultSerialize
proto.StuAvgScore
}
type PrintStuAvgScoreOut struct {
serialize.DefaultSerialize
}
func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgScoreIn) error {
for _, row := range rows {
fmt.Printf("stuid: [%+v], avg score: [%+v]\n", row.StuId, row.AvgScore)
}
return nil
}
与上述函数一样,我们依然采用自定义的输入形参来进行逻辑开发。
11.3.4 单元测试用例
接下来我们来编写上面Flow的测试用例单元测试,代码如下:
kis-flow/test/kis_auto_inject_param_test.go
package test
import (
"context"
"kis-flow/common"
"kis-flow/config"
"kis-flow/file"
"kis-flow/flow"
"kis-flow/kis"
"kis-flow/test/faas"
"kis-flow/test/proto"
"testing"
)
func TestAutoInjectParamWithConfig(t *testing.T) {
ctx := context.Background()
kis.Pool().FaaS("AvgStuScore", faas.AvgStuScore)
kis.Pool().FaaS("PrintStuAvgScore", faas.PrintStuAvgScore)
// 1. 加载配置文件并构建Flow
if err := file.ConfigImportYaml("load_conf/"); err != nil {
panic(err)
}
// 2. 获取Flow
flow1 := kis.Pool().GetFlow("StuAvg")
if flow1 == nil {
panic("flow1 is nil")
}
// 3. 提交原始数据
_ = flow1.CommitRow(&faas.AvgStuScoreIn{
StuScores: proto.StuScores{
StuId: 100,
Score1: 1,
Score2: 2,
Score3: 3,
},
})
_ = flow1.CommitRow(faas.AvgStuScoreIn{
StuScores: proto.StuScores{
StuId: 100,
Score1: 1,
Score2: 2,
Score3: 3,
},
})
// 提交原始数据(json字符串)
_ = flow1.CommitRow(`{"stu_id":101}`)
// 4. 执行flow1
if err := flow1.Run(ctx); err != nil {
panic(err)
}
}
在提交原始数据的时候,我们这里面采用的是使用默认的序列化方式,支持json的反序列化支持,在CommitRow()
的时候,一共提交的3条数据,前两条是提交的结构体数据,最后一次是提交的json字符串,目前都可以支持。
cd 到kis-flow/test/
下,执行:
$ go test -test.v -test.paniconexit0 -test.run TestAutoInjectParamWithConfig
得到结果如下:
$ go test -test.v -test.paniconexit0 -test.run TestAutoInjectParamWithConfig
...
...
Add KisPool FuncName=AvgStuScore
Add KisPool FuncName=PrintStuAvgScore
...
...
Add FlowRouter FlowName=StuAvg
context.Background
====> After CommitSrcData, flow_name = StuAvg, flow_id = flow-1265702bc905400da1788c0354080ded
All Level Data =
map[FunctionIdFirstVirtual:[0xc0002bab40 {DefaultSerialize:{} StuScores:{StuId:100 Score1:1 Score2:2 Score3:3}} {"stu_id":101}]]
KisFunctionC, flow = &{Id:flow-1265702bc905400da1788c0354080ded Name:StuAvg Conf:0xc000286100 Funcs:map[AvgStuScore:0xc00023af80 PrintStuAvgScore:0xc00023b000] FlowHead:0xc00023af80 FlowTail:0xc00023b000 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00023af80 ThisFunctionId:func-12a05e62a12a45fdade8477a3bddd2fd PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-12a05e62a12a45fdade8477a3bddd2fd:map[] func-7f308d00f4fa49488760ff1dfb85dc46:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[0xc0002bab40 {DefaultSerialize:{} StuScores:{StuId:100 Score1:1 Score2:2 Score3:3}} {"stu_id":101}]] inPut:[0xc0002bab40 {DefaultSerialize:{} StuScores:{StuId:100 Score1:1 Score2:2 Score3:3}} {"stu_id":101}] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false} cache:0xc000210b88 metaData:map[] mLock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0}}
context.Background
====> After commitCurData, flow_name = StuAvg, flow_id = flow-1265702bc905400da1788c0354080ded
All Level Data =
map[FunctionIdFirstVirtual:[0xc0002bab40 {DefaultSerialize:{} StuScores:{StuId:100 Score1:1 Score2:2 Score3:3}} {"stu_id":101}] func-12a05e62a12a45fdade8477a3bddd2fd:[{DefaultSerialize:{} StuAvgScore:{StuId:100 AvgScore:2}} {DefaultSerialize:{} StuAvgScore:{StuId:100 AvgScore:2}} {DefaultSerialize:{} StuAvgScore:{StuId:101 AvgScore:0}}]]
KisFunctionE, flow = &{Id:flow-1265702bc905400da1788c0354080ded Name:StuAvg Conf:0xc000286100 Funcs:map[AvgStuScore:0xc00023af80 PrintStuAvgScore:0xc00023b000] FlowHead:0xc00023af80 FlowTail:0xc00023b000 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00023b000 ThisFunctionId:func-7f308d00f4fa49488760ff1dfb85dc46 PrevFunctionId:func-12a05e62a12a45fdade8477a3bddd2fd funcParams:map[func-12a05e62a12a45fdade8477a3bddd2fd:map[] func-7f308d00f4fa49488760ff1dfb85dc46:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[0xc0002bab40 {DefaultSerialize:{} StuScores:{StuId:100 Score1:1 Score2:2 Score3:3}} {"stu_id":101}] func-12a05e62a12a45fdade8477a3bddd2fd:[{DefaultSerialize:{} StuAvgScore:{StuId:100 AvgScore:2}} {DefaultSerialize:{} StuAvgScore:{StuId:100 AvgScore:2}} {DefaultSerialize:{} StuAvgScore:{StuId:101 AvgScore:0}}]] inPut:[{DefaultSerialize:{} StuAvgScore:{StuId:100 AvgScore:2}} {DefaultSerialize:{} StuAvgScore:{StuId:100 AvgScore:2}} {DefaultSerialize:{} StuAvgScore:{StuId:101 AvgScore:0}}] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false} cache:0xc000210b88 metaData:map[] mLock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0}}
stuid: [100], avg score: [2]
stuid: [100], avg score: [2]
stuid: [101], avg score: [0]
--- PASS: TestAutoInjectParamWithConfig (0.01s)
PASS
ok kis-flow/test 0.030s
11.4 【V1.0】 源代码
https://github.com/aceld/kis-flow/releases/tag/v1.0
作者:刘丹冰Aceld github: https://github.com/aceld
KisFlow开源项目地址:https://github.com/aceld/kis-flow
Golang框架实战-KisFlow流式计算框架专栏
Golang框架实战-KisFlow流式计算框架(1)-概述
Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)
Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)
Golang框架实战-KisFlow流式计算框架(4)-数据流
Golang框架实战-KisFlow流式计算框架(5)-Function调度
Golang框架实战-KisFlow流式计算框架(6)-Connector
Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出
Golang框架实战-KisFlow流式计算框架(8)-KisFlow Action
Golang框架实战-KisFlow流式计算框架(10)-Flow多副本