Channel(二)channel写

2.1、channel创建

channel := make(chan int, 6)

channel创建,本质上就是初始化一个hchan结构体,使用make()创建channel,是如何调用到具体的实例化hchan的函数makechan()的?

在编译期根据make()中的不同类型,将其转换为不同类型的节点,

见/usr/local/go/src/cmd/compile/internal/gc/typecheck.go:327 OMAKE类型节点

func typecheck1(n *Node, top int) (res *Node) {
    // ...
    switch n.Op{
    case OMAKE:
        switch t.Etype {
        case TCHAN:
            l = nil
            if i < len(args){
                // ... 对缓冲区大小进行检测
                n.Left = l  // 带缓冲区,赋值缓冲区大小
            }else{
                n.Left = nodintconst(0) // 不带缓冲区
            }
            n.Op = OMAKECHAN
        }
    }
}

然后OMAKECHAN节点会/usr/local/go/src/cmd/compile/internal/gc/walk.go:416中转换成调用makechan或者makechan64的函数

// src/cmd/compile/internal/gc/walk.go
func walkexpr(n *Node, init *Nodes) *Node {
    switch n.Op {
    case OMAKECHAN:
        size := n.Left
        fnname := "makechan64"
        argtype := types.Types[TINT64]

        if size.Type.IsKind(TIDEAL) || maxintval[size.Type.Etype].Cmp(maxintval[TUINT]) <= 0 {
            fnname = "makechan"
            argtype = types.Types[TINT]
        }
        n = mkcall1(chanfn(fnname, 1, n.Type), n.Type, init, typename(n.Type), conv(size, argtype))
    }
}

makechan函数

func makechan(t *chantype, size int) *hchan {
   elem := t.elem

   // compiler checks this but be safe.
   if elem.size >= 1<<16 {
      throw("makechan: invalid channel element type")
   }
   if hchanSize % maxAlign != 0 || elem.align > maxAlign {
      throw("makechan: bad alignment")
   }

   mem, overflow := math.MulUintptr(elem.size, uintptr(size))
   if overflow || mem > maxAlloc-hchanSize || size < 0 {
      panic(plainError("makechan: size out of range"))
   }

   // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
   // buf points into the same allocation, elemtype is persistent.
   // SudoG's are referenced from their owning thread so they can't be collected.
   // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
   var c *hchan
   switch {
   case mem == 0:
      // Queue or element size is zero.
      c = (*hchan)(mallocgc(hchanSize, nil, true))
      // Race detector uses this location for synchronization.
      c.buf = c.raceaddr()
   case elem.ptrdata == 0:
      // Elements do not contain pointers.
      // Allocate hchan and buf in one call.
      c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
      c.buf = add(unsafe.Pointer(c), hchanSize)
   default:
      // Elements contain pointers.
      c = new(hchan)
      c.buf = mallocgc(mem, elem, true)
   }

   c.elemsize = uint16(elem.size)
   c.elemtype = elem
   c.dataqsiz = uint(size)
   lockInit(&c.lock, lockRankHchan)

   if debugChan {
      print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
   }
   return c
}

2.2、写channel

image.jpeg

流程:

  • 首先判断recvq接收队列是否为空,若不为空,说明有消费者goroutine阻塞在队列里,即channel为空或者channel没有缓冲区。直接从recvq中取出一个G,写入数据,唤醒该G,结束写入流程。
  • 若recvq为空,判断当前环形队列是否有空位,如果有,将数据写入队列尾部,如果队列已满,则将当前写数据的goroutine加入到sendq队列中,等待被唤醒
channel := make(chan int, 6) channel <- 10

<- 发送语句实际会被转换为chansend1

// entry point for c <- x from compiled code
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
   chansend(c, elem, true, getcallerpc())
}
/*
 * generic single channel send/recv
 * If block is not nil,
 * then the protocol will not
 * sleep but return if it could
 * not complete.
 *
 * sleep can wake up with g.param == nil
 * when a channel involved in the sleep has
 * been closed.  it is easiest to loop and re-run
 * the operation; we'll see that it's now closed.
 */
 /*
 *c: 操作的channel
  ep:指针,指向发送的数据ch<-i
  block:是否阻塞调用,在select case中才会设置为false
  
  return:代表是否发送成功
  */
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    //channel为nil的情况,
   if c == nil {
       //block为false,直接返回false,表示发送失败
      if !block {
         return false
      }
      //对于nil channel,直接挂起当前goroutine,永久阻塞
      gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
      throw("unreachable")
   }

   if debugChan {
      print("chansend: chan=", c, "\n")
   }

   if raceenabled {
      racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
   }

   // Fast path: check for failed non-blocking operation without acquiring the lock.
   //
   // After observing that the channel is not closed, we observe that the channel is
   // not ready for sending. Each of these observations is a single word-sized read
   // (first c.closed and second full()).
   // Because a closed channel cannot transition from 'ready for sending' to
   // 'not ready for sending', even if the channel is closed between the two observations,
   // they imply a moment between the two when the channel was both not yet closed
   // and not ready for sending. We behave as if we observed the channel at that moment,
   // and report that the send cannot proceed.
   //
   // It is okay if the reads are reordered here: if we observe that the channel is not
   // ready for sending and then observe that it is not closed, that implies that the
   // channel wasn't closed during the first observation. However, nothing here
   // guarantees forward progress. We rely on the side effects of lock release in
   // chanrecv() and closechan() to update this thread's view of c.closed and full().
   if !block && c.closed == 0 && full(c) {
      return false
   }

   var t0 int64
   if blockprofilerate > 0 {
      t0 = cputicks()
   }

    //加锁
   lock(&c.lock)

    //如果channel关闭,panic
   if c.closed != 0 {
      unlock(&c.lock)
      panic(plainError("send on closed channel"))
   }

    //从待接收队列中获取等待的goroutine
   if sg := c.recvq.dequeue(); sg != nil {
      // Found a waiting receiver. We pass the value we want to send
      // directly to the receiver, bypassing the channel buffer (if any).
      //只要可以从待接收队列中获取到goroutine,那么发送操作都是只需要copy一次,见code1:send
      send(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return true
   }

   if c.qcount < c.dataqsiz {
      // Space is available in the channel buffer. Enqueue the element to send.
      qp := chanbuf(c, c.sendx)
      if raceenabled {
         raceacquire(qp)
         racerelease(qp)
      }
      typedmemmove(c.elemtype, qp, ep)
      //递增存放发送数据的索引
      c.sendx++
      if c.sendx == c.dataqsiz {
          //调整索引
         c.sendx = 0
      }
      c.qcount++
      unlock(&c.lock)
      return true
   }
    //如果是非阻塞发送,那么可以直接解锁返回,未发送成功
   if !block {
      unlock(&c.lock)
      return false
   }

   // Block on the channel. Some receiver will complete our operation for us.
   //阻塞发送,挂起当前goroutine
   gp := getg()
   //生成配置sudo
   mysg := acquireSudog()
   mysg.releasetime = 0
   if t0 != 0 {
      mysg.releasetime = -1
   }
   // No stack splits between assigning elem and enqueuing mysg
   // on gp.waiting where copystack can find it.
   mysg.elem = ep
   mysg.waitlink = nil
   mysg.g = gp
   mysg.isSelect = false
   mysg.c = c
   gp.waiting = mysg
   gp.param = nil
   //入队待发送队列
   c.sendq.enqueue(mysg)
   // Signal to anyone trying to shrink our stack that we're about
   // to park on a channel. The window between when this G's status
   // changes and when we set gp.activeStackChans is not safe for
   // stack shrinking.
   atomic.Store8(&gp.parkingOnChan, 1)
   //挂起goroutine,等待唤醒
   //chanparkcommit函数会解锁,ch.lock
   gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
   // Ensure the value being sent is kept alive until the
   // receiver copies it out. The sudog has a pointer to the
   // stack object, but sudogs aren't considered as roots of the
   // stack tracer.
   KeepAlive(ep)

   // someone woke us up.
   if mysg != gp.waiting {
      throw("G waiting list is corrupted")
   }
   gp.waiting = nil
   gp.activeStackChans = false
   if gp.param == nil {
      if c.closed == 0 {
         throw("chansend: spurious wakeup")
      }
      panic(plainError("send on closed channel"))
   }
   gp.param = nil
   if mysg.releasetime > 0 {
      blockevent(mysg.releasetime-t0, 2)
   }
   mysg.c = nil
   releaseSudog(mysg)
   return true
}

code1:send

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
   if raceenabled {
      if c.dataqsiz == 0 {
         racesync(c, sg)
      } else {
         // Pretend we go through the buffer, even though
         // we copy directly. Note that we need to increment
         // the head/tail locations only when raceenabled.
         qp := chanbuf(c, c.recvx)
         raceacquire(qp)
         racerelease(qp)
         raceacquireg(sg.g, qp)
         racereleaseg(sg.g, qp)
         c.recvx++
         if c.recvx == c.dataqsiz {
            c.recvx = 0
         }
         c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
      }
   }
   //sg.elem是指向接收goroutine中接收数据的指针 s<-ch
   //如果待接收goroutine需要接收具体的数据,那么直接将数据copy到sg.elem
   if sg.elem != nil {
      sendDirect(c.elemtype, sg, ep)
      sg.elem = nil
   }
   gp := sg.g
   unlockf()
   //赋值param,待接收者被唤醒后会根据 param 来判断是否是被发送至唤醒的
   gp.param = unsafe.Pointer(sg)
   if sg.releasetime != 0 {
      sg.releasetime = cputicks()
   }
   goready(gp, skip+1) //唤醒待接收者
}
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
   // src is on our stack, dst is a slot on another stack.
    //src 是发送的数据源地址,dst是接收数据的地址
    //src 在当前的goroutine栈中,dst在其他栈上
   // Once we read sg.elem out of sg, it will no longer
   // be updated if the destination's stack gets copied (shrunk).
   // So make sure that no preemption points can happen between read & use.
   dst := sg.elem
   // No need for cgo write barrier checks because dst is always
   // Go memory.
   //使用memove直接进行内存copy
   //因为dst指向其他goroutine的栈,如果它发生了栈收缩,那就没有修改真正的dst位置。所以需要加一个写屏障
   typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
   memmove(dst, src, t.size)
}

写channel的特性:

  • 向nil channel发送数据会被永久阻塞,并且不会被select语句选中
  • 如果channel未关闭,非缓冲区没有待接收的goroutine,或者缓冲区已满,那么不会被select语句选中
  • 向关闭的channel发送数据,会panic,并且可以被select语句选中,这也就意味着select语句中可能会panic
  • 如果有待接收者,那么会将发送的数据直接copy到待接收者的接收位置,然后唤醒接收者
  • 如果有缓冲区,并且缓冲区未满,那么就直接把发送的数据copy到缓冲区中
  • 如果channel未关闭,缓冲区为空并且没有待接收者,那么直接阻塞当前goroutine,等待被唤醒
  • 发送者被阻塞后,可以被关闭channel操作或者被接收者操作唤醒,关闭channel导致发送者被唤醒后,会panic
  • 当channel中有待接收goroutine,那么channel的状态必然是费缓冲或者缓冲区为空

发送数据,可以被select选中的情况:

  • channel已关闭
  • channel未关闭,channel有待接收的goroutine,或者缓冲区不为空且缓冲区未满
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,122评论 6 505
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,070评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,491评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,636评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,676评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,541评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,292评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,211评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,655评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,846评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,965评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,684评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,295评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,894评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,012评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,126评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,914评论 2 355

推荐阅读更多精彩内容

  • 前言 Golang在并发编程上有两大利器,分别是channel和goroutine,这篇文章我们先聊聊channe...
    即将秃头的Java程序员阅读 1,104评论 0 2
  • 了解过go的都知道,go最为突出的优点就是它天然支持高并发,但是所有高并发情况都面临着一个很明显的问题,就是并发的...
    GGBond_8488阅读 334评论 0 3
  • Don't communicate by sharing memory, share memory by comm...
    IceberGu阅读 605评论 0 0
  • 单纯地将函数并发执行是没有意义的,函数与函数之间需要交换数据才能体现并发执行函数的作用。虽然可使用共享内存进行数据...
    JunChow520阅读 424评论 0 2
  • 简介 熟悉Go的人都知道,它提倡着不要通过共享内存来通讯,而要通过通讯来共享内存。Go提供了一种独特的并发同步技术...
    marsjhe阅读 2,645评论 0 2