接昨天未完待续,继续:
首先,我要完成功能是:将下面的电影的links数据,在Spark上处理,处理结果存入到Hive中
这个是最后成功的图
功能流程如下图:
涉及的代码如下:
object ETL {
def main(args: Array[String]): Unit = {
val localClusterURL = "local[2]"
val clusterMasterURL = "spark://s1:7077"
val conf = new SparkConf().setAppName("ETL").setMaster(clusterMasterURL)
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val hc = new HiveContext(sc)
import sqlContext.implicits._
hc.sql("use moive_recommend")
// 设置RDD的partition的数量一般以集群分配给应用的CPU核数的整数倍为宜。
val minPartitions = 8
val links = sc.textFile(args(0),minPartitions).filter{ !_.endsWith(",")}.map(_.split(","))
.map(x =>Links(x(0).trim.toInt,x(1).trim().toInt,x(2).trim().toInt)).toDF()
links.write.mode(SaveMode.Overwrite).parquet("/home/spark/temp/moiveRec/links")
hc.sql("drop table if exists links")
hc.sql("create table if not exists links(moiveId int,imdbId int,tmdbId int) stored as parquet" )
hc.sql("load data inpath '/home/spark/temp/moiveRec/links' overwrite into table links")
}
}
从一开始,我就有一个疑惑:
如何建立Windows、Linux、HDFS、Spark、Hive、MySql间的联系,进行通讯?
后面所有的行为都是为了解决这个问题。
1 Windows和Linux间的联系
用的这个,不多说,百度。
所有的文件、jar都是通过这个与Linux交互。
2 Windows和Spark交互
需要将最开始的代码,打成jar包,通过上面的WinSCP传入spark的lib(自己指定)下。
而IDEA编译Scala代码需要在Maven中做如下配置
<build>
<sourceDirectory>src/main/java</sourceDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<configuration>
<recompileMode>incremental</recompileMode>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2</version>
<configuration>
<archive>
<manifest>
<mainClass>org.brave.spark.streaming.Producer</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
然后,做如图右侧选择,将项目打成左侧jar包
3 HDFS和Spark交互
接下来到了比较困难的部分,主要是下面5行代码。
敲黑板啦!!!
val links = sc.textFile(args(0),minPartitions).filter{ !_.endsWith(",")}.map(_.split(","))
.map(x =>Links(x(0).trim.toInt,x(1).trim().toInt,x(2).trim().toInt)).toDF()
links.write.mode(SaveMode.Overwrite).parquet("/home/spark/temp/moiveRec/links")
hc.sql("drop table if exists links")
hc.sql("create table if not exists links(moiveId int,imdbId int,tmdbId int) stored as parquet" )
hc.sql("load data inpath '/home/spark/temp/moiveRec/links' overwrite into table links")
我最终运行的命令如下:
最后一部分参数,是links.txt文件的地址,文件存在Linux上,也上传在HDFS上
/home/spark/temp/moiveRec/links.txt
我不懂:
怎么将HDFS的数据传入到spark上运行?或者说怎么区分到底传入的是Linux的数据还是HDFS数据?
写HDFS会报错,如果写Linux本地的数据,后面的
hc.sql("drop table if exists links")
hc.sql("create table if not exists links(moiveId int,imdbId int,tmdbId int) stored as parquet" )
hc.sql("load data inpath '/home/spark/temp/moiveRec/links' overwrite into table links")
语句会找不到数据库,无法新建数据表。
于是问题变为:Spark如何与Hive交互(最难的地方,也是突破点)
4 Hive与Mysql交互
要解决这个问题,首先还要解决Hive与Mysql间的交互问题。详情见:
配置一台Hive + Mysql元数据库
5 Spark和Hive交互
OK,最后一步的交互。
首先,配置一台Hive + Mysql元数据库里面最后关于
讲述不对,需要这一部分的配置,不能为空,原因如下:
https://www.cnblogs.com/linbingdong/p/5829369.html
最后配置如下:
继续启动Hive,报错如下:
解决方法:
https://blog.csdn.net/blueheart20/article/details/38460541
到此,Hive已经配置好了Metastore。
最终的关键问题的解决还是依靠官方文档
就是说,要把hive-site.xml,core-site.xml,hdfs-site.xml都放入到Spark的conf目录下。
还需要将mysql的驱动放入到lib目录下(高版本是jars目录)
成功
最终再次运行下面代码
没在报错,成功写入到Hive中。
总结:遇到问题时,要善于对其转化,转化为能用简单的keywords描述。然后带着keywords:(1)首先去官网查;(2)然后是搜索;
END