Go带缓冲的延时发送功能

为了提升发送时的吞吐量,本文采用类似Nagle算法的发送方式:

  1. 数据先存入发送缓冲区
  2. 若缓冲区已满,则立即发送
  3. 若缓冲区未满,则等待指定时间发送
package main

import (
    "fmt"
    "sync"
    "time"
)

type Message struct {
    ch      chan int
    buf     []int
    size    int
    timeout <-chan time.Time
    once    sync.Once
}

func newMessage(size int) *Message {
    return &Message{
        ch:      make(chan int, size),
        buf:     make([]int, 0, size),
        size:    size,
        timeout: time.After(1 * time.Second),
    }
}

/**
 * 此方法仅执行一次,所以不需要加锁
 */
func (m *Message) Send() {
    m.once.Do(func() {
        fmt.Println("once")
        for {
            select {
            case d := <-m.ch:
                m.buf = append(m.buf, d)
                if len(m.buf) >= m.size {
                    fmt.Println("full buffer, send data...", m.buf)
                    m.buf = m.buf[:0]
                }
            case <-m.timeout:
                if len(m.buf) > 0 {
                    fmt.Println("timeout and send data...", m.buf)
                    m.buf = m.buf[:0]
                }

                m.timeout = time.After(1 * time.Second)

            }
        }
    })

}

func (m *Message) Receive(val int) {
    fmt.Println("receive data:", val)
    m.ch <- val
    time.Sleep(time.Second)

}

func main() {
    msg := newMessage(100)
    go msg.Send()
    for i := 0; i < 100; i++ {
        go msg.Receive(i)
    }

    time.Sleep(5 * time.Minute)
}


最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容