Flink使用GCS作为state backend

由于项目的原因,需要将Flink运行在GCP上,因此File System自然是想使用GCS. 在网上搜了很多,由于众所周知的原因,国内使用Google Cloud非常少,资料就更少了。Flink官方文档对这块描述又很简单,传送门。总结下如何使用GCS作为State Backend方法如下:

  1. 使用Flink对HDFS的支持方式支持GCS
  • 创建core-site.xml
    因为本文使用环境是Flink standalone环境,并无hfds,因此首先需要创建core-site.xml.
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>

<property>
    <name>google.cloud.auth.service.account.enable</name>
    <value>true</value>
</property>
    <!-- Turn security off for tests by default -->
    <property>
      <name>fs.gs.impl</name>
      <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem</value>
    </property>
    <property>
      <name>fs.AbstractFileSystem.gs.impl</name>
      <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS</value>
    </property>
    <property>
      <name>google.cloud.auth.service.account.json.keyfile</name>
      <value>/data/flink-1.9.1/conf/gcs-service-account.json</value>
    </property>
    <property>
        <name>fs.gs.project.id</name>
        <value>XXX</value>
        <description>
            Required. Google Cloud Project ID with access to configured GCS buckets.
        </description>
    </property>
  • 配置flink-conf.yaml 使Flink能够找到core-site.xml配置文件
fs.hdfs.hadoopconf: /data/flink-1.9.1/conf/
  1. 在程序中使用GCS作为state backend
       CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setFailOnCheckpointingErrors(false);
        checkpointConfig.setCheckpointInterval(10000);
        checkpointConfig.setMinPauseBetweenCheckpoints(5000);
        checkpointConfig.setMaxConcurrentCheckpoints(1);
        checkpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);

        RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(
                "gs://flinkcheckpoint", true);
        env.setStateBackend((StateBackend) rocksDBStateBackend);
  1. 将相关jar包放到Flink能加载的class path上
  • gcs-connector-hadoop2-2.0.0.jar
  • gcsio-2.0.0.jar
  • google-api-client-1.30.1.jar
  • google-api-client-jackson2-1.30.1.jar
  • google-api-client-java6-1.30.1.jar
  • google-api-services-storage-v1-rev20190624-1.30.1.jar
  • google-extensions-0.4.jar
  • google-http-client-1.30.1.jar
  • google-http-client-jackson2-1.30.1.jar
  • google-oauth-client-1.30.1.jar
  • google-oauth-client-java6-1.30.1.jar
  • flink-shaded-hadoop2-2.8.3-1.8.3.jar
    如果程序报如下错误,这个时候可以check下Flink的log,一般是因为少加载包了,注意看下 flink-shaded-hadoop2-2.8.3-1.8.3.jar 这个包是必须的,是否已经放到了Flink可以加载的地方。
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'gs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
  1. 创建并下载GCP上的相关project的json格式的service account,并将其放置在步骤1 google.cloud.auth.service.account.json.keyfile里的地址下,fs.gs.project.id配置为其project id.
  2. 一切就绪,Run Flink job, 在GCS的目录下可以检查是否有check point文件生成。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。