zookeeper Master选举

简述

在分布式环境中,常常需要遇到这样一个场景:对于一个复杂的任务,需要从集群中选举一台机器进行处理即可。这种分布式问题就是“Master选举”。

思路

利用zookeeper的特性很容易实现Master选举功能,思路如下:

选择一个根节点,例如:/master_select,多台机器同时向该节点创建一个子节点/master_select/lock,利用zookeeper特性,最终只有一个机器能够创建成功,成功的机器就是Master。
master选举还可以利用数据库的主键唯一性进行,这里不在细说。

Curator实现

Curator是对zookeeper的封装,可以调用更简单的api实现选举。Curator提供两种Leader选举策略:

  1. Leader Latch
    随机选取一台机器作为master,除非显示调用close方法释放leadership,否则其他机器无法成为master。这种方案适合主备应用,在主应用宕机后,从剩下的备用应用选出一个成为新的主应用。
  2. Leader Election
    这种选举策略跟Leader Latch选举策略不同之处在于每个实例都能公平获取领导权,而且当获取领导权的实例在释放领导权之后,该实例还有机会再次获取领导权。另外,选举出来的leader不会一直占有领导权,当 takeLeadership(CuratorFramework client) 方法执行结束之后会自动释放领导权。

Leader Latch

package com.demo;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class LeaderLatchDemo {

    private static final String PATH = "/demo/leader";


    public static void main(String[] args)throws  Exception {

        String SERVER ="127.0.0.1:2181";

        List<LeaderLatch> latchList = new ArrayList<LeaderLatch>();
        List<CuratorFramework> clients = new ArrayList<CuratorFramework>();

        try {

            for (int i = 0; i < 10; i++) {
                CuratorFramework client = getClient(SERVER);
                final LeaderLatch leaderLatch = new LeaderLatch(client, PATH, "client#" + i);

                leaderLatch.addListener(new LeaderLatchListener() {
                    public void isLeader() {
                        System.out.println("I am leader:"+leaderLatch.getId());
                    }

                    public void notLeader() {

                    }
                });
                latchList.add(leaderLatch);
                leaderLatch.start();
            }
            Thread.sleep(5000);
            LeaderLatch leader = null;


            for(LeaderLatch leaderLatch : latchList){
                if(leaderLatch.hasLeadership()){
                    leader = leaderLatch;
                    break;
                }
            }

            System.out.println("当前leader是: " + leader.getId());
            leader.close();
            latchList.remove(leader);

            LeaderLatch firstNode = latchList.get(0); //获取此时第一个节点
            System.out.println("删除leader后,当前第一个节点: " + firstNode.getId());

            firstNode.await(10, TimeUnit.SECONDS); //阻塞并尝试获取领导权,可能失败

            //再次获取当前leader
            for(LeaderLatch tmp : latchList){
                if(tmp.hasLeadership()){
                    leader = tmp;
                    break;
                }
            }

            System.out.println("最终实际leader是: " + leader.getId());

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            for(CuratorFramework client : clients){
                CloseableUtils.closeQuietly(client);
            }

            for(LeaderLatch leaderLatch : latchList){
                CloseableUtils.closeQuietly(leaderLatch);
            }
        }
    }

    private static CuratorFramework getClient(String connectString) {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(connectString)
                .retryPolicy(retryPolicy)
                .sessionTimeoutMs(6000)
                .connectionTimeoutMs(3000)
                .namespace("demo")
                .build();
        client.start();
        return client;
    }
}

输出:

client#6:I am leader. I am doing jobs!
当前leader是: client#6
删除leader后,当前第一个节点: client#0
client#9:I am leader. I am doing jobs!
最终实际leader是: client#9

Leader Election


public class CustomLeaderSelectorListenerAdapter  extends
        LeaderSelectorListenerAdapter implements Closeable {

    private String name;
    private LeaderSelector leaderSelector;
    public AtomicInteger leaderCount = new AtomicInteger();

    public CustomLeaderSelectorListenerAdapter(CuratorFramework client,String path,String name
    ) {
        this.name = name;
        this.leaderSelector = new LeaderSelector(client, path, this);

        /**
         * 自动重新排队
         * 该方法的调用可以确保此实例在释放领导权后还可能获得领导权
         */
        leaderSelector.autoRequeue();
    }

    public void start() throws IOException {
        leaderSelector.start();
    }

    @Override
    public void close() throws IOException {
        leaderSelector.close();
    }

    /**
     * 获取领导权
     */
    public void takeLeadership(CuratorFramework client) throws Exception {
        final int waitSeconds = 2;
        System.out.println(name + "成为当前leader");
        System.out.println(name + " 之前成为leader的次数:" + leaderCount.getAndIncrement() + "次");

        //TODO 其他业务代码
        try{
            //等待2秒后放弃领导权(模拟业务执行过程)
            Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
        }catch ( InterruptedException e ){
            System.err.println(name + "已被中断");
            Thread.currentThread().interrupt();
        }finally{
            System.out.println(name + "放弃领导权\n");
        }

    }
}

public class TestLeaderElection {
    //会话超时时间
    private final int SESSION_TIMEOUT = 30 * 1000;

    //连接超时时间
    private final int CONNECTION_TIMEOUT = 3 * 1000;

    //客户端数量
    private final int CLIENT_NUMBER = 10;

    //ZooKeeper服务地址
    private static final String SERVER = "127.0.0.1:2181";

    private final String PATH = "/curator/latchPath";

    //创建连接实例
    private CuratorFramework client = null;

    /**
     * baseSleepTimeMs:初始的重试等待时间
     * maxRetries:最多重试次数
     */
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);

    //自定义LeaderSelectorListenerAdapter实例集合
    List<CustomLeaderSelectorListenerAdapter> leaderSelectorListenerList
            = new ArrayList<CustomLeaderSelectorListenerAdapter>();

    public void init() throws Exception{
        //创建 CuratorFrameworkImpl实例
        client = CuratorFrameworkFactory.newClient(SERVER, SESSION_TIMEOUT, CONNECTION_TIMEOUT, retryPolicy);
        client.start();

        for(int i=0;i<CLIENT_NUMBER;i++){
            //创建LeaderSelectorListenerAdapter实例
            CustomLeaderSelectorListenerAdapter leaderSelectorListener =
                    new CustomLeaderSelectorListenerAdapter(client, PATH, "Client #" + i);

            leaderSelectorListener.start();
            leaderSelectorListenerList.add(leaderSelectorListener);
        }

        //暂停当前线程,防止单元测试结束,可以让leader选举过程持续进行
        TimeUnit.SECONDS.sleep(600);
    }

    /**
     * 测试完毕关闭连接
     */
    public void close(){
        for(CustomLeaderSelectorListenerAdapter tmp : leaderSelectorListenerList){
            CloseableUtils.closeQuietly(tmp);
        }

        CloseableUtils.closeQuietly(client);
    }


    public  static void main(String[] args)throws Exception{
        TestLeaderElection test = new TestLeaderElection();
        test.init();
        test.close();
    }
}

输出:

Client #3成为当前leader
Client #3 之前成为leader的次数:0次
Client #3放弃领导权

Client #2成为当前leader
Client #2 之前成为leader的次数:0次
Client #2放弃领导权

Client #8成为当前leader
Client #8 之前成为leader的次数:0次
Client #8放弃领导权

Client #4成为当前leader
Client #4 之前成为leader的次数:0次
Client #4放弃领导权

Client #9成为当前leader
Client #9 之前成为leader的次数:0次
Client #9放弃领导权

Client #1成为当前leader
Client #1 之前成为leader的次数:0次
Client #1放弃领导权

可见,每次执行完takeLeaderShip的方法后,就会释放领导权。

参考

https://www.zifangsky.cn/1191.html
https://github.com/apache/curator/blob/master/curator-examples/src/main/java/leader

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,793评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,567评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,342评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,825评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,814评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,680评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,033评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,687评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,175评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,668评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,775评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,419评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,020评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,206评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,092评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,510评论 2 343

推荐阅读更多精彩内容