首先我们可以确定的是不能用read()与readlines()函数;
因为如果将这两个函数均将数据全部读入内存,会造成内存不足的情况。
针对数据按行划分的文件
以计算行数为例,首先针对几种不同的方法来作比较:
1、使用for遍历的方法,比较美观,网上搜索到八有十九是让你这样做,尽管已经很快了但还不是最快的
start = time.time()
with open(dataPath, 'r') as f:
count = 0
for line in f:
count += 1
print(count)
print(time.time() - start)
输出:
5000
0.09386205673217773
2、使用readline()模拟遍历,发现其实结果和第一种差不多
start = time.time()
with open(dataPath, 'r') as f:
line = f.readline()
count = 1
while line:
count += 1
line = f.readline()
print(count - 1)
print(time.time() - start)
输出:
5000
0.09433221817016602
3、对比readlines()直接去访问,结果却更慢了!
start = time.time()
with open(dataPath, 'r') as f:
count = 0
for line in f.readlines():
count += 1
print(count)
print(time.time() - start)
输出:
5000
0.12223696708679199
4、不断去检测文件指针位置,有的时候我们可能需要读到特定的文件位置就停下;就会发现tell()十分耗时!
start = time.time()
with open(dataPath, 'r') as f:
count = 0
while f.tell() < datasize:
f.readline()
count += 1;
print(count)
print(time.time() - start)
输出:
5000
0.29171299934387207
5、使用mmap的方法,mmap是一种虚拟内存映射文件的方法,即将一个文件或者其它对象映射到进程的地址空间,实现文件磁盘地址和进程虚拟地址空间中一段虚拟地址的一一对映关系。通过建立一个文件的内存映射将使用操作系统虚拟内存来直接访问文件系统上的数据,而不是使用常规的I/O函数访问数据。内存映射通常可以提供I/O性能,因为使用内存映射是,不需要对每个访问都建立一个单独的系统调用,也不需要在缓冲区之间复制数据;实际上,内核和用户应用都能直接访问内存,是目前测到最快的方法。
import mmap
start = time.time()
with open(dataPath, "r") as f:
# memory-map the file, size 0 means whole file
map = mmap.mmap(f.fileno(), length=0, access=mmap.ACCESS_READ)
count = 0
while map.readline():
count += 1
print(count)
map.close()
print(time.time() - start)
输出:
5000
0.023865938186645508
6、可以不按行读取,而是按块读取,然后分析\n的个数,但这只针对计算行数而论可行,但我们真正想要的是按行读取数据,所以这里只给出实现方法,不进行对比。
with open(r"d:\lines_test.txt",'rb') as f:
count = 0
while True:
buffer = f.read(1024 * 8192)
if not buffer:
break
count += buffer.count('\n')
print count
考虑MPI的情况
当文件很大的时候,任务又需要并行化的话,我们可以将文件拆分成多段去处理,例如对于4核的电脑,可以让4条进程分别去处理文件不同的部分,每条进程读四分之一的数据。但这时候要考虑到,分割点不一定刚好是换行符的情况,所以我们可以考虑从分割点下一个换行符开始搜索,分割点第一个换行符之前的交给前一个进程去处理,处理方法如图:
实现类似:
from mpi4py import MPI
import platform
import sys
import io
import os
import mmap
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
comm = MPI.COMM_WORLD
comm_size = comm.size
comm_rank = comm.rank
with open(filePath, 'r', encoding='utf-8') as f:
# Set the file pointer to the beginning of a line after blockSize * rank
# Use mmap to run faster
map = mmap.mmap(f.fileno(), length=0, access=mmap.ACCESS_READ)
map.seek(comm_rank * blockSize)
if comm_rank != 0:
map.readline()
# Each process handle about blocksize lines.
blockEnd = (comm_rank + 1) * blockSize
# Use index here to avoid using twice map.tell()
index = map.tell()
while index <= blockEnd and index < dataSize:
# line = map.readline().translate(None, b'\x00').decode()
line = map.readline().decode('utf-8')
index = map.tell()
try:
dosomething(line)
except Exception as err:
print(err)
continue
如果不用mmap.tell()改用f.tell()的话,效率其差,一开始我遇到这种情况的时候是想着自己不断去加len(line)去自己计算文件指针的位置的。但又发现一个问题,file.readline()会帮你去除部分字符,例如\r\n只会保留\n,而mmap.readline()则不会,而且试过表示很难,总是和f.tell()对不齐。
数据按特殊符号划分
考虑到可能数据划分点不是\n, 我们可以这样读取:
def rows(f, chunksize=1024, sep='|'):
"""
Read a file where the row separator is '|' lazily.
Usage:
>>> with open('big.csv') as f:
>>> for r in rows(f):
>>> process(row)
"""
curr_row = ''
while True:
chunk = f.read(chunksize)
if chunk == '': # End of file
yield curr_row
break
while True:
i = chunk.find(sep)
if i == -1:
break
yield curr_row + chunk[:i]
curr_row = ''
chunk = chunk[i+1:]
curr_row += chunk
数据无特定划分方式
一种方法是用yield:
def read_in_chunks(file_object, chunk_size=1024):
"""Lazy function (generator) to read a file piece by piece.
Default chunk size: 1k."""
while True:
data = file_object.read(chunk_size)
if not data:
break
yield data
with open('really_big_file.dat') as f:
for piece in read_in_chunks(f):
process_data(piece)
另外一种方法是用iter和一个helper function:
f = open('really_big_file.dat')
def read1k():
return f.read(1024)
for piece in iter(read1k, ''):
process_data(piece)