Flink 嵌套json 解析成Table

背景

JSON作为常用的数据格式,在消息中间件中用json做为消息格式也很常见。在flink table中消息可以理解为表的一行记录。所以对于一个消息队列中的一个topic来说,可以根据json数据格式映射成一张表。flink自身是支持json格式的,但是对于复杂格式支持不是太友好。笔者也是在flink table的应用中遇到了各种json格式,发布出来给大家看看,或有其他好的解析方式可留言探讨。

概述

下面3图可以很直观看出我理想的解析思想,左边是源json格式右边是table 格式

只有嵌套对象类json


图1

单一嵌套数组类json


图2

多嵌套数组类json

图3

针对多嵌套数组的笔者的最终实现和图3有差别。选择的方式为扁平化的实现方式,后面根据条件去选择tablename拆分成2张表


图4

实现 直接上源码。

package com.paic.phflink.core.util

import java.util

import org.apache.flink.api.common.typeinfo.TypeInformation

import org.apache.flink.api.java.typeutils.RowTypeInfo

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.{ArrayNode, JsonNodeFactory, ObjectNode}

import org.apache.flink.types.Row

import scala.collection.JavaConverters._

import scala.collection.mutable

/**

  * 给定JSON串,和目标schema描述,生成对于的 Row

*/

object JsonNodeUtil {

def main(args: Array[String]):Unit = {

//    objects_test()

//    oneList_test()

    twoList_test()

//    oneList_no_flatMap_test()

//    twoList_no_flatmap_test()

  }

val objects =

"""

{"a":"1","b":{"c":"2","d":"3"},"e":{"f":"4","g":"5"}}

    """.stripMargin

.getBytes()

val oneList =

"""

{"a":"1","b":[{"c":"2","d":"3"},{"c":"4","d":"5"}],"e":"6","f":{"g":"7","h":"8"}}

    """.stripMargin

.getBytes()

val twoList =

"""

{"a":"1","b":[{"c":"2","d":"3"},{"c":"4","d":"5"}],"e":"6","f":[{"g":"7","h":"8"},{"g":"9","h":"10"}],"i":{"j":"11","k":"12"}}

    """.stripMargin

.getBytes()

/**

    * 嵌套对象打平 和嵌套list一个不打平一个打平

    */

  def twoList_no_flatmap_test() = {

val objectMapper =new ObjectMapper()

val json =twoList

    val map =new util.LinkedHashMap[String, String]()

map.put("a", TableSchemaUtil.STRING)

map.put("b", TableSchemaUtil.OBJECT_ARRAY)

map.put("b_c", TableSchemaUtil.STRING)

map.put("b_d", TableSchemaUtil.STRING)

map.put("e", TableSchemaUtil.STRING)

map.put("f", TableSchemaUtil.OBJECT_ARRAY)

map.put("f_g", TableSchemaUtil.STRING)

map.put("f_h", TableSchemaUtil.STRING)

map.put("i_j", TableSchemaUtil.STRING)

map.put("i_k", TableSchemaUtil.STRING)

map.put("tableName", TableSchemaUtil.STRING)

val rows:util.ArrayList[Row] =getRows(objectMapper,json,map)

rows

}

/**

    * 嵌套对象打平 和嵌套list不打平

    */

  def oneList_no_flatMap_test() = {

val objectMapper =new ObjectMapper

val json =oneList

    val map =new util.LinkedHashMap[String, String]()

map.put("a", TableSchemaUtil.STRING)

map.put("b", TableSchemaUtil.OBJECT_ARRAY)

map.put("b_c", TableSchemaUtil.STRING)

map.put("b_d", TableSchemaUtil.STRING)

map.put("e", TableSchemaUtil.STRING)

map.put("f_g", TableSchemaUtil.STRING)

map.put("f_h", TableSchemaUtil.STRING)

val rows:util.ArrayList[Row] =getRows(objectMapper,json,map)

rows

}

/**

    * 嵌套对象 和嵌套多个list  打平

    * 多个数组的一定要指定 一个 tableName 列

    * map.put("tableName", TableSchemaUtil.STRING)

    * 后续根据这个tableName 进行查询分表

    * @return

    */

  def twoList_test() = {

val objectMapper =new ObjectMapper

val json =twoList

    val map =new util.LinkedHashMap[String, String]()

map.put("a", TableSchemaUtil.STRING)

map.put("b_c", TableSchemaUtil.STRING)

map.put("b_d", TableSchemaUtil.STRING)

map.put("e", TableSchemaUtil.STRING)

map.put("f_g", TableSchemaUtil.STRING)

map.put("f_h", TableSchemaUtil.STRING)

map.put("i_j", TableSchemaUtil.STRING)

map.put("i_k", TableSchemaUtil.STRING)

map.put("tableName", TableSchemaUtil.STRING)

val rows:util.ArrayList[Row] =getRows(objectMapper,json,map)

rows

}

/**

    * 嵌套对象 和嵌套list  打平

    * @return

    */

  def oneList_test() = {

val objectMapper =new ObjectMapper

val json =oneList

    val map =new util.LinkedHashMap[String, String]()

map.put("a", TableSchemaUtil.STRING)

map.put("b_c", TableSchemaUtil.STRING)

map.put("b_d", TableSchemaUtil.STRING)

map.put("e", TableSchemaUtil.STRING)

map.put("f_g", TableSchemaUtil.STRING)

map.put("f_h", TableSchemaUtil.STRING)

val rows:util.ArrayList[Row] =getRows(objectMapper,json,map)

rows

}

/**

    * 嵌套对象打平

    * @return

    */

  def objects_test() = {

val objectMapper =new ObjectMapper

val json =objects

    val map =new util.LinkedHashMap[String, String]()

map.put("a", TableSchemaUtil.STRING)

map.put("b_c", TableSchemaUtil.STRING)

map.put("b_d", TableSchemaUtil.STRING)

map.put("e_f", TableSchemaUtil.STRING)

map.put("e_g", TableSchemaUtil.STRING)

val rows:util.ArrayList[Row] =getRows(objectMapper,json,map)

rows

}

def getRows(objectMapper: ObjectMapper,json:Array[Byte],map:util.LinkedHashMap[String, String]) ={

val objectNode:ObjectNode = objectMapper.readValue(json,classOf[ObjectNode])

//最外层的数据

    val rootColums =new ObjectNode(JsonNodeFactory.instance)

//嵌套的数组

    val tables = mutable.LinkedHashMap[String, ArrayNode]()

val list =new util.ArrayList[Row]()

//解析json

    parse(objectNode,rootColums,tables,"")

val saclaLinkMap = mutable.LinkedHashMap[String, String]()

map.asScala.foreach{case (k:String,v:String) => saclaLinkMap += (k -> v)}

//对json转化成 row的时候选择打平跳过

    val feidMapScala = TableSchemaUtil.toFlinkType(saclaLinkMap)

val types: Array[TypeInformation[_]] = feidMapScala.map(_._2).toArray

val fieldNames: Array[String] = feidMapScala.map(_._1).toArray

val info =new RowTypeInfo(types,fieldNames)

val tableSize = tables.size

if(tableSize ==0 && rootColums.size() >0){

val row = JsonToRowUtil.convertRow(rootColums,info)

list.add(row)

}

//循环每个嵌套的数组 每个数组理解为一个表

    for( (table,value)<- tables) {//循环每个表

      //判断表是否需要打平

      if(map.containsKey(table) && map.get(table).startsWith(TableSchemaUtil.OBJECT_ARRAY)){

//不需要打平就把list数据弄成一个row 数组

        val tableLine = rootColums.deepCopy()//每一行初始化的ObjectNode

        if(tableSize >1){//如果有多个table 就要加一列table 名字做区分

          tableLine.put("tableName",table)//把每行数据都加一个table名字

        }

tableLine.put(table,value)

val row = JsonToRowUtil.convertRow(tableLine,info)

list.add(row)

}else{

val tableLines = value.elements()//表中的所有行

        while (tableLines.hasNext) {//循环每一行

          val line = tableLines.next()//获取每一行

          val child = line.fields()//每一行的所有列

          val tableLine = rootColums.deepCopy()//每一行初始化的ObjectNode

          if(tableSize >1){//如果有多个table 就要加一列table 名字做区分

            tableLine.put("tableName",table)//把每行数据都加一个table名字

          }

while (child.hasNext) {//循环每一列

            val colum = child.next()//获取每一列

            val columName = colum.getKey//列名

            val filedFullName = table +"_" + columName//组合列名

            val columValue = colum.getValue.asText()//列对应的值

            //          println(filedFullName, columValue)

            val dataValue = colum.getValue.asText()

tableLine.put(filedFullName,dataValue)

}

val row = JsonToRowUtil.convertRow(tableLine,info)

list.add(row)

}

}

}

//    returnJson.put("root",json)

    list

}

def parse(objectNode: ObjectNode,

            objectSchema:ObjectNode,

            listSchema:mutable.LinkedHashMap[String, ArrayNode],

            parentName:String =""

          ):Unit ={

val fieldNames = objectNode.fieldNames()

//得到第一层

    while(fieldNames.hasNext){

val field = fieldNames.next()

var node = objectNode.get(field)

val filedFullName =if(parentName.length >0){

parentName+"_"+field

}else{

field

}

if(node.isObject){

parse(objectNode.`with`(field),objectSchema,listSchema,filedFullName)

}else if(node.isArray){

val list:ArrayNode = node.asInstanceOf[ArrayNode]

if(list.size() >0){

//获取第0个解析一下

          //判断list里面是否还有嵌套,如果没有则直接去

          if(false){

//            node = list.get(0)

//            parse(node,objectSchema,listSchema,filedFullName)

          }else{

listSchema += (filedFullName -> list)

}

}else{//不为空则填充默认值

        }

}else{

val dataValue = node.asText()

//        println(filedFullName,dataValue)

        objectSchema.put(filedFullName,dataValue.toString)

//        val dataType = schema.get(filedFullName)

      }

}

}

}

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,014评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,796评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,484评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,830评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,946评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,114评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,182评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,927评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,369评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,678评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,832评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,533评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,166评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,885评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,128评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,659评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,738评论 2 351

推荐阅读更多精彩内容