自从Flink出了FlinkCDC之后,我们对数据库日志的采集就变得方便了许多了,除去了MaxWell、Cannel、OGG等第三方组件的繁琐配置,目前实现CDC有两种方式:HQL实现 和 DataStreamAPI实现(推荐)。
想更深入的了解CDC可以通过此链接进行学习:
1. 文档 -> https://ververica.github.io/flink-cdc-connectors/master/
2. 项目 -> https://github.com/ververica/flink-cdc-connectors
废话不多说,今天我们就使用 FlinkCDC 对业务数据进行动态的分流的实现。
一、动态分流
由于FlinkCDC是把全部数据统一写入一个Topic中, 这样显然不利于日后的数据处理。所以需要把各个表拆开处理。但是由于每个表有不同的特点,有些表是维度表,有些表是事实表。
在实时计算中一般把维度数据写入存储容器,一般是方便通过主键查询的数据库比如HBase,Redis,MySQL等。一般把事实数据写入流中,进行进一步处理,最终形成宽表。
这样的配置不适合写在配置文件中,因为这样的话,业务端随着需求变化每增加一张表,就要修改配置重启计算程序。所以这里需要一种动态配置方案,把这种配置长期保存起来,一旦配置有变化,实时计算可以自动感知。
二、实现流程图
从图中我们可以看出,把分好的流保存到对应表、主题中:
1)业务数据保存到Kafka的主题中
2)维度数据保存到HBase的表中
三、代码实现
3.1)引入 pom.xml 主要的依赖
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.2</version>
</dependency>
<!-- 如果保存检查点到 hdfs上,需要引入此依赖 -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!--phoenix-->
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-core</artifactId>
<version>4.7.0-HBase-1.1</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-spark</artifactId>
<version>5.0.0-HBase-2.0</version>
<exclusions>
<exclusion>
<groupId>org.glassfish</groupId>
<artifactId>javax.el</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
3.2)主要逻辑代码
public static void main(String[] args) throws Exception {
//TODO 1.获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//1.1 设置CK&状态后端
//env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/gmall-flink-210325/ck"));
//env.enableCheckpointing(5000L);
//env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//env.getCheckpointConfig().setCheckpointTimeout(10000L);
//env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
//env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000);
//env.setRestartStrategy(RestartStrategies.fixedDelayRestart());
//TODO 2.消费Kafka ods_base_db 主题数据创建流
String sourceTopic = "ods_base_db";
String groupId = "base_db_app_211212";
DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getKafkaConsumer(sourceTopic, groupId));
//TODO 3.将每行数据转换为JSON对象并过滤(delete) 主流
SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSON::parseObject)
.filter(new FilterFunction<JSONObject>() {
@Override
public boolean filter(JSONObject value) throws Exception {
//取出数据的操作类型
String type = value.getString("type");
return !"delete".equals(type);
}
});
//TODO 4.使用FlinkCDC消费配置表并处理成 广播流
SourceFunction<String> mySqlSource = MySqlSource.<String>builder()
.hostname("your hostname")
.port(3306)
.databaseList("lcy_db") // monitor all tables under inventory database
.tableList("lcy_db.table_process")
.username("luchangyin")
.password("123456")
.deserializer(new MyStringDeserializationSchema()) // 自定义:converts SourceRecord to JSON String
.startupOptions(StartupOptions.initial()) // .initial() latest
.build();
DataStream<String> tbProcessDStream = env.addSource(mySqlSource).name("source-cdc-table_process");
MapStateDescriptor<String, TableProcess> mapStateDescriptor = new MapStateDescriptor<>("map-state", String.class, TableProcess.class);
BroadcastStream<String> broadcastStream = tbProcessDStream.broadcast(mapStateDescriptor);
//TODO 5.连接主流和广播流
BroadcastConnectedStream<JSONObject, String> connectedStream = jsonObjDS.connect(broadcastStream);
//TODO 6.分流 处理数据 广播流数据,主流数据(根据广播流数据进行处理)
OutputTag<JSONObject> hbaseTag = new OutputTag<JSONObject>("hbase-tag") {};
SingleOutputStreamOperator<JSONObject> kafka = connectedStream.process(new MyTableProcessFunction(hbaseTag, mapStateDescriptor));
//TODO 7.提取Kafka流数据和HBase流数据
DataStream<JSONObject> hbase = kafka.getSideOutput(hbaseTag);
//TODO 8.将Kafka数据写入Kafka主题,将HBase数据写入Phoenix表
kafka.print("Kafka >>>>>>>>");
hbase.print("HBase >>>>>>>");
// ------- 入 HBASE 和 Kafka 的自定义函数自个去实现,这里不做讲述 ---------
// hbase.addSink(new DimSinkFunction());
// kafka.addSink(MyKafkaUtil.getKafkaProducer(new KafkaSerializationSchema<JSONObject>() {
// @Override
// public ProducerRecord<byte[], byte[]> serialize(JSONObject jsonObject, @Nullable Long ts) {
// return new ProducerRecord<>(jsonObject.getString("sinkTable"),jsonObject.getString("after").getBytes());
// }
// }));
//TODO 9.启动任务
env.execute("BaseDBApp");
}
3.3)自定义反序列化类(这里接触过FlinkCDC都知道,接受过来的日志数据格式非常多,因此我们需要自定义获取我们需要的具体通用数据),在 utils 包下:
public class MyStringDeserializationSchema implements DebeziumDeserializationSchema<String> {
@Override
public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
//构建结果对象
JSONObject result = new JSONObject();
//获取数据库名称&表名称
String topic = sourceRecord.topic();
String[] fields = topic.split("\\.");
String database = fields[1];
String tableName = fields[2];
//获取数据
Struct value = (Struct) sourceRecord.value();
//After
Struct after = value.getStruct("after");
JSONObject data = new JSONObject();
if (after != null) { //delete数据,则after为null
Schema schema = after.schema();
List<Field> fieldList = schema.fields();
for (int i = 0; i < fieldList.size(); i++) {
Field field = fieldList.get(i);
Object fieldValue = after.get(field);
data.put(field.name(), fieldValue);
}
}
//Before
Struct before = value.getStruct("before");
JSONObject beforeData = new JSONObject();
if (before != null) { //delete数据,则after为null
Schema schema = before.schema();
List<Field> fieldList = schema.fields();
for (int i = 0; i < fieldList.size(); i++) {
Field field = fieldList.get(i);
Object fieldValue = before.get(field);
beforeData.put(field.name(), fieldValue);
}
}
//获取时间
// long ts = Long.parseLong(value.get("ts_ms").toString());
// String dt = MyDateUtils.getTimestamp2Fm(ts);
//获取操作类型 CREATE UPDATE DELETE
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
String type = operation.toString().toLowerCase();
if ("create".equals(type)) {
type = "insert";
}
//封装数据
result.put("database", database);
result.put("tableName", tableName);
result.put("after", data);
result.put("before", beforeData);
result.put("type", type);
// result.put("ts", ts);
// result.put("dt", dt);
//输出封装好的数据
collector.collect(result.toJSONString());
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
}
3.4)创建实体类 TableProcess (对应我们的通用配置表):
@Data
public class TableProcess {
//动态分流Sink常量
public static final String SINK_TYPE_HBASE = "hbase";
public static final String SINK_TYPE_KAFKA = "kafka";
public static final String SINK_TYPE_CK = "clickhouse";
//来源表
String sourceTable;
//操作类型 insert,update,delete
String operateType;
//输出类型 hbase kafka
String sinkType;
//输出表(主题)
String sinkTable;
//输出字段
String sinkColumns;
//主键字段
String sinkPk;
//建表扩展
String sinkExtend;
}
3.5)创建合并流的处理类 MyTableProcessFunction :
public class MyTableProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {
private OutputTag<JSONObject> objectOutputTag;
private MapStateDescriptor<String, TableProcess> mapStateDescriptor;
private Connection connection;
public MyTableProcessFunction(OutputTag<JSONObject> objectOutputTag, MapStateDescriptor<String, TableProcess> mapStateDescriptor) {
this.objectOutputTag = objectOutputTag;
this.mapStateDescriptor = mapStateDescriptor;
}
@Override
public void open(Configuration parameters) {
try {
// System.out.println("凤凰驱动:"+ ShoppingConfig.PHOENIX_DRIVER +" , 凤凰服务:"+ ShoppingConfig.PHOENIX_SERVER);
Class.forName(ShoppingConfig.PHOENIX_DRIVER);
connection = DriverManager.getConnection(ShoppingConfig.PHOENIX_SERVER);
// System.out.println("链接凤凰的对象为 -> "+ connection);
} catch (Exception e) {
System.out.println("链接凤凰失败-> "+ e.toString());
e.printStackTrace();
}
}
@Override
public void processBroadcastElement(String s, Context context, Collector<JSONObject> collector) throws Exception {
//TODO 1.获取并解析数据
JSONObject jsonObject = JSON.parseObject(s);
String data = jsonObject.getString("after");
TableProcess tableProcess = JSON.parseObject(data, TableProcess.class);
//TODO 2.建表
if(TableProcess.SINK_TYPE_HBASE.equals(tableProcess.getSinkType())){
checkTable(tableProcess.getSinkTable(),
tableProcess.getSinkColumns(),
tableProcess.getSinkPk(),
tableProcess.getSinkExtend());
}
//TODO 3.写入状态,广播出去
BroadcastState<String,TableProcess> broadcastState = context.getBroadcastState(mapStateDescriptor);
String key = tableProcess.getSourceTable() +"-"+ tableProcess.getOperateType();
broadcastState.put(key, tableProcess);
}
@Override
public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
//TODO 1.获取状态数据
ReadOnlyBroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
String key = value.getString("tableName") +"-"+ value.getString("type");
TableProcess tableProcess = broadcastState.get(key);
if (tableProcess != null){
//TODO 2.过滤字段
filterColumn(value.getJSONObject("after"), tableProcess.getSinkColumns());
//TODO 3.分流
//将输出表/主题信息写入Value
value.put("sinkTable",tableProcess.getSinkTable());
String sinkType = tableProcess.getSinkType();
if(TableProcess.SINK_TYPE_KAFKA.equals(sinkType)){
//Kafka数据,写入主流
out.collect(value);
}else if(TableProcess.SINK_TYPE_HBASE.equals(sinkType)){
//HBase数据,写入侧输出流
ctx.output(objectOutputTag, value);
}
}else {
System.out.println("该组合key:"+ key +"不存在!");
}
}
private void filterColumn(JSONObject data, String sinkColumns) {
String[] fields = sinkColumns.split(",");
List<String> columns = Arrays.asList(fields);
data.entrySet().removeIf(next -> !columns.contains(next.getKey()));
}
//建表语句 : create table if not exists db.tn(id varchar primary key,tm_name varchar) xxx;
private void checkTable(String sinkTable, String sinkColumns, String sinkPk, String sinkExtend) {
PreparedStatement preparedStatement = null;
try{
if (sinkPk == null){
sinkPk = "id";
}
if(sinkExtend == null){
sinkExtend = "";
}
StringBuffer createTableSQL = new StringBuffer("create table if not exists ")
.append(ShoppingConfig.HBASE_SCHEMA)
.append(".")
.append(sinkTable)
.append("(");
String[] fields = sinkColumns.split(",");
for(int i = 0; i < fields.length; i++){
String field = fields[i];
//判断是否为主键
if (sinkPk.equals(field)){
createTableSQL.append(field).append(" varchar primary key ");
}else{
createTableSQL.append(field).append(" varchar ");
}
//判断是否为最后一个字段,如果不是,则添加","
if (i < fields.length - 1){
createTableSQL.append(",");
}
}
createTableSQL.append(")").append(sinkExtend);
//打印建表语句
System.out.println(createTableSQL);
//预编译
preparedStatement = connection.prepareStatement(createTableSQL.toString());
//执行
preparedStatement.execute();
}catch (SQLException e){
throw new RuntimeException("Phoenix表"+ sinkTable +"建表失败!");
}finally {
if (preparedStatement != null){
try{
preparedStatement.close();
}catch (SQLException e){
e.printStackTrace();
}
}
}
}
}
3.6)在MySQL中创建我们的配置表
-- 配置表
CREATE TABLE `table_process` (
`source_table` varchar(200) NOT NULL COMMENT '来源表 ',
`operate_type` varchar(200) NOT NULL COMMENT '操作类型 insert,update,delete',
`sink_type` varchar(200) DEFAULT NULL COMMENT '输出类型 hbase kafka',
`sink_table` varchar(200) DEFAULT NULL COMMENT '输出表 (主题 )',
`sink_columns` varchar(2000) DEFAULT NULL COMMENT '输出字段 ',
`sink_pk` varchar(200) DEFAULT NULL COMMENT '主键字段 ',
`sink_extend` varchar(200) DEFAULT NULL COMMENT '建表扩展 ',
PRIMARY KEY (`source_table`,`operate_type`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
insert into table_process(source_table,operate_type,sink_type,sink_table,sink_columns) values ('order_detail','insert','hbase','order_detail','id,order_id,sku_name,sku_num,create_time');
insert into table_process(source_table,operate_type,sink_type,sink_table,sink_columns) values ('user_info','read','hbase','user_info','id,name,phone_num,user_level,create_time');
insert into table_process(source_table,operate_type,sink_type,sink_table,sink_columns) values ('base_attr_info','read','kafka','base_attr_info','id,attr_name,category_id');
select * from table_process;
四、联调测试
4.1)部分kafka源数据:
kafka-source 的数据 :> {"database":"lcy_db","before":{},"after":{"name":"钟表","id":10},"type":"read","tableName":"base_category1"}
kafka-source 的数据 :> {"database":"lcy_db","before":{},"after":{"name":"鞋靴","id":11},"type":"read","tableName":"base_category1"}
kafka-source 的数据 :> {"database":"lcy_db","before":{},"after":{"name":"母婴","id":12},"type":"read","tableName":"base_category1"}
kafka-source 的数据 :> {"database":"lcy_db","before":{},"after":{"name":"礼品箱包","id":13},"type":"read","tableName":"base_category1"}
kafka-source 的数据 :> {"database":"lcy_db","before":{},"after":{"name":"食品饮料、保健食品","id":14},"type":"read","tableName":"base_category1"}
kafka-source 的数据 :> {"database":"lcy_db","before":{},"after":{"name":"珠宝","id":15},"type":"read","tableName":"base_category1"}
4.2)配置表部分数据:
配置表的数据 : > {"database":"lcy_db","before":{},"after":{"operate_type":"read","sink_type":"kafka","sink_table":"base_attr_info","source_table":"base_attr_info","sink_columns":"id,attr_name,category_id"},"type":"read","tableName":"table_process"}
配置表的数据 : > {"database":"lcy_db","before":{},"after":{"operate_type":"insert","sink_type":"hbase","sink_table":"order_detail","source_table":"order_detail","sink_columns":"id,order_id,sku_name,sku_num,create_time"},"type":"read","tableName":"table_process"}
配置表的数据 : > {"database":"lcy_db","before":{},"after":{"operate_type":"read","sink_type":"hbase","sink_table":"user_info","source_table":"user_info","sink_columns":"id,name,phone_num,user_level,create_time"},"type":"read","tableName":"table_process"}
4.3)分流结果数据:
# 入hbase的维表数据
HBase >>>>>>>> {"sinkTable":"user_info","database":"lcy_db","before":{},"after":{"create_time":"2020-12-04 23:28:45","name":"时心邦","user_level":"1","phone_num":"13794339138","id":3998},"type":"read","tableName":"user_info"}
HBase >>>>>>>> {"sinkTable":"user_info","database":"lcy_db","before":{},"after":{"create_time":"2020-12-04 23:28:45","name":"宋厚庆","user_level":"1","phone_num":"13274778653","id":3999},"type":"read","tableName":"user_info"}
HBase >>>>>>>> {"sinkTable":"user_info","database":"lcy_db","before":{},"after":{"create_time":"2020-12-04 23:28:45","name":"康素云","user_level":"1","phone_num":"13739911376","id":4000},"type":"read","tableName":"user_info"}
# 入kafka的事实表数据
Kafka >>>>>>>>> {"sinkTable":"base_attr_info","database":"lcy_db","before":{},"after":{"category_id":477,"attr_name":"香水彩妆","id":112},"type":"read","tableName":"base_attr_info"}
Kafka >>>>>>>>> {"sinkTable":"base_attr_info","database":"lcy_db","before":{},"after":{"category_id":477,"attr_name":"面部护肤","id":113},"type":"read","tableName":"base_attr_info"}
Kafka >>>>>>>>> {"sinkTable":"base_attr_info","database":"lcy_db","before":{},"after":{"category_id":473,"attr_name":"香调","id":114},"type":"read","tableName":"base_attr_info"}
五、总结
到这里我们已经完成了动态分流的主题实现,之前要是没接触过,说容易也不容易,但是说难也不是很难;之所以总结这个动态分流主要是在项目中还是挺重要的,毕竟原业务系统日志数据过来之后会统一放在同一个topic中,即使你在代码中使用判断有多少个业务表然后在发不作业也行,不过这样的弊端是如果源业务系统有新增表的话必须要添加判断然后再重新发布作业,这样是不利于我们在生产上的操作的,那么我们的动态分流技术就可以很好的避免了此类的弊端,如果使用了动态分流,那么如果业务表中有新增数据,我们只需要在配置表中添加新表的信息即可,即我们只需要维护这个配置表即可,程序不需要动,这样大大的提升了开发成本和维护效率。over了。。。