Spark 中的 --files 参数与 ConfigFactory 工厂方法

Spark 中的 --files 参数与 ConfigFactory 工厂方法

scala 对象

以前有个大数据项目做小程序统计,读取 HDFS 上的 Parquet 文件,统计完毕后,将结果写入到 MySQL 数据库。首先想到的是将 MySQL 的配置写在代码里面:

val jdbcUrl  = "jdbc:mysql://127.0.0.1:6606/test?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&failOverReadOnly=false&useSSL=false"
val user     = "root"
val password = "averyloooooongword"
val driver   = "com.mysql.jdbc.Driver"

properties 文件

如果是测试,生产环境各有一套,那上面的代码就要分别复制俩份,不便于维护!后来知道了可以把配置放在 resources 目录下, 针对本地,测试和生产环境,分别创建不同的 properties 文件:

conf.properties  
conf_product.properties 
env.properties  
local.properties

例如其中的 conf.properties 内容如下:

#  测试环境配置

## 数据库配置
jdbc.url=jdbc:mysql://10.0.0.11:3306/ald_xinen_test?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&failOverReadOnly=false
jdbc.user=aldwx
jdbc.pwd=123456
jdbc.driver=com.mysql.jdbc.Driver

# parquet 文件目录
tongji.parquet=hdfs://10.0.0.212:9000/ald_log_parquet

然后在代码里面读取 resource 文件中的配置:

    /**
     * 根据 key 获取 properties 文件中的 value
     * @param key properties 文件中等号左边的键
     * @return 返回 properties 文件中等号右边的值
     */
    public static String getProperty(String key) {
        Properties properties = new Properties();
        InputStream in = ConfigurationUtil.class.getClassLoader().getResourceAsStream(getEnvProperty("env.conf"));
        try {
            properties.load(in);
            in.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return (String) properties.get(key);
    }

这样解决了多个环境中配置不同的问题,只需要复制多个 properties 文件,根据需要修改就行。但是这种方法不是最优的,因为配置不是结构化的,而是通过注释分割了不同的配置。

conf 文件

resources 目录下的文件如下:

application.conf              
application.production.conf      
application.local.conf             
log4j.properties              
metrics.properties

ConfigFactory 工厂方法默认会读取 resources 目录下面名为 application.conf 的文件:

# Spark 相关配置
spark {
  master                   = "local[2]"
  streaming.batch.duration = 5001  // Would normally be `ms` in config but Spark just wants the Long
  eventLog.enabled         = true
  ui.enabled               = true
  ui.port                  = 4040
  metrics.conf             = metrics.properties
  checkpoint.path          = "/tmp/checkpoint/local"
  stopper.port             = 12345
  spark.cleaner.ttl        = 3600
  spark.cleaner.referenceTracking.cleanCheckpoints = true
}

# Kafka 相关配置
kafka {

  metadata.broker.list = "localhost:9092"
  zookeeper.connect    = "localhost:2181"

  topic.dtcdata {
    name = "dc-diagnostic-report"
    partition.num = 1      
    replication.factor = 1  
  }

  group.id             = "group-rds"
  timeOut              = "3000"
  bufferSize           = "100"
  clientId             = "telematics"
  key.serializer.class = "kafka.serializer.StringEncoder"
  serializer.class     = "com.wm.dtc.pipeline.kafka.SourceDataSerializer"
//  serializer.class     = "kafka.serializer.DefaultEncoder"
}

# MySQL 配置
mysql {
  dataSource.maxLifetime              = 800000
  dataSource.idleTimeout              = 600000
  dataSource.maximumPoolSize          = 10
  dataSource.cachePrepStmts           = true
  dataSource.prepStmtCacheSize        = 250
  dataSource.prepStmtCacheSqlLimit    = 204800
  dataSource.useServerPrepStmts       = true
  dataSource.useLocalSessionState     = true
  dataSource.rewriteBatchedStatements = true
  dataSource.cacheResultSetMetadata   = true
  dataSource.cacheServerConfiguration = true
  dataSource.elideSetAutoCommits      = true
  dataSource.maintainTimeStats        = false

  jdbcUrl="jdbc:mysql://127.0.0.1:6606/wmdtc?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&failOverReadOnly=false&useSSL=false"
  jdbcDriver="com.mysql.jdbc.Driver"
  dataSource.user="root"
  dataSource.password="123456"
}

为了验证,我创建了一个 Object 对象:

package allinone
import com.typesafe.config.ConfigFactory
import scopt.OptionParser

object SparkFilesArgs extends App  {
  val config = ConfigFactory.load()
  val sparkConf = config.getConfig("spark")
  val sparkMaster = sparkConf.getString("master")
  val sparkDuration = sparkConf.getLong("streaming.batch.duration")
  println(sparkMaster, sparkDuration)
}

如果我直接运行就会打印:

(local[2],5001)

确实是 application.conf 文件中 Spark 的配置。

但是生产环境我们打算使用另外一个配置文件 application.production.conf:

spark {
  master = "yarn"
  streaming.batch.duration = 5002
  eventLog.enabled=true
  ui.enabled = true
  ui.port = 4040
  metrics.conf = metrics.properties
  checkpoint.path = "/tmp/telematics"
  stopper.port = 12345
  spark.cleaner.ttl = 3600
  spark.cleaner.referenceTracking.cleanCheckpoints = true

  trajectory.path = "hdfs://road_match_result"
  city.path = "hdfs://root/data/city.csv"
}

##cassandra相关配置
cassandra {
  keyspace = wmdtc
  cardata.name = can_signal
  trip.name = trip
  latest.name = latest
  latest.interval = 15000

  connection.host = "Bigdata2,Bigdata3,Bigdata4,Bigdata5,Bigdata6"
  write.consistency_level = LOCAL_ONE
  read.consistency_level = LOCAL_ONE
  concurrent.writes = 24
  batch.size.bytes = 65536
  batch.grouping.buffer.size = 1000
  connection.keep_alive_ms = 300000
  auth.username = cihon
  auth.password = cihon
}

kafka {
  metadata.broker.list = "Bigdata2:9092,Bigdata3:9092,Bigdata4:9092,Bigdata5:9092,Bigdata6:9092"
  zookeeper.connect = "Bigdata2:2181,Bigdata3:2181,Bigdata4:2181"

  topic.obddata {
    name = "dtc"
  }

  group.id = "signal"
  timeOut = "3000"
  bufferSize = "100"
  clientId = "telematic"

  key.serializer.class = "kafka.serializer.StringEncoder"
  serializer.class = "com.wm.telematics.pipeline.kafka.SourceDataSerializer"

}

akka {
  loglevel = INFO
  stdout-loglevel = WARNING
  loggers = ["akka.event.slf4j.Slf4jLogger"]
}

##geoService接口URL
webservice {
  url = "http://10.1.108.15:8088/map/roadmessage"
}

##geoService相关配置
geoservice {
  timeout = 3
  useRealData = false
}

既然 ConfigFactory 方法默认读取 application.conf 文件,但是

val config = ConfigFactory.load()

相当于:

val config = ConfigFactory.load("application.conf")

但是 load 方法也接受参数:resourceBasename:

val config = ConfigFactory.load("application.production") // 加载生产环境的配置

这样在代码里面通过加载不同的配置文件实现本地、测试、生产环境的切换和部署,但是在代码里面读取配置还是不够优美!所以我们有 Spark 的 --files 命令行选项。顾名思义,显而易见,也正如官网所描述的那样, --files 参数后面的值是逗号分割的文本文件, 里面有一个 .conf 文件, load 方法会加载 --files 选项传递过来的配置文件:

#!/bin/sh

CONF_DIR=/root/telematics/resources
APP_CONF=application.production.conf
EXECUTOR_JMX_PORT=23339
DRIVER_JMX_PORT=2340

spark-submit \
  --name WM_telematics \
  --class allinone.SparkFilesArgs \
  --master local[*] \
  --deploy-mode client \
  --driver-memory 2g \
  --driver-cores 2 \
  --executor-memory 1g \
  --executor-cores 3 \
  --num-executors 3 \
  --conf "spark.executor.extraJavaOptions=-Dconfig.resource=$APP_CONF -Dcom.sun.management.jmxremote.port=$EXECUTOR_JMX_PORT -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Djava.rmi.server.hostname=`hostname`" \
  --conf "spark.driver.extraJavaOptions=-Dconfig.resource=$APP_CONF -Dcom.sun.management.jmxremote.port=$DRIVER_JMX_PORT -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Djava.rmi.server.hostname=`hostname`" \
  --conf spark.executor.memoryOverhead=4096 \
  --conf spark.driver.memoryOverhead=2048 \
  --conf spark.yarn.maxAppAttempts=2 \
  --conf spark.yarn.submit.waitAppCompletion=false \
  --conf spark.network.timeout=1800s \
  --conf spark.scheduler.executorTaskBlacklistTime=30000 \
  --conf spark.core.connection.ack.wait.timeout=300s \
  --files $CONF_DIR/$APP_CONF,$CONF_DIR/log4j.properties,$CONF_DIR/metrics.properties \
  /Users/ohmycloud/work/cihon/sxw/all-in-one/target/allinone-1.0-SNAPSHOT.jar

它打印:

(local[*],5002)

因为我在命令行选项中指定了 master 为 local[*], 配置文件为 application.production.conf

resource not found on classpath: application.conf

本地 localhost

jar 包里面我把 application.conf 给删除了,用 --files 传参数给 spark-submit 的方式,但是报:在 classpath 下找不到 application.conf 这个文件了。

cat spark-submit.sh:

#!/bin/sh

CONF_DIR=/Users/ohmycloud/work/cihon/gac/sources
APP_CONF=application.conf
EXECUTOR_JMX_PORT=23333
DRIVER_JMX_PORT=2334

spark-submit \
  --class $1 \
  --master local[2] \
  --deploy-mode client \
  --driver-memory 2g \
  --driver-cores 2 \
  --executor-memory 2g \
  --executor-cores 2 \
  --num-executors 4 \
  --conf "spark.executor.extraJavaOptions=-Dconfig.resource=$APP_CONF -Dcom.sun.management.jmxremote.port=$EXECUTOR_JMX_PORT -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Djava.rmi.server.hostname=`hostname`" \
  --conf "spark.driver.extraJavaOptions=-Dconfig.resource=$APP_CONF -Dcom.sun.management.jmxremote.port=$DRIVER_JMX_PORT -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Djava.rmi.server.hostname=`hostname`" \
  --conf spark.yarn.executor.memoryOverhead=1024 \
  --conf spark.yarn.driver.memoryOverhead=1024 \
  --conf spark.yarn.maxAppAttempts=2 \
  --conf spark.yarn.submit.waitAppCompletion=false \
  --files $CONF_DIR/$APP_CONF \
  /Users/ohmycloud/demo/Spark/WriteParquet2Kafka/target/socket-structured-streaming-1.0-SNAPSHOT.jar

原因是 application.conf 文件所在的路径 /Users/ohmycloud/work/cihon/gac/sources 不在 classpath 里面!

使用

 --driver-class-path /Users/ohmycloud/work/cihon/gac/sources 

而非

 --driver-class-path /Users/ohmycloud/work/cihon/gac/sources/application.conf 

来添加 class path。

#!/bin/sh

CONF_DIR=/Users/ohmycloud/work/cihon/gac/sources
APP_CONF=application.conf
EXECUTOR_JMX_PORT=23333
DRIVER_JMX_PORT=2334

spark-submit \
  --class $1 \
  --master local[2] \
  --deploy-mode client \
  --driver-memory 2g \
  --driver-cores 2 \
  --executor-memory 2g \
  --executor-cores 2 \
  --num-executors 4 \
  --conf "spark.executor.extraJavaOptions=-Dconfig.resource=$APP_CONF -Dcom.sun.management.jmxremote.port=$EXECUTOR_JMX_PORT -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Djava.rmi.server.hostname=`hostname`" \
  --conf "spark.driver.extraJavaOptions=-Dconfig.resource=$APP_CONF -Dcom.sun.management.jmxremote.port=$DRIVER_JMX_PORT -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Djava.rmi.server.hostname=`hostname`" \
  --conf spark.yarn.executor.memoryOverhead=1024 \
  --conf spark.yarn.driver.memoryOverhead=1024 \
  --conf spark.yarn.maxAppAttempts=2 \
  --conf spark.yarn.submit.waitAppCompletion=false \
  --driver-class-path /Users/ohmycloud/work/cihon/gac/sources \
  --files $CONF_DIR/$APP_CONF \
  /Users/ohmycloud/demo/Spark/WriteParquet2Kafka/target/socket-structured-streaming-1.0-SNAPSHOT.jar

yarn 模式

yarn 模式下,不需要添加 driver-class-path 了:

#!/bin/sh

CONF_DIR=/root/resources
APP_CONF=application.test.conf
EXECUTOR_JMX_PORT=23333
DRIVER_JMX_PORT=2334

spark2-submit \
  --class $1 \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 2g \
  --driver-cores 2 \
  --executor-memory 2g \
  --executor-cores 2 \
  --num-executors 4 \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 \
  --conf "spark.executor.extraJavaOptions=-Dconfig.resource=$APP_CONF -Dcom.sun.management.jmxremote.port=$EXECUTOR_JMX_PORT -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Djava.rmi.server.hostname=`hostname`" \
  --conf "spark.driver.extraJavaOptions=-Dconfig.resource=$APP_CONF -Dcom.sun.management.jmxremote.port=$DRIVER_JMX_PORT -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Djava.rmi.server.hostname=`hostname`" \
  --conf spark.executor.memoryOverhead=1024 \
  --conf spark.driver.memoryOverhead=1024 \
  --conf spark.yarn.maxAppAttempts=2 \
  --conf spark.yarn.submit.waitAppCompletion=false \
  --files $CONF_DIR/$APP_CONF,$CONF_DIR/log4j.properties,$CONF_DIR/metrics.properties \
  target/socket-structured-streaming-1.0-SNAPSHOT.jar

但是实际上, 后来发现有时候不行,所以最好还是加上 driver-class-path!

Attention

attention

我在一个离线程序中给配置文件起了一个不带 application 的名字后,程序就报【找不到某个键了】,该成 application.conf 之后就可以了。

Attention Again

如果你什么都做了,但是发现通过 --files 传的文件仍然不生效,那么你可能在 pom 文件里忘记了这个:

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        ...
    </build>

最后

把项目中的 application.conf 改名为不含 application 单词的名字,把传入的配置文件名改为 application.conf

References

Using typesafe config with Spark on Yarn
Externalize properties – typesafe config
Spark Context and Spark Configuration
How to specify custom conf file for Spark Standalone's master?
Scala Load Configuration With PureConfig
Example: Running a Spark application with optional parameters

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,793评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,567评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,342评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,825评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,814评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,680评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,033评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,687评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,175评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,668评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,775评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,419评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,020评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,206评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,092评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,510评论 2 343

推荐阅读更多精彩内容