Golang框架实战-KisFlow流式计算框架(9)-Cache/Params 数据缓存与数据参数

Golang框架实战-KisFlow流式计算框架专栏

Golang框架实战-KisFlow流式计算框架(1)-概述
Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)
Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)
Golang框架实战-KisFlow流式计算框架(4)-数据流
Golang框架实战-KisFlow流式计算框架(5)-Function调度
Golang框架实战-KisFlow流式计算框架(6)-Connector
Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出
Golang框架实战-KisFlow流式计算框架(8)-KisFlow Action
Golang框架实战-KisFlow流式计算框架(9)-Cache/Params 数据缓存与数据参数


8.1 Flow Cache 数据流缓存

KisFlow也提供流式计算中的共享缓存,采用简单的本地缓存供开发者按需使用,有关本地缓存的第三方技术依赖选型: https://github.com/patrickmn/go-cache

8.1.1 go-cache

(1)安装

go get github.com/patrickmn/go-cache

(2)使用

import (
    "fmt"
    "github.com/patrickmn/go-cache"
    "time"
)

func main() {
    // Create a cache with a default expiration time of 5 minutes, and which
    // purges expired items every 10 minutes
    c := cache.New(5*time.Minute, 10*time.Minute)

    // Set the value of the key "foo" to "bar", with the default expiration time
    c.Set("foo", "bar", cache.DefaultExpiration)

    // Set the value of the key "baz" to 42, with no expiration time
    // (the item won't be removed until it is re-set, or removed using
    // c.Delete("baz")
    c.Set("baz", 42, cache.NoExpiration)

    // Get the string associated with the key "foo" from the cache
    foo, found := c.Get("foo")
    if found {
        fmt.Println(foo)
    }

    // Since Go is statically typed, and cache values can be anything, type
    // assertion is needed when values are being passed to functions that don't
    // take arbitrary types, (i.e. interface{}). The simplest way to do this for
    // values which will only be used once--e.g. for passing to another
    // function--is:
    foo, found := c.Get("foo")
    if found {
        MyFunction(foo.(string))
    }

    // This gets tedious if the value is used several times in the same function.
    // You might do either of the following instead:
    if x, found := c.Get("foo"); found {
        foo := x.(string)
        // ...
    }
    // or
    var foo string
    if x, found := c.Get("foo"); found {
        foo = x.(string)
    }
    // ...
    // foo can then be passed around freely as a string

    // Want performance? Store pointers!
    c.Set("foo", &MyStruct, cache.DefaultExpiration)
    if x, found := c.Get("foo"); found {
        foo := x.(*MyStruct)
            // ...
    }
}

详细参考:https://github.com/patrickmn/go-cache

8.1.2 KisFlow集成go-cache能力

(1) Flow提供抽象层接口

在Flow中提供有关Cache的操作的接口,如下:

kis-flow/kis/flow.go

type Flow interface {
    // Run 调度Flow,依次调度Flow中的Function并且执行
    Run(ctx context.Context) error
    // Link 将Flow中的Function按照配置文件中的配置进行连接
    Link(fConf *config.KisFuncConfig, fParams config.FParam) error
    // CommitRow 提交Flow数据到即将执行的Function层
    CommitRow(row interface{}) error
    // Input 得到flow当前执行Function的输入源数据
    Input() common.KisRowArr
    // GetName 得到Flow的名称
    GetName() string
    // GetThisFunction 得到当前正在执行的Function
    GetThisFunction() Function
    // GetThisFuncConf 得到当前正在执行的Function的配置
    GetThisFuncConf() *config.KisFuncConfig
    // GetConnector 得到当前正在执行的Function的Connector
    GetConnector() (Connector, error)
    // GetConnConf 得到当前正在执行的Function的Connector的配置
    GetConnConf() (*config.KisConnConfig, error)
    // GetConfig 得到当前Flow的配置
    GetConfig() *config.KisFlowConfig
    // GetFuncConfigByName 得到当前Flow的配置
    GetFuncConfigByName(funcName string) *config.KisFuncConfig
    // Next 当前Flow执行到的Function进入下一层Function所携带的Action动作
    Next(acts ...ActionFunc) error

    // ++++++++++++++++++++++++++++++++++++++++
    // GetCacheData 得到当前Flow的缓存数据
    GetCacheData(key string) interface{}
    // SetCacheData 设置当前Flow的缓存数据
    SetCacheData(key string, value interface{}, Exp time.Duration)
}

SetCacheData()为设置本地缓存,Exp为超时时间,如果Exp为0,则为永久。
GetCacheData()为读取本地缓存。

(2)提供一些常量

提供有关缓存超时时间的一些常量。

kis-flow/common/const.go

// cache
const (
    // DeFaultFlowCacheCleanUp KisFlow中Flow对象Cache缓存默认的清理内存时间
    DeFaultFlowCacheCleanUp = 5 //单位 min
    // DefaultExpiration 默认GoCahce时间 ,永久保存
    DefaultExpiration time.Duration = 0
)

(3) KisFlow新增成员及初始化

kis-flow/flow/kis_flow.go

// KisFlow 用于贯穿整条流式计算的上下文环境
type KisFlow struct {

    // ... ...
    // ... ...

    // flow的本地缓存
    cache *cache.Cache // Flow流的临时缓存上线文环境
}

// NewKisFlow 创建一个KisFlow.
func NewKisFlow(conf *config.KisFlowConfig) kis.Flow {
    flow := new(KisFlow)

    // ... ...
    // ... ...

    // 初始化本地缓存
    flow.cache = cache.New(cache.NoExpiration, common.DeFaultFlowCacheCleanUp*time.Minute)


    return flow
}

(4)实现接口

最后实现有关缓存读写操作的两个接口,代码如下:

kis-flow/flow/kis_flow_data.go

func (flow *KisFlow) GetCacheData(key string) interface{} {

    if data, found := flow.cache.Get(key); found {
        return data
    }

    return nil
}

func (flow *KisFlow) SetCacheData(key string, value interface{}, Exp time.Duration) {
    if Exp == common.DefaultExpiration {
        flow.cache.Set(key, value, cache.DefaultExpiration)
    } else {
        flow.cache.Set(key, value, Exp)
    }
}

8.2 MetaData 临时缓存参数

MetaData我们定义为Flow、Function、Connector每个层级都会提供一个map[string]interface{} 的结构来存放临时数据,这个数据的生命周期与各个实例的生命周期一致。

8.2.1 Flow添加MetaData

首先,KisFlow的成员新增metaData map[string]interface{}和对应的读写锁。

kis-flow/flow/kis_flow.go

// KisFlow 用于贯穿整条流式计算的上下文环境
type KisFlow struct {
    // ... ...

    // ... ...

    // +++++++++++++++++++++++++++++++++++++++++++
    // flow的metaData
    metaData map[string]interface{} // Flow的自定义临时数据
    mLock    sync.RWMutex           // 管理metaData的读写锁
}

且在KisFlow的构造函数下对metaData成员进行内存初始化,如下:

kis-flow/flow/kis_flow.go

// NewKisFlow 创建一个KisFlow.
func NewKisFlow(conf *config.KisFlowConfig) kis.Flow {
    flow := new(KisFlow)
    
    // ... ...
    // ... ...

    // ++++++++++++++++++++++++++++++++++++++
    // 初始化临时数据
    flow.metaData = make(map[string]interface{})

    return flow
}

之后,给Flow添加MetaData的读写接口,实现非常的简单,如下:

kis-flow/kis/flow.go

type Flow interface {
    // Run 调度Flow,依次调度Flow中的Function并且执行
    Run(ctx context.Context) error
    // Link 将Flow中的Function按照配置文件中的配置进行连接
    Link(fConf *config.KisFuncConfig, fParams config.FParam) error
    // CommitRow 提交Flow数据到即将执行的Function层
    CommitRow(row interface{}) error
    // Input 得到flow当前执行Function的输入源数据
    Input() common.KisRowArr
    // GetName 得到Flow的名称
    GetName() string
    // GetThisFunction 得到当前正在执行的Function
    GetThisFunction() Function
    // GetThisFuncConf 得到当前正在执行的Function的配置
    GetThisFuncConf() *config.KisFuncConfig
    // GetConnector 得到当前正在执行的Function的Connector
    GetConnector() (Connector, error)
    // GetConnConf 得到当前正在执行的Function的Connector的配置
    GetConnConf() (*config.KisConnConfig, error)
    // GetConfig 得到当前Flow的配置
    GetConfig() *config.KisFlowConfig
    // GetFuncConfigByName 得到当前Flow的配置
    GetFuncConfigByName(funcName string) *config.KisFuncConfig
    // Next 当前Flow执行到的Function进入下一层Function所携带的Action动作
    Next(acts ...ActionFunc) error
    // GetCacheData 得到当前Flow的缓存数据
    GetCacheData(key string) interface{}
    // SetCacheData 设置当前Flow的缓存数据
    SetCacheData(key string, value interface{}, Exp time.Duration)

    // ++++++++++++++++++++++++++++
    // GetMetaData 得到当前Flow的临时数据
    GetMetaData(key string) interface{}
    // SetMetaData 设置当前Flow的临时数据
    SetMetaData(key string, value interface{})
}

定义接口GetMetaData()SetMetaData(),分别作为读写接口。
最后来实现,如下:

kis-flow/flow/kis_flow_data.go

// GetMetaData 得到当前Flow对象的临时数据
func (flow *KisFlow) GetMetaData(key string) interface{} {
    flow.mLock.RLock()
    defer flow.mLock.RUnlock()

    data, ok := flow.metaData[key]
    if !ok {
        return nil
    }

    return data
}

// SetMetaData 设置当前Flow对象的临时数据
func (flow *KisFlow) SetMetaData(key string, value interface{}) {
    flow.mLock.Lock()
    defer flow.mLock.Unlock()

    flow.metaData[key] = value
}

8.2.2 Function 添加MetaData

首先在BaseFunciton中添加成员metaData,如下:

kis-flow/function/kis_base_funciton.go

type BaseFunction struct {
    // Id , KisFunction的实例ID,用于KisFlow内部区分不同的实例对象
    Id     string
    Config *config.KisFuncConfig

    // flow
    flow kis.Flow //上下文环境KisFlow

    // connector
    connector kis.Connector

    // ++++++++++++++++++++++++
    // Function的自定义临时数据
    metaData map[string]interface{}
    // 管理metaData的读写锁
    mLock sync.RWMutex

    // link
    N kis.Function //下一个流计算Function
    P kis.Function //上一个流计算Function

在Funciton构造函数的地方,这里需要进行改进下,每个具体的Funciton都需要一个构造函数来初始化metaData成员,改动如下:

kis-flow/function/kis_base_function.go

func NewKisFunction(flow kis.Flow, config *config.KisFuncConfig) kis.Function {
    var f kis.Function

    //工厂生产泛化对象
    // ++++++++++++++
    switch common.KisMode(config.FMode) {
    case common.V:
        f = NewKisFunctionV() // +++
    case common.S:
        f = NewKisFunctionS() // +++
    case common.L:
        f = NewKisFunctionL() // +++
    case common.C:
        f = NewKisFunctionC() // +++
    case common.E:
        f = NewKisFunctionE() // +++
    default:
        //LOG ERROR
        return nil
    }

    // 生成随机实例唯一ID
    f.CreateId()

    // 设置基础信息属性
    if err := f.SetConfig(config); err != nil {
        panic(err)
    }

    // 设置Flow
    if err := f.SetFlow(flow); err != nil {
        panic(err)
    }

    return f
}

其中每个构造函数如下:

kis-flow/function/kis_function_c.go

func NewKisFunctionC() kis.Function {
    f := new(KisFunctionC)

    // 初始化metaData
    f.metaData = make(map[string]interface{})

    return f
}

kis-flow/function/kis_function_v.go

func NewKisFunctionV() kis.Function {
    f := new(KisFunctionV)

    // 初始化metaData
    f.metaData = make(map[string]interface{})

    return f
}

kis-flow/function/kis_function_e.go

func NewKisFunctionE() kis.Function {
    f := new(KisFunctionE)

    // 初始化metaData
    f.metaData = make(map[string]interface{})

    return f
}

kis-flow/function/kis_function_s.go

func NewKisFunctionS() kis.Function {
    f := new(KisFunctionS)

    // 初始化metaData
    f.metaData = make(map[string]interface{})

    return f
}

kis-flow/function/kis_function_l.go

func NewKisFunctionL() kis.Function {
    f := new(KisFunctionL)

    // 初始化metaData
    f.metaData = make(map[string]interface{})

    return f
}

接下来,给Funciton抽象层,添加获取metaData成员的接口,如下:

kis-flow/kis/function.go

type Function interface {
    // Call 执行流式计算逻辑
    Call(ctx context.Context, flow Flow) error

    // SetConfig 给当前Function实例配置策略
    SetConfig(s *config.KisFuncConfig) error
    // GetConfig 获取当前Function实例配置策略
    GetConfig() *config.KisFuncConfig

    // SetFlow 给当前Function实例设置所依赖的Flow实例
    SetFlow(f Flow) error
    // GetFlow 获取当前Functioin实力所依赖的Flow
    GetFlow() Flow

    // AddConnector 给当前Function实例添加一个Connector
    AddConnector(conn Connector) error
    // GetConnector 获取当前Function实例所关联的Connector
    GetConnector() Connector

    // CreateId 给当前Funciton实力生成一个随机的实例KisID
    CreateId()
    // GetId 获取当前Function的FID
    GetId() string
    // GetPrevId 获取当前Function上一个Function节点FID
    GetPrevId() string
    // GetNextId 获取当前Function下一个Function节点FID
    GetNextId() string

    // Next 返回下一层计算流Function,如果当前层为最后一层,则返回nil
    Next() Function
    // Prev 返回上一层计算流Function,如果当前层为最后一层,则返回nil
    Prev() Function
    // SetN 设置下一层Function实例
    SetN(f Function)
    // SetP 设置上一层Function实例
    SetP(f Function)

    // ++++++++++++++++++++++++++++++++++
    // GetMetaData 得到当前Function的临时数据
    GetMetaData(key string) interface{}
    // SetMetaData 设置当前Function的临时数据
    SetMetaData(key string, value interface{})
}

对上述新增的两个接口的实现,在BaseFunction中实现就可以了。

kis-flow/funciton/kis_base_function.go

// GetMetaData 得到当前Function的临时数据
func (base *BaseFunction) GetMetaData(key string) interface{} {
    base.mLock.RLock()
    defer base.mLock.RUnlock()

    data, ok := base.metaData[key]
    if !ok {
        return nil
    }

    return data
}

// SetMetaData 设置当前Function的临时数据
func (base *BaseFunction) SetMetaData(key string, value interface{}) {
    base.mLock.Lock()
    defer base.mLock.Unlock()

    base.metaData[key] = value
}

8.2.3 Connector添加MetaData

首先,给KisConnector添加metaData成员,如下:

kis-flow/conn/kis_connector.go

type KisConnector struct {
    // Connector ID
    CId string
    // Connector Name
    CName string
    // Connector Config
    Conf *config.KisConnConfig
    // Connector Init
    onceInit sync.Once
    
    // ++++++++++++++
    // KisConnector的自定义临时数据
    metaData map[string]interface{}
    // 管理metaData的读写锁
    mLock sync.RWMutex
}

// NewKisConnector 根据配置策略创建一个KisConnector
func NewKisConnector(config *config.KisConnConfig) *KisConnector {
    conn := new(KisConnector)
    conn.CId = id.KisID(common.KisIdTypeConnnector)
    conn.CName = config.CName
    conn.Conf = config

    // +++++++++++++++++++++++++++++++++++
    conn.metaData = make(map[string]interface{})

    return conn
}

且在构造函数中进行对metaData的初始化。

其次,给Connector抽象层,提供获取和设置MetaData的接口,如下:

kis-flow/kis/connector.go

type Connector interface {
    // Init 初始化Connector所关联的存储引擎链接等
    Init() error
    // Call 调用Connector 外挂存储逻辑的读写操作
    Call(ctx context.Context, flow Flow, args interface{}) error
    // GetId 获取Connector的ID
    GetId() string
    // GetName 获取Connector的名称
    GetName() string
    // GetConfig 获取Connector的配置信息
    GetConfig() *config.KisConnConfig
    // GetMetaData 得到当前Connector的临时数据

    // +++++++++++++++++++++++++++++++
    GetMetaData(key string) interface{}
    // SetMetaData 设置当前Connector的临时数据
    SetMetaData(key string, value interface{})
}

最后在KisConnector实现上述两个接口,如下:

kis-flow/conn/kis_connector.go

// GetMetaData 得到当前Connector的临时数据
func (conn *KisConnector) GetMetaData(key string) interface{} {
    conn.mLock.RLock()
    defer conn.mLock.RUnlock()

    data, ok := conn.metaData[key]
    if !ok {
        return nil
    }

    return data
}

// SetMetaData 设置当前Connector的临时数据
func (conn *KisConnector) SetMetaData(key string, value interface{}) {
    conn.mLock.Lock()
    defer conn.mLock.Unlock()

    conn.metaData[key] = value
}

8.3 Params 配置文件参数

KisFlow提供了配置文件中,在配置Flow、Function、Connector等的默认携带参数:Params。
如下:
Function:

kistype: func
fname: funcName1
fmode: Verify
source:
  name: 公众号抖音商城户订单数据
  must:
    - order_id
    - user_id
option:
  default_params:
    default1: funcName1_param1
    default2: funcName1_param2

Flow:

kistype: flow
status: 1
flow_name: flowName1
flows:
  - fname: funcName1
    params:
      myKey1: flowValue1-1
      myKey2: flowValue1-2
  - fname: funcName2
    params:
      myKey1: flowValue2-1
      myKey2: flowValue2-2
  - fname: funcName3
    params:
      myKey1: flowValue3-1
      myKey2: flowValue3-2

Connector:

kistype: conn
cname: ConnName1
addrs: '0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990'
type: redis
key: redis-key
params:
  args1: value1
  args2: value2
load: null
save:
  - funcName2

这里面开发者均可以给定义的模块,提供Params,其中Flow提供的Params也会叠加到Function中去。

我们在之前构建Flow模块的时候,已经将这些参数读取进了每个模块的内存中,但是并没有给开发者暴露接口。

8.3.1 Flow添加Param读取接口

首先给Flow提供Param的查询接口:

kis-flow/kis/flow.go

type Flow interface {
    // ... ...
    // ... ...
    
    // GetFuncParam 得到Flow的当前正在执行的Function的配置默认参数,取出一对key-value
    GetFuncParam(key string) string
    // GetFuncParamAll 得到Flow的当前正在执行的Function的配置默认参数,取出全部Key-Value
    GetFuncParamAll() config.FParam
}

实现如下:

kis-flow/flow/kis_flow_data.go

// GetFuncParam 得到Flow的当前正在执行的Function的配置默认参数,取出一对key-value
func (flow *KisFlow) GetFuncParam(key string) string {
    flow.fplock.RLock()
    defer flow.fplock.RUnlock()

    if param, ok := flow.funcParams[flow.ThisFunctionId]; ok {
        if value, vok := param[key]; vok {
            return value
        }
    }

    return ""
}

// GetFuncParamAll 得到Flow的当前正在执行的Function的配置默认参数,取出全部Key-Value
func (flow *KisFlow) GetFuncParamAll() config.FParam {
    flow.fplock.RLock()
    defer flow.fplock.RUnlock()

    param, ok := flow.funcParams[flow.ThisFunctionId]
    if !ok {
        return nil
    }

    return param
}

GetFuncParam()GetFuncParamAll()分别为取出一个key,和取出全部的参数,但都是取出当前正在执行的Function的Params参数。

8.3.2 单元测试

我们这里给FlowName1中的每个Function添加一些参数。

kis-flow/test/load_conf/flow-FlowName1.yml

kistype: flow
status: 1
flow_name: flowName1
flows:
  - fname: funcName1
    params:
      myKey1: flowValue1-1
      myKey2: flowValue1-2
  - fname: funcName2
    params:
      myKey1: flowValue2-1
      myKey2: flowValue2-2
  - fname: funcName3
    params:
      myKey1: flowValue3-1
      myKey2: flowValue3-2

然后再分别给这里面关联的Function依次配置一些默认的自定义配置参数,如下:

kis-flow/test/load_conf/func/func-FuncName1.yml

kistype: func
fname: funcName1
fmode: Verify
source:
  name: 公众号抖音商城户订单数据
  must:
    - order_id
    - user_id
option:
  default_params:
    default1: funcName1_param1
    default2: funcName1_param2

kis-flow/test/load_conf/func/func-FuncName2.yml

kistype: func
fname: funcName2
fmode: Save
source:
  name: 用户订单错误率
  must:
    - order_id
    - user_id
option:
  cname: ConnName1
  default_params:
    default1: funcName2_param1
    default2: funcName2_param2

kis-flow/test/load_conf/func/func-FuncName3.yml

kistype: func
fname: funcName3
fmode: Calculate
source:
  name: 用户订单错误率
  must:
    - order_id
    - user_id
option:
  default_params:
    default1: funcName3_param1
    default2: funcName3_param2

我们给FuncName2关联的Connector也配置一些Param参数,如下:

kis-flow/test/load_conf/conn/conn-ConnName1.yml

kistype: conn
cname: ConnName1
addrs: '0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990'
type: redis
key: redis-key
params:
  args1: value1
  args2: value2
load: null
save:
  - funcName2

最后,为了验证我们的配置参数可以在Function执行的过程中被准确的取出,我们依次改造了每个Funciton和Connector的业务函数,把各自Param打印出来,如下:

kis-flow/test/faas/faas_demo1.go

func FuncDemo1Handler(ctx context.Context, flow kis.Flow) error {
    fmt.Println("---> Call funcName1Handler ----")
    
    // ++++++++++++++++
    fmt.Printf("Params = %+v\n", flow.GetFuncParamAll())

    for index, row := range flow.Input() {
        // 打印数据
        str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
        fmt.Println(str)

        // 计算结果数据
        resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index)

        // 提交结果数据
        _ = flow.CommitRow(resultStr)
    }

    return nil
}

kis-flow/test/faas/faas_demo2.go

func FuncDemo2Handler(ctx context.Context, flow kis.Flow) error {
    fmt.Println("---> Call funcName2Handler ----")
    // ++++++++++++++++
    fmt.Printf("Params = %+v\n", flow.GetFuncParamAll())

    for index, row := range flow.Input() {
        str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
        fmt.Println(str)

        conn, err := flow.GetConnector()
        if err != nil {
            log.Logger().ErrorFX(ctx, "FuncDemo2Handler(): GetConnector err = %s\n", err.Error())
            return err
        }

        if conn.Call(ctx, flow, row) != nil {
            log.Logger().ErrorFX(ctx, "FuncDemo2Handler(): Call err = %s\n", err.Error())
            return err
        }

        // 计算结果数据
        resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index)

        // 提交结果数据
        _ = flow.CommitRow(resultStr)
    }

    return nil
}

kis-flow/test/faas/faas_demo3.go

func FuncDemo3Handler(ctx context.Context, flow kis.Flow) error {
    fmt.Println("---> Call funcName3Handler ----")
    // ++++
    fmt.Printf("Params = %+v\n", flow.GetFuncParamAll())

    for _, row := range flow.Input() {
        str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
        fmt.Println(str)
    }

    return nil
}

kis-flow/test/caas/caas_demo1.go

func CaasDemoHanler1(ctx context.Context, conn kis.Connector, fn kis.Function, flow kis.Flow, args interface{}) error {
    fmt.Printf("===> In CaasDemoHanler1: flowName: %s, cName:%s, fnName:%s, mode:%s\n",
        flow.GetName(), conn.GetName(), fn.GetConfig().FName, fn.GetConfig().FMode)

    // +++++++++++ 
    fmt.Printf("Params = %+v\n", conn.GetConfig().Params)

    fmt.Printf("===> Call Connector CaasDemoHanler1, args from funciton: %s\n", args)

    return nil
}

最后,我们来编写单元测试用例代码,如下:

kis-flow/test/kis_params_test.go

package test

import (
    "context"
    "kis-flow/common"
    "kis-flow/file"
    "kis-flow/kis"
    "kis-flow/test/caas"
    "kis-flow/test/faas"
    "testing"
)

func TestParams(t *testing.T) {
    ctx := context.Background()

    // 0. 注册Function 回调业务
    kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
    kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
    kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)

    // 0. 注册ConnectorInit 和 Connector 回调业务
    kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
    kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)

    // 1. 加载配置文件并构建Flow
    if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil {
        panic(err)
    }

    // 2. 获取Flow
    flow1 := kis.Pool().GetFlow("flowName1")

    // 3. 提交原始数据
    _ = flow1.CommitRow("This is Data1 from Test")
    _ = flow1.CommitRow("This is Data2 from Test")
    _ = flow1.CommitRow("This is Data3 from Test")

    // 4. 执行flow1
    if err := flow1.Run(ctx); err != nil {
        panic(err)
    }
}

cd到kis-flow/test/下,执行

 go test -test.v -test.paniconexit0 -test.run  TestParams 

结果如下:

=== RUN   TestParams
....
....

---> Call funcName1Handler ----
Params = map[default1:funcName1_param1 default2:funcName1_param2 myKey1:flowValue1-1 myKey2:flowValue1-2]
...
...

---> Call funcName2Handler ----
Params = map[default1:funcName2_param1 default2:funcName2_param2 myKey1:flowValue2-1 myKey2:flowValue2-2]
...
...
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
Params = map[args1:value1 args2:value2]
...
...
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
Params = map[args1:value1 args2:value2]
...
...

===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
Params = map[args1:value1 args2:value2]

...
...

---> Call funcName3Handler ----
Params = map[default1:funcName3_param1 default2:funcName3_param2 myKey1:flowValue3-1 myKey2:flowValue3-2]

...
...
--- PASS: TestParams (0.01s)
PASS
ok      kis-flow/test   0.433s

我们可以看到,现在可以正确的取出各个层级的Params的配置参数了。

8.4 【V0.7】源代码

https://github.com/aceld/kis-flow/releases/tag/v0.7


作者:刘丹冰Aceld github: https://github.com/aceld
KisFlow开源项目地址:https://github.com/aceld/kis-flow

Golang框架实战-KisFlow流式计算框架专栏

Golang框架实战-KisFlow流式计算框架(1)-概述
Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)
Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)
Golang框架实战-KisFlow流式计算框架(4)-数据流
Golang框架实战-KisFlow流式计算框架(5)-Function调度
Golang框架实战-KisFlow流式计算框架(6)-Connector
Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出
Golang框架实战-KisFlow流式计算框架(8)-KisFlow Action
Golang框架实战-KisFlow流式计算框架(9)-Cache/Params 数据缓存与数据参数

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,001评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,210评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,874评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,001评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,022评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,005评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,929评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,742评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,193评论 1 309
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,427评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,583评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,305评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,911评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,564评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,731评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,581评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,478评论 2 352