Spark Thrift Server HA

Reference: http://liguo86.com/2017/09/25/spark-thriftserver-ha%E6%94%AF%E6%8C%81/

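When I used that post's approach in practice, I found that its implementation is not rigorous enough and contains some mistakes: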

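1. It only registers the service with ZooKeeper when the Spark Thrift Server starts. It never deregisters the service when the service goes down. As a result, ZooKeeper still holds the registration for several minutes after the service fails, because an ephemeral znode is only removed when the owning ZooKeeper session ends. If a client obtains an instance through ZooKeeper during that window and runs a SQL query, the query fails.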

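2. It adds the ZooKeeper registration code in both places where HiveServer2 is started (startWithContext and main), which duplicates code. A cleaner approach is to hook into HiveServer2's own start and stop methods.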

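3. The hive.server2.thrift.bind.host property in the configuration file must not be set to 0.0.0.0. Otherwise every spark-thrift-server instance registers the meaningless address 0.0.0.0 in ZooKeeper. You won't notice this when testing the connection on the node where the service runs, because there 0.0.0.0 really does point to the local machine. As soon as you try to connect remotely, however, the connection fails.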
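To make point 3 concrete: 0.0.0.0 is the wildcard (any-local) address. It is fine for binding a socket but meaningless as an address for a remote client. The Java sketch below shows one way a server could refuse to advertise such a value and fall back to a concrete address instead. AdvertisedHost, isUnusable and choose are hypothetical names. The getHiveHost method used later in this post does not perform this check, and the fallback address 10.0.0.5 is just an example.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

/**
 * Hypothetical helper (not part of Hive or Spark): decides which host string is safe to
 * publish to ZooKeeper, given the configured hive.server2.thrift.bind.host value.
 */
public final class AdvertisedHost {
    private AdvertisedHost() {}

    /** True if the bind host is missing or is a wildcard address such as 0.0.0.0. */
    public static boolean isUnusable(String bindHost) {
        if (bindHost == null || bindHost.trim().isEmpty()) {
            return true;
        }
        try {
            // For an IP literal this only parses the address; no DNS lookup happens
            return InetAddress.getByName(bindHost.trim()).isAnyLocalAddress();
        } catch (UnknownHostException e) {
            return true; // assumption: an unresolvable name is not worth advertising either
        }
    }

    /** Returns the bind host when it is a concrete address, otherwise the fallback. */
    public static String choose(String bindHost, String fallback) {
        return isUnusable(bindHost) ? fallback : bindHost.trim();
    }

    public static void main(String[] args) {
        System.out.println(choose("0.0.0.0", "10.0.0.5"));  // 10.0.0.5
        System.out.println(choose("10.0.0.7", "10.0.0.5")); // 10.0.0.7
    }
}
```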

Here is how I ended up changing it:

Note: all of the following changes are made to spark-hive-thriftserver_XXX.jar.

1. Modify the HiveThriftServer2 class (the private[hive] class, not the companion object) in the org.apache.spark.sql.hive.thriftserver package:

// These imports already exist at the top of HiveThriftServer2.scala;
// ReflectionUtils._ provides the setSuperField and invoke helpers used below.
import java.util.Locale
import java.util.concurrent.atomic.AtomicBoolean

import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hive.service.cli.thrift.{ThriftBinaryCLIService, ThriftHttpCLIService}
import org.apache.hive.service.server.HiveServer2

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._

private[hive] class HiveThriftServer2(sqlContext: SQLContext)
  extends HiveServer2
    with ReflectedCompositeService {
  // state is tracked internally so that the server only attempts to shut down if it successfully
  // started, and then once only.
  private val started = new AtomicBoolean(false)
  var hiveConf: HiveConf = _

  override def init(hiveConf: HiveConf): Unit = {
    this.hiveConf = hiveConf
    val sparkSqlCliService = new SparkSQLCLIService(this, sqlContext)
    setSuperField(this, "cliService", sparkSqlCliService)
    addService(sparkSqlCliService)

    val thriftCliService = if (isHTTPTransportMode(hiveConf)) {
      new ThriftHttpCLIService(sparkSqlCliService)
    } else {
      new ThriftBinaryCLIService(sparkSqlCliService)
    }

    setSuperField(this, "thriftCLIService", thriftCliService)
    addService(thriftCliService)
    initCompositeService(hiveConf)
  }

  private def isHTTPTransportMode(hiveConf: HiveConf): Boolean = {
    val transportMode = hiveConf.getVar(ConfVars.HIVE_SERVER2_TRANSPORT_MODE)
    transportMode.toLowerCase(Locale.ROOT).equals("http")
  }

  override def start(): Unit = {
    super.start()
    started.set(true)

    // HA support: register this instance with ZooKeeper.
    // invoke() looks the private method up on HiveServer2, where step 2 declares it.
    if (this.hiveConf.getBoolVar(
      ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) {
      invoke(classOf[HiveServer2], this, "addServerInstanceToZooKeeper",
        classOf[HiveConf] -> this.hiveConf)
    }
    // End of HA support
  }

  override def stop(): Unit = {
    if (started.getAndSet(false)) {

      // Deregister this instance from ZooKeeper and close the ZooKeeper connection
      if (this.hiveConf.getBoolVar(
        ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) {
        invoke(classOf[HiveServer2], this, "removeServerInstanceFromZooKeeper")
      }
      super.stop()
    }
  }

}
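Step 1 registers in start() and deregisters in stop(). The AtomicBoolean ensures the deregistration runs at most once per start, even if stop() is called repeatedly. That can happen, for example, when the DeRegisterWatcher in step 2 calls stop() and then stop() runs again at shutdown. The stand-alone Java sketch below reproduces just that guard. InMemoryRegistry and RegisteringServer are hypothetical stand-ins for ZooKeeper and HiveThriftServer2, not real classes.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the ZooKeeper namespace: a set of host:port strings. */
class InMemoryRegistry {
    final Set<String> instances = ConcurrentHashMap.newKeySet();
    final AtomicInteger deregisterCalls = new AtomicInteger();

    void register(String uri) {
        instances.add(uri);
    }

    void deregister(String uri) {
        deregisterCalls.incrementAndGet();
        instances.remove(uri);
    }
}

/** Hypothetical server that mirrors the start/stop overrides of step 1. */
class RegisteringServer {
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final InMemoryRegistry registry;
    private final String uri;

    RegisteringServer(InMemoryRegistry registry, String uri) {
        this.registry = registry;
        this.uri = uri;
    }

    void start() {
        // (the real server starts its Thrift service here, then advertises itself)
        started.set(true);
        registry.register(uri);
    }

    void stop() {
        // getAndSet(false) returns true only for the first stop() after start()
        if (started.getAndSet(false)) {
            registry.deregister(uri); // deregister first so clients stop picking this instance
            // (the real server then shuts down its Thrift service)
        }
    }
}

public class LifecycleDemo {
    public static void main(String[] args) {
        InMemoryRegistry registry = new InMemoryRegistry();
        RegisteringServer server = new RegisteringServer(registry, "10.0.0.5:10000");
        server.start();
        System.out.println(registry.instances); // [10.0.0.5:10000]
        server.stop();
        server.stop(); // second call is a no-op
        System.out.println(registry.deregisterCalls.get()); // 1
    }
}
```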

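2. Modify org.apache.hive.service.server.HiveServer2. Add the addServerInstanceToZooKeeper and removeServerInstanceFromZooKeeper methods, plus the helpers and fields they use, so that the reflective calls from step 1 can reach them: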
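The invoke calls in step 1 rely on reflection. The new methods are private in HiveServer2, so they must be looked up on classOf[HiveServer2] rather than on the subclass, and made accessible before being called. As far as I can tell, Spark's ReflectionUtils.invoke essentially combines getDeclaredMethod, setAccessible(true) and Method.invoke. The Java sketch below demonstrates that mechanism. The classes BaseServer and DerivedServer are hypothetical stand-ins for HiveServer2 and HiveThriftServer2.

```java
import java.lang.reflect.Method;

/** Hypothetical stand-in for HiveServer2: the method is private to this class. */
class BaseServer {
    private String register(String conf) {
        return "registered using " + conf;
    }
}

/** Hypothetical stand-in for HiveThriftServer2, which extends HiveServer2. */
class DerivedServer extends BaseServer {}

public class ReflectDemo {
    /** Calls a private method declared on declaringClass against target (may be a subclass). */
    public static Object invokePrivate(Class<?> declaringClass, Object target, String name,
                                       Class<?>[] types, Object... args) {
        try {
            Method m = declaringClass.getDeclaredMethod(name, types);
            m.setAccessible(true); // private methods cannot be invoked without this
            return m.invoke(target, args);
        } catch (ReflectiveOperationException e) {
            // includes InvocationTargetException, which wraps exceptions thrown by the method
            throw new IllegalStateException(e);
        }
    }

    /** getDeclaredMethod only searches the given class, never its superclasses. */
    public static boolean declaredOn(Class<?> c, String name, Class<?>... types) {
        try {
            c.getDeclaredMethod(name, types);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Object result = invokePrivate(BaseServer.class, new DerivedServer(), "register",
                new Class<?>[] {String.class}, "hiveConf");
        System.out.println(result); // registered using hiveConf
        System.out.println(declaredOn(DerivedServer.class, "register", String.class)); // false
    }
}
```

This is why step 1 passes classOf[HiveServer2] instead of this.getClass: looking the method up on the subclass would fail with NoSuchMethodException.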

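// Imports required by the code below (add any that HiveServer2.java does not already have)
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.common.util.HiveVersionInfo;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;

/**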
 * StartOptionExecutor: starts HiveServer2.
 * This is the default executor, when no option is specified.
 */
static class StartOptionExecutor implements ServerOptionsExecutor {
    @Override
    public void execute() {
        try {
            startHiveServer2();
        } catch (Throwable t) {
            LOG.fatal("Error starting HiveServer2", t);
            System.exit(-1);
        }
    }
}

private String getServerInstanceURI() throws Exception {
    if ((thriftCLIService == null) || (thriftCLIService.getServerIPAddress() == null)) {
        throw new Exception("Unable to get the server address; it hasn't been initialized yet.");
    }
    return getHiveHost() + ":"
            + thriftCLIService.getPortNumber();
}

private String getHiveHost() {
    HiveConf hiveConf = thriftCLIService.getHiveConf();
    String hiveHost = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST);
    if (hiveHost != null && !hiveHost.isEmpty()) {
        return hiveHost;
    } else {
        return thriftCLIService.getServerIPAddress().getHostName();
    }
}

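/**
 * For a kerberized cluster, we dynamically set up the client's JAAS conf.
 *
 * @param hiveConf the HiveServer2 configuration holding the Kerberos principal and keytab
 * @throws Exception if the principal or keytab is empty, or the JAAS configuration cannot be installed
 */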
private void setUpZooKeeperAuth(HiveConf hiveConf) throws Exception {
    if (UserGroupInformation.isSecurityEnabled()) {
        String principal = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL);
        if (principal.isEmpty()) {
            throw new IOException("HiveServer2 Kerberos principal is empty");
        }
        String keyTabFile = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB);
        if (keyTabFile.isEmpty()) {
            throw new IOException("HiveServer2 Kerberos keytab is empty");
        }
        // Install the JAAS Configuration for the runtime
        Utils.setZookeeperClientKerberosJaasConfig(principal, keyTabFile);
    }
}

/**
 * ACLProvider for providing appropriate ACLs to CuratorFrameworkFactory
 */
private final ACLProvider zooKeeperAclProvider = new ACLProvider() {

    @Override
    public List<ACL> getDefaultAcl() {
        List<ACL> nodeAcls = new ArrayList<ACL>();
        if (UserGroupInformation.isSecurityEnabled()) {
            // Read all to the world
            nodeAcls.addAll(ZooDefs.Ids.READ_ACL_UNSAFE);
            // Create/Delete/Write/Admin to the authenticated user
            nodeAcls.add(new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.AUTH_IDS));
        } else {
            // ACLs for znodes on a non-kerberized cluster
            // Create/Read/Delete/Write/Admin to the world
            nodeAcls.addAll(ZooDefs.Ids.OPEN_ACL_UNSAFE);
        }
        return nodeAcls;
    }

    @Override
    public List<ACL> getAclForPath(String path) {
        return getDefaultAcl();
    }
};

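// ZooKeeper state used by the methods below (declare these fields if your HiveServer2 copy lacks them)
private PersistentEphemeralNode znode;
private String znodePath;
private CuratorFramework zooKeeperClient;
private boolean deregisteredWithZooKeeper = false;

private void setDeregisteredWithZooKeeper(boolean deregisteredWithZooKeeper) {
    this.deregisteredWithZooKeeper = deregisteredWithZooKeeper;
}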

/**
 * The watcher class which sets the de-register flag when the znode corresponding to this server
 * instance is deleted. Additionally, it shuts down the server if there are no more active client
 * sessions at the time of receiving a 'NodeDeleted' notification from ZooKeeper.
 */
private class DeRegisterWatcher implements Watcher {
    @Override
    public void process(WatchedEvent event) {
        if (event.getType().equals(Watcher.Event.EventType.NodeDeleted)) {
            if (znode != null) {
                try {
                    znode.close();
                    LOG.warn("This HiveServer2 instance is now de-registered from ZooKeeper. "
                            + "The server will be shut down after the last client session completes.");
                } catch (IOException e) {
                    LOG.error("Failed to close the persistent ephemeral znode", e);
                } finally {
                    HiveServer2.this.setDeregisteredWithZooKeeper(true);
                    // If there are no more active client sessions, stop the server
                    if (cliService.getSessionManager().getOpenSessionCount() == 0) {
                        LOG.warn("This instance of HiveServer2 has been removed from the list of server "
                                + "instances available for dynamic service discovery. "
                                + "The last client session has ended - will shutdown now.");
                        HiveServer2.this.stop();
                    }
                }
            }
        }
    }
} 

/**
 * Adds a server instance to ZooKeeper as a znode.
 *
 * @param hiveConf
 * @throws Exception
 */
private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception {
    String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf);
    String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
    String instanceURI = getServerInstanceURI();
    setUpZooKeeperAuth(hiveConf);
    int sessionTimeout =
            (int) hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT,
                    TimeUnit.MILLISECONDS);
    int baseSleepTime =
            (int) hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME,
                    TimeUnit.MILLISECONDS);
    int maxRetries = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
    // Create a CuratorFramework instance to be used as the ZooKeeper client
    // Use the zooKeeperAclProvider to create appropriate ACLs
    zooKeeperClient =
            CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble)
                    .sessionTimeoutMs(sessionTimeout).aclProvider(zooKeeperAclProvider)
                    .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries)).build();
    zooKeeperClient.start();
    // Create the parent znodes recursively; ignore if the parent already exists.
    try {
        zooKeeperClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
                .forPath(ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace);
        LOG.info("Created the root name space: " + rootNamespace + " on ZooKeeper for HiveServer2");
    } catch (KeeperException e) {
        if (e.code() != KeeperException.Code.NODEEXISTS) {
            LOG.fatal("Unable to create HiveServer2 namespace: " + rootNamespace + " on ZooKeeper", e);
            throw e;
        }
    }
    // Create a znode under the rootNamespace parent for this instance of the server
    // Znode name: serverUri=host:port;version=versionInfo;sequence=sequenceNumber
    try {
        String pathPrefix =
                ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace
                        + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + "serverUri=" + instanceURI + ";"
                        + "version=" + HiveVersionInfo.getVersion() + ";" + "sequence=";
//        String znodeData = "";
//        if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_PUBLISH_CONFIGS)) {
//            // HiveServer2 configs that this instance will publish to ZooKeeper,
//            // so that the clients can read these and configure themselves properly.
//            Map<String, String> confsToPublish = new HashMap<String, String>();
//            addConfsToPublish(hiveConf, confsToPublish);
//            // Publish configs for this instance as the data on the node
//            znodeData = Joiner.on(';').withKeyValueSeparator("=").join(confsToPublish);
//        } else {
//            znodeData = instanceURI;
//        }
        byte[] znodeDataUTF8 = instanceURI.getBytes(Charset.forName("UTF-8"));
        znode =
                new PersistentEphemeralNode(zooKeeperClient,
                        PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, pathPrefix, znodeDataUTF8);
        znode.start();
        // We'll wait for 120s for node creation
        long znodeCreationTimeout = 120;
        if (!znode.waitForInitialCreate(znodeCreationTimeout, TimeUnit.SECONDS)) {
            throw new Exception("Max znode creation wait time: " + znodeCreationTimeout + "s exhausted");
        }
        setDeregisteredWithZooKeeper(false);
        znodePath = znode.getActualPath();
        // Set a watch on the znode
        if (zooKeeperClient.checkExists().usingWatcher(new DeRegisterWatcher()).forPath(znodePath) == null) {
            // No node exists, throw exception
            throw new Exception("Unable to create znode for this HiveServer2 instance on ZooKeeper.");
        }
        LOG.info("Created a znode on ZooKeeper for HiveServer2 uri: " + instanceURI);
    } catch (Exception e) {
        LOG.fatal("Unable to create a znode for this server instance", e);
        if (znode != null) {
            znode.close();
        }
        throw e;
    }
}

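private void removeServerInstanceFromZooKeeper() throws Exception {
    setDeregisteredWithZooKeeper(true);

    if (znode != null) {
        znode.close();
    }
    // zooKeeperClient is still null if registration failed before the client was created
    if (zooKeeperClient != null) {
        zooKeeperClient.close();
    }
    LOG.info("Server instance removed from ZooKeeper.");
}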
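addServerInstanceToZooKeeper creates an EPHEMERAL_SEQUENTIAL znode whose name follows the comment in the code, serverUri=host:port;version=...;sequence=..., and ZooKeeper appends a 10-digit sequence number to it. The znode's data is the instanceURI itself. The Java sketch below rebuilds the path prefix and extracts host:port from a child name. This can help when inspecting the namespace, for example to confirm that a stopped instance's entry is gone. ZnodeNames is a hypothetical helper. The namespace hiveserver2, the version 1.2.1 and the sequence value are illustrative only.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helpers for znode names of the form created by addServerInstanceToZooKeeper. */
public final class ZnodeNames {
    private ZnodeNames() {}

    private static final Pattern SERVER_URI = Pattern.compile("serverUri=([^;]+)");

    /** Same layout as pathPrefix above: "/<namespace>/serverUri=<uri>;version=<v>;sequence=". */
    public static String pathPrefix(String namespace, String instanceUri, String version) {
        return "/" + namespace + "/serverUri=" + instanceUri + ";version=" + version + ";sequence=";
    }

    /** Extracts host:port from a child znode name, or returns null if the name does not match. */
    public static String serverUri(String childName) {
        Matcher m = SERVER_URI.matcher(childName);
        return m.find() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        System.out.println(pathPrefix("hiveserver2", "10.0.0.5:10000", "1.2.1"));
        // ZooKeeper appends a zero-padded 10-digit counter to sequential nodes
        String child = "serverUri=10.0.0.5:10000;version=1.2.1;sequence=0000000003";
        System.out.println(serverUri(child)); // 10.0.0.5:10000
    }
}
```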

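In addition, if the spark-thrift-server service may be called from outside the cluster, where clients cannot reach it by hostname, the address registered in ZooKeeper must be an IP address.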

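The key change is in the getServerInstanceURI method of the HiveServer2 class, which calls getHiveHost(). When hive.server2.thrift.bind.host is set (for example, to the node's IP), that value is what gets registered. Otherwise, getHiveHost() falls back to the hostname: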

private String getServerInstanceURI() throws Exception {
    if ((thriftCLIService == null) || (thriftCLIService.getServerIPAddress() == null)) {
        throw new Exception("Unable to get the server address; it hasn't been initialized yet.");
    }
    return getHiveHost() + ":"
            + thriftCLIService.getPortNumber();
}

private String getHiveHost() {
    HiveConf hiveConf = thriftCLIService.getHiveConf();
    String hiveHost = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST);
    if (hiveHost != null && !hiveHost.isEmpty()) {
        return hiveHost;
    } else {
        return thriftCLIService.getServerIPAddress().getHostName();
    }
}
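If setting hive.server2.thrift.bind.host to an IP on every node is impractical, the fallback branch of getHiveHost still publishes a hostname. The relevant difference is between InetAddress.getHostName() and getHostAddress(), shown in the Java sketch below. The hostname node01.cluster.internal and the IP 10.0.0.5 are made up. Swapping getHostName() for getHostAddress() in that else branch is one possible variant, not something this post's code does.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class HostVsAddress {
    /** Returns {hostName, hostAddress} for an address built from a known name and IP. */
    public static String[] describe(String host, byte[] ip) {
        try {
            // getByAddress performs no DNS lookup; the name is stored exactly as given
            InetAddress addr = InetAddress.getByAddress(host, ip);
            return new String[] {addr.getHostName(), addr.getHostAddress()};
        } catch (UnknownHostException e) { // thrown only if the IP array has an illegal length
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        String[] r = describe("node01.cluster.internal", new byte[] {10, 0, 0, 5});
        System.out.println(r[0]); // node01.cluster.internal
        System.out.println(r[1]); // 10.0.0.5
    }
}
```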
© Copyright belongs to the author. For reprints or content partnerships, please contact the author.