spark性能优化(一)

本文内容说明

  • 初始化配置给rdd和dataframe带来的影响
  • repartition的相关说明
  • cache&persist的相关说明
  • 性能优化的说明建议以及实例

配置说明

spark:2.4.0
服务器:5台(8核32G)

初始化配置项

%%init_spark
launcher.master = "yarn"
launcher.conf.spark.app.name = "BDP-xw"
launcher.conf.spark.driver.cores = 1
launcher.conf.spark.driver.memory = '1g'
launcher.conf.spark.executor.instances = 3
launcher.conf.spark.executor.memory = '1g'
launcher.conf.spark.executor.cores = 2
launcher.conf.spark.default.parallelism = 5
launcher.conf.spark.dynamicAllocation.enabled = False
import org.apache.spark.sql.SparkSession
var NumExecutors = spark.conf.getOption("spark.num_executors").repr
var ExecutorMemory = spark.conf.getOption("spark.executor.memory").repr
var AppName = spark.conf.getOption("spark.app.name").repr
var max_buffer = spark.conf.getOption("spark.kryoserializer.buffer.max").repr
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.monotonically_increasing_id
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
import org.apache.spark.sql.functions.{udf, _}
import org.apache.spark.{SparkConf, SparkContext}


object LoadingData_from_files{
    def main(args: Tuple2[String, Array[String]]=Tuple2(hdfs_file, etl_date:Array[String])): Unit = {
        for( a <- etl_date){
            val hdfs_file_ = s"$hdfs_file" + a
            val rdd_20210113 = spark.sparkContext.textFile(hdfs_file_).cache()
            val num1 = rdd_20210113.count
            println(s"加载数据啦:$a RDD的数据量是$num1")
        }
        val rdd_20210113_test = spark.sparkContext.textFile(hdfs_file + "20210328").cache()
        var num1 = rdd_20210113_test.count()
        println(s"加载数据啦:20210113 RDD的数据量是$num1")
        rdd_20210113_test.unpersist() // 解除持久化
        val df_20210420 = spark.sparkContext.textFile(hdfs_file + "20210113").toDF.cache()
        num1 = df_20210420.count() // 指定memory之后,cache的数量太多之前cache的结果会被干掉
        println(s"加载数据啦:20210420 DataFrame的数据量是$num1")
    }
}

// 配置参数multiple_duplicated
val hdfs_file = "hdfs://path/etl_date="
val etl_date = Array("20210113","20210112","20210112","20210112","20210112","20210112", "20210113")
LoadingData_from_files.main(hdfs_file, etl_date)
  • 得到结果如下:
spark_个性化配置cache结果对比
spark_个性化配置cache结果RDD&DF对比
  • 结果分析
    • 可以看到默认情况下,RDD的缓存方式都是到Memory的,而DataFrame的缓存方式都是Memory and Disk
    • 指定memory之后,cache的数量太多之前cache的结果会被干掉

无特定配置项

import org.apache.spark.sql.SparkSession
var NumExecutors = spark.conf.getOption("spark.num_executors").repr
var ExecutorMemory = spark.conf.getOption("spark.executor.memory").repr
var AppName = spark.conf.getOption("spark.app.name").repr
object LoadingData_from_files{
    def main(args: Tuple2[String, Array[String]]=Tuple2(hdfs_file, etl_date:Array[String])): Unit = {
        for( a <- etl_date){
            val hdfs_file_ = s"$hdfs_file" + a
            val rdd_20210113 = spark.sparkContext.textFile(hdfs_file_).cache()
            val num1 = rdd_20210113.count
            println(s"加载数据啦:$a RDD的数据量是$num1")
        }
        val rdd_20210113_test = spark.sparkContext.textFile(hdfs_file + "20210328").cache()
        var num1 = rdd_20210113_test.count()
        println(s"加载数据啦:20210328 RDD的数据量是$num1")
        rdd_20210113_test.unpersist() // 解除持久化
        val df_20210420 = spark.sparkContext.textFile(hdfs_file + "20210113").toDF.cache()
        num1 = df_20210420.count() // 指定memory之后,cache的数量太多之前cache的结果会被干掉
        println(s"加载数据啦:20210420 DataFrame的数据量是$num1 \n当前环境下cache的个数及id为:")
        spark.sparkContext.getPersistentRDDs.foreach(i=>println("cache的id:" + i._1))
    }
}

// 无配置参数multiple_duplicated
val hdfs_file = "hdfs://path/etl_date="
val etl_date = Array("20210113","20210112","20210112","20210112","20210112","20210112", "20210113" )
LoadingData_from_files.main(hdfs_file, etl_date)
  • 得到结果如下:
  • spark_不做配置cache结果
  • 结果分析

    • spark的配置文件中,设置的也是动态分配内存;
    • cache的结果也是到达memory限制的时候,已经cache的结果会自动消失;
    • 上述例子中,我们增加了8个文件,但最终只保留了5个cache的结果;
      • 通过for重复从一个文件取数,并val声明给相同变量并cache,结果是会被多次保存在memory或者Disk中的;

查看当前服务下的所有缓存并删除

spark.sparkContext.getPersistentRDDs.foreach(i=>println(i._1))
spark.sparkContext.getPersistentRDDs.foreach(i=>{i._2.unpersist()})

repartition

  • repartition只是coalesce接口中shuffle为true的实现
  • repartition 可以增加和减少分区,而使用 coalesce 则只能减少分区
  • 每个block的大小为默认的128M
//RDD
rdd.getNumPartitions
rdd.partitions.length
rdd.partitions.size

// For DataFrame, convert to RDD first
df.rdd.getNumPartitions
df.rdd.partitions.length
df.rdd.partitions.size

RDD

  • 默认cache的级别是Memory
val hdfs_file = "hdfs://path1/etl_date="
val rdd_20210113_test = spark.sparkContext.textFile(hdfs_file + "20210113").cache()
// 文件大小为1.5G
rdd_20210113_test.getNumPartitions
// res2: Int = 13
val rdd_20210113_test_par1 = rdd_20210113_test.repartition(5)
rdd_20210113_test_par1.partitions.size
// res9: Int = 5
val rdd_20210113_test_par2 = rdd_20210113_test_par1.coalesce(13)
rdd_20210113_test_par2.partitions.length
// res14: Int = 5 增加分区没生效
val rdd_20210113_test_par3 = rdd_20210113_test_par1.coalesce(3)
rdd_20210113_test_par3.partitions.length 
// res16: Int = 3 增加分区生效

DataFrame

默认cache的级别是Memory and Disk

val hdfs_file = "hdfs://path1/etl_date="
val df_20210420 = spark.sparkContext.textFile(hdfs_file + "20210113").toDF().cache()
df_20210420.rdd.getNumPartitions
// res18: Int = 13
val df_20210420_par1 = df_20210420.repartition(20)
df_20210420_par1.rdd.getNumPartitions
// res19: Int = 20 增加分区生效
val df_20210420_par2 = df_20210420_par1.coalesce(5)
df_20210420_par2.rdd.getNumPartitions
// res20: Int = 5

cache&persist对比

  • cache调用的是无参数版本的persist()
  • persist的说明
import org.apache.spark.storage.StorageLevel._
// MEMORY_AND_DISK
val hdfs_file = "hdfs://path1/etl_date="
var etl_date = "20210113"
var hdfs_file_ = s"$hdfs_file" + etl_date
val rdd_20210113_DISK_MEMORY = spark.sparkContext.textFile(hdfs_file_).persist(MEMORY_AND_DISK)
println("DISK_ONLY数据量为" + rdd_20210113_DISK_MEMORY.count())
// MEMORY_ONLY
etl_date = "20210112"
hdfs_file_ = s"$hdfs_file" + etl_date
val rdd_20210113_MEMORY_ONLY = spark.sparkContext.textFile(hdfs_file_).persist(MEMORY_ONLY)
println("MEMORY_ONLY数据量为" + rdd_20210113_MEMORY_ONLY.count())
// DISK_ONLY
etl_date = "20210328"
hdfs_file_ = s"$hdfs_file" + etl_date
val rdd_20210113_DISK_ONLY = spark.sparkContext.textFile(hdfs_file_).persist(DISK_ONLY)
println("DISK_ONLY数据量为" + rdd_20210113_DISK_ONLY.count())
// DISK_ONLY数据量为4298617
// MEMORY_ONLY数据量为86340
// DISK_ONLY数据量为20000
spark_persist_测试

性能优化

参数说明

火狐截图_spark_参数说明

参数配置建议

  • 优化方面说明

    火狐截图_优化建议
  • tips
    • yarn集群中一般有资源申请上限,如,executor-memory*num-executors < 400G 等,所以调试参数时要注意这一点
    • 如果GC时间较长,可以适当增加--executor-memory的值或者减少--executor-cores的值
    • yarn下每个executor需要的memory = spark-executor-memory + spark.yarn.executor.memoryOverhead.
    • 一般需要为,后台进程留下足够的cores(一般每个节点留一个core)。
    • Yarn ApplicationMaster (AM):ApplicationMaster负责从ResourceManager申请资源并且和NodeManagers一起执行和监控containers和它们的资源消耗。如果我们是spark on yarn模式,那么我们需要为ApplicationMaster预留一些资源(1G和1个Executor)
    • num-executors大(30),executor-cores小(1)→ 每个executor只分配了一个核,将无法运行多个任务的优点
    • num-executors小(5),executor-cores大(7)→ 每个executor分配了7个核,HDFS吞吐量会受到影响。同时过大的内存分配也会导致过多的GC延迟
    • Spark shell required memory = (Driver Memory + 384 MB) + (Number of executors * (Executor memory + 384 MB))

设置资源的示例

  • 资源分配情况【非动态分配资源模式下】
    • contain、CPUVcores、AllocaMemory为hdfs下对具体的application的资源占用情况
    • executors、storage-memory为spark web-ui下的executor情况
Snipaste_spark_资源配置说明
  • spark-shell下资源占用情况
    • 主要查看指定num-executor以及total-executor-cores情况下,资源占用是否仍然会动态变化
    • 还是会动态变化
Snipaste_spark_shell资源配置说明
  • spark-submit下资源占用情况
    • 主要查看指定num-executor以及total-executor-cores情况下,资源占用是否仍然会动态变化
    • 还是会动态变化
    • CPUVcores是因为其默认是*4
Snipaste_spark_submit资源配置说明
  • 设置资源的方式(以我们的集群为例,5台8核32G的服务器)
    • 第一,给每个Executor分配3个core即executor-cores=3,一般设置5对HDFS的吞吐量会比较友好。
    • 第二,为后台进程留一个core,则每个节点可用的core数是8 - 1 = 7。所以集群总的可用core数是7 x 5 = 35。
    • 第三,每个节点上的Executor数就是 7 / 3 = 2,集群总的可用的Executor数就是 2 * 5 = 10。为ApplicationManager留一个Executor,则num-executors=9。
    • 第四,每个节点上每个Executor可分配的内存是 (32GB-1GB) / 2 = 15GB(减去的1GB是留给后台程序用),除去MemoryOverHead=max(384MB, 7% * 15GB)=2GB,所以executor-memory=15GB - 2GB = 12GB。
    • 所以最后的参数配置是:num-executors=9、executor-cores=3、executor-memory=12G
  • 设置资源的方式(调整下,比如要降低GC,executor-memory不需要给那么多)
    • 按照上述方式,得到每个Executor分配到的内存是12GB,但假设8GB内存就够用了
    • 那么此时我们可以将executor-cores降低为2,那么每个节点就可以有7 / 2 = 3个Executor,那么总共可以获得的Executor数就是 (5 * 3) - 1 =14,每个节点上每个Executor可分配的内存是(32GB-1GB) / 3 = 10GB,除去MemoryOverHead=max(384MB, 7% * 10GB)=1GB,所以executor-memory=10GB - 1GB = 9GB
    • 所以最后的参数配置是:num-executors=14、executor-cores=2、executor-memory=9G

查看Memory实际分配情况

// 计算driver Memory的
// spark 分配的实际资源情况
def getSparkMemory():Float={
    val driver_memory_set1 = sc.getConf.getSizeAsBytes("spark.driver.memory")/1024/1024/1024
    val systemMemory = Runtime.getRuntime.maxMemory.toFloat///1024/1024/1024
    // fixed amount of memory for non-storage, non-execution purposes
    val reservedMemory = 300 * 1024 * 1024
    // minimum system memory required
    val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
    val usableMemory = systemMemory - reservedMemory
    val memoryFraction = sc.getConf.getDouble("spark.memory.fraction", 0.6)
    val maxMemory = (usableMemory * memoryFraction).toLong
    import org.apache.spark.network.util.JavaUtils
    val allocateMemory = JavaUtils.byteStringAsMb(maxMemory + "b")
    println(f"driver_memory: $driver_memory_set1%1.1f, allocateMemory: $allocateMemory%1.1f,")
    maxMemory
}

val maxMemory = getSparkMemory()
// driver_memory: 2.0, allocateMemory: 912.0,

// // 查看 spark web ui资源情况
def formatBytes(bytes: Double) = {
  val k = 1000
  val i = math.floor(math.log(bytes) / math.log(k))
  val maxMemoryWebUI = bytes / math.pow(k, i)
  f"$maxMemoryWebUI%1.1f"
}
println(formatBytes(maxMemory))
// 956.6
def allocateMemory(executorMemory:Float=1, executors:Float=1, driverMemory:Float=1):Double={
    val driver_overmemory = Array(384, driverMemory * 0.07).max
    val executor_Memory = Array(384, executorMemory * 0.07).max
    val allocateMemory = (driver_overmemory + driverMemory) + executors * (executor_Memory + executorMemory)
    allocateMemory/1024
}
allocateMemory(1 * 1024, 16, 1 * 1024) 
// res3: Double = 23.375

查看服务环境

  • 通过8088端口proxy查看任务信息http://ip:8088/proxy/application_jobid/executors/
  • 通过8088端口cluster查看任务信息http://ip:8088/cluster/apps
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,386评论 6 479
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,939评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,851评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,953评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,971评论 5 369
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,784评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,126评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,765评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,148评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,744评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,858评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,479评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,080评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,053评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,278评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,245评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,590评论 2 343

推荐阅读更多精彩内容

  • 为什么需要调优 在大数据计算领域,Spark已经成为了越来越流行、越来越受欢迎的计算平台之一。然而,通过Spark...
    卡卡xx阅读 1,577评论 1 3
  • 前言 在大数据计算领域,Spark已经成为了越来越流行、越来越受欢迎的计算平台之一。Spark的功能涵盖了大数据领...
    Alukar阅读 548评论 0 6
  • 1 前言 在大数据计算领域,Spark已经成为了越来越流行、越来越受欢迎的计算平台之一。Spark的功能涵盖了大数...
    wisfern阅读 2,434评论 3 39
  • 前言 在大数据计算领域,Spark已经成为了越来越流行、越来越受欢迎的计算平台之一。Spark的功能涵盖了大数据领...
    Yezhiwei阅读 151评论 0 1
  • 我是黑夜里大雨纷飞的人啊 1 “又到一年六月,有人笑有人哭,有人欢乐有人忧愁,有人惊喜有人失落,有的觉得收获满满有...
    陌忘宇阅读 8,520评论 28 53