Vote
群首选举过程是通过投票来实现的,每个投票中包含两个最基本信息:所推举 Leader 的 sid 和 zxid。在 ZooKeeper 中 Vote 数据结构主要有以下重要字段:
final private long id;
被推举 Leader 的 sid 值。
final private long zxid;
被推举 Leader 的事务id。
final private long electionEpoch;
逻辑适时钟,用来判断多个投票是否在同一个选举周期中。该值在服务器端时一个自增序列。每次进入新一轮的投票后,都会对该值 + 1。
final private long peerEpoch;
被推举 Leader 的事务群首周期。
final private ServerState state;
当前服务器的状态,默认 LOOKING。
QuorumPeer
QuorumPeer 是集群模式下特有的对象,是 ZooKeeper 服务器实例(ZooKeeperServer)的托管者,从集群层面看,QuorumPeer 代表了 ZooKeeper 集群中的一台机器。在运行期间,QuorumPeer 会不断检测当前服务器实例的运行状态,同时更具情况发起 Leader 选举。
重要字段
QuorumCnxManager qcm;
FastLeaderElection 算法下负责各台服务器之间的底层 Leader 选举过程中的网络通信。
private ZKDatabase zkDb;
ZooKeeper 的内存数据库,其中存储了 DataTree 对象。它只在 ZooKeeper 启动时创建一次。
public enum ServerState {
LOOKING, FOLLOWING, LEADING, OBSERVING;
}
private ServerState state = ServerState.LOOKING;
服务器状态:
- LOOKING: 寻找 Leader 状态。当服务器处于该状态时,它会认为当前集群中没有 Leader,因此进入 Leader 选举流程;
- FOLLOWING: 跟随者状态,表明当前服务器角色是 Follower;
- LEADING: 群首状态,当前服务器角色是 Leader;
- OBSERVING: 观察者状态,当前服务器角色是 Observer;
初始化进入 Leader 选举流程。
public enum LearnerType {
PARTICIPANT, OBSERVER;
}
private LearnerType learnerType = LearnerType.PARTICIPANT;
是否参与选举或者事务仲裁,默认 PARTICIPANT 参与仲裁选举群首。
protected Map<Long, QuorumServer> quorumPeers;
public int getQuorumSize(){return getVotingView().size();}
quorumPeers 存储组成集群的服务器,服务器的 sid 作为 key。getQuorumSize 函数返回 PARTICIPANT 服务器的数量。
private QuorumVerifier quorumConfig;
判断服务器数量是否形成仲裁,QuorumVerifier 核心方法是 containsQuorum,该方法返回 true,则表示服务器数量形成仲裁,返回 false 则没有形成仲裁不能采纳。默认采用 QuorumMaj 实现,即过半数仲裁。
private long myid;
本服务器的 sid。
volatile private Vote currentVote;
本服务器的群首选举投票。
Election electionAlg;
本服务器采用的群首选举策略,默认 FastLeaderElection。Election 接口核心方法是 lookForLeader,该方法实现群首选举算法。
public Follower follower;
public Leader leader;
public Observer observer;
根据群首选举结果,设置这三个变量。
重要方法
public long getLastLoggedZxid()
获取当前服务器最后记录的 zxid。
public long getCurrentEpoch() throws IOException
public void setCurrentEpoch(long e) throws IOException
获取和设置本服务器推举 Leader 的群首周期。
synchronized public void startLeaderElection()
初始化后,start 接口调用 startLeaderElection 启动群首选举。它主要负责完成:
- 初始化当前选票 currentVote
- 配置本服务器负责选举的 ip 地址
- 配置 electionAlg,群首选举算法
随后 start 调用 super.start 启动 QuorumPeer 线程。
public void run()
QuorumPeer 最核心的方法,在 while 循环中 调用 getPeerState 判断本服务器状态:
LOOKING 时,则 setCurrentVote(makeLEStrategy().lookForLeader()) 获取群首。
FOLLOWING 时,则调用 follower.followLeader() 行使跟随者角色。followLeader 方法中也有 while 循环,如果该方法返回,说明本服务器崩溃或者群首崩溃,本服务器重新进入 LOOKING 状态;
LEADING 时,则调用 leader.lead() 行使群首角色。lead 方法中也有 while 循环,如果该方法返回,说明本服务器崩溃,本服务器重新进入 LOOKING 状态;
OBSERVING 与上面状态类似。
QuorumCnxManager
QuorumCnxManager 主要用在 FastLeaderElection 算法下,负责算法底层各个服务器之间的 TCP 连接和消息收发。它会连接集群中的其他服务器,并保证相同的两台服务器之间只有一个 TCP 连接。
重要字段
final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;
消息发送器集合。每个 SendWorker,都对应一台远程 ZooKeeper 服务器,负责消息的发送,Map 的 key 是服务器的 sid。
final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;
消息发送队列集合,用于保存哪些待发送的消息。和 senderWorkerMap 一样,按照服务器的 sid 分组,为每台机器分配一个单独队列,从而保证各台机器之间的消息发送互不影响。
final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;
向每个服务器最后一次发送过的消息。按照服务器 sid 分组,和其他服务器连接成功后,会补发一次记录在这个 map 中的消息,防止上次连接断开时最后一条消息发送失败。接受端会做好去重操作。
public final ArrayBlockingQueue<Message> recvQueue;
消息接收队列,用于存放那些从其他服务器接收到的消息。 QuorumCnxManager 向 recvQueue 添加接收到的消息,FastLeaderElection 会消费处理队列中的消息。
private final Object recvQLock = new Object();
用于同步多线程访问 recvQueue。
建立连接
为了能够进行互相投票,ZooKeeper 集群中的所有机器都需要两两建立起网络连接。QuorumCnxManager 在启动的时候,会创建一个 ServerSocket 来监听 Leader 选举的通信端口(默认 3888)。
synchronized void connectOne(long sid)
public boolean initiateConnection(Socket sock, Long sid)
connectOne 接口主动连接 sid 对应的服务器,它会调用 initiateConnection 初始化连接成功后,立马发送本服务器的 sid。
public boolean receiveConnection(Socket sock)
该函数负责接收其他服务器的连接请求,连接建立成功后,会读取一个 Long 数据,该数据就是对方的 sid,如果对方 sid 小于本服务器 sid,则断开请求,本服务器 connectOne 去主动连接对方;否则保留这个 socket 连接,并初始化 SendWorker 和 RecvWorker,并启动他们。
所以 ZooKeeper 是依赖服务器 sid 之间的大小关系来建立连接,也就是说 sid 大的服务器主动去和其他服务器,保证两个服务器之间只有一个 TCP 连接。
消息接收发送
消息的接收过程是由 RecvWorker 来负责的,RecvWorker 实例存储在 SendWorker 中,所以每个远程服务器都有一个单独的 RecvWorker。它主要是在 while 循环不断从连接中读取数据,然后调用 addToRecvQueue,将消息加入到 recvQueue 中。
消息发送过程也比较简单,ZooKeeper 同样为每个远程服务器分配一个 SendWorker。每个 SendWorker 在 while 循环中,不断从 queueSendMap 中对应的队列中获取数据并发送,发送成功后会将该消息存入 lastMessageSent。lastMessageSent 的作用前文已经提过,主要是提供重发机制,防止上次消息未被正确处理。
FastLeaderElection
群首选举算法,需要实现 Election 接口
public interface Election {
public Vote lookForLeader() throws InterruptedException;
public void shutdown();
}
ZooKeeper 目前提供 FastLeaderElection,AuthFastLeaderElection,LeaderElection,其中后两种已经被废弃。所以我们主要以 FastLeaderElection 来理解 ZooKeeper 的群首选举。
内部类和选票管理
- Notification 和 ToSend
Vote 是选票,但是在服务器之前传输的数据结构是 FastLeaderElection.ToSend。如果是获取的消息,ZooKeeper 会将其转成FastLeaderElection.Notification 存入 FastLeaderElection 对象的recvqueue。结这两个数据结构和 Vote 类似。
- sendqueue
选票发送队列,用于保存等待发送的选票
LinkedBlockingQueue<ToSend> sendqueue;
- recvqueue
选票接收队列,用于保存接收到的其他服务器的投票
LinkedBlockingQueue<Notification> recvqueue;
- Messenger
用于传输数据,它有两个内部类:Messenger.WorkReceiver 和 Messenger.WorkSender,他们都实现 Runnable 接口,可用于多线程。
WorkReceiver 在 while 循环中不断调用 QuorumCnxManager.pollRecvQueue 方法,从 recvQueue 获取 ToSend 消息,转成 Notification 对象后存入 recvqueue。如果对方是 Observer 或者对方的选举轮次落后本服务器,则将本服务器选票压入 sendqueue,以便发送。若当前服务器并不是 LOOKING 状态,即已经选举出 Leader,那么也将对方投票,同时将 Leader 信息以投票的形式发给对方。
WorkSender 在 while 循环中不断从 sendqueue 获取选票通知,然后调用 QuorumCnxManager.toSend 方法,将选票发送给 ToSend 对象 sid 指定的服务器。
整个选票管理流程如下图所示:
其他字段和方法
除了上面说到的用于选票管理的内部类和成员变量,FastLeaderElection 还有其他重要的字段和辅助方法。
QuorumPeer self;
引用本服务器在集群中的实例对象,可以通过 QuorumPeer 获取其他参与者服务器的 sid。
volatile long logicalclock; /* Election instance */
本服务器的投票轮次,通常和 ToSend.electionEpoch 比较。
long proposedLeader;
本服务器当前选举的 Leader 服务器的 sid。
long proposedZxid
本服务器当前选举的 Leader 服务器的事务 id。
long proposedEpoch;
本服务器当前选举的 Leader 服务器的事务群首周期,用于选举时快速比较投票。
private void sendNotifications() ;
将本服务器投票发送给所有参与者(PARTICIPANT)服务器。
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch)
比较两张选票哪张更新,如果 newId 所处的选票更新,则返回 true,通常用于比较外部收到的选票和本服务器的选票。
protected boolean termPredicate(
HashMap<Long, Vote> votes,
Vote vote);
判断选票是否达到仲裁数量,如果返回 true 说明能够选出 Leader 服务器。参数:
- votes 是收到的外部选票的集合;
- vote 是本服务器的选票。
它会调用 self.getQuorumVerifier().containsQuorum 接口判断选票是否满足仲裁条件。
final static int finalizeWait = 200;
选票达到仲裁数量后,ZooKeeper 会继续等待 finalizeWait 毫秒,如果期间有新的选票到达,则会重新计票,如果没有则说明选举结束,能选出新的 Leader。
算法核心 lookForLeader
群首选举算法的基本流程,其实就是 lookForLeader 方法的逻辑,如果当前服务器状变变成 LOOKING 时,就会调用 lookForLeader 方法来进行群首选举。该代码非常长,由于篇幅有限,我会截取部分重要代码说明流程。
- 选举初始化操作
初始化操作包括自增选举轮次,初始化选票和选票集合。
811 行, logicalclock 用于标识本服务器的选举轮次,ZooKeeper 规定所有有效的投票都必须在同一轮次中。ZooKeeper 在开始新一轮投票时,会首先对 logicalclock 进行自增操作。
812 行初始化选票,即将本服务器的 sid,最后一次事务 zxid 和事务群首周期作为选票。
804 行,选票集合 recvset 是一个 HashMap,以外部服务器的 sid 作为 key,它们的选票 Vote 作为 value。
- 发送初始化选票
完成选票的初始化后,本服务器就会调用 sendNotifications() 方法,发起第一次投票调用。
-
进入循环直到 Leader 服务器被选出
while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){ ...... }
接收外部投票
在循环中,本服务器会不断从 recvqueue 获取其他服务器的投票。
Notification n = recvqueue.poll(notTimeout,
TimeUnit.MILLISECONDS);
if(n == null){
if(manager.haveDelivered()){
sendNotifications();
} else {
manager.connectAll();
}
... ...
}
如果投票为空,那么就会立即确认自己是否和集群中其他服务器保持连接,如果没有,那么就会马上建立连接;如果已经建立了连接,就在此发送自己的当前选票。
- 判断选举轮次
如果投票不为空,则首选确定选票轮次,判断选票有效性。
如果当前选举轮次落后于他人选票,说明之前收集的选票已无效
859,860 行,本服务器会更新 logicalclock,并清空选票集合 recvset。
862~869 行,本服务器进行选票 PK,更新选票。
870 行,本服务器发送当前选票通知其他服务器。
如果收到的投票轮次小于本服务器轮次,则忽略该选票。
如果外部选票轮次和本服务器轮次一致,则:
878~880 行,选票 PK,并更新本地选票。
881 行,本服务器将最新选票通知集群其他服务器。
- 选票 PK
辅助函数 totalOrderPredicate 用于进行选票 PK,来确定是否需要更新本地选票。
符合任意一个条件就需要进行投票变更:
- 外部选票 Leader 的群首周期大于内部选票的群首周期;
- 1条件相等的情况下,对比两者的 zxid,如果外部选票的更大,则需要变更选票;
- 2条件相等的情况下,对比两台服务器的 sid,如果外部选票的 sid 更大,则需要变更选票。
- 变更选票
通过选票 PK 后,如果外部选票更优,则调用 updateProposal() 接口更新 proposedLeader,proposedZxid,proposedEpoch 成员变量。
更新完毕后,调用 sendNotifications() 接口将最新的选票通知其他服务器。
- 归档选票
无论是否进行选票变更,本服务器就会将外部选票存入 recvset。
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
recvset 按照服务器的 sid 来区分选票,一台服务器最多只会存入一张选票。
- 统计选票
完成投票归档后,就可以开始统计选票。统计选票逻辑在辅助函数 termPredicate() 中。如果该函数返回 false,说明还没有选出 Leader,因为在循环中,服务器又会进入 接收外部选票 路径。
- 更新服务器状态
统计选票选出 Leader 后,就会准备开始更新服务器状态。
898 ~ 904 行,在更新状态之前,服务器会等待 finalizeWait 毫秒时间来接收新的选票,以防止漏下关键选票。如果收到可能改变 Leader 的新选票,则重新进行计票。
911 ~ 920 行,在规定时间内没有收到其他选票,则更新本服务器状态,如果 Leader 服务器的 sid 是本服务器 sid,则更新会 LEADING,否则为 FOLLOWING 或者 OBSERVING。
至此群首选举完成,整个流程图如下所示。
内容来源
从 Paxos 到 ZooKeeper 分布式一致性原理与实践
ZooKeeper 分布式过程协同技术详解
ZooKeeper 3.4.6 源代码