通常在开发Spark任务的时候,都是先在本地主机的IDE中开发完Spark任务,然后上传到Spark集群,最后通过命令行提交并运行,这样很不方便。本节就来介绍一种直接在Eclipse IDE中通过调用外部工具spark-submit来直接提交spark任务的简便方法。
这里以提交Python任务为例进行说明环境搭建。(Java和Scala类似)
1.下载安装Eclipse Scala IDE
为了便于说明,我直接在Spark集群测试环境的master节点上装上图形化界面。然后去Scala IDE官网下载Linux对应版本的eclipse并安装。这个过程很简单,不做详述。
2.安装PyDev插件
启动Eclipse,依次点击Help —— Install New Software… —— Add —— Name输入:PyDev,Location输入:https//dl.bintray.com/fabioz/pydev/7.1.0/ —— OK —— 在列出的资源中勾选PyDev —— next —— next —— 选择”I accept …” —— Finish。耐心等待安装完成,然后重启Eclipse即可。
3.设置字符串替代变量
启动Eclipse,依次点击Window —— Preferences —— Run/Debug —— String Substitution —— New —— Name输入:SPARK_HOME,Value输入:Spark实际安装位置 —— OK。
以同样的方式,设置HADOOP_CONF_DIR变量,值为Hadoop配置文件的路径;设置PYSPARK_PYTHON,值为anaconda安装目录下的bin/python。
4.设置Python解释器路径
依次点击Window —— Preferences —— PyDev —— Interpreter —— Python Interpreter —— Browse for Python/pypy exe —— Interpreter Name输入Python3,Interpreter Executable输入anaconda安装目录下的bin/python3 —— OK —— 在列出的依赖中选择所有 —— OK即可。
5.设置anaconda中的链接库路径
依次点击Window —— Preferences —— PyDev —— Interpreter —— Python Interpreter —— Libraries —— New Folder —— 路径:anaconda安装目录下的lib/python3.7/site-packages —— OK。如此即可使用anaconda中包含的所有Python模块。
6.设置Spark Python的链接库
依次点击Window —— Preferences —— PyDev —— Interpreter —— Python Interpreter —— Libraries —— New Egg/Zip(s) —— 选择路径:spark家目录下的python/lib/中的pyspark.zip和py4j-*.zip两个zip包——OK。如此即可使用pyspark和spark-submit。
7.设置环境变量
依次点击Window —— Preferences —— PyDev —— Interpreter —— Python Interpreter —— Environment —— new —— Name输入:SPARK_HOME,Value输入:${SPARK_HOME},即之前配置字符串替代量。
同理,配置环境变量:HADOOP_CONF_DIR=${HADOOP_CONF_DIR},PYSPARK_PYTHON={PYSPARK_PYTHON}。
8.新建PyDev项目
依次点击 File —— New —— Project… —— PyDev —— PyDev Project —— next —— Project Name:PythonSpark —— Project Type:Python —— Finish。一个工程就建好了。
下面是一个WordCount.py程序:
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
from pyspark import SparkConf
def SetLogger( sc ):
logger = sc._jvm.org.apache.log4j
logger.LogManager.getLogger("org"). setLevel( logger.Level.ERROR )
logger.LogManager.getLogger("akka").setLevel( logger.Level.ERROR )
logger.LogManager.getRootLogger().setLevel(logger.Level.ERROR)
def SetPath(sc):
global Path
if sc.master[0:5]=="local" :
Path="file:/root/workspace/PythonProject/"
else:
Path="hdfs://master:9000/workspace/PythonProject/"
#如果要在cluster模式运行(hadoop yarn 或Spark Stand alone),先把数据文件上传到HDFS目录
def CreateSparkContext():
sparkConf = SparkConf()\
.setAppName("WordCounts") \
.set("spark.ui.showConsoleProgress", "false")
sc = SparkContext(conf = sparkConf)
print("master="+sc.master)
SetLogger(sc)
SetPath(sc)
return (sc)
if __name__ == "__main__":
print("开始运行RunWordCount")
sc=CreateSparkContext()
print("开始读取文本文件...")
textFile = sc.textFile(Path+"data/README.md")
print("文本文件共"+str(textFile.count())+"行")
countsRDD = textFile \
.flatMap(lambda line: line.split(' ')) \
.map(lambda x: (x, 1)) \
.reduceByKey(lambda x,y :x+y)
print("文字统计共"+str(countsRDD.count())+"项数据")
print("开始存储到文本文件...")
try:
countsRDD.saveAsTextFile(Path+ "data/output")
except Exception as e:
print("输出目录已经存在,请先删除原有目录")
sc.stop()
9.通过外部工具运行spark-submit
依次点击:Run——External Tools——External Tools Configurations… ——Program —— 新建 —— Name:spark-submit ——Main ——Location:spark-submit的完整路径,Working Directory:{project_name},Arguments:–driver-memory 1g –master local[4]
{string_prompt}——run。即可以local模式提交Spark任务。其他模式类似。
10.查看结果
控制台:
开始运行RunWordCount
master=local[*]
开始读取文本文件…
文本文件共95行
文字统计共260项数据
开始存储到文本文件…# ls data/output/
part-00000 part-00001 _SUCCESS# head data/output/part-00000
(u'', 67)
(u'when', 1)
(u'R,', 1)
(u'including', 3)
(u'computation', 1)
(u'using:', 1)
(u'guidance', 2)
(u'Scala,', 1)
(u'environment', 1)
(u'only', 1)# head data/output/part-00001
(u'help', 1)
(u'Hadoop', 3)
(u'high-level', 1)
(u'find', 1)
(u'web', 1)
(u'Shell', 2)
(u'how', 2)
(u'graph', 1)
(u'run:', 1)
(u'should', 2)