(五)RDD的创建

一、RDD创建方法一
parallelizing an existing collection in your driver program

scala>  val distData = sc.parallelize(Array(1, 2, 3, 4, 5))
disData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> disData.collect
res0: Array[Int] = Array(1, 2, 3, 4, 5)

遇到action才会导致job的执行

观察UI界面,看到task的数量为2

task为什么为2呢?查看parallelize函数的源码如下:
 /** Distribute a local Scala collection to form an RDD.
   *
   * @note Parallelize acts lazily. If `seq` is a mutable collection and is altered after the call
   * to parallelize and before the first action on the RDD, the resultant RDD will reflect the
   * modified collection. Pass a copy of the argument to avoid this.
   * @note avoid using `parallelize(Seq())` to create an empty `RDD`. Consider `emptyRDD` for an
   * RDD with no partitions, or `parallelize(Seq[T]())` for an RDD of `T` with empty partitions.
   * @param seq Scala collection to distribute
   * @param numSlices number of partitions to divide the collection into
   * @return RDD representing distributed collection
   */
  def parallelize[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope {
    assertNotStopped()
    new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
  }

从这段源码看到有个参数numSlices: Int = defaultParallelism叫做默认并行度,按住ctrl+鼠标左键,如下:

 /** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
  def defaultParallelism: Int = {
    assertNotStopped()
    taskScheduler.defaultParallelism
  }

ctrl+alt+鼠标左键,单击TaskSchedulerImpl

override def defaultParallelism(): Int = backend.defaultParallelism()

ctrl+alt+鼠标左键,单击LocalSchedulerBackend

 override def defaultParallelism(): Int =
    scheduler.conf.getInt("spark.default.parallelism", totalCores)

spark.default.parallelism默认值没有设置的情况下,取totalCores,因启动时设置--master local[2],所以totalCores=2,task数量也为2
如果启动时设置--master local[3],则task数量应该为3

[hadoop@hadoop001 bin]$ ./spark-shell --master local[3]
scala> val distData = sc.parallelize(Array(1, 2, 3, 4, 5)).collect
distData: Array[Int] = Array(1, 2, 3, 4, 5)   

如果传入数据时设置一下defaultParallelism:scala> val distData = sc.parallelize(Array(1, 2, 3, 4, 5),4),则task数应为4

[hadoop@hadoop001 bin]$ ./spark-shell --master local[2]
scala> val distData = sc.parallelize(Array(1, 2, 3, 4, 5),4).collect
distData: Array[Int] = Array(1, 2, 3, 4, 5) 

One important parameter for parallel collections is the number of partitions to cut the dataset into. Spark will run one task for each partition of the cluster. Typically you want 2-4 partitions for each CPU(避免core出现空闲的状态) in your cluster. Normally, Spark tries to set the number of partitions automatically based on your cluster. However, you can also set it manually by passing it as a second parameter to parallelize (e.g. sc.parallelize(data, 10)).
二、RDD创建方法二
referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.
1.从本地文件系统传入文件

scala> val distFile = sc.textFile("file:///home/hadoop/data/ruozeinput.txt").collect
distFile: Array[String] = Array(hello   world   hello, hello    world   welcome, hello)

2.从HDFS传入文件

scala> val distFile = sc.textFile("hdfs://hadoop001:9000/data/ruozeinput.txt").collect
distFile: Array[String] = Array(hello   world   hello, hello    world   welcome, hello)
scala> val distFile = sc.textFile("hdfs://192.168.137.141:9000/data").collect
distFile: Array[String] = Array(hello   world   hello, hello    world   welcome, hello, hello   world   hello, hello world   welcome, hello, hello   world   hello, hello    world   welcome, hello, hello   world   hello, hello world   welcome, hello)

3.wholeTextFiles
SparkContext.wholeTextFiles lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with textFile, which would return one record per line in each file.

[hadoop@hadoop001 sbin]$ hadoop fs -ls /data
Found 4 items
-rw-r--r--   1 hadoop supergroup         44 2018-09-11 23:02 /data/1
-rw-r--r--   1 hadoop supergroup         44 2018-09-11 23:02 /data/2
-rw-r--r--   1 hadoop supergroup         44 2018-09-11 23:02 /data/3
-rw-r--r--   1 hadoop supergroup         44 2018-09-11 23:02 /data/ruozeinput.txt
scala> val distFile = sc.wholeTextFiles("hdfs://192.168.137.141:9000/data").collect
distFile: Array[(String, String)] =                                             
Array((hdfs://192.168.137.141:9000/data/1,"hello        world   hello
hello   world   welcome
hello
"), (hdfs://192.168.137.141:9000/data/2,"hello  world   hello
hello   world   welcome
hello
"), (hdfs://192.168.137.141:9000/data/3,"hello  world   hello
hello   world   welcome
hello
"), (hdfs://192.168.137.141:9000/data/ruozeinput.txt,"hello     world   hello
hello   world   welcome
hello
"))

4.saveAsTextFile

scala> val distFile = sc.textFile("hdfs://hadoop001:9000/data/ruozeinput.txt")
distFile: org.apache.spark.rdd.RDD[String] = hdfs://hadoop001:9000/data/ruozeinput.txt MapPartitionsRDD[11] at textFile at <console>:24
scala> distFile.saveAsTextFile("hdfs://192.168.137.141:9000/out/")
[hadoop@hadoop001 sbin]$ hadoop fs -ls /out
Found 3 items
-rw-r--r--   3 hadoop supergroup          0 2018-09-11 23:37 /out/_SUCCESS
-rw-r--r--   3 hadoop supergroup         38 2018-09-11 23:37 /out/part-00000
-rw-r--r--   3 hadoop supergroup          6 2018-09-11 23:37 /out/part-00001
[hadoop@hadoop001 sbin]$ hadoop fs -text /out/part*
hello   world   hello
hello   world   welcome
hello

三、创建RDD注意事项
1.If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes. Either copy the file to all workers or use a network-mounted shared file system.
2.All of Spark’s file-based input methods, including textFile, support running on directories, compressed files, and wildcards as well. For example, you can use textFile("/my/directory"), textFile("/my/directory/.txt"), and textFile("/my/directory/.gz").
3.The textFile method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.
默认一个block创建一个partition

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容