现在在接手一个BI分析项目,最近呢有一个需求,要提供2小时以内,同一客户对同一功能重复操作十次的日志统计,我们在查询相关的服务类上添加了aop,获取用户名称,ip,操作时间,操作的报表等等信息,因为这个需求偏向于审计或者内审需求,并没有要求完成报表的设计,只需要提供异常的日志,所以,实现的方式就比较的灵活了,在我寻思咋去做这件事情的时候,我们的业务小哥,要去了完整的统计信息,EXCEL大法好,花了半小时给整出来了,汗颜,大体意思上,先按照小时进行统计,后按照相邻小时累加统计,但每个月都要手动整理,也迫使我去完成了这部分的需求(罗列几种方式)。
sql直接完成
因为我们是将这种操作信息存在了数据表中,第一反应就是直接写sql(基于mysql的sql)
SELECT
rfcl1.OP_NAME,
rfcl1.USER_NAME,
rfcl1.UNIT_CODE,
rfcl1.SYSTEM_CODE,
count(*)
FROM
fr_cpt_log rfcl1,
fr_cpt_log rfcl2
where
rfcl1.ip=rfcl2.ip and rfcl1.OP_NAME = rfcl2.OP_NAME
and ADDTIME(rfcl1.OP_DATE,'2:0:0')>=rfcl2.OP_DATE
and rfcl1.id<rfcl2.id
group by
rfcl1.OP_NAME,
rfcl1.USER_NAME,
rfcl1.UNIT_CODE,
rfcl1.SYSTEM_CODE
having count(*)>=10
sql的核心处理方式,使用自连接的方式以ip和操作名称为限制条件进行累加计算,但这种自连接的累计方式有很大的问题,我当时是在测试环境测试并没有发现问题,操作的日志只有几w左右,但实际在生产环境进行的时候,发现跑不出来,不按月切割的情况下报表但操作记录大概有几百万行,而直接进行自连接做笛卡尔积的话,这个数据量可以上hadoop了,看了下,大概数据量有个小十万就不用考虑这种方式去进行了。
当然去按照时间进行切分,多加一些中间临时表去做这些事情肯定也是能搞定了,这里就不多说了。
用缓存计数
这个方案偏向于实时累加计算处理了,之前呢为了做限流操作,写了个在redis中缓存同一ip操作同一个功能一分钟超过5次就进行消息提醒的东西(没办法BI渲染太耗cpu),这里涉及线上代码就不贴了,但实现但方式是相似但,以IP+OP_NAME为key,进行累加,TTL给个2小时,在aop里获取超过10次就刷到异常表里就ok了。
这里就是个redis的简单使用,但是呢,说白了这种日志统计功能没必要有这么强的实时性,或者说去消耗缓存和服务器资源,所以并没去在服务器上做这些事情,动生产环境的代码和生产环境加表过于麻烦了。
拉取日志文件,java解决
因为获取异常信息,整个过程可能不仅仅是获取,还需要考虑的是呈现的方式,所以这里完成了一个获取生成excel的逻辑,因为查询人员的组织不同,按照组织信息将操作的异常记录进行切分
具体的实现步骤为:
1.文件切割
我这边将查询的日志进行规整,用户名,查询时间,查询操作,查询的对象4列,ip因为内部跳转原因获取的不对,这里不做显示。
这里我们将文件做切割处理(应对巨量的log文件),以用户名+查询对象做hash计算,这里假定分布的文件为10个,那么我们输入输出流的形式读取每一行,以每一行除10取模,以这个模为新文件名创建文件,相同的操作+用户名必定分配到同一文件,这样哪小文件进行增量的文件计算可避免oom
private void preProcess() throws IOException {
BufferedInputStream fis = null;
BufferedReader reader = null;
try{
//Path newfile = FileSystems.getDefault().getPath(filename);
fis = new BufferedInputStream(new FileInputStream(new File(filename)));
// 用5M的缓冲读取文本文件
reader = new BufferedReader(new InputStreamReader(fis,"utf-8"),_5M);
//假设文件是10G,那么先根据hashcode拆成小文件,再进行读写判断
//如果不拆分文件,将ip地址当做key,访问时间当做value存到hashmap时,
//当来访的ip地址足够多的情况下,内存开销吃不消
//存放ip的hashcode->accessTimes集合
Map<String, List<String>> hashcodeMap = new HashMap<String,List<String>>();
String line = "";
int count = 0;
while((line = reader.readLine()) != null){
line=line.replaceAll("\t","");
String split[] = line.split(delimiter);
String reportName;
String username;
if(split != null && split.length >= 2){
//根据username的hashcode这样拆分文件,拆分后的文件大小在1G上下波动
//ip+操作内容 取哈希
username = split[0].replaceAll("\"","");
reportName = split[3].replaceAll("\"","");
/*magic 为拆分的细粒度*/
int serial = (username+reportName).hashCode() % MAGIC;
String splitFilename = FILE_PRE + serial;
List<String> lines = hashcodeMap.get(splitFilename);
if(lines == null){
lines = new ArrayList<String>();
hashcodeMap.put(splitFilename, lines);
}
lines.add(line);
}
count ++;
if(count > 0 && count % BATCH_MAGIC == 0){
//每1000行刷一次文件
for(Map.Entry<String, List<String>> entry : hashcodeMap.entrySet()){
//System.out.println(entry.getKey()+"--->"+entry.getValue());
//key是hashcode value是本行的字符串
DuplicateUtils.appendFile(root + entry.getKey(), entry.getValue(), Charset.forName("UTF-8"));
}
//一次操作1000之后清空,重新执行
hashcodeMap.clear();
}
}
}finally {
reader.close();
fis.close();
}
}
文件append方法
/**
* 根据给出的数据,往给定的文件形参中追加一行或者几行数据
*
* @param splitFilename
* @throws IOException
*/
public static Path appendFile(String splitFilename,
Iterable<? extends CharSequence> accessTimes, Charset cs) throws IOException {
if(accessTimes != null){
Path target = Paths.get(splitFilename);
File file = new File(splitFilename);
if(!file.exists()){
createFile(splitFilename);
}
return Files.write(target, accessTimes, cs, StandardOpenOption.APPEND);
}
return null;
}
2.读取制定文件目录下的小文件,累计2小时内的重复操作
/**
* 递归执行,将2小时内访问超过阈值的ip找出来
*
* @param parent
* @return
* @throws IOException
*/
private void recurseFile(Path parent, Map<String,List<Date>> resMap) throws IOException{
//Path target = Paths.get(dir);
if(!Files.exists(parent) || !Files.isDirectory(parent)){
return;
}
List<File> fileList= Arrays.asList(new File(root).listFiles());
for(File file:fileList){
if(file.getName().startsWith(FILE_PRE)){
List<String> lines = Files.readAllLines(file.toPath(), Charset.forName("UTF-8"));
judgeAndcollection(lines,resMap);
/*
这里的resMap可以里立即处理了,没必要一直进行迭代,结果集还是全量的,这里增量的append到excel中
*/
if(!CollectionUtils.isEmpty(resMap)){
Map<String,List<OperationInfo>> operationInfoMap = new HashMap<>();
transerToObject(resMap,operationInfoMap);
if(!CollectionUtils.isEmpty(operationInfoMap)){
operationInfoMap.entrySet().forEach(stringEntry -> {
String fileName = result + stringEntry.getKey()+"异常操作日志.xlsx";
File fileCheck = new File(fileName);
if(fileCheck.exists()){
try {
EasyexcelUtils.addExcel(fileName,stringEntry.getValue());
} catch (IOException e) {
e.printStackTrace();
} catch (InvalidFormatException e) {
e.printStackTrace();
}
}else{
EasyExcel.write(fileName, OperationInfo.class)
.sheet(stringEntry.getKey()).doWrite(stringEntry.getValue());
}
});
}
resMap.clear();
}
}
}
}
此处的核心处理逻辑就是在遍历小文件的时候,判断并收集超过操作阈值的对象
/**
* 根据从较小文件读上来的每行ip accessTimes进行判断符合条件的ip
* 并放入resMap
*
* @param lines
* @param resMap
*/
private void judgeAndcollection(List<String> lines,Map<String,List<Date>> resMap) {
if(lines != null){
//ip->List<String>accessTimes
Map<String,List<String>> judgeMap = new HashMap<String,List<String>>();
for(String line : lines){
line = line.trim();
line=line.replaceAll("\t","");
String split[] = line.split(delimiter);
String userName =split[0];
String opt =split[3].replaceAll("\"","");
List<String> accessTimes = judgeMap.get(userName+"#"+opt);
if(accessTimes == null){
accessTimes = new ArrayList<String>();
}
accessTimes.add(split[1]);
judgeMap.put(userName+"#"+opt, accessTimes);
}
if(judgeMap.size() == 0){
return;
}
for(Map.Entry<String, List<String>> entry : judgeMap.entrySet()){
List<String> acessTimes = entry.getValue();
//相同ip,先判断整体大于10个
if(acessTimes != null && acessTimes.size() >= 10){
//开始判断在List集合中,120分钟内访问超过MAGIC=10
List<Date> attackTimes = DuplicateUtils.attackList(acessTimes, 120 * 60 * 1000, 10);
if(attackTimes != null){
resMap.put(entry.getKey(), attackTimes);
}
}
}
}
}
将小文件的行进行格式化为map<user_name+opt,op_time>
再在hashmap中进行遍历汇总
public static List<Date> attackList(List<String> dateStrs,long intervalDate,int magic){
if(dateStrs == null || dateStrs.size() < magic){
return null;
}
List<Date> dates = new ArrayList<Date>();
for(String date : dateStrs){
if(date != null && !"".equals(date))
dates.add(stringToDate(date,"dd/MM/yyyy hh:mm:ss"));
}
Collections.sort(dates);
return attackTimes(dates,intervalDate,magic);
}
这里对日期格式进行了转换
/**
* 判断在间隔时间内,是否有大于magic的上限的数据集合,
* 如果有,则返回满足条件的集合
* 如果找不到满足条件的,就返回null
*
* @param sequenceDates 已经按照时间顺序排序了的数组
* @param intervalDate
* @param magic
* @return
*/
public static List<Date> attackTimes(List<Date> sequenceDates,long intervalDate,int magic){
if(sequenceDates == null || sequenceDates.size() < magic){
return null;
}
List<Date> res = new ArrayList<Date>();
for(int x = 0; x < sequenceDates.size() && x <= sequenceDates.size() - magic;x++){
Date souceDate = sequenceDates.get(x);
Date dateAfter5 = new Date(souceDate.getTime() + intervalDate);
res.add(souceDate);
for(int i = x + 1;i< sequenceDates.size();i++){
Date compareDate = sequenceDates.get(i);
if(compareDate.before(dateAfter5)){
res.add(compareDate);
}else
break;
}
if(res.size() >= magic)
return res;
else
res.clear();
}
return null;
}
然后就是个循环累计的过程
3.拿到获取信息进行文件渲染
利用hadoop进行处理
手头现在刚好有个大数据集群,目前这部分数据的实时性不高,可以利用ETL拉到hive里跑,目前没有需求去做,准备做。