销售预测案例源码分析

本文重在借案例学习spark相关数据结构与语法

流程

1. 特征转换

val stateHolidayIndexer = new StringIndexer()
    .setInputCol("StateHoliday")
    .setOutputCol("StateHolidayIndex")
  val schoolHolidayIndexer = new StringIndexer()
    .setInputCol("SchoolHoliday")
    .setOutputCol("SchoolHolidayIndex")
  val stateHolidayEncoder = new OneHotEncoder()
    .setInputCol("StateHolidayIndex")
    .setOutputCol("StateHolidayVec")
  val schoolHolidayEncoder = new OneHotEncoder()
    .setInputCol("SchoolHolidayIndex")
    .setOutputCol("SchoolHolidayVec")
  val dayOfMonthEncoder = new OneHotEncoder()
    .setInputCol("DayOfMonth")
    .setOutputCol("DayOfMonthVec")
  val dayOfWeekEncoder = new OneHotEncoder()
    .setInputCol("DayOfWeek")
    .setOutputCol("DayOfWeekVec")
  val storeEncoder = new OneHotEncoder()
    .setInputCol("Store")
    .setOutputCol("StoreVec")

  val assembler = new VectorAssembler()
    .setInputCols(Array("StoreVec", "DayOfWeekVec", "Open",
      "DayOfMonthVec", "StateHolidayVec", "SchoolHolidayVec"))
    .setOutputCol("features")
  • 先转化为StringIndexer

    • inputCol原始列
    • outputCol转化为对应的index列:
      • 从0开始编号,出现频次最多的项目,编号小

      • 有时候会有着这样的场景

      • 用一个df转换另一个df,当df2对应列中的值超出了df1中的范围时,可以选择策略

        • skip:忽略掉
        • keep:超出项对应分配一个index
        • 默认为抛出异常
        val indexed2 = indexer.fit(df1).setHandleInvalid("skip").transform(df2)
        
  • 做OneHotEncoder

    • 转化为对应向量
    • 只指定一位为1,其余为0,出现频率最低的为(最终序号, [], [])
  • VectorAssembler

    • 将对应元素合并成一个向量,打平

2. 环境初始化(面向像我这样的小白选手)

main中 大部分抄袭文档

val conf = new SparkConf().setAppName("alithink").setMaster("local")
val sc = new SparkContext(conf)
val sparkSession = SparkSession.builder().getOrCreate()
  • SparkConf:
    • Spark各种key-value的配置项
      • setAppName: 给你的应用配置一个名字
      • setMaster: 连接到的主URL,例如这里的local代表本地单线程运行,local[4]本地4核运行,或者spark://master:7077 spark典型的Mater/slave模式
  • SparkContext:
    • 理解为与spark集群的对接人,可以用她来创建RDDs, accumulators 和 broadcast variables
    • 每个JVM环境活着的SparkContext只有一个,创建一个新的前先stop(将来这个限制可能会被移除)
  • SparkSession:
    • 合并了SparkContext和SQLContext
      • 内部有对应属性在需要时可以取得对应实例
    • 用于操作DataSet和DataFrame API
    • 使用:
      • REPL已经预先创建了(比如spark-shell, zeppelin)

      • 获取已经存在的或者新创建一个:

        • SparkSession.builder().getOrCreate()
          • 前提是sparkContext已经创建
        • 尽量用SparkSession来接管一切吧(上述代码可以改为如下)
        val conf = new SparkConf().setAppName("alithink").setMaster("local")
        // val sc = new SparkContext(conf)
        // val sparkSession = SparkSession.builder().getOrCreate()
        
        val sparkSession = SparkSession.builder
                         //.master("local")
                         //.appName("alithink")
                         .config(conf)
                         .getOrCreate()
        

3. 训练数据整理

// main中调用
val data = loadTrainingData(sparkSession, "/Users/alithink/Space/common_data/train.csv")
// 具体实现函数
def loadTrainingData(sqlContext:SparkSession, filePath:String):DataFrame = {
  val trainRaw = sqlContext
    .read.format("com.databricks.spark.csv")
    .option("header", "true")
    .load(filePath)
    .repartition(30)
  trainRaw.createOrReplaceTempView("raw_training_data")
   
  sqlContext.sql("""SELECT
    double(Sales) label, double(Store) Store, int(Open) Open, double(DayOfWeek)   DayOfWeek,
    StateHoliday, SchoolHoliday, (double(regexp_extract(Date, '\\d+-\\d+-(\\d+)', 1))) DayOfMonth
    FROM raw_training_data
    """).na.drop()
}
  • SparkSession:
    • read 返回一个DataFrameReader
      • format(读取格式):com.databricks.spark.csv期初为一个开源库,后来已经集成到spark2.*啦
      • option("header", "true") 使用第一行作为头
      • 赠送 .option("inferSchema", "true") 自动推导类型
  • DataFrame(粗略一说,内容太多_):
    • DataSet[Row]
    • DataFrame vs RDD


      image
    • DataFrame vs DataSet
      • 往往区别是在于行类型的不确定与确定
  • DataSet:
    • repartition: 返回按规则分区后的dataset
      • 一句话:分区由少变多,或者在一些不是键值对的RDD中想要重新分区的话,就需要使用repartition了
      • 有多变少,直接coalesce,repartition其实就是shuffle=true的coalesce
      • 关于分区:分区的个数决定了并行计算的粒度
    • createOrReplaceTempView:
      • 创建本地临时‘表’,便于之后sql操作
  • sql:
    • na.drop() 丢掉所有包含null的row

4. 线性回归(随机森林类似,换了方法以及ParamMaps)

def preppedLRPipeline():TrainValidationSplit = {
    val lr = new LinearRegression()
    
    val paramGrid = new ParamGridBuilder()
     .addGrid(lr.regParam, Array(0.1, 0.01))
     .addGrid(lr.fitIntercept)
     .addGrid(lr.elasticNetParam, Array(0.0, 0.25, 0.5, 0.75, 1.0))
     .build()
    
    val pipeline = new Pipeline()
     .setStages(Array(stateHolidayIndexer, schoolHolidayIndexer,
       stateHolidayEncoder, schoolHolidayEncoder, storeEncoder,
       dayOfWeekEncoder, dayOfMonthEncoder,
       assembler, lr))
    
    val tvs = new TrainValidationSplit()
     .setEstimator(pipeline)
     .setEvaluator(new RegressionEvaluator)
     .setEstimatorParamMaps(paramGrid)
     .setTrainRatio(0.75)
    tvs
}
  • LinearRegression:
    • spark mllib自带的线性回归,支持多种类型的正则方法(具体算法迷茫中)
      • Lasso L1
      • ridge L2
      • elastic net L2 + L1
      • none
  • ParamGridBuilder:
    • 参数网格:
      • 通过不同参数的组合,形成大量参数调优组合后的模型
      • 然后用对应的验证评估方法去择优
    • regParam:定义规范化项的权重
    • elasticNetParam:Elastic net参数,取值介于0,1
    • 这里elaticNetParam设置5个值,regParam2个值,代表会有 5*2=10个不同的模型被训练。
  • Pipeline:
    • 由一个个stages组成,每一个stage可以是estimator或者transformer
    • fit model时触发
  • TrainValidationSplit:
    • 参数调整检验。
    • 随机将输入的dataset划分为训练集和验证集,使用评估机制选择效果最好的模型。
  • RegressionEvaluator:
    • 上面说的用于验证模型效果的evaluator

5. 模型训练与验证

def fitModel(tvs:TrainValidationSplit, data:DataFrame) = {
    val Array(training, test) = data.randomSplit(Array(0.8, 0.2), seed = 12345)
    logger.info("Fitting data")
    val model = tvs.fit(training)
    logger.info("Now performing test on hold out set")
    val holdout = model.transform(test).select("prediction","label")

    // have to do a type conversion for RegressionMetrics
    val rm = new RegressionMetrics(holdout.rdd.map(x =>
      (x(0).asInstanceOf[Double], x(1).asInstanceOf[Double])))

    logger.info("Test Metrics")
    logger.info("Test Explained Variance:")
    logger.info(rm.explainedVariance)
    logger.info("Test R^2 Coef:")
    logger.info(rm.r2)
    logger.info("Test MSE:")
    logger.info(rm.meanSquaredError)rm = new RegressionMetrics(holdout.rdd.map(x =>
      (x(0).asInstanceOf[Double], x(1).asInstanceOf[Double])))

    logger.info("Test Metrics")
    logger.info("Test Explained Variance:")

    logger.info("Test RMSE:")
    logger.info(rm.rootMeanSquaredError)

    model
}
  • 首先划分训练集和测试集
  • fit:
    • 用训练集拟合出一个model
  • RegressionMetrics:
    • 回归evaluator
    • 集中评估标准:
      • R^2:决定系数,反应因变量的全部变异能通过回归关系被自变量解释的比例。如R平方为0.8,则表示回归关系可以解释因变量80%的变异。换句话说,如果我们能控制自变量不变,则因变量的变异程度会减少80%
      • explainedVariance: 解释方差,具体详见:http://blog.sciencenet.cn/blog-1148346-852482.html
      • MAE mean absolute error: 绝对误差,准确值与其测量值之间的误差。
      • MSE mean squared error: 均方误差, 衡量平均误差的方法。
      • RMSE root mean square error: 均方根误差。
  • 最后用训练好的模型transform测试集,然后将结果保存。

参考

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,029评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,395评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,570评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,535评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,650评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,850评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,006评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,747评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,207评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,536评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,683评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,342评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,964评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,772评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,004评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,401评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,566评论 2 349

推荐阅读更多精彩内容