Golang 通道,同步等待组 并发爬虫

Golang:通道,同步等待组 并发爬虫

在Go的并发编程中有一句很经典的话:不要以共享内存的方式去通信,而要以通信的方式去共享内存。

在Go语言中并不鼓励用锁保护共享状态的方式在不同的Goroutine中分享信息(以共享内存的方式去通信)。而是鼓励通过channel将共享状态或共享状态的变化在各个Goroutine之间传递(以通信的方式去共享内存),这样同样能像用锁一样保证在同一的时间只有一个Goroutine访问共享状态。

当然,在主流的编程语言中为了保证多线程之间共享数据安全性和一致性,都会提供一套基本的同步工具集,如锁,条件变量,原子操作等等。Go语言标准库也毫不意外的提供了这些同步机制,使用方式也和其他语言也差不多。

image

WaitGroup

WaitGroup,同步等待组。

在类型上,它是一个结构体。一个WaitGroup的用途是等待一个goroutine的集合执行完成。主goroutine调用了Add()方法来设置要等待的goroutine的数量。然后,每个goroutine都会执行并且执行完成后调用Done()这个方法。与此同时,可以使用Wait()方法来阻塞,直到所有的goroutine都执行完成。

Add()方法

Add这个方法,用来设置到WaitGroup的计数器的值。我们可以理解为每个waitgroup中都有一个计数器 用来表示这个同步等待组中要执行的goroutin的数量。

如果计数器的数值变为0,那么就表示等待时被阻塞的goroutine都被释放,如果计数器的数值为负数,那么就会引发恐慌,程序就报错了。

Done()方法

Done()方法,就是当WaitGroup同步等待组中的某个goroutine执行完毕后,设置这个WaitGroup的counter数值减1。

Wait()方法

Wait()方法,表示让当前的goroutine等待,进入阻塞状态。一直到WaitGroup的计数器为零。才能解除阻塞, 这个goroutine才能继续执行。

示例代码


package main

import (
    "fmt"
    "sync"
)
var wg sync.WaitGroup // 创建同步等待组对象
func main()  {
    /*
    WaitGroup:同步等待组
        可以使用Add(),设置等待组中要 执行的子goroutine的数量,
        
        在main 函数中,使用wait(),让主程序处于等待状态。直到等待组中子程序执行完毕。解除阻塞

        子gorotuine对应的函数中。wg.Done(),用于让等待组中的子程序的数量减1
     */
    //设置等待组中,要执行的goroutine的数量
    wg.Add(2)
    go fun1()
    go fun2()
    fmt.Println("main进入阻塞状态。。。等待wg中的子goroutine结束。。")
    wg.Wait() //表示main goroutine进入等待,意味着阻塞
    fmt.Println("main,解除阻塞。。")

}
func fun1()  {
    for i:=1;i<=10;i++{
        fmt.Println("fun1.。。i:",i)
    }
    wg.Done() //给wg等待中的执行的goroutine数量减1.同Add(-1)
}
func fun2()  {
    defer wg.Done()
    for j:=1;j<=10;j++{
        fmt.Println("\tfun2..j,",j)
    }
}

channel通道

通道可以被认为是Goroutines通信的管道。类似于管道中的水从一端到另一端的流动,数据可以从一端发送到另一端,通过通道接收。

在前面讲Go语言的并发时候,我们就说过,当多个Goroutine想实现共享数据的时候,虽然也提供了传统的同步机制,但是Go语言强烈建议的是使用Channel通道来实现Goroutines之间的通信。

“不要通过共享内存来通信,而应该通过通信来共享内存” 这是一句风靡golang社区的经典语

接收和发送

一个通道发送和接收数据,默认是阻塞的。当一个数据被发送到通道时,在发送语句中被阻塞,直到另一个Goroutine从该通道读取数据。相对地,当从通道读取数据时,读取被阻塞,直到一个Goroutine将数据写入该通道。

示例代码:以下代码加入了睡眠,可以更好的理解channel的阻塞

package main

import (
    "fmt"
    "time"
)

func main() {
    ch1 := make(chan int)
    done := make(chan bool) // 通道
    go func() {
        fmt.Println("子goroutine执行。。。")
        time.Sleep(3 * time.Second)
        data := <-ch1 // 从通道中读取数据
        fmt.Println("data:", data)
        done <- true
    }()
    // 向通道中写数据。。
    time.Sleep(5 * time.Second)
    ch1 <- 100

    <-done
    fmt.Println("main。。over")

}

在上面的程序中,我们先创建了一个chan bool通道。然后启动了一条子Goroutine,并循环打印10个数字。然后我们向通道ch1中写入输入true。
然后在主goroutine中,我们从ch1中读取数据。这一行代码是阻塞的,这意味着在子Goroutine将数据写入到该通道之前,主goroutine将不会执行到下一行代码。

因此,我们可以通过channel实现子goroutine和主goroutine之间的通信。当子goroutine执行完毕前,主goroutine会因为读取ch1中的数据而阻塞。从而保证了子goroutine会先执行完毕。这就消除了对时间的需求。

在之前的程序中,我们要么让主goroutine进入睡眠,以防止主要的Goroutine退出。要么通过WaitGroup来保证子goroutine先执行完毕,主goroutine才结束。

死锁

使用通道时要考虑的一个重要因素是死锁。如果Goroutine在一个通道上发送数据,那么预计其他的Goroutine应该接收数据。如果这种情况不发生,那么程序将在运行时出现死锁。

类似地,如果Goroutine正在等待从通道接收数据,那么另一些Goroutine将会在该通道上写入数据,否则程序将会死锁。

示例代码
package main

func main() {  
    ch := make(chan int)
    ch <- 5
}
报错:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
    /Users/ruby/go/src/l_goroutine/demo08_chan.go:5 +0x50

Goroutine

Goroutine 是实际并发执行的实体,它底层是使用协程(coroutine)实现并发,coroutine是一种运行在用户态的用户线程,类似于 greenthread,go底层选择使用coroutine的出发点是因为,它具有以下特点:

用户空间 避免了内核态和用户态的切换导致的成本
可以由语言和框架层进行调度
更小的栈空间允许创建大量的实例

Goroutine 调度器

Go并发调度: G-P-M模型

在操作系统提供的内核线程之上,Go搭建了一个特有的两级线程模型。goroutine机制实现了M : N的线程模型,goroutine机制是协程(coroutine)的一种实现,golang内置的调度器,可以让多核CPU中每个CPU执行一个协程。

以上内容来自 https://github.com/rubyhan1314/Golang-100-Days
主要说明一下同步等待组和通道的基本使用,以及 go 是如何处理并发的,更多可以继续参考以上,来自千峰的 go 教程。

实战爬虫

前面说了这么多只不过是为这个脚本做铺垫,要不然则来的太唐突。
我这里写了一个爬虫脚本,用到了通道来做并发,并有同步等待组做 awit() 操作

直接来看代码

获取html
func HttpGet(url string) (result string, err error) {
    resp, err1 := http.Get(url)
    if err != nil {
        err = err1
        return
    }
    defer resp.Body.Close()
    //读取网页的body内容
    buf := make([]byte, 4*1024)
    for true {
        n, err := resp.Body.Read(buf)
        if err != nil {
            if err == io.EOF{
                break
            }else {
                fmt.Println("resp.Body.Read err = ", err)
                break
            }
        }
        result += string(buf[:n])
    }
    return
}
爬取网页存为 .html 文件
func spiderPage(url string) string {

    fmt.Println("正在爬取", url)
    //爬,将所有的网页内容爬取下来
    result, err := HttpGet(url)
    if err != nil {
        fmt.Println(err)
    }
    //把内容写入到文件
    filename := strconv.Itoa(rand.Int()) + ".html"
    f, err1 := os.Create(filename)
    if err1 != nil{
        fmt.Println(err1)
    }
    //写内容
    f.WriteString(result)
    //关闭文件
    f.Close()
    return url + " 抓取成功"

}

爬取方法方面就写完了,接下来就到了重要的部分了

定义一个工作者函数
func doWork(start, end int,wg *sync.WaitGroup) {
    fmt.Printf("正在爬取第%d页到%d页\n", start, end)
    //因为很有可能爬虫还没有结束下面的循环就已经结束了,所以这里就需要且到通道
    page := make(chan string,100)
    results := make(chan string,100)


    go sendResult(results,start,end)

    go func() {

        for i := 0; i <= 20; i++ {
            wg.Add(1)
            go asyn_worker(page, results, wg)
        }
    }()

    for i := start; i <= end; i++ {
            url := "https://tieba.baidu.com/f?kw=%E7%BB%9D%E5%9C%B0%E6%B1%82%E7%94%9F&ie=utf-8&pn=" + strconv.Itoa((i-1)*50)
            page <- url
            println("加入" + url + "到page")
        }
        println("关闭通道")
        close(page)

    wg.Wait()
    //time.Sleep(time.Second * 5)
    println(" Main 退出 。。。。。")
}
从通道取出数据
func asyn_worker(page chan string, results chan string,wg *sync.WaitGroup){

    defer wg.Done()  //defer wg.Done()必须放在go并发函数内

    for{
        v, ok := <- page //显示的调用close方法关闭通道。
        if !ok{
            fmt.Println("已经读取了所有的数据,", ok)
            break
        }
        //fmt.Println("取出数据:",v, ok)
        results <- spiderPage(v)
    }


    //for n := range page {
    //  results <- spiderPage(n)
    //}
}
发送抓取结果
func sendResult(results chan string,start,end int)  {

    //for i := start; i <= end; i++ {
    //  fmt.Println(<-results)
    //}

    // 发送抓取结果
    for{
        v, ok := <- results
        if !ok{
            fmt.Println("已经读取了所有的数据,", ok)
            break
        }
        fmt.Println(v)

    }
}

大体思路是这样的:

可以看到我定义了两个通道,一个是用来存入 url 的,另一个是用来存入爬取结果的,缓冲空间是 100
在方法 doWork 中, sendResult 会阻塞等待 results 通道的输出,匿名函数则是等待 page 通道的输出

紧接着下面就是把 200 个 url 写入 page 通道,匿名函数得到 page 的输出就会执行 asyn_worker 函数,也就是爬取 html 的函数了(将其存入results 通道)

然后 sendResult 函数得到 results 通道的输出,将结果打印出来

可以看到 我在匿名函数中并发了 20 个 goroution,并且启用了同步等待组作为参数传入,理论上可以根据机器的性能来定义 并发数

main函数

func main() {
    start_time := time.Now().UnixNano()

    var wg sync.WaitGroup

    doWork(1,200, &wg)
    //输出执行时间,单位为毫秒。
    fmt.Printf("执行时间: %ds",(time.Now().UnixNano() - start_time) / 1000)

}

运行爬虫并计算运行时间,这个时间因机器而异,但应该不会相差太多

完整代码

package main

import (
    "fmt"
    "io"
    "sync"
    "math/rand"
    "net/http"
    "os"
    "strconv"
    "time"
)



func HttpGet(url string) (result string, err error) {
    resp, err1 := http.Get(url)
    if err != nil {
        err = err1
        return
    }
    defer resp.Body.Close()
    //读取网页的body内容
    buf := make([]byte, 4*1024)
    for true {
        n, err := resp.Body.Read(buf)
        if err != nil {
            if err == io.EOF{
                break
            }else {
                fmt.Println("resp.Body.Read err = ", err)
                break
            }
        }
        result += string(buf[:n])
    }
    return
}


//爬取网页
func spiderPage(url string) string {

    fmt.Println("正在爬取", url)
    //爬,将所有的网页内容爬取下来
    result, err := HttpGet(url)
    if err != nil {
        fmt.Println(err)
    }
    //把内容写入到文件
    filename := strconv.Itoa(rand.Int()) + ".html"
    f, err1 := os.Create(filename)
    if err1 != nil{
        fmt.Println(err1)
    }
    //写内容
    f.WriteString(result)
    //关闭文件
    f.Close()
    return url + " 抓取成功"

}

func asyn_worker(page chan string, results chan string,wg *sync.WaitGroup){

    defer wg.Done()  //defer wg.Done()必须放在go并发函数内

    for{
        v, ok := <- page //显示的调用close方法关闭通道。
        if !ok{
            fmt.Println("已经读取了所有的数据,", ok)
            break
        }
        //fmt.Println("取出数据:",v, ok)
        results <- spiderPage(v)
    }

    //for n := range page {
    //  results <- spiderPage(n)
    //}
}

func doWork(start, end int,wg *sync.WaitGroup) {
    fmt.Printf("正在爬取第%d页到%d页\n", start, end)
    //因为很有可能爬虫还没有结束下面的循环就已经结束了,所以这里就需要且到通道
    page := make(chan string,100)
    results := make(chan string,100)


    go sendResult(results,start,end)

    go func() {

        for i := 0; i <= 20; i++ {
            wg.Add(1)
            go asyn_worker(page, results, wg)
        }
    }()


    for i := start; i <= end; i++ {
            url := "https://tieba.baidu.com/f?kw=%E7%BB%9D%E5%9C%B0%E6%B1%82%E7%94%9F&ie=utf-8&pn=" + strconv.Itoa((i-1)*50)
            page <- url
            println("加入" + url + "到page")
        }
        println("关闭通道")
        close(page)

    wg.Wait()
    //time.Sleep(time.Second * 5)
    println(" Main 退出 。。。。。")
}


func sendResult(results chan string,start,end int)  {

    //for i := start; i <= end; i++ {
    //  fmt.Println(<-results)
    //}

    // 发送抓取结果
    for{
        v, ok := <- results
        if !ok{
            fmt.Println("已经读取了所有的数据,", ok)
            break
        }
        fmt.Println(v)

    }
}

func main() {
    start_time := time.Now().UnixNano()

    var wg sync.WaitGroup

    doWork(1,200, &wg)
    //输出执行时间,单位为毫秒。
    fmt.Printf("执行时间: %ds",(time.Now().UnixNano() - start_time) / 1000)

}

总体来说,这个脚本就是为了弄清楚 Go 语言的并发原理 以及 通道,同步等待组的基本使用,或者只用 go 语言的锁,目的都是为了防止 临界资源的安全问题。

有了 channel 和 goroutine 之后,Go 的并发编程变得异常容易和安全,得以让程序员把注意力留到业务上去,实现开发效率的提升。

欢迎转载,但要声明出处,不然我顺着网线过去就是一拳。
个人技术博客:http://www.gzky.live

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,524评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,869评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,813评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,210评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,085评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,117评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,533评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,219评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,487评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,582评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,362评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,218评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,589评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,899评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,176评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,503评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,707评论 2 335

推荐阅读更多精彩内容

  • 参考《快学 Go 语言》第 11 课 —— 千军万马跑协程《快学 Go 语言》第 12 课 —— 通道let's ...
    合肥黑阅读 2,907评论 2 7
  • 开发go程序的时候,时常需要使用goroutine并发处理任务,有时候这些goroutine是相互独立的,而有的时...
    驻马听雪阅读 2,411评论 0 21
  • 控制并发有三种种经典的方式,一种是通过channel通知实现并发控制 一种是WaitGroup,另外一种就是Con...
    wiseAaron阅读 10,633评论 4 34
  • 不期望幸运顺利的人对别人的苦难波折感同身受,但也希望别高高在上鄙视身心磨难的他人是矫情。哎呀你太矫情了,不就是怀个...
    viviv阅读 414评论 0 0
  • 昨天的泰兴市场开拓相当顺利。有此我想到很多。 其实我这辈子做任何事情都比较顺利,那不是因为我的能力有多强,...
    悦琴1818阅读 137评论 0 0