1.Spark SQL中的一个空值处理
背景是Dataframe转为表之后使用Spark SQL,并需要过滤一些数值型空值。
Scala中的NaN值,一般可以看作是Float或Double类型,DF存储为空字符串。
val a = Float.NaN
转为表(createTempView)之后,DF中的NaN值会被记为“NaN”,该"NaN“值不会参与(Spark SQL的)count、avg等聚合运算,如果需要对该值在Spark SQL中进行识别或处理,可以在使用(相当于把字符串赋了一个类型):
cast('NaN' as float)
进一步的,Scala中的空值还有Nil(空列表)、null(无值无类型,一些文章建议少用)和Nothing(抛异常的时候用)等情况。
此外还有比较有特色Option(包含Some/None)等类型,其中None为”Empty“(isEmpty的结果),可以转String等类型,理论上使用Option类型的安全性更好,但之前用的其他方式加的保护,还没机会深究。
2. Spark SQL中的列表
背景是在Spark SQL中做Group by 聚合,然后字段的聚合方法是把分组内的数据写入一个List或Array(作为一个字段)。实际Spark官网提供了不少SQL内置函数,摘录几个用到的:
- 最基本的方法是collect_list(col),之后字段形式为[1,2,3,4,……]这种,其他还可以参考官网中collect或array开头的几个函数。比如:
select col1, sort_array(array_remove(collect_list(col2)), cast('NaN' as float)) from table1 group by col1
该例子可以理解为按col1列聚合之后,将col2的内容记录为列表,然后移除列表中的NaN值,然后再排序。
- 实际工程中,上例实际还用了一个UDF函数做进一步处理,因此排序和去除空值的工作,实际也可以利用UDF函数进行,但简单测试看了一下,貌似在SQL中处理和在UDF中处理的效率基本是一致的。
- Spark SQL中无论输出的是List(collect_list结果)还是Array(sort_array结果),自定义函数的输入条件都要用Seq,这个真是困惑了好久。
//定义udf的时候
def getmiddle(col: Seq[Float]):Float ={……}
/*使用的时候*/
select col1, getmiddle(collect_list(col2)) from table1 group by col1
3. Spark SQL中的结构体+列表
在Spark SQL中使用结构体,有struct和name_struct两类,理论上后者应该对应scala的case class,但在SQL中利用聚合方式即时生成了一个复合型的数据,类型大致为:array[float,string],把他包装为array(struct)或array(name_struct)并传到一个UDF函数中进行进一步处理,UDF的入口参数类型是seq[case class],此时一直报错:
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to myCaseClass
有文章说这种参数传递时,会丢弃schema,所以无法转为case class,我没找到原文,但目前看起来是这样的。还尝试了struct转成元组等,也有问题,但懒得再细琢磨了。
后来实验了三种方法:
- 1,SQL这边用struct或name_struct都行,然后UDF这边用seq[ROW]类型(org.apache.spark.sql._),然后用row(0)、row(1)这种方式取值(any类型)即可。
- 2,SQL这边加个to_json:array(to_json(name_struct)),然后UDF用字符串序列接收,再依次转json处理。
- 3,直接用多个array传参数,代码其实更简洁,但我总害怕出现位置错误,比如一些空值等情况,导致两个队列中的数值关系错位等(逻辑上感觉不会,但没做过测试,怕没想周全)
4. DF的map等操作的一些限制
尝试在DF的map操作中使用math-case或if,一直出错,但感觉语法应该是对的,因为相同语法拿掉条件语句就可以运行。不知道是不支持还是怎样,这个一直没解决。
df.map(x=>
{
if ……else……
})
此外在DF的map中使用另一个广播变量DF(遍历一个小表),一直会报错,这个也没解决。