HDFS RBF的Connection管理

前言

为了解决HDFS Federation下多集群的维护管理,Hadoop社区实现了Router-Based Federation(HDFS-10467)功能。此功能的强大之处在于它在client和集群NN服务之间新加了一层Router的服务。有了这个Router这个服务后,所有客户端的请求将会由这个Router负责转发给下游的NN节点。这样的话,客户端用户就不需要知道下游有哪些NN节点以及各个NN Namespace里分布着什么样的数据了。这些事情Router都会透明地帮client做完。既然Router在这里主要做了请求转发这件事,那么势必它会存在与下游NN建立新的连接然后进行请求转发的步骤。那么问题来了,Router是怎么做这块的处理呢?如果是每次来一个客户端请求,Router对应建立一个新的connection去请求NN,那么这意味着Router会做大量的连接重建立操作,这显然是不太高效的做法。高效一点的做法是是Router自己维护一定的connection,然后尽可能能够复用其中的connection去请求NN。但是这无疑会增加Router Connection这块的管理工作,而且这里还要避免connection泄漏的问题。本文笔者来简单聊聊Router Connection管理这块的内容,Router是如何做到即高效又安全的Connection管理的。
Connection管理的权衡问题

说到Connection管理,这里始终会存在一个权衡问题:是尽可能维护更多的Connection呢还是尽可能少的维护Connection呢?更多的Connection意味着更高的复用率,但同时可能会造成inactive的Connection过多导致影响到下游服务本身。
因此这里Connection管理不是一刀切的做法,没有固定说一定要cache住多少当前的open的Connection,而是一种更加变通的灵活的管理做法。简单来说,当在需要建立更多connection的时候,我们尽量去cache住这些connection。但是当我们发现当下很多connection在一段时间内没有被用到的时候,我们就及时地对其进行close清理掉。RBF的Connection管理正是巧妙地运用了此策略。
RBF的Connection管理

细粒度的Connection Pool划分
在RBF模式下,Router一方面要面对不同client发来的RPC请求,另一方面它还需要转发请求到多个namespace的NN节点。为了做到不同namespace,不同用户间Connection的隔离,Router在这里按照user/namespace/protocol级别进行了Connection的隔离。简单来说,Router按照上述提到的3个维度进行了ConnectionPool的创建,然后每个ConnectionPool自行再进行connection的管理。
Connection的创建

说到connection的管理,它无外乎两大方面的处理,一是connection的创建,二是connection的清理。这里我们先来看看connection的创建。
Router是在每次获取connection的时候如果发现可用connection不够的话,则尝试进行connection的创建的,相关代码如下:
public ConnectionContext getConnection(UserGroupInformation ugi,
String nnAddress, Class<?> protocol) throws IOException {

...

// 1) 根据user+ns+protocol拼出connectionPoolId
ConnectionPoolId connectionId =
    new ConnectionPoolId(ugi, nnAddress, protocol);
ConnectionPool pool = null;
readLock.lock();
try {
  // 2) 根据connectionPoolId取出对应的ConnectionPool
  pool = this.pools.get(connectionId);
} finally {
  readLock.unlock();
}

// Create the pool if not created before
if (pool == null) {
  ...
}

// 3) 取出一个connection
ConnectionContext conn = pool.getConnection();

// Add a new connection to the pool if it wasn't usable
if (conn == null || !conn.isUsable()) {
  // 4) 如果connection不可用,则将connection pool加入队列让其进行connection的异步创建
  if (!this.creatorQueue.offer(pool)) {
    LOG.error("Cannot add more than {} connections at the same time",
        this.creatorQueueMaxSize);
  }
}

if (conn != null && conn.isClosed()) {
  LOG.error("We got a closed connection from {}", pool);
  conn = null;
}

return conn;

}
1234567891011121314151617181920212223242526272829303132333435363738394041
如上代码所示,creatorQueue队列是拿来临时存放那些需要创建connection的connection pool。此queue将被用在下面的创建connection的thread内。
/**

  • Thread that creates connections asynchronously.
    */
    static class ConnectionCreator extends Thread {
    ...
    @Override
    public void run() {
    while (this.running) {
    try {
    // 从queue中获取connection pool
    ConnectionPool pool = this.queue.take();
    try {
    int total = pool.getNumConnections();
    int active = pool.getNumActiveConnections();
    float poolMinActiveRatio = pool.getMinActiveRatio();
    // 判断此pool内
    // 1) 前活跃的connection数超过最小阈值
    // 2) connection总数不超过最大值限制的话
    // 则进行新的connection的创建
    if (pool.getNumConnections() < pool.getMaxSize() &&
    active >= poolMinActiveRatio * total) {
    ConnectionContext conn = pool.newConnection();
    pool.addConnection(conn);
    } else {
    LOG.debug("Cannot add more than {} connections to {}",
    pool.getMaxSize(), pool);
    }
    } catch (IOException e) {
    LOG.error("Cannot create a new connection", e);
    }
    } catch (InterruptedException e) {
    LOG.error("The connection creator was interrupted");
    this.running = false;
    } catch (Throwable e) {
    LOG.error("Fatal error caught by connection creator ", e);
    }
    }
    }
    1234567891011121314151617181920212223242526272829303132333435363738
    上述实现较为巧妙的点在于它进行了最小活跃connection阈值的设置来确保说新的connection不至于大概率在后面会变成一个无用的connection。
    Connection的清理

Connection管理另一方面的内容是connection的清理。Router在这里采用定期task schedule的方式进行connection的清理的。
this.cleaner.scheduleAtFixedRate(
new CleanupTask(), 0, recyleTimeMs, TimeUnit.MILLISECONDS);
12
CleanupTask清理task代码逻辑如下:
/**

  • Removes stale connections not accessed recently from the pool. This is
  • invoked periodically.
    */
    private class CleanupTask implements Runnable {
@Override
public void run() {
  long currentTime = Time.now();
  List<ConnectionPoolId> toRemove = new LinkedList<>();

  // Look for stale pools
  readLock.lock();
  try {
    for (Entry<ConnectionPoolId, ConnectionPool> entry : pools.entrySet()) {
      // 1)根据最近一次的活跃时间,查找那些过期的不活跃的connection pool
      ConnectionPool pool = entry.getValue();
      long lastTimeActive = pool.getLastActiveTime();
      boolean isStale =
          currentTime > (lastTimeActive + poolCleanupPeriodMs);
      // 2)如果查找到的情况,则加入pool移除列表
      if (lastTimeActive > 0 && isStale) {
        // Remove this pool
        LOG.debug("Closing and removing stale pool {}", pool);
        pool.close();
        ConnectionPoolId poolId = entry.getKey();
        toRemove.add(poolId);
      } else {
        // 3)如果当前connection pool还是活跃在使用的话,则继续进行此pool内无用connection的清理
        LOG.debug("Cleaning up {}", pool);
        cleanup(pool);
      }
    }
  } finally {
    readLock.unlock();
  }

  // Remove stale pools
  if (!toRemove.isEmpty()) {
    writeLock.lock();
    try {
      for (ConnectionPoolId poolId : toRemove) {
        pools.remove(poolId);
      }
    } finally {
      writeLock.unlock();
    }
  }
}

}
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
上面清理的逻辑从stale的connection pool到pool内不活跃的connection两个层面对无用connection进行清理。这样可以避免那些过多无效connection的存在。
最后是Router connection管理简化图: 以上就是本文所要阐述的所有内容,上述涉及到的代码均来自于下文参考链接的ConnectionManager类,感兴趣的同学可自行进行阅读学习。
————————————————
版权声明:本文为CSDN博主「Android路上的人」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/Androidlushangderen/article/details/114809319

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,142评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,298评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,068评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,081评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,099评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,071评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,990评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,832评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,274评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,488评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,649评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,378评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,979评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,625评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,643评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,545评论 2 352

推荐阅读更多精彩内容

  • 这篇文章写的很优秀,然后自己稍微整理了下。转自:https://matt33.com/2018/07/15/hdf...
    Michaelhbjian阅读 2,028评论 0 5
  • 产生背景 随着数据量越来越大,在一个操作系统存不下所有的数据,那么就分配到更多的操作系统管理的磁盘中,但是不方便管...
    tracy_668阅读 610评论 0 12
  • HDFS核心技术详解 我们都知道Hadoop 主要由HDFS和MapReduce 两个核心部分组成。其中最底部...
    dinel阅读 521评论 0 0
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,651评论 18 139
  • 在很多时候,我们会碰到数据融合的需求,比如说原先有A集群,B集群,后来管理员认为有2套集群,数据访问不方便,于是设...
    tracy_668阅读 805评论 0 3