漏桶算法
水(请求)先进入漏桶,漏桶以一定的速度出水(接口有响应速率)。当水流入速度过大时会溢出(访问频率超过接口响应速率),此时拒绝请求。
"""An implementation of the leaky bucket algorithm.
"""
from time import time
class LeakyBucket(object):
    """Rate limiter based on the leaky bucket algorithm.

    Every accepted request pours ``tokens`` units of water into the bucket,
    while the bucket drains continuously at ``fill_rate`` units per second.
    A request that would raise the water level above ``capacity`` is
    rejected and leaves the level unchanged.
    """

    def __init__(self, capacity, fill_rate, is_lock=False):
        """
        :param capacity: The maximum amount of water (tokens) the bucket holds.
        :param fill_rate: The rate in tokens/second at which the bucket drains.
        :param is_lock: When true, state changes are guarded by a re-entrant
            lock.
        """
        # Local import: keeps the class usable regardless of which imports
        # the surrounding module has already executed.
        from threading import RLock

        self._capacity = float(capacity)
        self._cur_tokens = 0.0  # current water level; the bucket starts empty
        self._fill_rate = float(fill_rate)
        self._is_lock = is_lock
        self._lock = RLock()
        self._last_time = time()

    def _get_cur_tokens(self):
        """Drain the water leaked since the last update; return the level."""
        now = time()
        # Clamp so a backwards wall-clock jump cannot add water.
        elapsed = max(0.0, now - self._last_time)
        leaked = self._fill_rate * elapsed
        self._cur_tokens = max(0.0, self._cur_tokens - leaked)
        # Advance the timestamp so the same interval is never drained twice.
        self._last_time = now
        return self._cur_tokens

    def get_cur_tokens(self):
        """Return the current water level after applying the leak.

        :return: The amount of water currently in the bucket, as a float in
            the range ``[0, capacity]`` for non-negative requests.
        """
        if self._is_lock:
            with self._lock:
                return self._get_cur_tokens()
        return self._get_cur_tokens()

    def _consume(self, tokens):
        """Add ``tokens`` to the bucket if they fit; report success."""
        # The lock is re-entrant, so calling the public getter is safe here.
        if self.get_cur_tokens() + tokens <= self._capacity:
            self._cur_tokens += tokens
            return True
        return False

    def consume(self, tokens):
        """Try to admit a request that weighs ``tokens``.

        :param tokens: The amount of water the request adds to the bucket.
        :return: ``True`` if the request fits and was admitted, ``False`` if
            it would overflow the bucket and was rejected.
        """
        if self._is_lock:
            with self._lock:
                return self._consume(tokens)
        return self._consume(tokens)
令牌桶算法
随着时间的流逝,系统会按恒定1/QPS的时间间隔往桶里加入Token,如果桶已经满了就不再加入了。新请求来临时,会拿走一个Token;若没有Token可以拿了,就阻塞或拒绝请求。
"""An implementation of the token bucket algorithm.
"""
from time import time
from threading import RLock
class TokenBucket(object):
    """Rate limiter based on the token bucket algorithm.

    The bucket starts full and is refilled at ``fill_rate`` tokens per second
    up to ``capacity``. A request is admitted only when enough tokens are
    available, and admitted requests remove their tokens from the bucket.
    """

    def __init__(self, capacity, fill_rate, is_lock=False):
        """
        :param capacity: The total tokens in the bucket.
        :param fill_rate: The rate in tokens/second that the bucket will be
            refilled.
        :param is_lock: When true, state changes are guarded by a re-entrant
            lock.
        """
        self._capacity = float(capacity)
        self._tokens = float(capacity)  # the bucket starts full
        self._fill_rate = float(fill_rate)
        self._last_time = time()
        self._is_lock = is_lock
        self._lock = RLock()

    def _get_cur_tokens(self):
        """Add the tokens earned since the last update; return the count."""
        now = time()
        if self._tokens < self._capacity:
            delta = self._fill_rate * (now - self._last_time)
            self._tokens = min(self._capacity, self._tokens + delta)
        # Always advance the timestamp, even while the bucket is full.
        # Otherwise time spent idle at full capacity is credited again after
        # the next withdrawal, allowing a burst of up to twice the capacity.
        self._last_time = now
        return self._tokens

    def get_cur_tokens(self):
        """Return the number of tokens currently available.

        :return: The available tokens after refilling, as a float.
        """
        if self._is_lock:
            with self._lock:
                return self._get_cur_tokens()
        return self._get_cur_tokens()

    def _consume(self, tokens):
        """Remove ``tokens`` from the bucket if available; report success."""
        # The lock is re-entrant, so calling the public getter is safe here.
        if tokens <= self.get_cur_tokens():
            self._tokens -= tokens
            return True
        return False

    def consume(self, tokens):
        """Try to take ``tokens`` from the bucket.

        :param tokens: The number of tokens the request needs.
        :return: ``True`` if the tokens were available and have been removed,
            ``False`` if the request must be rejected (or retried later).
        """
        if self._is_lock:
            with self._lock:
                return self._consume(tokens)
        return self._consume(tokens)