python在处理体积较大的文件有独特的优势,底层支持分批读取,这样可以读取的文件最大可以达到几个G。同时分批处理也可以考虑加入多线程,可以大大提高运算效率,但在处理时候也需要考虑内存和区块问题。
1. 按行读取
这种按行去读取直接就使用底层提供的函数即可:
with open(_file_path) as f:
for line in f:
...
with open本身就带了打开文件、出现异常时侯处理、退出时候close文件的功能。for后面可以很轻易就获取每行内容。
如果你想读取特定的行数,可以用:
lines = f.readlines(100)
注意返回的lines也是个迭代器容器,也就是list,可以用for循环取出,只是读取的内容会全部存入内存,要注意本文所讨论的内存溢出问题。这行代码只是读取100行,如果想接着读取,外层需要加while来控制。读到最后没有内容lines就为空,可以以此判断。
2. 按自定义字节数读取
假设我现在有文件处理的不是以行为单位,而是以两个换行或者其他标识符作为数据块结尾,比如文件内容是:
names=what\n
alias=whatever\n
\n
\n
names=lala\n
alias=nana\n
othername=ii\n
\n
\n
...
那如果按行读取处理难免数据块没办法完整获取,当然这个例子是回车,麻烦一些也可以获取到,但如果分隔符不是\n\n而是$$或者更复杂的情况该怎么办呢?
所以我们可能需要按字节去读取:
def read_by_bytes(size_in_bytes):
# read_index = 0
# total_index = int(os.stat(_file_path).st_size / (10 * 1024 * 1024)) + 1
with open(_file_path) as f:
prev = ''
f_read = partial(f.read, size_in_bytes)
'''迭代器,返回''时停止'''
for text in iter(f_read, ''):
read_index += 1
# print("processing :{:.2%}".format(read_index/total_index))
# if not text.endswith('\n\n') and not len(text) < size_in_bytes:
if not text.endswith('\n\n'):
try:
text, rest = text.rsplit('\n\n', 1)
text = prev + text
prev = rest
except Exception as e:
'''针对最后一块数据没有\n\n结尾的情况'''
text = prev + text
prev = ''
else:
text = prev + text
prev = ''
'''对每块读取的字节进行操作'''
process_data(text)
'''对最后剩余的字节进行操作'''
if prev:
process_data(prev)
这里面我用的代码较为复杂,因为要处理一些额外的报错。
partial(f.read, size_in_bytes)实际相当于f.read(size_in_bytes),而partial则是用于给函数传固定参数。
大体上,先使用os.stat(_file_path).st_size这个属性获取文件实际的体积,然后按自己的需求去确定每次读取多少字节,用f_read = partial(f.read, size_in_bytes)这个函数来读取。读取出来的是迭代器,for text in iter(f_read, ''):这行表示的是当迭代器返回空时就停止。
然后我们开始对读取出来的数据内容进行处理,这里用中间的内容举例:取出来的内容,前面部分有可能是上一个数据块的结尾,后面部分又可能是后面一个数据块的开头,那么我们就要做合并和归类。
所以我们用最后出现的\n\n作为分隔符去切割,text, rest = text.rsplit('\n\n', 1)把内容分为两块,前面的一块text加上次切割保存的rest(本次就是prev)就构成了完整的一块,剩下的rest保留给下一次用。
这里用到了try也就是说可能切割的文本里没有\n\n会出现异常,这跟你的文本内容有关。因为没有\n\n做整个文本的结尾或者切割的最后一块里刚好不包含\n\n(不是一块完整的数据块)。那么切割会抛出异常,我们就把上次剩的rest加上本次的text进行process_data就可。
如果说我们切割的刚刚好是\n\n结尾了,那么整块数据可以直接被处理,prev就为空即可。
最后一定要注意,还有一种概率很小但是会出现的问题,就是最后一次切割时候,正好text和rest都是完整数据,而循环里把text处理好了,接着退出循环,但是prev不能忘记处理。(这种情况出现在整个内容结尾没有\n\n的时候)
3. python大文件处理的性能优化
既然是做大文件处理,那么不仅考验内存,还考验处理速度。
(1) 可以优化每次读入的字节数
这里用第二种方法举例,函数read_by_bytes里定义的total_index实际没有用到,这个总字节数实际是在函数外面用到的,函数里真正决定读取字节数的是传进来的参数size_in_bytes。
根据size_in_bytes我们其实可以进行性能调优,我测试是几百个字节的情况下要远远优于几百万字节,当然这跟你的程序process_data有关系,可以自行调整。
而在函数外面,则可以计算文件总体大小,即total_index,然后可以根据想要开启的进程数量计算size_in_bytes。或者反过来,根据size_in_bytes计算进程数量。
(2) 结合多进程/多线程
前面介绍的两个方法可以根据需要结合多进程,多进程的处理速度肯定远远优于单进程。只是多进程处理需要读取的数据完整且处理程序不能有所谓的“上下文”,换句话说就是:
- 多进程之间最好没有通信
- 分给每个进程的数据最好是独立的
def read_and_calcu_multiprocess(self, start, end, framerate, points, progress = None):
thds = []
for i in range(start, end):
t = i * 1 / framerate * points
'''读取特定开始,长度的数据'''
f = wave.open(self.__path, "rb")
wave_date = self.wave_read_size(f, points * i, points)
f.close()
thd = self.calcu_thd(wave_date)
if thd >= 1.0:
print("start: {}, time: {:.6f}, thd: {}".format(i, t, thd))
thds.append(thd)
if progress:
progress[0] = i
return thds
def process_file_in_multiprocess(self, process_count, progress, points):
'''打开文件,先读取参数'''
f = wave.open(self.__path, "rb")
params = f.getparams()
nchannels, sampwidth, framerate, nframes = params[:4]
f.close()
self.start_time = time.time()
'''每1024个点计算'''
total = int(nframes / points)
step = total/process_count
'''每个线程total/process_count'''
for p in range(process_count):
start = int(p*step)
end = int((p + 1)*step) if (p + 1) < process_count else total
pro = Process(target=self.read_and_calcu_multiprocess, args=(start,end,framerate,progress)) \
if p == 0 else Process(target=self.read_and_calcu_multiprocess, args=(start,end,framerate))
pro.start()
while progress[0] < int(step) - 1:
if progress[0] % round(step/20) == 0 and progress[0] != 0:
print("progress: {:.2f}, progress: {}".format(progress[0]/int(step)*100, progress[0]))
time.sleep(0.01)
print("cost time: {:.6f}".format(time.time() - self.start_time))
process_file_in_multiprocess函数就是使用多进程进行处理大文件,Process(target=self.read_and_calcu_multiprocess, args=(start,end,framerate,progress))创建一个新进程,并把处理的函数和参数传过去。
最重要的是start和end这两个参数,为了避免进程们重复处理数据,我们需要标记好每个进程处理的部分。在单个进程中我们使用self.wave_read_size读取特定的部分。类似于按照字节数量读取。
以上我们把读取文件部分和处理文件部分一起用多进程进行了分担,但是如果只把处理文件部分摘出来(处理文件部分消耗性能更多),那么就更容易了。
如果是使用之前介绍的第一种方法,就在处理数据时候每次都创建一个进程。
f = wave.open(self.__path, "rb")
with open(_file_path) as f:
for line in f:
process = Process(target=self.read_and_calcu_multiprocess, args=(line))
process.start()
对于每个进程所执行的函数:(这里的process没有单独列出一个类,实际上可能把方法包装在类中更合适)
def read_and_calcu_multiprocess(self, line):
produce_data(line)
其实对于line字节数较少的内容,这种方法的系统开销较大,因为要创建进程、执行方法并销毁进程,会大量消耗系统资源。
上述问题其实可通过把多行内容合并,一起传给进程,比如可以简单地取一个行数作为标准,读取到一定行数就创建个进程并把数据发送过去。
到目前为止,关于多线程的资源和性能问题都可以被有效解决。但是这里面还涉及数据处理的问题,每批次处理的数据应当是独立的,如果有需要存储的信息就必须在进程函数或者类之外存储,那么可以使用全局变量或者自己创建一个资源类专门用于储存,这个资源类可以是全局唯一,也就是单例模式。如果这个资源类是所有进程共享的话,还需要注意锁的应用,避免脏数据。