Flink: Fixing an Error When Registering a Kafka Table Sink

Implementation

Implement a Flink job: read logs from Alibaba Cloud LogService, aggregate events, and write the resulting metrics to our self-hosted Kafka. Java sample code:

        // tableEnv, config, joinTable and qConfig come from the surrounding job:
        // the StreamTableEnvironment, the job configuration, the aggregated
        // result Table, and its QueryConfig respectively.
        tableEnv.connect(
                new Kafka()
                        .version(config.get("kafka.version"))   // e.g. "0.10"
                        .topic(config.get("kafka.topic"))
                        .property("bootstrap.servers", config.get("bootstrap.servers"))
                )
                .withFormat(
                        new Json().deriveSchema()   // derive the JSON format from the schema below
                )
                .withSchema(
                        new Schema()
                                .field("field1", Types.STRING)
                                .field("field2", Types.INT)
                )
                .inAppendMode()
                .registerTableSink("targetTable");
        // Emit the aggregated table through the registered Kafka sink.
        joinTable.insertInto("targetTable", qConfig);

After processing with the Table API, a Kafka TableSink is registered for output; rows are written to Kafka as JSON according to the declared schema. The pom.xml fragment is below; it uses the 0.10 Flink Kafka connector:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>1.6.1</version>
        </dependency>

flink-json is required; without it the job also fails with an error.

Problem:

Local testing in the IDE runs fine, but after packaging with Maven and submitting to the cluster, the job fails with:

Exception in thread "main" org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.StreamTableSinkFactory' in
the classpath.

Reason: No factory implements 'org.apache.flink.table.factories.StreamTableSinkFactory'.
The following factories have been considered:
org.apache.flink.formats.json.JsonRowFormatFactory

    at org.apache.flink.table.factories.TableFactoryService$.filterByFactoryClass(TableFactoryService.scala:176)
    at org.apache.flink.table.factories.TableFactoryService$.findInternal(TableFactoryService.scala:125)
    at org.apache.flink.table.factories.TableFactoryService$.find(TableFactoryService.scala:81)

Solution

Debugging into TableFactoryService.scala:

  /**
    * Filters factories with matching context by factory class.
    */
  private def filterByFactoryClass[T](
      factoryClass: Class[T],
      properties: Map[String, String],
      foundFactories: Seq[TableFactory])
    : Seq[TableFactory] = {

    val classFactories = foundFactories.filter(f => factoryClass.isAssignableFrom(f.getClass))
    if (classFactories.isEmpty) {
      throw new NoMatchingTableFactoryException(
        s"No factory implements '${factoryClass.getCanonicalName}'.",
        factoryClass,
        foundFactories,
        properties)
    }
    classFactories
  }

As the code shows, the service uses isAssignableFrom to check whether the foundFactories list contains an implementation of StreamTableSinkFactory, and throws the exception above if it does not. Opening JsonRowFormatFactory confirms that it indeed does not implement that interface, so once the service can discover the real factory implementation, the problem solves itself.
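
To see which factories the packaged classpath actually exposes, you can query the same SPI mechanism directly; TableFactoryService discovers its foundFactories through Java's standard ServiceLoader. A minimal diagnostic sketch (the class name TableFactoryDump is illustrative):

        import java.util.ServiceLoader;

        import org.apache.flink.table.factories.TableFactory;

        public class TableFactoryDump {
            public static void main(String[] args) {
                // Enumerate every TableFactory registered under META-INF/services
                // on the classpath, the same set TableFactoryService filters.
                for (TableFactory factory : ServiceLoader.load(TableFactory.class)) {
                    System.out.println(factory.getClass().getName());
                }
            }
        }

Running it with the fat jar on the classpath shows exactly which factories survived packaging.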
The discovered list is driven by the file META-INF/services/org.apache.flink.table.factories.TableFactory inside the jar.
Opening the packaged jar's copy reveals only one line:

(screenshot: contents of the TableFactory file, containing only the single line org.apache.flink.formats.json.JsonRowFormatFactory)

Checking the implementations of StreamTableSinkFactory directly in the IDE:
(screenshot: StreamTableSinkFactory and its implementing classes)

Clearly Kafka010TableSourceSinkFactory is the factory that should have been picked up.
Exactly how packaging should carry the right service entries is not investigated in depth here for lack of time (the maven-shade sketch after the class list below covers the likely cause).
The direct workaround: put a correct org.apache.flink.table.factories.TableFactory file into the project's own resources under META-INF/services/ and repackage.

(screenshot: the file's path in the IDE and its corrected contents)

After adding the following entries, packaging and running the job works correctly:

org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceSinkFactory
org.apache.flink.streaming.connectors.kafka.Kafka09TableSourceSinkFactory
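
The likely root cause, assuming the fat jar is built with maven-shade-plugin (an assumption; other assembly tools behave similarly): every connector jar ships its own META-INF/services/org.apache.flink.table.factories.TableFactory, and by default only one of those identically named files survives in the shaded jar. The ServicesResourceTransformer merges them instead, which removes the need for the hand-maintained file above. A sketch (plugin version illustrative):

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.1.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <!-- Merge every META-INF/services file of the same name
                                 instead of keeping only one of them. -->
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>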
