1. Version Notes
Component | Version |
---|---|
hudi | 0.10.0 |
flink | 1.13.5 |
hive | 3.1.0 |
2. End Result
Capture MySQL changes with Flink CDC, write them into Hudi, and sync the tables to Hive. Related background:
Flink CDC overview
Flink CDC 1.2 example
Flink CDC 2.0 example
3. Required Flink JARs
The key package is flink-connector-mysql-cdc-2.0.2.jar; the full contents of the lib directory:
-rw-r--r-- 1 root root 7802399 Feb 16 00:36 doris-flink-1.0-SNAPSHOT.jar
-rw-r--r-- 1 root root 249571 Feb 16 00:36 flink-connector-jdbc_2.12-1.13.5.jar
-rw-r--r-- 1 root root 359138 Feb 16 00:36 flink-connector-kafka_2.12-1.13.5.jar
-rw-r--r-- 1 root root 30087268 Feb 17 22:12 flink-connector-mysql-cdc-2.0.2.jar
-rw-r--r-- 1 root root 92315 Feb 16 00:36 flink-csv-1.13.5.jar
-rw-r--r-- 1 root root 106535830 Feb 16 00:36 flink-dist_2.12-1.13.5.jar
-rw-r--r-- 1 root root 148127 Feb 16 00:36 flink-json-1.13.5.jar
-rw-r--r-- 1 root root 43317025 Feb 16 00:36 flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
-rw-r--r-- 1 root root 7709740 Feb 16 00:36 flink-shaded-zookeeper-3.4.14.jar
-rw-r--r-- 1 root root 3674116 Feb 16 00:36 flink-sql-connector-kafka_2.12-1.13.5.jar
-rw-r--r-- 1 root root 35051557 Feb 16 00:35 flink-table_2.12-1.13.5.jar
-rw-r--r-- 1 root root 38613344 Feb 16 00:36 flink-table-blink_2.12-1.13.5.jar
-rw-r--r-- 1 root root 62447468 Feb 16 00:36 hudi-flink-bundle_2.12-0.10.0.jar
-rw-r--r-- 1 root root 17276348 Feb 16 00:36 hudi-hadoop-mr-bundle-0.10.0.jar
-rw-r--r-- 1 root root 207909 Feb 16 00:36 log4j-1.2-api-2.16.0.jar
-rw-r--r-- 1 root root 301892 Feb 16 00:36 log4j-api-2.16.0.jar
-rw-r--r-- 1 root root 1789565 Feb 16 00:36 log4j-core-2.16.0.jar
-rw-r--r-- 1 root root 24258 Feb 16 00:36 log4j-slf4j-impl-2.16.0.jar
-rw-r--r-- 1 root root 724213 Feb 16 00:36 mysql-connector-java-5.1.9.jar
[root@node01 lib]# pwd
/opt/module/flink/flink-1.13.5/lib
[root@node01 lib]#
4. Functional Scenario
5. Implementation Steps
1. Create the MySQL table and enable the binlog.
2. Create the Flink CDC table in Flink SQL.
3. Create a view over it.
4. Create the output table, backed by Hudi and automatically synced to Hive.
5. Query the view and insert into the output table -- runs continuously as a Flink job.
5.1 Enable the MySQL binlog
Add the following to the MySQL server configuration (my.cnf):
server-id=162
log-bin=mysql-bin
#sync-binlog=1
# Databases excluded from binlog replication
binlog-ignore-db=information_schema
binlog-ignore-db=performance_schema
binlog-ignore-db=sys
binlog-ignore-db=mysql
binlog_format=ROW
expire_logs_days=30
binlog_row_image=full
# Databases to include (optional)
#binlog-do-db=test
Restart MySQL: service mysqld restart
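Before moving on, it is worth confirming from a MySQL client that the binlog really is on and in ROW format; these are standard MySQL status queries:
-- Verify binlog is enabled and row-based (expect ON and ROW).
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
-- Current binlog file and offset the CDC source will read from.
SHOW MASTER STATUS;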
5.2 Create the MySQL table
CREATE TABLE `wudldb`.`Flink_cdc` (
`id` BIGINT(64) AUTO_INCREMENT PRIMARY KEY,
`name` VARCHAR(64) NULL,
`age` INT(20) NULL,
birthday TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL
) ;
INSERT INTO `wudldb`.`Flink_cdc`(NAME,age) VALUES("flink",18) ;
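Once the whole pipeline is up, a few more statements can be used to exercise inserts, updates, and deletes end to end; these example rows are illustrative and were not part of the original run:
-- Illustrative change traffic for testing CDC capture.
INSERT INTO `wudldb`.`Flink_cdc`(name, age) VALUES ('hudi', 20);
UPDATE `wudldb`.`Flink_cdc` SET age = 19 WHERE name = 'flink';
DELETE FROM `wudldb`.`Flink_cdc` WHERE name = 'hudi';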
5.3 Create the Flink CDC table in Flink SQL
Flink SQL> CREATE TABLE source_mysql (
id BIGINT PRIMARY KEY NOT ENFORCED,
name STRING,
age INT,
birthday TIMESTAMP(3),
ts TIMESTAMP(3)
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.1.162',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'server-time-zone' = 'Asia/Shanghai',
'debezium.snapshot.mode' = 'initial',
'database-name' = 'wudldb',
'table-name' = 'Flink_cdc'
);
[INFO] Execute statement succeed.
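A quick sanity check at this point: selecting from the CDC table should first return the snapshot of existing rows ('debezium.snapshot.mode' = 'initial') and then stream subsequent binlog changes:
-- Should print the existing MySQL row(s), then live changes as they happen.
SELECT * FROM source_mysql;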
5.4 Create a view over the Flink CDC table
Flink SQL> create view view_source_flinkcdc_mysql
> AS
> SELECT *, DATE_FORMAT(birthday, 'yyyyMMdd') as part FROM source_mysql;
[INFO] Execute statement succeed.
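The view only adds a part column, the yyyyMMdd-formatted birthday (e.g. 20220218), which the Hudi sink below uses as its partition field. It can be previewed like any other table:
-- Preview the derived partition value alongside the source columns.
SELECT id, name, part FROM view_source_flinkcdc_mysql;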
5.5 Create the output table, backed by Hudi and automatically synced to Hive
Flink SQL> CREATE TABLE flink_cdc_sink_hudi_hive(
> id bigint ,
> name string,
> age int,
> birthday TIMESTAMP(3),
> ts TIMESTAMP(3),
> part VARCHAR(20),
> primary key(id) not enforced
> )
> PARTITIONED BY (part)
> with(
> 'connector'='hudi',
> 'path'= 'hdfs://192.168.1.161:8020/flink_cdc_sink_hudi_hive',
> 'table.type'= 'MERGE_ON_READ',
> 'hoodie.datasource.write.recordkey.field'= 'id',
> 'write.precombine.field'= 'ts',
> 'write.tasks'= '1',
> 'write.rate.limit'= '2000',
> 'compaction.tasks'= '1',
> 'compaction.async.enabled'= 'true',
> 'compaction.trigger.strategy'= 'num_commits',
> 'compaction.delta_commits'= '1',
> 'changelog.enabled'= 'true',
> 'read.streaming.enabled'= 'true',
> 'read.streaming.check-interval'= '3',
> 'hive_sync.enable'= 'true',
> 'hive_sync.mode'= 'hms',
> 'hive_sync.metastore.uris'= 'thrift://node02.com:9083',
> 'hive_sync.jdbc_url'= 'jdbc:hive2://node02.com:10000',
> 'hive_sync.table'= 'flink_cdc_sink_hudi_hive',
> 'hive_sync.db'= 'db_hive',
> 'hive_sync.username'= 'root',
> 'hive_sync.password'= '123456',
> 'hive_sync.support_timestamp'= 'true'
> );
[INFO] Execute statement succeed.
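One easy-to-miss prerequisite: the Hudi Flink writer only commits data on checkpoints, so checkpointing must be enabled in the SQL client before submitting the streaming insert. A minimal sketch; the 10s interval is an arbitrary example value:
-- Hudi commits on Flink checkpoints; without this the job runs but no data lands.
SET 'execution.checkpointing.interval' = '10s';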
5.6 Query the view and insert into the output table
Flink SQL> INSERT INTO flink_cdc_sink_hudi_hive SELECT id, name,age,birthday, ts, part FROM view_source_flinkcdc_mysql ;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: c618c9f528b9793adf4418640bb2a0fc
5.7 Check the running Flink job
6. Integrating Hudi with Hive
Copy hudi-hadoop-mr-bundle-0.10.0.jar into Hive's lib directory, then restart the Hive services.
6.1 Connect to Hive and inspect the tables synced from Hudi
0: jdbc:hive2://node01.com:2181,node02.com:21> show tables;
INFO : Compiling command(queryId=hive_20220218000941_016798b7-3ecd-4c41-ae54-65e6a034968f): show tables
INFO : Semantic Analysis Completed (retrial = false)
INFO : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:tab_name, type:string, comment:from deserializer)], properties:null)
INFO : Completed compiling command(queryId=hive_20220218000941_016798b7-3ecd-4c41-ae54-65e6a034968f); Time taken: 0.016 seconds
INFO : Executing command(queryId=hive_20220218000941_016798b7-3ecd-4c41-ae54-65e6a034968f): show tables
INFO : Starting task [Stage-0:DDL] in serial mode
INFO : Completed executing command(queryId=hive_20220218000941_016798b7-3ecd-4c41-ae54-65e6a034968f); Time taken: 0.012 seconds
INFO : OK
+------------------------------+
| tab_name |
+------------------------------+
| flink_cdc_sink_hudi_hive_ro |
| flink_cdc_sink_hudi_hive_rt |
+------------------------------+
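Because the sink table is MERGE_ON_READ, Hive sync registers two tables: flink_cdc_sink_hudi_hive_ro (read-optimized: compacted base files only) and flink_cdc_sink_hudi_hive_rt (real-time: merges base and log files at query time). The _rt table is queried exactly like the _ro table in the next step:
-- The real-time view reflects the latest changes ahead of compaction.
select id, name, age from flink_cdc_sink_hudi_hive_rt;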
6.2 Query the read-optimized table
0: jdbc:hive2://node01.com:2181,node02.com:21> select id ,name , age , birthday from flink_cdc_sink_hudi_hive_ro;
INFO : Compiling command(queryId=hive_20220218003353_57a46dca-3cd2-4da1-b455-bbd63da16413): select id ,name , age , birthday from flink_cdc_sink_hudi_hive_ro
INFO : Semantic Analysis Completed (retrial = false)
INFO : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:id, type:bigint, comment:null), FieldSchema(name:name, type:string, comment:null), FieldSchema(name:age, type:int, comment:null), FieldSchema(name:birthday, type:bigint, comment:null)], properties:null)
INFO : Completed compiling command(queryId=hive_20220218003353_57a46dca-3cd2-4da1-b455-bbd63da16413); Time taken: 0.124 seconds
INFO : Executing command(queryId=hive_20220218003353_57a46dca-3cd2-4da1-b455-bbd63da16413): select id ,name , age , birthday from flink_cdc_sink_hudi_hive_ro
INFO : Completed executing command(queryId=hive_20220218003353_57a46dca-3cd2-4da1-b455-bbd63da16413); Time taken: 0.029 seconds
INFO : OK
+-----+--------+------+----------------+
| id | name | age | birthday |
+-----+--------+------+----------------+
| 1 | flink | 18 | 1645142397000 |
+-----+--------+------+----------------+
1 row selected (0.278 seconds)
0: jdbc:hive2://node01.com:2181,node02.com:21>
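Note that birthday comes back from Hive as an epoch-millisecond bigint (1645142397000 above). If a readable value is wanted, it can be converted inline with standard Hive functions; a sketch assuming millisecond precision:
-- from_unixtime expects seconds, so divide the millisecond epoch by 1000.
select id, name, age,
       from_unixtime(cast(birthday / 1000 as bigint)) as birthday_ts
from flink_cdc_sink_hudi_hive_ro;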
Overall result
Error: one problem hit along the way
Flink CDC here needs flink-connector-mysql-cdc-2.0.2.jar, not flink-sql-connector-mysql-cdc-2.0.2.jar; with the wrong jar, the following error appears:
Flink SQL> select * from users_source_mysql;
Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
Caused by: java.lang.NoClassDefFoundError: org/apache/kafka/connect/data/Schema
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
at java.lang.Class.getDeclaredMethod(Class.java:2128)
at java.io.ObjectStreamClass.getPrivateMethod(ObjectStreamClass.java:1629)
at java.io.ObjectStreamClass.access$1700(ObjectStreamClass.java:79)
at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:520)
at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:494)
at java.security.AccessController.doPrivileged(Native Method)
at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:494)
at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1134)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:62)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobVertex(StreamingJobGraphGenerator.java:597)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:457)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setChaining(StreamingJobGraphGenerator.java:378)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:179)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:117)
at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:934)
at org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:50)
at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:39)
at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:56)
at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:67)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1957)
at org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:55)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:795)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1225)
at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeOperation$3(LocalExecutor.java:213)
at org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:90)
at org.apache.flink.table.client.gateway.local.LocalExecutor.executeOperation(LocalExecutor.java:213)
at org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:235)
at org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:479)
at org.apache.flink.table.client.cli.CliClient.callOperation(CliClient.java:412)
at org.apache.flink.table.client.cli.CliClient.lambda$executeStatement$0(CliClient.java:327)
at java.util.Optional.ifPresent(Optional.java:159)
at org.apache.flink.table.client.cli.CliClient.executeStatement(CliClient.java:327)
at org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:297)
at org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:221)
at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95)
at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
... 1 more
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.connect.data.Schema
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 69 more
Shutting down the session...
done.
[root@node01 bin]#