MapReduce 首先将大规模数据分割为若干一定大小的数据块,然后把这些数据块分别交给多个 Mapper 函数处理;各 Mapper 处理后产生的多个规模较小的中间数据再提交给 Reducer 函数处理,得到更小规模的数据或最终结果。
(1)大文件切分。
import os
import os.path
import time
def FileSplit(sourceFile, targetFolder, number=10):
    """Split a text file into numbered chunk files of at most ``number`` lines.

    Each chunk is written to ``targetFolder`` as ``<stem><n>.txt``, where
    ``<stem>`` is the source file name without its directory and extension
    and ``<n>`` counts up from 1. Existing chunk files with the same names
    are overwritten, so re-running the split does not duplicate records.

    Args:
        sourceFile: Path of the text file to split.
        targetFolder: Folder that receives the chunk files; created
            (including missing parents) if it does not exist.
        number: Maximum number of lines per chunk file. Defaults to 10.

    Returns:
        None. If ``sourceFile`` is not an existing file, a message is
        printed and nothing is written.

    Raises:
        ValueError: If ``number`` is smaller than 1.
    """
    # Local import keeps this function self-contained.
    from itertools import islice

    if number < 1:
        # A non-positive chunk size could never consume the input.
        raise ValueError('number must be a positive integer')
    if not os.path.isfile(sourceFile):
        print(sourceFile, ' does not exist.')
        return
    os.makedirs(targetFolder, exist_ok=True)

    # Use only the bare file name: joining a path that still carries a
    # directory part (or is absolute) would place chunks outside targetFolder.
    stem = os.path.splitext(os.path.basename(sourceFile))[0]

    with open(sourceFile, 'r') as srcFile:
        fileNum = 1  # sequence number of the next chunk file
        while True:
            tempData = list(islice(srcFile, number))
            if not tempData:
                break
            desFile = os.path.join(targetFolder, stem + str(fileNum) + '.txt')
            # 'w' rather than 'a+': a re-run must replace, not extend, a chunk.
            with open(desFile, 'w') as f:
                f.writelines(tempData)
            fileNum += 1
if __name__ == '__main__':
    # Script entry point: split test.txt into chunk files under "test".
    sourceFile, targetFolder = 'test.txt', 'test'
    FileSplit(sourceFile, targetFolder)
(2)Mapper 代码。
import os
import re
import threading
import time
def Map(sourceFile):
    """Count the lines of ``sourceFile`` per date and save the counts.

    Only the first date on a line (format ``d/m/yyyy`` with one- or
    two-digit day and month fields) is counted; lines without a date are
    ignored. The counts are written as ``date:count`` lines to a file next
    to ``sourceFile`` whose name is the source name with its extension
    replaced by ``_map.txt``. An existing result file is overwritten so
    that re-running the Mapper does not double the counts.

    Args:
        sourceFile: Path of the text file to scan.

    Returns:
        None. If ``sourceFile`` is not an existing file, a message is
        printed and nothing is written.
    """
    if not os.path.isfile(sourceFile):
        print(sourceFile, ' does not exist.')
        return

    pattern = re.compile(r'[0-9]{1,2}/[0-9]{1,2}/[0-9]{4}')  # matches a date
    result = {}
    with open(sourceFile, 'r') as srcFile:
        for dataLine in srcFile:
            # search() yields the first match, which is all that is counted.
            match = pattern.search(dataLine)
            if match:
                date = match.group()
                result[date] = result.get(date, 0) + 1

    desFile = os.path.splitext(sourceFile)[0] + '_map.txt'  # result file
    # 'w' rather than 'a+': appended counts would be summed twice downstream.
    with open(desFile, 'w') as fp:
        fp.writelines(k + ':' + str(v) + '\n' for k, v in result.items())
if __name__ == '__main__':
    desFolder = 'test'
    # Map only the split chunks. Earlier Mapper outputs (*_map.txt) and a
    # result.txt left in this folder by the Reducer step also contain dates
    # and would otherwise be counted again on a re-run.
    files = [name for name in os.listdir(desFolder)
             if not name.endswith('_map.txt') and name != 'result.txt']

    def Main(i):
        """Run the Mapper on the i-th entry of ``files``."""
        # os.path.join instead of a hard-coded '\\' keeps this portable.
        Map(os.path.join(desFolder, files[i]))

    fileNumber = len(files)
    threads = []
    for i in range(fileNumber):
        t = threading.Thread(target=Main, args=(i,))
        t.start()
        threads.append(t)
    # Wait for every Mapper so all *_map.txt files are complete afterwards.
    for t in threads:
        t.join()
(3)Reducer 代码。
import os
def Reduce(sourceFolder, targetFile):
    """Merge all Mapper result files of a folder into one totals file.

    Every file in ``sourceFolder`` whose name ends with ``_map.txt`` is read
    as ``key:count`` lines (blank lines are skipped). Counts of equal keys
    are added up and the totals are written to ``targetFile`` as
    ``key:total`` lines, replacing any previous content.

    Args:
        sourceFolder: Folder containing the ``*_map.txt`` files.
        targetFile: Path of the file that receives the totals.

    Returns:
        None. If ``sourceFolder`` is not an existing folder, a message is
        printed and nothing is written.

    Raises:
        ValueError: If a non-blank line has no ``:`` separator or its count
            is not an integer.
    """
    if not os.path.isdir(sourceFolder):
        print(sourceFolder, ' does not exist.')
        return

    # Collect the Mapper result files. Sorting makes the output order
    # reproducible (os.listdir order is arbitrary); os.path.join keeps the
    # paths portable instead of hard-coding '\\'.
    allFiles = [os.path.join(sourceFolder, name)
                for name in sorted(os.listdir(sourceFolder))
                if name.endswith('_map.txt')]

    result = {}
    for f in allFiles:
        with open(f, 'r') as fp:
            for line in fp:
                line = line.strip()
                if not line:
                    continue
                # Split at the last ':' so the count is always the tail.
                key, sep, value = line.rpartition(':')
                if not sep:
                    raise ValueError(
                        'malformed line in ' + f + ': ' + repr(line))
                result[key] = result.get(key, 0) + int(value)

    with open(targetFile, 'w') as fp:
        fp.writelines(k + ':' + str(v) + '\n' for k, v in result.items())
if __name__ == '__main__':
    # Build the result path with os.path.join: a hard-coded '\\' is only a
    # path separator on Windows and would otherwise end up in the file name.
    Reduce('test', os.path.join('test', 'result.txt'))