统计X小时内重复的异常操作

    现在在接手一个BI分析项目,最近呢有一个需求,要提供2小时以内,同一客户对同一功能重复操作十次的日志统计,我们在查询相关的服务类上添加了aop,获取用户名称,ip,操作时间,操作的报表等等信息,因为这个需求偏向于审计或者内审需求,并没有要求完成报表的设计,只需要提供异常的日志,所以,实现的方式就比较的灵活了,在我寻思咋去做这件事情的时候,我们的业务小哥,要去了完整的统计信息,EXCEL大法好,花了半小时给整出来了,汗颜,大体意思上,先按照小时进行统计,后按照相邻小时累加统计,但每个月都要手动整理,也迫使我去完成了这部分的需求(罗列几种方式)。


sql直接完成

    因为我们是将这种操作信息存在了数据表中,第一反应就是直接写sql(基于mysql的sql)

SELECT
 rfcl1.OP_NAME,
 rfcl1.USER_NAME,
 rfcl1.UNIT_CODE,
 rfcl1.SYSTEM_CODE,
 count(*)
FROM
fr_cpt_log rfcl1,
fr_cpt_log rfcl2
where 
 rfcl1.ip=rfcl2.ip and rfcl1.OP_NAME = rfcl2.OP_NAME
 and ADDTIME(rfcl1.OP_DATE,'2:0:0')>=rfcl2.OP_DATE
 and rfcl1.id<rfcl2.id
 group by  
 rfcl1.OP_NAME,
 rfcl1.USER_NAME,
 rfcl1.UNIT_CODE,
 rfcl1.SYSTEM_CODE
 having count(*)>=10

    sql的核心处理方式,使用自连接的方式以ip和操作名称为限制条件进行累加计算,但这种自连接的累计方式有很大的问题,我当时是在测试环境测试并没有发现问题,操作的日志只有几w左右,但实际在生产环境进行的时候,发现跑不出来,不按月切割的情况下报表但操作记录大概有几百万行,而直接进行自连接做笛卡尔积的话,这个数据量可以上hadoop了,看了下,大概数据量有个小十万就不用考虑这种方式去进行了。
    当然去按照时间进行切分,多加一些中间临时表去做这些事情肯定也是能搞定了,这里就不多说了。


用缓存计数

    这个方案偏向于实时累加计算处理了,之前呢为了做限流操作,写了个在redis中缓存同一ip操作同一个功能一分钟超过5次就进行消息提醒的东西(没办法BI渲染太耗cpu),这里涉及线上代码就不贴了,但实现但方式是相似但,以IP+OP_NAME为key,进行累加,TTL给个2小时,在aop里获取超过10次就刷到异常表里就ok了。
    这里就是个redis的简单使用,但是呢,说白了这种日志统计功能没必要有这么强的实时性,或者说去消耗缓存和服务器资源,所以并没去在服务器上做这些事情,动生产环境的代码和生产环境加表过于麻烦了。


拉取日志文件,java解决

    因为获取异常信息,整个过程可能不仅仅是获取,还需要考虑的是呈现的方式,所以这里完成了一个获取生成excel的逻辑,因为查询人员的组织不同,按照组织信息将操作的异常记录进行切分
具体的实现步骤为:
1.文件切割


1576671859974.jpg

我这边将查询的日志进行规整,用户名,查询时间,查询操作,查询的对象4列,ip因为内部跳转原因获取的不对,这里不做显示。
这里我们将文件做切割处理(应对巨量的log文件),以用户名+查询对象做hash计算,这里假定分布的文件为10个,那么我们输入输出流的形式读取每一行,以每一行除10取模,以这个模为新文件名创建文件,相同的操作+用户名必定分配到同一文件,这样哪小文件进行增量的文件计算可避免oom

 private void preProcess() throws IOException {
        BufferedInputStream fis = null;
        BufferedReader reader = null;  
        try{
            //Path newfile = FileSystems.getDefault().getPath(filename);
            fis = new BufferedInputStream(new FileInputStream(new File(filename)));
            // 用5M的缓冲读取文本文件
            reader = new BufferedReader(new InputStreamReader(fis,"utf-8"),_5M);

            //假设文件是10G,那么先根据hashcode拆成小文件,再进行读写判断
            //如果不拆分文件,将ip地址当做key,访问时间当做value存到hashmap时,
            //当来访的ip地址足够多的情况下,内存开销吃不消
            //存放ip的hashcode->accessTimes集合
            Map<String, List<String>> hashcodeMap = new HashMap<String,List<String>>();
            String line = "";
            int count = 0;
            while((line = reader.readLine()) != null){
                line=line.replaceAll("\t","");
                String split[] = line.split(delimiter);
                String reportName;
                String username;
                if(split != null && split.length >= 2){
                    //根据username的hashcode这样拆分文件,拆分后的文件大小在1G上下波动
                    //ip+操作内容 取哈希
                    username = split[0].replaceAll("\"","");
                    reportName = split[3].replaceAll("\"","");
                    /*magic 为拆分的细粒度*/
                    int serial = (username+reportName).hashCode() % MAGIC;

                    String splitFilename = FILE_PRE + serial;
                    List<String> lines = hashcodeMap.get(splitFilename);
                    if(lines == null){
                        lines = new ArrayList<String>();

                        hashcodeMap.put(splitFilename, lines);
                    }
                    lines.add(line);
                }

                count ++;
                if(count > 0 && count % BATCH_MAGIC == 0){
                    //每1000行刷一次文件
                    for(Map.Entry<String, List<String>> entry : hashcodeMap.entrySet()){
                        //System.out.println(entry.getKey()+"--->"+entry.getValue());
                        //key是hashcode value是本行的字符串
                        DuplicateUtils.appendFile(root + entry.getKey(), entry.getValue(), Charset.forName("UTF-8"));
                    }
                    //一次操作1000之后清空,重新执行
                    hashcodeMap.clear();
                }
            }
        }finally {
            reader.close();
            fis.close();
        }
    }

文件append方法

    /**
     * 根据给出的数据,往给定的文件形参中追加一行或者几行数据
     *
     * @param splitFilename
     * @throws IOException
     */
    public static Path appendFile(String splitFilename,
                                  Iterable<? extends CharSequence> accessTimes, Charset cs) throws IOException {
        if(accessTimes != null){
            Path target = Paths.get(splitFilename);
            File file = new File(splitFilename);
            if(!file.exists()){
                createFile(splitFilename);
            }
            return Files.write(target, accessTimes, cs, StandardOpenOption.APPEND);
        }

        return null;
    }

2.读取制定文件目录下的小文件,累计2小时内的重复操作


1576673347588.jpg
   /**
    * 递归执行,将2小时内访问超过阈值的ip找出来
    *
    * @param parent
    * @return
    * @throws IOException
    */
   private void recurseFile(Path parent, Map<String,List<Date>> resMap) throws IOException{
       //Path target = Paths.get(dir);
       if(!Files.exists(parent) || !Files.isDirectory(parent)){
           return;
       }
       List<File> fileList= Arrays.asList(new File(root).listFiles());
       for(File file:fileList){
           if(file.getName().startsWith(FILE_PRE)){
               List<String> lines = Files.readAllLines(file.toPath(), Charset.forName("UTF-8"));
               judgeAndcollection(lines,resMap);
/*
               这里的resMap可以里立即处理了,没必要一直进行迭代,结果集还是全量的,这里增量的append到excel中
*/
               if(!CollectionUtils.isEmpty(resMap)){
                   Map<String,List<OperationInfo>> operationInfoMap = new HashMap<>();
                   transerToObject(resMap,operationInfoMap);
                   if(!CollectionUtils.isEmpty(operationInfoMap)){
                       operationInfoMap.entrySet().forEach(stringEntry -> {
                           String fileName = result + stringEntry.getKey()+"异常操作日志.xlsx";
                           File fileCheck = new File(fileName);
                           if(fileCheck.exists()){
                               try {
                                   EasyexcelUtils.addExcel(fileName,stringEntry.getValue());
                               } catch (IOException e) {
                                   e.printStackTrace();
                               } catch (InvalidFormatException e) {
                                   e.printStackTrace();
                               }
                           }else{
                               EasyExcel.write(fileName, OperationInfo.class)
                                        .sheet(stringEntry.getKey()).doWrite(stringEntry.getValue());
                           }
                       });
                   }
                   resMap.clear();
               }
           }
       }
   }

此处的核心处理逻辑就是在遍历小文件的时候,判断并收集超过操作阈值的对象

    /**
     * 根据从较小文件读上来的每行ip accessTimes进行判断符合条件的ip
     * 并放入resMap
     *
     * @param lines
     * @param resMap
     */
    private void judgeAndcollection(List<String> lines,Map<String,List<Date>> resMap) {
        if(lines != null){
            //ip->List<String>accessTimes
            Map<String,List<String>> judgeMap = new HashMap<String,List<String>>();
            for(String line : lines){
                line = line.trim();
                line=line.replaceAll("\t","");
                String split[] = line.split(delimiter);

                String userName =split[0];
                String opt =split[3].replaceAll("\"","");
                List<String> accessTimes = judgeMap.get(userName+"#"+opt);
                if(accessTimes == null){
                    accessTimes = new ArrayList<String>();
                }
                accessTimes.add(split[1]);
                judgeMap.put(userName+"#"+opt, accessTimes);
            }

            if(judgeMap.size() == 0){
                return;
            }

            for(Map.Entry<String, List<String>> entry : judgeMap.entrySet()){
                List<String> acessTimes = entry.getValue();
                //相同ip,先判断整体大于10个
                if(acessTimes != null && acessTimes.size() >= 10){
                    //开始判断在List集合中,120分钟内访问超过MAGIC=10
                    List<Date> attackTimes = DuplicateUtils.attackList(acessTimes, 120 * 60 * 1000, 10);
                    if(attackTimes != null){
                        resMap.put(entry.getKey(), attackTimes);
                    }
                }
            }
        }
    }

将小文件的行进行格式化为map<user_name+opt,op_time>
再在hashmap中进行遍历汇总

    public static List<Date> attackList(List<String> dateStrs,long intervalDate,int magic){
        if(dateStrs == null || dateStrs.size() < magic){
            return null;
        }

        List<Date> dates = new ArrayList<Date>();
        for(String date : dateStrs){
            if(date != null && !"".equals(date))
                dates.add(stringToDate(date,"dd/MM/yyyy hh:mm:ss"));
        }

        Collections.sort(dates);
        return attackTimes(dates,intervalDate,magic);
    }

这里对日期格式进行了转换

    /**
     * 判断在间隔时间内,是否有大于magic的上限的数据集合,
     * 如果有,则返回满足条件的集合
     * 如果找不到满足条件的,就返回null
     *
     * @param sequenceDates 已经按照时间顺序排序了的数组
     * @param intervalDate
     * @param magic
     * @return
     */
    public static List<Date> attackTimes(List<Date> sequenceDates,long intervalDate,int magic){
        if(sequenceDates == null || sequenceDates.size() < magic){
            return null;
        }

        List<Date> res = new ArrayList<Date>();
        for(int x = 0; x < sequenceDates.size() && x <= sequenceDates.size() - magic;x++){
            Date souceDate = sequenceDates.get(x);
            Date dateAfter5 = new Date(souceDate.getTime() + intervalDate);
            res.add(souceDate);
            for(int i = x + 1;i< sequenceDates.size();i++){
                Date compareDate = sequenceDates.get(i);
                if(compareDate.before(dateAfter5)){
                    res.add(compareDate);
                }else
                    break;
            }
            if(res.size() >= magic)
                return res;
            else
                res.clear();
        }
        return null;
    }

然后就是个循环累计的过程

3.拿到获取信息进行文件渲染


1576673147212.jpg

利用hadoop进行处理

手头现在刚好有个大数据集群,目前这部分数据的实时性不高,可以利用ETL拉到hive里跑,目前没有需求去做,准备做。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,657评论 6 505
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,889评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,057评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,509评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,562评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,443评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,251评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,129评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,561评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,779评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,902评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,621评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,220评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,838评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,971评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,025评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,843评论 2 354

推荐阅读更多精彩内容