DataSet在spark中被称为类型化API,在编译时就会检查类型,而DF在运行时才会检查。DataSet具有严格JVM语言特性,仅在Scala与java中使用。在DataSet中每行包含的对象是自定义的,相当于定义了schema。
1. 什么时候使用DataSet?
- 操作无法使用DF实现
- 需要类型安全,并愿意牺牲一定的性能实现
2. 使用DataSet
需要提前前知道和定义数据schema。
- java:编码器
使用流程:
1)编写类
2)使用DF(即Dataset<Row>类型)的时候进行编码
import org.apache.spark.sql.Encoders;
public class Flight implements Serializable{
String DEST_COUNTRY_NAME;
String ORIGIN_COUNTRY_NAME;
Long DEST_COUNTRY_NAME;
}
Dataset<Flight> flights = spark.read
.parquet(“/data/flight-data/parquet/2010-summary.parquet/")
.as(Encoders.bean(Flight.class));
- Scala:case类
使用步骤:
1)创建Scala的case类
2)使用DF的as方法指定到case类
//创建case类
case class Flight(DEST_COUNTRY_NAME: String,
ORIGIN_COUNTRY_NAME: String, count: BigInt)
//DF转换到DS
val flightsDF = spark.read
.parquet(“/data/flight-data/parquet/2010-summary.parquet/")
val flights = flightsDF.as[Flight]
)
3.action操作
基本使用同DF一致。只是在访问列的时候需要制定case class的属性名即可:
flights.first.DEST_COUNTRY_NAME
4.transformation操作
基本与DF中的转换操作一致。
- 过滤
使用函数对DS进行强制过滤。
//定义过滤器函数
def originIsDestination(flight_row: Flight): Boolean = {
return flight_row.ORIGIN_COUNTRY_NAME == flight_row.DEST_COUNTRY_NAME
}
//使用过滤器并使用已定义的函数
flights.filter(flight_row => originIsDestination(flight_row)).first()
//这种功能函数可以在spark以外的环境单独调试
flflights.collect().fifilter(flflight_row => originIsDestination(flflight_row))
Array[Flight] = Array(Flight(United States,United States,348113))
- 映射
实现一些其他操作,如提取一个值、比较一组值或类似操作。
val destnations = flights.map(f => f.DEST_COUNTRY_NAME) //返回的是String类型的DataSet
- 连接
连接操作与DF的用法相同。DataSet还提供了joinWith方法,作用是将两个DS整合成一个,每一列都表示一个DS,并能进行相应的操作。如果使用常规的join方法则会丢失DS属性返回常规的DF。
case class FlightMetadata(count: BigInt, randomData: BigInt)
val flightsMeta = spark.range(500).map(x => (x, scala.util.Random.nextLong))
.withColumnRenamed("_1", "count").withColumnRenamed("_2", "randomData")
.as[FlightMetadata]
val flights2 = flights
.joinWith(flightsMeta, flights.col("count") === flightsMeta.col("count"))
//查询joinWith之后的DS
flights2.selectExpr("_1.DEST_COUNTRY_NAME")
- 分组和聚合
分组(grouping)和聚合(aggregation)与DF的聚合分组操作相同,即groupBy、rollup、cube仍可用,但返回值发生了类型丢失会变成DF。
flights.groupBy("DEST_COUNTRY_NAME").count()
若要保持类型,可以使用groupByKey方法,按照DS中的特定键进行分组并返回具有类型信息的DS。这种方式需要传入一个函数,而非列名。
flights.groupByKey(x => x.DEST_COUNTRY_NAME).count()
在DS上根据某个key执行分组后,我们就可以在K-V DataSet上根据我们定义的函数操作数据,该函数分组作为原始对象操作:
def grpSum(countryName:String, values: Iterator[Flight]) = {
values.dropWhile(_.count < 5).map(x => (countryName, x))
}
flights.groupByKey(x => x.DEST_COUNTRY_NAME).flatMapGroups(grpSum).show(5)
//还可以创建新操作,定义如何执行reduceGroups聚合
def sum2(left:Flight, right:Flight) = {
Flight(left.DEST_COUNTRY_NAME, null, left.count + right.count)
}
flights.groupByKey(x => x.DEST_COUNTRY_NAME).reduceGroups((l, r) => sum2(l, r))
.take(5)
所以,仅在用户定义分组聚合时使用Dataset才有意义,这可能在大数据流水线处理的开始或结束的位置。