Spark HiveThriftServer2 高可用的实现

Spark HiveThriftServer高可用的问题

spark HiveThriftServer 继承了HiveServer2,但是却没有继承HiveServer2的HA机制,现在我们通过修改源码的方式来实现HiveThriftServer的高可用,基本原理是在zk上注册多个服务的连接地址,与HiveServer2的使用方式相同

涉及类及源码修改

  • spark HiveThriftServer的入口类为HiveThriftServer2,该object有个main方法入口,我们看下这里的main方法做了什么
  def main(args: Array[String]) {
    // 解析命令行参数
    Utils.initDaemon(log)
    val optionsProcessor = new HiveServer2.ServerOptionsProcessor("HiveThriftServer2")
    optionsProcessor.parse(args)

    logInfo("Starting SparkContext")
    // 初始化环境
    SparkSQLEnv.init()

    ShutdownHookManager.addShutdownHook { () =>
      SparkSQLEnv.stop()
      uiTab.foreach(_.detach())
    }

    val executionHive = HiveUtils.newClientForExecution(
      SparkSQLEnv.sqlContext.sparkContext.conf,
      SparkSQLEnv.sqlContext.sessionState.newHadoopConf())

    try {
      /**
        * 实例化hiveThriftServer2
        */
      val server = new HiveThriftServer2(SparkSQLEnv.sqlContext)
      server.init(executionHive.conf)
      server.start()
      logInfo("HiveThriftServer2 started")
      listener = new HiveThriftServer2Listener(server, SparkSQLEnv.sqlContext.conf)
      SparkSQLEnv.sparkContext.addSparkListener(listener)
      uiTab = if (SparkSQLEnv.sparkContext.getConf.getBoolean("spark.ui.enabled", true)) {
        Some(new ThriftServerTab(SparkSQLEnv.sparkContext))
      } else {
        None
      }
      // If application was killed before HiveThriftServer2 start successfully then SparkSubmit
      // process can not exit, so check whether if SparkContext was stopped.
      if (SparkSQLEnv.sparkContext.stopped.get()) {
        logError("SparkContext has stopped even if HiveServer2 has started, so exit")
        System.exit(-1)
      }
    } catch {
      case e: Exception =>
        logError("Error starting HiveThriftServer2", e)
        System.exit(-1)
    }
  }
  • 我们看到这里new了一个HiveThriftServer2的对象,我们进这个对象看一下
private[hive] class HiveThriftServer2(sqlContext: SQLContext)
  extends HiveServer2
  with ReflectedCompositeService {
  // state is tracked internally so that the server only attempts to shut down if it successfully
  // started, and then once only.
  private val started = new AtomicBoolean(false)

  // todo: 新加的
  var hiveConf:HiveConf = _
  /**
    * 初始化hiveThriftServer
    */
  override def init(hiveConf: HiveConf) {
    this.hiveConf = hiveConf
    val sparkSqlCliService = new SparkSQLCLIService(this, sqlContext)
    setSuperField(this, "cliService", sparkSqlCliService)
    addService(sparkSqlCliService)

    val thriftCliService = if (isHTTPTransportMode(hiveConf)) {
      new ThriftHttpCLIService(sparkSqlCliService)
    } else {
      new ThriftBinaryCLIService(sparkSqlCliService)
    }

    setSuperField(this, "thriftCLIService", thriftCliService)
    addService(thriftCliService)
    initCompositeService(hiveConf)
  }

  private def isHTTPTransportMode(hiveConf: HiveConf): Boolean = {
    val transportMode = hiveConf.getVar(ConfVars.HIVE_SERVER2_TRANSPORT_MODE)
    transportMode.toLowerCase(Locale.ROOT).equals("http")
  }

  override def start(): Unit = {
    super.start()
    started.set(true)
    /**
      * todo: 这里使用了HiveServer的高可用的配置
      */
    if (this.hiveConf.getBoolVar(
      ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) {
      /*
      * addServerInstanceToZooKeeper 把hiveServer2注册到zookeeper
      * */
      invoke(classOf[HiveServer2], this, "addServerInstanceToZooKeeper",
        classOf[HiveConf] -> this.hiveConf)
    }

  }

  override def stop(): Unit = {
    /**
      * todo: 停止的时候,将zookeeper上的注册信息删除
      */
    if (started.getAndSet(false)) {
      if (this.hiveConf.getBoolVar(
        ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) {
        invoke(classOf[HiveServer2], this, "removeServerInstanceFromZooKeeper")
      }
      super.stop()
    }
  }
 }
  • 这里的方法也很简单,一个初始化,一个start()方法,一个stop()方法,代码中todo的部分是我新加的代码,改动也很少
  • 新加hiveConf的成员变量,记录初始化时的配置,以便后面用
  • start方法: 在zookeeper中注册服务信息
  • stop方法: 在zookeeper中删除注册信息
  • 这里主要是应用了反射,在HiveServer中添加了注册和删除zk的信息,我们来到HiveServer2的代码中看一下如何修改
  /*#################################     新增方法      ############################################*/

  //获取thriftServer的IP:HOST
  private String getServerInstanceURI() throws Exception {
    if ((thriftCLIService == null) || (thriftCLIService.getServerIPAddress() == null)) {
      throw new Exception("Unable to get the server address; it hasn't been initialized yet.");
    }

    return getHiveHost() + ":"
        + thriftCLIService.getPortNumber();
  }

  private String getHiveHost() {
    HiveConf hiveConf = thriftCLIService.getHiveConf();
    String hiveHost = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST);
    if (hiveHost != null && !hiveHost.isEmpty()) {
      return hiveHost;
    } else {
      return thriftCLIService.getServerIPAddress().getHostName();
    }
  }
  
  /**
   * 控制是否需要重新在zookeeper上注册HiveServer2
   * */
  private boolean deregisteredWithZooKeeper = false;
  private void setDeregisteredWithZooKeeper(boolean deregisteredWithZooKeeper) {
    this.deregisteredWithZooKeeper = deregisteredWithZooKeeper;
  }

  /**
   * zk的监控者,如果发现注册信息为null,会触发监控,然后关掉当前注册hiveServer2的实例信息
   */
  private PersistentEphemeralNode znode;
  private class DeRegisterWatcher implements Watcher {
    @Override
    public void process(WatchedEvent event) {
      if (event.getType().equals(Watcher.Event.EventType.NodeDeleted)) {
        if (znode != null) {
          try {
            znode.close();
            LOG.warn("This HiveServer2 instance is now de-registered from ZooKeeper. "
                + "The server will be shut down after the last client sesssion completes.");
          } catch (IOException e) {
            LOG.error("Failed to close the persistent ephemeral znode", e);
          } finally {
            HiveServer2.this.setDeregisteredWithZooKeeper(true);
            // 如果当前已经没有可用的服务,那么就把HiveServer2关闭掉
            if (cliService.getSessionManager().getOpenSessionCount() == 0) {
              LOG.warn("This instance of HiveServer2 has been removed from the list of server "
                  + "instances available for dynamic service discovery. "
                  + "The last client session has ended - will shutdown now.");
              HiveServer2.this.stop();
            }
          }
        }
      }
    }
  }

  private CuratorFramework zooKeeperClient;
  private String znodePath;
  /**
   * 把服务注册到zookeeper中
   * @param hiveConf
   * @throws Exception
   */
  private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception {
    //从hiveConf中获取zookeeper地址
    String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf);
    //从hive-site.xml中获取hive.server2.zookeeper.namespace的配置信息
    String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
    //获取用户提供的thriftServer地址
    String instanceURI = getServerInstanceURI();
    
    //获取hive连接zookeeper的session超时时间
    int sessionTimeout =
        (int) hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT,
            TimeUnit.MILLISECONDS);
    //hive连接zookeeper的等待时间
    int baseSleepTime =
        (int) hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME,
            TimeUnit.MILLISECONDS);
    //hive连接zookeeper的最大重试次数
    int maxRetries = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
    // 获取zookeeper客户端
    zooKeeperClient =
        CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble)
            .sessionTimeoutMs(sessionTimeout)
            .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries)).build();
    //启动zookeeper客户端
    zooKeeperClient.start();
    //TODO 在zookeeper上根据rootNamespace创建一个空间(用来存储数据的文件夹)
    try {
      zooKeeperClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
          .forPath(ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace);
      LOG.info("Created the root name space: " + rootNamespace + " on ZooKeeper for HiveServer2");
    } catch (KeeperException e) {
      if (e.code() != KeeperException.Code.NODEEXISTS) {
        LOG.fatal("Unable to create HiveServer2 namespace: " + rootNamespace + " on ZooKeeper", e);
        throw e;
      }
    }
    //TODO 把hiveserver2的信息注册到rootNamespace下:
    // serverUri=cdh1:10001;version=1.2.1.spark2;sequence=0000000005
    try {
      String pathPrefix =
          ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace
              + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + "serverUri=" + instanceURI + ";"
              + "version=" + HiveVersionInfo.getVersion() + ";" + "sequence=";
      byte[] znodeDataUTF8 = instanceURI.getBytes(Charset.forName("UTF-8"));
      znode =
          new PersistentEphemeralNode(zooKeeperClient,
              PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, pathPrefix, znodeDataUTF8);
      znode.start();
      // We'll wait for 120s for node creation
      long znodeCreationTimeout = 120;
      if (!znode.waitForInitialCreate(znodeCreationTimeout, TimeUnit.SECONDS)) {
        throw new Exception("Max znode creation wait time: " + znodeCreationTimeout + "s exhausted");
      }
      setDeregisteredWithZooKeeper(false);
      znodePath = znode.getActualPath();
      // TODO 添加zk的watch , 如果服务不见了,需要第一时间watche到
      if (zooKeeperClient.checkExists().usingWatcher(new DeRegisterWatcher()).forPath(znodePath) == null) {
        // No node exists, throw exception
        throw new Exception("Unable to create znode for this HiveServer2 instance on ZooKeeper.");
      }
      LOG.info("Created a znode on ZooKeeper for HiveServer2 uri: " + instanceURI);
    } catch (Exception e) {
      LOG.fatal("Unable to create a znode for this server instance", e);
      if (znode != null) {
        znode.close();
      }
      throw (e);
    }
  }

  //移除znode,代表当前程序关闭
  private void removeServerInstanceFromZooKeeper() throws Exception {
    setDeregisteredWithZooKeeper(true);

    if (znode != null) {
      znode.close();
    }
    zooKeeperClient.close();
    LOG.info("Server instance removed from ZooKeeper.");
  }
  • 这里主要是对HiveServer2添加了两个方法,及HiveThriftServer2中通过反射调用的两个方法,分别是在启动的时候,在zk指定的地址上注册连接信息,以及停止的时候删除对应位置的连接信息。
  • 新建了一个zkWatcher监控
  • 注意这里zk存储的数据信息,必须是: serverUri=cdh1:10001;version=1.2.1.spark2;sequence=0000000005的形式,主要是为了复用hiveServer2的高可用

编译

mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver -Dscala-2.11 -DskipTests clean package 
  • 得到的hive-thriftServer.jar包替换spark lib目录下的jar包即可

相关配置

  • 这里的高可用的配置与hiveServer2的高可用配置是一致的,在hive-site.xml文件中添加
    <name>hive.server2.support.dynamic.service.discovery</name>
    <value>true</value>
</property>
<property>
    <name>hive.server2.zookeeper.namespace</name>
    <value>hiveserver2_zk</value>
</property>

<property
    <name>hive.zookeeper.quorum</name>
    <value>cdh1:2181,cdh2:2181,cdh3:2181</value>
</property>
<property>
    <name>hive.zookeeper.client.port</name>
    <value>2181</value>
</property>
<property>
    <name>hive.server2.thrift.bind.host</name>
    <value>cdh1</value>
</property>
  • 另外,因为我们需要以spark的sbin/start-thriftserver.sh脚本启动进程,脚本中调用了spark-daemon.sh脚本实际启动任务,而这个脚本中的有关于重复应用的检查,即同一台机器上启动多个相同应用会报错,如果我们要在同一台机器上启动两个hiveThriftServer的话,我们需要对spark-daemon.sh做一些修改
# 屏蔽以下脚本###########################################
# if [ -f "$pid" ]; then
#   TARGET_ID="$(cat "$pid")"
#   if [[ $(ps -p "$TARGET_ID" -o comm=) =~ "java" ]]; then
#     echo "$command running as process $TARGET_ID.  Stop it first."
#     exit 1
#   fi
# fi
########################################################

启动

  • 启动命令
sbin/start-thriftserver.sh \
--master yarn \
--conf spark.driver.memory=1G \
--executor-memory 1G \
--num-executors 1 \
--hiveconf hive.server2.thrift.port=10002
  • beeline连接命令
!connect jdbc:hive2://cdh1:2181,cdh2:2181,cdh3:2181/default;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2_zk
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,335评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,895评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,766评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,918评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,042评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,169评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,219评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,976评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,393评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,711评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,876评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,562评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,193评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,903评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,142评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,699评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,764评论 2 351

推荐阅读更多精彩内容