Processing Pandas Data with Python Multiprocessing

1 How to parallelize with multiprocessing

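For example, suppose that for a pd.DataFrame we need to apply a gen_new_df operation to every row and then concatenate all the results. One way is to split the DataFrame into pro_count chunks, let a process pool run gen_new_df on each chunk in parallel, and concatenate the returned pieces: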

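import multiprocessing as mp

import numpy as np
import pandas as pd

# test_df: the DataFrame to process
# pro_count: number of processes
# gen_new_df: the per-chunk function; define it at module level so it can be
#   pickled and sent to the workers (on Windows/macOS also put this code
#   under if __name__ == '__main__':)
# Split the row positions rather than the DataFrame itself: np.array_split(test_df, ...)
# emits a FutureWarning about DataFrame.swapaxes in pandas 2.1+
row_groups = np.array_split(np.arange(len(test_df)), pro_count)
test_df_split = [test_df.iloc[rows] for rows in row_groups]
with mp.Pool(pro_count) as pool:  # the pool is shut down when the block exits
    # map() returns the results in the same order as the chunks
    df = pd.concat(pool.map(gen_new_df, test_df_split), ignore_index=True)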
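A self-contained sketch of the same split / map / concat pattern, using a made-up DataFrame and a hypothetical per-chunk function add_double in place of gen_new_df. The helpers split_rows and parallel_apply are also illustrative names, not library functions:

```python
import multiprocessing as mp

import numpy as np
import pandas as pd


def split_rows(df, n_chunks):
    """Split df by row position into at most n_chunks nearly equal pieces."""
    n_chunks = max(1, min(n_chunks, len(df)))  # never create empty chunks
    groups = np.array_split(np.arange(len(df)), n_chunks)
    return [df.iloc[rows] for rows in groups]


def add_double(chunk):
    # Hypothetical per-chunk work; lives at module level so it is picklable
    out = chunk.copy()
    out["double"] = out["value"] * 2
    return out


def parallel_apply(df, func, n_procs):
    """Run func on every chunk in a process pool and stitch the results in order."""
    chunks = split_rows(df, n_procs)
    with mp.Pool(len(chunks)) as pool:
        results = pool.map(func, chunks)
    return pd.concat(results, ignore_index=True)


if __name__ == "__main__":
    data = pd.DataFrame({"value": range(10)})
    print(parallel_apply(data, add_double, 4))
```

Capping the number of chunks at the row count avoids handing empty DataFrames to workers, and pool.map keeps the chunk order, so the concatenated result lines up with the input.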

2 Example

2.1 Generate test data:

import pandas as pd
from faker import Faker  # third-party package: pip install faker
from datetime import datetime

fake = Faker()
n = 10000

# Generate random names and timestamps
name_data = [fake.name() for _ in range(n)]  # random names
timestamp = [datetime.now().strftime('%Y-%m-%d %H:%M:%S') for _ in range(n)]  # current time as a string

# Build the DataFrame
df = pd.DataFrame({"Name": name_data, "Time": timestamp})

# Save the DataFrame as a CSV file
df.to_csv("test.csv", index=False)
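If Faker is not installed, data of the same shape (a Name column and a Time string column) can be produced with the standard library alone. This is a sketch with a hypothetical helper make_test_df and made-up name lists; it also spreads the timestamps over 30 days instead of stamping every row with the current time. The result can be saved with to_csv exactly as above:

```python
import random
from datetime import datetime, timedelta

import pandas as pd

FIRST = ["Alice", "Bob", "Carol", "David", "Eve"]  # made-up name pools
LAST = ["Smith", "Jones", "Brown", "Lee", "Wang"]


def make_test_df(n, seed=0):
    """Return n rows of random names and timestamps (reproducible for a given seed)."""
    rng = random.Random(seed)
    start = datetime(2024, 1, 1)
    names = [f"{rng.choice(FIRST)} {rng.choice(LAST)}" for _ in range(n)]
    times = [
        (start + timedelta(seconds=rng.randrange(86400 * 30))).strftime("%Y-%m-%d %H:%M:%S")
        for _ in range(n)
    ]
    return pd.DataFrame({"Name": names, "Time": times})


if __name__ == "__main__":
    print(make_test_df(10000).head())
```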

2.2 Define the processing function:

Here, for example, each input row is expanded into 5 rows, each with a fresh uuid and an index from 0 to 4:

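import uuid

import pandas as pd

def gen_new_df(input_df):
    # Expand every input row into 5 rows, each with a new uuid and an index 0-4
    new_df = pd.DataFrame(columns=['Name', 'Time', 'uuid', 'index'])
    for i in range(len(input_df)):
        input_dic = input_df.iloc[i].to_dict()
        for j in range(5):
            new_dic = {
                "Name": input_dic["Name"],
                "Time": input_dic["Time"],
                "uuid": str(uuid.uuid4()),
                "index": str(j)
            }
            # Each pd.concat copies the whole frame built so far, so the
            # total cost grows roughly quadratically with the number of rows
            new_df = pd.concat([new_df, pd.DataFrame([new_dic])], ignore_index=True)
    return new_df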
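Because gen_new_df appends one row at a time, a common alternative is to build each column in one step. The sketch below, gen_new_df_fast (a hypothetical name, with the repeat count exposed as an assumed copies parameter), produces the same four columns in the same order. The timings in section 3 were measured with the loop version above, so they would likely differ with this one:

```python
import uuid

import numpy as np
import pandas as pd


def gen_new_df_fast(input_df, copies=5):
    """Same output columns as gen_new_df, built column by column instead of row by row."""
    n_out = len(input_df) * copies
    return pd.DataFrame({
        # np.repeat yields a,a,a,a,a,b,b,b,b,b,... like the nested loops
        "Name": np.repeat(input_df["Name"].to_numpy(), copies),
        "Time": np.repeat(input_df["Time"].to_numpy(), copies),
        "uuid": [str(uuid.uuid4()) for _ in range(n_out)],
        # "0".."4" for every original row, matching str(j) in the loop version
        "index": [str(j) for j in range(copies)] * len(input_df),
    })
```

It can be passed to pool.map in place of gen_new_df without other changes.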

2.3 Run with multiple processes

First, you can check how many CPU cores the machine has:

import multiprocessing as mp
num_cores = mp.cpu_count()
print(num_cores)
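mp.Pool() with no argument starts os.cpu_count() workers, but on Linux a process may be restricted (for example by taskset or a container CPU set) to fewer cores than that. A small sketch with a hypothetical helper usable_cpu_count that prefers the affinity mask when the platform provides it:

```python
import os


def usable_cpu_count():
    """CPUs this process may actually run on; can be fewer than os.cpu_count()."""
    try:
        # Linux: respects the CPU affinity mask of the current process
        return len(os.sched_getaffinity(0))
    except AttributeError:
        # os.sched_getaffinity does not exist on Windows or macOS
        return os.cpu_count() or 1  # cpu_count() may return None


if __name__ == "__main__":
    print(usable_cpu_count())
```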

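Then run the job with different numbers of processes (this code goes in the same script as gen_new_df, so the worker processes can find the function):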

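import multiprocessing as mp
import time

import numpy as np
import pandas as pd

# gen_new_df must be defined (as in 2.2) in this same file, at module level

if __name__ == '__main__':
    # Test with different numbers of processes
    test_pro_count = [1, 2, 4, 7, 8, 9, 10, 15, 20, 25, 30]
    test_df = pd.read_csv("test.csv")
    time_con_list = []

    # Baseline: process the whole DataFrame in the main process
    begin_time = time.perf_counter()
    gen_new_df(test_df)
    time_con = time.perf_counter() - begin_time
    print("Without multiprocessing:", time_con)

    for pro_count in test_pro_count:
        begin_time = time.perf_counter()
        # Split by row position into pro_count chunks
        row_groups = np.array_split(np.arange(len(test_df)), pro_count)
        test_df_split = [test_df.iloc[rows] for rows in row_groups]
        # The measured time includes creating and shutting down the pool
        with mp.Pool(pro_count) as pool:
            df = pd.concat(pool.map(gen_new_df, test_df_split), ignore_index=True)

        time_con = time.perf_counter() - begin_time
        time_con_list.append(time_con)
        print("Processes:", pro_count, "time:", time_con)
    print(time_con_list)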
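Each configuration above is timed only once, so individual numbers can be noisy; in section 3, for instance, 9 processes come out slower than both 8 and 10. The timeit documentation suggests repeating a measurement and looking at the fastest run. A minimal sketch with a hypothetical helper best_of:

```python
import time


def best_of(func, *args, repeat=3):
    """Call func(*args) `repeat` times and return the fastest wall-clock time in seconds."""
    best = float("inf")
    for _ in range(repeat):
        start = time.perf_counter()
        func(*args)
        best = min(best, time.perf_counter() - start)
    return best


if __name__ == "__main__":
    print(best_of(sorted, list(range(100000, 0, -1))))
```

In the benchmark, func would be a small wrapper (hypothetical) that splits test_df, runs the pool and concatenates the result, called as best_of(wrapper, test_df, pro_count).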

3 Efficiency analysis

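Processes --- Time (s)
0 (no multiprocessing) --- 27.70097780227661
1 --- 28.131190061569214
2 --- 9.749696016311646
4 --- 4.519064903259277
7 --- 2.705703020095825
8 --- 2.702981948852539
9 --- 3.6096010208129883
10 --- 2.7417500019073486
15 --- 4.223258018493652
20 --- 3.8286550045013428
25 --- 5.2411949634552
30 --- 6.33648681640625
[Figure: test results]

With a single worker process the job is slightly slower than without multiprocessing (28.13 s vs. 27.70 s), since the pool adds start-up and data-transfer overhead without any parallelism. With 7 or 8 processes the time drops to about 2.7 s, roughly 10 times faster than the baseline. More processes do not help further: the time rises again, to 6.34 s with 30 processes, presumably because the extra processes compete for the same CPU cores while process start-up and data transfer keep costing more. The gains with 2 and 4 processes (about 2.8x and 6.1x) are larger than the number of processes; a likely reason is that gen_new_df appends one row at a time with pd.concat, so its cost grows roughly quadratically with chunk size, and splitting the data into smaller chunks also reduces the total work.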
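The table can be summarized as speedup (baseline time divided by parallel time) and parallel efficiency (speedup divided by the number of processes); an efficiency above 1 means superlinear scaling. A small sketch that computes both from the measured times, with speedup_table as a hypothetical helper name:

```python
# Times in seconds, copied from the table above; key 0 = no multiprocessing
timings = {
    0: 27.70097780227661,
    1: 28.131190061569214,
    2: 9.749696016311646,
    4: 4.519064903259277,
    7: 2.705703020095825,
    8: 2.702981948852539,
    9: 3.6096010208129883,
    10: 2.7417500019073486,
    15: 4.223258018493652,
    20: 3.8286550045013428,
    25: 5.2411949634552,
    30: 6.33648681640625,
}


def speedup_table(timings):
    """Return {processes: (speedup, efficiency)} relative to the key-0 baseline."""
    baseline = timings[0]
    table = {}
    for procs, seconds in timings.items():
        if procs == 0:
            continue
        speedup = baseline / seconds
        table[procs] = (speedup, speedup / procs)  # efficiency = speedup per process
    return table


if __name__ == "__main__":
    for procs, (s, e) in speedup_table(timings).items():
        print(f"{procs:>2} processes: speedup {s:5.2f}x, efficiency {e:4.2f}")
```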
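© Copyright belongs to the author. For reprints or content collaboration, please contact the author.
Platform statement: the content of this article (including any images or videos) was uploaded and published by the author and represents only the author's own views. Jianshu is an information publishing platform and provides only information storage services.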
