最近碰到如下的一个问题 如何快速合并两个大文件?
有A,B两个文件。这两个文件都有以下特性
A,B两个文件每一行都可以被解析为json 格式
A,B两个文件都比较大(至少大于500M)
A,B两个文件可以通过将每行转化为json之后通过某个相同key的值合并
A文件有a行,B文件有b行
-
A,B两个文件两行可以合并的条件是唯一的。
比如 A文件的第一行可以是这样的一个json
{
'id':1,
'test': 2
}
B文件的第二行可以是这样的一个json
{
'id':1,
'test2':3
}
我们可以通过两个相同的key合并为这样一个json
{
'id':1,
'test':2,
'test2':3
}
现在的问题就是如何把他们合并?
第一个想法——暴力合并
- 先读取A文件的一行数据,然后转化为json格式
- 将A某一行的json格式的数据同B文件中的数据一一校验,判断两个数据数据是否可以合并
这个想法的算法复杂度是o(a*b),当文件的行数很大的时候,情况很堪忧。以我目前的处理的两个文件来说,两个文件都是9万多行。对于cpu来说,他最坏的时候需要90000 * 90000次指令。目前的cpu每秒大概能执行百万级的指令,那么执行这个合并操作也需要2个小时的时间
第二个方法-先预处理一下
我们可以先算出A文件中哪一行数据可以同B文件中哪几行的文件。因为合并的条件是唯一的。那么对于我们现在的处理的文件来说是算法复杂度为o(max(a,b))
但是这个算法还是有一个问题,当你已经知道两个文件哪俩行的文件需要合并的。你如何从这两个大文件中快速的读出这两行文件。
我目前的想法是记录每一行的偏移量的位置。
记录偏移量的话我刚开始是想采用以下代码的方法的。直接把文件中的数据读到内存中
执行以下代码,276M 需要2秒,17G 需要10秒
def test_merge(filename):
start_time = time.time()
print "start {} function is {}".format(test_merge.__name__, start_time)
f = open(filename, "r")
line = []
line.append(0)
filename_size = os.path.getsize(filename)
chars = f.read(filename_size)
count = 0
for char in chars:
if char == '\n':
line.append(count)
count = count + 1
但是上面的代码在当文件很大的时候是很容易爆内存的.
所以在此我们可以采用第二个函数可以通过,先计算出每个文件的大小,然后每个读出根号大小的数
def merge_by_file(filename):
start_time = time.time()
print "start {} function is {}".format(merge_by_file.__name__,start_time)
f = open(filename, "r")
line = []
line.append(0)
filename_size = os.path.getsize(filename)
filename_length = int(math.sqrt(filename_size))
count = 0
print filename_length
for i in xrange(filename_length + 2):
if count + filename_length < filename_size:
chars = f.read(filename_length)
for char in chars:
count = count + 1
if char == '\n':
line.append(count)
else:
chars = f.read(filename_size - count)
for char in chars:
count = count + 1
if char == '\n' and count != filename_size:
line.append(count)
break
f.close()
由此,我在处理两个9万行文件的脚本每次运行的时间缩短到2分钟。