我们可以自己实现一个 SparkConfig 默认配置工具类,以便统一管理 Spark 的默认配置。
package com.yzy.spark;
import org.apache.spark.SparkConf;

/**
 * Centralized factory for the application's default {@link SparkConf}.
 *
 * <p>All jobs obtain their base configuration from {@link #getSparkConf()} so
 * that tuning values (compression, backpressure, retry limits, parallelism,
 * UI retention, graceful shutdown, ...) are managed in a single place.
 *
 * <p>Thread-safety: the original implementation used an unsynchronized
 * null-check for lazy initialization, which could let two racing threads
 * create separate instances — or let one thread observe a partially
 * configured {@code SparkConf}. This version uses the
 * initialization-on-demand holder idiom: the JVM's class-initialization
 * guarantees make the lazy construction both thread-safe and lock-free.
 *
 * <p>Note: the returned {@code SparkConf} is a single shared mutable object;
 * callers typically chain {@code setMaster(...)} / {@code setAppName(...)}
 * onto it before building a context.
 */
public final class SparkConfig {

    /** Utility class — not instantiable. */
    private SparkConfig() {
    }

    /** Lazily initialized on first access to {@code Holder.INSTANCE}. */
    private static final class Holder {
        private static final SparkConf INSTANCE = buildDefaultConf();
    }

    /**
     * Returns the shared, fully configured default {@link SparkConf}.
     *
     * @return the application-wide base Spark configuration (never {@code null})
     */
    public static SparkConf getSparkConf() {
        return Holder.INSTANCE;
    }

    /** Builds the default configuration; runs exactly once per class loader. */
    private static SparkConf buildDefaultConf() {
        SparkConf sparkConf = new SparkConf();
        // I/O & serialization: lz4 shuffle/broadcast compression, Kryo serializer.
        sparkConf.set("spark.io.compression.codec", "lz4");
        sparkConf.set("spark.rdd.compress", "true");
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        // Streaming: adaptive ingestion rate and clean shutdown on SIGTERM.
        sparkConf.set("spark.streaming.backpressure.enabled", "true");
        sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true");
        sparkConf.set("spark.streaming.kafka.maxRetries", "3");
        sparkConf.set("spark.streaming.blockInterval", "1000ms");
        // UI retention limits to bound driver memory used by history data.
        sparkConf.set("spark.streaming.ui.retainedBatches", "200");
        sparkConf.set("spark.ui.retainedJobs", "200");
        sparkConf.set("spark.ui.retainedStages", "500");
        sparkConf.set("spark.sql.ui.retainedExecutions", "200");
        // YARN resiliency: allow AM/executor failures within a rolling 1h window.
        sparkConf.set("spark.yarn.maxAppAttempts", "4");
        sparkConf.set("spark.yarn.am.attemptFailuresValidityInterval", "1h");
        sparkConf.set("spark.yarn.max.executor.failures", "16");
        sparkConf.set("spark.yarn.executor.failuresValidityInterval", "1h");
        sparkConf.set("spark.task.maxFailures", "8");
        // Avoid sharing a cached HDFS FileSystem across streaming checkpoints.
        sparkConf.set("spark.hadoop.fs.hdfs.impl.disable.cache", "true");
        // Default partition counts for RDD and SQL shuffles.
        sparkConf.set("spark.default.parallelism", "40");
        sparkConf.set("spark.sql.shuffle.partitions", "40");
        return sparkConf;
    }
}
// Initialize the job's SparkConf from the shared defaults, then apply the
// job-specific master URL and application name (both `master` and `appName`
// are assumed to be supplied by the surrounding caller — not shown here).
SparkConf sparkConf = SparkConfig.getSparkConf().setMaster(master).setAppName(appName);
上述配置仅供参考,各配置项的详细说明请参见 Spark 官方文档。