作用
从tranport.go的注释和之前的博文中可以看出transport的功能是提供异步通信。
// Package is an interface for synchronous communication
package transport
接口
type Socket interface {
Recv(*Message) error
Send(*Message) error
Close() error
}
type Client interface {
Socket
}
type Listener interface {
Addr() string
Close() error
Accept(func(Socket)) error
}
// Transport is an interface which is used for communication between
// services. It uses socket send/recv semantics and had various
// implementations {HTTP, RabbitMQ, NATS, ...}
type Transport interface {
Dial(addr string, opts ...DialOption) (Client, error)
Listen(addr string, opts ...ListenOption) (Listener, error)
String() string
}
从之上的代码可以看出:
-
当Transport作为客户端时:
提供Dial主动连接功能,连接成功返回代表一个连接的Client,Client内嵌Socket,提供Send/Recv的网络收发功能。
-
当Transport作为服务端时:
提供Listen监听功能,当Listener.Accept成功返回时,返回一个代表连接的Socket,提供Send/Recv的网络收发功能。
Send和Recv的数据是Message结构
type Message struct {
Header map[string]string
Body []byte
}
由上可以看出Client是客户端语义,Socket是服务端语义。
transport在Go-micro内部的实现是httpTransport
Dial
从代码中可以看出本身Dial的逻辑很简单调用
conn, err = tls.DialWithDialer
或
conn, err = net.DialTimeout
最终返回httpTransportClient对象
type httpTransportClient struct {
ht *httpTransport
addr string
conn net.Conn
dialOpts DialOptions
once sync.Once
sync.Mutex
r chan *http.Request
bl []*http.Request
buff *bufio.Reader
}
httpTransportClient 实现Client 接口提供Send、Recv网络首发功能。
通过httpTransportClient 的Send代码可以看出,通过tpc的链接发送http请求。
func (h *httpTransportClient) Send(m *Message) error {
header := make(http.Header)
for k, v := range m.Header {
header.Set(k, v)
}
reqB := bytes.NewBuffer(m.Body)
defer reqB.Reset()
buf := &buffer{
reqB,
}
req := &http.Request{
Method: "POST",
URL: &url.URL{
Scheme: "http",
Host: h.addr,
},
Header: header,
Body: buf,
ContentLength: int64(reqB.Len()),
Host: h.addr,
}
///>>>>>>
h.Lock()
h.bl = append(h.bl, req)
select {
case h.r <- h.bl[0]:
h.bl = h.bl[1:]
default:
}
h.Unlock()
///<<<<<<<<
// set timeout if its greater than 0
if h.ht.opts.Timeout > time.Duration(0) {
h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout)) ///设置超时
}
return req.Write(h.conn)
}
其他的代码都好理解,唯独///>>>>>> ///<<<<<<<<中间的部分,为什么一直往h.bl中append,仅仅h.r可写的情况下remove.
猜想这里是不是应该这么写
if !h.dialOpts.Stream {
h.Lock()
h.bl = append(h.bl, req)
select {
case h.r <- h.bl[0]:
h.bl = h.bl[1:]
default:
}
h.Unlock()
}
func (h *httpTransportClient) Recv(m *Message) error {
if m == nil {
return errors.New("message passed in is nil")
}
var r *http.Request
if !h.dialOpts.Stream {
rc, ok := <-h.r
if !ok {
return io.EOF
}
r = rc
}
h.Lock()
defer h.Unlock()
if h.buff == nil {
return io.EOF
}
// set timeout if its greater than 0
if h.ht.opts.Timeout > time.Duration(0) {
h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout))
}
rsp, err := http.ReadResponse(h.buff, r)
if err != nil {
return err
}
defer rsp.Body.Close()
b, err := ioutil.ReadAll(rsp.Body)
if err != nil {
return err
}
if rsp.StatusCode != 200 {
return errors.New(rsp.Status + ": " + string(b))
}
m.Body = b
if m.Header == nil {
m.Header = make(map[string]string)
}
for k, v := range rsp.Header {
if len(v) > 0 {
m.Header[k] = v[0]
} else {
m.Header[k] = ""
}
}
return nil
}
从if !h.dialOpts.Stream //这里也可以看出Stream的Send和对应的Recv直接不能插入其他的其他的Stream.Send。
Listener
根据传进来的addr,然后net/tls.Listen创建的Listener对象。
Accept
上面返回的Listener对象上进行Accept操作,如果有新的连接成功的话,创建Socket对应,用Socket调用注入的接口 fn.
func (h *httpTransportListener) Accept(fn func(Socket)) error