Serf下的Memberlist 源码分析

(一)Memberlist 简介

(1)Memberlist 基于 Gossip 协议传播消息,其 Gossip 实现构建在 SWIM 协议之上。

(二) Protocol

节点通过 UDP 协议向随机选取的 K 个节点(K 即 Config 中的 GossipNodes)发送消息,消息从广播队列中获取。广播队列中的每条消息在被发送(重传)达到一定次数后就会被丢弃,这个上限与发送是否失败无关。具体的重传次数请参考 Config 中 RetransmitMult 字段的注释。

// gossip sends queued broadcast messages to a random subset of nodes using
// packet (UDP) sends. The subset size is m.config.GossipNodes.
// (Article excerpt: the filter body, early-continue bodies and closing braces
// of the original function are elided.)
func (m *Memberlist) gossip() {
    // Pick GossipNodes random nodes (the filter callback body is elided here).
    kNodes := kRandomNodes(m.config.GossipNodes, m.nodes, func(n *nodeState) bool {
    for _, node := range kNodes {
        // Fetch pending messages from the broadcast queue; presumably
        // bytesAvail caps their total size after compoundOverhead — confirm.
        msgs := m.getBroadcasts(compoundOverhead, bytesAvail)
        if len(msgs) == 0 {

        addr := node.Address()
        if len(msgs) == 1 { // Only one message: send it as-is.
            if err := m.rawSendMsgPacket(addr, &node.Node, msgs[0]); err != nil {
                m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", addr, err)
        } else {
            // Otherwise create and send a compound message
            compound := makeCompoundMessage(msgs)
            if err := m.rawSendMsgPacket(addr, &node.Node, compound.Bytes()); err != nil {
                m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", addr, err)

每隔一个时间间隔,随机选取一个存活的节点(不包括自身),与它建立 TCP 连接,将本地掌握的全部节点状态和用户数据发送过去;对端再将其掌握的全部节点状态和用户数据发送回来,最后完成两份数据的合并。此动作可以加快集群内信息的收敛。

// pushPull picks one random live node other than this one and performs a
// full state exchange with it via pushPullNode (join=false). A failure is
// only logged, not returned.
// (Article excerpt: closing braces and the early-return body are elided.)
func (m *Memberlist) pushPull() {
    // Get a random live node
    nodes := kRandomNodes(1, m.nodes, func(n *nodeState) bool {
        // Filter out ourselves and any node that is not alive.
        return n.Name == m.config.Name ||
            n.State != stateAlive

    // If no nodes, bail
    if len(nodes) == 0 {
    node := nodes[0]

    // Attempt a push pull
    if err := m.pushPullNode(node.Address(), false); err != nil {
        m.logger.Printf("[ERR] memberlist: Push/Pull with %s failed: %s", node.Name, err)

// pushPullNode does a complete state exchange with a specific node.
// It sends local state to addr and receives the remote node's state plus its
// user state (sendAndReceiveState). It then merges both into the local view
// (mergeRemoteState). The join flag is passed through to both steps. The call
// duration is recorded under the {"memberlist", "pushPullNode"} metric key.
// (Article excerpt: closing braces are elided.)
func (m *Memberlist) pushPullNode(addr string, join bool) error {
    defer metrics.MeasureSince([]string{"memberlist", "pushPullNode"}, time.Now())

    // Attempt to send and receive with the node
    remote, userState, err := m.sendAndReceiveState(addr, join)
    if err != nil {
        return err
    // Merge the received membership and user state into the local view.
    if err := m.mergeRemoteState(join, remote, userState); err != nil {
        return err
    return nil


// Node state values. stateAlive is the first constant (iota == 0); other
// states such as stateDead exist but are elided from this excerpt.
const (
    stateAlive nodeStateType = iota 

Alive (比较简单,代码不分析)

当节点启动之后,每隔一定的时间间隔,会按 probeIndex 依次选取一个节点(跳过自身和已死亡的节点),向其发送一个 PING(UDP)消息。当 PING 失败(在 ProbeTimeout 内未收到 ACK)后,会随机选取 IndirectChecks 个存活节点发起间接 PING(UDP)请求;同时,若未禁用 TCP PING 且对端协议版本支持,还会直接再发起一个 TCP PING 请求。收到间接 PING 请求的节点会根据请求中的地址向目标节点发起 PING,并将结果返回给发起间接请求的源节点。如果在探测超时时间内,本节点没有收到被探测节点的任何 ACK 消息,则将被探测节点的状态标记为 suspect。

// probe performs a single round of failure detection. It takes the node at
// m.probeIndex and skips it if it is this node itself or is already dead.
// Skipping jumps back to the START label, presumably to advance to the next
// node. Otherwise the node is probed (presumably via probeNode — the call is
// elided).
// (Article excerpt: the START label, index bookkeeping and closing braces are
// elided.)
func (m *Memberlist) probe() {
    node = *m.nodes[m.probeIndex]
    if node.Name == m.config.Name {
        skip = true
    } else if node.State == stateDead {
        skip = true

    // Potentially skip
    if skip {
        goto START

    // Probe the specific node

// probeNode handles a single round of failure checking on a node.
//
// Flow visible in this excerpt:
//  1. If the node is alive, send it a direct ping and wait on ackCh for up to
//     m.config.ProbeTimeout. On a complete ack, report the round-trip time to
//     m.config.Ping (if set).
//  2. Pick up to m.config.IndirectChecks other live peers and ask each one to
//     ping the node on our behalf (indirect ping).
//  3. Concurrently, unless TCP pings are disabled or the node's PMax < 3, try
//     a fallback ping with sendPingAndWaitForAck in a goroutine.
//  4. If nothing succeeds, build a suspect message for the node (the call
//     that applies it is elided).
//
// (Article excerpt: several statements and closing braces are elided.)
func (m *Memberlist) probeNode(node *nodeState) {
    if node.State == stateAlive {
        if err := m.encodeAndSendMsg(addr, pingMsg, &ping); err != nil {
            m.logger.Printf("[ERR] memberlist: Failed to send ping: %s", err)

    // Wait for an ack to the direct ping, or give up after ProbeTimeout.
    select {
    case v := <-ackCh:
        if v.Complete == true {
            if m.config.Ping != nil {
                // Report the measured round-trip time to the Ping delegate.
                rtt := v.Timestamp.Sub(sent)
                m.config.Ping.NotifyPingComplete(&node.Node, rtt, v.Payload)
    case <-time.After(m.config.ProbeTimeout):
        m.logger.Printf("[DEBUG] memberlist: Failed ping: %v (timeout reached)", node.Name)

    // Get some random live nodes.
    // Filter out ourselves, the node being probed, and non-alive nodes.
    kNodes := kRandomNodes(m.config.IndirectChecks, m.nodes, func(n *nodeState) bool {
        return n.Name == m.config.Name ||
            n.Name == node.Name ||
            n.State != stateAlive

    // Send indirect pings: ask each selected peer to ping the target for us.
    expectedNacks := 0
    ind := indirectPingReq{SeqNo: ping.SeqNo, Target: node.Addr, Port: node.Port, Node: node.Name}
    for _, peer := range kNodes {
        // We only expect nack to be sent from peers who understand
        // version 4 of the protocol.
        if ind.Nack = peer.PMax >= 4; ind.Nack {

        if err := m.encodeAndSendMsg(peer.Address(), indirectPingMsg, &ind); err != nil {
            m.logger.Printf("[ERR] memberlist: Failed to send indirect ping: %s", err)

    // fallbackCh receives the TCP fallback result (buffered so the goroutine
    // never blocks); the goroutine closes it when done.
    fallbackCh := make(chan bool, 1)
    if (!m.config.DisableTcpPings) && (node.PMax >= 3) { // Fall back to a TCP ping.
        go func() {
            defer close(fallbackCh)
            didContact, err := m.sendPingAndWaitForAck(node.Address(), ping, deadline)
            if err != nil {
                m.logger.Printf("[ERR] memberlist: Failed fallback ping: %s", err)
            } else {
                fallbackCh <- didContact
    } else {
    // Wait again on ackCh; acks may now arrive via the indirect probes.
    select {
    case v := <-ackCh:
        if v.Complete == true { // Ping succeeded.
    // No ack arrived in time: check whether the TCP fallback reached the node.
    for didContact := range fallbackCh {
        if didContact {
            m.logger.Printf("[WARN] memberlist: Was able to connect to %s but other probes failed, network may be misconfigured", node.Name)
    // All probes failed: build a suspect message about the node.
    s := suspect{Incarnation: node.Incarnation, Node: node.Name, From: m.config.Name}




Memberlist 本身并不实现用户消息的具体处理逻辑,只提供了一个 Delegate 接口。该接口中的具体方法由 Serf 层实现,负责集群管理以及 UserEvent、Query 等消息的处理。

// Delegate is the set of callbacks memberlist uses for application-level data.
// Memberlist does not interpret this data itself; in this article the Serf
// layer supplies the implementation.
// (Article excerpt: the NotifyMsg method declaration and the closing brace are
// elided.)
type Delegate interface {
    // NodeMeta is used to retrieve meta-data about the current node
    // when broadcasting an alive message. Its length is limited to
    // the given byte size. This metadata is available in the Node structure.
    NodeMeta(limit int) []byte
    // NotifyMsg is called when a user-data message is received.
    // Care should be taken that this method does not block, since doing
    // so would block the entire UDP packet receive loop. Additionally, the byte
    // slice may be modified after the call returns, so it should be copied if needed.
    // (The NotifyMsg declaration itself is elided from this excerpt; Serf's
    // implementation has the signature NotifyMsg(buf []byte).)
    // GetBroadcasts is called when user data messages can be broadcast.
    // It can return a list of buffers to send. Each buffer should assume an
    // overhead as provided with a limit on the total byte size allowed.
    // The total byte size of the resulting data to send must not exceed
    // the limit. Care should be taken that this method does not block,
    // since doing so would block the entire UDP packet receive loop.
    GetBroadcasts(overhead, limit int) [][]byte
    // LocalState is used for a TCP Push/Pull. This is sent to
    // the remote side in addition to the membership information. Any
    // data can be sent here. See MergeRemoteState as well. The `join`
    // boolean indicates this is for a join instead of a push/pull.
    LocalState(join bool) []byte
    // MergeRemoteState is invoked after a TCP Push/Pull. This is the
    // state received from the remote side and is the result of the
    // remote side's LocalState call. The 'join'
    // boolean indicates this is for a join instead of a push/pull.
    MergeRemoteState(buf []byte, join bool)




// NotifyMsg is Serf's handler for user-data messages delivered by memberlist.
// The first byte of buf selects the message type and the rest is decoded.
// User events go to handleUserEvent and queries go to handleQuery. The
// handler's boolean result decides whether the message is rebroadcast;
// rebroadcastQueue selects which queue is used.
// (Article excerpt: early returns, other message types, the default case label
// and the rebroadcast step are elided.)
func (d *delegate) NotifyMsg(buf []byte) {
    // If we didn't actually receive any data, then ignore it.
    if len(buf) == 0 {
    metrics.AddSample([]string{"serf", "msgs", "received"}, float32(len(buf)))

    // Rebroadcast decision and target queue; d.serf.broadcasts is the default
    // queue, and the event/query cases override it.
    rebroadcast := false
    rebroadcastQueue := d.serf.broadcasts
    t := messageType(buf[0])
    switch t {
    case messageUserEventType:
        var event messageUserEvent
        if err := decodeMessage(buf[1:], &event); err != nil {
            d.serf.logger.Printf("[ERR] serf: Error decoding user event message: %s", err)

        d.serf.logger.Printf("[DEBUG] serf: messageUserEventType: %s", event.Name)
        rebroadcast = d.serf.handleUserEvent(&event)
        rebroadcastQueue = d.serf.eventBroadcasts

    case messageQueryType:
        var query messageQuery
        if err := decodeMessage(buf[1:], &query); err != nil {
            d.serf.logger.Printf("[ERR] serf: Error decoding query message: %s", err)

        d.serf.logger.Printf("[DEBUG] serf: messageQueryType: %s", query.Name)
        rebroadcast = d.serf.handleQuery(&query)
        rebroadcastQueue = d.serf.queryBroadcasts


        // Presumably the default case: unrecognised message types are only logged.
        d.serf.logger.Printf("[WARN] serf: Received message of unknown type: %d", t)



// handleUserEvent processes a user event received from the cluster and reports
// whether it should be rebroadcast.
// It returns false in three cases: the event's LTime is below s.eventMinTime,
// the event is older than the window covered by s.eventBuffer, or an equal
// event with the same LTime was already recorded. Otherwise it records the
// event, bumps metrics, sends a UserEvent on s.config.EventCh (if set), and
// returns true.
// (Article excerpt: the clock witness, the lock acquisition, the log call
// head and closing braces are elided.)
func (s *Serf) handleUserEvent(eventMsg *messageUserEvent) bool {
    // Witness a potentially newer time

    defer s.eventLock.Unlock()

    // Ignore if it is before our minimum event time
    if eventMsg.LTime < s.eventMinTime {
        return false

    // Check if this message is too old, i.e. outside the window that
    // eventBuffer can track. The first comparison guards the subtraction
    // against underflow (LamportTime presumably unsigned — confirm).
    curTime := s.eventClock.Time()
    if curTime > LamportTime(len(s.eventBuffer)) &&
        eventMsg.LTime < curTime-LamportTime(len(s.eventBuffer)) {
            "[WARN] serf: received old event %s from time %d (current: %d)",
        return false

    // Check if we've already seen this message. eventBuffer is indexed by
    // LTime modulo its length: if the slot already holds this LTime, look for
    // an identical event; otherwise replace the slot with a fresh entry.
    idx := eventMsg.LTime % LamportTime(len(s.eventBuffer))
    seen := s.eventBuffer[idx]
    userEvent := userEvent{Name: eventMsg.Name, Payload: eventMsg.Payload}
    if seen != nil && seen.LTime == eventMsg.LTime {
        for _, previous := range seen.Events {
            if previous.Equals(&userEvent) {
                return false
    } else {
        seen = &userEvents{LTime: eventMsg.LTime}
        s.eventBuffer[idx] = seen

    // Add to recent events
    seen.Events = append(seen.Events, userEvent)

    // Update some metrics
    metrics.IncrCounter([]string{"serf", "events"}, 1)
    metrics.IncrCounter([]string{"serf", "events", eventMsg.Name}, 1)
    if s.config.EventCh != nil {
        s.config.EventCh <- UserEvent{
            LTime:    eventMsg.LTime,
            Name:     eventMsg.Name,
            Payload:  eventMsg.Payload,
            Coalesce: eventMsg.CC,
    return true





// Transport abstracts the network layer memberlist runs on. It has two paths:
// a connectionless packet path (WriteTo / PacketCh) and a connection-oriented
// stream path (DialTimeout / StreamCh). It also handles address advertisement
// and shutdown.
// (Article excerpt: the closing brace is elided.)
type Transport interface {
    // FinalAdvertiseAddr is given the user's configured values (which
    // might be empty) and returns the desired IP and port to advertise to
    // the rest of the cluster.
    FinalAdvertiseAddr(ip string, port int) (net.IP, int, error)

    // WriteTo is a packet-oriented interface that fires off the given
    // payload to the given address in a connectionless fashion. This should
    // return a time stamp that's as close as possible to when the packet
    // was transmitted to help make accurate RTT measurements during probes.
    // This is similar to net.PacketConn, though we didn't want to expose
    // that full set of required methods to keep assumptions about the
    // underlying plumbing to a minimum. We also treat the address here as a
    // string, similar to Dial, so it's network neutral, so this usually is
    // in the form of "host:port".
    WriteTo(b []byte, addr string) (time.Time, error)

    // PacketCh returns a channel that can be read to receive incoming
    // packets from other peers. How this is set up for listening is left as
    // an exercise for the concrete transport implementations.
    PacketCh() <-chan *Packet

    // DialTimeout is used to create a connection that allows us to perform
    // two-way communication with a peer. This is generally more expensive
    // than packet connections so is used for more infrequent operations
    // such as anti-entropy or fallback probes if the packet-oriented probe
    // failed.
    DialTimeout(addr string, timeout time.Duration) (net.Conn, error)

    // StreamCh returns a channel that can be read to handle incoming stream
    // connections from other peers. How this is set up for listening is
    // left as an exercise for the concrete transport implementations.
    StreamCh() <-chan net.Conn

    // Shutdown is called when memberlist is shutting down; this gives the
    // transport a chance to clean up any listeners.
    Shutdown() error


// newMemberlist builds a Memberlist from conf. If conf.Transport is nil, it
// creates a default NetTransport bound to conf.BindAddr and conf.BindPort.
// (Article excerpt: the function is truncated here; closing braces and the
// remaining logic are elided.)
func newMemberlist(conf *Config) (*Memberlist, error) {
    // Set up a network transport by default if a custom one wasn't given
    // by the config.
    transport := conf.Transport
    if transport == nil {
        nc := &NetTransportConfig{
            BindAddrs: []string{conf.BindAddr},
            BindPort:  conf.BindPort,
            Logger:    logger,

        // See comment below for details about the retry in here.
        // makeNetRetry attempts to create the transport up to limit times.
        // "address already in use" bind errors are logged at DEBUG level, and
        // the last error is wrapped if every attempt fails.
        makeNetRetry := func(limit int) (*NetTransport, error) {
            var err error
            for try := 0; try < limit; try++ {
                var nt *NetTransport
                // NOTE(review): NewNetzzTransport looks like a typo for
                // NewNetTransport — confirm against the memberlist source.
                if nt, err = NewNetzzTransport(nc); err == nil {
                    return nt, nil
                if strings.Contains(err.Error(), "address already in use") {
                    logger.Printf("[DEBUG] memberlist: Got bind error: %v", err)

            return nil, fmt.Errorf("failed to obtain an address: %v", err)


