scala使用par并行集合设置的线程池未关闭导致的内存占用问题

问题产生背景

程序语言使用的是scala。
在处理大量数据时,使用了par函数将普通集合转为并行集合集合,通过多线程并发处理达到提升处理效率的目的,相关代码如下:

for循环{
    ...
  process()
}
def process(): Unit ={
  val map: Map[String, Int] = Map("a"->1, "b"->2)
  val rangePar = map.par
  //新建固定线程数的线程池
  val pool = new collection.parallel.ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(2))
  rangePar.tasksupport = pool
  rangePar.map(
    x=> {
      log.info(s"range Par element : $x")
    }
  )
  pool.environment.shutdown()
}

结果运行一段时间后,发现内存增加了不少而且还在不停地上升
问题分析

从业务上分析,数据量并没有明显增加,于是通过jvisualvm工具查看进程中对象的内存占用情况,发现在短时间内,scala.concurrent.forkjoin.ForkJoinTask[]对象在急剧上升,同时线程数也在不停增加,这些线程创建后,一直没有被释放,且处于waiting状态。这些增长中的没有释放的线程导致内存占用一直上升。

根据ForkJoinTask这个类最终定位到上面的代码部分。然后在本地测试,发现随着循环次数的增加,内存中的线程数会一直上升且未释放掉,当循环次数为10时,就已经有14个线程处于waiting状态(不知道怎么上传图片 - -!)。

因此定位问题是在循环中不停创建的线程池没有即使得到释放造成
问题解决

知道问题就好办了,有如下两个解决办法:

(1) 在循环内每次使用完,就及时关闭资源池,释放占用的内存

valpool = new collection.parallel.ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(2))
rangePar.tasksupport = pool
rangePar.map(
  x=> {
    log.info(s"range Par element : $x")
  }
)
pool.environment.shutdown()

(2) 将资源池对象设置为全局变量共享,这样也可以解决创建大量未释放线程池的问题。不过这种场景下,要处理好多线程下的情况,可以将线程池对象设置为ThreadLocal变量,各线程使用独立的对象,这样在一定程度上也能解决多线程共享的问题。

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • Swift1> Swift和OC的区别1.1> Swift没有地址/指针的概念1.2> 泛型1.3> 类型严谨 对...
    cosWriter阅读 13,911评论 1 32
  • 所有知识点已整理成app app下载地址 J2EE 部分: 1.Switch能否用string做参数? 在 Jav...
    侯蛋蛋_阅读 7,444评论 1 4
  •   一个任务通常就是一个程序,每个运行中的程序就是一个进程。当一个程序运行时,内部可能包含了多个顺序执行流,每个顺...
    OmaiMoon阅读 5,691评论 0 12
  • 多线程 1.悲观锁和乐观锁 http://www.importnew.com/21037.html 悲观锁悲观锁,...
    bigfish1129阅读 2,934评论 0 0
  • (一) 在食堂吃饭的时候,刚巧和同楼层其他部门一个认识的同事坐在了一起,这是我上班来我们第一次如此近距离地共进午餐...
    少女勺子阅读 2,999评论 4 3

友情链接更多精彩内容