With flinkStreamSql 1.11, a job that reads data and writes it to Oracle 11g fails with the error
ORA-00933: SQL command not properly ended.
Exception in thread "main" org.apache.flink.runtime.execution.SuppressRestartsException: Unrecoverable failure. This suppresses job restarts. Please check the stack trace for the root cause.
    at com.dtstack.flink.sql.core.rdb.JdbcResourceCheck.checkPrivilege(JdbcResourceCheck.java:138)
    at com.dtstack.flink.sql.core.rdb.JdbcResourceCheck.checkResourceStatus(JdbcResourceCheck.java:91)
    at com.dtstack.flink.sql.sink.rdb.table.RdbTableInfo.check(RdbTableInfo.java:213)
    at com.dtstack.flink.sql.sink.rdb.table.RdbSinkParser.getTableInfo(RdbSinkParser.java:56)
    at com.dtstack.flink.sql.sink.oracle.table.OracleSinkParser.getTableInfo(OracleSinkParser.java:39)
    at com.dtstack.flink.sql.table.AbstractTableInfoParser.parseWithTableType(AbstractTableInfoParser.java:127)
    at com.dtstack.flink.sql.parser.SqlParser.parseSql(SqlParser.java:136)
    at com.dtstack.flink.sql.exec.ExecuteProcessHelper.getStreamExecution(ExecuteProcessHelper.java:197)
    at com.dtstack.flink.sql.Main.main(Main.java:45)
    at com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:146)
Caused by: java.lang.IllegalArgumentException: ORA-00933: SQL command not properly ended
    ... 10 more
The test SQL that triggers it:
CREATE TABLE MyTable(
name varchar,
channel varchar,
id int
)WITH(
type ='kafka',
bootstrapServers ='xx.xx.xx.xx:9092,xx.xx.xx.xx:9092,xx.xx.xx.xx:9092',
zookeeperQuorum ='xx.xx.xx.xx:2181/kafka',
offsetReset ='latest',
groupId='testGroup',
topic ='topicTest',
parallelism ='1'
);
CREATE TABLE MyKTable(
name varchar,
channel varchar,
id int
)WITH(
type ='kafka',
bootstrapServers ='xx.xx.xx.xx:9092,xx.xx.xx.xx:9092,xx.xx.xx.xx:9092',
zookeeperQuorum ='xx.xx.xx.xx:2181/kafka',
offsetReset ='latest',
groupId='testGroup',
topic ='topicTest2',
parallelism ='1'
);
CREATE TABLE MyResult(
PRIMARYKEY_ID int ,
NAME VARCHAR,
ADDRESS VARCHAR
)WITH(
type ='oracle',
url ='jdbc:oracle:thin:@xx.xx.xx.xx:1521/xxxx',
userName ='xxx',
password ='xxx',
tableName ='table',
updateMode ='append',
parallelism ='1',
batchSize ='1',
batchWaitInterval ='1000'
);
insert
into
MyResult
select
id as PRIMARYKEY_ID,
name as NAME,
channel as ADDRESS
from
MyTable a;
insert
into
MyKTable
select
name,
channel,
id
from
MyTable a;
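Looking at the source code: in flinkStreamSql's job submission flow, once the SQL has been parsed, the client first verifies that certain resources are available. For an RDBMS-type sink, the client checks the configured data source and whether the target table is usable. If the client cannot access a resource, it reports the error right away and does not submit the job to the cluster.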
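The corresponding code is in the check method of the RdbTableInfo class. Tracing through the source leads to the SQL statements that the check actually executes:
[Screenshot: the check SQL statements in the source]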
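The fail-fast flow described above can be sketched roughly as follows. This is not the project's code: `ClientSideCheckSketch`, `Probe`, `ResourceCheckException`, and `checkThenSubmit` are hypothetical names chosen for illustration, and the real checks live in `JdbcResourceCheck` as the stack trace shows. The sketch only illustrates the ordering: every probe runs on the client, and the submit step is reached only if none of them throws.

```java
import java.util.Map;

final class ClientSideCheckSketch {

    /** One pre-submission probe, e.g. "can connect" or "table is usable" (hypothetical). */
    interface Probe {
        void run() throws Exception;
    }

    /** Raised on the client when a probe fails, so the job is never submitted. */
    static final class ResourceCheckException extends RuntimeException {
        ResourceCheckException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    private ClientSideCheckSketch() {}

    /**
     * Runs the probes in iteration order and calls submit only if all pass.
     * The first failing probe aborts the whole submission.
     */
    static void checkThenSubmit(Map<String, Probe> probes, Runnable submit) {
        for (Map.Entry<String, Probe> entry : probes.entrySet()) {
            try {
                entry.getValue().run();
            } catch (Exception e) {
                // Wrap the root cause so it shows up under "Caused by" in the trace.
                throw new ResourceCheckException(
                        "Resource check failed: " + entry.getKey(), e);
            }
        }
        submit.run();
    }
}
```

With this shape, a failing check SQL surfaces as an exception in the client process with the database error as its cause, which matches the pattern seen in the stack trace above.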
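I ran the program in IDEA, set a breakpoint at the failing statement, and executed the Oracle test SQL there. On Oracle 11g, when a Statement executes a single SQL statement that ends with a semicolon, it fails with ORA-00933.
[Screenshots: the debugging session at the failing statement]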
The only option was to remove those semicolons and rebuild the package.
[Screenshot: the modified code]
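As an alternative to editing each SQL string by hand, the trailing semicolon could be stripped in one place before the statement is handed to JDBC. The following is a minimal sketch under that assumption, not the fix applied in the project: `OracleSqlSanitizer` and its methods are hypothetical names, and only standard `java.sql` calls are used.

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

final class OracleSqlSanitizer {

    private OracleSqlSanitizer() {}

    /**
     * Removes trailing whitespace and semicolons from a single SQL statement.
     * Semicolons elsewhere in the text (for example inside a string literal
     * in the middle of the statement) are left untouched.
     */
    static String stripTrailingSemicolon(String sql) {
        int end = sql.length();
        while (end > 0) {
            char c = sql.charAt(end - 1);
            if (c == ';' || Character.isWhitespace(c)) {
                end--;
            } else {
                break;
            }
        }
        return sql.substring(0, end);
    }

    /** Executes one plain SQL statement after normalizing its ending. */
    static void execute(Connection conn, String sql) throws SQLException {
        try (Statement stmt = conn.createStatement()) {
            stmt.execute(stripTrailingSemicolon(sql));
        }
    }
}
```

For example, `OracleSqlSanitizer.stripTrailingSemicolon("SELECT 1 FROM DUAL;")` yields `SELECT 1 FROM DUAL`. This only suits plain SQL statements: a PL/SQL block such as `BEGIN ... END;` needs its final semicolon, so it should not be passed through this helper.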
After that, the job ran successfully.