Spark机器学习库(MLlib)

概观

sparklyr为Spark的分布式机器学习库提供绑定。特别是,sparklyr允许访问spark.ml包提供的机器学习例程。与sparklyr的dplyr界面一起, 可以轻松地在Spark上创建和调整机器学习工作流程,完全在R中编排。
sparklyr提供了三个功能系列,可以与Spark机器学习一起使用:

  • 用于分析数据的机器学习算法(ml_*)
  • 用于处理各个特征的特征变换器(ft_*)
  • 用于操作Spark DataFrames(sdf_*)的函数

使用sparklyr的分析工作流程可能包含以下几个阶段。有关示例,请参阅示例工作流程。

  1. 通过sparklyr dplyr接口执行SQL查询,
  2. 使用函数sdf_和ft_函数系列来生成新列或对数据集进行分区,
  3. 从ml_*函数族中选择合适的机器学习算法来建模数据,
  4. 检查模型拟合的质量,并使用它来预测新数据。
  5. 收集结果以便在R中进行可视化和进一步分析

算法

可以通过一ml_*组函数从sparklyr访问Spark的机器学习库:

  • ml_kmeans K-Means聚类
  • ml_linear_regression 线性回归
  • ml_logistic_regression Logistic回归
  • ml_survival_regression 生存回归
  • ml_generalized_linear_regression 广义线性回归
  • ml_decision_tree 决策树
  • ml_random_forest 随机森林
  • ml_gradient_boosted_trees 渐变 - 树木
  • ml_pca 主成分分析
  • ml_naive_bayes 朴素贝叶斯
  • ml_multilayer_perceptron 多层感知器
  • ml_lda 潜在的Dirichlet分配
  • ml_one_vs_rest 一对阵休息

函数公式

该ml_*函数接受的参数response和features。但features也可以是具有主效应的公式(它目前不接受交互术语)。截取项可以通过使用省略-1。
就是这两种公式展现的方式

ml_linear_regression(z ~ -1 + x + y)
ml_linear_regression(intercept = FALSE, response = "z", features = c("x", "y"))

选项

可以使用ml_options函数中的参数修改Spark模型输出ml_*。这ml_options是专家调整模型输出的唯一界面。例如,model.transform可以在执行拟合之前用于改变Spark模型对象。

转换

Spark提供了特征变换器,促进了Spark DataFrame中数据的许多常见转换,并且Sparklyr在ft_*函数族中公开了这些变换。这些例程通常采用一个或多个输入列,并生成一个新的输出列,形成为这些列的转换。

  1. ft_binarizer 阈值数字特征为二进制(0/1)特征
  2. ft_bucketizer Bucketizer将一列连续特征转换为一列特征桶
  3. ft_discrete_cosine_transform 将时域中的长度NN实值序列变换为频域中的另一长度NN实值序列
  4. ft_elementwise_product 使用逐元素乘法将每个输入向量乘以提供的权重向量。
  5. ft_index_to_string 将一列标签索引映射回包含原始标签作为字符串的列
  6. ft_quantile_discretizer 采用具有连续特征的列,并输出具有分箱分类特征的列
  7. sql_transformer 实现由SQL语句定义的转换
  8. ft_string_indexer 将标签的字符串列编码为标签索引列
  9. ft_vector_assembler 将给定的列列表合并到单个矢量列中

例子

采用数据集iris

library(sparklyr)
library(ggplot2)
library(dplyr)
sc <- spark_connect(master = "local")
iris_tbl <- copy_to(sc, iris, "iris", overwrite = TRUE)
iris_tbt

K-MEANS聚类

使用Spark的K-means聚类将数据集分组。K均值聚类将分区指向成k组,使得从点到指定的聚类中心的平方和最小化。

使用k-means聚类

kmeans_model <- iris_tbl %>%
  ml_kmeans(formula = Species~.,centers = 3)

这里需要注意的是,非监督的模型如何写公式

kmeans_model <- iris_tbl %>%
  select(Petal_Width, Petal_Length) %>%
  ml_kmeans(formula = ~.,centers = 3)
kmeans_model
K-means clustering with 3 clusters

Cluster centers:
  Petal_Width Petal_Length
1    1.359259     4.292593
2    0.246000     1.462000
3    2.047826     5.626087

Within Set Sum of Squared Errors =  31.41289

进行预测

predicted <- sdf_predict(kmeans_model, iris_tbl) %>%
  collect

查看聚类的标签和真实的标签之间的关系

table(predicted$Species, predicted$prediction)
         
              0  1  2
  setosa      0 50  0
  versicolor 48  0  2
  virginica   6  0 44

sdf_predict(kmeans_model) %>%
  collect() %>%
  ggplot(aes(Petal_Length, Petal_Width)) +
  geom_point(aes(Petal_Width, Petal_Length, col = factor(prediction + 1)),
             size = 2, alpha = 0.5) + 
  geom_point(data = kmeans_model$centers, aes(Petal_Width, Petal_Length),
             col = scales::muted(c("red", "green", "blue")),
             pch = 'x', size = 12) +
  scale_color_discrete(name = "Predicted Cluster",
                       labels = paste("Cluster", 1:3)) +
  labs(
    x = "Petal Length",
    y = "Petal Width",
    title = "K-Means Clustering",
    subtitle = "Use Spark.ML to predict cluster membership with the iris dataset."
  )

image.png

线性回归

建立线性回归模型

lm_model <- iris_tbl %>%
  select(Petal_Width, Petal_Length) %>%
  ml_linear_regression(Petal_Length ~ Petal_Width)

lm_model
Formula: Petal_Length ~ Petal_Width

Coefficients:
(Intercept) Petal_Width 
   1.083558    2.229940 
iris_tbl %>%
  select(Petal_Width, Petal_Length) %>%
  collect %>%
  ggplot(aes(Petal_Length, Petal_Width)) +
  geom_point(aes(Petal_Width, Petal_Length), size = 2, alpha = 0.5) +
  geom_abline(aes(slope = coef(lm_model)[["Petal_Width"]],
                  intercept = coef(lm_model)[["(Intercept)"]]),
              color = "red") +
  labs(
    x = "Petal Width",
    y = "Petal Length",
    title = "Linear Regression: Petal Length ~ Petal Width",
    subtitle = "Use Spark.ML linear regression to predict petal length as a function of petal width."
  )

image.png

逻辑回归

使用Spark的逻辑回归来执行逻辑回归,将二元结果建模为一个或多个解释变量的函数。

数据准备

beaver <- beaver2
beaver$activ <- factor(beaver$activ, labels = c("Non-Active", "Active"))
copy_to(sc, beaver, "beaver")


beaver_tbl <- tbl(sc, "beaver")

建立回归模型

glm_model <- beaver_tbl %>%
  mutate(binary_response = as.numeric(activ == "Active")) %>%
  ml_logistic_regression(binary_response ~ temp)


glm_model <- beaver_tbl %>%
  ml_logistic_regression(activ ~ temp)

> glm_model
Formula: activ ~ temp

Coefficients:
(Intercept)        temp 
  550.52331   -14.69184 

pre <- sdf_predict(glm_model) %>% collect()
pre
# A tibble: 100 x 12
     day  time  temp activ      features  label rawPrediction probability
   <dbl> <dbl> <dbl> <chr>      <list>    <dbl> <list>        <list>     
 1   307   930  36.6 Non-Active <dbl [1]>  1.00 <dbl [2]>     <dbl [2]>  
 2   307   940  36.7 Non-Active <dbl [1]>  1.00 <dbl [2]>     <dbl [2]>  
 3   307   950  36.9 Non-Active <dbl [1]>  1.00 <dbl [2]>     <dbl [2]>  
 4   307  1000  37.2 Non-Active <dbl [1]>  1.00 <dbl [2]>     <dbl [2]>  
 5   307  1010  37.2 Non-Active <dbl [1]>  1.00 <dbl [2]>     <dbl [2]>  
 6   307  1020  37.2 Non-Active <dbl [1]>  1.00 <dbl [2]>     <dbl [2]>  
 7   307  1030  37.2 Non-Active <dbl [1]>  1.00 <dbl [2]>     <dbl [2]>  
 8   307  1040  36.9 Non-Active <dbl [1]>  1.00 <dbl [2]>     <dbl [2]>  
 9   307  1050  37.0 Non-Active <dbl [1]>  1.00 <dbl [2]>     <dbl [2]>  
10   307  1100  36.9 Non-Active <dbl [1]>  1.00 <dbl [2]>     <dbl [2]>  
# ... with 90 more rows, and 4 more variables: prediction <dbl>,
#   predicted_label <chr>, probability_0 <dbl>, probability_1 <dbl>
> 

PCA

使用Spark的主成分分析(PCA)来降低维数。PCA是一种统计方法,用于查找旋转,使得第一个坐标具有可能的最大方差,并且每个后续坐标又具有可能的最大方差
建立PCA模型

pca_model <- tbl(sc, "iris") %>%
  select(-Species) %>%
  ml_pca()


print(pca_model)
Explained variance:

        PC1         PC2         PC3         PC4 
0.924618723 0.053066483 0.017102610 0.005212184 

Rotation:
                     PC1         PC2         PC3        PC4
Sepal_Length -0.36138659 -0.65658877  0.58202985  0.3154872
Sepal_Width   0.08452251 -0.73016143 -0.59791083 -0.3197231
Petal_Length -0.85667061  0.17337266 -0.07623608 -0.4798390
Petal_Width  -0.35828920  0.07548102 -0.54583143  0.7536574

随机森林

使用随机森林进行二分类或者多分类

rf_model <- iris_tbl %>%
  ml_random_forest(Species ~ Petal_Length + Petal_Width, type = "classification",num_trees = 500)



rf_predict <- sdf_predict(rf_model, iris_tbl) %>%
  ft_string_indexer("Species", "Species_idx") %>%
  collect

table(rf_predict$Species_idx, rf_predict$prediction)
     0  1  2
  0 49  1  0
  1  0 50  0
  2  0  0 50

数据集合划分

将Spark DataFrame拆分为训练,测试数据集。

划分数据集合

partitions <- tbl(sc, "iris") %>%
  sdf_partition(training = 0.75, test = 0.25, seed = 1099)

构建线性回归模型

fit <- partitions$training %>%
  ml_linear_regression(Petal_Length ~ Petal_Width)

评价模型的结果

estimate_mse <- function(df){
  sdf_predict(fit, df) %>%
    mutate(resid = Petal_Length - prediction) %>%
    summarize(mse = mean(resid ^ 2)) %>%
    collect
}
sapply(partitions, estimate_mse)

字符串索引

使用ft_string_indexer和ft_index_to_string将字符列转换为数字列,然后再将其转换回来。

ft_string2idx <- iris_tbl %>%
  ft_string_indexer("Species", "Species_idx") %>%
  ft_index_to_string("Species_idx", "Species_remap") %>%
  collect

table(ft_string2idx$Species, ft_string2idx$Species_remap)
        setosa versicolor virginica
  setosa         50          0         0
  versicolor      0         50         0
  virginica       0          0        50

SDF转换

ft_string2idx <- iris_tbl %>%
  sdf_mutate(Species_idx = ft_string_indexer(Species)) %>%
  sdf_mutate(Species_remap = ft_index_to_string(Species_idx)) %>%
  collect

ft_string2idx %>%
  select(Species, Species_idx, Species_remap) %>%
  distinct

简单的例子

让我们通过一个简单的例子来演示在R中使用Spark的机器学习算法。我们将使用ml_linear_regression来拟合线性回归模型。使用内置mtcars数据集,我们将尝试根据车辆的mpg重量(wt)和发动机所包含的气缸数()来预测汽车的油耗(cyl)。

首先,我们将mtcars数据集复制到Spark中。

mtcars_tbl <- copy_to(sc, mtcars, "mtcars")

使用Spark SQL,功能转换器和DataFrame函数转换数据。

使用Spark SQL删除马力小于100的所有汽车
使用Spark功能变换器将汽车分成两组,基于汽缸
使用Spark DataFrame函数将数据分区为测试和培训
然后使用spark ML拟合线性模型。将MPG作为重量和气缸的函数。

partitions <- mtcars_tbl %>%
  filter(hp >= 100) %>%
  sdf_mutate(cyl8 = ft_bucketizer(cyl, c(0,8,12))) %>%
  sdf_partition(training = 0.5, test = 0.5, seed = 888)



fit <- partitions$training %>%
  ml_linear_regression(mpg ~ wt + cyl)


summary(fit)
 
Deviance Residuals:
    Min      1Q  Median      3Q     Max 
-2.0947 -1.2747 -0.1129  1.0876  2.2185 

Coefficients:
(Intercept)          wt         cyl 
  33.795576   -1.596247   -1.580360 

R-Squared: 0.8267
Root Mean Squared Error: 1.437

这summary()表明我们的模型非常合适,并且汽车重量以及发动机中的汽缸数量都将成为其平均油耗的强大预测因素。(该模型表明,平均而言,较重的汽车消耗的燃料更多。)

让我们使用我们的Spark模型拟合来预测我们的测试数据集的平均油耗,并将预测的响应与真实的测量燃料消耗进行比较。我们将构建一个简单的ggplot2图,使我们能够检查预测的质量。

# Score the data
pred <- sdf_predict(fit, partitions$test) %>%
  collect

# Plot the predicted versus actual mpg
ggplot(pred, aes(x = mpg, y = prediction)) +
  geom_abline(lty = "dashed", col = "red") +
  geom_point() +
  theme(plot.title = element_text(hjust = 0.5)) +
  coord_fixed(ratio = 1) +
  labs(
    x = "Actual Fuel Consumption",
    y = "Predicted Fuel Consumption",
    title = "Predicted vs. Actual Fuel Consumption"
  )
image.png

虽然简单,但我们的模型似乎在预测汽车的平均油耗方面做得相当不错。

我们可以轻松有效地将功能变换器,机器学习算法和Spark DataFrame功能组合到Spark和R的完整分析中。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,029评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,238评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,576评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,214评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,324评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,392评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,416评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,196评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,631评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,919评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,090评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,767评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,410评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,090评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,328评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,952评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,979评论 2 351

推荐阅读更多精彩内容