聊聊flink Table的Joins

本文主要研究一下flink Table的Joins

实例

Inner Join

Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "d, e, f");
Table result = left.join(right).where("a = d").select("a, b, e");
  • join方法即inner join

Outer Join

Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "d, e, f");

Table leftOuterResult = left.leftOuterJoin(right, "a = d").select("a, b, e");
Table rightOuterResult = left.rightOuterJoin(right, "a = d").select("a, b, e");
Table fullOuterResult = left.fullOuterJoin(right, "a = d").select("a, b, e");
  • outer join分为leftOuterJoin、rightOuterJoin、fullOuterJoin三种

Time-windowed Join

Table left = tableEnv.fromDataSet(ds1, "a, b, c, ltime.rowtime");
Table right = tableEnv.fromDataSet(ds2, "d, e, f, rtime.rowtime");

Table result = left.join(right)
  .where("a = d && ltime >= rtime - 5.minutes && ltime < rtime + 10.minutes")
  .select("a, b, e, ltime");
  • time-windowed join需要至少一个等值条件,然后还需要一个与两边时间相关的条件(可以使用<, <=, >=, >)

Inner Join with Table Function

// register User-Defined Table Function
TableFunction<String> split = new MySplitUDTF();
tableEnv.registerFunction("split", split);

// join
Table orders = tableEnv.scan("Orders");
Table result = orders
    .join(new Table(tableEnv, "split(c)").as("s", "t", "v"))
    .select("a, b, s, t, v");
  • Table也可以跟table function进行inner join,如果table function返回空,则table的记录被丢弃

Left Outer Join with Table Function

// register User-Defined Table Function
TableFunction<String> split = new MySplitUDTF();
tableEnv.registerFunction("split", split);

// join
Table orders = tableEnv.scan("Orders");
Table result = orders
    .leftOuterJoin(new Table(tableEnv, "split(c)").as("s", "t", "v"))
    .select("a, b, s, t, v");
  • Table也可以跟table function进行left outer join,如果table function返回空,则table的记录保留,空的部分为null值

Join with Temporal Table

Table ratesHistory = tableEnv.scan("RatesHistory");

// register temporal table function with a time attribute and primary key
TemporalTableFunction rates = ratesHistory.createTemporalTableFunction(
    "r_proctime",
    "r_currency");
tableEnv.registerFunction("rates", rates);

// join with "Orders" based on the time attribute and key
Table orders = tableEnv.scan("Orders");
Table result = orders
    .join(new Table(tEnv, "rates(o_proctime)"), "o_currency = r_currency")
  • Table也可以跟Temporal tables进行join,Temporal tables通过Table的createTemporalTableFunction而来,目前仅仅支持inner join的方式

Table

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala

class Table(
    private[flink] val tableEnv: TableEnvironment,
    private[flink] val logicalPlan: LogicalNode) {
  //......

  def join(right: Table): Table = {
    join(right, None, JoinType.INNER)
  }

  def join(right: Table, joinPredicate: String): Table = {
    join(right, joinPredicate, JoinType.INNER)
  }

  def join(right: Table, joinPredicate: Expression): Table = {
    join(right, Some(joinPredicate), JoinType.INNER)
  }

  def leftOuterJoin(right: Table): Table = {
    join(right, None, JoinType.LEFT_OUTER)
  }

  def leftOuterJoin(right: Table, joinPredicate: String): Table = {
    join(right, joinPredicate, JoinType.LEFT_OUTER)
  }

  def leftOuterJoin(right: Table, joinPredicate: Expression): Table = {
    join(right, Some(joinPredicate), JoinType.LEFT_OUTER)
  }

  def rightOuterJoin(right: Table, joinPredicate: String): Table = {
    join(right, joinPredicate, JoinType.RIGHT_OUTER)
  }

  def rightOuterJoin(right: Table, joinPredicate: Expression): Table = {
    join(right, Some(joinPredicate), JoinType.RIGHT_OUTER)
  }

  def fullOuterJoin(right: Table, joinPredicate: String): Table = {
    join(right, joinPredicate, JoinType.FULL_OUTER)
  }

  def fullOuterJoin(right: Table, joinPredicate: Expression): Table = {
    join(right, Some(joinPredicate), JoinType.FULL_OUTER)
  }

  private def join(right: Table, joinPredicate: String, joinType: JoinType): Table = {
    val joinPredicateExpr = ExpressionParser.parseExpression(joinPredicate)
    join(right, Some(joinPredicateExpr), joinType)
  }

  private def join(right: Table, joinPredicate: Option[Expression], joinType: JoinType): Table = {

    // check if we join with a table or a table function
    if (!containsUnboundedUDTFCall(right.logicalPlan)) {
      // regular table-table join

      // check that the TableEnvironment of right table is not null
      // and right table belongs to the same TableEnvironment
      if (right.tableEnv != this.tableEnv) {
        throw new ValidationException("Only tables from the same TableEnvironment can be joined.")
      }

      new Table(
        tableEnv,
        Join(this.logicalPlan, right.logicalPlan, joinType, joinPredicate, correlated = false)
          .validate(tableEnv))

    } else {
      // join with a table function

      // check join type
      if (joinType != JoinType.INNER && joinType != JoinType.LEFT_OUTER) {
        throw new ValidationException(
          "TableFunctions are currently supported for join and leftOuterJoin.")
      }

      val udtf = right.logicalPlan.asInstanceOf[LogicalTableFunctionCall]
      val udtfCall = LogicalTableFunctionCall(
        udtf.functionName,
        udtf.tableFunction,
        udtf.parameters,
        udtf.resultType,
        udtf.fieldNames,
        this.logicalPlan
      ).validate(tableEnv)

      new Table(
        tableEnv,
        Join(this.logicalPlan, udtfCall, joinType, joinPredicate, correlated = true)
          .validate(tableEnv))
    }
  }

  //......
}
  • Table定义了join、leftOuterJoin、rightOuterJoin、fullOuterJoin方法,其最后都是调用的私有的join方法,其中JoinType用于表达join类型,分别有INNER, LEFT_OUTER, RIGHT_OUTER, FULL_OUTER这几种;另外接收String类型或者Expression的条件表达式,其中String类型最后是被解析为Expression类型;join方法最后是使用Join创建了新的Table

Join

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scala

case class Join(
    left: LogicalNode,
    right: LogicalNode,
    joinType: JoinType,
    condition: Option[Expression],
    correlated: Boolean) extends BinaryNode {

  override def output: Seq[Attribute] = {
    left.output ++ right.output
  }

  private case class JoinFieldReference(
    name: String,
    resultType: TypeInformation[_],
    left: LogicalNode,
    right: LogicalNode) extends Attribute {

    val isFromLeftInput: Boolean = left.output.map(_.name).contains(name)

    val (indexInInput, indexInJoin) = if (isFromLeftInput) {
      val indexInLeft = left.output.map(_.name).indexOf(name)
      (indexInLeft, indexInLeft)
    } else {
      val indexInRight = right.output.map(_.name).indexOf(name)
      (indexInRight, indexInRight + left.output.length)
    }

    override def toString = s"'$name"

    override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
      // look up type of field
      val fieldType = relBuilder.field(2, if (isFromLeftInput) 0 else 1, name).getType
      // create a new RexInputRef with index offset
      new RexInputRef(indexInJoin, fieldType)
    }

    override def withName(newName: String): Attribute = {
      if (newName == name) {
        this
      } else {
        JoinFieldReference(newName, resultType, left, right)
      }
    }
  }

  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = {
    val node = super.resolveExpressions(tableEnv).asInstanceOf[Join]
    val partialFunction: PartialFunction[Expression, Expression] = {
      case field: ResolvedFieldReference => JoinFieldReference(
        field.name,
        field.resultType,
        left,
        right)
    }
    val resolvedCondition = node.condition.map(_.postOrderTransform(partialFunction))
    Join(node.left, node.right, node.joinType, resolvedCondition, correlated)
  }

  override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
    left.construct(relBuilder)
    right.construct(relBuilder)

    val corSet = mutable.Set[CorrelationId]()
    if (correlated) {
      corSet += relBuilder.peek().getCluster.createCorrel()
    }

    relBuilder.join(
      convertJoinType(joinType),
      condition.map(_.toRexNode(relBuilder)).getOrElse(relBuilder.literal(true)),
      corSet.asJava)
  }

  private def convertJoinType(joinType: JoinType) = joinType match {
    case JoinType.INNER => JoinRelType.INNER
    case JoinType.LEFT_OUTER => JoinRelType.LEFT
    case JoinType.RIGHT_OUTER => JoinRelType.RIGHT
    case JoinType.FULL_OUTER => JoinRelType.FULL
  }

  private def ambiguousName: Set[String] =
    left.output.map(_.name).toSet.intersect(right.output.map(_.name).toSet)

  override def validate(tableEnv: TableEnvironment): LogicalNode = {
    val resolvedJoin = super.validate(tableEnv).asInstanceOf[Join]
    if (!resolvedJoin.condition.forall(_.resultType == BOOLEAN_TYPE_INFO)) {
      failValidation(s"Filter operator requires a boolean expression as input, " +
        s"but ${resolvedJoin.condition} is of type ${resolvedJoin.joinType}")
    } else if (ambiguousName.nonEmpty) {
      failValidation(s"join relations with ambiguous names: ${ambiguousName.mkString(", ")}")
    }

    resolvedJoin.condition.foreach(testJoinCondition)
    resolvedJoin
  }

  private def testJoinCondition(expression: Expression): Unit = {

    def checkIfJoinCondition(exp: BinaryComparison) = exp.children match {
      case (x: JoinFieldReference) :: (y: JoinFieldReference) :: Nil
        if x.isFromLeftInput != y.isFromLeftInput => true
      case _ => false
    }

    def checkIfFilterCondition(exp: BinaryComparison) = exp.children match {
      case (x: JoinFieldReference) :: (y: JoinFieldReference) :: Nil => false
      case (x: JoinFieldReference) :: (_) :: Nil => true
      case (_) :: (y: JoinFieldReference) :: Nil => true
      case _ => false
    }

    var equiJoinPredicateFound = false
    // Whether the predicate is literal true.
    val alwaysTrue = expression match {
      case x: Literal if x.value.equals(true) => true
      case _ => false
    }

    def validateConditions(exp: Expression, isAndBranch: Boolean): Unit = exp match {
      case x: And => x.children.foreach(validateConditions(_, isAndBranch))
      case x: Or => x.children.foreach(validateConditions(_, isAndBranch = false))
      case x: EqualTo =>
        if (isAndBranch && checkIfJoinCondition(x)) {
          equiJoinPredicateFound = true
        }
      case x: BinaryComparison =>
      // The boolean literal should be a valid condition type.
      case x: Literal if x.resultType == Types.BOOLEAN =>
      case x => failValidation(
        s"Unsupported condition type: ${x.getClass.getSimpleName}. Condition: $x")
    }

    validateConditions(expression, isAndBranch = true)

    // Due to a bug in Apache Calcite (see CALCITE-2004 and FLINK-7865) we cannot accept join
    // predicates except literal true for TableFunction left outer join.
    if (correlated && right.isInstanceOf[LogicalTableFunctionCall] && joinType != JoinType.INNER ) {
      if (!alwaysTrue) failValidation("TableFunction left outer join predicate can only be " +
        "empty or literal true.")
    } else {
      if (!equiJoinPredicateFound) {
        failValidation(
          s"Invalid join condition: $expression. At least one equi-join predicate is " +
            s"required.")
      }
    }
  }
}
  • Join继承了BinaryNode,它内部将flink的JoinType转为calcite的JoinRelType类型,construct方法通过relBuilder.join来构建join关系

小结

  • Table支持多种形式的join,其中包括Inner Join、Outer Join、Time-windowed Join、Inner Join with Table Function、Left Outer Join with Table Function、Join with Temporal Table
  • Table定义了join、leftOuterJoin、rightOuterJoin、fullOuterJoin方法,其最后都是调用的私有的join方法,其中JoinType用于表达join类型,分别有INNER, LEFT_OUTER, RIGHT_OUTER, FULL_OUTER这几种;另外接收String类型或者Expression的条件表达式,其中String类型最后是被解析为Expression类型;join方法最后是使用Join创建了新的Table
  • Join继承了BinaryNode,它内部将flink的JoinType转为calcite的JoinRelType类型,construct方法通过relBuilder.join来构建join关系

doc

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,547评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,399评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,428评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,599评论 1 274
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,612评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,577评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,941评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,603评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,852评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,605评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,693评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,375评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,955评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,936评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,172评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,970评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,414评论 2 342

推荐阅读更多精彩内容

  • pyspark.sql模块 模块上下文 Spark SQL和DataFrames的重要类: pyspark.sql...
    mpro阅读 9,442评论 0 13
  • Lua 5.1 参考手册 by Roberto Ierusalimschy, Luiz Henrique de F...
    苏黎九歌阅读 13,727评论 0 38
  • 听着民谣,越来越享受现在过的自由生活。并非无忧无虑,却也多了份洒脱。学生时代就想过,长大后会从事怎么样的工作,过上...
    夏了微晴阅读 184评论 0 0
  • 刘琳焦点讲师班坚持第546天分享(2018/11/29) 为什么我们说的话孩子听不进去?为什么孩子不理我们...
    小溪与大海阅读 362评论 0 2
  • 第八期训练营第二周day2 《博韦商务沟通》,270页 为了拿出有说服力的分析性报告,在选择最有效的组织策略之前,...
    平欣阅读 218评论 0 0