RDD2DataFrame

RDD2DataFrame

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.avcdata</groupId>
    <artifactId>spark-test</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>com.avcdata.Main</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>build-helper-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>add-source</id>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>add-source</goal>
                        </goals>
                        <configuration>
                            <sources>
                                <source>src/main/scala</source>
                            </sources>
                        </configuration>
                    </execution>
                    <execution>
                        <id>add-test-source</id>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>add-test-source</goal>
                        </goals>
                        <configuration>
                            <sources>
                                <source>src/test/scala</source>
                            </sources>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.6.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>1.6.0</version>
        </dependency>

    </dependencies>
    <repositories>
        <repository>
            <id>central</id>
            <name>Central Repository</name>
            <url>http://repo.maven.apache.org/maven2</url>
            <layout>default</layout>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>
</project>

RDD2DataFrameRelection.java

package com.avcdata;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

import java.util.List;

public class RDD2DataFrameRelection {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("RDD2DataFrameRelection").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        JavaRDD<String> lines = sc.textFile("students.txt");

        JavaRDD<Student> studentRDD = lines.map((Function<String, Student>) line -> {
            String[] lineSplited = line.split(",");
            Student stu = new Student();
            stu.setId(Integer.valueOf(lineSplited[0]));
            stu.setName(lineSplited[1]);
            stu.setAge(Integer.valueOf(lineSplited[2]));
            return stu;
        });

        // 使用反射方式将RDD转换为DataFrame
        DataFrame studentDF = sqlContext.createDataFrame(studentRDD, Student.class);
        studentDF.printSchema();
        // 有了DataFrame后就可以注册一个临时表,SQL语句还是查询年龄小于18岁的人
        studentDF.registerTempTable("student");
        DataFrame teenagerDF = sqlContext.sql("SELECT * FROM student WHERE age <= 18");

        JavaRDD<Row> teenagerRDD = teenagerDF.toJavaRDD();
        JavaRDD<Student> teenagerStudentRDD = teenagerRDD.map((Function<Row, Student>) row -> {
            // 可以直接通过列名了从Row里面来获取数据,这样的好处就是不用担心顺序
            int id = row.getAs("id");
            int age = row.getAs("age");
            String name = row.getAs("name");

            Student stu = new Student();
            stu.setId(id);
            stu.setAge(age);
            stu.setName(name);
            return stu;
        });

        List<Student> studentList = teenagerStudentRDD.collect();
        studentList.forEach(System.out::println);
    }
}

Scala版本

package com.avcdata

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

case class Person(id: Int, name: String, age: Int)

object RDD2DataFrame {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDD2DataFrame").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val lines = sc.textFile("students.txt")
    val studentRDD = lines.map(line => {
      val lineSplited = line.split(",")
      Person(lineSplited(0).toInt, lineSplited(1), lineSplited(2).toInt)
    })

    import sqlContext.implicits._
    val studentDF = studentRDD.toDF()

    studentDF.registerTempTable("person")
    val teenagerDF = sqlContext.sql("SELECT * FROM person WHERE age < 18")
    teenagerDF.printSchema()
    val teenagerPersonRDD = teenagerDF.rdd.map(row => Person(row.getAs("id"), row.getAs("name"), row.getAs("age")))

    teenagerPersonRDD.collect().foreach(println)
  }
}

RDD2DataFrameDynamic.java

package com.avcdata;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.List;


public class RDD2DataFrameDynamic {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("RDD2DataFrameRelection").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        JavaRDD<String> lines = sc.textFile("students.txt");

        JavaRDD<Row> rows = lines.map((Function<String, Row>) line -> {
            String[] lineSplited = line.split(",");
            return RowFactory.create(Integer.valueOf(lineSplited[0]), lineSplited[1], Integer.valueOf(lineSplited[2]));
        });

        // 动态构造元数据,还有一种方式是通过反射的方式来构建出DataFrame,这里我们用的是动态创建元数据
        // 有些时候我们一开始不确定有哪些列,而这些列需要从数据库比如MySQL或者配置文件来加载出来
        List<StructField> fields = new ArrayList<>();
        fields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true));
        fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));

        StructType schema = DataTypes.createStructType(fields);

        DataFrame studentDF = sqlContext.createDataFrame(rows, schema);

        studentDF.registerTempTable("stu");

        DataFrame teenagerDF = sqlContext.sql("SELECT * FROM stu WHERE age <= 18");

        List<Row> teenagerList = teenagerDF.javaRDD().collect();
        teenagerList.forEach(System.out::println);
    }
}

RDD2DataFrameDynamic.scala

package com.avcdata

import org.apache.spark.sql.{RowFactory, SQLContext}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}

object RDD2DataFramedy {
  def main(args: Array[String]): Unit = {
    // 初始化配置及上下文
    val conf = new SparkConf().setAppName("RDD2DataFrame").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // 从文件中读取行RDD
    val lines = sc.textFile(path = "students.txt")
    // 将文件RDD转为RDD[Row]
    val rows = lines.map(line => {
      val lineSplited = line.split(",")
      RowFactory.create(Integer.valueOf(lineSplited(0)), lineSplited(1), Integer.valueOf(lineSplited(2)))
    })

    // 自定义DataFrame格式
    val schema = new StructType().add(StructField("id", DataTypes.IntegerType, nullable = true))
      .add(StructField("name", DataTypes.StringType, nullable = true))
      .add(StructField("age", DataTypes.IntegerType, nullable = true))


    val personDF = sqlContext.createDataFrame(rows, schema)
    personDF.registerTempTable(tableName = "stu")
    val teenagerPersonDF = sqlContext.sql(sqlText = "SELECT * FROM stu WHERE age < 18")
    teenagerPersonDF.rdd.collect foreach println
  }
}

students.txt

1,yasaka,17
2,marry,17
3,jack,18
4,tom,19
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,125评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,293评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,054评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,077评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,096评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,062评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,988评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,817评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,266评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,486评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,646评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,375评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,974评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,621评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,642评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,538评论 2 352