在无内置支持时实现的最少使用cache
# coding: utf8
try:
from functools import lru_cache
except ImportError:
from collections import namedtuple
from functools import update_wrapper
from threading import RLock
# 缓存情况说明 这个是在函数内
# 每个被lru_cache包裹的函数都有其自身的缓存
_CacheInfo = namedtuple("CacheInfo", ["hits", "misses", "maxsize", "currsize"])
class _HashedSeq(list):
__slots__ = 'hashvalue'
def __init__(self, tup, hash=hash):
self[:] = tup
self.hashvalue = hash(tup)
def __hash__(self):
return self.hashvalue
def _make_key(args, kwds, typed,
kwd_mark = (object(),),
fasttypes = {int, str, frozenset, type(None)},
sorted=sorted, tuple=tuple, type=type, len=len):
key = args
if kwds:
sorted_items = sorted(kwds.items())
key += kwd_mark
for item in sorted_items:
key += item
if typed:
key += tuple(type(v) for v in args)
if kwds:
key += tuple(type(v) for k, v in sorted_items)
elif len(key) == 1 and type(key[0]) in fasttypes:
return key[0]
return _HashedSeq(key)
# 最少使用的 使用次数决定是否剔除
def lru_cache(maxsize=100, typed=False):
def decorating_function(user_function):
"""
函数装饰器 user_functions即为被包裹的函数
:param user_function:
:return:
"""
stats = [0, 0] # 统计
HITS, MISSES = 0, 1 # HITS -> stats[0]
# MISSES -> stats[1]
# 缓存就是用dict,key则为_make_key计算出来的,获取方法则为dict的get方法
cache = dict()
make_key = _make_key
cache_get = cache.get
_len = len # 把全局函数len局部化 提升性能
# 用于指定长度的cache的
# 使用长度为4的list作为节点 包含两个指针和key value
lock = RLock()
root = []
root[:] = [root, root, None, None] # 分别指向 前 后 和 key value
nonlocal_root = [root]
PREV, NEXT, KEY, RESULT = 0, 1, 2, 3
if maxsize == 0:
def wrapper(*args, **kwds):
"""
如果cache大小为0 就是没缓存
:param args:
:param kwds:
:return:
"""
result = user_function(*args, **kwds) # 每次都是调用函数计算
stats[MISSES] += 1 # 每次都是未命中的情况
return result
elif maxsize is None:
def wrapper(*args, **kwds):
"""
无限大小的cache
:param args:
:param kwds:
:return:
"""
key = make_key(args, kwds, typed) # 计算出这些参数对应的key
result = cache_get(key, root) # root作为哨兵
if result is not root:
# 找到了
stats[HITS] += 1 # 命中
return result
# 没找到
result = user_function(*args, **kwds)
cache[key] = result
stats[MISSES] += 1
return result
else:
def wrapper(*args, **kwds):
"""
有大小限制的cache
:param args:
:param kwds:
:return:
"""
key = make_key(args, kwds, typed) if kwds or typed else args
with lock:
link = cache_get(key)
# 找到了
if link is not None:
root, = nonlocal_root
link_prev, link_next, key, result = link # 解出来 4个值
link_prev[NEXT] = link_next
link_next[PREV] = link_prev
# 把这个值从链中解下来
last = root[PREV]
last[NEXT] = root[PREV] = link
link[PREV] = last
link[NEXT] = root
# 连接在链最后
stats[HITS] += 1
return result
result = user_function(*args, **kwds)
with lock:
root, = nonlocal_root
if key in cache:
# 这个可能是在刚刚释放锁后有人计算出来了,所以这边就返回就行了,不用再进行缓存相关的处理了
pass
elif _len(cache) >= maxsize:
# 缓存区满的情况
oldroot = root
oldroot[KEY] = key
oldroot[RESULT] = result
root = nonlocal_root[0] = oldroot[NEXT]
oldkey = root[KEY]
oldvalue = root[RESULT]
root[KEY] = root[RESULT] = None
del cache[oldkey]
cache[key] = oldroot
else:
last = root[PREV]
link = [last, root, key, result]
last[NEXT] = root[PREV] = cache[key] = link
stats[MISSES] += 1
return result
def cache_info():
with lock:
return _CacheInfo(stats[HITS], stats[MISSES], maxsize, len(cache))
def cache_clear():
"""
清空缓存
:return:
"""
with lock:
cache.clear()
root = nonlocal_root[0]
root[:] = [root, root, None, None]
stats[:] = [0, 0]
wrapper.__wrapped__ = user_function
wrapper.cache_info = cache_info
wrapper.cache_clear = cache_clear
return update_wrapper(wrapper, user_function)
return decorating_function