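Overview
In distributed systems a common scenario is that a complex task needs to be handled by just one machine, which must be elected from the cluster. This problem is known as "Master election".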
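Approach
ZooKeeper's guarantees make Master election straightforward to implement. The idea is as follows:
Choose a root node, for example /master_select. Multiple machines then try to create the same child node /master_select/lock under it at the same time. Because ZooKeeper allows a given path to be created only once, exactly one machine succeeds, and that machine becomes the Master.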
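The race described above can be illustrated without a running ZooKeeper. The following is a minimal in-memory sketch, not a ZooKeeper client: the hypothetical `MasterRaceSketch` class stands in for the server, and `tryCreate` uses `ConcurrentHashMap.putIfAbsent` to mimic the rule that a path can be created only once. All class and method names are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

/** In-memory stand-in for the znode race; hypothetical, no ZooKeeper involved. */
class MasterRaceSketch {
    static final String LOCK_PATH = "/master_select/lock";

    // Stand-in for the ZooKeeper namespace: path -> name of the creator.
    private final ConcurrentHashMap<String, String> nodes = new ConcurrentHashMap<>();

    /** Mimics "create node": only the first caller for a given path succeeds. */
    boolean tryCreate(String path, String owner) {
        return nodes.putIfAbsent(path, owner) == null;
    }

    /** Lets the given number of threads race for the lock node; returns the winners. */
    static List<String> race(int machines) {
        MasterRaceSketch zk = new MasterRaceSketch();
        List<String> winners = new CopyOnWriteArrayList<>();
        CountDownLatch startGun = new CountDownLatch(1);
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < machines; i++) {
            final String name = "machine-" + i;
            Thread t = new Thread(() -> {
                try {
                    startGun.await(); // line everyone up so the creates really compete
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (zk.tryCreate(LOCK_PATH, name)) {
                    winners.add(name); // this machine is the Master
                }
            });
            threads.add(t);
            t.start();
        }
        startGun.countDown();
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return winners;
    }

    public static void main(String[] args) {
        // Which machine wins varies from run to run, but there is always exactly one.
        System.out.println("Master: " + race(10));
    }
}
```

With a real ZooKeeper the losing machines would get a "node already exists" error instead of a `false` return value, and the lock node would normally be ephemeral so that it disappears when the Master's session ends.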
Master election can also be built on the uniqueness of a database primary key, which is not covered in detail here.
Curator implementation
Curator is a wrapper around ZooKeeper that offers a simpler API for election. It provides two leader election strategies:
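- Leader Latch
Picks one machine, effectively at random, as the master. Other machines cannot become master until the leader explicitly calls close() to release leadership or its connection is lost. This fits active/standby deployments: when the active application goes down, one of the remaining standbys is elected as the new active one.
- Leader Election
Unlike Leader Latch, this strategy gives every instance a fair chance at leadership, and an instance that has released leadership can acquire it again later. The elected leader also does not hold leadership indefinitely: leadership is released automatically when the takeLeadership(CuratorFramework client) method returns.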
Leader Latch
    package com.demo;

    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.leader.LeaderLatch;
    import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.curator.utils.CloseableUtils;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.TimeUnit;

    public class LeaderLatchDemo {

        private static final String PATH = "/demo/leader";
        private static final String SERVER = "127.0.0.1:2181";

        public static void main(String[] args) throws Exception {
            List<LeaderLatch> latchList = new ArrayList<LeaderLatch>();
            List<CuratorFramework> clients = new ArrayList<CuratorFramework>();
            try {
                for (int i = 0; i < 10; i++) {
                    CuratorFramework client = getClient(SERVER);
                    // Track the client so the finally block can close it
                    clients.add(client);
                    final LeaderLatch leaderLatch = new LeaderLatch(client, PATH, "client#" + i);
                    leaderLatch.addListener(new LeaderLatchListener() {
                        @Override
                        public void isLeader() {
                            System.out.println(leaderLatch.getId() + ":I am leader. I am doing jobs!");
                        }

                        @Override
                        public void notLeader() {
                            // Nothing to do in this demo when leadership is lost
                        }
                    });
                    latchList.add(leaderLatch);
                    leaderLatch.start();
                }
                // Give the election time to settle
                Thread.sleep(5000);
                LeaderLatch leader = findLeader(latchList);
                if (leader == null) {
                    System.out.println("No leader has been elected yet");
                    return;
                }
                System.out.println("Current leader: " + leader.getId());
                // Release leadership and take the old leader out of the list
                leader.close();
                latchList.remove(leader);
                LeaderLatch firstNode = latchList.get(0); // first node in the list at this point
                System.out.println("After closing the leader, first node in the list: " + firstNode.getId());
                // Block for up to 10 seconds waiting for leadership; this may time out
                firstNode.await(10, TimeUnit.SECONDS);
                // Look up the current leader again
                leader = findLeader(latchList);
                System.out.println("Actual leader now: " + (leader == null ? "none" : leader.getId()));
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // Close the latches before the clients they depend on
                for (LeaderLatch leaderLatch : latchList) {
                    CloseableUtils.closeQuietly(leaderLatch);
                }
                for (CuratorFramework client : clients) {
                    CloseableUtils.closeQuietly(client);
                }
            }
        }

        // Returns the latch that currently holds leadership, or null if none does
        private static LeaderLatch findLeader(List<LeaderLatch> latchList) {
            for (LeaderLatch leaderLatch : latchList) {
                if (leaderLatch.hasLeadership()) {
                    return leaderLatch;
                }
            }
            return null;
        }

        private static CuratorFramework getClient(String connectString) {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            CuratorFramework client = CuratorFrameworkFactory.builder()
                    .connectString(connectString)
                    .retryPolicy(retryPolicy)
                    .sessionTimeoutMs(6000)
                    .connectionTimeoutMs(3000)
                    .namespace("demo")
                    .build();
            client.start();
            return client;
        }
    }

Output:
    client#6:I am leader. I am doing jobs!
    Current leader: client#6
    After closing the leader, first node in the list: client#0
    client#9:I am leader. I am doing jobs!
    Actual leader now: client#9
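In the output above the next leader is client#9 rather than client#0, the first element of the local list. One plausible reading rests on an assumption about Curator's recipe that the demo itself does not show: each participant registers an ephemeral sequential node and the lowest sequence number leads, so the order of succession follows registration order on the server, not the order in which the latches were created in the loop. The toy model below sketches only that ordering rule. `LatchOrderSketch` and its methods are hypothetical names and nothing here talks to ZooKeeper.

```java
import java.util.Map;
import java.util.TreeMap;

/** Toy model of "lowest sequence number leads"; hypothetical, no ZooKeeper involved. */
class LatchOrderSketch {
    // sequence number -> participant id, kept sorted by sequence number
    private final TreeMap<Integer, String> participants = new TreeMap<>();
    private int nextSequence = 0;

    /** Registers a participant and returns the sequence number it was given. */
    int join(String id) {
        participants.put(nextSequence, id);
        return nextSequence++;
    }

    /** Removes a participant, as close() or a lost session would. */
    void leave(String id) {
        participants.values().remove(id);
    }

    /** The participant with the lowest sequence number leads; null if nobody is registered. */
    String leader() {
        Map.Entry<Integer, String> first = participants.firstEntry();
        return first == null ? null : first.getValue();
    }

    public static void main(String[] args) {
        LatchOrderSketch latch = new LatchOrderSketch();
        // Registration order need not match the order of the local list.
        latch.join("client#6");
        latch.join("client#9");
        latch.join("client#0");
        System.out.println(latch.leader()); // prints client#6
        latch.leave("client#6");
        System.out.println(latch.leader()); // prints client#9
    }
}
```

Under this model a participant that leaves and joins again goes to the back of the queue, which matches the description of Leader Latch as suited to active/standby setups.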
Leader Election
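    // CustomLeaderSelectorListenerAdapter.java
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.recipes.leader.LeaderSelector;
    import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;

    import java.io.Closeable;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    public class CustomLeaderSelectorListenerAdapter extends
            LeaderSelectorListenerAdapter implements Closeable {

        private final String name;
        private final LeaderSelector leaderSelector;
        public final AtomicInteger leaderCount = new AtomicInteger();

        public CustomLeaderSelectorListenerAdapter(CuratorFramework client, String path, String name) {
            this.name = name;
            this.leaderSelector = new LeaderSelector(client, path, this);
            /**
             * Requeue automatically.
             * Calling this method lets the instance compete for leadership
             * again after it has released it.
             */
            leaderSelector.autoRequeue();
        }

        public void start() {
            leaderSelector.start();
        }

        @Override
        public void close() {
            leaderSelector.close();
        }

        /**
         * Called when this instance acquires leadership.
         * Leadership is released when the method returns.
         */
        @Override
        public void takeLeadership(CuratorFramework client) throws Exception {
            final int waitSeconds = 2;
            System.out.println(name + " is now the leader");
            System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before");
            // TODO other business logic
            try {
                // Give up leadership after 2 seconds (simulates the business work)
                Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
            } catch (InterruptedException e) {
                System.err.println(name + " was interrupted");
                Thread.currentThread().interrupt();
            } finally {
                System.out.println(name + " relinquishes leadership\n");
            }
        }
    }

    // TestLeaderElection.java
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.curator.utils.CloseableUtils;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.TimeUnit;

    public class TestLeaderElection {
        // Session timeout
        private final int SESSION_TIMEOUT = 30 * 1000;
        // Connection timeout
        private final int CONNECTION_TIMEOUT = 3 * 1000;
        // Number of participants
        private final int CLIENT_NUMBER = 10;
        // ZooKeeper server address
        private static final String SERVER = "127.0.0.1:2181";
        private final String PATH = "/curator/latchPath";
        // Shared connection instance
        private CuratorFramework client = null;
        /**
         * baseSleepTimeMs: initial wait time between retries
         * maxRetries: maximum number of retries
         */
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        // The custom LeaderSelectorListenerAdapter instances
        List<CustomLeaderSelectorListenerAdapter> leaderSelectorListenerList
                = new ArrayList<CustomLeaderSelectorListenerAdapter>();

        public void init() throws Exception {
            // Create the CuratorFramework instance
            client = CuratorFrameworkFactory.newClient(SERVER, SESSION_TIMEOUT, CONNECTION_TIMEOUT, retryPolicy);
            client.start();
            for (int i = 0; i < CLIENT_NUMBER; i++) {
                // Create a LeaderSelectorListenerAdapter instance
                CustomLeaderSelectorListenerAdapter leaderSelectorListener =
                        new CustomLeaderSelectorListenerAdapter(client, PATH, "Client #" + i);
                leaderSelectorListener.start();
                leaderSelectorListenerList.add(leaderSelectorListener);
            }
            // Keep the current thread alive so the election can keep running
            TimeUnit.SECONDS.sleep(600);
        }

        /**
         * Close everything once the test is done.
         */
        public void close() {
            for (CustomLeaderSelectorListenerAdapter tmp : leaderSelectorListenerList) {
                CloseableUtils.closeQuietly(tmp);
            }
            CloseableUtils.closeQuietly(client);
        }

        public static void main(String[] args) throws Exception {
            TestLeaderElection test = new TestLeaderElection();
            try {
                test.init();
            } finally {
                // Release the selectors and the client even if init() fails
                test.close();
            }
        }
    }

Output:
    Client #3 is now the leader
    Client #3 has been leader 0 time(s) before
    Client #3 relinquishes leadership

    Client #2 is now the leader
    Client #2 has been leader 0 time(s) before
    Client #2 relinquishes leadership

    Client #8 is now the leader
    Client #8 has been leader 0 time(s) before
    Client #8 relinquishes leadership

    Client #4 is now the leader
    Client #4 has been leader 0 time(s) before
    Client #4 relinquishes leadership

    Client #9 is now the leader
    Client #9 has been leader 0 time(s) before
    Client #9 relinquishes leadership

    Client #1 is now the leader
    Client #1 has been leader 0 time(s) before
    Client #1 relinquishes leadership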
As the output shows, leadership is released each time the takeLeadership method returns.
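The release-and-requeue cycle can be mimicked locally to make the pattern easier to see. The sketch below assumes leadership behaves like a fair lock: acquiring the lock stands for being elected, the body of the critical section stands for takeLeadership, and the surrounding loop plays the role of autoRequeue(). `RequeueSketch` and its `run` method are hypothetical names, and a `ReentrantLock` inside a single JVM is only a stand-in for the distributed mechanism.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

/** Local stand-in for "lead, return, requeue"; hypothetical, no ZooKeeper involved. */
class RequeueSketch {

    /**
     * Runs the given number of clients, each taking leadership `rounds` times.
     * Returns {largest number of simultaneous leaders observed, total terms served}.
     */
    static int[] run(int clients, int rounds) {
        ReentrantLock leadership = new ReentrantLock(true); // fair: longest waiter goes next
        AtomicInteger activeLeaders = new AtomicInteger();
        AtomicInteger maxActive = new AtomicInteger();
        AtomicInteger terms = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < clients; i++) {
            Thread t = new Thread(() -> {
                for (int r = 0; r < rounds; r++) {      // the loop plays the role of autoRequeue()
                    leadership.lock();                   // wait in the queue for leadership
                    try {
                        int now = activeLeaders.incrementAndGet();
                        maxActive.accumulateAndGet(now, Math::max);
                        terms.incrementAndGet();         // the takeLeadership work would go here
                        activeLeaders.decrementAndGet();
                    } finally {
                        leadership.unlock();             // returning releases leadership
                    }
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new int[] {maxActive.get(), terms.get()};
    }

    public static void main(String[] args) {
        int[] result = run(5, 3);
        System.out.println("max simultaneous leaders: " + result[0]); // prints 1
        System.out.println("terms served: " + result[1]);             // prints 15
    }
}
```

The point of the sketch is the shape of the loop: there is never more than one leader at a time, and every client gets further terms only because it queues up again after each release.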
References
https://www.zifangsky.cn/1191.html
https://github.com/apache/curator/blob/master/curator-examples/src/main/java/leader