package main
import (
"fmt"
"time"
)
func sig(after time.Duration) <-chan interface{} {
c := make(chan interface{})
go func() {
defer close(c)
time.Sleep(after)
}()
return c
}
func main() {
// start := time.Now()
// //or-Done模式
// <-or(
// sig(10*time.Second),
// sig(20*time.Second),
// sig(30*time.Second),
// sig(40*time.Second),
// sig(50*time.Second),
// sig(01*time.Minute),
// )
//fan in
// <-fanInRec(
// sig(10*time.Second),
// sig(20*time.Second),
// sig(30*time.Second),
// sig(40*time.Second),
// sig(50*time.Second),
// sig(01*time.Minute),
// )
// fmt.Printf("done after %v", time.Since(start))
//fan out
ch := make(chan interface{})
chLister := []chan interface{}{make(chan interface{}), make(chan interface{}), make(chan interface{})}
fanOut(ch, chLister, false)
ch <- 888
fmt.Println(<-chLister[0])
fmt.Println(<-chLister[1])
fmt.Println(<-chLister[2])
}
func or(channels ...<-chan interface{}) <-chan interface{} {
// 特殊情况,只有零个或者1个chan
switch len(channels) {
case 0:
return nil
case 1:
return channels[0]
}
orDone := make(chan interface{})
go func() {
defer close(orDone)
switch len(channels) {
case 2: // 2个也是一种特殊情况
select {
case <-channels[0]:
case <-channels[1]:
}
default: //超过两个,二分法递归处理,也可以使用reflect
m := len(channels) / 2
select {
case <-or(channels[:m]...):
case <-or(channels[m:]...):
}
}
}()
return orDone
}
func fanInRec(chans ...<-chan interface{}) <-chan interface{} {
switch len(chans) {
case 0:
c := make(chan interface{})
close(c)
return c
case 1:
return chans[0]
case 2:
return mergeTwo(chans[0], chans[1])
default:
m := len(chans) / 2
return mergeTwo(
fanInRec(chans[:m]...),
fanInRec(chans[m:]...))
}
}
func mergeTwo(a, b <-chan interface{}) <-chan interface{} {
c := make(chan interface{})
go func() {
defer close(c)
for a != nil || b != nil { //只要还有可读的chan
select {
case v, ok := <-a:
if !ok { // a 已关闭,设置为nil
a = nil
continue
}
c <- v
case v, ok := <-b:
if !ok { // b 已关闭,设置为nil
b = nil
continue
}
c <- v
}
}
}()
return c
}
func fanOut(ch <-chan interface{}, out []chan interface{}, async bool) {
go func() {
defer func() { //退出时关闭所有的输出chan
for i := 0; i < len(out); i++ {
close(out[i])
}
}()
for v := range ch { // 从输入chan中读取数据
v := v
for i := 0; i < len(out); i++ {
i := i
if async { //异步
go func() {
out[i] <- v // 放入到输出chan中,异步方式
}()
} else {
out[i] <- v // 放入到输出chan中,同步方式
}
}
}
}()
}
golang channel实现fan-in、fan-out、or-done
©著作权归作者所有,转载或内容合作请联系作者
- 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
- 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
- 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
推荐阅读更多精彩内容
- 问题 开发过程中,有批量处理的逻辑,依赖下游一同的接口(这个接口耗时感人,400 ~ 600ms),如果串行执行批...
- 1、利用channel实现线程互斥 2、利用channel实现线程同步 3、利用channel实现信号量
- 路径为:./src/runtime/chan.go 文件中,先看channel结构体: 以及waitq的结构体: ...
- golang中使用channel实现互斥锁 通过将带有一个缓冲区的channel作为一个桶,桶中的数据作为锁,每次...
- 用一个缓存空间的channel实现锁比较简单,如果是无缓存就会稍微麻烦点直接上代码了: 以下是sample: 欢迎...