go-libp2p-host Connect 源码分析

Connect 过程解析

  • go-libp2p-host 中定义了 Host 接口,它有几个实现都在 go-libp2p 包中,我们关注 basic 包中的 BasicHost 实现,因为 IPFS 用了这个实现

  • Connect 主要是 dial peer 并完成握手,再去交换 Identify 信息,Identify 信息后文有提到,文中反复提到的 ID 是指 Identify 协议的名称

  • 发起连接一端我们称为 from ,被连接一端成为 to ,则有如下过程在两端建立连接,这个时序图只为理解交互过程,对于阅读代码并无实际参考价值;


    from.connect(to)

1、2 是指 from 和 to 要先启动 libp2p 的 host 再做后续操作,在这一步已经执行了 Swarm.Listen 并启动了 handlencoming 线程来 accept 连接,并且为 ID 协议注册了 StreamHandler ,为通道上的连接注册了 ConnHandler;
3 from 端通过 Connect 来 dial to 端;
4、 to 端的 newConnHandler 被触发,这个方法调用了 IdentifyConn ;
5、于此同时 from 端发起连接请求成功后会去调用 IdentifyConn ;
6~14 双方行为一致,握手成功后激活 ID 协议的 StreamHandler 触发 requestHandler 来向对方发送 Identify 消息,两端用各自的 responseHandler 来处理 Identify 消息,并将 Identify 信息放入 peerstore 中。

看代码,简单看一下 Connect 过程

首先看看接口怎么定义的,因为所有的逻辑都要建立在 Connect 的基础上,所以以 Connect 为入口来欣赏 BasicHost 的实现过程

// Host is an object participating in a p2p network, which
// implements protocols or provides services. It handles
// requests like a Server, and issues requests like a Client.
// It is called Host because it is both Server and Client (and Peer
// may be confusing).
type Host interface {
    ......
    // Connect ensures there is a connection between this host and the peer with
    // given peer.ID. Connect will absorb the addresses in pi into its internal
    // peerstore. If there is not an active connection, Connect will issue a
    // h.Network.Dial, and block until a connection is open, or an error is
    // returned. // TODO: Relay + NAT.
    Connect(ctx context.Context, pi pstore.PeerInfo) error
    ......
}

Host 是什么以及 Connect 要做的事情通过注视都能看出来,只是看到 TODO 时感到有些遗憾,这个 Relay 足足耽误我几天时间,看来读代码应该先读接口.

From 端 Connect 的实现过程

注视跟接口描述的差不多,如果没有可用连接就会去尝试 dial 这个 peer 并且把它加入到 peerstore 中

//https://github.com/libp2p/go-libp2p/blob/master/p2p/host/basic/basic_host.go
// Connect ensures there is a connection between this host and the peer with
// given peer.ID. If there is not an active connection, Connect will issue a
// h.Network.Dial, and block until a connection is open, or an error is returned.
// Connect will absorb the addresses in pi into its internal peerstore.
// It will also resolve any /dns4, /dns6, and /dnsaddr addresses.
func (h *BasicHost) Connect(ctx context.Context, pi pstore.PeerInfo) error {
    // absorb addresses into peerstore
    h.Peerstore().AddAddrs(pi.ID, pi.Addrs, pstore.TempAddrTTL)

    if h.Network().Connectedness(pi.ID) == inet.Connected {
        return nil
    }

    resolved, err := h.resolveAddrs(ctx, h.Peerstore().PeerInfo(pi.ID))
    if err != nil {
        return err
    }
    h.Peerstore().AddAddrs(pi.ID, resolved, pstore.TempAddrTTL)

    return h.dialPeer(ctx, pi.ID)
}

dialPeer 虽然很复杂但最终是调用到 IdentifyConn 方法上,我们直接看重点

//===========================================
// go-libp2p/p2p/protocol/identify/id.go
//===========================================

func (ids *IDService) IdentifyConn(c inet.Conn) {
    ids.currmu.Lock()
    if wait, found := ids.currid[c]; found {
        ids.currmu.Unlock()
        log.Debugf("IdentifyConn called twice on: %s", c)
        <-wait // already identifying it. wait for it.
        return
    }
    ch := make(chan struct{})
    ids.currid[c] = ch
    ids.currmu.Unlock()

    defer func() {
        close(ch)
        ids.currmu.Lock()
        delete(ids.currid, c)
        ids.currmu.Unlock()
    }()

    s, err := c.NewStream()
    if err != nil {
        log.Debugf("error opening initial stream for %s: %s", ID, err)
        log.Event(context.TODO(), "IdentifyOpenFailed", c.RemotePeer())
        c.Close()
        return
    }
    
    // 指定了当前这个 Stream 是一个 ID 协议的流,握手成功后就会收到 ID 消息
    // 指定了当前这个 Stream 是一个 ID 协议的流,握手成功后就会收到 ID 消息
    // 指定了当前这个 Stream 是一个 ID 协议的流,握手成功后就会收到 ID 消息
    s.SetProtocol(ID)
    
    // 在此处完成握手
    // 在此处完成握手
    // 在此处完成握手
    // ok give the response to our handler.
    if err := msmux.SelectProtoOrFail(ID, s); err != nil {
        log.Event(context.TODO(), "IdentifyOpenFailed", c.RemotePeer(), logging.Metadata{"error": err})
        s.Reset()
        return
    }
    // 在此处接收 ID 消息
    ids.responseHandler(s)
}



//===========================================
// go-multistream/client.go
//===========================================

// proto = "/ipfs/id/1.0.0"
// rwc = stream
func SelectProtoOrFail(proto string, rwc io.ReadWriteCloser) error {
    err := handshake(rwc)
    if err != nil {
        return err
    }

    return trySelect(proto, rwc)
}

// 握手
// ProtocolID = "/multistream/1.0.0" 
func handshake(rwc io.ReadWriteCloser) error {
    errCh := make(chan error, 1)
    go func() {
        errCh <- delimWriteBuffered(rwc, []byte(ProtocolID))
    }()

    tok, readErr := ReadNextToken(rwc)
    writeErr := <-errCh

    if writeErr != nil {
        return writeErr
    }
    if readErr != nil {
        return readErr
    }

    if tok != ProtocolID {
        return errors.New("received mismatch in protocol id")
    }
    return nil
}

// 握手成功后,去激活对端的 proto 对应的 stream 的 handler 
// 在这里是为了激活 ID 协议对应的 requestHandler 
// proto = "/ipfs/id/1.0.0"
func trySelect(proto string, rwc io.ReadWriteCloser) error {
    err := delimWriteBuffered(rwc, []byte(proto))
    if err != nil {
        return err
    }

    tok, err := ReadNextToken(rwc)
    if err != nil {
        return err
    }

    switch tok {
    case proto:
        return nil
    case "na":
        return ErrNotSupported
    default:
        return errors.New("unrecognized response: " + tok)
    }
}
To 端 Listen 的实现过程
  • BasicHost.NewHost
// NewHost constructs a new *BasicHost and activates it by attaching its stream and connection handlers to the given inet.Network.
func NewHost(ctx context.Context, net inet.Network, opts *HostOpts) (*BasicHost, error) {
    ......
    net.SetConnHandler(h.newConnHandler)    
    net.SetStreamHandler(h.newStreamHandler)
    return h, nil
}

func (h *BasicHost) newConnHandler(c inet.Conn) {
    // Clear protocols on connecting to new peer to avoid issues caused
    // by misremembering protocols between reconnects
    h.Peerstore().SetProtocols(c.RemotePeer())
    // 当有一个新的连接到来时,就会去执行 IdentifyConn ,跟发起端行为一致
    h.ids.IdentifyConn(c)
}
  • Swarm.Listen
    创建 BasicHost 时的 net 参数是 Network 接口的 Swarm 实现,那么启动过程中会调用 Listen 方法,下面代码贴出了关键部分,AddListenAddr 方法的 list.Accept() 对上文提到的握手信息进行了处理,然后要 upgrade 这个连接,再去触发 BasicHost 中指定的 net.SetConnHandler(h.newConnHandler)
func (s *Swarm) Listen(addrs ...ma.Multiaddr) error {
    ......
    for i, a := range addrs {
        if err := s.AddListenAddr(a); err != nil {
            errs[i] = err
        } else {
            succeeded++
        }
    }
    ......
}

func (s *Swarm) AddListenAddr(a ma.Multiaddr) error {
    tpt := s.TransportForListening(a)
    if tpt == nil {
        return ErrNoTransport
    }
    // 在这个方法中调用了一个非常重要的方法,handleIncoming 
    list, err := tpt.Listen(a)
    ......
    go func() {
        ......
        for {
            c, err := list.Accept()
            ......
        }
    }()
    return nil
}

Accept 是 Listener 接口的方法
接口定义在 go-libp2p-transport/transport.go 中
实现定义在 go-libp2p-transport-upgrader/listener.go 中,我们看看如何实现

// Accept accepts a connection.
func (l *listener) Accept() (transport.Conn, error) {
    for c := range l.incoming {
        // Could have been sitting there for a while.
        if !c.IsClosed() {
            return c, nil
        }
    }
    return nil, l.err
}

这个实现太简单了,只是在读 incoming channel ,所以线索是谁在往 incoming chainnel 中写数据,于是找到了 handleIncoming() 方法,以 TcpTransport 实现为例,可以看到是在 UpgradeListener 时启动的 handleIncoming

// go-tcp-transport/tcp.go
func (t *TcpTransport) Listen(laddr ma.Multiaddr) (tpt.Listener, error) {
    list, err := t.maListen(laddr)
    if err != nil {
        return nil, err
    }
    return t.Upgrader.UpgradeListener(t, list), nil
}

//go-libp2p-transport-upgrader/upgrader.go
func (u *Upgrader) UpgradeListener(t transport.Transport, list manet.Listener) transport.Listener {
    ctx, cancel := context.WithCancel(context.Background())
    l := &listener{
        Listener:  list,
        upgrader:  u,
        transport: t,
        threshold: newThreshold(AcceptQueueLength),
        incoming:  make(chan transport.Conn),
        cancel:    cancel,
        ctx:       ctx,
    }
    // 就是这里
    // 就是这里
    // 就是这里
    go l.handleIncoming()
    return l
}

以上列出了一些关键点,应该可以导读代码了。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,911评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,014评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 142,129评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,283评论 1 264
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,159评论 4 357
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,161评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,565评论 3 382
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,251评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,531评论 1 292
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,619评论 2 310
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,383评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,255评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,624评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,916评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,199评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,553评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,756评论 2 335

推荐阅读更多精彩内容

  • rljs by sennchi Timeline of History Part One The Cognitiv...
    sennchi阅读 7,258评论 0 10
  • 291976-陈国艳《2017-02-18》 【第7天总结】 A、目标完成情况 1、背诵复习2A完成50% 2、听...
    国艳更文的365天阅读 257评论 0 0
  • 十年,就这么一晃而过。 从2006到2016,大学生活已经远去,相识的朋友已经走过十年风雨。 前段时间一张大一的老...
    cindy幸福在路上阅读 191评论 0 0
  • 人物动态的表现形式 人物动态速写,表现形式很多。主要的几种方法是:单线画法、单线复线结合的方法、线面结合的画法.....
    xiyue手绘阅读 1,212评论 0 9
  • 這夜-徐子泓 夜 讓我絕望累極 孤身飲泪 話雖如此 這夜 月色太明 我在群山之中 静静的呼嘯 湖面一片恬謐 風穿透...
    塔沙阅读 159评论 0 0