Simply put, Gossip's Discovery module maintains the liveness state of member peers: whether a peer is considered alive or dead is entirely its call. Let's break it down.
Initialization
func NewDiscoveryService(self NetworkMember, comm CommService, crypt CryptoService, disPol DisclosurePolicy) Discovery {
	d := &gossipDiscoveryImpl{
		self:                         self,
		incTime:                      uint64(time.Now().UnixNano()),
		seqNum:                       uint64(0),
		deadLastTS:                   make(map[string]*timestamp),
		aliveLastTS:                  make(map[string]*timestamp),
		id2Member:                    make(map[string]*NetworkMember),
		aliveMembership:              util.NewMembershipStore(),
		deadMembership:               util.NewMembershipStore(),
		crypt:                        crypt,
		comm:                         comm,
		lock:                         &sync.RWMutex{},
		toDieChan:                    make(chan struct{}, 1),
		toDieFlag:                    int32(0),
		logger:                       util.GetLogger(util.DiscoveryLogger, self.InternalEndpoint),
		disclosurePolicy:             disPol,
		pubsub:                       util.NewPubSub(),
		aliveTimeInterval:            getAliveTimeInterval(),
		aliveExpirationTimeout:       getAliveExpirationTimeout(),
		aliveExpirationCheckInterval: getAliveExpirationCheckInterval(),
		reconnectInterval:            getReconnectInterval(),
	}
	d.validateSelfConfig()
	d.msgStore = newAliveMsgStore(d)
	go d.periodicalSendAlive()
	go d.periodicalCheckAlive()
	go d.handleMessages()
	go d.periodicalReconnectToDead()
	go d.handlePresumedDeadPeers()
	return d
}
Once you understand what happens at startup, you roughly know how the whole module is implemented. Five background goroutines are launched (the timing parameters they rely on are sketched right after this list):
- periodicalSendAlive
- periodicalCheckAlive
- handleMessages
- periodicalReconnectToDead
- handlePresumedDeadPeers
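For orientation, the four timing parameters set in the constructor come from configuration with sane defaults. This is a paraphrased sketch of the get* helpers as I read them in the Fabric source; exact config keys and defaults may vary by version:

var defaultHelloInterval = time.Duration(5) * time.Second

// Defaults: send an alive message every 5s, consider a peer dead after
// 5 missed intervals (25s), sweep for expirations every 2.5s, and try
// to reconnect to dead peers every 25s.
func getAliveTimeInterval() time.Duration {
	return util.GetDurationOrDefault("peer.gossip.aliveTimeInterval", defaultHelloInterval)
}

func getAliveExpirationTimeout() time.Duration {
	return util.GetDurationOrDefault("peer.gossip.aliveExpirationTimeout", 5*getAliveTimeInterval())
}

func getAliveExpirationCheckInterval() time.Duration {
	return time.Duration(getAliveExpirationTimeout() / 10)
}

func getReconnectInterval() time.Duration {
	return util.GetDurationOrDefault("peer.gossip.reconnectInterval", getAliveExpirationTimeout())
}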
periodicalSendAlive
func (d *gossipDiscoveryImpl) periodicalSendAlive() {
	defer d.logger.Debug("Stopped")
	for !d.toDie() {
		d.logger.Debug("Sleeping", d.aliveTimeInterval)
		time.Sleep(d.aliveTimeInterval)
		msg, err := d.createSignedAliveMessage(true)
		if err != nil {
			d.logger.Warningf("Failed creating alive message: %+v", errors.WithStack(err))
			return
		}
		d.lock.Lock()
		d.selfAliveMessage = msg
		d.lock.Unlock()
		d.comm.Gossip(msg)
	}
}
- First impression: at every aliveTimeInterval the peer wraps its own Alive state into a signed message and gossips it to its neighbors.
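What does that alive message actually contain? createSignedAliveMessage builds (and then signs) roughly the following; this is a paraphrased sketch of the message-assembly step:

// Paraphrased sketch: assemble this peer's own alive message before signing.
func (d *gossipDiscoveryImpl) aliveMsgAndInternalEndpoint() (*proto.GossipMessage, string) {
	d.lock.Lock()
	defer d.lock.Unlock()
	d.seqNum++ // bump the per-process sequence number on every message
	msg := &proto.GossipMessage{
		Tag: proto.GossipMessage_EMPTY,
		Content: &proto.GossipMessage_AliveMsg{
			AliveMsg: &proto.AliveMessage{
				Membership: &proto.Member{
					Endpoint: d.self.Endpoint,
					Metadata: d.self.Metadata,
					PkiId:    d.self.PKIid,
				},
				// IncNum is fixed at process start; together with SeqNum it
				// totally orders all alive messages from this peer.
				Timestamp: &proto.PeerTime{
					IncNum: d.incTime,
					SeqNum: d.seqNum,
				},
			},
		},
	}
	return msg, d.self.InternalEndpoint
}

The (IncNum, SeqNum) pair is what CheckValid and the before/same helpers (sketched later) compare to decide which alive message is fresher.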
periodicalCheckAlive
func (d *gossipDiscoveryImpl) periodicalCheckAlive() {
	defer d.logger.Debug("Stopped")
	for !d.toDie() {
		time.Sleep(d.aliveExpirationCheckInterval)
		dead := d.getDeadMembers()
		if len(dead) > 0 {
			d.logger.Debugf("Got %v dead members: %v", len(dead), dead)
			d.expireDeadMembers(dead)
		}
	}
}
- Scan the aliveLastTS map and pick out every member whose last-seen timestamp is older than aliveExpirationTimeout (see the sketch after this list).
- Move those expired entries from aliveLastTS into deadLastTS.
- Move them from aliveMembership into deadMembership.
- Close the gRPC connections to the expired peers.
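The first step, getDeadMembers, is a straightforward timeout sweep; a lightly paraphrased sketch of the Fabric source:

func (d *gossipDiscoveryImpl) getDeadMembers() []common.PKIidType {
	d.lock.RLock()
	defer d.lock.RUnlock()
	dead := []common.PKIidType{}
	for id, last := range d.aliveLastTS {
		// How long since we last heard an alive message from this member?
		elapsed := time.Since(last.lastSeen)
		if elapsed > d.aliveExpirationTimeout {
			d.logger.Warning("Haven't heard from", []byte(id), "for", elapsed)
			dead = append(dead, common.PKIidType(id))
		}
	}
	return dead
}

expireDeadMembers then performs the remaining three steps: moving the entries between the maps and closing the connections.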
periodicalReconnectToDead
func (d *gossipDiscoveryImpl) periodicalReconnectToDead() {
	defer d.logger.Debug("Stopped")
	for !d.toDie() {
		wg := &sync.WaitGroup{}
		for _, member := range d.copyLastSeen(d.deadLastTS) {
			wg.Add(1)
			go func(member NetworkMember) {
				defer wg.Done()
				if d.comm.Ping(&member) {
					d.logger.Debug(member, "is responding, sending membership request")
					d.sendMembershipRequest(&member, true)
				} else {
					d.logger.Debug(member, "is still dead")
				}
			}(member)
		}
		wg.Wait()
		d.logger.Debug("Sleeping", d.reconnectInterval)
		time.Sleep(d.reconnectInterval)
	}
}
- At every reconnectInterval, iterate over the deadLastTS list; every member that still answers a Ping gets a GossipMessage_MemReq, i.e. a request to pull its member list. The request itself is built as sketched below.
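sendMembershipRequest amounts to building and signing such a request. A paraphrased sketch of createMembershipRequest, which embeds our own signed alive message so the receiver can authenticate us and learn about us at the same time:

func (d *gossipDiscoveryImpl) createMembershipRequest(includeInternalEndpoint bool) (*proto.GossipMessage, error) {
	// The request piggybacks our own alive message as SelfInformation.
	am, err := d.createSignedAliveMessage(includeInternalEndpoint)
	if err != nil {
		return nil, errors.WithStack(err)
	}
	req := &proto.MembershipRequest{
		SelfInformation: am.Envelope,
		Known:           [][]byte{}, // left empty by the discovery layer
	}
	return &proto.GossipMessage{
		Tag:   proto.GossipMessage_EMPTY,
		Nonce: uint64(0),
		Content: &proto.GossipMessage_MemReq{
			MemReq: req,
		},
	}, nil
}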
handlePresumedDeadPeers
func (d *gossipDiscoveryImpl) handlePresumedDeadPeers() {
	defer d.logger.Debug("Stopped")
	for !d.toDie() {
		select {
		case deadPeer := <-d.comm.PresumedDead():
			if d.isAlive(deadPeer) {
				d.expireDeadMembers([]common.PKIidType{deadPeer})
			}
		case s := <-d.toDieChan:
			d.toDieChan <- s
			return
		}
	}
}
- This handles fallout from the comm layer: when a send in the underlying comm module fails, the target peer is classified as presumed dead and pushed onto the PresumedDead channel, and this loop does the cleanup.
- If that peer is currently marked Alive, it is flipped from Alive to Dead via expireDeadMembers (the isAlive guard is sketched below).
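The isAlive guard is just a read-locked lookup in aliveLastTS; a paraphrased sketch:

func (d *gossipDiscoveryImpl) isAlive(pkiID common.PKIidType) bool {
	d.lock.RLock()
	defer d.lock.RUnlock()
	_, alive := d.aliveLastTS[string(pkiID)]
	return alive
}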
handleMessages
func (d *gossipDiscoveryImpl) handleMessages() {
	defer d.logger.Debug("Stopped")
	in := d.comm.Accept()
	for !d.toDie() {
		select {
		case s := <-d.toDieChan:
			d.toDieChan <- s
			return
		case m := <-in:
			d.handleMsgFromComm(m)
		}
	}
}
- This loop, naturally, processes incoming Discovery messages. Let's look at the three message types it handles and what each one mainly does: MembershipRequest, AliveMsg, and MembershipResponse.
MembershipRequest
if memReq := m.GetMemReq(); memReq != nil {
	selfInfoGossipMsg, err := memReq.SelfInformation.ToGossipMessage()
	if err != nil {
		d.logger.Warningf("Failed deserializing GossipMessage from envelope: %+v", errors.WithStack(err))
		return
	}
	if !d.crypt.ValidateAliveMsg(selfInfoGossipMsg) {
		return
	}
	if d.msgStore.CheckValid(selfInfoGossipMsg) {
		d.handleAliveMessage(selfInfoGossipMsg)
	}
	var internalEndpoint string
	if m.Envelope.SecretEnvelope != nil {
		internalEndpoint = m.Envelope.SecretEnvelope.InternalEndpoint()
	}
	// Sending a membership response to a peer may block this routine
	// in case the sending is deliberately slow (i.e attack).
	// will keep this async until I'll write a timeout detector in the comm layer
	go d.sendMemResponse(selfInfoGossipMsg.GetAliveMsg().Membership, internalEndpoint, m.Nonce)
	return
}
- ValidateAliveMsg does two things with the AliveMsg the sender embedded in the MembershipRequest: it stores the sender's identity in the idMapper, and it verifies the message signature against that identity. On first contact the identity is simply stored; every later message from the same peer is verified against it, a security measure that guards against man-in-the-middle attacks.
- CheckValid checks whether this AliveMsg is newer than what we hold locally; if so, handleAliveMessage kicks in (covered below).
- Finally, sendMemResponse replies with our own membership view.
handleAliveMessage
func (d *gossipDiscoveryImpl) handleAliveMessage(m *proto.SignedGossipMessage) {
	d.logger.Debug("Entering", m)
	defer d.logger.Debug("Exiting")
	if d.isSentByMe(m) {
		return
	}
	pkiID := m.GetAliveMsg().Membership.PkiId
	ts := m.GetAliveMsg().Timestamp
	d.lock.RLock()
	_, known := d.id2Member[string(pkiID)]
	d.lock.RUnlock()
	if !known {
		d.learnNewMembers([]*proto.SignedGossipMessage{m}, []*proto.SignedGossipMessage{})
		return
	}
	d.lock.RLock()
	_, isAlive := d.aliveLastTS[string(pkiID)]
	lastDeadTS, isDead := d.deadLastTS[string(pkiID)]
	d.lock.RUnlock()
	if !isAlive && !isDead {
		d.logger.Panicf("Member %s is known but not found neither in alive nor in dead lastTS maps, isAlive=%v, isDead=%v", m.GetAliveMsg().Membership.Endpoint, isAlive, isDead)
		return
	}
	if isAlive && isDead {
		d.logger.Panicf("Member %s is both alive and dead at the same time", m.GetAliveMsg().Membership)
		return
	}
	if isDead {
		if before(lastDeadTS, ts) {
			// resurrect peer
			d.resurrectMember(m, *ts)
		} else if !same(lastDeadTS, ts) {
			d.logger.Debug(m.GetAliveMsg().Membership, "lastDeadTS:", lastDeadTS, "but got ts:", ts)
		}
		return
	}
	d.lock.RLock()
	lastAliveTS, isAlive := d.aliveLastTS[string(pkiID)]
	d.lock.RUnlock()
	if isAlive {
		if before(lastAliveTS, ts) {
			d.learnExistingMembers([]*proto.SignedGossipMessage{m})
		} else if !same(lastAliveTS, ts) {
			d.logger.Debug(m.GetAliveMsg().Membership, "lastAliveTS:", lastAliveTS, "but got ts:", ts)
		}
	}
	// else, ignore the message because it is too old
}
- If the peer is missing from id2Member, this is a newcomer we don't know, so naturally learnNewMembers is called.
- Otherwise, check whether the peer sits in the alive list or the dead list. Being in both at once, or in neither, is an inconsistent state and triggers a panic.
- If it is in the dead list and the incoming alive message carries a newer timestamp, what does that mean? The peer has come back to life: update id2Member, remove it from the dead lists, and put it back into the alive lists (resurrectMember).
- If it is in the alive list and the timestamp is newer, there is fresher information about an existing member, so learnExistingMembers is called. The timestamp comparisons are sketched below.
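The before and same helpers compare a locally stored timestamp against the PeerTime carried in the message; a paraphrased sketch of the Fabric source:

// A local timestamp matches a message's PeerTime when both the incarnation
// number (process start time) and the sequence number are equal.
func same(a *timestamp, b *proto.PeerTime) bool {
	return uint64(a.incTime.UnixNano()) == b.IncNum && a.seqNum == b.SeqNum
}

// b is "after" a when it comes from a newer incarnation of the peer, or from
// the same incarnation with a higher sequence number.
func before(a *timestamp, b *proto.PeerTime) bool {
	return (uint64(a.incTime.UnixNano()) == b.IncNum && a.seqNum < b.SeqNum) ||
		uint64(a.incTime.UnixNano()) < b.IncNum
}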
sendMemResponse
func (d *gossipDiscoveryImpl) sendMemResponse(targetMember *proto.Member, internalEndpoint string, nonce uint64) {
	d.logger.Debug("Entering", targetMember)
	targetPeer := &NetworkMember{
		Endpoint:         targetMember.Endpoint,
		Metadata:         targetMember.Metadata,
		PKIid:            targetMember.PkiId,
		InternalEndpoint: internalEndpoint,
	}
	var aliveMsg *proto.SignedGossipMessage
	var err error
	d.lock.RLock()
	aliveMsg = d.selfAliveMessage
	d.lock.RUnlock()
	if aliveMsg == nil {
		aliveMsg, err = d.createSignedAliveMessage(true)
		if err != nil {
			d.logger.Warningf("Failed creating alive message: %+v", errors.WithStack(err))
			return
		}
	}
	memResp := d.createMembershipResponse(aliveMsg, targetPeer)
	if memResp == nil {
		errMsg := `Got a membership request from a peer that shouldn't have sent one: %v, closing connection to the peer as a result.`
		d.logger.Warningf(errMsg, targetMember)
		d.comm.CloseConn(targetPeer)
		return
	}
	defer d.logger.Debug("Exiting, replying with", memResp)
	msg, err := (&proto.GossipMessage{
		Tag:   proto.GossipMessage_EMPTY,
		Nonce: nonce,
		Content: &proto.GossipMessage_MemRes{
			MemRes: memResp,
		},
	}).NoopSign()
	if err != nil {
		err = errors.WithStack(err)
		d.logger.Warningf("Failed creating SignedGossipMessage: %+v", errors.WithStack(err))
		return
	}
	d.comm.SendToPeer(targetPeer, msg)
}
It does three main things:
- Wrap ourselves into an AliveMsg (reusing the cached selfAliveMessage when available).
- Package up the Alive and Dead member lists we currently know, filtered through the disclosure policy (sketched after this list).
- Bundle everything into a MembershipResponse and send it back to the requester.
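createMembershipResponse is also where the nil case in the code above comes from: the disclosure policy may decide that nothing should be revealed to this peer at all. A paraphrased sketch, as I read the Fabric source:

func (d *gossipDiscoveryImpl) createMembershipResponse(aliveMsg *proto.SignedGossipMessage, targetMember *NetworkMember) *proto.MembershipResponse {
	shouldBeDisclosed, omitConcealedFields := d.disclosurePolicy(targetMember)
	// A nil return tells the caller this peer shouldn't have asked us at all.
	if !shouldBeDisclosed(aliveMsg) {
		return nil
	}
	d.lock.RLock()
	defer d.lock.RUnlock()
	deadPeers := []*proto.Envelope{}
	for _, dm := range d.deadMembership.ToSlice() {
		if !shouldBeDisclosed(dm) {
			continue
		}
		deadPeers = append(deadPeers, omitConcealedFields(dm))
	}
	var aliveSnapshot []*proto.Envelope
	for _, am := range d.aliveMembership.ToSlice() {
		if !shouldBeDisclosed(am) {
			continue
		}
		aliveSnapshot = append(aliveSnapshot, omitConcealedFields(am))
	}
	return &proto.MembershipResponse{
		Alive: append(aliveSnapshot, omitConcealedFields(aliveMsg)),
		Dead:  deadPeers,
	}
}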
AliveMsg
if m.IsAliveMsg() {
	if !d.msgStore.CheckValid(m) || !d.crypt.ValidateAliveMsg(m) {
		return
	}
	// If the message was sent by me, ignore it and don't forward it further
	if d.isSentByMe(m) {
		return
	}
	d.msgStore.Add(m)
	d.handleAliveMessage(m)
	d.comm.Forward(msg)
	return
}
- It looks simple, but note the order: the message must first pass msgStore.CheckValid (freshness) and ValidateAliveMsg (identity and signature) before it is added to the local alive-message store.
- handleAliveMessage, covered earlier, then performs the actual state transitions.
- Good news is worth sharing with friends: after processing, the message is forwarded on to other peers, which helps the whole network converge on the same membership view. The alive-message store itself is sketched below.
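The store added to in the first step is not a plain cache: when an alive message eventually expires from it, the member is erased from every map. A paraphrased sketch of newAliveMsgStore (in the Fabric source I'm reading, msgExpirationFactor is 20, so entries live 20x the expiration timeout):

func newAliveMsgStore(d *gossipDiscoveryImpl) *aliveMsgStore {
	policy := proto.NewGossipMessageComparator(0)
	trigger := func(m interface{}) {}
	aliveMsgTTL := d.aliveExpirationTimeout * msgExpirationFactor
	externalLock := func() { d.lock.Lock() }
	externalUnlock := func() { d.lock.Unlock() }
	// When an alive message finally expires from the store, forget the
	// member completely: it disappears from both the alive and dead views.
	callback := func(m interface{}) {
		msg := m.(*proto.SignedGossipMessage)
		if !msg.IsAliveMsg() {
			return
		}
		id := msg.GetAliveMsg().Membership.PkiId
		d.aliveMembership.Remove(id)
		d.deadMembership.Remove(id)
		delete(d.id2Member, string(id))
		delete(d.deadLastTS, string(id))
		delete(d.aliveLastTS, string(id))
	}
	return &aliveMsgStore{
		MessageStore: msgstore.NewMessageStoreExpirable(policy, trigger, aliveMsgTTL, externalLock, externalUnlock, callback),
	}
}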
MembershipResponse
if memResp := m.GetMemRes(); memResp != nil {
	d.pubsub.Publish(fmt.Sprintf("%d", m.Nonce), m.Nonce)
	for _, env := range memResp.Alive {
		am, err := env.ToGossipMessage()
		if err != nil {
			d.logger.Warningf("Membership response contains an invalid message from an online peer:%+v", errors.WithStack(err))
			return
		}
		if !am.IsAliveMsg() {
			d.logger.Warning("Expected alive message, got", am, "instead")
			return
		}
		if d.msgStore.CheckValid(am) && d.crypt.ValidateAliveMsg(am) {
			d.handleAliveMessage(am)
		}
	}
	for _, env := range memResp.Dead {
		dm, err := env.ToGossipMessage()
		if err != nil {
			d.logger.Warningf("Membership response contains an invalid message from an offline peer %+v", errors.WithStack(err))
			return
		}
		// Newer alive message exists or the message isn't authentic
		if !d.msgStore.CheckValid(dm) || !d.crypt.ValidateAliveMsg(dm) {
			continue
		}
		newDeadMembers := []*proto.SignedGossipMessage{}
		d.lock.RLock()
		if _, known := d.id2Member[string(dm.GetAliveMsg().Membership.PkiId)]; !known {
			newDeadMembers = append(newDeadMembers, dm)
		}
		d.lock.RUnlock()
		d.learnNewMembers([]*proto.SignedGossipMessage{}, newDeadMembers)
	}
}
- This is where the Alive and Dead lists sent back by the remote peer get processed. Note the pubsub.Publish on the nonce at the top: it acknowledges the response so that sendUntilAcked (shown at the end of this post) can stop retrying.
- For each Alive entry: handleAliveMessage, of course.
- For each Dead entry that is missing from id2Member: learnNewMembers.
Summary
- Members essentially flow between the alive and dead lists: a timeout policy periodically sweeps the alive list, a reconnect loop periodically polls the dead list to resurrect members whenever possible, and a third loop periodically announces our own state to the outside world.
Initiation points
So the question is: when does a peer actually initiate a MembershipRequest? Almost all initiation points cluster around peer startup, via InitiateSync and Connect.
InitiateSync
func (d *gossipDiscoveryImpl) InitiateSync(peerNum int) {
	if d.toDie() {
		return
	}
	var peers2SendTo []*NetworkMember
	m, err := d.createMembershipRequest(true)
	if err != nil {
		d.logger.Warningf("Failed creating membership request: %+v", errors.WithStack(err))
		return
	}
	memReq, err := m.NoopSign()
	if err != nil {
		d.logger.Warningf("Failed creating SignedGossipMessage: %+v", errors.WithStack(err))
		return
	}
	d.lock.RLock()
	n := d.aliveMembership.Size()
	k := peerNum
	if k > n {
		k = n
	}
	aliveMembersAsSlice := d.aliveMembership.ToSlice()
	for _, i := range util.GetRandomIndices(k, n-1) {
		pulledPeer := aliveMembersAsSlice[i].GetAliveMsg().Membership
		var internalEndpoint string
		if aliveMembersAsSlice[i].Envelope.SecretEnvelope != nil {
			internalEndpoint = aliveMembersAsSlice[i].Envelope.SecretEnvelope.InternalEndpoint()
		}
		netMember := &NetworkMember{
			Endpoint:         pulledPeer.Endpoint,
			Metadata:         pulledPeer.Metadata,
			PKIid:            pulledPeer.PkiId,
			InternalEndpoint: internalEndpoint,
		}
		peers2SendTo = append(peers2SendTo, netMember)
	}
	d.lock.RUnlock()
	for _, netMember := range peers2SendTo {
		d.comm.SendToPeer(netMember, memReq)
	}
}
- The logic here is simple: when the peer starts up, this is invoked to randomly pick up to peerNum alive members and send each one a MembershipRequest to pull their member lists (see the driving loop sketched below).
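For context, in the gossip layer above Discovery this is driven by a small periodic loop, roughly like the following paraphrased sketch (field names such as PullPeerNum and PullInterval come from the gossip config as I recall it):

// Paraphrased sketch of the gossip layer's periodic membership sync loop.
func (g *gossipServiceImpl) syncDiscovery() {
	for !g.toDie() {
		g.disc.InitiateSync(g.conf.PullPeerNum)
		time.Sleep(g.conf.PullInterval)
	}
}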
Connect
func (d *gossipDiscoveryImpl) Connect(member NetworkMember, id identifier) {
	for _, endpoint := range []string{member.InternalEndpoint, member.Endpoint} {
		if d.isMyOwnEndpoint(endpoint) {
			d.logger.Debug("Skipping connecting to myself")
			return
		}
	}
	d.logger.Debug("Entering", member)
	defer d.logger.Debug("Exiting")
	go func() {
		for i := 0; i < maxConnectionAttempts && !d.toDie(); i++ {
			id, err := id()
			if err != nil {
				if d.toDie() {
					return
				}
				d.logger.Warningf("Could not connect to %v : %v", member, err)
				time.Sleep(d.reconnectInterval)
				continue
			}
			peer := &NetworkMember{
				InternalEndpoint: member.InternalEndpoint,
				Endpoint:         member.Endpoint,
				PKIid:            id.ID,
			}
			m, err := d.createMembershipRequest(id.SelfOrg)
			if err != nil {
				d.logger.Warningf("Failed creating membership request: %+v", errors.WithStack(err))
				continue
			}
			req, err := m.NoopSign()
			if err != nil {
				d.logger.Warningf("Failed creating SignedGossipMessage: %+v", errors.WithStack(err))
				continue
			}
			req.Nonce = util.RandomUInt64()
			req, err = req.NoopSign()
			if err != nil {
				d.logger.Warningf("Failed adding NONCE to SignedGossipMessage %+v", errors.WithStack(err))
				continue
			}
			go d.sendUntilAcked(peer, req)
			return
		}
	}()
}
- At startup the peer connects to the endpoints configured under peer.gossip.bootstrap and initiates a MembershipRequest to pull the member list. Delivery is retried via sendUntilAcked, sketched below.
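sendUntilAcked closes the loop with the pubsub.Publish we saw in the MembershipResponse handler: each attempt subscribes on the request's nonce and stops as soon as a response carrying that nonce arrives. A paraphrased sketch (the five-second subscription timeout is from the Fabric source as I recall it):

func (d *gossipDiscoveryImpl) sendUntilAcked(peer *NetworkMember, message *proto.SignedGossipMessage) {
	nonce := message.Nonce
	for i := 0; i < maxConnectionAttempts && !d.toDie(); i++ {
		sub := d.pubsub.Subscribe(fmt.Sprintf("%d", nonce), time.Second*5)
		d.comm.SendToPeer(peer, message)
		// The MembershipResponse handler publishes the nonce; if it arrives
		// before the subscription times out, we are done.
		if _, timeoutErr := sub.Listen(); timeoutErr == nil {
			return
		}
		time.Sleep(d.reconnectInterval)
	}
}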