54.认识sparklyr

  • sparklyr不仅提供了基于Spark的分布式机器学习算法库,还有其他的一些功能。如下:
    • 使用dplry和SQL(通过DBI)交互式的操作Spark的数据。
    • 过滤和聚合Spark数据集,然后将它们通过R进行分析和可视化。
    • 使用Spark MLlib和H2O SparkingWater实现分布式的机器学习。
    • 创建extensions,可以调用完整的SparkAPI并提供Spark包的接口。
    • 支持集成连接到Spark,并通过RStudioIDE浏览Spark DataFrames。

从CRAN安装sparklyr

install.packages("sparklyr")
  • 还要安装一个本地的Spark版本
    • 如果使用RStudio IDE,还需要下载一个最新的IDE,这个新的IDE包含了集成Spark的功能提升。
library(sparklyr)
spark_install(version = "1.6.2")

连接到Spark

  • 可以选择连接本地的Spark实例或者远程的Spark集群,如下连接到本地的Spark。
    • 返回的Spark connection(sc)为Spark集群提供了一个远程的dplyr数据源。
library(sparklyr) 
sc <- spark_connect(master = "local")

数据读取

  • 可以使用dplyr的copy_to函数将R的data frames拷贝到Spark。
    • 更典型的是可以通过spark_read的一系列函数读取Spark集群中的数据。
  • 如下例子,从R拷贝一些数据集到Spark。
    • 注意可能需要安装nycflights13和Lahman包才能运行这些代码。
library(dplyr) 
iris_tbl <- copy_to(sc, iris) 
flights_tbl <- copy_to(sc, nycflights13::flights, "flights")
batting_tbl <- copy_to(sc, Lahman::Batting, "batting")

dplyr使用

  • 针对集群中的表,现在可以使用所有可用的dplyr的verbs。以下是一个简单的过滤示例:
# filter by departure delay
flights_tbl %>% filter(dep_delay == 2)
  • 比如,分析航班延误的数据。
delay <- flights_tbl %>% 
group_by(tailnum) %>%
summarise(count = n(), dist = mean(distance), delay = mean(arr_delay)) %>%
filter(count > 20, dist < 2000, !is.na(delay)) %>%
collect()

# plot delays
library(ggplot2)
ggplot(delay, aes(dist, delay)) +
geom_point(aes(size = count), alpha = 1/2) +
geom_smooth() +
scale_size_area(max_size = 2)
  • 注意尽管上面显示的dplyr函数与在使用R的data frames时是一样的,但如果使用的是sparklyr,它们其实是被推到远端的Spark集群里执行的。

Window Functions

  • dplyr同时也支持window函数,比如:
library(dplyr)
iris_tbl <- copy_to(sc, iris)
flights_tbl <- copy_to(sc, nycflights13::flights, "flights")
batting_tbl <- copy_to(sc, Lahman::Batting, "batting")

batting_tbl %>% 
select(playerID, yearID, teamID, G, AB:H) %>%  
arrange(playerID, yearID, teamID) %>% 
group_by(playerID) %>%  
filter(min_rank(desc(H)) <= 2 & H > 0)

Machine Learning

  • 使用Spark MLlib或H2O SparkingWater实现分布式的机器学习。
    • 它们都提供了一系列的基于DataFrames构建的high-levelAPIs,从而帮助创建和调试机器学习工作流。

Spark MLlib

  • 例子:将使用ml_linear_regression来拟合线性回归模型。
    • 使用内置的mtcar数据集,看看是否可以根据其重量(wt)和发动机的气缸数量(cyl)来预测汽车的燃油消耗(mpg)。
    • 假设在每种情况下,mpg和features(wt和cyl)之间的关系是线性的。
# copy mtcars into spark
mtcars_tbl <- copy_to(sc, mtcars)

# transform our data set, and then partition into 'training', 'test'
partitions <- mtcars_tbl %>%
filter(hp >= 100) %>%
mutate(cyl8 = cyl == 8) %>%
sdf_partition(training = 0.5, test = 0.5, seed = 1099)

# fit a linear model to the training dataset
fit <- partitions$training %>%
ml_linear_regression(response = "mpg", features = c("wt", "cyl"))
  • 对于由Spark生成的线性回归模型,可以使用summary()来更多的了解拟合质量(quality of our fit),以及每个预测变量的统计显著性(statistical significance)。
summary(fit)
  • Spark机器学习支持众多的算法和特征变换,如上所示,会发现将这些功能与dplyr管道链接起来很容易。

H2O Sparkling Water

  • 以mtcars为例,这次使用H2O Sparkling Water来实现。
    • dplyr代码依旧是用来准备数据,当将数据分为test和training后,调用h2o.glm而不是ml_linear_regression。
# convert to h20_frame (uses the same underlying rdd)
training <- as_h2o_frame(partitions$training)
test <- as_h2o_frame(partitions$test)

# fit a linear model to the training dataset
fit <- h2o.glm(x = c("wt", "cyl"),
y = "mpg",
training_frame = training,
lamda_search = TRUE)

# inspect the model
print(fit)
  • 对于由H2O产生的线性回归模型,可以使用print() 或 summary()来更多的了解拟合质量(quality of our fit)。
    • summary()方法返回一些关于评分历史(scoringhistory)和变量重要性(variableimportance)的额外信息。

大数据视频推荐:
腾讯课堂
CSDN
大数据语音推荐:
企业级大数据技术应用
大数据机器学习案例之推荐系统
自然语言处理
大数据基础
人工智能:深度学习入门到精通

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容