背景:有4个文件待处理,每个文件大小均为20G,但是电脑内存仅有36G,那么如何用最快的速度将这四个文件进行处理用到的pandas的分块处理和python编程中的多线程、多进程
1、分块读取文件(pandas)
import pandas as pd
#最后定义如下函数返回df:
def main(filePath,fileName):
mylist = []
for chunk in pd.read_csv(file, sep='\t', chunksize=5000):
【...
(每块的处理逻辑)
result = func(chunk)】
#将每块处理的结果放到列表mylist里面
mylist.append(result)
temp_df = pd.concat(mylist, axis= 0)
...
return temp_df
main(filePath,fileName)
2、多线程(假设每个大文件的处理逻辑是1,即将一个大文件按照)
import threading
if __name__ == '__main__':
print("程序开始...")
# 要处理的四个文件
file = rootPath + "/dataSet/DataSet2017/sc_2017_total_month_"
textList = ["11_aa","11_ab","11_ac","11_ad"]
# 创建线程
threads = []
files = range(len(textList))
#创建线程
for i in files:
print("线程:",i)
t = threading.Thread(target=main,args=(file + textList[i],))
threads.append(t)
#for循环遍历数组,启动各个字线程
for i in files:
threads[i].start()
# join()作用:用于等待线程终止,在子线程完成运行之前,这个子线程的父线程将一直被阻塞。
for i in files:
threads[i].join()
#主线程
print("程序开始...")
3、多进程
# 导入进程池
from multiprocessing import Pool
if __name__ == '__main__':
print("程序开始...")
file = rootPath + "/dataSet/DataSet2017/sc_2017_total_month_"
textList = ["11_aa","11_ab","11_ac","11_ad"]
# 将要处理的文件列表长度
files = range(len(listFile))
# 创建线程,设置进程池进程数目为4
ps = Pool(4)
#创建进程
for i in files:
ps.apply_async(main,args=(filePath,listFile[i],)) # 异步执行(若想要同步执行,将apply_async换成apply)
# 里面是main函数的两个参数,注意,最后一个参数后面要有逗号
# 关闭进程池,停止接受其它进程
ps.close()
# 阻塞进程
ps.join()
#主线程
print("程序开始...")
结论:经过测试发现,对于本任务而言,多线程处理时间和单线程处理时间一样,并未有任何缩短。但是,多进程处理要比单线程、多线程处理的速度提升两倍。
对于后续处理任务,多线程和多进程的处理速度到底那个更快,要依赖于处理的任务。在决定使用多线程还是多进程时,可以进行简单的测试对比。