bloom_filter服务
bloom_filter的简介与使用请看这里。
bloom_filter的配置文件conf.ini如下
[filter]
cnt=4
num_bits=4294967296
num_hash_func=7
[server]
pid_file=/var/run/bloom_filter.pid
daemonlize=1
port=8000
cnt表示启动的bloom_filter的数量
num_bits表示位数组大小
cnt* num_bits/(8 * 1024 * 1024 * 1024)即为bloom_filter运行时所占用的内存大小(GB)。
数据插入
提供两种方式:
1、远程http请求。curl http://w21*********.8000/?ks=1,2 -X POST 。这种方式网络开销较大,数据量很大时不推荐。
2、本地文件导入。当启动bloom_filter服务时,会默认读取当前目录下名为new_users的文件(待改进),将每一行当作一条记录插入bloom_filter中。速度可达100,000条/秒。
数据查询
http请求,curl http://w21********:8000/?ks=1,2。
在w6机器上,qps可达4000/s。由于每个http请求中可以带多个参数,因此可以支持每秒上万个记录的查询任务。
storm计算新用户
任务:计算2014年1月1号的游戏级新用户。
启动bloom_filter服务。
在机器上启动bloom_filter服务,并将2014年1月1号前的所有游戏级新用户采用读取本地文件的方式导入bloom_filter中。
编写storm的bolt脚本。
程序从标准输入中,每读取30条记录,将就其拼接成一条curl请求,然后访问bloom_filter服务,根据返回值计算这里面有多少个新用户。
$filter_service = "http://w21*******:8000/?ks=";
$uv_count = 0;
$record_combine = array();
while(($line = fgets(STDIN)) !== false) {
$line = trim($line);
$record = explode("\t", $line);
$id = $record[0];
if(count($record)%3 !== 0) {
echo "$id\tack\n";
fwrite(STDERR, "wrong data".$line."\n");
flush();
continue;
}
for($i = 0; $i<count($record); $i+=3){
$qid = $record[$i+1];
$gkey = $record[$i+2];
$record_combine[] = $qid.$gkey;}
if(count($record_combine)===30){
$request = $filter_service;
$request .= implode(",", $record_combine);
$ch = curl_init();
curl_setopt($ch, CURLOPT_URL, $request);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
$filter_result = curl_exec($ch);
curl_close($ch);
$uv_count +=substr_count($filter_result,"false");
fwrite(STDERR, "uv is ".$uv_count."\n");
unset($record_combine);
}
echo "$id\tack\n"; flush();
}
一次查询多条记录,能够大幅度减少网络压力与耗时。
提交storm任务到测试集群,并发送数据。
运行run.sh脚本提交任务。然后发送异步数据,
cat glogin_daily_2014_1_1.dat | async_send -batch 100 webgame_statistic_49 >/dev/null
这里采用 -batch指令,将日志每300行发送一次。
单个spout的发送数据的速度有限,为30~40万次/小时,若一次只发送一行日志,将大幅度降低storm的吞吐量
误判率
理论误判率计算公式在storm中使用bloom_filter消重服务。
m为位数组大小,k为哈希函数个数,n为记录数。
令m=235,k=7,n=240,000,000。
带入公式计算得到的理论误判率为:5.64e-10。
从程序中查询得到的程序理论误判率为:1.4e-4。
storm运行结果与线上数据比对后,得到程序实际误判率为:1.86e-3。
令m=236,k=7,n=240,000,000。
带入公式计算得到的理论误判率为:4.79e-12。
从程序中查询得到的程序理论误判率为:1.95e-6。
storm运行结果与线上数据比对后,得到程序实际误判率为:9.3e-4。
程序实际误判率比理论值大
结果比对
1、将storm任务运行时中bloom_fliter的返回结果(json)保存成日志
2、去掉json中的花括号{}。
cat log | awk '/"*"/ {print $0}' >process_log
3、找出返回结果为true的记录,并去掉引号""。
cat process_log | awk -F ":" '{gsub(/\"/, ""); if(match($2,/true/)) print $1}' >process_log_true
4、去掉记录的空格
sed 's/ //g' process_log_true > process_log_final
5、从hadoop中导出2014年1月1号的新用户的记录,并存成文本。
hive -e
"SELECT gp.qid, gp.gkey FROM
(SELECT distinct qid, gkey FROM glogin_daily WHERE year!=2014) np
RIGHT OUTER JOIN
(SELECT distinct qid, gkey FROM glogin_daily where year=2014 and month = 1 and day = 1) gp
ON np.qid = gp.qid AND np.gkey = gp.gkey
WHERE np.qid IS NOT NULL and np.gkey IS NOT NULL and gp.qid IS NOT NULL and gp.gkey IS NOT NULL;"
> exists_game_2014_01_01.dat
6、比对storm与hadoop中的记录。发现确实是bloom_filter误判了。
例如记录为"101326892zwj",并没有在以往的日志中出现过,确实是新用户,但bloom_filter中却认为是已存在的用户,返回结果为true。
注意的问题
1、bloom_filter的误判率,与占用的总内存有关。就是说,存储2.4亿数据,使用4个512M的bloom_filter,与使用1个2G的bloom_filter的误判率是一样的。从公式也可以看出来。
2、storm集群若与bloom_filter的服务器跨机房,则会因为网络延迟,导致bolt中的数据没有被及时处理而堵塞。导致spout经常性失败 。
3、为防止iptable的设置导致bloom_filter不可用,可以开放8000端口。 sudo iptables -I INPUT -p tcp --dport 8000 -j ACCEPT
4、hadoop中的日志,不能只看分区,由于hydra任务的堆积,会使今天的部分日志记录到明天的分区中。因此需要指定year,month,day来导出日志