时空数据处理与组织-RDD基础编程

实验平台:
基于Windows的Spark环境,采用Python编程。
实验内容:
根据给定的啤酒销售数据和去年同期销量数据,把xlsx文件转换为TXT文件,针对11月份啤酒销售数据,并通过编程进行数据处理和计算。

  1. 去除整月销量为0的数据。
  2. 转换数值格式,把销量数据中的引号、逗号等处理掉,并转换为数值。(基于提供的”实习2啤酒销量数据.tx”数据文件)
  3. 有多少类型的啤酒?
  4. 哪5种啤酒卖得最好?(销量最高)
  5. 在去年销量大于500的区域中,哪个销售区域销售的啤酒同比去年增长最快?(按照增长率计算)
  6. 统计每种啤酒的11月份前3周的销量。
  7. 统计啤酒卖得最好的前三个区域的11月份前3周销量。
    代码:
    1.文件转换
    将xlsx转为txt(用excel也可以转,但有编码问题,所以还是自己转)
# encoding = utf-8
import findspark
import xlrd
import os
import sys

findspark.init()
from pyspark import SparkConf, SparkContext
#此函数是将xlsx转为txt(用excel也可以转,但有编码问题,所以还是自己转)
# def txt_write(xlsxname, txtname):
#   with open(txtname, mode = "w+", encoding="utf-8") as f:
#       #打开指定文件
#       xlsx_file = xlsxname
#       book = xlrd.open_workbook(xlsx_file)
#       #通过sheet索引获得sheet对象
#       sheet01 = book.sheet_by_index(0)
#       cellvalue=''
#       # 获得指定索引的sheet名
#       # sheet1_name = book.sheet_names()[0]
#       # print(sheet1_name)
#       # 通过sheet名字获得sheet对象
#       # sheet1 = book.sheet_by_name(sheet1_name)
#       #获得行数和列数
#       #总行数
#       nrows = sheet01.nrows
#       #总列数
#       ncols = sheet01.ncols
#       #遍历打印表中的内容
#       for i in range(nrows):
#           for j in range(ncols):
#               cellvalue = sheet01.cell_value(i, j)
#               #强制转换数字格式
#               celltype = sheet01.cell_type(i, j)
#               if celltype == 2:
#                   cellvalue = str(cellvalue)
#               f.write(str(cellvalue) + "\t")
#           f.write('\n')

2.统计各种数据

conf = SparkConf().setMaster("local").setAppName("MyApp")
    sc = SparkContext(conf=conf)
    filepath="C:\\Users\\98275\\Desktop\\时空实习\\任务\\"
    xlsxFile = "实习2 啤酒销量数据.xlsx"
    logFile="实习2啤酒销量数据.txt"
    #转换为txt(老师后来给了txt,所以不需要此操作)
    #txt_write(filepath+xlsxFile,filepath+logFile)
    lines = sc.textFile("file:///"+filepath+logFile)
    #按tab位分割
    words = lines.map(lambda line: line.split("\t"))
    #print(words.take(10))
    #1) 去除整月销量为0的数据 '0'
    result1=words.filter(lambda line:line[4]!='0'or line[5]!='0'or line[6]!='0')
    #print(result1.take(10))
    list2=result1.take(result1.count())
    list2.pop(0)
    #print(list2)
    #去掉逗号和双引号
    for i in range(len(list2)):
        for j in range(3,9):
            list2[i][j]=list2[i][j].strip('"')
            list2[i][j]=list2[i][j].replace(",","")
            list2[i][j]=int(list2[i][j])
    #print(list2)
    rdd = sc.parallelize(list2)
    print(rdd.collect())

    soldRdd=rdd.sortBy(lambda line:-(line[4]+line[5]+line[6]))
    #print(soldRdd.collect())
    soldType=soldRdd.map(lambda line: (line[1],1)).reduceByKey(lambda a, b: a + b)
    #销量前五的啤酒
    print("销量前五的啤酒:")
    print(soldType.take(5))
    fastRdd=rdd.filter(lambda line:line[8]>=500)
    #print(fastRdd.collect())
    fastRdd=fastRdd \
        .map(lambda line:(line[2],(line[4]+line[5]+line[6]/(0.75*line[8])))) \
        .reduceByKey(lambda a, b: a + b) \
        .sortBy(lambda pair:-pair[1])
    #增长最快的区域
    print("增长最快的区域")
    print(fastRdd.take(1))
    everyRdd=rdd.map(lambda line:(line[1],line[4]+line[5]+line[6])) \
        .reduceByKey(lambda a, b: a + b)
    #每种啤酒的11月份前3周的销量
    print("每种啤酒的11月份前3周的销量:")
    print(everyRdd.collect())
    mostRdd=rdd.map(lambda line:(line[2],(line[4]+line[5]+line[6]))) \
        .reduceByKey(lambda a, b: a + b) \
        .sortBy(lambda pair:-pair[1])
    #啤酒卖得最好的前三个区域的11月份前3周销量
    print("啤酒卖得最好的前三个区域与11月份前3周销量:")
    print(mostRdd.take(3))
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容