Eino中Pipe实现

Pipe 的reader 和 writer 封装的为同一个Steam, Steam基于channel 进行数据的recv和send, 并通过一个独立的channel 来控制closed状态, 代码实现如下:

package schema

import "io"

func Pipe[T any](cap int) (*StreamReader[T], *StreamWriter[T]) {
    stm := newStream[T](cap)
    return stm.asReader(), &StreamWriter[T]{stm: stm}
}

type StreamReader[T any] struct {
    st *stream[T]
}

type StreamWriter[T any] struct {
    stm *stream[T]
}

// stream is a channel-based stream with 1 sender and 1 receiver.
// The sender calls closeSend() to notify the receiver that the stream sender has finished.
// The receiver calls closeRecv() to notify the sender that the receiver stop receiving.
type stream[T any] struct {
    items chan streamItem[T]

    closed chan struct{}
}

func (s *stream[T]) asReader() *StreamReader[T] {
    return &StreamReader[T]{st: s}
}

func (s *stream[T]) recv() (chunk T, err error) {
    item, ok := <-s.items

    if !ok {
        item.err = io.EOF
    }

    return item.chunk, item.err
}

func (s *stream[T]) send(chunk T, err error) (closed bool) {
    // if the stream is closed, return immediately
    select {
    case <-s.closed:
        return true
    default:
    }

    item := streamItem[T]{chunk, err}

    select {
    case <-s.closed:
        return true
    case s.items <- item:
        return false
    }
}

func (s *stream[T]) closeSend() {
    close(s.items)
}

func (s *stream[T]) closeRecv() {
    close(s.closed)
}

type streamItem[T any] struct {
    chunk T
    err   error
}

func newStream[T any](cap int) *stream[T] {
    return &stream[T]{
        items:  make(chan streamItem[T], cap),
        closed: make(chan struct{}),
    }
}

注意stream 的Send方法实现, 首先判断stream是否closed, 不同于使用if判断, 这里直接使用select来读取closed通道中数据, 如果有数据方法返回closed状态, 否则default分支什么都不执行, 程序继续执行下面send操作,

select {
    case <-s.closed:
        return true
    default:
    }

send发送操作同样在select 中进行, 第一个case分支读取closed通道数据在非closed状态下会阻塞,select 会选择第二个case分支的数据发送操作。

select {
    case <-s.closed:
        return true
    case s.items <- item:
        return false
    }

测试用例:

func TestPipe(t *testing.T) {
    sr, sw := Pipe[int](10)
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        for i := 0; i < 10; i++ {
            closed := sr.st.send(i, nil)
            if closed {
                break
            }
        }
        sr.st.closeSend()
    }()

    i := 0
    for {
        i++
        if i > 5 {
            sw.stm.closeRecv()
            break
        }
        v, err := sw.stm.recv()
        if err != nil {
            assert.ErrorIs(t, err, io.EOF)
            break
        }
        t.Log(v)
    }

    wg.Wait()

}
image.png
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容