Spark UDF and functions(一)

1.创建与使用udf

udf有两种使用方法,一是通过sparkSession注册,在sql中直接使用;二是在dataset中通过Column使用。

udf用法一:注册(在sql中使用)

  • java:

        import org.apache.spark.sql.api.java.UDF1;
        import org.apache.spark.sql.types.DataTypes;
        sparkSession.udf().register("split", new UDF1<String, String[]>() {
          public String[] call(String s) throws Exception {
            return s.split(",");
          }
        }, DataTypes.createArrayType(DataTypes.StringType));
        //sparkSession.udf().register("split", (String value) -> value.split(","),DataTypes.createArrayType(DataTypes.StringType));
    
  • scala:

        spark.udf.register("split", (value: String) => {
          value.split(",")
        })
    

udf用法二:方法调用

  • java:

    spark >= 2.3

        import static org.apache.spark.sql.functions.*;
        import org.apache.spark.sql.expressions.UserDefinedFunction;
        UserDefinedFunction mode = udf(
          (Seq<String> ss) -> ss.headOption(), DataTypes.StringType
        );
        df.select(mode.apply(col("vs"))).show();
    
    

    spark < 2.3

        UDF1 mode = new UDF1<Seq<String>, String>() {
        public String call(final Seq<String> types) throws Exception {
            return types.headOption();
          }
        };
        sparkSession.udf().register("mode", mode, DataTypes.StringType);
        df.select(callUDF("mode", col("vs"))).show();
        df.selectExpr("mode(vs)").show();
        
    
  • scala:

        import org.apache.spark.sql.functions._
        val test_split = udf((value:String)=> value.split(","))
        ds.withColumn("test_split",test_split($"column"))
    

2.String* 参数传入数组

使用scala时,dataset的select和drop等方法中有要传入String*可变参数类型的,但如果只有数组形式,转换方法如下:
dataset.drop(columns: _*)dataset.select(columns.map(col(_)): _*)

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容