240 发简信
IP属地:西藏
  • @步闲 在输出路径前面写hdfs://cluster1或hdfs://cluster2

    Spark操作多HDFS集群

    由于特殊需求,需要在一次Spark任务中切换HDFS集群。 本文我将介绍如何在一次的spark任务中操作不同的HDFS集群 我们以wordcount为例,分析如何配置。我们的...

  • 120
    Flink输出到HDFS自定义分区

    获取分区路径 获取输出数据 最终结果

  • 楼主的配置有问题,应该把多个key相同的属性配置在一起
    val sc = new SparkContext()
    // 多个HDFS的相同配置
    sc.hadoopConfiguration.setStrings("fs.defaultFS", "hdfs://cluster1", "hdfs://cluster2");
    sc.hadoopConfiguration.setStrings("dfs.nameservices", "cluster1", "cluster2");
    // cluster1的配置
    sc.hadoopConfiguration.set("dfs.ha.namenodes.cluster1", "nn1,nn2");
    sc.hadoopConfiguration.set("dfs.namenode.rpc-address.cluster1.nn1", "namenode001:8020");
    sc.hadoopConfiguration.set("dfs.namenode.rpc-address.cluster1.nn2", "namenode002:8020");
    sc.hadoopConfiguration.set("dfs.client.failover.proxy.provider.cluster1", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
    // cluster2的配置
    sc.hadoopConfiguration.set("dfs.ha.namenodes.cluster2", "nn3,nn4");
    sc.hadoopConfiguration.set("dfs.namenode.rpc-address.cluster2.nn3", "namenode003:8020");
    sc.hadoopConfiguration.set("dfs.namenode.rpc-address.cluster2.nn4", "namenode004:8020");
    sc.hadoopConfiguration.set("dfs.client.failover.proxy.provider.cluster2", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

    Spark操作多HDFS集群

    由于特殊需求,需要在一次Spark任务中切换HDFS集群。 本文我将介绍如何在一次的spark任务中操作不同的HDFS集群 我们以wordcount为例,分析如何配置。我们的...