Apache Spark之文件读写

除了对本地文件系统进行读写以外,Spark还支持很多常见的文件格式(文本文件、JSON)和文件系统(HDFS)和数据库(MySQL、Hive、Hbase)。

1.文件系统的数据读写

1.1本地文件系统的数据读写

在本机上的/usr/local/spark/spark-2.2.0-bin-hadoop2.7/examples/src/main/resources 目录下新建一个TXT文件。文件内容如下:

image.png

要加载本地文件,必须采用“file:///”开头的这种格式。执行上上面这条命令以后,并不会马上显示结果,因为,Spark采用惰性机制,只有遇到“行动”类型的操作,才会从头到尾执行所有操作。

通过SparkContext的textFile方法逐行加载文件中的数据,然后以 作为分隔符,将每一行的字符串切分为一个只包含两个元素(分别代表name和age)的字符串数组。

#加载本地文件
var file = sc.textFile("file:///usr/local/spark/spark-2.2.0-bin-hadoop2.7/examples/src/main/resources/people.txt")
#取第一行的数据
 file.first()
image.png
#将file变量内容保存在另一个文件中
file.saveAsFile("file:///usr/local/spark/spark-2.2.0-bin-hadoop2.7/examples/src/main/resources/user.txt")

1.2分布式文件系统HDFS的数据读写

首先要启动HDFS文件系统

nohup hive --service metastore > metastore.log 2>&1 &

然后进入/usr/local/hadoop/hadoop-2.8.2/bin 创建HDFS文件路径

#user为指定目录,root为登录Linux系统的用户名
#hdfs://master:9000/user/root
hdfs dfs -mkdir -p /user/root

people.txt 文件上传到HDFS文件系统中(放到root用户下)

#上传文件
hdfs dfs -put /usr/local/spark/spark-2.2.0-bin-hadoop2.7/examples/src/main/resources/people.txt
#查看文件系统中的文件
hdfs dfs -ls .
#查看文件内容
hdfs dfs -cat ./people.txt
image.png

现在,让我们切换回到spark-shell窗口,编写语句从HDFS中加载people.txt文件,并显示第一行文本内容:

#加载数据
var files = sc.textFile("hdfs://master:9000/user/root/people.txt")
files.first()
image.png

需要注意的是,sc.textFile(“hdfs://master:9000/user/hadoop/word.txt”)中,“hdfs://master:9000/”是前面介绍Hadoop安装内容时确定下来的端口地址9000

2.不同文件格式的读写

2.1文本文件

和上面的例子相同。

#加载本地文件
var file = sc.textFile("file:///usr/local/spark/spark-2.2.0-bin-hadoop2.7/examples/src/main/resources/people.txt")
#取第一行的数据
 file.first()

2.2JSON格式

Spark提供了一个JSON样例数据文件,存放在“/usr/local/spark/examples/src/main/resources/people.json”中。

#加载JSON数据源
val jsonStr = sc.textFile("file:///usr/local/spark/spark-2.2.0-bin-hadoop2.7/examples/src/main/resources/people.json")
#打印出来
jsonStr.first()
image.png

2.3CSV格式

CSV格式文件也称为逗号分隔值(Comma-Separated Values),CSV文件由任意数目的记录组成,记录间以某种换行符分隔,每条记录有字段组成,字段间的分隔符是其它字符或者字符串,最常见的是逗号或制表符。

参考于databricks官方文档

Scala版本

./bin/spark-shell --packages com.databricks:spark-csv_2.11:1.0.3

val data =spark.read.format("com.databricks.spark.csv")#格式说明
                .option("header","true")#如果在CSV文件第一行有属性的话,就为true,没有就是false
                .option("inferSchema",true)#这里是自动推断属性列的数据类型
                .load("/home/zhoujian/spark/test")#文件路径
#用表格显示出来
display(data)
#显示字段类型
data.printSchema

Python版本

diamonds = sqlContext.read.format('csv').options(header='true', inferSchema='true').load('/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv')
display(diamonds)
diamonds.printSchema()

3.参考资料

Apache spark 中文官方文档

Transformations(转换)

下表列出了一些 Spark 常用的 transformations(转换). 详情请参考 RDD API 文档 (Scala, Java, Python, R) 和 pair RDD 函数文档 (Scala, Java).

Transformation(转换) Meaning(含义)
map(func) 返回一个新的 distributed dataset(分布式数据集),它由每个 source(数据源)中的元素应用一个函数 func 来生成.
filter(func) 返回一个新的 distributed dataset(分布式数据集),它由每个 source(数据源)中应用一个函数 func 且返回值为 true 的元素来生成.
flatMap(func) 与 map 类似,但是每一个输入的 item 可以被映射成 0 个或多个输出的 items(所以 func 应该返回一个 Seq 而不是一个单独的 item).
mapPartitions(func) 与 map 类似,但是单独的运行在在每个 RDD 的 partition(分区,block)上,所以在一个类型为 T 的 RDD 上运行时 func 必须是 Iterator<T> => Iterator<U> 类型.
mapPartitionsWithIndex(func) 与 mapPartitions 类似,但是也需要提供一个代表 partition 的 index(索引)的 interger value(整型值)作为参数的 func,所以在一个类型为 T 的 RDD 上运行时 func 必须是 (Int, Iterator<T>) => Iterator<U> 类型.
sample(withReplacement, fraction, seed) 样本数据,设置是否放回(withReplacement), 采样的百分比(fraction)、使用指定的随机数生成器的种子(seed).
union(otherDataset) 反回一个新的 dataset,它包含了 source dataset(源数据集)和 otherDataset(其它数据集)的并集.
intersection(otherDataset) 返回一个新的 RDD,它包含了 source dataset(源数据集)和 otherDataset(其它数据集)的交集.
distinct([numTasks])) 返回一个新的 dataset,它包含了 source dataset(源数据集)中去重的元素.
groupByKey([numTasks]) 在一个 (K, V) pair 的 dataset 上调用时,返回一个 (K, Iterable<V>) . Note: 如果分组是为了在每一个 key 上执行聚合操作(例如,sum 或 average),此时使用 reduceByKeyaggregateByKey 来计算性能会更好. Note: 默认情况下,并行度取决于父 RDD 的分区数。可以传递一个可选的 numTasks 参数来设置不同的任务数.
reduceByKey(func, [numTasks]) 在 (K, V) pairs 的 dataset 上调用时, 返回 dataset of (K, V) pairs 的 dataset, 其中的 values 是针对每个 key 使用给定的函数 func 来进行聚合的, 它必须是 type (V,V) => V 的类型. 像 groupByKey 一样, reduce tasks 的数量是可以通过第二个可选的参数来配置的.
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) 在 (K, V) pairs 的 dataset 上调用时, 返回 (K, U) pairs 的 dataset,其中的 values 是针对每个 key 使用给定的 combine 函数以及一个 neutral "0" 值来进行聚合的. 允许聚合值的类型与输入值的类型不一样, 同时避免不必要的配置. 像 groupByKey 一样, reduce tasks 的数量是可以通过第二个可选的参数来配置的.
sortByKey([ascending], [numTasks]) 在一个 (K, V) pair 的 dataset 上调用时,其中的 K 实现了 Ordered,返回一个按 keys 升序或降序的 (K, V) pairs 的 dataset, 由 boolean 类型的 ascending 参数来指定.
join(otherDataset, [numTasks]) 在一个 (K, V) 和 (K, W) 类型的 dataset 上调用时,返回一个 (K, (V, W)) pairs 的 dataset,它拥有每个 key 中所有的元素对。Outer joins 可以通过 leftOuterJoin, rightOuterJoinfullOuterJoin 来实现.
cogroup(otherDataset, [numTasks]) 在一个 (K, V) 和的 dataset 上调用时,返回一个 (K, (Iterable<V>, Iterable<W>)) tuples 的 dataset. 这个操作也调用了 groupWith.
cartesian(otherDataset) 在一个 T 和 U 类型的 dataset 上调用时,返回一个 (T, U) pairs 类型的 dataset(所有元素的 pairs,即笛卡尔积).
pipe(command, [envVars]) 通过使用 shell 命令来将每个 RDD 的分区给 Pipe。例如,一个 Perl 或 bash 脚本。RDD 的元素会被写入进程的标准输入(stdin),并且 lines(行)输出到它的标准输出(stdout)被作为一个字符串型 RDD 的 string 返回.
coalesce(numPartitions) Decrease(降低)RDD 中 partitions(分区)的数量为 numPartitions。对于执行过滤后一个大的 dataset 操作是更有效的.
repartition(numPartitions) Reshuffle(重新洗牌)RDD 中的数据以创建或者更多的 partitions(分区)并将每个分区中的数据尽量保持均匀. 该操作总是通过网络来 shuffles 所有的数据.
repartitionAndSortWithinPartitions(partitioner) 根据给定的 partitioner(分区器)对 RDD 进行重新分区,并在每个结果分区中,按照 key 值对记录排序。这比每一个分区中先调用 repartition 然后再 sorting(排序)效率更高,因为它可以将排序过程推送到 shuffle 操作的机器上进行.

Actions(动作)

下表列出了一些 Spark 常用的 actions 操作。详细请参考 RDD API 文档 (Scala, Java, Python, R)

和 pair RDD 函数文档 (Scala, Java).

Action(动作) Meaning(含义)
reduce(func) 使用函数 func 聚合 dataset 中的元素,这个函数 func 输入为两个元素,返回为一个元素。这个函数应该是可交换(commutative )和关联(associative)的,这样才能保证它可以被并行地正确计算.
collect() 在 driver 程序中,以一个 array 数组的形式返回 dataset 的所有元素。这在过滤器(filter)或其他操作(other operation)之后返回足够小(sufficiently small)的数据子集通常是有用的.
count() 返回 dataset 中元素的个数.
first() 返回 dataset 中的第一个元素(类似于 take(1).
take(n) 将数据集中的前 n 个元素作为一个 array 数组返回.
takeSample(withReplacement, num, [seed]) 对一个 dataset 进行随机抽样,返回一个包含 num 个随机抽样(random sample)元素的数组,参数 withReplacement 指定是否有放回抽样,参数 seed 指定生成随机数的种子.
takeOrdered(n, [ordering]) 返回 RDD 按自然顺序(natural order)或自定义比较器(custom comparator)排序后的前 n 个元素.
saveAsTextFile(path) 将 dataset 中的元素以文本文件(或文本文件集合)的形式写入本地文件系统、HDFS 或其它 Hadoop 支持的文件系统中的给定目录中。Spark 将对每个元素调用 toString 方法,将数据元素转换为文本文件中的一行记录.
saveAsSequenceFile(path) (Java and Scala) 将 dataset 中的元素以 Hadoop SequenceFile 的形式写入到本地文件系统、HDFS 或其它 Hadoop 支持的文件系统指定的路径中。该操作可以在实现了 Hadoop 的 Writable 接口的键值对(key-value pairs)的 RDD 上使用。在 Scala 中,它还可以隐式转换为 Writable 的类型(Spark 包括了基本类型的转换,例如 Int, Double, String 等等).
saveAsObjectFile(path) (Java and Scala) 使用 Java 序列化(serialization)以简单的格式(simple format)编写数据集的元素,然后使用 SparkContext.objectFile() 进行加载.
countByKey() 仅适用于(K,V)类型的 RDD 。返回具有每个 key 的计数的 (K , Int)pairs 的 hashmap.
foreach(func) 对 dataset 中每个元素运行函数 func 。这通常用于副作用(side effects),例如更新一个 Accumulator(累加器)或与外部存储系统(external storage systems)进行交互。Note:修改除 foreach()之外的累加器以外的变量(variables)可能会导致未定义的行为(undefined behavior)。详细介绍请阅读 Understanding closures(理解闭包) 部分.
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,701评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,649评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,037评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,994评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,018评论 6 395
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,796评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,481评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,370评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,868评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,014评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,153评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,832评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,494评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,039评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,156评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,437评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,131评论 2 356

推荐阅读更多精彩内容

  • 在此特此声明:一下所有链接均来自互联网,在此记录下我的查阅学习历程,感谢各位原创作者的无私奉献 ! 技术一点一点积...
    远航的移动开发历程阅读 11,136评论 12 197
  • 许小佩同学是一个怕与外界交流的人,尤其是面对面沟通,电话沟通,一出现问题,就成了情绪高小达人。而我们今天分享的书叫...
    青果果说阅读 327评论 2 3
  • 早晨醒来,揉了一下眼腈,突然心里一惊,想起今日是颠倒节。 慌忙爬起来,走出卧室,刚走到卫生间,猛一抬头,吓得我倒退...
    张家二YY阅读 176评论 17 6
  • 《大熊与骚花的故事》属于一系列的文章,每周日更新一篇,根据真实故事,适当修饰。 “你是卖蜂蜜的吗?” “嗯。” “...
    丁同学阅读 276评论 0 0
  • 虽然我有些许洁癖,例如拿过钱或者逛过图书馆之后必定洗手,也从不爱用二手书籍。但是,当我找到那被人翻得破败不堪的《黄...
    墨脉阅读 1,037评论 0 1