tunny源码阅读

前言

最近在学习Go并发,在同学强烈推荐下,阅读了tunny源码。在此记录自己的理解和感想。

tunny 基于Go实现的协程池

要去理解一个东西,最快的方式莫过于先去熟悉使用它。那么,现在我们就开始使用它:假设我们现在的需求是输入一个字符串,将它与"Hello"拼接后打印并返回,要求用tunny实现。

    #在NewFunc函数中需要对传入的函数进行转换并执行
    printHello := func(str interface{}) interface{} {
        fmt.Println("Hello!", str)
        return "Hello! " + str.(string)
    }
    pool3 := tunny.NewFunc(3, func(payload interface{}) interface{} {
        f, ok := payload.(func())
        if !ok {
            return nil
        }
        f()
        return f
    })
    pool3.Process(printHello("lizhuoming"))
    #而NewCallback的Process函数封装了这个操作
    printHello := func(str interface{}) interface{} {
        fmt.Println("Hello!", str)
        return "Hello! " + str.(string)
    }
    pool2 := tunny.NewCallback(2)
    pool2.Process(printHello("lizhuoming"))
#New的灵活度最高,我们可以定制自己的Worker
type myWorker struct {
    processor func(interface{}) interface{}
}

func (w *myWorker) Process(payload interface{}) interface{} {
    return w.processor(payload)
}

func (w *myWorker) BlockUntilReady() {}
func (w *myWorker) Interrupt()       {}
func (w *myWorker) Terminate()       {}

func main() {
    printHello := func(str interface{}) interface{} {
        fmt.Println("Hello!", str)
        return "Hello! " + str.(string)
    }

    pool1 := tunny.New(3, func() tunny.Worker {
        return &myWorker{
            processor: printHello,
        }
    })
    pool1.Process("lizhuoming")
}

源码分析

在熟悉了tunny的使用后,我们通过代码来看看它是如何工作的吧~

协程池的主要工作流程

在我们创建并指定协程池容量后,协程池会启动指定容量个协程。它们竞争向一个channel中写入 workRequest(它充当一个桥梁,连接 Process 函数与真正执行任务的协程)。当你调用 Process 函数时,它会通过这个桥梁将任务传递给协程,并在任务结束后,接收到协程返回的结果。
下面,我们来了解它的具体实现吧

桥梁以及Process函数与协程之间的通信实现

type Worker interface {
    // 执行任务
    Process(interface{}) interface{}
    // 在执行任务前执行,相当于init
    BlockUntilReady()
    // 在任务执行时被终止时,会执行该函数
    Interrupt()
    // 当协程被关闭时,执行该函数
    Terminate()
}

//协程池
type Pool struct {
        //正在执行的任务数量
        queuedJobs int64
        ctor    func() Worker
        workers []*workerWrapper
        //所有运行的协程会竞争向该channel写入workRequest
        reqChan chan workRequest
        workerMut sync.Mutex
}

//桥梁载体
type workRequest struct {
        //接收任务的channel
        jobChan chan<- interface{}
        //返回结果的channel
        retChan <-chan interface{}
        //中断协程的执行
        interruptFunc func()
}

//负责管理worker(stop函数)和goroutine(interrupt函数)的整个生命周期
type workerWrapper struct {
        worker        Worker
        interruptChan chan struct{}
        // workerWrapper 和 Pool 的reqChan是同一个(channel是引用传递)
        reqChan chan<- workRequest
        closeChan chan struct{}
        closedChan chan struct{}
}
func (p *Pool) ProcessTimed(
    payload interface{},
    timeout time.Duration,
) (interface{}, error) {
    atomic.AddInt64(&p.queuedJobs, 1)
    defer atomic.AddInt64(&p.queuedJobs, -1)

    tout := time.NewTimer(timeout)

    var request workRequest
    var open bool

    select {
    //读取桥梁载体
    case request, open = <-p.reqChan:
        if !open {
            return nil, ErrPoolNotRunning
        }
    //超时处理
    case <-tout.C:
        return nil, ErrJobTimedOut
    }

    select {
    //通过桥梁载体将任务传给协程
    case request.jobChan <- payload:
    case <-tout.C:
        //调用 workerWrapper 的 interrupt 方法,结束函数执行
        request.interruptFunc()
        return nil, ErrJobTimedOut
    }

    select {
    //接收返回数据
    case payload, open = <-request.retChan:
        if !open {
            return nil, ErrWorkerClosed
        }
    case <-tout.C:
        //调用 workerWrapper 的 interrupt 方法,结束函数执行
        request.interruptFunc()
        return nil, ErrJobTimedOut
    }

    tout.Stop()
    return payload, nil
}

func (w *workerWrapper) run() {
    jobChan, retChan := make(chan interface{}), make(chan interface{})
    defer func() {
        w.worker.Terminate()
        close(retChan)
        close(w.closedChan)
    }()

    for {
        w.worker.BlockUntilReady()
        select {
        // 给channel中写入桥梁载体,为协程私有
        case w.reqChan <- workRequest{
            jobChan:       jobChan,
            retChan:       retChan,
            interruptFunc: w.interrupt,
        }:
            select {
            //尝试读取任务
            case payload := <-jobChan:
                result := w.worker.Process(payload)
                select {
                case retChan <- result:
                case <-w.interruptChan:
                    w.interruptChan = make(chan struct{})
                }
            //执行被中断,新建中断 channel
            case _, _ = <-w.interruptChan:
                w.interruptChan = make(chan struct{})
            }
        // 协程被关闭
        case <-w.closeChan:
            return
        }
    }
}

协程池如何保证协程数恒定

func (p *Pool) SetSize(n int) {
    p.workerMut.Lock()
    defer p.workerMut.Unlock()

    lWorkers := len(p.workers)
    if lWorkers == n {
        return
    }
    // 给池中添加协程
    for i := lWorkers; i < n; i++ {
        p.workers = append(p.workers, newWorkerWrapper(p.reqChan, p.ctor()))
    }
    // 异步关闭超出的协程
    for i := n; i < lWorkers; i++ {
        p.workers[i].stop()
    }
    // 同步等待所有超出协程都关闭完成
    for i := n; i < lWorkers; i++ {
        p.workers[i].join()
    }
    p.workers = p.workers[:n]
}

一些感想

代码就说到这儿了,下面来谈谈我的感想:
(1)不得不说人家的代码健壮性真好,以后自己在写代码时也要借鉴人家的经验
(2)通过对 workerWrapper 和 workRequest 的设计和逻辑的拆分,使代码解耦,并且这样的代码逻辑看起来是非常清晰的

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,029评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,238评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,576评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,214评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,324评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,392评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,416评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,196评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,631评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,919评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,090评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,767评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,410评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,090评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,328评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,952评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,979评论 2 351

推荐阅读更多精彩内容

  • 轻量级线程:协程 在常用的并发模型中,多进程、多线程、分布式是最普遍的,不过近些年来逐渐有一些语言以first-c...
    Tenderness4阅读 6,359评论 2 10
  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 171,939评论 25 707
  • 原文链接:https://github.com/EasyKotlin 在常用的并发模型中,多进程、多线程、分布式是...
    JackChen1024阅读 10,722评论 3 23
  • 第七届世界面包大赛中国预选赛华南分区的广州预选赛在2018年10月23-24日已成功举办。 我作为1号选手的助手出...
    独自修炼的我阅读 442评论 0 4
  • 我家全款买的 我妈给看两年了,不给看 就没法往下谈 不赶紧拿走东西,结果我妈回来了
    兔牙wendy阅读 172评论 0 1