(一)Memberlist 简介
(1)Memberlist 是一个用于管理分布式集群成员的库,提供节点发现、节点故障检测以及成员列表维护等功能。
(2)Memberlist 基于 Gossip 协议来传播消息,其 Gossip 实现构建在 SWIM 协议之上。
(二) Protocol
集群内的广播
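节点周期性地通过 UDP 协议向随机选取的 K 个节点(K 由 Config 中的 GossipNodes 指定)发送消息,消息取自本地的广播队列。每条消息被发送(传播)达到一定次数后,就会从广播队列中移除。这个重传次数上限由 Config 中的 RetransmitMult 字段决定,约为 RetransmitMult * log(N+1),其中 N 为集群节点数,详见该字段的注释。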
// gossip runs one gossip round. It selects up to m.config.GossipNodes random
// nodes and sends each of them the broadcasts currently pending in the local
// queue, one packet per node.
//
// Excerpt notes: the node filter passed to kRandomNodes is elided as "......".
// compoundOverhead and bytesAvail are defined outside this excerpt; they are
// presumably the per-message overhead and the packet byte budget (confirm
// against the full source). Send errors are only logged; they never abort
// the round.
func (m *Memberlist) gossip() {
// Pick up to K random nodes (K = m.config.GossipNodes).
kNodes := kRandomNodes(m.config.GossipNodes, m.nodes, func(n *nodeState) bool {
......
})
for _, node := range kNodes {
// Pull pending messages from the broadcast queue, bounded by bytesAvail.
msgs := m.getBroadcasts(compoundOverhead, bytesAvail)
// Nothing left to send: end the whole round, so the remaining nodes in
// kNodes receive nothing this time.
if len(msgs) == 0 {
return
}
addr := node.Address()
if len(msgs) == 1 {// Exactly one message: send it as-is.
// Send the single message as one packet.
if err := m.rawSendMsgPacket(addr, &node.Node, msgs[0]); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", addr, err)
}
} else {
// Otherwise create and send a compound message
compound := makeCompoundMessage(msgs)
if err := m.rawSendMsgPacket(addr, &node.Node, compound.Bytes()); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", addr, err)
}
}
}
}
Push/Pull
每隔一段时间,节点会随机选取一个存活(alive)的其他节点,与它建立 TCP 连接,并将本地掌握的全部节点状态和用户数据发送过去;对端再把它掌握的全部节点状态和用户数据发送回来,最后完成两份数据的合并。此动作可以加快集群内信息的收敛速度。
// pushPull performs one full state exchange with a single randomly chosen
// peer. Only nodes that are alive and are not the local node are eligible.
// If no peer qualifies, it does nothing. A failed exchange is logged and
// not returned.
func (m *Memberlist) pushPull() {
	// Select one candidate while holding the read lock on the node list.
	m.nodeLock.RLock()
	candidates := kRandomNodes(1, m.nodes, func(n *nodeState) bool {
		isSelf := n.Name == m.config.Name
		return isSelf || n.State != stateAlive
	})
	m.nodeLock.RUnlock()

	// No eligible peer to exchange state with.
	if len(candidates) == 0 {
		return
	}

	peer := candidates[0]
	err := m.pushPullNode(peer.Address(), false)
	if err != nil {
		m.logger.Printf("[ERR] memberlist: Push/Pull with %s failed: %s", peer.Name, err)
	}
}
// pushPullNode does a complete state exchange with the node at addr. It first
// sends local state and receives the remote state via sendAndReceiveState,
// then folds the result into the local view via mergeRemoteState. join is
// forwarded to both steps. The first error from either step is returned
// unchanged; the elapsed time is recorded under "memberlist.pushPullNode".
func (m *Memberlist) pushPullNode(addr string, join bool) error {
	// Deferred call arguments are evaluated immediately, so capturing the
	// start time up front is equivalent to passing time.Now() inline.
	start := time.Now()
	defer metrics.MeasureSince([]string{"memberlist", "pushPullNode"}, start)

	remote, userState, err := m.sendAndReceiveState(addr, join)
	if err != nil {
		return err
	}

	// Merge every received node record, plus the user state.
	return m.mergeRemoteState(join, remote, userState)
}
节点的三个状态
// Node states tracked for each cluster member. A node starts alive, is
// marked suspect when probes against it fail, and ends up dead if the
// suspicion is not refuted.
const (
	stateAlive   nodeStateType = iota // healthy; the only state picked as a push/pull or indirect-probe peer
	stateSuspect                      // failed probing; awaiting refutation or confirmation
	stateDead                         // declared failed; skipped by the prober
)
Alive (比较简单,代码不分析)
当节点上线的时候,向集群广播Alive消息。
Probe
节点启动之后,每隔一定的时间间隔会选取一个节点,向其发送一个 PING(UDP)消息。当 PING 失败后,节点会随机选取 IndirectChecks 个节点发起间接 PING(UDP)请求;同时,在未禁用 TCP PING 且对端协议版本支持的情况下,还会直接再向目标节点发起一个 TCP PING 请求。收到间接 PING 请求的节点会根据请求中的地址向目标节点发送 PING 消息,并将结果返回给发起间接请求的源节点。如果在探测超时时间内,本节点既没有收到目标节点的任何 ACK 消息,TCP PING 也没有成功,则将目标节点的状态标记为 suspect。
// probe runs one round of failure detection. It looks at the node at
// m.probeIndex, advances the index, and either skips the node (the local
// node itself, or a node already marked dead) or hands it to probeNode.
// Excerpt notes: the setup, the START label, and the code that acquires
// m.nodeLock (released below) are elided as "......".
func (m *Memberlist) probe() {
......
// Copy the node state by value so it stays usable after the node lock is
// released below.
node = *m.nodes[m.probeIndex]
if node.Name == m.config.Name {
skip = true
} else if node.State == stateDead {
skip = true
}
// Potentially skip
m.nodeLock.RUnlock()
m.probeIndex++
if skip {
// numCheck counts skipped candidates. The bound that stops this retry
// loop is presumably in the elided section; confirm.
numCheck++
goto START
}
// Probe the specific node
m.probeNode(&node)
}
// probeNode handles a single round of failure checking on a node.
//
// Sequence shown in this excerpt (several sections are elided as "......",
// and the function's tail after the suspect call is not shown):
//  1. If the node is alive, send it a direct ping and wait on ackCh for up
//     to m.config.ProbeTimeout. On a complete ack, report the RTT to
//     m.config.Ping (when set) and return.
//  2. Otherwise, send indirect ping requests through up to
//     m.config.IndirectChecks other alive peers.
//  3. At the same time, unless DisableTcpPings is set or the target's PMax
//     is below 3, try a fallback TCP ping in a goroutine.
//  4. If no complete ack arrives and the fallback did not reach the node,
//     mark the node suspect.
func (m *Memberlist) probeNode(node *nodeState) {
......
if node.State == stateAlive {
// Send a direct ping message.
if err := m.encodeAndSendMsg(addr, pingMsg, &ping); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send ping: %s", err)
return
}
}else{
......
}
// Wait for the ping result, or give up after ProbeTimeout.
select {
case v := <-ackCh:
// Ping succeeded.
if v.Complete == true {
if m.config.Ping != nil {
rtt := v.Timestamp.Sub(sent)
m.config.Ping.NotifyPingComplete(&node.Node, rtt, v.Payload)
}
return
}
......
case <-time.After(m.config.ProbeTimeout):
// Direct ping timed out.
m.logger.Printf("[DEBUG] memberlist: Failed ping: %v (timeout reached)", node.Name)
}
// Get some random live nodes.
m.nodeLock.RLock()
kNodes := kRandomNodes(m.config.IndirectChecks, m.nodes, func(n *nodeState) bool {
return n.Name == m.config.Name ||
n.Name == node.Name ||
n.State != stateAlive
})
m.nodeLock.RUnlock()
// Ask each selected peer to ping the target on our behalf. ind is reused
// across iterations; its Nack flag is reset for every peer.
expectedNacks := 0
ind := indirectPingReq{SeqNo: ping.SeqNo, Target: node.Addr, Port: node.Port, Node: node.Name}
for _, peer := range kNodes {
// We only expect nack to be sent from peers who understand
// version 4 of the protocol.
if ind.Nack = peer.PMax >= 4; ind.Nack {
expectedNacks++
}
if err := m.encodeAndSendMsg(peer.Address(), indirectPingMsg, &ind); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send indirect ping: %s", err)
}
}
// fallbackCh receives at most one result and is always closed, either by
// the goroutine or immediately when the TCP fallback is not used. That
// lets the range loop below terminate in every case.
fallbackCh := make(chan bool, 1)
if (!m.config.DisableTcpPings) && (node.PMax >= 3) {// Fall back to a TCP ping.
go func() {
defer close(fallbackCh)
didContact, err := m.sendPingAndWaitForAck(node.Address(), ping, deadline)
if err != nil {
m.logger.Printf("[ERR] memberlist: Failed fallback ping: %s", err)
} else {
fallbackCh <- didContact
}
}()
} else {
close(fallbackCh)
}
// Wait for the next value on ackCh. There is no timeout case here.
// NOTE(review): this presumably relies on the ack handler (set up in the
// elided prefix) delivering a non-Complete value once the probe deadline
// passes; confirm.
select {
case v := <-ackCh:
if v.Complete == true {// An ack arrived (directly or via an indirect peer).
return
}
}
for didContact := range fallbackCh {// Wait for the fallback ping result, if any.
if didContact {
m.logger.Printf("[WARN] memberlist: Was able to connect to %s but other probes failed, network may be misconfigured", node.Name)
return
}
}
.....
// Every probe failed: mark the node as suspect.
s := suspect{Incarnation: node.Incarnation, Node: node.Name, From: m.config.Name}
m.suspectNode(&s)
suspect
(1)当节点被标记为 suspect 时,本地会启动一个定时器,并广播一条 suspect 消息。在此期间,每收到一条其他节点发来的针对该节点的 suspect 消息,就将本地的 suspect 确认数加 1。每个独立的确认都会缩短定时器的超时时间,确认数达到要求后,超时时间降到最小值。定时器到期时,如果该节点依旧不是 alive 状态(即 suspect 未被反驳),就会被标记为 dead。
(2)如果本地节点收到一个针对本地节点的suspect消息,本地节点会发送一个alive广播,修正本节点在其他节点上的状态。
dead
dead 的处理过程与 suspect 类似,这里不再赘述。
(三)Delegate
Memberlist 本身并不提供用户消息的封装和具体处理逻辑,只提供了一个 Delegate interface。该 interface 中的具体方法由 serf 层封装和实现,包括集群管理以及 UserEvent、Query 等功能。
// Delegate is the set of callbacks memberlist invokes so that a higher layer
// (for example serf) can attach node metadata, user messages and custom
// state to the gossip protocol.
type Delegate interface {
	// NodeMeta returns the metadata attached to this node's alive
	// broadcasts. The result must not exceed limit bytes. The metadata is
	// exposed through the Node structure.
	NodeMeta(limit int) []byte

	// NotifyMsg receives a user-data message. It runs on the UDP packet
	// receive loop, so it must not block. The byte slice may be modified
	// after the call returns; copy it if it needs to be kept.
	NotifyMsg([]byte)

	// GetBroadcasts returns user messages that may be broadcast. Each
	// returned buffer should account for the given per-message overhead,
	// and the total byte size of the data to send must not exceed limit.
	// It must not block.
	GetBroadcasts(overhead, limit int) [][]byte

	// LocalState returns arbitrary data sent to the remote side during a
	// TCP push/pull, in addition to the membership information. join is
	// true when the exchange is part of a join rather than a push/pull.
	// The remote side receives it through MergeRemoteState.
	LocalState(join bool) []byte

	// MergeRemoteState is invoked after a TCP push/pull with the data the
	// remote side produced from its LocalState call. join has the same
	// meaning as for LocalState.
	MergeRemoteState(buf []byte, join bool)
}
~
~
NodeMeta
NotifyMsg
NotifyMsg是serf中整个消息中间件的核心接口,gossip协议层所有的消息都会回调到NotifyMsg。
// NotifyMsg is serf's implementation of the memberlist Delegate callback for
// user-data messages. The first byte of buf selects the serf message type;
// the remaining bytes are decoded according to that type.
//
// Excerpt notes: the other message types and the code after the switch are
// elided as "......". The handler results are stored in rebroadcast, and
// rebroadcastQueue selects a serf broadcast queue, presumably for that
// elided rebroadcast step (confirm).
func (d *delegate) NotifyMsg(buf []byte) {
// If we didn't actually receive any data, then ignore it.
if len(buf) == 0 {
return
}
// Record the size of every received message.
metrics.AddSample([]string{"serf", "msgs", "received"}, float32(len(buf)))
rebroadcast := false
rebroadcastQueue := d.serf.broadcasts
t := messageType(buf[0])
switch t {
......
case messageUserEventType:
var event messageUserEvent
if err := decodeMessage(buf[1:], &event); err != nil {
d.serf.logger.Printf("[ERR] serf: Error decoding user event message: %s", err)
// break leaves the switch (not a loop), so rebroadcast stays false.
break
}
d.serf.logger.Printf("[DEBUG] serf: messageUserEventType: %s", event.Name)
// handleUserEvent reports whether the event was new to this node.
rebroadcast = d.serf.handleUserEvent(&event)
rebroadcastQueue = d.serf.eventBroadcasts
case messageQueryType:
var query messageQuery
if err := decodeMessage(buf[1:], &query); err != nil {
d.serf.logger.Printf("[ERR] serf: Error decoding query message: %s", err)
// break leaves the switch (not a loop), so rebroadcast stays false.
break
}
d.serf.logger.Printf("[DEBUG] serf: messageQueryType: %s", query.Name)
rebroadcast = d.serf.handleQuery(&query)
rebroadcastQueue = d.serf.queryBroadcasts
default:
d.serf.logger.Printf("[WARN] serf: Received message of unknown type: %d", t)
}
......
}
NotifyMsg 包含了多种消息类型的处理,这里我们只分析 UserEvent 和 Query,如果对其他消息的处理感兴趣,可以阅读源码。其中 UserEvent 实现了集群中不需要反馈的单向通信,Query 和 QueryResponse 则实现了集群中的双向通信。
// handleUserEvent processes an incoming user event message. It returns true
// when the event is new to this node (the caller uses this to decide whether
// to rebroadcast). It returns false when the event is older than
// eventMinTime, falls outside the eventBuffer window, or is a duplicate.
//
// After advancing the event Lamport clock, all work happens under
// s.eventLock. New events are recorded in eventBuffer, counted in metrics,
// and forwarded on s.config.EventCh when that channel is configured.
// NOTE(review): the EventCh send happens while s.eventLock is held, so a
// slow consumer blocks other event handling.
func (s *Serf) handleUserEvent(eventMsg *messageUserEvent) bool {
	// The sender's LTime may be ahead of ours; let the clock catch up.
	s.eventClock.Witness(eventMsg.LTime)

	s.eventLock.Lock()
	defer s.eventLock.Unlock()

	if eventMsg.LTime < s.eventMinTime {
		return false
	}

	// eventBuffer is indexed by LTime modulo its length, so an event older
	// than one full window cannot be deduplicated and is dropped.
	window := LamportTime(len(s.eventBuffer))
	now := s.eventClock.Time()
	tooOld := now > window && eventMsg.LTime < now-window
	if tooOld {
		s.logger.Printf(
			"[WARN] serf: received old event %s from time %d (current: %d)",
			eventMsg.Name,
			eventMsg.LTime,
			s.eventClock.Time())
		return false
	}

	slot := eventMsg.LTime % window
	candidate := userEvent{Name: eventMsg.Name, Payload: eventMsg.Payload}

	bucket := s.eventBuffer[slot]
	if bucket == nil || bucket.LTime != eventMsg.LTime {
		// Empty slot, or it still holds a different LTime: start afresh.
		bucket = &userEvents{LTime: eventMsg.LTime}
		s.eventBuffer[slot] = bucket
	} else {
		for _, prev := range bucket.Events {
			if prev.Equals(&candidate) {
				return false // already seen
			}
		}
	}

	bucket.Events = append(bucket.Events, candidate)

	metrics.IncrCounter([]string{"serf", "events"}, 1)
	metrics.IncrCounter([]string{"serf", "events", eventMsg.Name}, 1)

	// Hand the event to the agent layer, if it is listening.
	if ch := s.config.EventCh; ch != nil {
		ch <- UserEvent{
			LTime:    eventMsg.LTime,
			Name:     eventMsg.Name,
			Payload:  eventMsg.Payload,
			Coalesce: eventMsg.CC,
		}
	}
	return true
}
Query 消息的处理机制与 UserEvent 类似,这里不再赘述。
其他接口比较简单这里就不再做分析了。
(四)Transport
Memberlist的内部包含了TCP和UDP的通信过程,所以需要一个Transport来实现底层的api。
Memberlist内部默认提供了Transport的实现,当然也可以通过Config来配置定制的Transport.
Transport的接口如下
// Transport abstracts the network layer memberlist runs on. It has two
// paths: a connectionless packet path and a connection-oriented stream path,
// which is more expensive and used for less frequent operations.
type Transport interface {
	// FinalAdvertiseAddr takes the user's configured IP and port (possibly
	// empty) and returns the IP and port to advertise to the rest of the
	// cluster.
	FinalAdvertiseAddr(ip string, port int) (net.IP, int, error)

	// WriteTo sends b to addr without establishing a connection. It is
	// similar in spirit to net.PacketConn, but deliberately exposes only
	// this one method. addr is a network-neutral string, usually in
	// "host:port" form, as with Dial. The returned timestamp should be as
	// close as possible to the moment the packet was transmitted, so that
	// RTT measurements during probes are accurate.
	WriteTo(b []byte, addr string) (time.Time, error)

	// PacketCh returns a channel of packets received from other peers. How
	// the transport listens for them is left to the implementation.
	PacketCh() <-chan *Packet

	// DialTimeout opens a two-way connection to the peer at addr, bounded
	// by timeout. Connections cost more than packets, so this path is used
	// for less frequent work such as anti-entropy syncs, or as a fallback
	// probe when the packet-oriented probe failed.
	DialTimeout(addr string, timeout time.Duration) (net.Conn, error)

	// StreamCh returns a channel of incoming stream connections from other
	// peers. How the transport listens for them is left to the
	// implementation.
	StreamCh() <-chan net.Conn

	// Shutdown is called when memberlist shuts down, giving the transport
	// a chance to clean up its listeners.
	Shutdown() error
}
在newMemberlist里面会判断是否使用默认的Transport,这部分代码如下:
// newMemberlist builds a Memberlist from conf. This excerpt shows only the
// default-transport setup; the rest of the function is elided as "......".
//
// When conf.Transport is nil, a NetTransport bound to conf.BindAddr and
// conf.BindPort is created. makeNetRetry tries to construct it up to limit
// times. Every failure is retried, and "address already in use" bind errors
// are additionally logged at DEBUG level. If all attempts fail, it returns
// an error built from the last failure.
func newMemberlist(conf *Config) (*Memberlist, error) {
	......
	// Set up a network transport by default if a custom one wasn't given
	// by the config.
	transport := conf.Transport
	if transport == nil {
		nc := &NetTransportConfig{
			BindAddrs: []string{conf.BindAddr},
			BindPort:  conf.BindPort,
			Logger:    logger,
		}

		// See comment below for details about the retry in here.
		makeNetRetry := func(limit int) (*NetTransport, error) {
			var err error
			for try := 0; try < limit; try++ {
				var nt *NetTransport
				// The constructor is NewNetTransport (it was previously
				// misspelled NewNetzzTransport, which does not exist).
				if nt, err = NewNetTransport(nc); err == nil {
					return nt, nil
				}
				if strings.Contains(err.Error(), "address already in use") {
					logger.Printf("[DEBUG] memberlist: Got bind error: %v", err)
				}
			}
			return nil, fmt.Errorf("failed to obtain an address: %v", err)
		}
		......
	}