SparkThriftServer内存泄漏排查

STS(SparkThrfitServer)版本

  • spark-3.2.1-bin-hadoop3.2

问题表现

  • Spark UI 经常无响应
  • STS 经常挂掉

问题分析

获取heap.hprof和gc.log

spark.driver.extraJavaOptions -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/home/spark/hbi/logs/ -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution -XX:+PrintHeapAtGC -XX:+PrintReferenceGC -XX:+PrintGCApplicationStoppedTime -Xloggc:/home/spark/hbi/logs/gc-%t.log

堆对象分析

  • 工具:jprofiler

大对象

heap.hprof

Hashtable引用链

Hashtable

ConcurrentHashMap引用链

ConcurrentHashmap

GC趋势

  • 工具:gcviewer
  • 黄色:新生代
  • 红色:老年代
  • 黑色:full gc
gc.log

结论

  1. 确定是内存泄漏了,而且内存大对象有两个
    a. Hashtable
    b. ConcurrentHashmap
  2. 两个大对象都被该对象引用
    a. SparkExecuteStatementOperation

STS架构

https://blog.51cto.com/u_15259710/4797692
https://www.jianshu.com/p/b719c6415411

Spark Thrift Server大量复用了HiveServer2的代码。

HiveServer2的架构主要是通过ThriftCLIService监听端口,然后获取请求后委托给CLIService处理。CLIService又一层层的委托,最终交给OperationManager处理。OperationManager会根据请求的类型创建一个Operation的具体实现处理。比如Hive中执行sql的Operation实现是SQLOperation。

Spark Thrift Server做的事情就是实现自己的CLIService——SparkSQLCLIService,接着也实现了SparkSQLSessionManager以及SparkSQLOperationManager。另外还实现了一个处理sql的Operation——SparkExecuteStatementOperation。这样,当Spark Thrift Server启动后,对于sql的执行就会最终交给SparkExecuteStatementOperation了。

Spark Thrift Server其实就重写了处理sql的逻辑,其他的请求处理就完全复用HiveServer2的代码了。比如建表、删除表、建view等操作,全部使用的是Hive的代码。

推测原因

大对象引用者

  • org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation
    • 注:下文中SparkExecuteStatementOperation统称为SESO

SESO引用者

  • org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$1
    • 这是个什么东西?

SESO$$anon$1

  • 通过反编译,发现该类是SparkExecuteStatementOperation的内部类
    • 工具:jd-gui
  • $$anon$1通过$outer引用了SESO
import java.lang.invoke.SerializedLambda;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation;
import org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$;
import scala.Option;
import scala.util.control.NonFatal$;

public final class null implements Runnable {
  private final ScheduledExecutorService timeoutExecutor$1;
  
  public null(SparkExecuteStatementOperation $outer, ScheduledExecutorService timeoutExecutor$1) {}
  
  public void run() {
    try {
      this.$outer.timeoutCancel();
    } catch (Throwable throwable1) {
      Throwable throwable2 = throwable1;
      Option option = NonFatal$.MODULE$.unapply(throwable2);
    } finally {
      this.timeoutExecutor$1.shutdown();
    } 
  }
}

SESO内部类源码

  • 注意:timeout单位是TimeUnit.SECONDS
# 核心变量timeout
// If a timeout value `queryTimeout` is specified by users and it is smaller than
// a global timeout value, we use the user-specified value.
// This code follows the Hive timeout behaviour (See #29933 for details).
private val timeout = {
  val globalTimeout = sqlContext.conf.getConf(SQLConf.THRIFTSERVER_QUERY_TIMEOUT)
  if (globalTimeout > 0 && (queryTimeout <= 0 || globalTimeout < queryTimeout)) {
    globalTimeout
  } else {
    queryTimeout
  }
}

# 中间代码省略...

# 内部类创建逻辑
if (timeout > 0) {
  val timeoutExecutor = Executors.newSingleThreadScheduledExecutor()
  timeoutExecutor.schedule(new Runnable {
    override def run(): Unit = {
      try {
        timeoutCancel()
      } catch {
        case NonFatal(e) =>
          setOperationException(new HiveSQLException(e))
          logError(s"Error cancelling the query after timeout: $timeout seconds")
      } finally {
        timeoutExecutor.shutdown()
      }
    }
  }, timeout, TimeUnit.SECONDS)
}

原因推理

客户端

  • 请求超时时间queryTimeout > 0

服务端

  1. timeout默认等于客户端传过来的queryTimeout

  2. timeout大于0时,会创建单线程的线程池,提交延迟任务,延迟任务在timeout时间后开始执行,执行完成后会停止线程池
    a. 注意1:延迟任务作为SESO的内部类,会持有SESO的引用
    b. 注意2:timeout单位是seconds

3.问题原因:如果timeout时间很长,而且sql执行时间很短,如果短时间内有大量的查询,那么这些线程在timeout时间内一直持有延迟任务的引用,也就是间接持有SESO对象的引用,这就是内存溢出的原因

验证推理

跟踪timeout传入链

  • 找到入口方法
org.apache.hive.service.cli.CLIService#executeStatementAsync(org.apache.hive.service.cli.SessionHandle, java.lang.String, java.util.Map<java.lang.String,java.lang.String>, long)

监控命令

  • 工具:arthas
  • params[3]: timeout
# arthas watch 命令
watch org.apache.hive.service.cli.CLIService executeStatementAsync '{params}' 'params[3]>0' -x 2
watch method

提出疑问

  • 为什么 "SELECT 1" 的timeout是60000s(16.7h)

服务分析

哪个服务

  • 通过STS日志里查找sessionId,定位服务所在节点IP,从而定位到传入queryTimeout=60000的服务
  • 最终定位有问题的服务:hbi-query-engine
sts.log

原有连接池配置

# 连接池配置
engine.ds.hive.read.type = com.alibaba.druid.pool.DruidDataSource
# 单位是seconds,也就是16.7小时(Druid在用"SELECT 1"探活时使用该值)
engine.ds.hive.read.pool.queryTimeout=60000
# 单位是mills,也就是60秒(Druid没有使用该配置)
engine.ds.hive.read.pool.validationTimeout=60000
configDruid

改进后连接池配置

# 连接池配置
engine.ds.hive.read.type = com.zaxxer.hikari.HikariDataSource
# 单位是seconds,也就是5分钟
engine.ds.hive.read.pool.queryTimeout=300
# 单位是mills,也就是60秒(Hikari在用"SELECT 1"探活时使用该值)
engine.ds.hive.read.pool.validationTimeout=60000
configHikari

DruidDataSource探测流程

调用栈

stack

setQueryTimeout核心方法

setQueryTimeout

HikariDataSource探测流程

调用栈

stack

setQueryTimeout核心方法

setQueryTimeout

解决方案

  • 为了快速解决问题,目前采取方案1(或者方案2),方案3有时间再尝试

方案1

  1. 根据连接池不同,设置合理的queryTimeout、validationTimeout
    a. 注意1:queryTimeout的单位是seconds
    b. 注意2:validationTimeout单位是mills

方案2

  1. 在sts端配置globalTimeout,也就是spark.sql.thriftServer.queryTimeout
    a. 注:如果globalTimeout小于queryTimeout,则以globalTimeout为准

方案3

  1. 源码修复STS
    a. 思路:即时释放查询资源(线程池:timeoutExecutor)
    b. 实现:timeoutExecutor作为SESO的字段,并在SESO对象close时,调用timeoutExecutor.shutdown()

优化效果

全堆大小

Heap After GC

年轻代大小

Young Gen

老年代大小

Old Gen

工具列表

  1. jprofiler: 分析堆内存
  2. gcviewer: 分析垃圾回收
  3. gceasy: 分析垃圾回收 在线分析入口
  4. arthas: 分析方法调用
  5. jd-gui: 反编译

参考文档

  1. Hive内存溢出示例:https://www.jianshu.com/p/88626013b39f
  2. STS架构:https://www.jianshu.com/p/b719c6415411
  3. STS架构VS Kyuubi:https://blog.51cto.com/u_15259710/4797692
  4. Arthas watch:https://arthas.aliyun.com/doc/watch.html
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容