首先准备好hadoop和spark以及scala的环境
主节点如下
分节点如下
然后完成以及idea的安装以及idea上scala的插件安装,我们就可以开始编程了。
有以下两点需要注意的:
1.scala和spark的版本,最好按照推荐安装,我是用的spark-2.20,之前用scala-2.12.* 出现巨多问题,例如运行任务时报错:
java.lang.NoSuchMethodError:scala.Predef$.ArrowAssoc(....)
按照官网上的推荐spark-2.20就最好是用scala-2.11.*。
2.如果虚拟机上编程太慢的话,其实更建议直接在本地用idea编程打包后,把jar包传输到虚拟机
一、WordCount
(着重以WordCount编程为重点进行练习,后面的例子若有重复的步骤就简单略过)
1.打开idea,创建scala工程
其中,JDK和Scala SDK就是java和scala的路径
2.在src文件夹下创建两个子目录,一个cluster用于跑spark,另外一个local用于idea上调试。(其中out目录和META-INF创建jar包后自动生成的,开始并没有)然后在两个个文件夹下分别创建scala.class
3.然后要想进行spark编程,我们就得导入spark的相关包
File → Projecte Structure → Libraries → “+”→ Java → *选择spark目录下的jars文件夹*
ps:其实我们的编程暂时用不到这个目录下的所有包,可以只导入需要的,但就需要花时间去找;也可以全部导入,但是整个工程就会变得臃肿然后点OK再点OK,回到界面,我们的相关包就导入完成了
4.接下来就是正式的编程,我们先上WordCount的代码
//指在cluster这个目录下
package cluster
//导入了spark的SparkConf, SparkContext两个类
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
def main(args: Array[String]) {
if (args.length < 1) {
System.err.println("Usage: ")
System.exit(1)
}
//实例化configuration,用于编辑我们任务的相关信息,后面的setAppName可以在控制台运行的时候再设置
val conf = new SparkConf().setAppName("MySparkApp")
// sc是Spark Context,指的是“上下文”,也就是我们运行的环境,需要把conf当参数传进去;
val sc = new SparkContext(conf)
//通过sc获取一个(hdfs上的)文本文件,args(0)就是我们控制台上传入的参数,local运行的话就是传入本地一个文本的path
val line = sc.textFile(args(0))
//下面就是wordcount具体的执行代码
line.flatMap(_.split("")).map((_, 1)).reduceByKey(_+_).collect().foreach(println)
sc.stop()
}
}
这就是WordCount的代码,用于在Spark平台上跑。如果需要在idea上测试的话,就可以把args(0)具体改为一个文本文件的path。比如在工程的目录下创建data文件夹,把test.txt扔上去,args(0)就可以修改为"data/test.txt";然后把sc设置为
val spark=new SparkContext("local","SparkPI")
这样的本地模式,就可以直接在idea上run。
5.打包成jar文件
File → Projecte Structure → Artifacts → “+” → JAR → From modules with dependencies... ...(这个选项的意思是把我们引入的所有外部包都封装起来,Empty就是不算上我们引入的spark包)
然后Main Class就选择我们的cluster,local是我们用于本地测试的,并且Main Class的名字一定要记住,后面spark上运行是要使用的。然后点ok就可以创建相关文件。如果之前就创建了的话,需要把之前的相关信息,也就是工程下的META-INF文件夹删除才可以成功创建。
回到主界面,然后Build → BuildArtifacts 就可以自行创建jar包了。
6.idea会把创建的jar包放进工程下的out文件夹,我们把它找到,为了方便,把jar包放进spark目录下,然后打开,进入META-INF文件夹,把后缀名为.DSA .SF . RSA的文件删除,
因为查资料说某些包的签名有问题,会导致我们在运行时找不到主类,事实上也确实是....
7.我们进入spark目录,通过bin文件夹下的spark-submit来提交任务,执行命令./bin/spark-submit -h来获得帮助文档,我就拿挑几个常用的:
8.
基本格式:
Usage: spark-submit [options] [app arguments]
Usage: spark-submit --kill [submission ID] --master [spark://...]
Usage: spark-submit --status [submission ID] --master [spark://...]
Usage: spark-submit run-example [options] example-class [example args]
选项:
--master MASTER_URL spark://host:port, mesos://host:port, yarn, or local.
--class CLASS_NAME Your application's main class (for Java / Scala apps).
--deploy-mode DEPLOY_MODE Whether to launch the driver program locally ("client") or on one of the worker machines inside the cluster ("cluster") (Default: client).
--driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 1024M).
--executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G).
我们就按上面所说执行如下命令
./bin/spark-submit --master yarn-clien --class cluster.WordCount WordCount.jar /tmp/text.txt
--master 就是我们master的url
--class 就是我们打包jar的主类名称
WordCount.jar 就是jar包名
/tmp/text.txt 是我事先放在hdfs上的测试文本文件,也就是我们编程中的args(0)参数
测试文本text.txt 内容如下
执行之后我们就可以等他完成,看到如下的结果:
对比下没有问题,中间缺失字母的地方应该是空格和换行符。
WordCount执行完毕。
二、Pi
重复步骤就不再多余赘述了
1.先上cluster上的代码
package // avoid overflow
import scala.math.random
import org.apache.spark._
object Pi {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Spark Pi")
val spark = new SparkContext(conf)
val slices = if (args.length > 0) args(0).toInt else 2
val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
val count = spark.parallelize(1 until n, slices).map { i =>
val x = random * 2 - 1
val y = random * 2 - 1
if (x*x + y*y < 1) 1 else 0
}.reduce(_ + _)
println("Pi is roughly " + 4.0 * count / n)
spark.stop()
}
}
args(0)也是我们之后运行时要添加的参数。如果是在本地模式下,依旧是把sc设置成本地模式。
val spark=new SparkContext("local","SparkPI")
如果要设置几个核跑的话就这样"local[*]",*为你人为设定的个数。其实也可以在控制台上设置。
2.打成jar包,删除包下相关文件,再mv到spark目录下
3.启动hadoop,spark后,照例用spark-submit提交任务
这个地方我没设置参数是因为代码中自带了判断,没有参数这一情况下,他就自己设置为了2。
if (args.length > 0) args(0).toInt else 2
然后等待结果
在一堆info中查找结果还是挺费眼睛的,所以我们可以在代码中对打印结果这个步骤稍加修改,增添两行*号:
再打包运行一遍,找起来就应该方便多了。
三、K-Means
这个例子我们就尝试用本地(local)模式来运行
1.首先我们得在idea上准备好测试的文本和输出的路径
我们在工程下创建data目录,把测试文本扔进去,内容如下
0.0 0.0 0.0
0.1 0.1 0.1
0.2 0.2 0.2
9.0 9.0 9.0
9.1 9.1 9.1
9.2 9.2 9.2
然后再创建一个result文件夹用于存放结果,准备工作做好
2.准备好代码,如下:
package local
import org.apache.spark.SparkContext
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
object K_Means {
def main(args: Array[String]) {
//初始化sc
val sc = new SparkContext("local", "Kmeans")
//读入数据
val rdd = sc.textFile("data/Kmeans_data.txt")
//rdd转化,转化成对应的RDD
val data = rdd.map(s => Vectors.dense(s.split(" ").map(_.toDouble)))
//最大迭代次数
val numIteration = 20
//聚类个数
val numClass = 5
//构建模型
val model = KMeans.train(data, numClass, numIteration)
//输出聚类中心
println("Cluster centers:")
for (c <- model.clusterCenters) {
println(" " + c.toString)
}
//使用误差平方之和来评估数据模型
val cost = model.computeCost(data)
println("Within Set Sum of Squared Errors = " + cost)
//使用模型测试单点数据
println("Vectors 0.2 0.2 0.2 is belongs to clusters:" + model.predict(Vectors.dense("0.2 0.2 0.2".split(' ').map(_.toDouble))))
println("Vectors 0.25 0.25 0.25 is belongs to clusters:" + model.predict(Vectors.dense("0.25 0.25 0.25".split(' ').map(_.toDouble))))
println("Vectors 8 8 8 is belongs to clusters:" + model.predict(Vectors.dense("8 8 8".split(' ').map(_.toDouble))))
//交叉评估1,只返回结果
val testdata = rdd.map(s => Vectors.dense(s.split(" ").map(_.toDouble)))
val result1 = model.predict(testdata)
result1.saveAsTextFile("result/kmeanout1/")
//交叉评估2,返回数据集和结果
val result2 = rdd.map {
line =>
val linevectore = Vectors.dense(line.split(" ").map(_.toDouble))
val prediction = model.predict(linevectore)
line + " " + prediction
}.saveAsTextFile("result/kmeanout2/")
sc.stop()
}
}
我们可以看到代码中运用到了mllib库,专门用于机器学习的算法。然而我们本地运行的重点是如下几个地方:
一是我们的sc不再由conf做参数,而是直接运用local的本地模式
二是我们读取文件的地方不再由控制台提供,而是直接由代码提供。路径为我们创建工程时就准备好的测试数据,把它转化为rdd然后才能后续操作。
三是我们输出的结果由saveAsTextFile方法存入本地指定的目标文件夹result,而没有像之前打印在控制台上(其实也可以打印的)
3.然后我们就开始运行Run → Run... →K_Means(主类名称)
我们可以通过idea的控制台看到各种info信息,和我们在spark上跑的信息一样。甚至如图还给了你端口号,说明虽然是本地运行,但还是启用了spark平台,也说明我们运行是成功的。
4.查看我们运行的结果,也就是我们是之前设定的输出路径
可以看到我们之前什么都没有的result文件夹下多了两个子文件夹(说明在输出时不存在的文件夹它会自行创建),里面就包含了我们Kmeans的结果,分别打开两个文件夹的part-00000
我们得到了我们想要的结果,Kmeans的本地模式运行也就成功结束了。
如果要转入到spark上运行,也就像之前的,更改sc,然后修改数据来源和输出路径为控制台输入的参数就行了。
至此结束,有任何问题欢迎指出(#^.^#)