主从设计模式的Go实现

在流水线设计模式之外,主从模式(Boss-worker)也是一种重要的多线程设计模式。在主从模式中,存在一个主人线程(Boss),它负责将工作分成同样的几份,并分配给从线程(Worker),Worker各自分头完成工作,最后Boss负责将多个Worker线程的工作成果合并,下面用Go来演示一下这种设计模式。

演示用的例子实现的是类似于Linux中grep的功能,搜索文本中匹配的字符,并列出匹配的行。文本搜索的目标可能是多个文件,对每个文件的搜索是独立的,因此可以利用主从模式提高在多核CPU上提高多文件搜索的效率。

首先定义数据结构,Worker线程的数目取决于CPU核的数目,因为文件的搜索涉及文件I/O和大量内存计算,Worker线程的数目超过CPU核数时会带来线程切换的额外负担,而对提高搜索效率没有效果。

Result结构定义了搜索结果,文件名-匹配行数-匹配行内容。Job结构用于Boss线程和Worker线程的通信,搜索文件-搜索结果。

var workers = runtime.NumCPU() //number of workers

type Result struct {
    filename string //file name
    lino     int    //line number
    line     string //string content of line
}

type Job struct {
    filename string        //the name of the file on procesing
    results  chan<- Result //channel that any result to be sent
}

首先设置Go会使用的CPU核数,然后解析命令行参数,获得 搜索超时,匹配用的正则表达式和搜索目标文件。正则表达式在编译后获得一个表达式指针,这个指针会被Worker线程共享使用。通常,使用共享指针不是线程安全的,但是*regexp.Regexp在Go文档中已经被声明为线程安全了,因此可以放心使用。最后,调用grep()开始工作。

    runtime.GOMAXPROCS(runtime.NumCPU()) // Use all the machine's cores
    ...
    //Compile the input regex
    // lineRx is a shared pointer to value, which shall be a cause of
    //  concern since it's not thread safe, but Go doc *regexp.Regexp is
    //  safe to be shared in as many routines
    if lineRx, err := regexp.Compile(pattern); err != nil {
        log.Fatalf("invalid regexp: %s\n", err)
    } else {
        var timeout int64 = 1e9 * 60 * 10 // 10 minutes!
        if *timeoutOpt != 0 {
            timeout = *timeoutOpt * 1e9
        }
        grep(timeout, lineRx, commandLineFiles(files))
    }

在grep()中,创建三个双向的channel,jobs用于Boss线程分配工作给Worker, results用于Worker线程汇报搜索结果,done中是标志结束的channel。results channel设置了最长1000的缓冲区,当缓冲区满,而Worker线程需要向results中添加数据时, Worker线程会被阻塞直到results的数据被处理缓冲区有空余。

func grep(timeout int64, lineRx *regexp.Regexp, filenames []string) {
    //create channels
    jobs := make(chan Job, workers)
    results := make(chan Result, minimum(1000, len(filenames)))
    done := make(chan struct{}, workers)

    go addJobs(jobs, filenames, results) //boss assign jobs
    for i := 0; i < workers; i++ {
        go doJobs(done, lineRx, jobs) //worker do jobs
    }
    //wait for work to submit work result
    waitAndProcessResults(timeout, done, results)
}

addJobs()将文件名和result channel发送到job channel。doJobs()接受job channel和正则表达式,进行搜索。所有的工作完成后,通过done channel发送完成标志。

func addJobs(jobs chan<- Job, filenames []string, results chan<- Result) {
    for _, filename := range filenames {
        jobs <- Job{filename, results}
    }
    close(jobs)
}

func doJobs(done chan<- struct{}, lineRx *regexp.Regexp, jobs <-chan Job) {
    for job := range jobs {
        job.Do(lineRx)
    }
    done <- struct{}{}
}

搜索的实现按下不表, waitAndProcessResults()等待Worker线程完成并打印结果。select会阻塞Boss线程直到接收到result, finish 或者 done channel 数据。阻塞的Boss线程会睡眠,从而不会消耗CPU资源来死等。每次收到一个done, 表明一个Worker线程的工作完成,当所有的Worker线程都完成后,Boss线程不需要阻塞,可以顺畅地打印所有的结果。

func waitAndProcessResults(timeout int64, done <-chan struct{},
    results <-chan Result) {
    finish := time.After(time.Duration(timeout))
    for working := workers; working > 0; {
        select { // Blocking
        case result := <-results:
            fmt.Printf("%s:%d:%s\n", result.filename, result.lino,
                result.line)
        case <-finish:
            fmt.Println("timed out")
            return // Time's up so finish with what results there were
        case <-done:
            working--
        }
    }
    for {
        select { // Nonblocking
        case result := <-results:
            fmt.Printf("%s:%d:%s\n", result.filename, result.lino,
                result.line)
        case <-finish:
            fmt.Println("timed out")
            return // Time's up so finish with what results there were
        default:
            return
        }
    }
}

搜索的具体实现对于本文的主题没有太大的意义,因此做简要说明,将文件全部读入缓存后,按行检测匹配。

func (job Job) Do(lineRx *regexp.Regexp) {
    file, err := os.Open(job.filename)
    if err != nil {
        log.Printf("error: %s\n", err)
        return
    }
    defer file.Close()
    reader := bufio.NewReader(file)
    for lino := 1; ; lino++ {
        line, err := reader.ReadBytes('\n')
        line = bytes.TrimRight(line, "\n\r")
        if lineRx.Match(line) {
            job.results <- Result{job.filename, lino, string(line)}
        }
        if err != nil {
            if err != io.EOF {
                log.Printf("error:%d: %s\n", lino, err)
            }
            break
        }
    }
}

最后验证一下程序。

./cgrep runtime.GOOS cgrep.go
cgrep.go:90:    if runtime.GOOS == "windows" {

代码清单:[https://github.com/KevinACoder/gobook/blob/master/src/cgrep3/cgrep.go]

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,222评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,455评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,720评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,568评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,696评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,879评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,028评论 3 409
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,773评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,220评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,550评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,697评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,360评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,002评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,782评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,010评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,433评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,587评论 2 350

推荐阅读更多精彩内容

  • Chapter 8 Goroutines and Channels Go enable two styles of...
    SongLiang阅读 1,578评论 0 3
  • 何为Reactor线程模型? Reactor模式是事件驱动的,有一个或多个并发输入源,有一个Service Han...
    未名枯草阅读 3,491评论 2 11
  • 写在前面的话 代码中的# > 表示的是输出结果 输入 使用input()函数 用法 注意input函数输出的均是字...
    FlyingLittlePG阅读 2,743评论 0 8
  • 面向并发的内存模型 在早期,CPU都是以单核的形式顺序执行机器指令。Go语言的祖先C语言正是这种顺序编程语言的代表...
    Gundy_阅读 598评论 0 3
  • 三日假期匆匆而过,上班的第一天,还是有点忙。盼了许久的落户信息终于下来了,小欢喜。下午在更新公众号,顺便听...
    水漓阅读 138评论 0 0