go 流处理器思路

流水线编程,场景适合的话能够让我们的代码结构更加清晰

还有更多使用方式,可以直接参考go-zero文档 https://github.com/tal-tech/zero-doc/blob/main/doc/fx.md


package main

import (
    "fmt"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/tal-tech/go-zero/core/fx"
)

func main() {
    ch := make(chan int)

    go inputStream(ch)
    go outputStream(ch)

    c := make(chan os.Signal, 1)
    signal.Notify(c, syscall.SIGTERM, syscall.SIGINT)
    <-c
}

func inputStream(ch chan int) {
    count := 0
    for {
        ch <- count
        time.Sleep(time.Millisecond * 500)
        count++
    }
}

func outputStream(ch chan int) {
    fx.From(func(source chan<- interface{}) {  //生产资源
        for c := range ch {
            source <- c
        }
    }).Walk(func(item interface{}, pipe chan<- interface{}) {  //并发处理上一步生产的资源并写入pipe给下游继续加工(并发处理资源的work个数可以通过配置控制)
        count := item.(int)
        pipe <- count
    }).Filter(func(item interface{}) bool {  //串行的过滤资源
        itemInt := item.(int)
        if itemInt%2 == 0 {
            return true
        }
        return false
    }).ForEach(func(item interface{}) {  //串行的收集结果资源
        fmt.Println(item)
    })
}

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容