02. Flink Real-Time Data Management: Capturing ETL Job Progress with CDC + Joining a Broadcast Variable

Introduction

Background: Our offline ETL jobs need real-time monitoring of their run status. Since the scheduler is Azkaban, we read changes from its backing MySQL configuration database. This article covers step two: use flink-cdc to watch the execution_jobs table, capture job run status in real time, join it against the projects broadcast variable produced in step one, and assemble the result: projectName, flowName, jobName, status, startTime, endTime.

Components

flink-cdc-connector + MySQL + Flink Table API

Implementation Logic

  1. Use the flink-cdc connector to capture changes on the execution_jobs metadata table and obtain job run status in real time;
  2. Select the required fields, convert the Table result into a DataStream, and wrap each row in a case class;
  3. Join against the broadcast variable to resolve projectName (the end-to-end wiring is sketched right after this list).
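
Putting the three steps together, the wiring looks roughly like this. This is only a sketch using the class and method names from the demo sections below; tEnv, env, and projectsDimDesc are the table environment, stream environment, and broadcast-state descriptor defined there:

// Sketch: end-to-end wiring of the three steps (full code in the demo below)
val cdcStream = ChangeDataCapture.genUpdateNeo4jInfo(tEnv) // 1. CDC as a retract stream
  .filter(_._1).map(_._2)                                  // 2. keep accumulate messages, unwrap
val projects = QueryProjects.getProjects(env)              //    projects dimension from step one
val joined = cdcStream
  .connect(projects.broadcast(projectsDimDesc))            // 3. broadcast join
  .process(new CustomBroadcastProcessFunction())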

Prerequisites

  1. Make sure binlog is enabled on the MySQL instance and set to ROW format (ON: enabled; OFF: disabled):
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
  2. Create a dedicated user for CDC:
-- create the user
CREATE USER 'flink_cdc'@'localhost' IDENTIFIED BY 'password';
-- grant the privileges flink-cdc needs
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink_cdc'@'localhost';
-- apply the grants
FLUSH PRIVILEGES;

Implementation Demo

  1. Define the result case classes
object Domain {

  case class ExecutionJobs(
      project_node: String,
      project_id: String,
      flow_name: String,
      job_name: String,
      active_status: String,
      create_time: String,
      update_time: String,
      event_time: String
  )

  case class ProjectsClass(
      project_id: String,
      project_name: String,
      project_desc: String,
      create_time: String,
      update_time: String
  )

  case class UpsertNeo4j(
      project_node: String,
      project_name: String,
      flow_name: String,
      job_name: String,
      active_status: String,
      create_time: String,
      update_time: String
  )
}
  2. Implement CDC with the Flink Table API

import com.haierubic.bigdata.dataflow.models.Domain.ExecutionJobs
import com.haierubic.bigdata.dataflow.models.Sentence
import com.haierubic.bigdata.dataflow.utils.ConfigParse
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.streaming.api.scala._

/**
 * flink-cdc on the Azkaban MySQL metadata database: capture job run status
 * created by moun
 *
 * @FAQ:
 *   1. Uses flink-cdc-mysql 1.1.0;
 *   2. In this version the 'scan.startup.mode' option has no effect; use 'debezium.snapshot.mode' = 'schema_only' instead.
 */

object ChangeDataCapture extends Sentence {

  def genUpdateNeo4jInfo(tEnv: StreamTableEnvironment): DataStream[(Boolean, ExecutionJobs)] = {
    val azkHost: String = ConfigParse.getString("db.azkaban.host")
    val azkPort: Int = ConfigParse.getInt("db.azkaban.port")
    val azkUserName: String = ConfigParse.getString("db.azkaban.username")
    val azkPassword: String = ConfigParse.getString("db.azkaban.password")
    val azkDBName: String = ConfigParse.getString("db.azkaban.dbname")
    val cdcTabExecutionJobs: String = ConfigParse.getString("cdc.azkaban_run_jobs.tablename")

//    val url = ConfigParse.getString("db.neo4j.commitUrl")
    val project_node = ConfigParse.getString("kg.neo4j.projectNode")

    // CDC source on the execution_jobs table: capture job run records
    tEnv.executeSql(
      s"""
         |create table execution_jobs (
         |  exec_id int NOT NULL,
         |  project_id string NOT NULL,
         |  version int NOT NULL,
         |  flow_id string NOT NULL,
         |  job_id string NOT NULL,
         |  attempt int NOT NULL,
         |  start_time bigint NULL,
         |  end_time bigint NULL,
         |  status string NULL,
         |  input_params string,
         |  output_params string,
         |  attachments string
         |) with (
         |  'connector' = 'mysql-cdc',
         |  'hostname' = '${azkHost}',
         |  'port' = '${azkPort}',
         |  'username' = '${azkUserName}',
         |  'password' = '${azkPassword}',
         |  'database-name' = '${azkDBName}',
         |  'table-name' = '${cdcTabExecutionJobs}',
         |  'debezium.snapshot.mode' = 'schema_only'
         |)
         |""".stripMargin
    )

    val update_info = tEnv.sqlQuery(
      s"""
         |select
         |  '${project_node}' as project_node,
         |  project_id,
         |  flow_id as flow_name,
         |  job_id as job_name,
         |  status as active_status,
         |  from_unixtime(start_time/1000,'yyyy-MM-dd HH:mm:ss') as create_time,
         |  from_unixtime(end_time/1000,'yyyy-MM-dd HH:mm:ss') as update_time,
         |  -- end_time stays -1 while the job is still running, so fall back to start_time
         |  cast((case when end_time = -1 then start_time else end_time end) as varchar) as event_time
         |from execution_jobs
         |""".stripMargin)

    tEnv.toRetractStream[ExecutionJobs](update_info)
  }

  def main(args: Array[String]): Unit = {
    println("cdcMysql~~")
  }
}
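
toRetractStream turns the continuously updating query result into a DataStream[(Boolean, ExecutionJobs)]: the Boolean flag is true for an accumulate (insert) message and false for a retract (delete) message. A minimal sketch of consuming it:

// Sketch: consuming the retract stream returned by genUpdateNeo4jInfo
val retractStream: DataStream[(Boolean, ExecutionJobs)] =
  ChangeDataCapture.genUpdateNeo4jInfo(tEnv)

retractStream
  .filter(_._1) // true = accumulate (insert); false = retract
  .map(_._2)    // unwrap the ExecutionJobs payload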
  3. Join the broadcast variable to resolve projectName
  • Implement the join logic with a custom BroadcastProcessFunction

import com.haierubic.bigdata.dataflow.models.Domain.{ExecutionJobs, ProjectsClass, UpsertNeo4j}
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.util.Collector

class CustomBroadcastProcessFunction extends BroadcastProcessFunction[ExecutionJobs, ProjectsClass, UpsertNeo4j] {

  // descriptor for the broadcast state: project_id -> project_name
  val projectsDimDesc = new MapStateDescriptor[String, String](
    "projects",
    BasicTypeInfo.STRING_TYPE_INFO,
    BasicTypeInfo.STRING_TYPE_INFO
  )

  override def processElement(
                               in1: ExecutionJobs,
                               readOnlyContext: BroadcastProcessFunction[ExecutionJobs, ProjectsClass, UpsertNeo4j]#ReadOnlyContext,
                               collector: Collector[UpsertNeo4j]): Unit = {
    // 1. look up the project_name for this record's project_id in the broadcast state
    val project_id = in1.project_id
    val project_name = readOnlyContext.getBroadcastState(projectsDimDesc).get(project_id)

    // emit only when the project name is found; otherwise skip the record
    if (project_name != null) {
      // 2. assemble the output record
      val result = UpsertNeo4j(in1.project_node, project_name, in1.flow_name, in1.job_name, in1.active_status, in1.create_time, in1.update_time)

      collector.collect(result)
    }
  }

  override def processBroadcastElement(
                                        in2: ProjectsClass,
                                        context: BroadcastProcessFunction[ExecutionJobs, ProjectsClass, UpsertNeo4j]#Context,
                                        collector: Collector[UpsertNeo4j]): Unit = {
    // get the broadcast state
    val broadcastState = context.getBroadcastState(projectsDimDesc)

    // clear the broadcast state if a full refresh is needed
//    broadcastState.clear()

    // upsert this project into the broadcast state
    broadcastState.put(in2.project_id, in2.project_name)
  }
}
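
Note: getBroadcastState resolves the state by the descriptor's name and types, so processElement can only read entries that processBroadcastElement wrote under a matching MapStateDescriptor. For the same reason, the projects stream must be broadcast with an identical descriptor, as the next step shows.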
  • Connect the CDC stream with the broadcast stream to build the new DataStream
import com.haierubic.bigdata.dataflow.models.Sentence
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object DataFlowMonitor extends Sentence {
  def main(args: Array[String]): Unit = {

    // flink env
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // use event-time semantics
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    env.setParallelism(1)

    // flink table settings
    val tSettings: EnvironmentSettings = EnvironmentSettings
      .newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()

    // flink table env
    val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, tSettings)

    // Step1: cdc-azkaban-execution_jobs, get project running status
    val updateNeo4jStream = ChangeDataCapture.genUpdateNeo4jInfo(tEnv)
    // keep only the accumulate (insert) messages of the retract stream
    val filterUpdateNeo4jStream = updateNeo4jStream.filter(_._1).map(_._2)

    // Step2: broadcast azkaban-projects, get project_name
    val projectsDataStream = QueryProjects.getProjects(env)
    // must match the descriptor used inside CustomBroadcastProcessFunction
    val projectsDimDesc = new MapStateDescriptor[String, String](
      "projects",
      BasicTypeInfo.STRING_TYPE_INFO,
      BasicTypeInfo.STRING_TYPE_INFO
    )

    // Step3: join against the broadcast state to attach project_name
    val result = filterUpdateNeo4jStream
      .connect(projectsDataStream.broadcast(projectsDimDesc))
      .process(new CustomBroadcastProcessFunction()) // custom broadcast join function

    result.print()

    env.execute("DataFlowMonitor")
  }
}
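
QueryProjects.getProjects comes from step one and is not shown in this post. For completeness, here is a minimal sketch of what it might look like, assuming a periodic JDBC reload of the Azkaban projects table; the connection URL, credentials, column list, and refresh interval are placeholder assumptions, not the author's actual implementation:

import java.sql.DriverManager

import com.haierubic.bigdata.dataflow.models.Domain.ProjectsClass
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._

object QueryProjects {

  // Hypothetical: periodically re-read the projects table and emit every row,
  // so the downstream broadcast state picks up new or renamed projects.
  def getProjects(env: StreamExecutionEnvironment): DataStream[ProjectsClass] =
    env.addSource(new RichSourceFunction[ProjectsClass] {
      @volatile private var running = true

      override def run(ctx: SourceFunction.SourceContext[ProjectsClass]): Unit = {
        while (running) {
          // placeholder connection settings
          val conn = DriverManager.getConnection(
            "jdbc:mysql://azkaban-host:3306/azkaban", "flink_cdc", "password")
          try {
            val rs = conn.createStatement().executeQuery(
              "SELECT id, name, description, create_time, modified_time FROM projects")
            while (rs.next()) {
              ctx.collect(ProjectsClass(
                rs.getString(1), rs.getString(2), rs.getString(3),
                rs.getString(4), rs.getString(5)))
            }
          } finally conn.close()
          Thread.sleep(60000) // placeholder refresh interval: one minute
        }
      }

      override def cancel(): Unit = running = false
    })
}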