Spark Basic Data Operations

Mainly covers DataFrame-related operations and RDD-related operations.

  • File

    // Read files
    import org.apache.spark.sql.{SparkSession, Dataset, DataFrame}

    val spark = SparkSession.builder().appName("Example").master("local[*]").getOrCreate()
    val sc = spark.sparkContext
    import spark.implicits._   // needed later for toDF on Seqs/RDDs and the $"col" syntax
    
    val data: Dataset[String] = spark.read.textFile("data")
    val df: DataFrame = data.toDF()
    val df0: DataFrame = spark.read.csv("data.csv")
    val df1: DataFrame = spark.read.format("csv")
        .option("sep", ";")
        .option("inferSchema", "true")
        .option("header", "true")
        .load("examples/src/main/resources/people.csv")
    val df2: DataFrame = spark.read.json("data.json")
    val df3: DataFrame = spark.read.format("json").load("data.json")
    df0.write.mode("overwrite").format("csv").save("data_out.csv")   // write to a separate path; without a mode, save fails if the path exists
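    
    Parquet (and other formats) go through the same reader/writer API; a minimal sketch with placeholder paths:
    
    // Parquet via the generic reader/writer; paths are placeholders
    val dfp: DataFrame = spark.read.parquet("data.parquet")
    dfp.write.mode("overwrite").parquet("out.parquet")   // SaveMode: overwrite / append / ignore / errorifexists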
    
  • data

    // sc.parallelize / makeRDD create RDD data
    // DataFrame == Dataset[Row]
    import org.apache.spark.rdd.RDD
    import org.apache.spark.ml.linalg.{Vectors, Vector}
    import org.apache.spark.sql.{Row, Dataset, DataFrame}
    val data: Dataset[Row] = df.select("a").distinct().limit(3)
    
    val data: RDD[Int] = sc.makeRDD(Seq(1, 2, 3))
    val data: RDD[Int] = sc.parallelize(Seq(1, 2, 3))
    val data: DataFrame = sc.parallelize(Seq(Tuple1.apply(28), Tuple1.apply(26))).toDF("a")
    
    val denseVector: Vector = Vectors.dense(1.0, 0.0, 3.0)
    val df: DataFrame = spark.createDataFrame(Seq(
            (0, "a", 18.0, Vectors.dense(1.0, 0.1, -8.0)),
            (1, "b", 5.0, Vectors.dense(2.0, 1.0, -4.0)),
            (2, "c", 9.0, Vectors.dense(4.0, 10.0, 8.0))
        )).toDF("id", "cate", "hour", "features")
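    
    A DataFrame can also be built from a case class (schema inferred from its fields) or from an RDD[Row] plus an explicit schema; a minimal sketch, where Person / name / age are illustrative names and spark.implicits._ is assumed to be in scope:
    
    // from a case class: column names and types come from its fields
    case class Person(name: String, age: Int)
    val peopleDF: DataFrame = sc.parallelize(Seq(Person("a", 28), Person("b", 26))).toDF()
    
    // from RDD[Row] with an explicit schema
    import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
    val schema = StructType(Seq(StructField("name", StringType), StructField("age", IntegerType)))
    val rowRDD = sc.parallelize(Seq(Row("a", 28), Row("b", 26)))
    val peopleDF2: DataFrame = spark.createDataFrame(rowRDD, schema)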
    
  • na

    Handle missing (null) values.

    df.na.fill("0", Seq("a", "b"))                                   // fill nulls in columns a, b with "0"
    df.na.fill(Map("a" -> "0", "b" -> "1", "c" -> "2", "d" -> "3"))  // per-column fill values
    df.na.drop(4)                                                    // drop rows with fewer than 4 non-null values
    df.na.drop(Seq("1", "3"))                                        // drop rows with a null in column "1" or "3"
    df.na.drop(2, Seq("1", "3", "4"))                                // drop rows with fewer than 2 non-null values among these columns
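    
    df.na.replace swaps specific (non-null) values instead; a small sketch with placeholder values:
    
    df.na.replace("a", Map("" -> "unknown"))           // replace empty strings in column a
    df.na.replace(Seq("a", "b"), Map("N/A" -> "0"))    // same replacement across several columns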
    
  • withColumn

    Feature processing: add new columns.

      import org.apache.spark.sql.functions._
      import spark.implicits._               // for the $"col" syntax
      df.withColumn("a", lit(12))            // add a constant column with lit
      df.withColumn("b", $"a" * 2)           // derive from an existing column
      df.withColumn("b", col("a") * 2)       // same, using col()
    
      // splitting and merging columns
      val df1 = df.withColumn("ab", split(col("t"), ",")).select(
                              col("ab").getItem(0).as("a"),
                              col("ab").getItem(1).as("b"))
      val df2 = df.withColumn("ab", split(col("t"), ","))
                  .withColumn("a", col("ab").getItem(0))
                  .withColumn("b", col("ab").getItem(1))
                  .drop("ab")
      val df3 = df.withColumn("ab", split(col("t"), ",")).withColumn("k", explode(col("ab")))   // explode: one row becomes multiple rows
      val df4 = df.groupBy("n").agg(collect_list(when(col("a") === 1, col("a")).otherwise(lit(null))).as("t"))
      val df5 = df.withColumn("t", concat_ws(",", col("a"), col("b")))   // or concat(col("a"), lit(","), col("b"))
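      
    A minimal worked example of the split pattern above (assuming the imports at the top of this section):
    
      val demo = Seq("1,2", "3,4").toDF("t")
      val parts = demo.withColumn("ab", split(col("t"), ","))
                      .select(col("ab").getItem(0).as("a"), col("ab").getItem(1).as("b"))
      // parts has two string columns: ("1", "2") and ("3", "4")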
    
  • Statistics

    Add statistics-based features.

    import org.apache.spark.sql.functions.{format_number, min, max, avg, stddev, sum, count, lit, col}
    val n = df.count()
    val df1 = df.groupBy(col("t")).count().orderBy(col("t")).withColumn("p", col("count")/n)  // percentage of each value
    
    val tmp = df.groupBy(col("t"))
                .agg(count(lit(1)).as("t-c"),
                    format_number(avg(col("t")), 2).as("t-avg"),
                    format_number(stddev(col("t")), 2).as("t-std"), 
                    min(col("t")).as("t-min"), 
                    max(col("t")).as("t-max"), 
                    sum(col("t")).as("t-sum"))
                .na.fill(0)
    val df2 = df.join(tmp, Seq("t"), "left")
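    
    For a quick overview of per-column statistics, describe / summary may be enough:
    
    df.describe("t").show()   // count, mean, stddev, min, max
    df.summary().show()       // same plus 25% / 50% / 75% percentiles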
    
  • Window functions

    Window function examples.

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{row_number, count, collect_list, lit, col}
    val df1 = df.withColumn("t", row_number().over(Window.partitionBy(col("a")).orderBy(col("b").desc)))
    val df2 = df.withColumn("t", count(lit(1)).over(Window.partitionBy(col("a")).orderBy(col("b").desc)))
    val df3 = df.withColumn("t", collect_list(col("c")).over(Window.partitionBy("a").orderBy(col("b")).rowsBetween(-100, -1)))
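    
    lag / lead read values from neighboring rows in the same window; a minimal sketch:
    
    import org.apache.spark.sql.functions.{lag, lead}
    val w = Window.partitionBy(col("a")).orderBy(col("b"))
    val df4 = df.withColumn("prev_c", lag(col("c"), 1).over(w))
                .withColumn("next_c", lead(col("c"), 1).over(w))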
    
  • Row

    Process RDD data, or read values out of a DataFrame through Row.

    import org.apache.spark.sql.Row
    import org.apache.spark.ml.linalg.{DenseVector, Vectors}
    df.rdd.map { case Row(a: String, b: Int) => (a, b) }   // pattern variables must be lower-case
    df.map(row => (
            row.getAs[Int]("a"),
            row.getAs[Double]("b"),
            Vectors.dense(row.getAs[Seq[Double]]("c").toArray)
        ))
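    
    Pulling values onto the driver also goes through Row; a small sketch against the id/cate/hour DataFrame built earlier:
    
    val first: Row = df.first()
    val cate: String = first.getAs[String]("cate")
    val hours: Array[Double] = df.select("hour").collect().map(_.getDouble(0))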
    
  • udf

    import org.apache.spark.sql.expressions.UserDefinedFunction
    import org.apache.spark.sql.functions.{udf, col, lit}
    val plus: UserDefinedFunction = udf((a: Int, b: Int) => a + b)
    val subStr = udf((c:String, i:Int) => c.split(" ")(i))
    val df1 = df.withColumn("a+b", plus(col("a"), col("b")))
                .withColumn("c-0", subStr(col("c"), lit(0)))
    
    // sort an array-of-struct column by its second field, keeping only the first
    val sortUdf: UserDefinedFunction = udf((rows: Seq[Row]) => {
      rows.map { case Row(a: Int, b: Int) => (a, b) }
        .sortBy { case (_, b) => b }
        .map { case (a, _) => a }
    })
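    
    A UDF can also be registered by name for use from SQL; a minimal sketch, assuming integer columns a and b as above:
    
    spark.udf.register("plus", (a: Int, b: Int) => a + b)
    df.createOrReplaceTempView("tbl")
    val df2 = spark.sql("SELECT *, plus(a, b) AS a_plus_b FROM tbl")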
    
  • Feature processing

    import org.apache.spark.ml.linalg.{Vector, Vectors}
    import org.apache.spark.ml.feature._
    
    val df: DataFrame = spark.createDataFrame(Seq(
            (0, "a", 18.0, Vectors.dense(1.0, 0.1, -8.0)),
            (1, "b", 5.0, Vectors.dense(2.0, 1.0, -4.0)),
            (2, "c", 9.0, Vectors.dense(4.0, 10.0, 8.0))
    )).toDF("id", "cate", "hour", "features")
    
    // Binarization: values above the threshold become 1.0, the rest 0.0
    val binarizer: Binarizer = new Binarizer().setInputCol("features").setOutputCol("binFeature").setThreshold(0.5)
    val df1: DataFrame = binarizer.transform(df)
    
    // Bucketizer: bins a numeric column into intervals [a, b) defined by the splits
    val bucket: Array[Double] = Array(Double.NegativeInfinity, 6.0, 12.0, Double.PositiveInfinity)
    val bucketizer: Bucketizer = new Bucketizer().setInputCol("hour").setOutputCol("bucketHour").setSplits(bucket)
    val df2: DataFrame = bucketizer.transform(df1)
    
    // MaxAbsScaler: scale each feature to [-1, 1]:  x / max(abs(x))
    val maxAbs: MaxAbsScaler = new MaxAbsScaler().setInputCol("features").setOutputCol("maxAbs")
    val df3: DataFrame = maxAbs.fit(df2).transform(df2)
    
    // MinMaxScaler: on ml.linalg.Vector features, per feature:  (x - min) / (max - min)
    val minMax: MinMaxScaler = new MinMaxScaler().setInputCol("features").setOutputCol("minMax")
    val df4: DataFrame = minMax.fit(df3).transform(df3)
    
    // StandardScaler: per feature  (x - mean) / std;  configure with .setWithStd(true) / .setWithMean(false)
    val ssc: StandardScaler = new StandardScaler().setInputCol("features").setOutputCol("ssc")
    val df5: DataFrame = ssc.fit(df4).transform(df4)
    
    // Normalizer: per sample, divide by the p-norm:  x / ||x||_p
    val norm: Normalizer = new Normalizer().setInputCol("features").setOutputCol("normFeatures").setP(1.0)
    val df6: DataFrame = norm.transform(df5)
    
    // StringIndexer maps labels to indices; IndexToString maps them back
    val s2i: StringIndexer = new StringIndexer().setInputCol("cate").setOutputCol("cid")
    val df7: DataFrame = s2i.fit(df6).transform(df6)
    val i2s: IndexToString = new IndexToString().setInputCol("cid").setOutputCol("cid2O")
    val df8: DataFrame = i2s.transform(df7)
    
    // OneHotEncoder (note: in Spark 3.x it is an Estimator, so use encoder.fit(df8).transform(df8))
    val encoder: OneHotEncoder = new OneHotEncoder().setInputCol("cid").setOutputCol("cidVec")
    val df9: DataFrame = encoder.transform(df8)
    
    // PolynomialExpansion: degree = 2, (x, y) => (x, x*x, y, x*y, y*y)
    val poly: PolynomialExpansion = new PolynomialExpansion().setInputCol("features").setOutputCol("poly").setDegree(2)
    val df10: DataFrame = poly.transform(df9)
    
    // QuantileDiscretizer: quantile-based binning into numBuckets buckets
    val qdisc: QuantileDiscretizer = new QuantileDiscretizer().setInputCol("hour").setOutputCol("hqtd").setNumBuckets(3)
    val df11: DataFrame = qdisc.fit(df10).transform(df10)
    df11.show(false)
    
    // SQLTransformer: run SQL against the input table (referred to as __THIS__)
    import org.apache.spark.ml.feature.SQLTransformer
    val dfq: DataFrame = spark.createDataFrame(Seq((0, 1.0, 3.0), (2, 2.0, 5.0))).toDF("id", "v1", "v2")
    val sqlTrans: SQLTransformer = new SQLTransformer().setStatement("SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
    val dfq1: DataFrame = sqlTrans.transform(dfq)
    
    // VectorAssembler: combine columns into a single feature vector
    val assembler: VectorAssembler = new VectorAssembler()
            .setInputCols(Array("v1", "v2", "v3", "v4"))
            .setOutputCol("features")
    val dfq2: DataFrame = assembler.transform(dfq1)
    dfq2.show(false)
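    
    The transformers above can also be chained into a single Pipeline instead of calling fit/transform stage by stage; a minimal sketch reusing stages defined earlier:
    
    import org.apache.spark.ml.Pipeline
    val pipeline = new Pipeline().setStages(Array(binarizer, bucketizer, s2i, i2s))
    val pipelineModel = pipeline.fit(df)
    val transformed: DataFrame = pipelineModel.transform(df)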
    
  • Machine Learning
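
    The "features" vector produced above is the input format expected by estimators in org.apache.spark.ml. A minimal, hypothetical sketch (the train / test DataFrames and the label column are assumed, not part of the original notes):

    import org.apache.spark.ml.classification.LogisticRegression
    val lr = new LogisticRegression().setLabelCol("label").setFeaturesCol("features").setMaxIter(10)
    val lrModel = lr.fit(train)                 // train: DataFrame with label + features columns (assumed)
    val predictions = lrModel.transform(test)   // adds prediction / probability columns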
