布隆过滤器的应用
我们先来看下布隆过滤器的应用场景,让大家知道神奇的布隆过滤器到底能做什么。
缓存穿透
我们经常会把一部分数据放在Redis等缓存,比如产品详情。这样有查询请求进来,我们可以根据产品Id直接去缓存中取数据,而不用读取数据库,这是提升性能最简单,最普遍,也是最有效的做法。一般的查询请求流程是这样的:先查缓存,有缓存的话直接返回,如果缓存中没有,再去数据库查询,然后再把数据库取出来的数据放入缓存,一切看起来很美好。但是如果现在有大量请求进来,而且都在请求一个不存在的产品Id,会发生什么?既然产品Id都不存在,那么肯定没有缓存,没有缓存,那么大量的请求都怼到数据库,数据库的压力一下子就上来了,还有可能把数据库打死。
虽然有很多办法都可以解决这问题,但是我们的主角是“布隆过滤器”,没错,“布隆过滤器”就可以解决(缓解)缓存穿透问题。至于为什么说是“缓解”,看下去你就明白了。
大量数据,判断给定的是否在其中
现在有大量的数据,而这些数据的大小已经远远超出了服务器的内存,现在再给你一个数据,如何判断给你的数据在不在其中。如果服务器的内存足够大,那么用HashMap是一个不错的解决方案,理论上的时间复杂度可以达到O(1),但是现在数据的大小已经远远超出了服务器的内存,所以无法使用HashMap,这个时候就可以使用“布隆过滤器”来解决这个问题。但是还是同样的,会有一定的“误判率”。
什么是布隆过滤器
布隆过滤器是一个叫“布隆”的人提出的,它本身是一个很长的二进制向量,既然是二进制的向量,那么显而易见的,存放的不是0,就是1。
现在我们新建一个长度为16的布隆过滤器,默认值都是0,就像下面这样:
现在需要添加一个数据:
我们通过某种计算方式,比如Hash1,计算出了Hash1(数据)=5,我们就把下标为5的格子改成1,就像下面这样:
我们又通过某种计算方式,比如Hash2,计算出了Hash2(数据)=9,我们就把下标为9的格子改成1,就像下面这样:
还是通过某种计算方式,比如Hash3,计算出了Hash3(数据)=2,我们就把下标为2的格子改成1,就像下面这样:
这样,刚才添加的数据就占据了布隆过滤器“5”,“9”,“2”三个格子。
可以看出,仅仅从布隆过滤器本身而言,根本没有存放完整的数据,只是运用一系列随机映射函数计算出位置,然后填充二进制向量。
这有什么用呢?比如现在再给你一个数据,你要判断这个数据是否重复,你怎么做?
你只需利用上面的三种固定的计算方式,计算出这个数据占据哪些格子,然后看看这些格子里面放置的是否都是1,如果有一个格子不为1,那么就代表这个数字不在其中。这很好理解吧,比如现在又给你了刚才你添加进去的数据,你通过三种固定的计算方式,算出的结果肯定和上面的是一模一样的,也是占据了布隆过滤器“5”,“9”,“2”三个格子。
但是有一个问题需要注意,如果这些格子里面放置的都是1,不一定代表给定的数据一定重复,也许其他数据经过三种固定的计算方式算出来的结果也是相同的。这也很好理解吧,比如我们需要判断对象是否相等,是不可以仅仅判断他们的哈希值是否相等的。
也就是说布隆过滤器只能判断数据是否一定不存在,而无法判断数据是否一定存在。
按理来说,介绍完了新增、查询的流程,就要介绍删除的流程了,但是很遗憾的是布隆过滤器是很难做到删除数据的,为什么?你想想,比如你要删除刚才给你的数据,你把“5”,“9”,“2”三个格子都改成了0,但是可能其他的数据也映射到了“5”,“9”,“2”三个格子啊,这不就乱套了吗?
相信经过我这么一介绍,大家对布隆过滤器应该有一个浅显的认识了,至少你应该清楚布隆过滤器的优缺点了:
优点:由于存放的不是完整的数据,所以占用的内存很少,而且新增,查询速度够快;
缺点: 随着数据的增加,误判率随之增加;无法做到删除数据;只能判断数据是否一定不存在,而无法判断数据是否一定存在。
可以看到,布隆过滤器的优点和缺点一样明显。
在上文中,我举的例子二进制向量长度为16,由三个随机映射函数计算位置,在实际开发中,如果你要添加大量的数据,仅仅16位是远远不够的,为了让误判率降低,我们还可以用更多的随机映射函数、更长的二进制向量去计算位置。
package com.zxd.Config.CheckDuplicatesUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.util.Hashing;
/**
* @类名 BloomCheck
* @名称 布隆过滤器 查重用
* @功能描述 剃重五个字段的拼接key
* @创建时间 2022/1/26 14:21
* @创建人 ZXD
*/
@Slf4j
@PropertySource(value = "classpath:Config.properties")
@Component
public class BloomCheck {
@Autowired
private Jedis jedis;
//要插入多少数据
public static Integer expectedInsertions ;
//期望的误判率
public static Double fpp ;
//bit数组长度
public static Long numBits;
//hash函数数量
public static Integer numHashFunctions;
/**
* 从配置文件中读取判重数据总数,并且赋值给静态变量
* @param expectedInsertions_number
*/
@Value("${bloom.expectedInsertions_number}")
public void setExpectedInsertions(Integer expectedInsertions_number){
expectedInsertions = expectedInsertions_number;
}
/**
* 从配置文件中读取精度值,并且赋值给静态变量
* @param fpp_number
*/
@Value("${bloom.fpp_number}")
public void setFpp(Double fpp_number){
fpp = fpp_number;
}
/**
* 计算bit数组长度和hash函数数量
*/
@Bean
public void setNumBitsAndNumHashFunctions(){
numBits = optimalNumOfBits(expectedInsertions, fpp);
numHashFunctions = optimalNumOfHashFunctions(expectedInsertions, numBits);
}
/**
* 查询key是否存在 如果不重复则插入库中
* @param key
* @return true 重复 false 不重复
* 输入key 得到n个下标,遍历下标,当有一个下标不存在时则判定key一定不存在
*/
public boolean getkey(String tableName, String key){
if(null == key) return true;
long[] indexArray = getIndexArray(key);
for(long index : indexArray){
if(!jedis.getbit(tableName+":bloom", index)){
//如果不存在就插入库中
return false;
}
}
log.info("{}已存在!",key);
return true;
}
/**
* 插入判重key
* @param tableName 插入的表名称
* @param key 插入的判重key
*/
public void setKey(String tableName, String key) {
long[] indexArray = getIndexArray(String.valueOf(key));
log.info("插入的key为:{}", key);
for (long index : indexArray) {
jedis.setbit(tableName+":bloom", index, true);
}
}
/**
* 根据key获取bitmap下标
*/
private static long[] getIndexArray(String key) {
long hash1 = Hashing.MURMUR_HASH.hash(key);
long hash2 = hash1 >>> 16;
long[] result = new long[numHashFunctions];
for (int i = 0; i < numHashFunctions; i++) {
long combinedHash = hash1 + i * hash2;
if (combinedHash < 0) {
combinedHash = ~combinedHash;
}
result[i] = combinedHash % numBits;
}
return result;
}
//计算hash函数个数
private static int optimalNumOfHashFunctions(long n, long m) {
if(0 == n && 0 == m) System.out.println("boolm关键参数为空");
return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
}
//计算bit数组长度
private static long optimalNumOfBits(long n, double p) {
if(0 == n ) System.out.println("boolm关键参数为空");
if (p == 0) {
p = Double.MIN_VALUE;
}
return (long) (-n * Math.log(p) / (Math.log(2) * Math.log(2)));
}
}