本文基于flink1.9
flink在批处理需求中利用jdbcInputFormat获取关系型数据库数据源数据的时候,往往有各种特殊字段类型,例如oracle的clob,blob等,如果这类数据需要经过flink,就不能直接利用BasicTypeInfo中已经有的类型了。因为数据在获取的时候程序会强制将数据库中的特殊类型转为你设置BasicTypeInfo的类型,然后可能程序直接就会报错类型转换失败。
经过一番研究,发现其实BasicTypeInfo中有一个方法可以直接接收JDBC的特殊类型,例如oracle中的clob,在java里面,对应java.sql.Clob这个类型,也就是在新建TypeInformation对象的时候引入BasicTypeInfo.of(Clob.class)即可,这样程序就不会出现强制类型的转换异常了。
然而,这种方式接收到的字段,在利用flink table处理的时候就不能直接使用了,接下来需要做的事情也就是,如何手动对接收的数据进行类型转换,转换成基本的数据类型,例如String类型。最简单的就是创建一个flink table的函数,在函数中,接收关系型数据库数据源中的数据类型字段的参数,例如这里的Clob类型作为参数,然后返回String类型的数据,中间的处理过程也就是如何利用java将Clob类型转为String类型而已。
最后在实际使用的时候,引入该函数,并在查询该字段的时候使用该函数即可。
详细代码如下:
(1)自定义函数ClobToStringextends
import org.apache.flink.table.functions.ScalarFunction;
import java.io.Reader;
import java.sql.Clob;
//大对象转字符串
public class ClobToString extends ScalarFunction {
public String eval(Clob clob){
if(clob ==null) {
return null;
}
try {
Reader inStreamDoc = clob.getCharacterStream();
char[] tempDoc =new char[(int) clob.length()];
inStreamDoc.read(tempDoc);
inStreamDoc.close();
return new String(tempDoc);
}catch (Exception es) {
System.out.println(es.getMessage());
}
return null;
}
}
(2)处理过程
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;
import org.best.function.ClobToString;
import java.sql.Clob;
public class TestBatchJob {
public static void main(String[] args) throws Exception {
TypeInformation[] fieldTypes =new TypeInformation[]{
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.of(Clob.class)
};
RowTypeInfo rowTypeInfo =new RowTypeInfo(fieldTypes);
JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
.setDrivername("oracle.jdbc.OracleDriver")
.setDBUrl("jdbc:oracle:thin:@192.168.0.123:1521:orcl")
.setUsername("test")
.setPassword("123456")
.setQuery("select a,to_clob(b) b from t1 where rownum<=100")
.setRowTypeInfo(rowTypeInfo)
.finish();
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource s = env.createInput(jdbcInputFormat); //datasource
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
tableEnv.registerFunction("ClobToString", new ClobToString());
tableEnv.registerDataSet("t2", s, "a,b");
tableEnv.sqlQuery("select * from t2").printSchema();
Table query = tableEnv.sqlQuery("select a,ClobToString(b) as b from t2");
DataSet result = tableEnv.toDataSet(query, Row.class);
result.print();
env.execute("test");
}
}