flink实战-使用自定义聚合函数统计网站TP指标

背景

在网站性能测试中,我们经常会选择 TP50、TP95 或者 TP99 等作为性能指标。接下来我们讲讲这些指标的含义、以及在flink中如何实时统计:

  • TP50,top percent 50,即 50% 的数据都满足某一条件;
  • TP95,top percent 95,即 95% 的数据都满足某一条件;
  • TP99,top percent 99,即 99% 的数据都满足某一条件;

我们举一个例子,我们要统计网站一分钟之内的的响应时间的TP90,正常的处理逻辑就是把这一分钟之内所有的网站的响应时间从小到大排序,然后计算出总条数count,然后计算出排名在90%的响应时间是多少(count*0.9),就是我们要的值。

自定义聚合函数

这个需求很明显就是一个使用聚合函数来做的案例,Flink中提供了大量的聚合函数,比如count,max,min等等,但是对于这个需求,却无法满足,所以我们需要自定义一个聚合函数来实现我们的需求。

在前段时间,我们聊了聊flink的聚合算子,具体可参考: flink实战-聊一聊flink中的聚合算子 , 聚合算子是我们在写代码的时候用来实现一个聚合功能,聚合函数其实和聚合算子类似,只不过聚合函数用于在写sql的时候使用。

自定义聚合函数需要继承抽象类org.apache.flink.table.functions.AggregateFunction。并实现下面几个方法。

  • createAccumulator():这个方法会在一次聚合操作的开始调用一次,主要用于构造一个Accumulator,用于存储在聚合过程中的临时对象。
  • accumulate() 这个方法,每来一条数据会调用一次这个方法,我们就在这个方法里实现我们的聚合函数的具体逻辑。
  • getValue() 这个方法是在聚合结束以后,对中间结果做处理,然后将结果返回,最终sql中得到的结果数据就是这个值。

实例讲解

对于TP指标,正常的思路我们可以先创建一个临时变量,里面有一个list,每来一个数据,就放到这个list里面,在getValue方法里,进行排序,取相应的TP值。

但是这种思路会有一个问题,就是如果要聚合的时间范围内,数据过多的话。就会在list存储大量的数据,会造成checkpoint过大,时间过长,最后导致程序失败。得不到正确的结果。

所以我们需要换一个思路,既然最后我们想要的是一个有序列表,那么我们是不是可以把这个list结构优化一下,使用Treemap来存储,map的key就是指标,比如响应时间。value就是对应的指标出现的次数。这样getValue方法里,只需要将map的value值累加,就能得到总数count,然后计算出来相应的tp值的位置position,最后我们再从头累加map的value,直到累加结果大于相应的位置position,则map的key即为所求。

示例如下:
我们先构建一个source,只是随机生成一个变量,网站的相应时间response_time。

    String sql = "CREATE TABLE source (\n" +
                     " response_time INT,\n" +
                     " ts AS localtimestamp,\n" +
                     " WATERMARK FOR ts AS ts," +
                     "proctime as proctime()\n" +
                     ") WITH (\n" +
                     " 'connector' = 'datagen',\n" +
                     " 'rows-per-second'='1000',\n" +
                     " 'fields.response_time.min'='1',\n" +
                     " 'fields.response_time.max'='1000'" +
                     ")";

定义一个聚合函数用的临时变量:

    public static class TPAccum{
        public Integer tp;
        public Map<Integer,Integer> map = new HashMap<>();
    }

实现自定义聚合函数类

    public static class TP extends AggregateFunction<Integer,TPAccum>{

        @Override
        public TPAccum createAccumulator(){
            return new TPAccum();
        }

        @Override
        public Integer getValue(TPAccum acc){
            if (acc.map.size() == 0){
                return null;
            } else {
                Map<Integer,Integer> map = new TreeMap<>(acc.map);
                int sum = map.values().stream().reduce(0, Integer::sum);

                int tp = acc.tp;
                int responseTime = 0;
                int p = 0;
                Double d = sum * (tp / 100D);
                for (Map.Entry<Integer,Integer> entry: map.entrySet()){
                    p += entry.getValue();
                    int position = d.intValue() - 1;
                    if (p >= position){
                        responseTime = entry.getKey();
                        break;
                    }

                }
                return responseTime;
            }
        }

        public void accumulate(TPAccum acc, Integer iValue, Integer tp){
            acc.tp = tp;
            if (acc.map.containsKey(iValue)){
                acc.map.put(iValue, acc.map.get(iValue) + 1);
            } else {
                acc.map.put(iValue, 1);
            }
        }

    }

实际的查询sql如下:

    String sqlSelect =
                "select TUMBLE_START(proctime,INTERVAL '1' SECOND)  as starttime,mytp(response_time,50) from source" +
                " group by TUMBLE(proctime,INTERVAL '1' SECOND)";

完整代码请参考:
https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/function/UdafTP.java

更多内容,欢迎关注我的公众号【大数据技术与应用实战】

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,222评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,455评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,720评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,568评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,696评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,879评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,028评论 3 409
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,773评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,220评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,550评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,697评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,360评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,002评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,782评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,010评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,433评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,587评论 2 350