sparklyr

下载sparklyr

install.packages("sparklyr")
library(sparklyr)
spark_install(version = "2.1.0")

如果要跟新sparklyr的版本的话,可以使用下面的这个命令:

devtools::install_github("rstudio/sparklyr")

连接sparklyr

可以链接本地以及远程spark集群,这里连接的是本地:

library(sparklyr)
sc <- spark_connect(master = "local")

使用dplyr进行数据处理

将R中的数据复制到spark集群中

iris_tbl <- copy_to(sc, iris)
flights_tbl <- copy_to(sc, nycflights13::flights, "flights")
batting_tbl <- copy_to(sc, Lahman::Batting, "batting")
src_tbls(sc)

使用到的函数就是copy_to.查看集群中所包含的数据集合:
···

src_tbls(sc)
[1] "batting" "flights" "iris"
···
然后就可以直接使用dplyr对数据进行操作:

> flights_tbl %>% filter(dep_delay == 2)
# Source:   lazy query [?? x 19]
# Database: spark_connection
    year month   day dep_time sched_dep_time dep_delay
   <int> <int> <int>    <int>          <int>     <dbl>
 1  2013     1     1      517            515      2.00
 2  2013     1     1      542            540      2.00
 3  2013     1     1      702            700      2.00
 4  2013     1     1      715            713      2.00
 5  2013     1     1      752            750      2.00
 6  2013     1     1      917            915      2.00
 7  2013     1     1      932            930      2.00
 8  2013     1     1     1028           1026      2.00
 9  2013     1     1     1042           1040      2.00
10  2013     1     1     1231           1229      2.00
# ... with more rows, and 13 more variables:
#   arr_time <int>, sched_arr_time <int>,
#   arr_delay <dbl>, carrier <chr>, flight <int>,
#   tailnum <chr>, origin <chr>, dest <chr>,
#   air_time <dbl>, distance <dbl>, hour <dbl>,
#   minute <dbl>, time_hour <dttm>

使用ggplot2作图:

delay <- flights_tbl %>% 
  group_by(tailnum) %>%
  summarise(count = n(), dist = mean(distance), delay = mean(arr_delay)) %>%
  filter(count > 20, dist < 2000, !is.na(delay)) %>%
  collect

# plot delays
library(ggplot2)
ggplot(delay, aes(dist, delay)) +
  geom_point(aes(size = count), alpha = 1/2) +
  geom_smooth() +
  scale_size_area(max_size = 2)
image.png

SQL操作

> library(DBI)
Warning message:
程辑包‘DBI’是用R版本3.4.4 来建造的 
> iris_preview <- dbGetQuery(sc, "SELECT * FROM iris LIMIT 10")
> iris_preview
   Sepal_Length Sepal_Width Petal_Length Petal_Width
1           5.1         3.5          1.4         0.2
2           4.9         3.0          1.4         0.2
3           4.7         3.2          1.3         0.2
4           4.6         3.1          1.5         0.2
5           5.0         3.6          1.4         0.2
6           5.4         3.9          1.7         0.4
7           4.6         3.4          1.4         0.3
8           5.0         3.4          1.5         0.2
9           4.4         2.9          1.4         0.2
10          4.9         3.1          1.5         0.1
   Species
1   setosa
2   setosa
3   setosa
4   setosa
5   setosa
6   setosa
7   setosa
8   setosa
9   setosa
10  setosa

机器学习

在这里尝试构建一个机器学习模型:

mtcars_tbl <- copy_to(sc, mtcars)
> partitions <- mtcars_tbl %>%
+   filter(hp >= 100) %>%
+   mutate(cyl8 = cyl == 8) %>%
+   sdf_partition(training = 0.5, test = 0.5, seed = 1099)
> fit <- partitions$training %>%
+   ml_linear_regression(response = "mpg", features = c("wt", "cyl"))
> fit
Formula: mpg ~ wt + cyl

Coefficients:
(Intercept)          wt         cyl 
  33.499452   -2.818463   -0.923187 
summary(fit)
Deviance Residuals:
   Min     1Q Median     3Q    Max 
-1.752 -1.134 -0.499  1.296  2.282 

Coefficients:
(Intercept)          wt         cyl 
  33.499452   -2.818463   -0.923187 

R-Squared: 0.8274
Root Mean Squared Error: 1.422

读写数据

temp_csv <- tempfile(fileext = ".csv")
temp_parquet <- tempfile(fileext = ".parquet")
temp_json <- tempfile(fileext = ".json")

spark_write_csv(iris_tbl, temp_csv)
iris_csv_tbl <- spark_read_csv(sc, "iris_csv", temp_csv)

spark_write_parquet(iris_tbl, temp_parquet)
iris_parquet_tbl <- spark_read_parquet(sc, "iris_parquet", temp_parquet)

spark_write_json(iris_tbl, temp_json)
iris_json_tbl <- spark_read_json(sc, "iris_json", temp_json)

src_tbls(sc)

Rstudio IDE

RStudio IDE的最新RStudio预览版包括对Spark和sparklyr软件包的集成支持,其中包括以下工具:

创建和管理Spark连接
浏览Spark DataFrames的表格和列
预览Spark DataFrames的前1000行
一旦你安装了Sparklyr软件包,你应该在IDE中找到一个新的Spark窗格。 此窗格包含一个新建连接对话框,可用于连接本地或远程Spark实例:


image.png

Livy进行链接

Livy启用到Apache Spark群集的远程连接。 在连接到Livy之前,您需要连接到运行Livy的现有服务的连接信息。 否则,要在本地环境中测试livy,可以按照以下方式在本地安装并运行它:

livy_install()
livy_service_start()
copy_to(sc,iris)
# Source:   table<iris> [?? x 5]
# Database: spark_connection
   Sepal_Length Sepal_Width Petal_Length Petal_Width Species
          <dbl>       <dbl>        <dbl>       <dbl> <chr>  
 1         5.10        3.50         1.40       0.200 setosa 
 2         4.90        3.00         1.40       0.200 setosa 
 3         4.70        3.20         1.30       0.200 setosa 
 4         4.60        3.10         1.50       0.200 setosa 
 5         5.00        3.60         1.40       0.200 setosa 
 6         5.40        3.90         1.70       0.400 setosa 
 7         4.60        3.40         1.40       0.300 setosa 
 8         5.00        3.40         1.50       0.200 setosa 
 9         4.40        2.90         1.40       0.200 setosa 
10         4.90        3.10         1.50       0.100 setosa 
# ... with more rows
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • Hadoop和Spark Apache Hadoop是一个开源软件库,可以跨计算机集群分布式处理大型数据集。它具有...
    Liam_ml阅读 4,844评论 0 0
  • 如果你曾经想要使用Sparklyr软件包在R群集中使用Spark集群来处理大数据集,但还没有开始,因为设置Spar...
    Liam_ml阅读 1,306评论 0 0
  • 使用sparklyr可以通过R连接数据库,并且可以使用R的相关工具对spark中的数据进行处理。 R 调用spar...
    Liam_ml阅读 7,204评论 0 2
  • 如果您曾经想要使用Sparklyr软件包在R群集中使用Spark集群来处理大数据集,但还没有开始,因为设置Spar...
    Liam_ml阅读 2,910评论 0 6
  • Sparklyr是rstudio 社区维护的一个spark的接口。 文档 Sparklyr 文档:https://...
    Liam_ml阅读 5,589评论 0 0

友情链接更多精彩内容