Dataset API

DataSet在spark中被称为类型化API,在编译时就会检查类型,而DF在运行时才会检查。DataSet具有严格JVM语言特性,仅在Scala与java中使用。在DataSet中每行包含的对象是自定义的,相当于定义了schema。

1. 什么时候使用DataSet?

  • 操作无法使用DF实现
  • 需要类型安全,并愿意牺牲一定的性能实现

2. 使用DataSet

需要提前前知道和定义数据schema。

  • java:编码器
    使用流程:
    1)编写类
    2)使用DF(即Dataset<Row>类型)的时候进行编码
import org.apache.spark.sql.Encoders;
public class Flight implements Serializable{
 String DEST_COUNTRY_NAME;
 String ORIGIN_COUNTRY_NAME;
 Long DEST_COUNTRY_NAME;
}
Dataset<Flight> flights = spark.read
 .parquet(“/data/flight-data/parquet/2010-summary.parquet/")
 .as(Encoders.bean(Flight.class));
  • Scala:case类
    使用步骤:
    1)创建Scala的case类
    2)使用DF的as方法指定到case类
//创建case类
case class Flight(DEST_COUNTRY_NAME: String,
 ORIGIN_COUNTRY_NAME: String, count: BigInt)
//DF转换到DS
val flightsDF = spark.read
 .parquet(“/data/flight-data/parquet/2010-summary.parquet/")
val flights = flightsDF.as[Flight]
)

3.action操作

基本使用同DF一致。只是在访问列的时候需要制定case class的属性名即可:

flights.first.DEST_COUNTRY_NAME

4.transformation操作

基本与DF中的转换操作一致。

  • 过滤
    使用函数对DS进行强制过滤。
//定义过滤器函数
def originIsDestination(flight_row: Flight): Boolean = {
 return flight_row.ORIGIN_COUNTRY_NAME == flight_row.DEST_COUNTRY_NAME
}
//使用过滤器并使用已定义的函数
flights.filter(flight_row => originIsDestination(flight_row)).first()
//这种功能函数可以在spark以外的环境单独调试
flflights.collect().fifilter(flflight_row => originIsDestination(flflight_row))
Array[Flight] = Array(Flight(United States,United States,348113))
  • 映射
    实现一些其他操作,如提取一个值、比较一组值或类似操作。
val destnations = flights.map(f => f.DEST_COUNTRY_NAME) //返回的是String类型的DataSet
  • 连接
    连接操作与DF的用法相同。DataSet还提供了joinWith方法,作用是将两个DS整合成一个,每一列都表示一个DS,并能进行相应的操作。如果使用常规的join方法则会丢失DS属性返回常规的DF。
case class FlightMetadata(count: BigInt, randomData: BigInt)
val flightsMeta = spark.range(500).map(x => (x, scala.util.Random.nextLong))
 .withColumnRenamed("_1", "count").withColumnRenamed("_2", "randomData")
 .as[FlightMetadata]
val flights2 = flights
 .joinWith(flightsMeta, flights.col("count") === flightsMeta.col("count"))
//查询joinWith之后的DS
flights2.selectExpr("_1.DEST_COUNTRY_NAME")
  • 分组和聚合
    分组(grouping)和聚合(aggregation)与DF的聚合分组操作相同,即groupBy、rollup、cube仍可用,但返回值发生了类型丢失会变成DF。
flights.groupBy("DEST_COUNTRY_NAME").count()

若要保持类型,可以使用groupByKey方法,按照DS中的特定键进行分组并返回具有类型信息的DS。这种方式需要传入一个函数,而非列名。

flights.groupByKey(x => x.DEST_COUNTRY_NAME).count()

在DS上根据某个key执行分组后,我们就可以在K-V DataSet上根据我们定义的函数操作数据,该函数分组作为原始对象操作:

def grpSum(countryName:String, values: Iterator[Flight]) = {
 values.dropWhile(_.count < 5).map(x => (countryName, x))
}
flights.groupByKey(x => x.DEST_COUNTRY_NAME).flatMapGroups(grpSum).show(5)
//还可以创建新操作,定义如何执行reduceGroups聚合
def sum2(left:Flight, right:Flight) = {
 Flight(left.DEST_COUNTRY_NAME, null, left.count + right.count)
}
flights.groupByKey(x => x.DEST_COUNTRY_NAME).reduceGroups((l, r) => sum2(l, r))
 .take(5)

所以,仅在用户定义分组聚合时使用Dataset才有意义,这可能在大数据流水线处理的开始或结束的位置。

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容