Consul 通过 Serf 实现去中心化的集群成员管理,新节点只需与集群中任意一个现有节点交互即可加入集群
通过 UDP 数据报检测节点存活:如果一次直接检测超时,就随机选取若干存活节点,由它们间接检测该疑似失效节点的存活性;如果间接检测也超时,就将该节点标记为疑似失效(suspect),并把这条消息加入广播队列等待广播(节点之间有其他消息通信时,也会顺带携带这些广播消息)
此处只分析 LAN(WAN 同理)
github.com/hashicorp/consul/command/agent/agent.go中
agent 命令的实际入口
func (c *cmd) run(args []string) int {
...
if err := agent.Start(); err != nil {
...
if err := c.startupJoin(agent, config); err != nil {
...
}
func (c *cmd) startupJoin(agent *agent.Agent, cfg *config.RuntimeConfig) error {
...
n, err := agent.JoinLAN(cfg.StartJoinAddrsLAN)
...
}
github.com/hashicorp/consul/agent/agent.go中
func (a *Agent) JoinLAN(addrs []string) (n int, err error) {
...
n, err = a.delegate.JoinLAN(addrs)
...
}
github.com/hashicorp/consul/agent/consul/client.go中
以 client 为例
func NewClientLogger(config *Config, logger *log.Logger) (*Client, error) {
...
c.serf, err = c.setupSerf(config.SerfLANConfig,
c.eventCh, serfLANSnapshot)
...
启动协程监听 serf 事件
go c.lanEventHandler()
...
}
func (c *Client) JoinLAN(addrs []string) (int, error) {
return c.serf.Join(addrs, true)
}
github.com/hashicorp/consul/agent/consul/client_serf.go中
func (c *Client) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (*serf.Serf, error) {
...
conf.EventCh = ch
return serf.Create(conf)
}
func (c *Client) lanEventHandler() {
...
select {
case e := <-c.eventCh:
switch e.EventType() {
case serf.EventMemberJoin:
c.nodeJoin(e.(serf.MemberEvent))
...
}
github.com/hashicorp/serf/serf/serf.go中
func Create(conf *Config) (*Serf, error) {
...
通过它,memberlist 检测到的节点变化会被通知到 serf 层
conf.MemberlistConfig.Events = &eventDelegate{serf: serf}
...
创建实际负责 gossip 的 memberlist 实例
memberlist, err := memberlist.Create(conf.MemberlistConfig)
移除节点:失效节点在重连尝试超时后被移除;已离开的节点在超过保留时间后被移除(离开的节点不会立即移除)
go serf.handleReap()
尝试重新连接失效节点
go serf.handleReconnect()
...
}
func (s *Serf) Join(existing []string, ignoreOld bool) (int, error) {
...
num, err := s.memberlist.Join(existing)
...
}
func (s *Serf) handleNodeJoin(n *memberlist.Node) {
...
通知到consul层
if s.config.EventCh != nil {
s.config.EventCh <- MemberEvent{
Type: EventMemberJoin,
Members: []Member{member.Member},
}
}
...
}
github.com/hashicorp/serf/serf/event_delegate.go中
func (e *eventDelegate) NotifyJoin(n *memberlist.Node) {
e.serf.handleNodeJoin(n)
}
github.com/hashicorp/memberlist/memberlist.go中
func Create(conf *Config) (*Memberlist, error) {
...
m.schedule()
...
}
func (m *Memberlist) Join(existing []string) (int, error) {
...
if err := m.pushPullNode(hp, true); err != nil {
...
}
github.com/hashicorp/memberlist/state.go中
func (m *Memberlist) pushPullNode(addr string, join bool) error {
...
remote, userState, err := m.sendAndReceiveState(addr, join)
...
if err := m.mergeRemoteState(join, remote, userState); err != nil {
...
}
func (m *Memberlist) schedule() {
...
按 ProbeInterval 周期执行节点存活性检测
go m.triggerFunc(m.config.ProbeInterval, t.C, stopCh, m.probe)
...
go m.triggerFunc(m.config.GossipInterval, t.C, stopCh, m.gossip)
...
}
消息广播(gossip):把待广播的消息发送给随机选取的若干节点
func (m *Memberlist) gossip() {
...
for _, node := range kNodes {
...
if err := m.rawSendMsgPacket(addr, &node.Node, msgs[0]); err != nil {
...
}
...
}
func (m *Memberlist) triggerFunc(stagger time.Duration, C <-chan time.Time, stop <-chan struct{}, f func()) {
...
for {
select {
case <-C:
f()
case <-stop:
return
}
}
...
}
节点存活性检测
func (m *Memberlist) probe() {
...
m.probeNode(&node)
}
func (m *Memberlist) probeNode(node *nodeState) {
...
m.setProbeChannels(ping.SeqNo, ackCh, nackCh, probeInterval)
...
随机选取若干存活的节点(排除自身和被检测节点),让它们帮助 ping 此疑似失效节点
kNodes := kRandomNodes(m.config.IndirectChecks, m.nodes, func(n *nodeState) bool {
return n.Name == m.config.Name ||
n.Name == node.Name ||
n.State != stateAlive
})
...
如果收到正常响应(ack),Complete 为 true;
如果超时,Complete 为 false
select {
case v := <-ackCh:
if v.Complete == true {
return
}
}
...
检测失败,将该节点标记为疑似失效(suspect)节点
s := suspect{Incarnation: node.Incarnation, Node: node.Name, From: m.config.Name}
并将 suspect 消息加入广播队列
m.suspectNode(&s)
}
func (m *Memberlist) setProbeChannels(seqNo uint32, ackCh chan ackMessage, nackCh chan struct{}, timeout time.Duration) {
ackFn := func(payload []byte, timestamp time.Time) {
select {
case ackCh <- ackMessage{true, payload, timestamp}:
default:
}
...
超时通知:删除该 seqNo 对应的 ack 处理器,并向 ackCh 发送 Complete 为 false 的消息
ah.timer = time.AfterFunc(timeout, func() {
m.ackLock.Lock()
delete(m.ackHandlers, seqNo)
m.ackLock.Unlock()
select {
case ackCh <- ackMessage{false, nil, time.Now()}:
default:
}
})
}