Pipe 返回的 reader 和 writer 封装的是同一个 stream。stream 基于 channel 进行数据的 recv 和 send, 并通过一个独立的 channel 来标记 closed 状态, 代码实现如下:
package schema
import "io"
// Pipe creates one stream and returns a reader and a writer that both wrap it.
//
// cap is forwarded to newStream as the capacity of the stream; the reader and
// the writer hold the very same *stream value, so whatever is sent through one
// side is what the other side receives.
func Pipe[T any](cap int) (*StreamReader[T], *StreamWriter[T]) {
	shared := newStream[T](cap)

	reader := shared.asReader()
	writer := &StreamWriter[T]{stm: shared}

	return reader, writer
}
// StreamReader is the reader handle returned by Pipe.
//
// It only holds a pointer to the underlying stream; it does not own a copy.
type StreamReader[T any] struct {
	// st is the stream this reader wraps.
	st *stream[T]
}
// StreamWriter is the writer handle returned by Pipe.
//
// It only holds a pointer to the underlying stream; it does not own a copy.
type StreamWriter[T any] struct {
	// stm is the stream this writer wraps.
	stm *stream[T]
}
// stream is a channel-backed stream meant for exactly one sender and one
// receiver.
//
// The sender announces that it has finished by calling closeSend, which closes
// items. The receiver announces that it has stopped receiving by calling
// closeRecv, which closes closed.
type stream[T any] struct {
	// items carries each chunk, paired with an optional error, from the sender
	// to the receiver.
	items chan streamItem[T]

	// closed is a pure signal channel: nothing is ever sent on it, it is only
	// closed so that a sender can notice the receiver is gone.
	closed chan struct{}
}
// asReader wraps s in a new StreamReader. The reader refers to s itself, so it
// observes everything sent on this stream.
func (s *stream[T]) asReader() *StreamReader[T] {
	reader := new(StreamReader[T])
	reader.st = s
	return reader
}
// recv takes the next item off the stream, blocking until one is available or
// items has been closed.
//
// While items is open (or still holds buffered entries) it returns the chunk
// and error exactly as they were sent. Once items is closed and drained it
// returns the zero value of T together with io.EOF.
func (s *stream[T]) recv() (chunk T, err error) {
	next, open := <-s.items
	if open {
		return next.chunk, next.err
	}
	// A receive on a closed, empty channel yields the zero item, so next.chunk
	// is the zero value of T here.
	return next.chunk, io.EOF
}
// send offers one (chunk, err) pair to the receiver.
//
// It returns true, without enqueuing anything, when the closed channel has
// been closed. Otherwise it blocks until either the pair has been placed on
// items (returns false) or closed gets closed in the meantime (returns true).
func (s *stream[T]) send(chunk T, err error) (closed bool) {
	// Non-blocking probe first. If closed is already closed we must not
	// enqueue, even when items still has room; a single select over both
	// channels could pick the send case because ready cases are chosen at
	// random.
	select {
	case <-s.closed:
		return true
	default:
	}

	// Blocking hand-off: wait for room in items, but give up as soon as closed
	// is closed so the sender is never stuck on a full buffer.
	select {
	case s.items <- streamItem[T]{chunk: chunk, err: err}:
		return false
	case <-s.closed:
		return true
	}
}
// closeSend closes the items channel, which is how the sender tells the
// receiver that nothing more will be sent. Items already buffered remain
// receivable. It does not touch the closed channel.
//
// As with any channel close, calling it a second time panics.
func (s *stream[T]) closeSend() {
	close(s.items)
}
// closeRecv closes the closed channel, which is how the receiver tells the
// sender that it has stopped receiving. It does not touch the items channel.
//
// As with any channel close, calling it a second time panics.
func (s *stream[T]) closeRecv() {
	close(s.closed)
}
// streamItem is the unit that travels through a stream: one chunk together
// with the error that was sent alongside it.
//
// The field order matters to code that builds it with a positional literal.
type streamItem[T any] struct {
	// chunk is the payload value.
	chunk T

	// err is the error sent with the chunk; nil when none was supplied.
	err error
}
// newStream allocates a stream with both channels open.
//
// cap is the buffer size of the items channel (0 gives an unbuffered channel).
// The closed signal channel is always unbuffered.
func newStream[T any](cap int) *stream[T] {
	itemCh := make(chan streamItem[T], cap)
	closedCh := make(chan struct{})

	return &stream[T]{
		items:  itemCh,
		closed: closedCh,
	}
}
注意 stream 的 send 方法实现: 首先判断 stream 是否已经 closed。这里没有使用 if 判断, 而是用带 default 分支的 select 对 closed 通道做非阻塞读取。closed 通道从不写入数据, 只会被 closeRecv 关闭, 而对已关闭通道的读取会立即返回, 此时方法返回 true 表示已关闭; 如果通道尚未关闭, 则走 default 分支, 什么都不做, 程序继续执行下面的 send 操作:
select {
case <-s.closed:
return true
default:
}
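发送操作同样在 select 中进行: 在未 closed 的状态下, 第一个 case 分支对 closed 通道的读取会阻塞, select 会选择第二个 case 分支执行数据发送; 如果 items 的缓冲区已满, 两个分支都会阻塞, 直到接收方取走数据 (发送成功, 返回 false) 或者 closed 通道被 closeRecv 关闭 (返回 true) 为止。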
select {
case <-s.closed:
return true
case s.items <- item:
return false
}
测试用例:
// TestPipe drives a Pipe end to end with one producer goroutine and the test
// goroutine as consumer.
//
// The producer sends through the writer handle and the consumer receives
// through the reader handle. Both handles wrap the same stream, so the roles
// could be swapped without changing the outcome, but using each handle for its
// own side keeps the test readable.
func TestPipe(t *testing.T) {
	sr, sw := Pipe[int](10)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Producer: push up to ten values, stopping early once send reports
		// that the receiver has closed its side.
		for i := 0; i < 10; i++ {
			if closed := sw.stm.send(i, nil); closed {
				break
			}
		}
		// Signal that no more values will be sent.
		sw.stm.closeSend()
	}()

	// Consumer: read five values, then stop receiving. If the stream ends
	// before that, the only acceptable error is io.EOF.
	for i := 1; ; i++ {
		if i > 5 {
			sr.st.closeRecv()
			break
		}
		v, err := sr.st.recv()
		if err != nil {
			assert.ErrorIs(t, err, io.EOF)
			break
		}
		t.Log(v)
	}

	wg.Wait()
}
image.png