Golang框架实战-KisFlow流式计算框架专栏
Golang框架实战-KisFlow流式计算框架(1)-概述
Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)
Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)
Golang框架实战-KisFlow流式计算框架(4)-数据流
Golang框架实战-KisFlow流式计算框架(5)-Function调度
Golang框架实战-KisFlow流式计算框架(6)-Connector
Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出
Golang框架实战-KisFlow流式计算框架(8)-KisFlow Action
Golang框架实战-KisFlow流式计算框架(9)-Cache/Params 数据缓存与数据参数
Golang框架实战-KisFlow流式计算框架(10)-Flow多副本
Golang框架实战-KisFlow流式计算框架(11)-Prometheus Metrics统计
Golang框架实战-KisFlow流式计算框架(12)-基于反射自适应注册FaaS形参类型
案例:
KisFlow-Golang流式计算案例(一)快速开始QuickStart
KisFlow-Golang流式计算案例(二)-Flow并流操作
KisFlow-Golang流式计算案例(二)-KisFlow在多协程中的应用
DownLoad kis-flow source
$go get github.com/aceld/kis-flow
案例源代码
https://github.com/aceld/kis-flow-usage/tree/main/6-flow_in_goroutines
如果需要同一个Flow在多个Goroutine中同时并发执行,那么可以通过flow.Fork()
函数,克隆一份内存隔离但是具备相同配置的Flow实例,然后分别在不同的协程中去计算执行各自的数据流。
package main
import (
"context"
"fmt"
"github.com/aceld/kis-flow/file"
"github.com/aceld/kis-flow/kis"
"sync"
)
func main() {
ctx := context.Background()
// Get a WaitGroup
var wg sync.WaitGroup
// Load Configuration from file
if err := file.ConfigImportYaml("conf/"); err != nil {
panic(err)
}
// Get the flow
flow1 := kis.Pool().GetFlow("CalStuAvgScore")
if flow1 == nil {
panic("flow1 is nil")
}
// Fork the flow
flowClone1 := flow1.Fork(ctx)
// Add to WaitGroup
wg.Add(2)
// Run Flow1
go func() {
defer wg.Done()
// Submit a string
_ = flow1.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90, "score_3":80}`)
// Submit a string
_ = flow1.CommitRow(`{"stu_id":1001, "score_1":100, "score_2":70, "score_3":60}`)
// Run the flow
if err := flow1.Run(ctx); err != nil {
fmt.Println("err: ", err)
}
}()
// Run FlowClone1
go func() {
defer wg.Done()
// Submit a string
_ = flowClone1.CommitRow(`{"stu_id":201, "score_1":100, "score_2":90, "score_3":80}`)
// Submit a string
_ = flowClone1.CommitRow(`{"stu_id":2001, "score_1":100, "score_2":70, "score_3":60}`)
if err := flowClone1.Run(ctx); err != nil {
fmt.Println("err: ", err)
}
}()
// Wait for Goroutines to finish
wg.Wait()
fmt.Println("All flows completed.")
return
}
func init() {
// Register functions
kis.Pool().FaaS("VerifyStu", VerifyStu)
kis.Pool().FaaS("AvgStuScore", AvgStuScore)
kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
}
作者:刘丹冰Aceld github: https://github.com/aceld
KisFlow开源项目地址:https://github.com/aceld/kis-flow
Golang框架实战-KisFlow流式计算框架专栏
Golang框架实战-KisFlow流式计算框架(1)-概述
Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)
Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)
Golang框架实战-KisFlow流式计算框架(4)-数据流
Golang框架实战-KisFlow流式计算框架(5)-Function调度
Golang框架实战-KisFlow流式计算框架(6)-Connector
Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出
Golang框架实战-KisFlow流式计算框架(8)-KisFlow Action
Golang框架实战-KisFlow流式计算框架(9)-Cache/Params 数据缓存与数据参数
Golang框架实战-KisFlow流式计算框架(10)-Flow多副本
Golang框架实战-KisFlow流式计算框架(11)-Prometheus Metrics统计
Golang框架实战-KisFlow流式计算框架(12)-基于反射自适应注册FaaS形参类型
案例:
KisFlow-Golang流式计算案例(一)快速开始QuickStart
KisFlow-Golang流式计算案例(二)-Flow并流操作
KisFlow-Golang流式计算案例(三)-KisFlow在多协程中的应用