Implementation
Implement a Flink job that reads logs from Aliyun LogService, counts events, and writes the resulting metrics to a self-hosted Kafka cluster. Sample Java code:
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

// tableEnv, config, joinTable and qConfig are defined elsewhere in the job.
tableEnv.connect(
        new Kafka()
            .version(config.get("kafka.version"))   // e.g. "0.10"
            .topic(config.get("kafka.topic"))
            .property("bootstrap.servers", config.get("bootstrap.servers")))
    .withFormat(new Json().deriveSchema())          // derive the JSON format from the table schema
    .withSchema(
        new Schema()
            .field("field1", Types.STRING)
            .field("field2", Types.INT))
    .inAppendMode()
    .registerTableSink("targetTable");

joinTable.insertInto("targetTable", qConfig);
After the Table API processing, a Kafka TableSink is registered as the output: rows are serialized to JSON according to the declared schema and written to Kafka. The relevant pom.xml fragment is below; it uses the 0.10 Flink Kafka connector:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.6.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.6.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
    <version>1.6.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>1.6.1</version>
</dependency>
The flink-json dependency is required: it provides JsonRowFormatFactory, and without it the job fails with a similar missing-factory error.
Problem:
Local runs in the IDE work fine, but after packaging with Maven and submitting to the cluster, the job fails with:
Exception in thread "main" org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.StreamTableSinkFactory' in the classpath.
Reason: No factory implements 'org.apache.flink.table.factories.StreamTableSinkFactory'.
The following factories have been considered:
org.apache.flink.formats.json.JsonRowFormatFactory
    at org.apache.flink.table.factories.TableFactoryService$.filterByFactoryClass(TableFactoryService.scala:176)
    at org.apache.flink.table.factories.TableFactoryService$.findInternal(TableFactoryService.scala:125)
    at org.apache.flink.table.factories.TableFactoryService$.find(TableFactoryService.scala:81)
Solution
Stepping through with the debugger leads to TableFactoryService.scala:
/**
  * Filters factories with matching context by factory class.
  */
private def filterByFactoryClass[T](
    factoryClass: Class[T],
    properties: Map[String, String],
    foundFactories: Seq[TableFactory])
  : Seq[TableFactory] = {

  val classFactories = foundFactories.filter(f => factoryClass.isAssignableFrom(f.getClass))
  if (classFactories.isEmpty) {
    throw new NoMatchingTableFactoryException(
      s"No factory implements '${factoryClass.getCanonicalName}'.",
      factoryClass,
      foundFactories,
      properties)
  }
  classFactories
}
As the code shows, isAssignableFrom is used to check whether foundFactories contains an implementation of StreamTableSinkFactory; if none is found, the exception above is thrown. Opening JsonRowFormatFactory confirms that it does not implement the interface, so the problem reduces to making the service discovery find the real factory implementation.
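For context, TableFactoryService builds foundFactories through the standard Java SPI mechanism (java.util.ServiceLoader). A minimal sketch of the same lookup, handy for checking which factories are actually visible from the packaged jar (ListTableFactories is a hypothetical helper, not part of Flink):

import java.util.ServiceLoader;
import org.apache.flink.table.factories.TableFactory;

public class ListTableFactories {
    public static void main(String[] args) {
        // ServiceLoader reads every META-INF/services/org.apache.flink.table.factories.TableFactory
        // file on the classpath and instantiates each class listed there.
        for (TableFactory factory : ServiceLoader.load(TableFactory.class)) {
            System.out.println(factory.getClass().getName());
        }
    }
}

If only JsonRowFormatFactory is printed, the discovery failure above is reproduced exactly.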
In the packaged jar, this factory list lives at META-INF/services/org.apache.flink.table.factories.TableFactory. Opening it shows only a single line:
org.apache.flink.formats.json.JsonRowFormatFactory
Looking directly at the implementations of StreamTableSinkFactory, however, it is clear that Kafka010TableSourceSinkFactory is the one that should have been picked up.
Due to time constraints, how the build should correctly wire up these implementation classes during packaging is not investigated in depth here.
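For reference: when the fat jar is built with maven-shade-plugin (not confirmed for the original setup), the usual cause is that identically named META-INF/services files from different dependency jars overwrite one another instead of being merged, so only one factory survives. The plugin's ServicesResourceTransformer merges them; a minimal sketch of the configuration (plugin version is an assumption, adjust to your build):

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.1.1</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <!-- Concatenates META-INF/services files of the same name
                         from all jars instead of keeping only one of them -->
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>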
The quick workaround: put a correct META-INF/services/org.apache.flink.table.factories.TableFactory file into the project's own resources (e.g. under src/main/resources) and repackage.
After adding the following entries to that file, the packaged job runs normally:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceSinkFactory
org.apache.flink.streaming.connectors.kafka.Kafka09TableSourceSinkFactory
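To verify the fix after packaging, the service file inside the built jar can be inspected with, for example, `unzip -p target/your-job.jar META-INF/services/org.apache.flink.table.factories.TableFactory` (the jar name here is a placeholder); it should now list the Kafka factories alongside the JSON format factory.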