采用go routine的方式
func MergeChannelsWithGoroutine(chns ...<-chan int) <-chan int {
outChn := make(chan int)
go func() {
var wg sync.WaitGroup
wg.Add(len(chns))
for _, chn := range chns {
// chn 需要以参数的方式传递到goroutine
go func(chn <-chan int) {
for v := range chn {
outChn <- v
}
wg.Done()
}(chn)
}
wg.Wait()
close(outChn)
}()
return outChn
}
采用反射的方式
func MergeChannelsWithReflect(chns ...<-chan int) <-chan int {
outChn := make(chan int)
go func() {
defer close(outChn)
var cases []reflect.SelectCase
for _, chn := range chns {
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(chn),
})
}
for len(cases) > 0 {
i, v, ok := reflect.Select(cases)
if !ok {
cases = append(cases[:i], cases[i+1:]...)
continue
}
outChn <- v.Interface().(int)
}
}()
return outChn
}
递归合并方式
func mergeRec(chans ...<-chan int) <-chan int {
switch len(chans) {
case 0:
c := make(chan int)
close(c)
return c
case 1:
return chans[0]
case 2:
return mergeTwo(chans[0], chans[1])
default:
m := len(chans) / 2
return mergeTwo(
mergeRec(chans[:m]...),
mergeRec(chans[m:]...))
}
}
func mergeTwo(a, b <-chan int) <-chan int {
c := make(chan int)
go func() {
defer close(c)
for a != nil || b != nil {
select {
case v, ok := <-a:
if !ok {
a = nil
continue
}
c <- v
case v, ok := <-b:
if !ok {
b = nil
continue
}
c <- v
}
}
}()
return c
}