Flink 使用介绍相关文档目录
准备工作
在这一步需要配置Oracle。主要包含。
- 开启Archive log
- 开启数据库和数据表的supplemental log
- 创建CDC用户并赋予权限
注意:不要使用Oracle的SYS和SYSTEM用户做为CDC用户。因为这两个用户能够捕获到大量Oracle数据库内部的变更信息,对于业务数据来说是不必要的。Debezium会过滤掉这两个用户捕获到的变更内容。
下面开始配置步骤。在安装Oracle的机器上执行:
su - oracle
sqlplus / as sysdba
进入Sqlplus。然后开启Archive log。
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
# 检查Archive log是否成功开启
archive log list;
注意:
- 本步骤需要重启数据库,请选择在合适的时间操作。
- 例子中的
/opt/oracle/oradata/recovery_area
目录oracle用户需要有读写权限。 - 如果执行
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
的时候报ORA-32001: write to SPFILE requested but no SPFILE is in use。需要检查spfile文件。
show parameter spfile;
# 如果输出value为空,说明没有创建spfile,执行下面SQL创建
create spfile from pfile;
# 关闭并重启
shutdown immediate;
startup;
# 检查spfile是否成功创建
show parameter spfile;
开启数据库和需要CDC的表的supplemental log:
ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
其中inventory.customers
需要CDC的目标表,格式为schema.table_name。
最后,我们需要创建CDC专用用户,以及为它赋予权限。
# 示例路径/opt/oracle/oradata/SID/,需要提前创建好并赋予权限
CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/SID/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
GRANT CREATE SESSION TO flinkuser;
GRANT SET CONTAINER TO flinkuser;
GRANT SELECT ON V_$DATABASE to flinkuser;
GRANT FLASHBACK ANY TABLE TO flinkuser;
GRANT SELECT ANY TABLE TO flinkuser;
GRANT SELECT_CATALOG_ROLE TO flinkuser;
GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
GRANT SELECT ANY TRANSACTION TO flinkuser;
GRANT LOGMINING TO flinkuser;
GRANT CREATE TABLE TO flinkuser;
GRANT LOCK ANY TABLE TO flinkuser;
GRANT ALTER ANY TABLE TO flinkuser;
GRANT CREATE SEQUENCE TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
GRANT SELECT ON V_$LOG TO flinkuser;
GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
GRANT SELECT ON V_$LOGFILE TO flinkuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
注意:如果使用的是Oracle 11g,执行GRANT LOGMINING TO flinkuser;会报没有LOGMINING这个role,可忽略这个错误,不影响使用。如果使用12c版本赋权语句有所不同,可参考Debezium Connector for Oracle :: Debezium Documentation。
最后需要强调下,我们的Oracle CDC程序运行的时候可能会报出如下错误。
ORA-12514, TNS:listener does not currently know of service requested in connect descriptor
编辑listener.ora
文件(不知道路径的可以find一下),添加:
SID_LIST_LISTENER =
(SID_LIST =
(SID_DESC =
(SID_NAME = ora11g)
(ORACLE_HOME = /data/oracle/product/11.2.0/dbhome_1)
)
)
SID_NAME
和ORACLE_HOME
改为真实的值,ORACLE_HOME
可通过环境变量查看。
修改后别忘了执行:
lsnrctl reload
重启监听器。
到此为止,Oracle数据库环境配置完毕。
项目依赖
在pom.xml
中添加如下依赖:
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-oracle-cdc</artifactId>
<!-- the dependency is available only for stable releases. -->
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
Oracle CDC SQL方式
直接上示例程序:
val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tableEnvironment = TableEnvironment.create(bsSettings)
val sql =
"""
|CREATE TABLE test (
| ID INT,
| NAME STRING,
| AGE INT
| ) WITH (
| 'connector' = 'oracle-cdc',
| 'hostname' = 'orcl11g.us.oracle.com',
| 'port' = '1521',
| 'username' = 'flinkuser',
| 'password' = 'flinkpw',
| 'database-name' = 'ora11g',
| 'schema-name' = 'INVENTORY',
| 'table-name' = 'CUSTOMERS'
| )
|""".stripMargin
tableEnvironment.executeSql(sql)
// 如下两种print数据方式都可以使用
// 方法 1
// val result = tableEnvironment.executeSql("select * from test")
// result.print()
// 方法 2
tableEnvironment.executeSql("CREATE TABLE sink_table (ID INT, NAME STRING, AGE INT) WITH ( 'connector' = 'print')")
tableEnvironment.executeSql("INSERT INTO sink_table SELECT ID, NAME, AGE FROM test")
注意:Oracle字段默认会转化为大写。如果create table的时候没有使用引号引住字段名,则字段名会被转换为大写。那么在Flink create table的时候字段也必须使用大写。否则对应字段的内容会变成null,无法正常获取到数据!Oracle中查看建表语句的方法为SELECT DBMS_METADATA.GET_DDL('TABLE','表名称') FROM DUAL;
Oracle CDC API方式
除了使用SQL方式外,我们还可以使用DataStream API方式。
val sourceFunction: SourceFunction[String] = OracleSource
.builder[String]
.hostname("orcl11g.us.oracle.com")
.port(1521)
.database("ora11g")
.schemaList("INVENTORY")
.tableList("INVENTORY.CUSTOMERS")
.username("flinkuser")
.password("flinkpw")
.deserializer(new JsonDebeziumDeserializationSchema)
.build
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.addSource(sourceFunction).print.setParallelism(1) // use parallelism 1 for sink to keep message ordering
env.execute()
注意:tableList参数有一个坑,必须配置为
schema-name.table-name
格式,否则会找不到数据表。和SQL中的table-name配置方式不同!
参考文献
Oracle CDC Connector — Flink CDC 2.0.0 documentation (ververica.github.io)