RDD2DataFrame

RDD2DataFrame

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.avcdata</groupId>
    <artifactId>spark-test</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>com.avcdata.Main</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>build-helper-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>add-source</id>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>add-source</goal>
                        </goals>
                        <configuration>
                            <sources>
                                <source>src/main/scala</source>
                            </sources>
                        </configuration>
                    </execution>
                    <execution>
                        <id>add-test-source</id>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>add-test-source</goal>
                        </goals>
                        <configuration>
                            <sources>
                                <source>src/test/scala</source>
                            </sources>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.6.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>1.6.0</version>
        </dependency>

    </dependencies>
    <repositories>
        <repository>
            <id>central</id>
            <name>Central Repository</name>
            <url>http://repo.maven.apache.org/maven2</url>
            <layout>default</layout>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>
</project>

RDD2DataFrameRelection.java

package com.avcdata;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

import java.util.List;

public class RDD2DataFrameRelection {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("RDD2DataFrameRelection").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        JavaRDD<String> lines = sc.textFile("students.txt");

        JavaRDD<Student> studentRDD = lines.map((Function<String, Student>) line -> {
            String[] lineSplited = line.split(",");
            Student stu = new Student();
            stu.setId(Integer.valueOf(lineSplited[0]));
            stu.setName(lineSplited[1]);
            stu.setAge(Integer.valueOf(lineSplited[2]));
            return stu;
        });

        // 使用反射方式将RDD转换为DataFrame
        DataFrame studentDF = sqlContext.createDataFrame(studentRDD, Student.class);
        studentDF.printSchema();
        // 有了DataFrame后就可以注册一个临时表,SQL语句还是查询年龄小于18岁的人
        studentDF.registerTempTable("student");
        DataFrame teenagerDF = sqlContext.sql("SELECT * FROM student WHERE age <= 18");

        JavaRDD<Row> teenagerRDD = teenagerDF.toJavaRDD();
        JavaRDD<Student> teenagerStudentRDD = teenagerRDD.map((Function<Row, Student>) row -> {
            // 可以直接通过列名了从Row里面来获取数据,这样的好处就是不用担心顺序
            int id = row.getAs("id");
            int age = row.getAs("age");
            String name = row.getAs("name");

            Student stu = new Student();
            stu.setId(id);
            stu.setAge(age);
            stu.setName(name);
            return stu;
        });

        List<Student> studentList = teenagerStudentRDD.collect();
        studentList.forEach(System.out::println);
    }
}

Scala版本

package com.avcdata

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

case class Person(id: Int, name: String, age: Int)

object RDD2DataFrame {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDD2DataFrame").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val lines = sc.textFile("students.txt")
    val studentRDD = lines.map(line => {
      val lineSplited = line.split(",")
      Person(lineSplited(0).toInt, lineSplited(1), lineSplited(2).toInt)
    })

    import sqlContext.implicits._
    val studentDF = studentRDD.toDF()

    studentDF.registerTempTable("person")
    val teenagerDF = sqlContext.sql("SELECT * FROM person WHERE age < 18")
    teenagerDF.printSchema()
    val teenagerPersonRDD = teenagerDF.rdd.map(row => Person(row.getAs("id"), row.getAs("name"), row.getAs("age")))

    teenagerPersonRDD.collect().foreach(println)
  }
}

RDD2DataFrameDynamic.java

package com.avcdata;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.List;


public class RDD2DataFrameDynamic {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("RDD2DataFrameRelection").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        JavaRDD<String> lines = sc.textFile("students.txt");

        JavaRDD<Row> rows = lines.map((Function<String, Row>) line -> {
            String[] lineSplited = line.split(",");
            return RowFactory.create(Integer.valueOf(lineSplited[0]), lineSplited[1], Integer.valueOf(lineSplited[2]));
        });

        // 动态构造元数据,还有一种方式是通过反射的方式来构建出DataFrame,这里我们用的是动态创建元数据
        // 有些时候我们一开始不确定有哪些列,而这些列需要从数据库比如MySQL或者配置文件来加载出来
        List<StructField> fields = new ArrayList<>();
        fields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true));
        fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));

        StructType schema = DataTypes.createStructType(fields);

        DataFrame studentDF = sqlContext.createDataFrame(rows, schema);

        studentDF.registerTempTable("stu");

        DataFrame teenagerDF = sqlContext.sql("SELECT * FROM stu WHERE age <= 18");

        List<Row> teenagerList = teenagerDF.javaRDD().collect();
        teenagerList.forEach(System.out::println);
    }
}

RDD2DataFrameDynamic.scala

package com.avcdata

import org.apache.spark.sql.{RowFactory, SQLContext}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}

object RDD2DataFramedy {
  def main(args: Array[String]): Unit = {
    // 初始化配置及上下文
    val conf = new SparkConf().setAppName("RDD2DataFrame").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // 从文件中读取行RDD
    val lines = sc.textFile(path = "students.txt")
    // 将文件RDD转为RDD[Row]
    val rows = lines.map(line => {
      val lineSplited = line.split(",")
      RowFactory.create(Integer.valueOf(lineSplited(0)), lineSplited(1), Integer.valueOf(lineSplited(2)))
    })

    // 自定义DataFrame格式
    val schema = new StructType().add(StructField("id", DataTypes.IntegerType, nullable = true))
      .add(StructField("name", DataTypes.StringType, nullable = true))
      .add(StructField("age", DataTypes.IntegerType, nullable = true))


    val personDF = sqlContext.createDataFrame(rows, schema)
    personDF.registerTempTable(tableName = "stu")
    val teenagerPersonDF = sqlContext.sql(sqlText = "SELECT * FROM stu WHERE age < 18")
    teenagerPersonDF.rdd.collect foreach println
  }
}

students.txt

1,yasaka,17
2,marry,17
3,jack,18
4,tom,19
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。