flinkStreamSql连接oracle11g版本适配

flinkStreamSql 1.11版本,在读取数据写入oracle11g报错
0RA-00933:SQL命令未正确结束。

image.png

Exception in thread "main" org.apache.flink.runtime.execution.SuppressRestartsException: Unrecoverable failure.This suppresses job restarts, Please check the stack trace for the root cause. at com.dtstack.flink.sql.core.rdb.JdbcResourceCheck.checkPrivilege(JdbcResourceCheck. java:138) at com.dtstack.flink.sql.core.rdb.JdbcResourceCheck.checkResourceStatus(JdbcResourceCheck. java91) at com.dtstack.flink.sql.sink.rdb.table.RdbTablelnfo.check(RdbTableInfo. java:213) at com.dtstack.flink.sql.sink.rdb.table.RdbSinkParser.getTableInfo(RdbSinkParser. java:56) at com.dtstack.flink.sql.sink, oracle, table.OracleSinkParser.getTableInfo(OracleSinkParser. java:39) at com.dtstack.flink.sql.table.AbstractTablelnfoParser.parseWithTableType(AbstractTablelnfoParser. java:127) at com.dtstack.flink.sql.parser.SqlParser.parseSql(SalParser.iava:136) at com.dtstack.flink.sql.exec.ExecuteProcessHelper.getStreamExecutionlExecuteProcessHeher. java:197) at com.dtstack.flink.sql.Ma in.maintain. java:45) at com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:146) Caused by:java.lang.IllegalArgumentException:0RA-00933:SQL命令未正确结束 …10 more

对应的测试SQL

CREATE TABLE MyTable(
    name varchar,
    channel varchar,
    id int
 )WITH(
    type ='kafka',
    bootstrapServers ='xx.xx.xx.xx:9092,xx.xx.xx.xx:9092,xx.xx.xx.xx:9092',
    zookeeperQuorum ='xx.xx.xx.xx:2181/kafka',
    offsetReset ='latest',
    groupId='testGroup',
    topic ='topicTest',
    parallelism ='1'
 );
 
 CREATE TABLE MyKTable(
    name varchar,
    channel varchar,
    id int
 )WITH(
    type ='kafka',
    bootstrapServers ='xx.xx.xx.xx:9092,xx.xx.xx.xx:9092,xx.xx.xx.xx:9092',
    zookeeperQuorum ='xx.xx.xx.xx:2181/kafka',
    offsetReset ='latest',
    groupId='testGroup',
    topic ='topicTest2',
    parallelism ='1'
 );
 

 CREATE TABLE MyResult(
    PRIMARYKEY_ID int ,
    NAME VARCHAR,
    ADDRESS VARCHAR
 )WITH(
    type ='oracle',
    url ='jdbc:oracle:thin:@xx.xx.xx.xx:1521/xxxx',
    userName ='xxx',
    password ='xxx',
    tableName ='table',
    updateMode ='append',
    parallelism ='1',
    batchSize ='1',
    batchWaitInterval ='1000'
 );

insert          
into
    MyResult
    select
        id as PRIMARYKEY_ID,
        name as NAME,
        channel as ADDRESS
                                                 
    from
        MyTable a;
        insert          
into
    MyKTable
    select
        name,
        channel,
        id                                         
    from
        MyTable a;

查看源码,在flinkStreamSql提交任务的流程中,解析完SQL会在客户端先校验一些资源的可用性。使用Rdbms类型的 sink ,client端先检测配置的数据源,以及表是否可用,如果在client访问资源失败,就直接报错,不再提交到集群。
对应的代码在RdbTableInfo类的check方法里。跟踪源码找到了实际检测的sql语句:

image.png

用IEDA跑程序,在报错的语句断点执行ORACLE测试sql语句,statement执行单句sql在oracle11g版本中如果带了分号,就会报0RA-00933的错误

image.png
image.png

那就只能把这几个分号删除然后重新打包。


image.png

最后运行成功。。。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容