HBase 1.2.0源码分析:HRegionServer启动

RegionServer 实现类org.apache.hadoop.hbase.regionserver.HRegionServer.

类描述:HRegionServer 管理一些 HRegion,使其对 Client 可用。需要与 HMaster 通信,通知状态。(HRegionServer makes a set of HRegions available to clients. It checks in with the HMaster. There are many HRegionServers in a single HBase deployment.

0. HRegionServer 初始化

构造方法需要参数:
conf 对应配置文件,csm 是一个协调服务,提供启停 Server 等方法

public HRegionServer(Configuration conf) throws IOException, InterruptedException {
    // 构建默认 CoordinatedStateManager
    this(conf, CoordinatedStateManagerFactory.getCoordinatedStateManager(conf));
}

public HRegionServer(Configuration conf, CoordinatedStateManager csm)
      throws IOException, InterruptedException {
    
    // ...
    
    // 测试 hbase.regionserver.codecs 配置的编码方式是可用的
    checkCodecs(this.conf);
    
    // 初始化 userProvider 用于权限认证
    this.userProvider = UserProvider.instantiate(conf);
    
    // 设置 short circuit read buffer,即短路本地读
    // 校验是否跳过 checksum,默认是 false,不推荐跳过
    // dfs.client.read.shortcircuit.skip.checksum
    FSUtils.setupShortCircuitRead(this.conf);
    
    // 设置一系列配置项
    // 设置客户端连接重试次数 hbase.client.retries.number(31)
    this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
        HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
    // 线程执行周期 hbase.server.thread.wakefrequency(10 * 1000)    
    // 周期内会执行 CompactionChecker
    this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
    // RegionServer 发消息给 Master 时间间隔,单位是毫秒
    this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
    this.sleeper = new Sleeper(this.msgInterval, this);

    // 设置 Nonce 标志,初始化 nonceManager
    // 客户端的每次申请及重复申请使用同一个 nonce 进行描述,解决 Client 重复操作提交的情况
    boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
    this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;

    // 向 Master 进行 report 的 region 个数
    // HBase Client 操作的 Timeout 时间
    // HBase rpc 短操作的 Timeout 时间
    // ...

    // 创建 RegionServer 的 RPC 服务端
    rpcServices = createRpcServices();
    
    // 工厂类实例化
    rpcControllerFactory = RpcControllerFactory.instantiate(this.conf);
    rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);

    // 用来记录 region server 中所有的 memstore 所占大小
    regionServerAccounting = new RegionServerAccounting();

    // 表示使用 zk 管理状态,并不会对Region状态进行持久化
    // 默认是 true,hbase.assignment.usezk
    useZKForAssignment = ConfigUtil.useZKForAssignment(conf);

    // 初始化 fs,封装hdfs
    this.fs = new HFileSystem(this.conf, useHBaseChecksum);
    
    // 初始化 htable meta 信息
    this.tableDescriptors = new FSTableDescriptors(
      this.conf, this.fs, this.rootDir, !canUpdateTableDescriptor(), false);

    // 初始化rs处理任务的线程池,同master的任务线程池
    // 具体任务类型参考 ExecutorType
    service = new ExecutorService(getServerName().toShortString());

    if (!conf.getBoolean("hbase.testing.nocluster", false)) {
        // 创建当前rs与zk的连接
        zooKeeper = new ZooKeeperWatcher(conf, getProcessName() + ":" +
          rpcServices.isa.getPort(), this, canCreateBaseZNode());

        // csm在分布式环境下用来协调集群所有server的状态信息
        this.csm = (BaseCoordinatedStateManager) csm;
        this.csm.initialize(this);
        this.csm.start();

        // 基于zk实现的分布式的锁管理器,用于锁表
        tableLockManager = TableLockManager.createTableLockManager(
          conf, zooKeeper, serverName);

        // 创建master跟踪器,等待master的启动
        // 在zk节点上注册,zookeeper.znode.master
        masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
        masterAddressTracker.start();

        // 创建cluster的跟踪器,等待cluster的启动
        // master注册clusterid到zk节点(zookeeper.znode.state),表示集群已经启动
        clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
        clusterStatusTracker.start();
    }

    // 启动 Region Server RPC服务
    rpcServices.start();
    // 启动 jetty,Region Server WebUI
    putUpWebUI();
    
    // 负责当前 rs 上所有 wal 的log roll(滚动并清理日志)
    this.walRoller = new LogRoller(this, this);
    // 封装了线程池,负责周期性的调度任务
    // 心跳、检查compact、compact完成后hfile清理...
    this.choreService = new ChoreService(getServerName().toString(), true);

}

什么是短路本地读?

在 HDFS 中,读取操作都是通过 Datanode 来进行的。
当客户端向 Datanode 发起读取文件请求时,Datanode 从磁盘读取文件,并且通过 TCP socket 发送给客户端。
所谓的“短路”就是不通过 Datanode,允许客户端直接读取文件。
显然,这仅在客户端与数据位于同一位置的情况下才有可能。短路读取能让许多应用性能显著提升。

什么是Nonce机制

客户端发送 RPC 请求给服务器后,如果响应超时,那么客户端会重复发送请求,直到达到参数配置的重试次数上限。
客户端第一次发送和以后重发请求时,会附带相同的 nonce,服务端只要根据 nonce 进行判断,就能得知是否为同一请求,
并根据之前请求处理的结果,决定是等待、拒绝还是直接处理。

1. HRegionServer 运行

/**
 * The HRegionServer sticks in this loop until closed.
 */
@Override
public void run() {
    
    // 向HMaster注册之前完成一些初始化工作   
    // 在ZK节点 /hbase/rs 下创建当前region server信息的节点,HMaster 监听这个路径
    preRegistrationInitialization();

    try {
        if (!isStopped() && !isAborted()) {
            // 在ZK节点 /hbase/rs 下创建当前region server信息的节点
            createMyEphemeralNode();
            // 加载的 coprocessor,提供 coprocessor 的运行环境
            this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
        }

        while (keepLooping()) { // !this.stopped && isClusterUp();
            // 通知master,region server启动成功
            RegionServerStartupResponse w = reportForDuty();
            if (w == null) {
                this.sleeper.sleep();
            } else {
                // Sets up wal and starts up all server threads.
                handleReportForDutyResponse(w);
                break;
            }
        }
  
        // 启动 rspmHost \ rsQuotaManager
        // ...

        // The main run loop.
        while (!isStopped() && isHealthy()) {
            // 监控ZK节点(zookeeper.znode.state)
            if (!isClusterUp()) {
                // .. 处理集群 down 的情况
                // 关闭所有的 user region
            }
            long now = System.currentTimeMillis();
            // 定期报告心跳
            if ((now - lastMsg) >= msgInterval) {
                tryRegionServerReport(lastMsg, now);
                lastMsg = System.currentTimeMillis();
                doMetrics();
            }
          
            // ...  
        } 
    } catch (Throwable t) {
        // ...
    }
    
    // Run shutdown ...
    // 关闭连接、服务、region、WAL、proxy
    // 清除ZK节点 /hbase/rs (强制删除 znode 存储文件)
    // 关闭rpcClient、rpcservice、monitor、线程池、Zookeeper
    // ...
}

1.1 preRegistrationInitialization

预初始化

private void preRegistrationInitialization(){
    try {
        // 初始化 clusterConnection、metaTableLocator
        setupClusterConnection();
    
        // Health checker thread.
        if (isHealthCheckerConfigured()) {
            int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
            HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
            healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
        }
        this.pauseMonitor = new JvmPauseMonitor(conf);
        pauseMonitor.start();
    
        // 初始化ZK连接
        initializeZooKeeper();
        if (!isStopped() && !isAborted()) {
            // 初始化线程
            initializeThreads();
        }
    } catch (Throwable t) {
        // ...
    }
}

/**
 * 初始化一系列线程、monitor、和连接
 */
private void initializeThreads() throws IOException {
    // Cache flushing thread.
    this.cacheFlusher = new MemStoreFlusher(conf, this);

    // Compaction thread
    this.compactSplitThread = new CompactSplitThread(this);

    // check for compactions
    this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this);
    
    this.periodicFlusher = new PeriodicMemstoreFlusher(this.threadWakeFrequency, this);
    
    // creat lease monitor
    this.leases = new Leases(this.threadWakeFrequency);

    // Create the thread to clean the moved regions list
    movedRegionsCleaner = MovedRegionsCleaner.create(this);

    if (this.nonceManager != null) {
        // Create the scheduled chore that cleans up nonces.
        nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
    }

    // Setup the Quota Manager
    rsQuotaManager = new RegionServerQuotaManager(this);
    
    // Setup RPC client for master communication
    rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress(
        rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics());

    boolean onlyMetaRefresh = false;
    int storefileRefreshPeriod = conf.getInt(
        StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD
      , StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
    if (storefileRefreshPeriod == 0) {
        storefileRefreshPeriod = conf.getInt(
          StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD,
          StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
        onlyMetaRefresh = true;
    }
    if (storefileRefreshPeriod > 0) {
        this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod,
          onlyMetaRefresh, this, this);
    }
    registerConfigurationObservers();
}

1.2 handleReportForDutyResponse

启动 WAL 和线程

// response包含conf:
// hbase.regionserver.hostname.seen.by.master
// fs.default.name
// hbase.rootdir
protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
  throws IOException {
    try {
        for (NameStringPair e : c.getMapEntriesList()) {
            String key = e.getName();
            // The hostname the master sees us as.
            if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
                // master 为rs重新定义 hostname
                // rs得到新的 serverName
                String hostnameFromMasterPOV = e.getValue();
                this.serverName = ServerName.valueOf(hostnameFromMasterPOV,
                  rpcServices.isa.getPort(), this.startcode);
            
                // 校验 hostname 
                continue;
            }
            // 覆盖原有 conf
            String value = e.getValue();
            this.conf.set(key, value);
        }

        // ZK节点写到磁盘,用于处理程序异常情况
        ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());

        // 创建 cacheConfig、walFactory
        this.cacheConfig = new CacheConfig(conf);
        this.walFactory = setupWALAndReplication();
        this.metricsRegionServer = new MetricsRegionServer(new MetricsRegionServerWrapperImpl(this));

        // 启动前一步创建的线程,启动ExecutorService
        startServiceThreads();
        startHeapMemoryManager();

        // 通知其他监听线程 rs online
        synchronized (online) {
            online.set(true);
            online.notifyAll();
        }
    } catch (Throwable e) {
        // ...
    }
}

2. HRegionServer主要干以下事情:

  • 在ZK上注册,表明rs启动
  • 跟 HMaster 通知
  • 设置 WAL 和 Replication
  • 注册协作器 RegionServerCoprocessorHost
  • 启动 hlogRoller
  • 定期刷新 memstore
  • 定期检测是否需要压缩合并
  • 启动 lease
  • 启动 jetty
  • 创建 SplitLogWorker,用于拆分 HLog
  • 启动快照管理
  • 创建其他工作线程和对象
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,734评论 6 505
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,931评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,133评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,532评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,585评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,462评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,262评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,153评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,587评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,792评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,919评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,635评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,237评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,855评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,983评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,048评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,864评论 2 354