Flink-CDC读取Decimal 或者NUMBER等数值类型变成了非数值字符串

 public static void main(String[] args) throws Exception {
        Properties prop = new Properties();
        prop.put("decimal.handling.mode", "string");
 
        SourceFunction<String> sourceFunction = OracleSource.<String>builder()
                .hostname("...")
                .port(...)
                .database("...") // monitor XE database
                .schemaList("...") // monitor inventory schema
                .tableList("...") // monitor products table
                .username("...")
                .password("...")
                .debeziumProperties(prop)
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();
 
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
        env.addSource(sourceFunction)
                .print()
                .setParallelism(1)
        ; // use parallelism 1 for sink to keep message ordering
 
        env.execute();
    }
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容