Spark SQL:function.array的数据类型问题记录

摘要:Spark SQL

问题复现

需要对Spark SQL的DataFrame的一列做groupBy聚合其他所有特征,处理方式是将其他所有特征使用function.array配合function.collect_list聚合为数组,代码如下

    val joinData = data.join(announCountData, Seq("ent_name"), "left_outer").groupBy($"ent_name")
      .agg(collect_list(array("publish_date", "target_amt", "case_position", "case_reason", "plaintiff")).as("items"))
      .map {
        row => {
          // 初始化
          var involvedCount: Int = 0
          var involvedCountLast360: Int = 0
          var involvedDefendantCount: Int = 0 
          var involvedDefendantCountLast360: Int = 0
          var lzjfCount: Int = 0 
          var lzjfCountLast360: Int = 0
          var msqqCount: Int = 0 
          var msqqCountLast360: Int = 0
          // 解析
          val entName: String = row.getAs[String]("ent_name")
          val items: Seq[Seq[String]] = row.getAs[Seq[Seq[String]]]("items")
          for (item: Seq[String] <- items) {
            if (item.head != null) {
              val createCaseDate: String = item.head.split(" ")(0)
              val casePosition: String = item(1)
              val caseReason: String = item(2)
              val plaintiff: String = item(3)
              // TODO 业务统计逻辑
              }
            }
             // 统计结果输出
          (entName, involvedCount, involvedCountInc, involvedDefendantCount, involvedDefendantCountInc, lzjfCount, lzjfCountInc,
            msqqCount, msqqCountInc)
        }
      }.toDF("ent_name", "involved_count", "involved_count_inc", "involved_defendant_count", "involved_defendant_count_inc",
      "lzjf_count", "lzjf_count_inc", "msqq_count", "msqq_count_inc")

执行会报错

org.apache.spark.sql.AnalysisException: ... 
due to data type mismatch: input to function array should all be the same type, 
but it's [timestamp, double, string, string, string];;

报错说的很清楚,array函数内的列数据类型不一致,看下原始数据的数据类型

scala> announAmountData.printSchema
 |-- ent_name: string (nullable = true)
 |-- publish_date: timestamp (nullable = true)
 |-- target_amt: double (nullable = true)
 |-- case_position: string (nullable = true)
 |-- case_reason: string (nullable = true)
 |-- plaintiff: string (nullable = true)

里面包含string,double,timestamp三种类型,因此报错可以理解了,但是我发现这个问题的出现不是绝对的,因为类似这样的代码写了好多回,也有各种类型的数据类型,没有出过错(我怀疑array会自定将非string列改为string),理论上应该一起报错才对,下面开始测试一下


function.array测试

下面分别测试一下string,double,timestamp在使用array的各种场景下哪些会报错类型不一致

scala> val a = Seq(("a", "2021-01-1", 3.3, "1"),("b", "2022-01-01", 4.4, "2")).toDF("a", "b", "c", "d")
a: org.apache.spark.sql.DataFrame = [a: string, b: string ... 2 more fields]

scala> val b = a.select($"a", $"b".cast("timestamp"), $"c", $"d")
b: org.apache.spark.sql.DataFrame = [a: string, b: timestamp ... 2 more fields]

scala> b.printSchema
root
 |-- a: string (nullable = true)
 |-- b: timestamp (nullable = true)
 |-- c: double (nullable = false)
 |-- d: string (nullable = true)
(1)array(timestamp, string)和array(string, timestamp)
scala> b.withColumn("e", array("b", "d"))
res51: org.apache.spark.sql.DataFrame = [a: string, b: timestamp ... 2 more fields]

scala> b.withColumn("e", array("d", "b"))
res52: org.apache.spark.sql.DataFrame = [a: string, b: timestamp ... 2 more fields]
scala> b.withColumn("e", array("d", "b")).show(false)
+---+-------------------+---+---+------------------------+
|a  |b                  |c  |d  |e                       |
+---+-------------------+---+---+------------------------+
|a  |2021-01-01 00:00:00|3.3|1  |[1, 2021-01-01 00:00:00]|
|b  |2022-01-01 00:00:00|4.4|2  |[2, 2022-01-01 00:00:00]|
+---+-------------------+---+---+------------------------+
scala> b.withColumn("e", array("d", "b")).printSchema
root
 |-- a: string (nullable = true)
 |-- b: timestamp (nullable = true)
 |-- c: double (nullable = false)
 |-- d: string (nullable = true)
 |-- e: array (nullable = false)
 |    |-- element: string (containsNull = true)

这两个都是可以的,可见不需要类型一致,最终的array里面都是string,Spark SQL会自动将所有非string列转化为string

(2)array(double, string)和array(string, double)
scala> b.withColumn("e", array("c", "d"))
res58: org.apache.spark.sql.DataFrame = [a: string, b: timestamp ... 3 more fields]

scala> b.withColumn("e", array("d", "c"))
res59: org.apache.spark.sql.DataFrame = [a: string, b: timestamp ... 3 more fields]

这种也是可以的不报错,double和string可以组合array

(3)array(double, timestamp)和array(timestamp, double)
scala> b.withColumn("e", array("c", "b"))
org.apache.spark.sql.AnalysisException: cannot resolve 'array(`c`, `b`)' due to data type mismatch: input to function array should all be the same type, but it's [double, timestamp];;
'Project [a#271, b#279, c#273, d#274, array(c#273, b#279) AS e#478]
+- Project [a#271, cast(b#272 as timestamp) AS b#279, c#273, d#274]
   +- Project [_1#266 AS a#271, _2#267 AS b#272, _3#268 AS c#273, _4#269 AS d#274]
      +- LocalRelation [_1#266, _2#267, _3#268, _4#269]

直接报错,不论是array(timestamp, double)还是array(double, timestamp)都直接报错类型不一致,初步结论是array里面没有string列,因为只要将其中任一一列转化为string就可以执行

scala> b.withColumn("e", array($"c".cast("string"), $"b"))
res77: org.apache.spark.sql.DataFrame = [a: string, b: timestamp ... 3 more fields]

scala> b.withColumn("e", array($"c", $"b".cast("string")))
res78: org.apache.spark.sql.DataFrame = [a: string, b: timestamp ... 3 more fields]
(4)timestamp,double,string组合出现的情况

这种也是有时报错有时不报错,看顺序,直接Kian测试结果

组合 报错
array(double,timestamp,string) ×
array(timestamp,double,string) ×
array(string,timestamp,double)
array(string,double,timestamp)
array(double,string,timestamp)
array(timestamp,string,double)

初步猜测array的书写顺序需要满足在所有的非string类前面,一定要有至少一个string列


解决方案

解决方案就是手动将所有的非string列先转化为string即可,也就不需要关注书写顺序的问题,改写成如下代码即可

    val joinData = data.join(announAmountData, Seq("ent_name"), "left_outer").groupBy($"ent_name")
      .agg(collect_list(array($"publish_date".cast("string"), $"target_amt".cast("string"), $"case_position", $"case_reason", $"plaintiff")).as("items"))
      .map {
        row => {

          ()
        }
      }.toDF()

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 223,426评论 6 521
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 95,567评论 3 401
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 170,342评论 0 366
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 60,420评论 1 300
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 69,424评论 6 398
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,964评论 1 314
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 41,365评论 3 426
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 40,330评论 0 278
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,862评论 1 322
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,921评论 3 343
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 41,063评论 1 354
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,717评论 5 350
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 42,393评论 3 336
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,880评论 0 25
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 34,002评论 1 275
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 49,540评论 3 380
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 46,084评论 2 361