实验平台:
基于Windows的Spark环境,采用Python编程。
实验内容:
根据给定的啤酒销售数据和去年同期销量数据,把xlsx文件转换为TXT文件,针对11月份啤酒销售数据,并通过编程进行数据处理和计算。
- 去除整月销量为0的数据。
- 转换数值格式,把销量数据中的引号、逗号等处理掉,并转换为数值。(基于提供的”实习2啤酒销量数据.tx”数据文件)
- 有多少类型的啤酒?
- 哪5种啤酒卖得最好?(销量最高)
- 在去年销量大于500的区域中,哪个销售区域销售的啤酒同比去年增长最快?(按照增长率计算)
- 统计每种啤酒的11月份前3周的销量。
- 统计啤酒卖得最好的前三个区域的11月份前3周销量。
代码:
1.文件转换
将xlsx转为txt(用excel也可以转,但有编码问题,所以还是自己转)
# encoding = utf-8
import findspark
import xlrd
import os
import sys
findspark.init()
from pyspark import SparkConf, SparkContext
#此函数是将xlsx转为txt(用excel也可以转,但有编码问题,所以还是自己转)
# def txt_write(xlsxname, txtname):
# with open(txtname, mode = "w+", encoding="utf-8") as f:
# #打开指定文件
# xlsx_file = xlsxname
# book = xlrd.open_workbook(xlsx_file)
# #通过sheet索引获得sheet对象
# sheet01 = book.sheet_by_index(0)
# cellvalue=''
# # 获得指定索引的sheet名
# # sheet1_name = book.sheet_names()[0]
# # print(sheet1_name)
# # 通过sheet名字获得sheet对象
# # sheet1 = book.sheet_by_name(sheet1_name)
# #获得行数和列数
# #总行数
# nrows = sheet01.nrows
# #总列数
# ncols = sheet01.ncols
# #遍历打印表中的内容
# for i in range(nrows):
# for j in range(ncols):
# cellvalue = sheet01.cell_value(i, j)
# #强制转换数字格式
# celltype = sheet01.cell_type(i, j)
# if celltype == 2:
# cellvalue = str(cellvalue)
# f.write(str(cellvalue) + "\t")
# f.write('\n')
2.统计各种数据
conf = SparkConf().setMaster("local").setAppName("MyApp")
sc = SparkContext(conf=conf)
filepath="C:\\Users\\98275\\Desktop\\时空实习\\任务\\"
xlsxFile = "实习2 啤酒销量数据.xlsx"
logFile="实习2啤酒销量数据.txt"
#转换为txt(老师后来给了txt,所以不需要此操作)
#txt_write(filepath+xlsxFile,filepath+logFile)
lines = sc.textFile("file:///"+filepath+logFile)
#按tab位分割
words = lines.map(lambda line: line.split("\t"))
#print(words.take(10))
#1) 去除整月销量为0的数据 '0'
result1=words.filter(lambda line:line[4]!='0'or line[5]!='0'or line[6]!='0')
#print(result1.take(10))
list2=result1.take(result1.count())
list2.pop(0)
#print(list2)
#去掉逗号和双引号
for i in range(len(list2)):
for j in range(3,9):
list2[i][j]=list2[i][j].strip('"')
list2[i][j]=list2[i][j].replace(",","")
list2[i][j]=int(list2[i][j])
#print(list2)
rdd = sc.parallelize(list2)
print(rdd.collect())
soldRdd=rdd.sortBy(lambda line:-(line[4]+line[5]+line[6]))
#print(soldRdd.collect())
soldType=soldRdd.map(lambda line: (line[1],1)).reduceByKey(lambda a, b: a + b)
#销量前五的啤酒
print("销量前五的啤酒:")
print(soldType.take(5))
fastRdd=rdd.filter(lambda line:line[8]>=500)
#print(fastRdd.collect())
fastRdd=fastRdd \
.map(lambda line:(line[2],(line[4]+line[5]+line[6]/(0.75*line[8])))) \
.reduceByKey(lambda a, b: a + b) \
.sortBy(lambda pair:-pair[1])
#增长最快的区域
print("增长最快的区域")
print(fastRdd.take(1))
everyRdd=rdd.map(lambda line:(line[1],line[4]+line[5]+line[6])) \
.reduceByKey(lambda a, b: a + b)
#每种啤酒的11月份前3周的销量
print("每种啤酒的11月份前3周的销量:")
print(everyRdd.collect())
mostRdd=rdd.map(lambda line:(line[2],(line[4]+line[5]+line[6]))) \
.reduceByKey(lambda a, b: a + b) \
.sortBy(lambda pair:-pair[1])
#啤酒卖得最好的前三个区域的11月份前3周销量
print("啤酒卖得最好的前三个区域与11月份前3周销量:")
print(mostRdd.take(3))