btcd的p2p网络(2)-连接ConnMgr

p2p网络从底层到上层可以分为3层,地址 连接 节点,每一层都有自己的功能
声明:文章代码和源码有不一致地方
这篇文章简单记录下连接conn

三个主要的结构体

1、连接管理

// ConnManager providers a manager to handle network connections.
type ConnManager struct {
    // the following variables must only be used atomically
    // 记录自己主动连接其他节点的连接数量
    connReqCount uint64
    // 标识connmgr已经启动
    start int32
    // 标识connmgr已经结束
    stop int32

    // 设定相关的配置
    cfg Config
    // 用于同步connmgr的退出状态,调用方可以阻塞等待connmgr的工作协程退出
    wg sync.WaitGroup
    // 某个连接失败后,connmgr尝试选择新的peer地址连接的总次数
    failedAttempts uint64
    // 用于与connmgr工作协程通信的管道
    requests chan interface{}
    // 用于通知工作协程退出
    quit chan struct{}
}

2、Config,配置参数
其实就是connmgr配置,本身就是connmgr的一个字段。

// Config holds the configuration options related to the connection manager.
type Config struct {
    // Listeners define a slice of listeners for which the connection manager
    // will take ownership of(取得所有权) and accept connections. when a connection
    // is accepted,the OnAccept handler will be invoked with the connection. since
    // the connection manager takes ownership of these listeners,they will be closed
    // when the connection manager is stoped.

    // this field will not have any effect if the onAccept field is not also specified.
    // It may be nil if the caller does not wish to listen for
    // incoming connection

    Listeners []net.Listener //节点上所有等待外部连接的监听点;
    // OnAccept is a callback that is fired when an inbound connection is
    // accepted.  It is the caller's responsibility(责任、义务) to close the connection.
    // Failure to close the connection will result in the connection manager
    // believing the connection is still active and thus have undesirable
    // side effects such as still counting toward maximum connection limits.
    //
    // This field will not have any effect if the Listeners field is not
    // also specified since there couldn't possibly be any accepted
    // connections in that case.
    OnAccept func(net.Conn) // 节点应答并接受外部连接后的回调函数
    // TargetOutbound is the number of outbound network connections to
    // maintain. Defaults to 8.
    TargetOutbound uint32 // 节点主动向外连接peer的最大个数
    // RetryDuration is the duration to wait before retrying connection
    // requests. Defaults to 5s.
    RetryDuration time.Duration // 连接失败后发起重连的等待时间
    // OnConnection is a callback that is fired when a new outbound
    // connection is established.
    OnConnection func(*ConnReq, net.Conn) // 连接建立成功后的回调函数
    // OnDisconnection is a callback that is fired when an outbound
    // connection is disconnected.
    OnDisconnection func(*ConnReq) // 连接关闭后的回调函数
    // GetNewAddress is a way to get an address to make a network connection
    // to.  If nil, no new connections will be made automatically.
    // 连接失败后,ConnMgr可能会选择新的peer地址进行连接
    // GetNewAddress函数提供了获取新的peer地址的方法,它最终会调用addrManager中
    // 的GetAddress()来分配新地址。
    GetNewAddress func() (net.Addr, error)
    // Dial connects to the address on the named network.It cannot be nil.
    // 定义建立TCP连接的方式,是直接连接还是通过代理连接。
    Dial func(net.Addr) (net.Conn, error)
}

3、ConnReq 描述了一个连接

// ConnReq is the connection request to a network address. If permanent, the
// connection will be retried on disconnection.
// ConnReq 描述了一个连接
type ConnReq struct {
    // The following variables must only be used atomically.
    // 连接的序号,用于索引
    id uint64
    // 连接的目的地址
    Addr      net.Addr
    // 标识是否与Peer保持永久连接,如果为true,
    // 则连接失败后,继续尝试与该Peer连接,而不是选择新的Peer地址重新连接
    Permanent bool
    // 连接成功后,真实的net.Conn对象;
    conn       net.Conn
    // 连接的状态,有ConnPending、ConnEstablished、ConnDisconnected及ConnFailed等;
    state      ConnState
    // stateMtx: 保护state状态的读写锁;
    stateMtx   sync.RWMutex
    //retryCount: 如果Permanent为true,retryCount记录该连接重复重连的次数;
    retryCount uint32
}

结合起来说,就是连接管理器connmgr按照自身的配置config,管理着一些连接connReq

启动ConnMgr

我们先看start()函数

// Start: launches(发起、发动)the connection manager and begins conecting to the network.
func (cm *ConnManager) Start() {
    // already started ?
    if atomic.AddInt32(&cm.start, 1) != 1 {
        return
    }
    log.Trace("Connection manager started")
    cm.wg.Add(1)
    // 启动工作协程
    go cm.connHandler()
    // Start all the listeners so long as the caller requested
    // them and provided a callback to be invoked when connections are accepted.
    if cm.cfg.OnAccept != nil {
        for _, listenr := range cm.cfg.Listeners {
            cm.wg.Add(1)
            // 启动监听协程listenHandler,等待其他节点连接;
            go cm.listenHandler(listenr)
        }
    }
    // 启动建立连接的协程,选择Peer地址并主动连接;
    for i := atomic.LoadUint64(&cm.connReqCount); i < uint64(cm.cfg.TargetOutbound); i++ {
        go cm.NewConnReq()
    }
}

主要是启动工作协程cm.connHandler(),
然后一方面监听其他节点的连接,go cm.listenHandler(listenr)这里面做的事情就是我们普通的tcp地址监听。
一方面主动去连接其他的节点。
主动连接其他节点cm.NewConnReq()

// NewConnReq creates a new connection request and connects to the
// corresponding(对应的) address.
// 创建一个连接请求,然后连接对应的地址
func (cm *ConnManager) NewConnReq() {
    if atomic.LoadInt32(&cm.stop) != 0 {
        return
    }
    if cm.cfg.GetNewAddress == nil {
        return
    }
    c := &ConnReq{}
    atomic.StoreUint64(&c.id, atomic.AddUint64(&cm.connReqCount, 1))
    // Submit a request of a pending connection attempt to the connection
    // manager. By registering the id before the connection is even established,
    // we'll be able to later cancel the connection via the Remove method.
    done := make(chan struct{})
    select {
    case cm.requests <- registerPending{c, done}:
    case <-cm.quit:
        return
    }

    // wait for the registration to successfully add the pending conn req
    // to the conn manager's internal state.
    select {
    case <-done:
    case <-cm.quit:
        return
    }
    addr,err := cm.cfg.GetNewAddress()
    if err != nil {
        select {
        case cm.requests <- handleFailed{c, err}:
        case <-cm.quit:
        }
        return
    }
    c.Addr = addr
    cm.Connect(c)
}

首先构造一个ConnReq,c := &ConnReq{},然后生成registerPending{c, done},
registerPending写入到connmgr的通道case cm.requests <- registerPending{c, done}

这里的registerPending结构体中还有一个通道done,cm.requests这个通道肯定有人会从里面读数据,处理完后会通过通道done返回信息。下面的case <-done:就是在等待返回的信息。
谁在通道的另外一头读呢?go cm.connHandler(),下面这个图就是他们工作概况

然后cm.cfg.GetNewAddress()得到一个连接的地址(这里用到了addrMgr),然后连接连接cm.Connect(c)

// Connection assigns an id and dials a connection to the address of the connection request
func (cm *ConnManager) Connect(c *ConnReq){
    if atomic.LoadInt32(&cm.stop) != 0{
        return
    }
    // TODO 再次检查一遍,相当于重复了NewConnReq()的工作
    log.Debugf("Attempting to connect to %v", c)
    conn,err := cm.cfg.Dial(c.Addr)
    if err != nil {
        select {
        case cm.requests <- handleFailed{c, err}:
        case <-cm.quit:
        }
        return
    }

    select {
    case cm.requests <- handleConnected{c, conn}:
    case <-cm.quit:
    }
}

连接主要就是这句代码conn,err := cm.cfg.Dial(c.Addr),这个Dial就是在普通的tcp连接外包了一层,让我们有个选择,比如可以通过代理进行连接。

不论是连接成功还是失败,handleConnected{c, conn}:handleFailed{c, err}:这两个结构体都被构建,并且发送到cm.requests

有连接就有断开

func (cm *ConnManager) Disconnect(id uint64) {
    if atomic.LoadInt32(&cm.stop) != 0 {
        return
    }

    select {
    case cm.requests <- handleDisconnected{id, true}:
    case <-cm.quit:
    }
}

connect也差不多,都是向cm.requests发了一个请求。

看来,连接或者断开连接的主要处理逻辑在connHandler中,我们来看看它的实现:

// connHandler handles all connection related requests.  It must be run as a
// goroutine.
//
// The connection handler makes sure that we maintain a pool of active outbound
// connections so that we remain connected to the network.  Connection requests
// are processed and mapped by their assigned ids.
func (cm *ConnManager) connHandler() {
    // pending holds all registered conn requests that hava yet to succeed.
    var pending = make(map[uint64]*ConnReq)
    // conns represents the set of all actively connected peers.
    var conns = make(map[uint64]*ConnReq, cm.cfg.TargetOutbound) // make map时,size可以省略,当你知道大小时,最好加上

out:
    for {
        select {
        case req := <- cm.requests:
            switch msg:=req.(type) {
            case registerPending:
                // TODO
            case handleConnected:
                connReq := msg.c

                if _, ok := pending[connReq.id]; !ok {
                    if msg.conn != nil {
                        msg.conn.Close()
                    }
                    log.Debugf("Ignoring connection for "+
                        "canceled connreq=%v", connReq)
                    continue
                }

                connReq.updateState(ConnEstablished)
                connReq.conn = msg.conn
                conns[connReq.id] = connReq
                log.Debugf("Connected to %v", connReq)
                connReq.retryCount = 0
                cm.failedAttempts = 0

                delete(pending, connReq.id)

                if cm.cfg.OnConnection != nil {
                    go cm.cfg.OnConnection(connReq, msg.conn)
                }
            case handleDisconnected:
                // TODO
            case handleFailed:
                // TODO
            }

        case <-cm.quit:
            break out
        }
    }
    cm.wg.Done()
    log.Trace("Connection handler done")
}

在这里不停的处理cm.requests通道中的信息。我们看下连接成功的处理
起初创建了两个变量

// pending holds all registered conn requests that hava yet to succeed.
    var pending = make(map[uint64]*ConnReq)
    // conns represents the set of all actively connected peers.
    var conns = make(map[uint64]*ConnReq, cm.cfg.TargetOutbound) // make map时,size可以省略,当你知道大小时,最好加上

连接成功后

  1. map``pending中找有没有这个连接请求,如果没有则表明这不要我们要的连接。断开
  2. 更新connReq的状态,然后添加到map conns
  3. 调用go cm.cfg.OnConnection(connReq, msg.conn)

两个peer之间的连接conn,还需要考虑其他的很多方面。但是还好,到现在我们至少可以简单的创建一个连接了。
至于cm.cfg.OnConnection()要干什么,我们后面再分析了。


参考
https://www.jianshu.com/p/d6484e5710ad

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,125评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,293评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,054评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,077评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,096评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,062评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,988评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,817评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,266评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,486评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,646评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,375评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,974评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,621评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,642评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,538评论 2 352

推荐阅读更多精彩内容