Flink 使用之 Oracle CDC

Flink 使用介绍相关文档目录

Flink 使用介绍相关文档目录

准备工作

在这一步需要配置Oracle。主要包含。

  1. 开启Archive log
  2. 开启数据库和数据表的supplemental log
  3. 创建CDC用户并赋予权限

注意:不要使用Oracle的SYS和SYSTEM用户做为CDC用户。因为这两个用户能够捕获到大量Oracle数据库内部的变更信息,对于业务数据来说是不必要的。Debezium会过滤掉这两个用户捕获到的变更内容。

下面开始配置步骤。在安装Oracle的机器上执行:

su - oracle
sqlplus / as sysdba

进入Sqlplus。然后开启Archive log。

alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
# 检查Archive log是否成功开启
archive log list;

注意:

  1. 本步骤需要重启数据库,请选择在合适的时间操作。
  2. 例子中的/opt/oracle/oradata/recovery_area目录oracle用户需要有读写权限。
  3. 如果执行alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;的时候报ORA-32001: write to SPFILE requested but no SPFILE is in use。需要检查spfile文件。
show parameter spfile;
# 如果输出value为空,说明没有创建spfile,执行下面SQL创建
create spfile from pfile;
# 关闭并重启
shutdown immediate;
startup;
# 检查spfile是否成功创建
show parameter spfile;

开启数据库和需要CDC的表的supplemental log:

ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

其中inventory.customers需要CDC的目标表,格式为schema.table_name。

最后,我们需要创建CDC专用用户,以及为它赋予权限。

# 示例路径/opt/oracle/oradata/SID/,需要提前创建好并赋予权限
CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/SID/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;

GRANT CREATE SESSION TO flinkuser;
GRANT SET CONTAINER TO flinkuser;
GRANT SELECT ON V_$DATABASE to flinkuser;
GRANT FLASHBACK ANY TABLE TO flinkuser;
GRANT SELECT ANY TABLE TO flinkuser;
GRANT SELECT_CATALOG_ROLE TO flinkuser;
GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
GRANT SELECT ANY TRANSACTION TO flinkuser;
GRANT LOGMINING TO flinkuser;

GRANT CREATE TABLE TO flinkuser;
GRANT LOCK ANY TABLE TO flinkuser;
GRANT ALTER ANY TABLE TO flinkuser;
GRANT CREATE SEQUENCE TO flinkuser;

GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;

GRANT SELECT ON V_$LOG TO flinkuser;
GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
GRANT SELECT ON V_$LOGFILE TO flinkuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;

注意:如果使用的是Oracle 11g,执行GRANT LOGMINING TO flinkuser;会报没有LOGMINING这个role,可忽略这个错误,不影响使用。如果使用12c版本赋权语句有所不同,可参考Debezium Connector for Oracle :: Debezium Documentation

最后需要强调下,我们的Oracle CDC程序运行的时候可能会报出如下错误。

ORA-12514, TNS:listener does not currently know of service requested in connect descriptor

编辑listener.ora文件(不知道路径的可以find一下),添加:

SID_LIST_LISTENER =
  (SID_LIST =
    (SID_DESC =
      (SID_NAME = ora11g)
      (ORACLE_HOME = /data/oracle/product/11.2.0/dbhome_1)
    )
  )

SID_NAMEORACLE_HOME改为真实的值,ORACLE_HOME可通过环境变量查看。

修改后别忘了执行:

lsnrctl reload

重启监听器。

到此为止,Oracle数据库环境配置完毕。

项目依赖

pom.xml中添加如下依赖:

<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-oracle-cdc</artifactId>
    <!-- the dependency is available only for stable releases. -->
    <version>2.1.0</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

Oracle CDC SQL方式

直接上示例程序:

val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tableEnvironment = TableEnvironment.create(bsSettings)

val sql =
    """
        |CREATE TABLE test (
        |     ID INT,
        |     NAME STRING,
        |     AGE INT
        |     ) WITH (
        |     'connector' = 'oracle-cdc',
        |     'hostname' = 'orcl11g.us.oracle.com',
        |     'port' = '1521',
        |     'username' = 'flinkuser',
        |     'password' = 'flinkpw',
        |     'database-name' = 'ora11g',
        |     'schema-name' = 'INVENTORY',
        |     'table-name' = 'CUSTOMERS'
        |     )
        |""".stripMargin

tableEnvironment.executeSql(sql)
// 如下两种print数据方式都可以使用
// 方法 1
//    val result = tableEnvironment.executeSql("select * from test")
//    result.print()

// 方法 2
tableEnvironment.executeSql("CREATE TABLE sink_table (ID INT, NAME STRING, AGE INT) WITH (    'connector' = 'print')")
tableEnvironment.executeSql("INSERT INTO sink_table SELECT ID, NAME, AGE FROM test")

注意:Oracle字段默认会转化为大写。如果create table的时候没有使用引号引住字段名,则字段名会被转换为大写。那么在Flink create table的时候字段也必须使用大写。否则对应字段的内容会变成null,无法正常获取到数据!Oracle中查看建表语句的方法为SELECT DBMS_METADATA.GET_DDL('TABLE','表名称') FROM DUAL;

Oracle CDC API方式

除了使用SQL方式外,我们还可以使用DataStream API方式。

val sourceFunction: SourceFunction[String] = OracleSource
    .builder[String]
    .hostname("orcl11g.us.oracle.com")
    .port(1521)
    .database("ora11g")
    .schemaList("INVENTORY")
    .tableList("INVENTORY.CUSTOMERS")
    .username("flinkuser")
    .password("flinkpw")
    .deserializer(new JsonDebeziumDeserializationSchema)
    .build

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.addSource(sourceFunction).print.setParallelism(1) // use parallelism 1 for sink to keep message ordering
    env.execute()

注意:tableList参数有一个坑,必须配置为schema-name.table-name格式,否则会找不到数据表。和SQL中的table-name配置方式不同!

参考文献

Oracle CDC Connector — Flink CDC 2.0.0 documentation (ververica.github.io)

Debezium Connector for Oracle :: Debezium Documentation

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,504评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,434评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,089评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,378评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,472评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,506评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,519评论 3 413
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,292评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,738评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,022评论 2 329
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,194评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,873评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,536评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,162评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,413评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,075评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,080评论 2 352

推荐阅读更多精彩内容