003-PySpark, Pandas, Modin 运算对比

1.1

当数据量在内存可容纳范围内时,Pandas 速度更快;
当数据量超出内存时,Pandas 无法完成计算,而 PySpark 可以用时间换空间(分批/分布式处理)完成同样的运算

1.2 测试脚本

import os
import sys
import datetime
import numpy as np

import pyspark
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession,Row
from pyspark.sql.types import *
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import countDistinct, sum
from pyspark.sql.types import *

# Directory holding the cached benchmark CSV files.
cache_dir='/home/'

goods_cache=cache_dir+'data_goods.csv'  # goods master data (joined on 城市/商品编码)
stock_cache=cache_dir+'data_stock.csv'  # stock/inventory fact data

import findspark
findspark.init()  # locate the local Spark installation before any SparkSession is built

def test_pyspark():
    """Time a PySpark groupBy/agg over the cached stock CSV and print it.

    Groups by city/customer, sums stock and counts distinct product codes,
    then shows the result and the elapsed wall-clock time.
    """
    t0 = datetime.datetime.now()

    session = (
        SparkSession.builder
        .config("spark.default.parallelism", 3000)
        .appName("taSpark")
        .getOrCreate()
    )
    stock = session.read.csv(stock_cache, header=True)
    grouped = stock.groupBy(['城市', '客户']).agg(
        sum("库存").alias("求和"),
        countDistinct("商品编码").alias("去重计数"),
    )
    grouped.show(truncate=False)

    t1 = datetime.datetime.now()
    print('\n\n PySpark   运行时长     ' + str(t1 - t0) + '\n\n')

def test_pandas():
    start=datetime.datetime.now()

    import pandas as pd
    df_stock=pd.read_csv(stock_cache)
    df2=df_stock.groupby(['城市','客户']).agg({'库存':np.sum,'商品编码':pd.Series.nunique}).reset_index()
    df2.rename(columns={'库存':'求和','商品编码':'去重计数'},inplace=True)
    print(df2.head())

    end = datetime.datetime.now()
    print('\n\n Pandas   运行时长     '+str(end-start)+'\n\n')

def test_modin():
    """Benchmark the same groupby-aggregate as ``test_pandas``, using Modin.

    Modin mirrors the pandas API, so the logic is identical; only the
    import (and therefore the execution engine) differs.
    """
    start = datetime.datetime.now()
    import modin.pandas as pd

    df_stock = pd.read_csv(stock_cache)
    # String aliases ('sum'/'nunique') instead of np.sum / pd.Series.nunique:
    # passing raw callables to .agg() is deprecated in recent pandas/Modin.
    df2 = (
        df_stock.groupby(['城市', '客户'])
        .agg({'库存': 'sum', '商品编码': 'nunique'})
        .reset_index()
    )
    df2.rename(columns={'库存': '求和', '商品编码': '去重计数'}, inplace=True)
    print(df2.head())
    end = datetime.datetime.now()
    print('\n\n Modin   运行时长     ' + str(end - start) + '\n\n')





def test_pyspark_join():
    """Time a PySpark left join (stock ⟕ goods) plus groupBy/agg and print it."""
    t0 = datetime.datetime.now()

    session = (
        SparkSession.builder
        .config("spark.default.parallelism", 3000)
        .appName("taSpark")
        .getOrCreate()
    )
    goods = session.read.csv(goods_cache, header=True)
    stock = session.read.csv(stock_cache, header=True)
    joined = stock.join(goods, ['城市', '商品编码'], 'left')

    grouped = joined.groupBy(['城市', '客户']).agg(
        sum("库存").alias("求和"),
        countDistinct("商品编码").alias("去重计数"),
    )
    grouped.show(truncate=False)

    t1 = datetime.datetime.now()
    print('\n\n PySpark   运行时长     ' + str(t1 - t0) + '\n\n')


def test_pandas_join():
    start=datetime.datetime.now()

    import pandas as pd
    df_stock=pd.read_csv(stock_cache)
    df_goods=pd.read_csv(goods_cache)

    df=pd.merge(df_stock,df_goods,on=['城市','商品编码'],how='left')

    df2=df.groupby(['城市','客户']).agg({'库存':np.sum,'商品编码':pd.Series.nunique}).reset_index()
    df2.rename(columns={'库存':'求和','商品编码':'去重计数'},inplace=True)
    print(df2.head())

    end = datetime.datetime.now()
    print('\n\n Pandas   运行时长     '+str(end-start)+'\n\n')

def test_modin_join():
    """Benchmark the merge + groupby-aggregate with Modin in out-of-core mode.

    The MODIN_* environment variables must be set BEFORE ``modin.pandas`` is
    imported, which is why the import happens inside this function.
    """
    # Enable Modin's out-of-core (spill-to-disk) execution with a large
    # memory budget; uses the file-level ``os`` import.
    os.environ["MODIN_OUT_OF_CORE"] = 'true'
    os.environ["MODIN_MEMORY"] = '200000000000'
    start = datetime.datetime.now()
    import modin.pandas as pd

    df_stock = pd.read_csv(stock_cache)
    df_goods = pd.read_csv(goods_cache)
    df = pd.merge(df_stock, df_goods, on=['城市', '商品编码'], how='left')

    # String aliases ('sum'/'nunique') instead of np.sum / pd.Series.nunique:
    # passing raw callables to .agg() is deprecated in recent pandas/Modin.
    df2 = (
        df.groupby(['城市', '客户'])
        .agg({'库存': 'sum', '商品编码': 'nunique'})
        .reset_index()
    )
    df2.rename(columns={'库存': '求和', '商品编码': '去重计数'}, inplace=True)
    print(df2.head())
    end = datetime.datetime.now()
    print('\n\n Modin   运行时长     ' + str(end - start) + '\n\n')



if __name__ == "__main__":
    # Run exactly one benchmark at a time (uncomment the one you want) so
    # the engines' startup costs and memory use do not skew each other's
    # timings.
    # test_modin()
    # test_pandas()
    # test_pyspark()

    test_modin_join()
    # test_pandas_join()
    # test_pyspark_join()
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

友情链接更多精彩内容