Using MapReduce/Spark on AWS

1 Start up a cluster

1.1 Create Your Key Pair

In the AWS Management Console, select EC2.

Select Key Pairs.

点击 "Create Key Pair"
Create Key Pair.png

After you click Create, a .pem file is downloaded automatically.

You will subsequently use this .pem file to connect securely to a machine over the internet. It is a text file containing your private key.
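If you prefer to script this step, the key pair can also be created with boto3. This is a minimal sketch, assuming the key name KEY20250622 used later in this walkthrough; AWS returns the private key material exactly once, so it is written straight to the .pem file:

import os
import boto3

ec2 = boto3.client('ec2')

# create the key pair; the private key is returned only at creation time
resp = ec2.create_key_pair(KeyName='KEY20250622')

# save it as a .pem file and restrict the permissions, as ssh requires
pem_path = os.path.expanduser('~/Downloads/KEY20250622.pem')
with open(pem_path, 'w') as f:
    f.write(resp['KeyMaterial'])
os.chmod(pem_path, 0o600)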

1.2 Start up a cluster

Go back to the AWS Management Console, select EMR (Elastic MapReduce), and click "Create cluster".
Most settings can be left at their defaults; under Primary and Core you can choose the resources you need.

Under Cluster scaling and provisioning you choose how many instances you want. I chose 2, which means 1 primary and 2 core nodes.

Then, under Security configuration and EC2 key pair, select the key pair you just created.

Finally, set up the Identity and Access Management (IAM) roles, and that's it; click Create cluster.
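The same cluster can also be created from code. The sketch below is a rough boto3 equivalent of the console settings above (1 primary and 2 core nodes, the key pair from step 1.1); the cluster name, EMR release label, instance type, and IAM role names are assumptions, and the default EMR roles must already exist in your account:

import boto3

emr = boto3.client('emr')

resp = emr.run_job_flow(
    Name='spark-lab-cluster',                 # hypothetical name
    ReleaseLabel='emr-7.1.0',                 # assumed EMR release
    Applications=[{'Name': 'Spark'}, {'Name': 'Hadoop'}],
    Instances={
        'InstanceGroups': [
            {'Name': 'Primary', 'InstanceRole': 'MASTER',
             'InstanceType': 'm5.xlarge', 'InstanceCount': 1},
            {'Name': 'Core', 'InstanceRole': 'CORE',
             'InstanceType': 'm5.xlarge', 'InstanceCount': 2},
        ],
        'Ec2KeyName': 'KEY20250622',          # the key pair from step 1.1
        'KeepJobFlowAliveWhenNoSteps': True,  # keep the cluster in Waiting between jobs
    },
    JobFlowRole='EMR_EC2_DefaultRole',        # assumed default instance profile
    ServiceRole='EMR_DefaultRole',            # assumed default service role
)
print(resp['JobFlowId'])                      # the cluster ID, e.g. j-XXXXXXXXXXXX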

Check the status:
Right after creation the cluster is in the Starting state and cannot be used yet; after roughly 15 minutes it switches to Waiting, which means it is ready.
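If you would rather poll the state from code than refresh the console, here is a small boto3 sketch (the cluster ID is a placeholder; use the one shown in the console or returned by run_job_flow):

import boto3

emr = boto3.client('emr')
cluster_id = 'j-XXXXXXXXXXXX'   # placeholder: your cluster ID

# current state: STARTING, BOOTSTRAPPING, RUNNING, WAITING, ...
state = emr.describe_cluster(ClusterId=cluster_id)['Cluster']['Status']['State']
print(state)

# or block until the cluster is up (RUNNING or WAITING)
emr.get_waiter('cluster_running').wait(ClusterId=cluster_id)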

Once the status has changed to Waiting, one last property needs to be adjusted so that you can connect over SSH. On the cluster's detail page, click the security group link listed under the primary node.

点击 "Edit inbound rules", 加一条这样的规则, 然后保存:
image.png
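If you prefer to add the rule from code instead of through the console, here is a minimal boto3 sketch; the security group ID (copy it from the console) and the IP address are placeholders:

import boto3

ec2 = boto3.client('ec2')

ec2.authorize_security_group_ingress(
    GroupId='sg-0123456789abcdef0',    # placeholder: the primary node's security group
    IpPermissions=[{
        'IpProtocol': 'tcp',
        'FromPort': 22,
        'ToPort': 22,
        # replace with your own public IP; 0.0.0.0/0 would open SSH to everyone
        'IpRanges': [{'CidrIp': '203.0.113.5/32'}],
    }],
)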

1.3 Connect to the master node

Open a terminal on your local machine and first restrict the permissions on the .pem file:

chmod 600 /Users/macwu/Downloads/KEY20250622.pem

Copy the primary node's public DNS name from the cluster summary page.

Connect by running the following command; be sure to substitute the path to your own .pem file and your master node's DNS name:

ssh -i /Users/macwu/Downloads/KEY20250622.pem hadoop@ec2-54-84-161-92.compute-1.amazonaws.com

Enter yes to dismiss the security warning.
If the connection times out, check the cluster's status: if the cluster sits idle for too long (about 1 hour in my case), it gets terminated, and at that point the only option is to clone a new cluster from it.
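To check from code whether your cluster is still alive, here is a quick boto3 sketch that lists only clusters that have not been terminated:

import boto3

emr = boto3.client('emr')

# list clusters that are still usable
active = emr.list_clusters(ClusterStates=['STARTING', 'RUNNING', 'WAITING'])
for c in active['Clusters']:
    print(c['Id'], c['Name'], c['Status']['State'])

# if your cluster is not in this list, it has been terminated
# and you will need to clone a new one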

2 Run Spark jobs

There are two ways to do this: work with Spark interactively, or submit a Spark job in batch.

2.1 Work with Spark interactively

Run the following command:

pyspark

You should see the PySpark welcome banner and an interactive prompt. The shell already provides a SparkContext as sc, which the code below relies on.

Execute the following code line by line in the shell:

import re

# load up all of the 19997 documents in the corpus
corpus = sc.textFile ("s3://comp643bucket/lab/spark_intro_aws/20_news_same_line.txt")

# how many news/documents/lines?
corpus.count() # 19997

# each entry in validLines will be a "valid" line from the text file that should contain "id" 
validLines = corpus.filter(lambda x : 'id' in x)

# see first 3 elements
validLines.take(3)

# transform into a bunch of (docID, text) pairs
keyAndText = validLines.map(lambda x : (x[x.index('id="') + 4 : x.index('" url=')], x[x.index('"> ') + 3:x.index(' </doc>')]))

# see first 3 elements
keyAndText.take(3)

# now we split the text in each (docID, text) pair into a list of words
# we have a bit of fancy regular expression stuff here to make sure that we do not die on some of the documents
# after this, we have a data set with (docID, ["word1", "word2", "word3", ...])
regex = re.compile('[^a-zA-Z]')
keyAndListOfWords = keyAndText.map(lambda x : (str(x[0]), regex.sub(' ', x[1]).lower().split()))

# see first 3 elements
keyAndListOfWords.take(3)

# now get the top 20,000 words

# first change (docID, ["word1", "word2", "word3", ...]) to ("word1", 1) ("word2", 1) ...
allWords = keyAndListOfWords.flatMap(lambda x: ((j, 1) for j in x[1]))

# now, count all of the words, giving us ("word1", 1433), ("word2", 3423423), etc.
allCounts = allWords.reduceByKey (lambda a, b: a + b)

# and get the top 20,000 words in a local array
# each entry is a ("word1", count) pair
topWords = allCounts.top (20000, lambda x : x[1])

# see the first 3 elements
topWords[:3]
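As a quick sanity check (not part of the original lab code), you can query the results while still in the same pyspark session; allCounts.lookup returns the value(s) stored for one key, and topWords is already a local list:

# total count of a single word across the corpus (returns a one-element list)
allCounts.lookup('the')

# turn the local topWords list into a dict for easy lookups by word
topWordsDict = dict(topWords)
topWordsDict.get('space', 0)

# leave the interactive shell when you are done
exit()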

2.2 Submit a Spark job in batch

On the EMR machine, create a new Python file with the echo command:

echo "print()" > topWords_batch.py

Then open the file with vi, replace its contents with the following code, and save and quit.

import re
from pyspark import SparkContext

sc = SparkContext()

# load up all of the 19997 documents in the corpus
corpus = sc.textFile ("s3://comp643bucket/lab/spark_intro_aws/20_news_same_line.txt")

# each entry in validLines will be a line from the text file
validLines = corpus.filter(lambda x : 'id' in x)

# now we transform it into a bunch of (docID, text) pairs
keyAndText = validLines.map(lambda x : (x[x.index('id="') + 4 : x.index('" url=')], x[x.index('"> ') + 3:x.index(' </doc>')]))

# now we split the text in each (docID, text) pair into a list of words
# after this, we have a data set with (docID, ["word1", "word2", "word3", ...])
# we have a bit of fancy regular expression stuff here to make sure that we do not
# die on some of the documents
regex = re.compile('[^a-zA-Z]')
keyAndListOfWords = keyAndText.map(lambda x : (str(x[0]), regex.sub(' ', x[1]).lower().split()))

# now get the top 20,000 words... first change (docID, ["word1", "word2", "word3", ...])
# to ("word1", 1) ("word2", 1)...
allWords = keyAndListOfWords.flatMap(lambda x: ((j, 1) for j in x[1]))

# now, count all of the words, giving us ("word1", 1433), ("word2", 3423423), etc.
allCounts = allWords.reduceByKey (lambda a, b: a + b)

# and get the top 20,000 words in a local array
# each entry is a ("word1", count) pair
topWords = allCounts.top (20000, lambda x : x[1])

# and we'll create an RDD that has a bunch of (word, dictNum) pairs
# start by creating an RDD that has the number 0 thru 20000
# 20000 is the number of words that will be in our dictionary
twentyK = sc.parallelize(range(20000))

# now, we transform (0), (1), (2), ... to ("mostcommonword", 0) ("nextmostcommon", 1), ...
# the number will be the spot in the dictionary used to tell us where the word is located
# HINT: make use of topWords in the lambda that you supply
dictionary = twentyK.map(lambda idx: (topWords[idx][0], idx))

# finally, save the dictionary as a text file

dictionary.saveAsTextFile('output')

Run the job and check the results with the following commands (the relative path output resolves to HDFS on the cluster):

spark-submit topWords_batch.py
hdfs dfs -ls output

Use the following commands to download the results to your local machine for analysis:

# first copy the output directory from HDFS to your home directory
hdfs dfs -get output ~/

Then open a terminal on your own local machine and download the files with scp:

scp -i ./KEY20250622.pem -r hadoop@ec2-34-207-141-135.compute-1.amazonaws.com:output ./output/

Once downloaded, you can analyze the output however you like; one possible way to load it is sketched below.
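As one hypothetical example, the sketch below reads the downloaded part files back into a Python dict; it assumes each line is a tuple literal like ('word', 123), which is how saveAsTextFile renders the (word, position) pairs produced by the script above (adjust the glob pattern if scp nested the directory differently):

import ast
import glob

word_to_index = {}
for path in glob.glob('./output/part-*'):
    with open(path) as f:
        for line in f:
            # each line looks like ('word', 123); parse it back into a tuple
            word, idx = ast.literal_eval(line.strip())
            word_to_index[word] = idx

print(len(word_to_index))         # should be 20000
print(word_to_index.get('the'))   # dictionary position of a very common word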

3 Terminate Cluster

Just click Terminate in the console.
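The same thing can be done from code; here is a minimal boto3 sketch with a placeholder cluster ID (it also turns termination protection off first, in case you enabled it):

import boto3

emr = boto3.client('emr')
cluster_id = 'j-XXXXXXXXXXXX'   # placeholder: your cluster ID

# make sure termination protection is off, then terminate the cluster
emr.set_termination_protection(JobFlowIds=[cluster_id], TerminationProtected=False)
emr.terminate_job_flows(JobFlowIds=[cluster_id])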
