classic pipeline example

workflow for pipeline

pipeline-v1.jpg

runing result

enter contrl+c  to stop pipeline program
result.png

code example

package main

import (
        "log"
        "math/rand"
        "os"
        "os/signal"
        "syscall"
        "time"
)

/*
classic pipeline demo write by perrynzhou@gmail.com
*/
const (
        batchSize = 8
)

/*
note:
        for range 在chan上有如下特性
                1.如果chan上有数据,则for 继续往下执行,如果chan没有数据则for 会阻塞
                2.如果chan被close了,则chan为nil,for range会退出循环。
*/
type PipeFeature struct {
        input1 chan int64
        input2 chan int64
        input3 chan int64
        done   chan struct{}
        stop   chan struct{}
}

func NewPipeFeature() *PipeFeature {
        return &PipeFeature{
                input1: make(chan int64, batchSize),
                input2: make(chan int64, batchSize),
                input3: make(chan int64, batchSize),
                done:   make(chan struct{}),
                stop:   make(chan struct{}),
        }
}
func (p *PipeFeature) init() {
        log.Println("...init running...")
        defer close(p.input1)
        for {
                select {
                case <-p.done:
                        log.Println("...init stop...")
                        return
                default:
                        time.Sleep(5 * time.Millisecond)
                        p.input1 <- rand.Int63n(65535)
                }
        }
}
func (p *PipeFeature) stage1() {
        log.Println("...stage1 running...")
        defer close(p.input2)
        for v := range p.input1 { //will block util input1 close
                v = v - rand.Int63n(1024)
                p.input2 <- v
        }
        log.Println("stage1 done...")
}
func (p *PipeFeature) stage2() {
        log.Println("...stage2 running...")
        defer close(p.input3)
        for v := range p.input2 {
                v = v + 1
                p.input3 <- v
        }
        log.Println("stage2 done...")
}
func (p *PipeFeature) stage3() {
        log.Println("...stage3 running...")
        for v3 := range p.input3 { //will block
                v3 = v3 + rand.Int63n(100)
        }
        log.Println("stage3 done...")
}
func (p *PipeFeature) Run() {
        log.Println("start pipeline...")
        go p.init() //order2- recv data from done and closed input1, return this function
        go p.stage1() order 3-if input1 is closed,break for loop, and close input2 before return 
        go p.stage2() //order 4-if input2 is closed ,break for range input2 and close input3 before return 
       // order 5- if input3 is closed,stage3 return
        p.stage3() //  will block util input3 closed after call stage2
        p.stop <- struct{}{} // order 6-send stop flag to stop chan before end Run function
}
func (p *PipeFeature) Stop() {
        p.done <- struct{}{}  // order 1-let init function to stop
      //order 7 - already recv data from stop chan
        <-p.stop //wait for recv stop chan
        log.Println("stop pipeline...")
}
func main() {
        pipe := NewPipeFeature()
        defer pipe.Stop()
        sigs := make(chan os.Signal, 1)
        signal.Notify(sigs, os.Interrupt, syscall.SIGTERM, syscall.SIGINT)
        go pipe.Run()
        for {
                select {
                case <-sigs:
                        log.Println("recieve stop signal")
                        return
                }
        }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • rljs by sennchi Timeline of History Part One The Cognitiv...
    sennchi阅读 12,161评论 0 10
  • 寒雪梅中尽,春风柳上归。和煦的春风轻拂脸面,草小的学子绽放笑颜。问心为何如花蕾般绚烂?开学典礼的盛况...
    临水伊人阅读 5,858评论 0 1
  • 一个傻不拉几的我听完了整部电影的发音仍然不知悔改得觉得它是法国电影,直到看完才在简介中发现它是俄罗斯出品,...
    曦翎阅读 3,630评论 0 0
  • 俗话说,一人吃两人补。准妈妈在怀孕期间,为了胎儿的正常发育和健康成长,需要补充大量的营养,不能挑食,营养要全面均衡...
    芭乐街阅读 1,696评论 0 0
  • 姓名:刘小琼 公司:宁波大发化纤有限公司 宁波盛和塾《六项精进》第235期学员 【日精进打卡第123天】 知~学习...
    刘小琼123阅读 851评论 0 0