Introduction
Background: our offline ETL jobs need real-time monitoring of their run status. Since the scheduler is Azkaban, we read its backend MySQL configuration database directly. This article covers step two: use flink-cdc to watch the execution_jobs table, pick up job runs in real time, join against the projects broadcast variable built in step one, and assemble the output fields: projectName, flowName, jobName, status, startTime, endTime.
Components
flink-cdc-connector + MySQL + Flink Table API
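Assuming an sbt build, the connector dependency might look like the line below (these are the published coordinates for flink-cdc 1.1.0, the version discussed in the FAQ further down; adjust to your own build tool and Flink version):

// sbt coordinates for the mysql cdc connector (assumed; see the FAQ below for why 1.1.0)
libraryDependencies += "com.alibaba.ververica" % "flink-connector-mysql-cdc" % "1.1.0"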
Implementation logic
- Use the flink-cdc connector to capture changes from the execution_jobs metadata table and obtain job run status in real time;
- Select the required fields, convert the Table into a DataStream, and wrap each row in a case class;
- Join against the broadcast variable to resolve projectName.
Prerequisites
- Make sure binlog is enabled on the MySQL instance and set to ROW format (OFF: disabled; ON: enabled):
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format'; -- should return ROW
- Create a dedicated user for CDC:
-- create user (use '%' so the Flink job can connect from a remote host)
CREATE USER 'flink_cdc'@'%' IDENTIFIED BY 'password';
-- grant the privileges flink-cdc needs (note: GRANT ... IDENTIFIED BY is no longer valid on MySQL 8+, so grant to the already-created user)
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink_cdc'@'%';
-- flush
FLUSH PRIVILEGES;
Implementation demo
- Define the result case classes
object Domain {

  // row produced by the CDC query; the Table-to-case-class conversion maps the
  // selected columns to these fields by position, not by name
  case class ExecutionJobs(
    project_node: String,
    project_id: String,
    flow_name: String,
    job_name: String,
    active_status: String,
    create_time: String,
    update_time: String,
    event_time: String
  )

  // projects dimension broadcast from step one (project_id -> project_name lookup)
  case class ProjectsClass(
    project_id: String,
    project_name: String,
    project_desc: String,
    create_time: String,
    update_time: String
  )

  // joined result that will be upserted into Neo4j
  case class UpsertNeo4j(
    project_node: String,
    project_name: String,
    flow_name: String,
    job_name: String,
    active_status: String,
    create_time: String,
    update_time: String
  )
}
- flink-cdc implementation with the Table API
import com.haierubic.bigdata.dataflow.models.Domain.ExecutionJobs
import com.haierubic.bigdata.dataflow.models.Sentence
import com.haierubic.bigdata.dataflow.utils.ConfigParse
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.streaming.api.scala._
/**
 * flink-cdc on the Azkaban MySQL metadata database: capture project run status
 * created by moun
 *
 * @FAQ:
 * 1. Uses flink-cdc-mysql 1.1.0;
 * 2. flink-cdc-mysql: the 'scan.startup.mode' option does not take effect -- use 'debezium.snapshot.mode' = 'schema_only' instead
 */
object ChangeDataCapture extends Sentence {
def genUpdateNeo4jInfo(tEnv: StreamTableEnvironment) = {
val azkHost: String = ConfigParse.getString("db.azkaban.host")
val azkPort: Int = ConfigParse.getInt("db.azkaban.port")
val azkUserName: String = ConfigParse.getString("db.azkaban.username")
val azkPassword: String = ConfigParse.getString("db.azkaban.password")
val azkDBName: String = ConfigParse.getString("db.azkaban.dbname")
val cdcTabExecutionJobs: String = ConfigParse.getString("cdc.azkaban_run_jobs.tablename")
// val url = ConfigParse.getString("db.neo4j.commitUrl")
val project_node = ConfigParse.getString("kg.neo4j.projectNode")
// CDC the execution_jobs table to capture job run records
tEnv.executeSql(
s"""
|create table execution_jobs (
| exec_id int NOT NULL,
| project_id string NOT NULL,
| version int NOT NULL,
| flow_id string NOT NULL,
| job_id string NOT NULL,
| attempt int NOT NULL,
| start_time bigint NULL,
| end_time bigint NULL,
| status string NULL,
| input_params string,
| output_params string,
| attachments string
|) with (
| 'connector' = 'mysql-cdc',
| 'hostname' = '${azkHost}',
| 'port' = '${azkPort}',
| 'username' = '${azkUserName}',
| 'password' = '${azkPassword}',
| 'database-name' = '${azkDBName}',
| 'table-name' = '${cdcTabExecutionJobs}',
| 'debezium.snapshot.mode' = 'schema_only'
|)
|""".stripMargin
)
val update_info = tEnv.sqlQuery(
s"""
|select
| '${project_node}' as project_node,
| project_id,
| flow_id as flow_name,
| job_id as job_name,
| status as active_status,
| from_unixtime(start_time/1000,'yyyy-MM-dd HH:mm:ss') as create_time,
| from_unixtime(end_time/1000,'yyyy-MM-dd HH:mm:ss') as update_time,
| -- Azkaban stores end_time = -1 while a job is still running, so fall back to start_time
| cast((case when end_time = -1 then start_time else end_time end) as varchar) as event_time
|from execution_jobs
|""".stripMargin)
tEnv.toRetractStream[ExecutionJobs](update_info)
}
def main(args: Array[String]): Unit = {
println("cdcMysql~~")
}
}
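Before wiring in the broadcast join, the CDC stream can be smoke-tested locally with a minimal sketch like the one below (assumes the config keys above resolve to a reachable MySQL; the object name is illustrative, not part of the project):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object CdcSmokeTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tEnv = StreamTableEnvironment.create(
      env,
      EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build())
    // each element is (isAccumulate, row): (true, row) = add, (false, row) = retraction
    ChangeDataCapture.genUpdateNeo4jInfo(tEnv).print()
    env.execute("cdc-smoke-test")
  }
}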
- Join the broadcast variable to resolve projectName
- A custom BroadcastProcessFunction implements the join logic
import com.haierubic.bigdata.dataflow.models.Domain.{ExecutionJobs, ProjectsClass, UpsertNeo4j}
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.util.Collector
class CustomBroadcastProcessFunction extends BroadcastProcessFunction[ExecutionJobs, ProjectsClass, UpsertNeo4j] {
// MapStateDescriptor for the broadcast state; its name and types must match the
// descriptor used when calling .broadcast(...) on the projects stream
val projectsDimDesc = new MapStateDescriptor(
"projects",
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO
)
override def processElement(
in1: ExecutionJobs,
readOnlyContext: BroadcastProcessFunction[ExecutionJobs, ProjectsClass, UpsertNeo4j]#ReadOnlyContext,
collector: Collector[UpsertNeo4j]): Unit = {
// 1. read project_id from the incoming record
val project_id = in1.project_id
val project_name = readOnlyContext.getBroadcastState(projectsDimDesc).get(project_id)
// emit only when the project name resolves in the broadcast state; skip otherwise
if (project_name != null) {
// 2. build the output record
val result = UpsertNeo4j(in1.project_node, project_name, in1.flow_name, in1.job_name, in1.active_status, in1.create_time, in1.update_time)
collector.collect(result)
}
}
override def processBroadcastElement(
in2: ProjectsClass,
context: BroadcastProcessFunction[ExecutionJobs, ProjectsClass, UpsertNeo4j]#Context,
collector: Collector[UpsertNeo4j]): Unit = {
// fetch the broadcast state
val broadcastState = context.getBroadcastState(projectsDimDesc)
// optionally clear stale state first
// broadcastState.clear()
// upsert the project_id -> project_name mapping
broadcastState.put(in2.project_id, in2.project_name)
}
}
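DataFlowMonitor below calls QueryProjects.getProjects, which is built in step one of this series. Its implied contract is that it returns a BroadcastStream[ProjectsClass] created with a descriptor matching the one above by name and types; otherwise getBroadcastState would read an empty state. A hypothetical sketch of that contract (the projects source here is a dummy for illustration; step one actually derives it from the projects table via flink-cdc):

import com.haierubic.bigdata.dataflow.models.Domain.ProjectsClass
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.scala._

object QueryProjects {
  def getProjects(env: StreamExecutionEnvironment): BroadcastStream[ProjectsClass] = {
    // must match CustomBroadcastProcessFunction.projectsDimDesc (same name and types)
    val projectsDimDesc = new MapStateDescriptor(
      "projects",
      BasicTypeInfo.STRING_TYPE_INFO,
      BasicTypeInfo.STRING_TYPE_INFO)
    // dummy element for illustration only; the real stream comes from the projects table
    val projects: DataStream[ProjectsClass] = env.fromElements(
      ProjectsClass("1", "demo_project", "demo", "2021-01-01 00:00:00", "2021-01-01 00:00:00"))
    projects.broadcast(projectsDimDesc)
  }
}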
- Connect the CDC stream with the broadcast variable into a new DataStream
import com.haierubic.bigdata.dataflow.models.Sentence
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object DataFlowMonitor extends Sentence {
  def main(args: Array[String]): Unit = {
    // flink env
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // use event-time semantics
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    // flink table settings
    val tSettings: EnvironmentSettings = EnvironmentSettings
      .newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    // flink table env
    val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, tSettings)
    // Step1: cdc-azkaban-execution_jobs, get project running status
    val updateNeo4jStream = ChangeDataCapture.genUpdateNeo4jInfo(tEnv)
    // keep only the accumulate (insert) messages of the retract stream
    val filterUpdateNeo4jStream = updateNeo4jStream.filter(_._1).map(_._2)
    // Step2: broadcast azkaban-projects, get project_name
    val projectsDataStream = QueryProjects.getProjects(env)
    // Step3: join with the broadcast state to resolve project_name
    val result = filterUpdateNeo4jStream.connect(projectsDataStream).process(
      new CustomBroadcastProcessFunction() // custom broadcast join function
    )
    result.print() // placeholder sink; the real job writes the UpsertNeo4j records to Neo4j
    env.execute("DataFlowMonitor")
  }
}