zookeeper集群角色
- leader:集群leader,负责处理事务请求,处理查询请求
- follower:集群follower,同步leader节点数据,转化事务请求到leader,处理查询请求,参与选举投票
- observer:与follower不同于,不参与选举的投票
myid
集群节点id属性
epoch
在每一轮投票递增,记录投票的”朝代“。
zxid(事务id)
leader节点在每次事务操作后都会递增zxid,follower节点同步leader的zxid,保证节点数据与leader节点数据一致。
节点的状态
- looking:选举阶段
- following:follower节点
- observering:observer节点,不参与选举
- leading:leader节点
leader选举选票的判断顺序
每一轮选举都会发送 myid/zxid/epoch
- epoch最大
- zxid最大
- myid最大
- 当某个节点的得票数大于集群节点数一半则成为leader
leader的选举过程
假设一个3个节点的集群,启动时节点状态为looking,这时每个节点的epoch为1,每个节点都投票自己为leader(1/zxid/1,2/zxid/1,3/zxid/1),每个节点都会收到其他节点的投票信息。
这时epoch一致的情况下判断zxid,如果都一致,说明每个节点的数据都是最新的,则判断myid,修改自己节点的投票信息,以选票中的最大值修改。
节点会保存有其他节点发送过来的选票,自己发送出去的投票信息。每一轮投票都递增epoch。每个节点保存自己的epoch。
当节点收到其他节点发送过来的投票,判断epoch,相等则判断zxid,如果还相等再判断myid。
而如果epoch不相等,则更新节点保存的自己的投票信息为对方的投票信息,并保存所有接收到的投票信息,发送自己新的选票。对接收到的投票信息进行归纳,判断接收到的选票有超过一般的选票与自己刚刚更新保存的选票信息相同的leader,则选择该节点为leader,否则继续投票。
如果节点成为了follower,则需要设置节点状态为follower,再链接leader节点,再同步数据。
每个节点有一个队列保存其他节点发送过来的投票信息,当还没选举出leader时,不断循环判断选票信息,直到有新的选票信息比当前保存的还有大。然后不断循环直到选出leader。
源码导读
基于3.5.5版本
首先从zk服务器的入口开始,查看zkServer.sh脚本,其中启动关键的命令为
//1
ZOOMAIN="org.apache.zookeeper.server.quorum.QuorumPeerMain"
//2
nohup "$JAVA" $ZOO_DATADIR_AUTOCREATE "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" \
"-Dzookeeper.log.file=${ZOO_LOG_FILE}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \
-XX:+HeapDumpOnOutOfMemoryError -XX:OnOutOfMemoryError='kill -9 %p' \
-cp "$CLASSPATH" $JVMFLAGS $ZOOMAIN "$ZOOCFG" > "$_ZOO_DAEMON_OUT" 2>&1 < /dev/null &
那么可以知道org.apache.zookeeper.server.quorum.QuorumPeerMain
为启动的入口
@InterfaceAudience.Public
public class QuorumPeerMain {
private static final Logger LOG = LoggerFactory.getLogger(QuorumPeerMain.class);
private static final String USAGE = "Usage: QuorumPeerMain configfile";
protected QuorumPeer quorumPeer;
public static void main(String[] args) {
QuorumPeerMain main = new QuorumPeerMain();
try {
main.initializeAndRun(args);
} catch (IllegalArgumentException e) {
//
}
//...省略代码
}
protected void initializeAndRun(String[] args)
throws ConfigException, IOException, AdminServerException
{
QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
// 解析配置文件
config.parse(args[0]);
}
// ....
if (args.length == 1 && config.isDistributed()) {
runFromConfig(config);
} else {
LOG.warn("Either no config or no quorum defined in config, running "
+ " in standalone mode");
// there is only server in the quorum -- run as standalone
ZooKeeperServerMain.main(args);
}
}
public void runFromConfig(QuorumPeerConfig config)
throws IOException, AdminServerException
{
// ...
}
}
启动方法QuorumPeerMain #runFromConfig
public void runFromConfig(QuorumPeerConfig config)
throws IOException, AdminServerException
{
try {
// 节点通信连接相关
ServerCnxnFactory cnxnFactory = null;
ServerCnxnFactory secureCnxnFactory = null;
if (config.getClientPortAddress() != null) {
// 默认没有指定为 NIOServerCnxnFactory
cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns(),
false);
}
if (config.getSecureClientPortAddress() != null) {
secureCnxnFactory = ServerCnxnFactory.createFactory();
secureCnxnFactory.configure(config.getSecureClientPortAddress(),
config.getMaxClientCnxns(),
true);
}
// 节点参数的配置
quorumPeer = getQuorumPeer();
quorumPeer.setTxnFactory(new FileTxnSnapLog(
config.getDataLogDir(),
config.getDataDir()));
quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
quorumPeer.enableLocalSessionsUpgrading(
config.isLocalSessionsUpgradingEnabled());
//quorumPeer.setQuorumPeers(config.getAllMembers());
// leader选举类型 默认为3,其他类型再源码中也已标记为过期
quorumPeer.setElectionType(config.getElectionAlg());
quorumPeer.setMyid(config.getServerId());
quorumPeer.setTickTime(config.getTickTime());
quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
quorumPeer.setInitLimit(config.getInitLimit());
quorumPeer.setSyncLimit(config.getSyncLimit());
quorumPeer.setConfigFileName(config.getConfigFilename());
// 数据存储
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
// 默认实现QuorumMaj 集群节点信息,(集群节点总数,投票节点总数,observer节点数)
quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
if (config.getLastSeenQuorumVerifier()!=null) {
quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
}
quorumPeer.initConfigInZKDatabase();
quorumPeer.setCnxnFactory(cnxnFactory);
quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
quorumPeer.setSslQuorum(config.isSslQuorum());
quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
// 节点类型
quorumPeer.setLearnerType(config.getPeerType());
quorumPeer.setSyncEnabled(config.getSyncEnabled());
quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
if (config.sslQuorumReloadCertFiles) {
quorumPeer.getX509Util().enableCertFileReloading();
}
// sets quorum sasl authentication configurations
quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
if(quorumPeer.isQuorumSaslAuthEnabled()){
quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
}
quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
quorumPeer.initialize();
// 启动 QuorumPeer,这里的run方法中判断投票结果
quorumPeer.start();
quorumPeer.join();
} catch (InterruptedException e) {
// warn, but generally this is ok
LOG.warn("Quorum Peer interrupted", e);
}
}
查看启动方法quorumPeer.start();
,后面会讲到run方法,先记住super.start()
调用的是QuorumPeer
的run()
方法
@Override
public synchronized void start() {
if (!getView().containsKey(myid)) {
throw new RuntimeException("My id " + myid + " not in the peer list");
}
// 加载磁盘数据,设置zxid,epoch信息
loadDataBase();
// 启动socket服务
startServerCnxnFactory();
try {
adminServer.start();
} catch (AdminServerException e) {
LOG.warn("Problem starting AdminServer", e);
System.out.println(e);
}
// 开始选举投票工作
startLeaderElection();
// 子类重写了run方法,实质调用子类的run方法
super.start();
}
synchronized public void startLeaderElection() {
try {
if (getPeerState() == ServerState.LOOKING) {
//创建选票信息
currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
}
} catch(IOException e) {
//...
}
// 默认3 其他类型也已经被标记为弃用,返回FastLeaderElection
this.electionAlg = createElectionAlgorithm(electionType);
}
FastLeaderElection
类结构
public class FastLeaderElection{
QuorumPeer self;
// 负责与其他节点的通信,选票的发送、接收
QuorumCnxManager manager;
// 负责选票的生成、接收 JVM线程间
Messenger messenger;
// 负责与其他节点的通信,选票的发送、接收
QuorumCnxManager manager;
//票据发送队列
LinkedBlockingQueue<ToSend> sendqueue;
//票据接收队列
LinkedBlockingQueue<Notification> recvqueue;
public void start() {
this.messenger.start();
}
}
protected class Messenger {
// 投票信息发送
WorkerSender ws;
// 接收投票信息
WorkerReceiver wr;
// 封装线程
Thread wsThread = null;
// 封装线程
Thread wrThread = null;
void start(){
this.wsThread.start();
this.wrThread.start();
}
}
再查看WorkerReceiver
和WorkerSender
class WorkerReceiver extends ZooKeeperThread {
volatile boolean stop;
QuorumCnxManager manager;
WorkerReceiver(QuorumCnxManager manager) {
super("WorkerReceiver");
this.stop = false;
this.manager = manager;
}
public void run() {
Message response;
while (!stop) {
// Sleeps on receive
try {
// 获取接收到的投票信息
response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
if(response == null) continue;
//。。。
// Instantiate Notification and set its attributes
Notification n = new Notification();
int rstate = response.buffer.getInt();
long rleader = response.buffer.getLong();
long rzxid = response.buffer.getLong();
long relectionEpoch = response.buffer.getLong();
long rpeerepoch;
int version = 0x0;
QuorumVerifier rqv = null;
// ... 省略代码
// 下面做的是判断票据信息
if(!validVoter(response.sid)) {
Vote current = self.getCurrentVote();
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(ToSend.mType.notification,
current.getId(),
current.getZxid(),
logicalclock.get(),
self.getPeerState(),
response.sid,
current.getPeerEpoch(),
qv.toString().getBytes());
// 节点不在集群列表中,发送投票数据
sendqueue.offer(notmsg);
} else {
QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
switch (rstate) {
case 0:
ackstate = QuorumPeer.ServerState.LOOKING;
break;
case 1:
ackstate = QuorumPeer.ServerState.FOLLOWING;
break;
case 2:
ackstate = QuorumPeer.ServerState.LEADING;
break;
case 3:
ackstate = QuorumPeer.ServerState.OBSERVING;
break;
default:
continue;
}
n.leader = rleader;
n.zxid = rzxid;
n.electionEpoch = relectionEpoch;
n.state = ackstate;
n.sid = response.sid;
n.peerEpoch = rpeerepoch;
n.version = version;
n.qv = rqv;
//选举中保存接收到的信息到队列
if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
recvqueue.offer(n);
// 发送方也在选举中,且朝代比自己老,则需要发送投票信息
if((ackstate == QuorumPeer.ServerState.LOOKING)
&& (n.electionEpoch < logicalclock.get())){
Vote v = getVote();
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(ToSend.mType.notification,
v.getId(),
v.getZxid(),
logicalclock.get(),
self.getPeerState(),
response.sid,
v.getPeerEpoch(),
qv.toString().getBytes());
sendqueue.offer(notmsg);
}
} else {
// 当前节点已完成选举,发送方还在选举,发送当前投票信息
Vote current = self.getCurrentVote();
if(ackstate == QuorumPeer.ServerState.LOOKING){
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(
ToSend.mType.notification,
current.getId(),
current.getZxid(),
current.getElectionEpoch(),
self.getPeerState(),
response.sid,
current.getPeerEpoch(),
qv.toString().getBytes());
sendqueue.offer(notmsg);
}
}
}
} catch (InterruptedException e) {
LOG.warn("Interrupted Exception while waiting for new message" +
e.toString());
}
}
LOG.info("WorkerReceiver is down");
}
}
// 发送线程
class WorkerSender extends ZooKeeperThread {
volatile boolean stop;
QuorumCnxManager manager;
public void run() {
while (!stop) {
try {
ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
if(m == null) continue;
// 获取发送队列信息,发送
process(m);
} catch (InterruptedException e) {
break;
}
}
}
void process(ToSend m) {
ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),
m.leader,
m.zxid,
m.electionEpoch,
m.peerEpoch,
m.configData);
manager.toSend(m.sid, requestBuffer);
}
}
到这里,都是投票信息的发送的逻辑,其中涉及几个队列的数据轮转,需要留意队列建数据的关系。
再来看选举结果的判断逻辑,QuorumPeer#run
@Override
public void run() {
updateThreadName();
// ...
try {
/*
* Main loop
*/
while (running) {
switch (getPeerState()) {
case LOOKING:
// 通过setCurrentVote(makeLEStrategy().lookForLeader());启动leader选举
if (Boolean.getBoolean("readonlymode.enabled")) {
// ...
try {
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
} finally {
// ...
}
} else {
try {
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
}
}
break;
case OBSERVING:
// ...
break;
case FOLLOWING:
// ...
break;
case LEADING:
// ...
break;
}
start_fle = Time.currentElapsedTime();
}
} finally {
// ...
}
}
makeLEStrategy().lookForLeader()
,实际调用的是FastLeaderElection#lookForLeader()
public Vote lookForLeader() throws InterruptedException {
// ...
try {
// 保存投票信息,用于判断结果
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
int notTimeout = finalizeWait;
synchronized(this){
logicalclock.incrementAndGet();
// 更新当前票据
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
// 发送消息
sendNotifications();
// 循环执行判断
while ((self.getPeerState() == ServerState.LOOKING) &&
(!stop)){
// 从接收到的票据队列中拿数据
Notification n = recvqueue.poll(notTimeout,
TimeUnit.MILLISECONDS);
if(n == null){
if(manager.haveDelivered()){
sendNotifications();
} else {
manager.connectAll();
}
}
else if (validVoter(n.sid) && validVoter(n.leader)) {
switch (n.state) {
case LOOKING:
// 接收到的票据信息比较自己的朝代大
if (n.electionEpoch > logicalclock.get()) {
logicalclock.set(n.electionEpoch);
// 清空历史投票数据
recvset.clear();
//比较当前节点的票据与接收到的票据哪个比较大,更新当前节点投票信息
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(),
getInitLastLoggedZxid(),
getPeerEpoch());
}
sendNotifications();
// 发送方的投票信息比自己还旧,不处理这个投票
} else if (n.electionEpoch < logicalclock.get()) {
break;
// 朝代相同 判断用哪个比较大的票据信息,更新节点票据信息,发送新的票据信息
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}
// 保存投票信息
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
// 这里方法判断投票结果
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock.get(), proposedEpoch))) {
// 过滤接收队列中朝代比当前旧的数据
while((n = recvqueue.poll(finalizeWait,
TimeUnit.MILLISECONDS)) != null){
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)){
recvqueue.put(n);
break;
}
}
// 所有数据都是最新的且已经选举完成,更新节点状态
if (n == null) {
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(proposedLeader,
proposedZxid, logicalclock.get(),
proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
case OBSERVING:
// ...
break;
case FOLLOWING:
case LEADING:
if(n.electionEpoch == logicalclock.get()){
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
if(termPredicate(recvset, new Vote(n.version, n.leader,
n.zxid, n.electionEpoch, n.peerEpoch, n.state))
&& checkLeader(outofelection, n.leader, n.electionEpoch)) {
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(n.leader,
n.zxid, n.electionEpoch, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}
// 当前节点已经完成选举,不参与投票,epoch不会变。其他节点任然在投票
outofelection.put(n.sid, new Vote(n.version, n.leader,
n.zxid, n.electionEpoch, n.peerEpoch, n.state));
if (termPredicate(outofelection, new Vote(n.version, n.leader,
n.zxid, n.electionEpoch, n.peerEpoch, n.state))
&& checkLeader(outofelection, n.leader, n.electionEpoch)) {
synchronized(this){
logicalclock.set(n.electionEpoch);
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
}
Vote endVote = new Vote(n.leader, n.zxid,
n.electionEpoch, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
break;
default:
LOG.warn("Notification state unrecoginized: " + n.state
+ " (n.state), " + n.sid + " (n.sid)");
break;
}
} else {
if (!validVoter(n.leader)) {
LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
}
if (!validVoter(n.sid)) {
LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
}
}
}
return null;
} finally {
try {
if(self.jmxLeaderElectionBean != null){
MBeanRegistry.getInstance().unregister(
self.jmxLeaderElectionBean);
}
} catch (Exception e) {
LOG.warn("Failed to unregister with JMX", e);
}
self.jmxLeaderElectionBean = null;
LOG.debug("Number of connection processing threads: {}",
manager.getConnectionThreadCount());
}
}
termPredicate
方法
protected boolean termPredicate(Map<Long, Vote> votes, Vote vote) {
SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
voteSet.addQuorumVerifier(self.getQuorumVerifier());
if (self.getLastSeenQuorumVerifier() != null
&& self.getLastSeenQuorumVerifier().getVersion() > self
.getQuorumVerifier().getVersion()) {
voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
}
// voteSet维护了一个HashSet集合,将所有节点的投票信息与当前Vote相同节点的添加到set中
for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
if (vote.equals(entry.getValue())) {
voteSet.addAck(entry.getKey());
}
}
// 这个方法判断set中有超过半数节点投票为vote中的节点,结束选举
return voteSet.hasAllQuorums();
}
几个主要的类属性、方法
留意到FastLeaderElection
下的Messenger
包含WorkerSender
、WorkerReceiver
,这两个线程负责当前节点的选票信息的更新。而QuorumCnxManager
中SendWorker
和RecvWorker
负责一其他节点的选票投递,其中是通过Socket通信完成。看一下选票再集群间的流转情况。
总结
节点选举的主要流程