# 起因
最近懒癌犯的有点长,学了一些东西正好再系统梳理下。
之前需要对大量字符串进行模糊匹配,网上查了查发现fuzzyset第三方库刚好满足我的需求,而且原理简单(核心代码只有100行不到!)。但是由于业务自身需求,需要对源码进行一定的修改,这里记录下对源码的学习,方便自己以后查阅。
# 背景知识
Q:如何度量两个字符串之间的相似度或者差异程度?
Levenshtein距离
便是一种方法,又叫编辑距离(Edit Distance),是指两个字符串之间,通过替换、插入、删除的方式(每次仅限编辑一个字符),从一个转换成另一个所需的最小编辑次数。
如将abcd
一字转成acdb
可以有很多种方法:
abcd->accd->acdd->acdb
abcd->acd->acdb
最小次数显然是第二种,所以从abcd
转成acdb
的Levenshtein距离就是2,具体如何求解可通过动态规划的思想来完成,这里不深入下去。
# fuzzyset算法思想
通过n-grams模型把字符串简单拆分,计算两个字符串之间的余弦相似度。若选择use_levenshtein(需要手动安装python-Levenshtein 包),则在余弦相似度的基础上再通过Levenshtein距离计算出一个新的得分,取得分最高的字符串输出(具体细节实现与技巧优化见源码分析)
# 源码解析
1. 切分字符串
给定一个英文单词(string),是如何切分的?作者给了例子说明,比如有个字符串michaelich
,我们设置gram_size=3,则切分成如下形式:
'-mi'
'mic'
'ich'
'cha'
'hae'
'ael'
'eli'
'lic'
'ich'
'ch-'
对应代码也容易懂,同时为了节省内存,产生了一个生成器。
def _iterate_grams(value, gram_size=2):
simplified = '-' + _non_word_re.sub('', value.lower()) + '-'
len_diff = gram_size - len(simplified)
if len_diff > 0:
simplified += '-' * len_diff
for i in range(len(simplified) - gram_size + 1):
yield simplified[i:i + gram_size]
接着,作者对每个被切分出来的最小单元进行词频统计(text='a-CAc9f')
,形式如下:
{'-a': 1, 'ac': 2, 'ca': 1, 'c9': 1, '9f': 1, 'f-': 1}
代码easy
def _gram_counter(value, gram_size=2):
result = collections.defaultdict(int)
for value in _iterate_grams(value, gram_size):
result[value] += 1
return result
2. 添加语料库(字符串)
当我输入一个英文单词(str),内部发生了什么?来看代码
def __init__(self, iterable=(), gram_size_lower=2, gram_size_upper=3, use_levenshtein=True):
self.exact_set = {}
self.match_dict = collections.defaultdict(list)
self.items = {}
self.use_levenshtein = use_levenshtein
self.gram_size_lower = gram_size_lower
self.gram_size_upper = gram_size_upper
for i in range(gram_size_lower, gram_size_upper + 1):
self.items[i] = []
for value in iterable:
self.add(value)
这里只是初始化,那我们具体看看add()
与__add()
方法
def add(self, value):
lvalue = value.lower()
if lvalue in self.exact_set:
return False
for i in range(self.gram_size_lower, self.gram_size_upper + 1):
self.__add(value, i)
def __add(self, value, gram_size):
lvalue = value.lower()
items = self.items[gram_size] #这里要注意:items的改动会传递给 self.items
idx = len(items)
items.append(0)
grams = _gram_counter(lvalue, gram_size)
norm = math.sqrt(sum(x**2 for x in grams.values()))
for gram, occ in grams.items():
self.match_dict[gram].append((idx, occ))
items[idx] = (norm, lvalue)
self.exact_set[lvalue] = value
如果默认n-grams
是从2到3的情况下:
-
self.items会存放小写的英文字符串(输入)与对应被
2-grams
与3-grams
切分后的模,形式如下:{ 2: [(3.0 , 'a-cac9f') , (., .) , (., .)],
3: [(2.449489742783178 , 'a-cac9f') , (. , .), (. , .)]} -
self.match_dict形式如下:
{ '-a': [(0, 1)],
'ac': [(0, 2)], #注意:若之后另一个单词被同样切出'ac',这里可能为 'ac':[(0,2), (1,1)]
'ca': [(0, 1)],
'c9': [(0, 1)],
'9f': [(0, 1)],
'f-': [(0, 1)],
'-ac': [(0, 1)],
'aca': [(0, 1)],
'cac': [(0, 1)],
'ac9': [(0, 1)],
'c9f': [(0, 1)],
'9f-': [(0, 1)]} -
self.exact_set为原输入的英文字符串与小写之后的字符串形式,如:
{'a-cac9f': 'a-CAc9f'}
3. 模糊匹配
我们来看一下是如何对目标字符串进行模糊匹配的,主要关注__get()方法
def _distance(str1, str2):
distance = Levenshtein.distance(str1, str2)
if len(str1) > len(str2):
return 1 - float(distance) / len(str1)
else:
return 1 - float(distance) / len(str2)
def __get(self, value, gram_size):
lvalue = value.lower()
matches = collections.defaultdict(float)
grams = _gram_counter(lvalue, gram_size)
items = self.items[gram_size]
norm = math.sqrt(sum(x**2 for x in grams.values()))
for gram, occ in grams.items(): #(切分最小单元,出现次数)
for idx, other_occ in self.match_dict.get(gram, ()): #属于第几个输入字符串,出现次数
matches[idx] += occ * other_occ
if not matches:
return None
# cosine similarity
results = [(match_score / (norm * items[idx][0]), items[idx][1])
for idx, match_score in matches.items()]
results.sort(reverse=True, key=operator.itemgetter(0))
if self.use_levenshtein:
results = [(_distance(matched, lvalue), matched)
for _, matched in results[:50]]
results.sort(reverse=True, key=operator.itemgetter(0))
return [(score, self.exact_set[lval]) for score, lval in results
if score == results[0][0]]
先对目标字符串进行同样的切分方式,再分别与每个输入字符串进行“点乘”•
,并计算出余弦相似度。
我们看到,若不使用Levenshtein距离
,最后输出的是余弦相似度取值最大的一组或多组字符串。若使用Levenshtein距离
,首先为了减少运算量,只取余弦相似度排名前50个字符串,再然后对其计算Levenshtein距离
,并通过简单的运算得出另一个分值,最后输出得分最高的一组或多组字符串。
后续的源码不难,但还是一并贴出来比较方便看
def get(self, key, default=None):
try:
return self[key] # 调用 __getitem__
except KeyError:
return default
def __getitem__(self, value):
lvalue = value.lower()
result = self.exact_set.get(lvalue) #若完全一样,返回(1,原词语)
if result:
return [(1, result)]
for i in range(self.gram_size_upper, self.gram_size_lower - 1, -1):
results = self.__get(value, i)
if results is not None:
return results
raise KeyError(value)