Learning Spark [8] - MLlib库 - 线性回归

机器学习数据管道(Machine Learning Pipeline)

Pipeline的概念,在很多机器学习的模型中都存在,是一个种整理以及操控数据的方法。在MLlib中,Pipeline API提供了一个在dataframe之上,管理机器学习工作流的接口。

MLlib术语

  • Transformer
    输入一个Dataframe,并输出一个包含了一些新列的Dataframe。Transformer不会从数据中学习到参数,且只会引用一些rule-based规则去转换数据。函数为transform()

Transformer主要用于清洗数据,以便于数据可以用于模型之中。

  • Estimator
    Estimator会从数据中学习到参数,根据函数fit()返回一个Model

该Model便可以被认定为一个Transformer

  • Pipeline
    将一个系列的Transformer和Estimator整理为一个模型。Pipeline自身同样为Estimator,函数为pipeline.fit(),并返回一个PipelineModel

PipelineModel同样可以被认定为Transformer

例子

数据集为San Francisco housing dataset from Inside Airbnb。该数据集包括了Airbnb的租房的信息,例如卧室间数、位置、评价等等。我们的目标为预测一间夜的出租价格。
在起初拿到数据后,我们需要对数据的异常值或者离群值进行去除(例如:小于$0的出租价),并进行数据清洗(例如转换数据类型),等等,再次不过多赘述。

# In Python
from pyspark.sql import SparkSession

# create new spark instance
spark = (SparkSession
         .builder
         .appName('SparkSQLExampleApp')
         # .master("local[*]")
         # .config("spark.sql.catalogImplementation","hive")
         # .enableHiveSupport()
         .getOrCreate())

path = '.../sf-airbnb-clean.parquet'

airbnb = spark.read.parquet(path)
len(airbnb.columns)
34

该数据集有34个列,我们选择几列来拥有一些大概的数据概览。

airbnbDF.select('neighbourhood_cleansed', 'room_type', 'bedrooms', 'bathrooms',                 
                'number_of_reviews', 'price').show(5)
+----------------------+---------------+--------+---------+-----------------+-----+
|neighbourhood_cleansed|      room_type|bedrooms|bathrooms|number_of_reviews|price|
+----------------------+---------------+--------+---------+-----------------+-----+
|      Western Addition|Entire home/apt|     1.0|      1.0|            180.0|170.0|
|        Bernal Heights|Entire home/apt|     2.0|      1.0|            111.0|235.0|
|        Haight Ashbury|   Private room|     1.0|      4.0|             17.0| 65.0|
|        Haight Ashbury|   Private room|     1.0|      4.0|              8.0| 65.0|
|      Western Addition|Entire home/apt|     2.0|      1.5|             27.0|785.0|
+----------------------+---------------+--------+---------+-----------------+-----+
only showing top 5 rows

创建训练集与测试集

训练集与测试集
# Split in Train and Test
trainDF, testDF = airbnb.randomSplit([.8, .2], seed = 42) # seed is for set the randomness
print(f'train size: {trainDF.count()}, test size: {testDF.count()}')
train size: 5780, test size: 1366

单元线性回归

在Spark中建立线性回归模型,我们需要先将所有的自变量合并到一个向量里。我们先只选择一个自变量:卧室间数。我们可以使用vectorAssembler Transofrmer来进行这个操作。

from pyspark.ml.feature import VectorAssembler 

vecAssembler = VectorAssembler(inputCols=["bedrooms"], outputCol="features") 
vecTrainDF = vecAssembler.transform(trainDF) 
vecTrainDF.select("bedrooms", "features", "price").show(10)
+--------+--------+-----+
|bedrooms|features|price|
+--------+--------+-----+
|     1.0|   [1.0]|200.0|
|     1.0|   [1.0]|130.0|
|     1.0|   [1.0]| 95.0|
|     1.0|   [1.0]|250.0|
|     3.0|   [3.0]|250.0|
|     1.0|   [1.0]|115.0|
|     1.0|   [1.0]|105.0|
|     1.0|   [1.0]| 86.0|
|     1.0|   [1.0]|100.0|
|     2.0|   [2.0]|220.0|
+--------+--------+-----+
only showing top 10 rows

在拥有的自变量(卧室间数)和因变量(价格)后,我们就可以进行下一步,构建模型。

from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol = 'features', labelCol = 'price')
lrModel = lr.fit(vecTrainDF)

这里的lrModel就是一个estimator,它包含了使用训练集训练出来的参数,也是就是线性方程y=ax+b中的a。实际的结果如下

a = round(lrModel.coefficients[0], 2)
b = round(lrModel.intercept, 2)
print(f'The formula for the linear regression is price = {a} * num_of_bedrooms + {b}')
The formula for the linear regression is price = 123.68 * num_of_bedrooms + 47.51

建立Pipeline(管道)

建立pipeline可以更好管理代码,我们就省去了上述冗长的代码,从而更优雅的训练模型或者使用测试集来测试模型的性能。

from pyspark.ml import Pipeline

# transformer config
vecAssembler = VectorAssembler(inputCols=["bedrooms"], outputCol="features") 
# model config
lr = LinearRegression(featuresCol = 'features', labelCol = 'price')
# model training
pipelineModel = Pipeline(stages = [vecAssembler, lr]).fit(trainDF)

# At this time, pipelineModel已经自动可以以transformer来调用,所以便可以很便利的使用测试集来测试模型性能。
predDF = pipelineModel.transform(testDF)
predDF.select('bedrooms', 'features', 'price', 'prediction').show(10)
+--------+--------+------+------------------+
|bedrooms|features| price|        prediction|
+--------+--------+------+------------------+
|     1.0|   [1.0]|  85.0|171.18598011578285|
|     1.0|   [1.0]|  45.0|171.18598011578285|
|     1.0|   [1.0]|  70.0|171.18598011578285|
|     1.0|   [1.0]| 128.0|171.18598011578285|
|     1.0|   [1.0]| 159.0|171.18598011578285|
|     2.0|   [2.0]| 250.0|294.86172649777757|
|     1.0|   [1.0]|  99.0|171.18598011578285|
|     1.0|   [1.0]|  95.0|171.18598011578285|
|     1.0|   [1.0]| 100.0|171.18598011578285|
|     1.0|   [1.0]|2010.0|171.18598011578285|
+--------+--------+------+------------------+
only showing top 10 rows

多元线性回归&虚拟变量

为了解决自变量中的离散值,我们可以讲该变量转换为Dummy Variable(虚拟变量)。
在Spark中,函数OneHotEncoder()可以实现这一转换。

from pyspark.ml.feature import OneHotEncoder, StringIndexer

# dummy variable 
categoricalCols = [field for (field, dataTypes) in trainDF.dtypes if dataTypes == 'string']

indexOutputCols = [x + 'Index' for x in categoricalCols]
oheOutputCols = [x + 'OHE' for x in categoricalCols]

stringIndexer = StringIndexer(inputCols=categoricalCols,                               
                              outputCols=indexOutputCols,                               
                              handleInvalid = 'skip')

oheEncoder = OneHotEncoder(inputCols = indexOutputCols,
                           outputCols = oheOutputCols)

numericCols = [field for (field, dataTypes) in trainDF.dtypes if ((dataTypes == 'double') & (field != 'price'))]
assemblerInputs = oheOutputCols + numericCols

vecAssembler = VectorAssembler(inputCols = assemblerInputs,
                               outputCol = 'features')

lr = LinearRegression(labelCol = 'price', featuresCol = 'features')

pipeline = Pipeline(stages = [stringIndexer, oheEncoder, vecAssembler, lr])

pipelineModel = pipeline.fit(trainDF)
predDF = pipelineModel.transform(testDF)
predDF.select('features', 'price', 'prediction').show(10)
+--------------------+------------------+------------------+
|            features|             price|        prediction|
+--------------------+------------------+------------------+
|(98,[0,3,6,22,43,...| 4.442651256490317| 4.644425529745689|
|(98,[0,3,6,22,43,...|3.8066624897703196| 4.223594858687562|
|(98,[0,3,6,22,43,...| 4.248495242049359| 4.248280556674246|
|(98,[0,3,6,12,42,...| 4.852030263919617|3.8921581128135756|
|(98,[0,3,6,12,43,...|5.0689042022202315| 4.608476041020452|
|(98,[0,3,6,12,43,...| 5.521460917862246| 5.365868119786427|
|(98,[0,3,6,11,42,...|  4.59511985013459| 5.084838593929874|
|(98,[0,3,6,31,42,...| 4.553876891600541| 5.008339179369244|
|(98,[0,3,6,28,42,...| 4.605170185988092| 4.154386449584621|
|(98,[0,3,6,28,43,...| 7.605890001053122| 5.434322576497891|
+--------------------+------------------+------------------+
only showing top 10 rows

测试模型性能

常用模型性能的测试指标有:RMSE(root mean-square error),和R^2

from pyspark.ml.evaluation import RegressionEvaluator
regressionEvaluator = RegressionEvaluator(
    predictionCol = 'prediction',
    labelCol = 'price',
    metricName = 'rmse')
rmse = regressionEvaluator.evaluate(predDF)
print(f'RMSE is {round(rmse,2)}')
RMSE is 220.56
r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF) 
print(f"R2 is {round(r2, 2)}")
R2 is 0.16

R2过小,我们尝试对因变量进行对数转换。

# log-transform y
from pyspark.sql.functions import log, col

logTrainDF = trainDF.withColumn('price', log(col('price')))
logTestDF = testDF.withColumn('price', log(col('price')))

log_pipelineModel = pipeline.fit(logTrainDF)
predDF = log_pipelineModel.transform(logTestDF)

r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF) 
print(f"R2 is {round(r2, 2)}")
R2 is 0.57
predDF.select('features', 'price', 'prediction').show(10)
+--------------------+------------------+------------------+
|            features|             price|        prediction|
+--------------------+------------------+------------------+
|(98,[0,3,6,22,43,...| 4.442651256490317| 4.644425529745689|
|(98,[0,3,6,22,43,...|3.8066624897703196| 4.223594858687562|
|(98,[0,3,6,22,43,...| 4.248495242049359| 4.248280556674246|
|(98,[0,3,6,12,42,...| 4.852030263919617|3.8921581128135756|
|(98,[0,3,6,12,43,...|5.0689042022202315| 4.608476041020452|
|(98,[0,3,6,12,43,...| 5.521460917862246| 5.365868119786427|
|(98,[0,3,6,11,42,...|  4.59511985013459| 5.084838593929874|
|(98,[0,3,6,31,42,...| 4.553876891600541| 5.008339179369244|
|(98,[0,3,6,28,42,...| 4.605170185988092| 4.154386449584621|
|(98,[0,3,6,28,43,...| 7.605890001053122| 5.434322576497891|
+--------------------+------------------+------------------+
only showing top 10 rows

Reference
Learning Spark 2nd - Lightning Fast Big Data Analysis by Jules S. Damji, Brooke Wenig, Tathagata Das, and Denny Lee

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,204评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,091评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,548评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,657评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,689评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,554评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,302评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,216评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,661评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,851评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,977评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,697评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,306评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,898评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,019评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,138评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,927评论 2 355

推荐阅读更多精彩内容