ExceptionInChainedOperatorException:flink写hbase对于null数据导致数据导致出现异常

使用的flink版本:1.9.1

异常描述

需求:

  1. 从kafka读取一条数据流
  2. 经过filter初次筛选符合要求的数据
  3. 然后通过map进行一次条件判断再解析。这个这个过程中可能返回null或目标输出outData。
  4. 最后将outData通过自定义sink写入hbase。
转换核心代码:
val stream: DataStream[Input] = source.filter(s => (!s.equals(null)) && (s.contains("\"type\":\"type1\"") || s.contains("\"type\":\"type2\"")))//一次过滤
      .map(json => {
        try {
          val recode: JSONObject = JSON.parseObject(json)
          val dataStr: String = recode.getString("data")
          val type = recode.getString("type")
          val data = JSON.parseObject(dataStr)
          var id: String = ""
          type match {
            case "type1" => {
              if (data.getInteger("act") == 2) { //二次过滤
                if (data.getJSONArray("ids").toArray().length > 0)
                  id = recode.getString("id") + "," + data.getJSONArray("ids").toArray().mkString(",")
                else
                  id = recode.getString("id")
                Input( id.reverse,  data.getString("sid"), data.getString("sn"), recode.getLong("time"), recode.getLong("time") * 1000)//正常输出----标记点:1
              } else null//非目标输出 导致问题的位置  此处给个随便的默认值 只要不是null就不会出问题,但是这样后面操作需要二次过滤-----标记点:2
            }
            case "type2" => {
              if (data.getInteger("act") == 2) { //二次过滤
                id = recode.getString("id")
                Input(id.reverse,  data.getString("sid"), data.getString("sn"), recode.getLong("time"), recode.getLong("time") * 1000)//正常输出----标记点:1
              } else null //非目标输出 导致问题的位置 此处给个随便的默认值 只要不是null就不会出问题,但是这样后面操作需要二次过滤 ----标记点:2
            }
          }
        } catch {
          case e => {
            e.printStackTrace()
            println("解析json失败: ", json)
            Input("id","sid", "sn", 0l)
          }
        }
      }

      )

    val result: DataStream[Output] = stream.map(s => {
      var rowkey = ""
      s.id.split(",").map(id => rowkey += s"$id${9999999999l - s.ts}|")
      if (rowkey.equals("")) {
        null
      } else {
        Output(rowkey, s.sid, s.sn, s.ts + "")
      }
    })
    
    result.addSink(new CustomSinkToHbase("habse_table", "cf", proInstance)).name("write to hbase").setParallelism(1)
自定义sink核心代码
override def invoke(value: Output, context: SinkFunction.Context[_]): Unit = {
  println(s"on ${new Date}, put $value to hbase  invoke ") //输出标记:1
  try {
    init()
    val puts = new util.ArrayList[Put]()
    value.rowkey.split("\\|").map(s => {
      val rowkey = s
      val put: Put = new Put(Bytes.toBytes(rowkey))
      put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("sid"), Bytes.toBytes(value.sid))
      put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("sn"), Bytes.toBytes(value.sn))
      put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("ts"), Bytes.toBytes(value.ts))
      puts.add(put)
    })
    table.put(puts)
    println(s"on ${new Date}, put $value to hbase  succeese ")//输出标记:2
  } catch {
    case e => {
      e.printStackTrace()
      if (table != null) table.close()
      if (conn != null) conn.close()
    }
  }
}
执行情况

在程序启动后,随着数据流的进入会产生不一样的结果:

  1. 如果数据从未有数据进入标记点2,那么一切正常
  2. 如果如果有数据进入标记点2,说明此时返回的是null,程序会马上报错:ExceptionInChainedOperatorException,后续的数据处理也会失败,程序陷入死循环。

具体表现如下:

java.lang.Exception: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
  at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:217)
  at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.processInput(SourceStreamTask.java:133)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
  at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
  at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
  at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
  at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
  at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
  at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
  at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
  at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:203)

问题追踪

在程序报错后在taskmanager日志的表现为错误日志无限循环,web页面的表现为任务的开始时间重置。

辅助输出,确定程序出错位置

通过在hbase中添加辅助输出,结果如下

on Tue Apr 21 18:30:41 CST 2020, put  Output(714114118412528160|,001,张三,1587471839) to hbase  invoke 
on Tue Apr 21 18:30:42 CST 2020, put  Output(714114118412528160|,001,张三,1587471839) to hbase  invoke 
on Tue Apr 21 18:30:44 CST 2020, put  Output(714114118412528160|,001,张三,1587471839) to hbase  invoke 
on Tue Apr 21 18:30:45 CST 2020, put  Output(714114118412528160|,001,张三,1587471839) to hbase  invoke 
on Tue Apr 21 18:30:47 CST 2020, put  Output(714114118412528160|,001,张三,1587471839) to hbase  invoke 
.
.
.
on Tue Apr 21 18:30:45 CST 2020, put  Output(714114118412528160|,001,张三,1587471839) to hbase  invoke 
on Tue Apr 21 18:30:47 CST 2020, put  Output(714114118412528160|,001,张三,1587471839) to hbase  invoke 
//并没有到success这一步

如果数据流d1进入了标记点:2(输出null);
那么后续的数据流d2进入标记点:1(正常输出) ,此时在web页面task-manager stdout的中出现d2在输出标记:1 和输出标记:2(没有输出2的部分)无限循环。
输出标记:2 没有执行 说明没有写hbase。加上错误产生的条件为要有数据进入标记点:2,初步分析是这个null的返回值影响到了后面hbase的操作。


问题解决

无效手段
  1. 写hbase前过滤掉null的值
    val result: DataStream[Output] = stream.map(s => {
      var rowkey = ""
      s.id.split(",").map(id => rowkey += s"$id${9999999999l - s.ts}|")
      if (rowkey.equals("")) {
        null
      } else {
        Output(rowkey, s.sid, s.sn, s.ts + "")
      }
    }).filter(_!=null)//过滤null

经过测试,此方法无效。

有效的手段
  1. 将二次过滤放到一次过滤的位置
 source.filter(s => (!s.equals(null)) && (s.contains("\"type\":\"type1\"") || s.contains("\"type\":\"type2\"")) && (s.contains("\"act\":2"))//提前过滤act=2

问题解决,但是因为业务的问题,act不是通用条件,不具备通用性。当然可以进行了;进行两次filter,但是过于繁琐并且会产生多条数据流。

  1. 将标记点2的null改成默认值,然后通过二次过滤,去除默认值
 type match {
            case "type1" => {
              if (data.getInteger("act") == 2) { //二次过滤
                if (data.getJSONArray("ids").toArray().length > 0)
                  id = recode.getString("id") + "," + data.getJSONArray("ids").toArray().mkString(",")
                else
                  id = recode.getString("id")
                Input( id.reverse,  data.getString("sid"), data.getString("sn"), recode.getLong("time"), recode.getLong("time") * 1000)//正常输出----标记点:1
              } else Input("id","sid", "sn", 0l)//非目标输出 默认值--标记点:2
            }
            case "type2" => {
              if (data.getInteger("act") == 2) { //二次过滤
                id = recode.getString("id")
                Input(id.reverse,  data.getString("sid"), data.getString("sn"), recode.getLong("time"), recode.getLong("time") * 1000)//正常输出----标记点:1
              } else Input("id","sid", "sn", 0l) //非目标输出 默认值--标记点:2
            }
          }

问题解决,但是从整体数据量来看,标记点1的数量仅为标记点2数量的六分之一到五分之一之间,此处会做很多无用的json解析。在大数据量的时候还是会对效率的些许影响

  1. 采用侧输出进行数据分流,将一次过滤的通过侧输出拆分,对拆分后的出具进行特定条件的二次过滤,然后进行对应的解析。
 /**
   * 数据流处理
   *
   * @param source
   * @return
   */
  def deal(source: DataStream[String]) = {
    println("数据流处理")
    //拆分数据流
    val splitData: DataStream[String] = splitSource(source)
    //解析type1的
    val type1: DataStream[Input] = getMkc(splitData)

    //解析type2
    val type2: DataStream[Input] = getMss(splitData)

    //合并数据流
    val stream: DataStream[Input] = type1.union(type2)

    //拼接rowkey
    val result: DataStream[Output] = stream.map(s => {
      var rowkey = ""
      s.id.split(",").map(id => rowkey += s"$id${9999999999l - s.ts}|")
      if (rowkey.equals("")) {
        null
      } else {
        Output(rowkey, s.prdct_cd, s.sid, s.sn, s.ts + "")
      }
    })

    //将结果写入hbase
    result.addSink(new CustomSinkToHbase("habse_table", "cf", proInstance)).name("write to hbase").setParallelism(1)

    env.execute("test")
  }

  /**
   *  从侧输出中获取type1的数据,过滤开始演唱数据 .filter(_.contains("\"act\":2"))  进行解析
   * @param splitData
   * @return
   */
  def getMkc(splitData: DataStream[String]): DataStream[Input] = {
    splitData.getSideOutput(new OutputTag[String]("type1"))
      .filter(_.contains("\"act\":2"))
      .map(str => {
        try {
          val recode: JSONObject = JSON.parseObject(str)
          val dataStr: String = recode.getString("data")
          val data = JSON.parseObject(dataStr)
          var id: String = ""
          if (data.getJSONArray("ids").toArray().length > 0)
            id = recode.getString("id") + "," + data.getJSONArray("ids").toArray().mkString(",")
          else
            id = recode.getString("id")
          Input( id.reverse,  data.getString("sid"), data.getString("sn"),  recode.getLong("time") * 1000)
        } catch {
          case e => {
            e.printStackTrace()
            println("解析json失败: ", str)
           Input("id","sid", "sn", 0l)
          }
        }
      }
      )
  }

  /**
   * 从侧输出中获取type2的数据,过滤开始演唱数据 .filter(_.contains("\"act\":2"))  进行解析
   * @param splitData
   * @return
   */
  def getMss(splitData: DataStream[String]): DataStream[Input] = {
    splitData.getSideOutput(new OutputTag[String]("type2"))
      .filter(_.contains("\"act\":2"))
      .map(str => {
        try {
          val recode: JSONObject = JSON.parseObject(str)
          val dataStr: String = recode.getString("data")
          val data = JSON.parseObject(dataStr)
          var id: String = ""
          id = recode.getString("id")
          Input(id.reverse,  data.getString("sid"), data.getString("sn"),  recode.getLong("time") * 1000)
        } catch {
          case e => {
            e.printStackTrace()
            println("解析json失败: ", str)
            Input("id","sid", "sn", 0l)
          }
        }
      }
      )
  }

  /**
   * 使用侧输出切分数据流
   * @param source
   * @return
   */
  def splitSource(source: DataStream[String]) = {
    source.process(new ProcessFunction[String, String] {
      override def processElement(value: String, ctx: ProcessFunction[String, String]#Context, out: Collector[String]): Unit = {
        value match {
          case value if value.contains("\"type\":\"type1\"") => ctx.output(new OutputTag[String]("type1"), value)
          case value if value.contains("\"type\":\"type2\"") => ctx.output(new OutputTag[String]("type2"), value)
          case _ => out.collect(value)
        }
      }
    })
  }

问题解决,对比1的好处是,侧输出的时候,数据流还是只有一个,只是给数据打了一个标签,并且对可后期业务的扩展很友好。


总结

其实虽然问题解决了,但是具体问题出现的原理并没有整理明白。
目前猜测是null的输出类型对后续的输入类型有影响,但是具体的影响怎么发生,估计得抽空研究源码才能知道了。后续有结果再更

本文为原创文章,转载请注明出处!!!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,233评论 6 495
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,357评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,831评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,313评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,417评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,470评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,482评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,265评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,708评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,997评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,176评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,827评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,503评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,150评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,391评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,034评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,063评论 2 352

推荐阅读更多精彩内容

  • Zookeeper用于集群主备切换。 YARN让集群具备更好的扩展性。 Spark没有存储能力。 Spark的Ma...
    Yobhel阅读 7,263评论 0 34
  • 一、简介 Hbase:全名Hadoop DataBase,是一种开源的,可伸缩的,严格一致性(并非最终一致性)的分...
    菜鸟小玄阅读 2,376评论 0 12
  • 摘要前面一篇文章介绍了Kafka的具体内容,今天讲述一下HBase相关的知识。首先HBase作为大数据发展初期伴随...
    ThoughtWorks阅读 250评论 0 1
  • 一、编程规约 (一)命名规约 【强制】 代码中的命名均不能以下划线或美元符号开始,也不能以下划线或美元符号结束。反...
    喝咖啡的蚂蚁阅读 1,499评论 0 2
  • 阿里巴巴 JAVA 开发手册 1 / 32 Java 开发手册 版本号 制定团队 更新日期 备 注 1.0.0 阿...
    糖宝_阅读 7,544评论 0 5