为了提升发送时的吞吐量,本文采用类似Nagle算法的发送方式:
- 数据先存入发送缓冲区
- 若缓冲区已满,则立即发送
- 若缓冲区未满,则等待指定时间发送
package main
import (
"fmt"
"sync"
"time"
)
type Message struct {
ch chan int
buf []int
size int
timeout <-chan time.Time
once sync.Once
}
func newMessage(size int) *Message {
return &Message{
ch: make(chan int, size),
buf: make([]int, 0, size),
size: size,
timeout: time.After(1 * time.Second),
}
}
/**
* 此方法仅执行一次,所以不需要加锁
*/
func (m *Message) Send() {
m.once.Do(func() {
fmt.Println("once")
for {
select {
case d := <-m.ch:
m.buf = append(m.buf, d)
if len(m.buf) >= m.size {
fmt.Println("full buffer, send data...", m.buf)
m.buf = m.buf[:0]
}
case <-m.timeout:
if len(m.buf) > 0 {
fmt.Println("timeout and send data...", m.buf)
m.buf = m.buf[:0]
}
m.timeout = time.After(1 * time.Second)
}
}
})
}
func (m *Message) Receive(val int) {
fmt.Println("receive data:", val)
m.ch <- val
time.Sleep(time.Second)
}
func main() {
msg := newMessage(100)
go msg.Send()
for i := 0; i < 100; i++ {
go msg.Receive(i)
}
time.Sleep(5 * time.Minute)
}