Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)

连载中...
Golang框架实战-KisFlow流式计算框架(1)-概述
Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)
Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)
Golang框架实战-KisFlow流式计算框架(4)-数据流
Golang框架实战-KisFlow流式计算框架(5)-Function调度


首先我们要先定义KisFlow的核心结构体,KisFlow结构体,通过上述的设计理念,我们得知,KisFlow表示整个一条数据计算流的结构。其中每次数据在一个Flow上,依次执行挂载在当前Flow的Function。

2.3.1 KisFunction家族

KisFunction应该是一个链式调用,所以结构体的基本形态应该是一个链表,通过一次Function的调用结束后,默认可以调度到下一个Function的节点上。 在KisFlow中,一共有 saveload, calculate, extend, varify等多种行为的Funciton,所以这里我们采用上述五种function的模板类,方便今后在不同针对不同特征的function做更加灵活和功能隔离的拓展和改造。

整体的KisFunction的类图设计如下:

2.2.2 抽象层KisFunction定义

kis-flow中创建一个新的目录function用来存放function的代码。
首先抽象接口编写在kis/目录下。

kis-flow/kis/function.go

package kis

import (
    "context"
    "kis-flow/config"
)

// Function 流式计算基础计算模块,KisFunction是一条流式计算的基本计算逻辑单元,
//             任意个KisFunction可以组合成一个KisFlow
type Function interface {
    // Call 执行流式计算逻辑
    Call(ctx context.Context, flow Flow) error

    // SetConfig 给当前Function实例配置策略
    SetConfig(s *config.KisFuncConfig) error
    // GetConfig 获取当前Function实例配置策略
    GetConfig() *config.KisFuncConfig

    // SetFlow 给当前Function实例设置所依赖的Flow实例
    SetFlow(f Flow) error
    // GetFlow 获取当前Functioin实力所依赖的Flow
    GetFlow() Flow

    // CreateId 给当前Funciton实力生成一个随机的实例KisID
    CreateId()
    // GetId 获取当前Function的FID
    GetId() string
    // GetPrevId 获取当前Function上一个Function节点FID
    GetPrevId() string
    // GetNextId 获取当前Function下一个Function节点FID
    GetNextId() string

    // Next 返回下一层计算流Function,如果当前层为最后一层,则返回nil
    Next() Function
    // Prev 返回上一层计算流Function,如果当前层为最后一层,则返回nil
    Prev() Function
    // SetN 设置下一层Function实例
    SetN(f Function)
    // SetP 设置上一层Function实例
    SetP(f Function)
}

2.2.3 KisId随机唯一实例ID

上述提出了一个新的概念KisId。 KisID为Function的实例ID,用于KisFlow内部区分不同的实例对象。KisId 和 Function Config中的 Fid的区别在于,Fid用来形容一类Funcion策略的ID,而KisId则为在KisFlow中KisFunction已经实例化的 实例对象ID 这个ID是随机生成且唯一。

创建kis-flow/id/目录,且创建kis_id.go 文件,实现有关kis_id生成的算法。

kis-flow/id/kis_id.go

package id

import (
    "github.com/google/uuid"
    "kis-flow/common"
    "strings"
)

// KisID 获取一个中随机实例ID
// 格式为  "prefix1-[prefix2-][prefix3-]ID"
// 如:flow-1234567890
// 如:func-1234567890
// 如: conn-1234567890
// 如: func-1-1234567890
func KisID(prefix ...string) (kisId string) {

    idStr := strings.Replace(uuid.New().String(), "-", "", -1)
    kisId = formatKisID(idStr, prefix...)

    return
}

func formatKisID(idStr string, prefix ...string) string {
    var kisId string

    for _, fix := range prefix {
        kisId += fix
        kisId += common.KisIdJoinChar
    }

    kisId += idStr

    return kisId
}

kisId模块提供KisID()方法,这里面依赖了第三方分布式ID生成库github.com/google/uuid,生成的随机ID为一个字符串,且调用者可以提供多个前缀,通过-符号进行拼接,得到的随机字符串ID,如:func-1234567890

针对KisId的前缀,提供了一些字符串的枚举,如下:

kis-flow/common/const.go

// KisIdType 用户生成KisId的字符串前缀
const (
    KisIdTypeFlow       = "flow"
    KisIdTypeConnnector = "conn"
    KisIdTypeFunction   = "func"
    KisIdTypeGlobal     = "global"
    KisIdJoinChar       = "-"
)

2.2.4 BaseFunction基础父类

按照设计,我们需要提供一个BaseFunction作为Function的一个子类,实现一些基础的功能接口。留下Call()让具体模式的KisFunctionX来重写实现,下面来进行对BaseFunction结构的定义。
kis-flow/function/创建kis_base_funciton.go 文件。

A. 结构定义

kis-flow/function/kis_base_function.go

package function

import (
    "context"
    "errors"
    "kis-flow/common"
    "kis-flow/config"
    "kis-flow/id"
    "kis-flow/kis"
)

type BaseFunction struct {
    // Id , KisFunction的实例ID,用于KisFlow内部区分不同的实例对象
    Id     string
    Config *config.KisFuncConfig

    // flow
    Flow kis.Flow //上下文环境KisFlow

    // link
    N kis.Function //下一个流计算Function
    P kis.Function //上一个流计算Function
}

B. 方法实现

kis-flow/function/kis_base_function.go

// Call
// BaseFunction 为空实现,目的为了让其他具体类型的KisFunction,如KisFunction_V 来继承BaseFuncion来重写此方法
func (base *BaseFunction) Call(ctx context.Context, flow kis.Flow) error { return nil }

func (base *BaseFunction) Next() kis.Function {
    return base.N
}

func (base *BaseFunction) Prev() kis.Function {
    return base.P
}

func (base *BaseFunction) SetN(f kis.Function) {
    base.N = f
}

func (base *BaseFunction) SetP(f kis.Function) {
    base.P = f
}

func (base *BaseFunction) SetConfig(s *config.KisFuncConfig) error {
    if s == nil {
        return errors.New("KisFuncConfig is nil")
    }

    base.Config = s

    return nil
}

func (base *BaseFunction) GetId() string {
    return base.Id
}

func (base *BaseFunction) GetPrevId() string {
    if base.P == nil {
        //Function为首结点
        return common.FunctionIdFirstVirtual
    }
    return base.P.GetId()
}

func (base *BaseFunction) GetNextId() string {
    if base.N == nil {
        //Function为尾结点
        return common.FunctionIdLastVirtual
    }
    return base.N.GetId()
}

func (base *BaseFunction) GetConfig() *config.KisFuncConfig {
    return base.Config
}

func (base *BaseFunction) SetFlow(f kis.Flow) error {
    if f == nil {
        return errors.New("KisFlow is nil")
    }
    base.Flow = f
    return nil
}

func (base *BaseFunction) GetFlow() kis.Flow {
    return base.Flow
}

func (base *BaseFunction) CreateId() {
    base.Id = id.KisID(common.KisIdTypeFunction)
}

这里注意 GetPrevId()GetNextId()两个方法实现,因为如果当前Functioin为双向链表中的第一个节点或者最后一个节点,那么他们的上一个或者下一个是没有节点的,那么ID也就不存在,为了在使用中不出现得不到ID的情况,我们提供了两个虚拟FID,做特殊情况的边界处理,定义在const.go中。

kis-flow/common/const.go

const (
    // FunctionIdFirstVirtual 为首结点Function上一层虚拟的Function ID
    FunctionIdFirstVirtual = "FunctionIdFirstVirtual"
    // FunctionIdLastVirtual 为尾结点Function下一层虚拟的Function ID
    FunctionIdLastVirtual = "FunctionIdLastVirtual"
)

2.2.5 KisFunction的V/S/L/C/E等模式类定义

下面分别实现V/S/L/C/E 五种不同模式的KisFunction子类。这里分别用创建文件来实现。

A. KisFunctionV

kis-flow/function/kis_function_v.go

package function

import (
    "context"
    "fmt"
    "kis-flow/kis"
)

type KisFunctionV struct {
    BaseFunction
}

func (f *KisFunctionV) Call(ctx context.Context, flow kis.Flow) error {
    fmt.Printf("KisFunctionV, flow = %+v\n", flow)

    // TODO 调用具体的Function执行方法

    return nil
}


B. KisFunctionS

kis-flow/function/kis_function_s.go

package function

import (
    "context"
    "fmt"
    "kis-flow/kis"
)

type KisFunctionS struct {
    BaseFunction
}

func (f *KisFunctionS) Call(ctx context.Context, flow kis.Flow) error {
    fmt.Printf("KisFunctionS, flow = %+v\n", flow)

    // TODO 调用具体的Function执行方法

    return nil
}

C. KisFunctionL

kis-flow/function/kis_function_l.go

package function

import (
    "context"
    "fmt"
    "kis-flow/kis"
)

type KisFunctionL struct {
    BaseFunction
}

func (f *KisFunctionL) Call(ctx context.Context, flow kis.Flow) error {
    fmt.Printf("KisFunctionL, flow = %+v\n", flow)

    // TODO 调用具体的Function执行方法

    return nil
}

D. KisFunctionC

kis-flow/function/kis_function_c.go

package function

import (
    "context"
    "fmt"
    "kis-flow/kis"
)

type KisFunctionC struct {
    BaseFunction
}

func (f *KisFunctionC) Call(ctx context.Context, flow kis.Flow) error {
    fmt.Printf("KisFunction_C, flow = %+v\n", flow)

    // TODO 调用具体的Function执行方法

    return nil
}

E. KisFunctionE

kis-flow/function/kis_function_e.go

package function

import (
    "context"
    "fmt"
    "kis-flow/kis"
)

type KisFunctionE struct {
    BaseFunction
}

func (f *KisFunctionE) Call(ctx context.Context, flow kis.Flow) error {
    fmt.Printf("KisFunctionE, flow = %+v\n", flow)

    // TODO 调用具体的Function执行方法

    return nil
}

2.2.6 创建KisFunction实例

下面提供一个创建具体模式Function的方法,这里采用简单工厂方法模式来实现创建对象。

kis-flow/function/kis_base_function.go

func (base *BaseFunction) CreateId() {
    base.Id = id.KisID(common.KisIdTypeFunction)
}

// NewKisFunction 创建一个NsFunction
// flow: 当前所属的flow实例
// s : 当前function的配置策略
func NewKisFunction(flow kis.Flow, config *config.KisFuncConfig) kis.Function {
    var f kis.Function

    //工厂生产泛化对象
    switch common.KisMode(config.FMode) {
    case common.V:
        f = new(KisFunctionV)
        break
    case common.S:
        f = new(KisFunctionS)
    case common.L:
        f = new(KisFunctionL)
    case common.C:
        f = new(KisFunctionC)
    case common.E:
        f = new(KisFunctionE)
    default:
        //LOG ERROR
        return nil
    }

    // 生成随机实例唯一ID
    f.CreateId()

    //设置基础信息属性
    if err := f.SetConfig(config); err != nil {
        panic(err)
    }

    if err := f.SetFlow(flow); err != nil {
        panic(err)
    }

    return f
}


注意 NewKisFunction()方法返回的是一个抽象的interface Function

还要注意,目前到这里没有实现Flow对象,但是KisFunciton的创建需要依赖传递一个Flow对象,我们现在可以暂时简单创建一个Flow对象的构造方法,之后在实现Flow章节再完善这部分的代码。
kis-filw/kis/中创建flow.go文件。

kis-flow/kis/flow.go

package kis

import (
    "context"
    "kis-flow/config"
)

type Flow interface {
   // TODO
}

kis-flow/flow/下创建kis_flow.go文件,实现如下:

kis-flow/flow/kis_flow.go

package flow

import "kis-flow/config"

// KisFlow 用于贯穿整条流式计算的上下文环境
type KisFlow struct {
    Id   string
    Name string
    // TODO
}

// TODO for test
// NewKisFlow 创建一个KisFlow.
func NewKisFlow(conf *config.KisFlowConfig) kis.Flow {
    flow := new(KisFlow)

    // 基础信息
    flow.Id = id.KisID(common.KisIdTypeFlow)
    flow.Name = conf.FlowName

    return flow
}

2.2.7 单元测试KisFunction创建实例

现在来对上述的KisFunction实例的创建做一个简单的单元测试,在kis-flow/test/创建kis_function_test.go文件。

kis-flow/test/kis_function_test.go

package test

import (
    "context"
    "kis-flow/common"
    "kis-flow/config"
    "kis-flow/flow"
    "kis-flow/function"
    "testing"
)

func TestNewKisFunction(t *testing.T) {
    ctx := context.Background()

    // 1. 创建一个KisFunction配置实例
    source := config.KisSource{
        Name: "公众号抖音商城户订单数据",
        Must: []string{"order_id", "user_id"},
    }

    myFuncConfig1 := config.NewFuncConfig("funcName1", common.C, &source, nil)
    if myFuncConfig1 == nil {
        panic("myFuncConfig1 is nil")
    }

    // 2. 创建一个 KisFlow 配置实例
    myFlowConfig1 := config.NewFlowConfig("flowName1", common.FlowEnable)

    // 3. 创建一个KisFlow对象
    flow1 := flow.NewKisFlow(myFlowConfig1)

    // 4. 创建一个KisFunction对象
    func1 := function.NewKisFunction(flow1, myFuncConfig1)

    if err := func1.Call(ctx, flow1); err != nil {
        t.Errorf("func1.Call() error = %v", err)
    }
}

流程很简单,分为四个小步骤:

  1. 创建一个KisFunction配置实例
  2. 创建一个 KisFlow 配置实例
  3. 创建一个KisFlow对象
  4. 创建一个KisFunction对象

cd到kis-flow/test/目录下执行:

go test -test.v -test.paniconexit0 -test.run TestNewKisFunction

结果如下:

=== RUN   TestNewKisFunction
KisFunctionC, flow = &{Id:flow-866de5abc8134fc9bb8e5248a3ce7137 Name:flowName1 Conf:0xc00014e780 Funcs:map[] FlowHead:<nil> FlowTail:<nil> flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:<nil> ThisFunctionId: PrevFunctionId: funcParams:map[] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0}}

--- PASS: TestNewKisFunction (0.00s)
PASS
ok      kis-flow/test   1.005s

我们已经调用到了具体的KisFunciton_C实例的Call()方法。

2.5 【V0.1】 源代码

https://github.com/aceld/kis-flow/releases/tag/v0.1


作者:刘丹冰Aceld github: https://github.com/aceld
KisFlow开源项目地址:https://github.com/aceld/kis-flow


连载中...
Golang框架实战-KisFlow流式计算框架(1)-概述
Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)
Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,313评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,369评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,916评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,333评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,425评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,481评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,491评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,268评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,719评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,004评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,179评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,832评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,510评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,153评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,402评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,045评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,071评论 2 352

推荐阅读更多精彩内容