场景需求: 将SparkSQL计算的结果数据保存到MySQL,但是计算数据里面缺少into_time字段。通过withColumn和UDF实现新加字段。
SparkSession spark = SparkSession.builder().master("local[*]").appName("test").getOrCreate();
JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
/*
* 注册一个UDF函数,获取当前时间
*/
final String nowTime = DateUtils.getNowTime();
spark.udf().register("gettime", new UDF1<Object, String>() {
private static final long serialVersionUID = 1L;
@Override
public String call(Object t1) throws Exception {
return nowTime;
}
}, DataTypes.StringType);
Dataset<Long> df = spark.range(10);
df.withColumn("into_time", org.apache.spark.sql.functions.callUDF("gettime", df.col("id"))).show();
//withColumn(String colName,Column col)
//第一个参数是需要新加字段的字段名
//第二个参数需要从当前DF对象的某一个字段来获取
//这里因为需要的是一个时间,输入跟输出并没有关系,所以使用一个自定义的UDF,将id输入UDF,返回值为当前时间。