1 Start up a cluster
1.1 Create Your Key Pair
In the AWS Management Console, select EC2:

Select Key Pairs:

Click "Create Key Pair".

After you click Create, a .pem file is downloaded automatically.
You will subsequently use this .pem file to connect securely to a machine over the internet. It is a text file containing your private key.
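If you prefer the command line, the same key pair can be created with the AWS CLI. This is only a sketch, assuming the CLI is installed and configured; the key name simply reuses the one from this walkthrough:
# create the key pair, save the private key locally, and lock down its permissions
aws ec2 create-key-pair --key-name KEY20250622 --query 'KeyMaterial' --output text > KEY20250622.pem
chmod 600 KEY20250622.pem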
1.2 Start up a cluster
Back in the AWS Management Console, select EMR (Elastic MapReduce) and click "Create cluster".
Most of the settings can be left at their defaults. Under Primary and Core you can pick the instance resources you need.

Under Cluster scaling and provisioning, choose how many core instances you want. For example, I chose 2, which means 1 primary and 2 core nodes.

Then, under Security configuration and EC2 key pair, select the key pair you just created.

Under Identity and Access Management (IAM) roles, set up the roles, and that's it. Then click Create Cluster.
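For reference, a comparable cluster can also be launched from the AWS CLI. The sketch below is only a rough equivalent of the console steps above; the cluster name, release label, and instance type are example values, not ones taken from this walkthrough:
# 1 primary + 2 core nodes, Spark installed, using the key pair and the default EMR roles
aws emr create-cluster \
  --name "spark-lab" \
  --release-label emr-7.0.0 \
  --applications Name=Spark \
  --instance-type m5.xlarge \
  --instance-count 3 \
  --ec2-attributes KeyName=KEY20250622 \
  --use-default-roles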

Check the cluster status:
Right after creation the status is Starting, and the cluster cannot be used yet. After roughly 15 minutes it changes to Waiting, which means it is ready.
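You can also poll the status from the command line; a small sketch, where the cluster ID is a placeholder you must replace with your own:
# list active clusters to find the cluster ID, then check its state (STARTING / WAITING / ...)
aws emr list-clusters --active
aws emr describe-cluster --cluster-id j-XXXXXXXXXXXXX --query 'Cluster.Status.State'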

Once the status becomes Waiting, there is one last step: edit the primary node's security group properties so that you can connect via SSH. Click the link under Primary Node:

Click "Edit inbound rules", add a rule allowing SSH (TCP port 22) from your IP, and save:
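The equivalent rule can also be added with the AWS CLI; the security group ID and IP address below are placeholders, not values from this walkthrough:
# allow SSH (TCP port 22) from your own IP into the primary node's security group
aws ec2 authorize-security-group-ingress --group-id sg-0123456789abcdef0 --protocol tcp --port 22 --cidr 203.0.113.10/32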

1.3 Connect to the master node
Open a terminal on your local machine and first restrict the permissions of the .pem file:
chmod 600 /Users/macwu/Downloads/KEY20250622.pem
Copy the primary node's public DNS name:

Run the following command to connect, replacing the .pem path and the master node DNS name with your own:
ssh -i /Users/macwu/Downloads/KEY20250622.pem hadoop@ec2-54-84-161-92.compute-1.amazonaws.com
Enter yes to dismiss the security warning.
If the connection times out, check the status of your cluster: if it has been idle for too long (1 hour in my case), it gets terminated automatically, and at that point you can only clone a new cluster from it.
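If you want to allow a longer idle window before auto-termination, EMR exposes this as an auto-termination policy. A sketch, assuming the cluster ID placeholder is replaced with your own (the timeout is in seconds, and 2 hours here is just an example):
# e.g. allow 2 hours of idle time before the cluster auto-terminates
aws emr put-auto-termination-policy --cluster-id j-XXXXXXXXXXXXX --auto-termination-policy IdleTimeout=7200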
2 Run Spark jobs
There are two ways to run Spark jobs: work with Spark interactively, or submit a Spark job in batch.
2.1 Work with Spark interactively
Run the following command:
pyspark
You should see something like this:

Execute the following code line by line in the PySpark console:
import re
# load up all of the 19997 documents in the corpus
corpus = sc.textFile ("s3://comp643bucket/lab/spark_intro_aws/20_news_same_line.txt")
# how many news/documents/lines?
corpus.count() # 19997
# each entry in validLines will be a "valid" line from the text file that should contain "id"
validLines = corpus.filter(lambda x : 'id' in x)
# see first 3 elements
validLines.take(3)
# transform into a bunch of (docID, text) pairs
keyAndText = validLines.map(lambda x : (x[x.index('id="') + 4 : x.index('" url=')], x[x.index('"> ') + 3:x.index(' </doc>')]))
# see first 3 elements
keyAndText.take(3)
# now we split the text in each (docID, text) pair into a list of words
# we have a bit of fancy regular expression stuff here to make sure that we do not die on some of the documents
# after this, we have a data set with (docID, ["word1", "word2", "word3", ...])
regex = re.compile('[^a-zA-Z]')
keyAndListOfWords = keyAndText.map(lambda x : (str(x[0]), regex.sub(' ', x[1]).lower().split()))
# see first 3 elements
keyAndListOfWords.take(3)
# now get the top 20,000 words
# first change (docID, ["word1", "word2", "word3", ...]) to ("word1", 1) ("word2", 1) ...
allWords = keyAndListOfWords.flatMap(lambda x: ((j, 1) for j in x[1]))
# now, count all of the words, giving us ("word1", 1433), ("word2", 3423423), etc.
allCounts = allWords.reduceByKey (lambda a, b: a + b)
# and get the top 20,000 words in a local array
# each entry is a ("word1", count) pair
topWords = allCounts.top (20000, lambda x : x[1])
# see the first 3 elements
topWords[:3]
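Since topWords is just a local Python list of (word, count) pairs, you can inspect it directly in the same shell. A small optional check (the word used below is only an illustrative example, not part of the lab):
# build a local word -> rank lookup from topWords
wordRank = {word: rank for rank, (word, count) in enumerate(topWords)}
# where (if anywhere) does a given word rank in the top 20,000?
wordRank.get("computer")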
2.2 Submit a Spark job in batch
Use echo to create a new Python file on the EMR machine:
echo "print()" > topWords_batch.py
Then open the file with vi, replace its contents with the following code, and save and quit:
import re
from pyspark import SparkContext
sc = SparkContext()
# load up all of the 19997 documents in the corpus
corpus = sc.textFile ("s3://comp643bucket/lab/spark_intro_aws/20_news_same_line.txt")
# each entry in validLines will be a line from the text file
validLines = corpus.filter(lambda x : 'id' in x)
# now we transform it into a bunch of (docID, text) pairs
keyAndText = validLines.map(lambda x : (x[x.index('id="') + 4 : x.index('" url=')], x[x.index('"> ') + 3:x.index(' </doc>')]))
# now we split the text in each (docID, text) pair into a list of words
# after this, we have a data set with (docID, ["word1", "word2", "word3", ...])
# we have a bit of fancy regular expression stuff here to make sure that we do not
# die on some of the documents
regex = re.compile('[^a-zA-Z]')
keyAndListOfWords = keyAndText.map(lambda x : (str(x[0]), regex.sub(' ', x[1]).lower().split()))
# now get the top 20,000 words... first change (docID, ["word1", "word2", "word3", ...])
# to ("word1", 1) ("word2", 1)...
allWords = keyAndListOfWords.flatMap(lambda x: ((j, 1) for j in x[1]))
# now, count all of the words, giving us ("word1", 1433), ("word2", 3423423), etc.
allCounts = allWords.reduceByKey (lambda a, b: a + b)
# and get the top 20,000 words in a local array
# each entry is a ("word1", count) pair
topWords = allCounts.top (20000, lambda x : x[1])
# and we'll create an RDD that has a bunch of (word, dictNum) pairs
# start by creating an RDD that has the number 0 thru 20000
# 20000 is the number of words that will be in our dictionary
twentyK = sc.parallelize(range(20000))
# now, we transform (0), (1), (2), ... to ("mostcommonword", 0) ("nextmostcommon", 1), ...
# the number will be the spot in the dictionary used to tell us where the word is located
# HINT: make use of topWords in the lambda that you supply
# dictionary = twentyK.map (/* your code here */)
dictionary = twentyK.map(lambda idx: (topWords[idx][0], idx))
# finally, save the dictionary as a text file
dictionary.saveAsTextFile('output')
Run the job and check the output with the following commands:
spark-submit topWords_batch.py
hdfs dfs -ls output
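To peek at a few entries without leaving the cluster, you can cat the part files straight from HDFS (part-* is Spark's default naming for saveAsTextFile output):
hdfs dfs -cat output/part-* | head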

Use the following commands to download the output to your local machine for analysis:
# first copy the output out of HDFS into the user's home directory
hdfs dfs -get output ~/
Then open a terminal on your own local machine and download the files with scp:
scp -i ./KEY20250622.pem -r hadoop@ec2-34-207-141-135.compute-1.amazonaws.com:output ./output/
Once downloaded, you can analyze the results however you like.
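For example, since saveAsTextFile writes one Python-style tuple per line (e.g. ('the', 0)), a minimal local sketch for loading the dictionary back into Python could look like this (the script name and file glob are assumptions, not part of the lab):
# load_dictionary.py -- parse the downloaded saveAsTextFile output into a dict
import ast
import glob

wordToIndex = {}
for path in sorted(glob.glob('output/part-*')):
    with open(path) as f:
        for line in f:
            word, idx = ast.literal_eval(line.strip())   # e.g. "('the', 0)"
            wordToIndex[word] = idx

print(len(wordToIndex))                                       # should be 20000
print(sorted(wordToIndex.items(), key=lambda kv: kv[1])[:5])  # five most common words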
3 Terminate Cluster
Just click Terminate in the console:
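Or, equivalently, from the AWS CLI (the cluster ID is a placeholder):
aws emr terminate-clusters --cluster-ids j-XXXXXXXXXXXXX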