stack overflow原文地址
弱鸡小白在使用SparkR处理大规模的R dataframe时想使用map的方式进行数据操作。数据都是结构化的,并且每个分区都是相同的结构。本想的将这些数据作为parquet这样就可以避免collect的Action操作。现在很担心能不能再程序输出的output list后进行write.df的操作,能否使用worker tasks编写替代指定parquet进行操作?
小白的程序如下:
#! /usr/bin/Rscript
library(SparkR, lib.loc="/opt/spark-1.5.1-bin-without-hadoop/R/lib")
source("jdbc-utils.R")
options(stringsAsFactors = FALSE)
# I dislike having these here but when I move them into main(), it breaks - the sqlContext drops.
assign("sc", sparkR.init(master = "spark://poc-master-1:7077", sparkHome = "/opt/spark-1.5.1-bin-without-hadoop/", appName = "Peter Spark test", list(spark.executor.memory="4G")), envir = .GlobalEnv)
assign("sqlContext", sparkRSQL.init(sc), envir =.GlobalEnv)
#### MAP function ####
run.model <- function(v) {
x <- v$xs[1]
y <- v$ys[1]
startTime <- format(Sys.time(), "%F %T")
xs <- c(1:x)
endTime <- format(Sys.time(), "%F %T")
hostname <- system("hostname", intern = TRUE)
xys <- data.frame(xs,y,startTime,endTime,hostname,stringsAsFactors = FALSE)
return(xys)
}
# HERE BE THE SCRIPT BIT
main <- function() {
# Make unique identifiers for each run
xs <- c(1:365)
ys <- c(1:1)
xys <- data.frame(xs,ys,stringsAsFactors = FALSE)
# Convert to Spark dataframe for mapping
sqlContext <- get("sqlContext", envir = .GlobalEnv)
xys.sdf <- createDataFrame(sqlContext, xys)
# Let Spark do what Spark does
output.list <- SparkR:::map(xys.sdf, run.model)
# Reduce gives us a single R dataframe, which may not be what we want.
output.redux <- SparkR:::reduce(output.list, rbind)
# Or you can have it as a list of data frames. output.col <- collect(output.list)
return(NULL)
}
小白心里是这样想的,先生成一个名字叫xys的dataframe,两列数据,一列是1:365,另一列是1。通过createDataFrame将其转换成为RDD,然后进行map和reduce的操作。同时编写了一个demo小函数,用来进行map。
小白同学的心中是充满疑惑的:
- 并没有想象中的需要避免绝对的collect使用,而去将结果组合作为Parquet进行存储;
- 同时,也并不确信
:::map
的函数形式真正实现了并行,难道需要一直申明parallelise
对于小白的疑惑,大腿同学是这样解释的:
假设你的数据差不多是下面这个样子的:
rdd <- SparkR:::parallelize(sc, 1:5)
dfs <- SparkR:::map(rdd, function(x) mtcars[(x * 5):((x + 1) * 5), ])
首先给你瞅一眼mtcars的数据:
瞅一眼程序结果:
同时大腿也给出了自己的思路:
因为要对所有数据的列进行操作,完全可以把它转换成为row-wise的逐行操作类型;
rows <- SparkR:::flatMap(dfs, function(x) {
data <- as.list(x)
args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
do.call(mapply, append(args, data))})
sdf <- createDataFrame(sqlContext, rows)
head(sdf)
大腿这里用了append秒rbind一万条街;用flatmap实现了map实现的捉襟见肘的多分区集合,小白深感佩服。
看到小白一脸葱白的样子,大神接着说:
接下来就可以使用简单的write.df
/
saveDF
了
小白啊,你的问题主要是一开始使用了一个内部方法map,他被从最初版本移除的一个重要原因是如果直接使用是不健全的,而且也不清楚将来会不会被支持,谁知道呢。
于是小白关注了大腿同学。