前面讲了p2p网络的实现,现在开始涉及节点间应用数据(区块,交易等)的交互
1.p2p模块定义的用户协议接口
为了将p2p模块从具体的数据处理细节中解耦出来,实现p2p模块的独立性,p2p模块并没有对节点间的数据进行处理,而是定义了一个结构体,调用该结构体的Run方法来处理数据
接口的定义:
// p2p/protocol.go
// Protocol represents a P2P subprotocol implementation.
type Protocol struct {
// Name should contain the official protocol name,
// often a three-letter word.
Name string
// Version should contain the version number of the protocol.
Version uint
// Length should contain the number of message codes used
// by the protocol.
Length uint64
// Run is called in a new groutine when the protocol has been
// negotiated with a peer. It should read and write messages from
// rw. The Payload for each message must be fully consumed.
//
// The peer connection is closed when Start returns. It should return
// any protocol-level error (such as an I/O error) that is
// encountered.
Run func(peer *Peer, rw MsgReadWriter) error
// NodeInfo is an optional helper method to retrieve protocol specific metadata
// about the host node.
NodeInfo func() interface{}
// PeerInfo is an optional helper method to retrieve protocol specific metadata
// about a certain peer in the network. If an info retrieval function is set,
// but returns nil, it is assumed that the protocol handshake is still running.
PeerInfo func(id discover.NodeID) interface{}
}
- 使用p2p模块时,调用者需要注册一个已经实现Run方法的结构体到p2p模块
- 当节点间有数据过来时,p2p模块会调用Run方法进行数据处理
2.eth的注册流程
注册的调用过程是:
//cmd/geth/main.go
init()
=>
func geth(ctx *cli.Context) error ()
=>
//cmd/geth/config.go
func makeFullNode(ctx *cli.Context) *node.Node
=>
//cmd/utils/flags.go
func RegisterEthService(stack *node.Node, cfg *eth.Config)
=>
//cmd/node/node.go
func (n *Node) Register(constructor ServiceConstructor) error
具体的关键注册代码
//cmd/utils/flags.go
// RegisterEthService adds an Ethereum client to the stack.
func RegisterEthService(stack *node.Node, cfg *eth.Config) {
var err error
if cfg.SyncMode == downloader.LightSync {
err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
return les.New(ctx, cfg)
})
} else {
err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
//创建eth
fullNode, err := eth.New(ctx, cfg)
if fullNode != nil && cfg.LightServ > 0 {
ls, _ := les.NewLesServer(fullNode, cfg)
fullNode.AddLesServer(ls)
}
return fullNode, err
})
}
if err != nil {
Fatalf("Failed to register the Ethereum service: %v", err)
}
}
//eth/handler.go
manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
for i, version := range ProtocolVersions {
// Skip protocol version if incompatible with the mode of operation
if mode == downloader.FastSync && version < eth63 {
continue
}
// Compatible; initialise the sub-protocol
version := version // Closure for the run
manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
Name: ProtocolName,
Version: version,
Length: ProtocolLengths[i],
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
peer := manager.newPeer(int(version), p, rw)
select {
case manager.newPeerCh <- peer:
manager.wg.Add(1)
defer manager.wg.Done()
return manager.handle(peer)
case <-manager.quitSync:
return p2p.DiscQuitting
}
},
NodeInfo: func() interface{} {
return manager.NodeInfo()
},
PeerInfo: func(id discover.NodeID) interface{} {
if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
return p.Info()
}
return nil
},
})
}
3.数据处理的调用过程
被注册到p2p.Server后
在p2p.Server.Start()函数中有
...
dialer := newDialState(srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)
...
go srv.run(dialer)
...
在p2p.Server.run(dialstate dialer)中
...
// At this point the connection is past the protocol handshake.
// Its capabilities are known and the remote identity is verified.
err := srv.protoHandshakeChecks(peers, inboundCount, c)
//fmt.Println("pre runPeer", err)
if err == nil {
// The handshakes are done and it passed all checks.
p := newPeer(c, srv.Protocols)
// If message events are enabled, pass the peerFeed
// to the peer
if srv.EnableMsgEvents {
p.events = &srv.peerFeed
}
name := truncateName(c.name)
srv.log.Debug("Adding p2p peer", "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1)
go srv.runPeer(p)
peers[c.id] = p
if p.Inbound() {
inboundCount++
}
}
...
再从p2p.Server.runPeer
到
peer.run
到
peer.startProtocols,调用注册的run方法
如下:
//p2p/peer.go
func (p *Peer) startProtocols(writeStart <-chan struct{}, writeErr chan<- error) {
//fmt.Println("startProtocols:", p.running)
//debug.PrintStack()
p.wg.Add(len(p.running))
for _, proto := range p.running {
//fmt.Println("proto:", proto)
proto := proto
proto.closed = p.closed
proto.wstart = writeStart
proto.werr = writeErr
var rw MsgReadWriter = proto
if p.events != nil {
rw = newMsgEventer(rw, p.events, p.ID(), proto.Name)
}
p.log.Trace(fmt.Sprintf("Starting protocol %s/%d", proto.Name, proto.Version))
go func(){
##############
//这里调用注册的Run方法
##############
err := proto.Run(p, rw)
if err == nil {
p.log.Trace(fmt.Sprintf("Protocol %s/%d returned", proto.Name, proto.Version))
err = errProtocolReturned
} else if err != io.EOF {
p.log.Trace(fmt.Sprintf("Protocol %s/%d failed", proto.Name, proto.Version), "err", err)
}
p.protoErr <- err
p.wg.Done()
}()
}
}
4.tcp连接的建立
这里有两类tcp连接
- 主动连接,在p2p.Server.run中实现
相关代码片段是
// starts until max number of active tasks is satisfied
startTasks := func(ts []task) (rest []task) {
i := 0
for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ {
t := ts[i]
srv.log.Trace("New dial task", "task", t)
go func() { t.Do(srv); taskdone <- t }()
runningTasks = append(runningTasks, t)
}
return ts[i:]
}
=>
//p2p/dial.go
func (t *dialTask) Do(srv *Server) {
//fmt.Println("diatask do")
//debug.PrintStack()
if t.dest.Incomplete() {
if !t.resolve(srv) {
return
}
}
//fmt.Println(t.dest)
err := t.dial(srv, t.dest)
if err != nil {
log.Trace("Dial error", "task", t, "err", err)
// Try resolving the ID of static nodes if dialing failed.
if _, ok := err.(*dialError); ok && t.flags&staticDialedConn != 0 {
if t.resolve(srv) {
//fmt.Println(t.dest)
t.dial(srv, t.dest)
}
}
}
}
=>
// dial performs the actual connection attempt.
func (t *dialTask) dial(srv *Server, dest *discover.Node) error {
//fmt.Println("dialtask dia")
fd, err := srv.Dialer.Dial(dest)
if err != nil {
//fmt.Println("dialtask error")
//fmt.Println(err)
return &dialError{err}
}
//fmt.Println("dialtask dia success")
//fmt.Println(dest)
mfd := newMeteredConn(fd, false)
return srv.SetupConn(mfd, t.flags, dest)
}
- 另外一个监听tcp端口,接收对方连接
func (srv *Server) startListening() error {
// Launch the TCP listener.
listener, err := net.Listen("tcp", srv.ListenAddr)
if err != nil {
return err
}
laddr := listener.Addr().(*net.TCPAddr)
srv.ListenAddr = laddr.String()
srv.listener = listener
srv.loopWG.Add(1)
go srv.listenLoop()
// Map the TCP listening port if NAT is configured.
if !laddr.IP.IsLoopback() && srv.NAT != nil {
srv.loopWG.Add(1)
go func() {
nat.Map(srv.NAT, srv.quit, "tcp", laddr.Port, laddr.Port, "ethereum p2p")
srv.loopWG.Done()
}()
}
return nil
}
=>
// listenLoop runs in its own goroutine and accepts
// inbound connections.
func (srv *Server) listenLoop()