Using Spark to Synchronize HDFS Data Across Clusters

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
object TestFileCopy {

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

    val conf: SparkConf = new SparkConf()
      .setIfMissing("spark.master", "local[4]")
      .setAppName("Test File Copy App")
    val spark: SparkSession = SparkSession.builder.config(conf).getOrCreate()

    // Get the SparkContext from the SparkSession
    val sc: SparkContext = spark.sparkContext

    // hdfsFileCopy1(sc)
    hdfsFileCopy2(sc)

    sc.stop()
  }
  def hdfsFileCopy1(sc: SparkContext): Unit = {
    // Point the Hadoop configuration at cluster1 before reading the input.
    // addResource loads these XML files from the classpath (e.g. src/main/resources/cluster1/).
    sc.hadoopConfiguration.addResource("cluster1/core-site.xml")
    sc.hadoopConfiguration.addResource("cluster1/hdfs-site.xml")

    val sourceDataPath = "hdfs://cluster1/tmp/"
    val source: RDD[String] = sc.textFile(sourceDataPath + "aaa.txt")
    source.foreach(println(_))

    // Switch the Hadoop configuration to cluster2 before writing the output.
    sc.hadoopConfiguration.addResource("cluster2/core-site.xml")
    sc.hadoopConfiguration.addResource("cluster2/hdfs-site.xml")

    val targetDataPath = "hdfs://cluster2/tmp/hdb/"
    source.saveAsTextFile(targetDataPath)
  }
  def hdfsFileCopy2(sc: SparkContext): Unit = {
    // cluster1: register the source HA nameservice and its NameNodes programmatically.
    sc.hadoopConfiguration.set("fs.defaultFS", "hdfs://cluster1")
    sc.hadoopConfiguration.set("dfs.nameservices", "cluster1")
    sc.hadoopConfiguration.set("dfs.ha.namenodes.cluster1", "namenode98,namenode143")
    sc.hadoopConfiguration.set("dfs.namenode.rpc-address.cluster1.namenode98", "cdh-nn-01:8020")
    sc.hadoopConfiguration.set("dfs.namenode.rpc-address.cluster1.namenode143", "cdh-nn-02:8020")
    sc.hadoopConfiguration.set("dfs.client.failover.proxy.provider.cluster1", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider")

    val sourceDataPath = "hdfs://cluster1/tmp/"
    val source: RDD[String] = sc.textFile(sourceDataPath + "aaa.txt")
    source.foreach(println(_))

    // cluster2: register the target HA nameservice before writing.
    sc.hadoopConfiguration.set("fs.defaultFS", "hdfs://cluster2")
    sc.hadoopConfiguration.set("dfs.nameservices", "cluster2")
    sc.hadoopConfiguration.set("dfs.ha.namenodes.cluster2", "namenode424,namenode417")
    sc.hadoopConfiguration.set("dfs.namenode.rpc-address.cluster2.namenode424", "node-nn-01:8020")
    sc.hadoopConfiguration.set("dfs.namenode.rpc-address.cluster2.namenode417", "node-nn-02:8020")
    sc.hadoopConfiguration.set("dfs.client.failover.proxy.provider.cluster2", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider")

    val targetDataPath = "hdfs://cluster2/tmp/hdb/"
    source.saveAsTextFile(targetDataPath)
  }
}
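Both methods above mutate the SparkContext's single hadoopConfiguration between the read and the write. Since the RDD is not cached, saveAsTextFile re-reads the source path under whatever configuration is active at that moment, so a variant worth considering is to register both HA nameservices up front and rely on fully-qualified paths throughout. The following is only a minimal sketch of that idea, reusing the placeholder nameservice and NameNode host names from the example above; the object and method names are hypothetical.

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object CrossClusterCopySketch {

  // Sketch: declare both HA nameservices in one configuration so that
  // hdfs://cluster1/... and hdfs://cluster2/... resolve at any point in the job,
  // without switching the configuration between read and write.
  def copyAcrossClusters(sc: SparkContext): Unit = {
    val conf = sc.hadoopConfiguration

    // Both logical nameservices are visible to the HDFS client at the same time.
    conf.set("dfs.nameservices", "cluster1,cluster2")

    // cluster1 (source) HA settings; host names are placeholders from the example above.
    conf.set("dfs.ha.namenodes.cluster1", "namenode98,namenode143")
    conf.set("dfs.namenode.rpc-address.cluster1.namenode98", "cdh-nn-01:8020")
    conf.set("dfs.namenode.rpc-address.cluster1.namenode143", "cdh-nn-02:8020")
    conf.set("dfs.client.failover.proxy.provider.cluster1",
      "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider")

    // cluster2 (target) HA settings.
    conf.set("dfs.ha.namenodes.cluster2", "namenode424,namenode417")
    conf.set("dfs.namenode.rpc-address.cluster2.namenode424", "node-nn-01:8020")
    conf.set("dfs.namenode.rpc-address.cluster2.namenode417", "node-nn-02:8020")
    conf.set("dfs.client.failover.proxy.provider.cluster2",
      "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider")

    // Fully-qualified paths pick the cluster explicitly, independent of fs.defaultFS.
    val source: RDD[String] = sc.textFile("hdfs://cluster1/tmp/aaa.txt")
    source.saveAsTextFile("hdfs://cluster2/tmp/hdb/")
  }
}

Declaring both logical nameservices in dfs.nameservices (comma-separated) together with their per-nameservice HA settings is a common way to let a single HDFS client resolve two HA clusters at once.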