Jetcd 实现主从选举(示例代码)

@Slf4j
@Component
public class JetcdElectionService implements ApplicationListener<ContextRefreshedEvent> {
    private final Client jetcdClient;
    private final String electionNameText = "/testElection";
    private final String firstNonLoopbackAddress;

    private final AtomicReference<Watch.Watcher> watcher = new AtomicReference<>();
    private final AtomicBoolean connectionExceptionFlag = new AtomicBoolean();
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    public JetcdElectionService(Client jetcdClient, @Value("${server.port}") int port) {
        this.jetcdClient = jetcdClient;
        this.firstNonLoopbackAddress = "127.0.0.1:" + port;
    }

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        new Thread(this::run).start();
    }

    private void run() {
        ByteSequence electionName = this.getElectionName(this.electionNameText);
        ByteSequence proposal = this.getProposal(this.firstNonLoopbackAddress);
        log.info("[Election] 正在执行选举 [electionName: {}, proposal: {}]...", electionName.toString(), proposal.toString());

        Election electionClient = this.jetcdClient.getElectionClient();
        electionClient.observe(electionName, new LeaderElectionListener(this::handleLeaderResponse, this.executor));

        LeaderResponse leader = null;
        try {
            leader = electionClient.leader(electionName).get(); // etcd中没有leader会报错
        } catch (Exception ignored) {
        }

        if (leader == null) {
            log.info("[Election] 检测到leader不存在,当前实例正在尝试参选...");
            this.doElect(electionName, proposal);
        }
    }

    private boolean doElect(ByteSequence electionName, ByteSequence proposal) {
        Lease leaseClient = this.jetcdClient.getLeaseClient();
        Election electionClient = this.jetcdClient.getElectionClient();

        LeaseGrantResponse lease;
        try {
            lease = leaseClient.grant(15).get();
        } catch (InterruptedException | ExecutionException e) {
            log.error("[Election] 选举时发生异常,无法获得租约!", e);
            return false;
        }
        long leaseID = lease.getID();

        try {
            electionClient.campaign(electionName, leaseID, proposal).get(5, TimeUnit.SECONDS);

            // leader选举后进行自动续约,请求发送周期为: lease.ttl / 3
            leaseClient.keepAlive(leaseID, new LeaseKeepaliveObserver());
            log.info("[Election] 选举完成,当前实例当选成功!");
            return true;
        } catch (InterruptedException | ExecutionException e) {
            log.error("[Election] 选举时发生异常,无法获得租约!", e);
            return false;
        } catch (TimeoutException e) {
            log.info("[Election] 当前实例未能选中,停止参选中.");
            return false;
        }
    }

    private void handleLeaderResponse(LeaderResponse response) {
        Watch watchClient = this.jetcdClient.getWatchClient();

        KeyValue kv = response.getKv();
        ByteSequence proposalKey = kv.getKey(); // ${electionName}/${随机字符串}
        log.info("[Election] 选举完成,当选实例信息[key: {}, value: {}]", proposalKey.toString(), kv.getValue().toString());

        Watch.Watcher oldWatcher = this.watcher.get();
        if (oldWatcher != null) {
            oldWatcher.close();
        }

        Watch.Watcher newWatcher = watchClient.watch(proposalKey, new LeaderWatchListener(this::handleLeaderChange, this::handleWatchError, this.executor));
        this.watcher.compareAndSet(oldWatcher, newWatcher);
    }

    private void handleLeaderChange(WatchResponse response) {
        ByteSequence electionName = this.getElectionName(this.electionNameText);
        ByteSequence proposal = this.getProposal(this.firstNonLoopbackAddress);
        Election electionClient = this.jetcdClient.getElectionClient();

        List<WatchEvent> events = response.getEvents();
        WatchEvent watchEvent = null;
        if (events != null && !events.isEmpty()) {
            watchEvent = events.get(0);
        }
        WatchEvent.EventType eventType = Optional.ofNullable(watchEvent).map(WatchEvent::getEventType).orElse(null);

        if (this.connectionExceptionFlag.get()) {
            log.info("[Election] [{}] 检测到与etcd连接异常,重新注册observe服务.", eventType);

            electionClient.observe(electionName, new LeaderElectionListener(this::handleLeaderResponse, this.executor));
            this.connectionExceptionFlag.compareAndSet(true, false);
        } else {
            log.info("[Election] [{}] 检测到leader变动事件,当前实例正在尝试参选...", eventType);
        }
        this.doElect(electionName, proposal); 
    }

    private void handleWatchError(Throwable throwable) {
        // 发现和etcd连接出现了异常
        this.connectionExceptionFlag.compareAndSet(false, true);
    }

    public ByteSequence getElectionName(String electionNameText) {
        return ByteSequence.from(electionNameText, StandardCharsets.UTF_8);
    }

    public ByteSequence getProposal(String firstNonLoopbackAddress) {
        return ByteSequence.from(firstNonLoopbackAddress, StandardCharsets.UTF_8);
    }


    private static class LeaderElectionListener implements Election.Listener {
        private final Consumer<LeaderResponse> leaderResponseConsumer;
        private final ExecutorService executor;

        public LeaderElectionListener(Consumer<LeaderResponse> leaderResponseConsumer, ExecutorService executor) {
            this.leaderResponseConsumer = leaderResponseConsumer;
            this.executor = executor;
        }

        @Override
        public void onNext(LeaderResponse response) {
            CompletableFuture.runAsync(() -> this.leaderResponseConsumer.accept(response), this.executor).exceptionally((e) -> {
                if (e != null) {
                    e.printStackTrace();
                }
                return null;
            });
        }

        @Override
        public void onError(Throwable throwable) {
            log.error(throwable.getMessage(), new RuntimeException(throwable));
        }

        @Override
        public void onCompleted() {
        }
    }

    private static class LeaderWatchListener implements Watch.Listener {
        private final Consumer<WatchResponse> leaderChangeConsumer;
        private final Consumer<Throwable> onErrorConsumer;
        private final ExecutorService executor;

        public LeaderWatchListener(Consumer<WatchResponse> leaderChangeConsumer, Consumer<Throwable> onErrorConsumer, ExecutorService executor) {
            this.leaderChangeConsumer = leaderChangeConsumer;
            this.onErrorConsumer = onErrorConsumer;
            this.executor = executor;
        }

        @Override
        public void onNext(WatchResponse response) {
            CompletableFuture.runAsync(() -> this.leaderChangeConsumer.accept(response), this.executor).exceptionally((e) -> {
                if (e != null) {
                    log.error(e.getMessage(), e);
                }
                return null;
            });
        }

        @Override
        public void onError(Throwable throwable) {
            RuntimeException t = new RuntimeException(throwable);
            log.error(throwable.getMessage(), t);
            this.onErrorConsumer.accept(t);
        }

        @Override
        public void onCompleted() {
        }
    }

    private static class LeaseKeepaliveObserver implements StreamObserver<LeaseKeepAliveResponse> {

        @Override
        public void onNext(LeaseKeepAliveResponse value) {
        }

        @Override
        public void onError(Throwable throwable) {
            log.error(throwable.getMessage(), new RuntimeException(throwable));
        }

        @Override
        public void onCompleted() {
        }
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,542评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,596评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,021评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,682评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,792评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,985评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,107评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,845评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,299评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,612评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,747评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,441评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,072评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,828评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,069评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,545评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,658评论 2 350

推荐阅读更多精彩内容