一、Maven项目的pom.xml
<!-- Flink version shared by every Flink artifact declared below -->
<properties>
<flink.version>1.9.1</flink.version>
</properties>
<!-- Flink dependencies -->
<dependencies>
<!-- Flink core Java API -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Streaming (DataStream) API; the _2.12 suffix is the Scala binary version of the build -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Complex event processing (CEP) library; supplies the org.apache.flink.cep classes used by the job -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
二、测试数据
LoginLog.csv
5402,83.149.11.115,success,1558430815
23064,66.249.3.15,fail,1558430826
5692,80.149.25.29,fail,1558430833
7233,86.226.15.75,success,1558430832
5692,80.149.25.29,success,1558430840
29607,66.249.73.135,success,1558430841
1035,83.149.9.216,fail,1558430842
1035,83.149.9.216,fail,1558430843
1035,83.149.24.26,fail,1558430844
7328,193.114.45.13,success,1558430848
29607,66.249.73.135,success,1558430847
2133,50.16.19.13,success,1558430857
6745,66.249.73.185,success,1558430859
76456,110.136.166.128,success,1558430853
8345,46.105.14.53,success,1558430855
76456,110.136.166.128,success,1558430857
76456,110.136.166.128,success,1558430854
76456,110.136.166.128,fail,1558430859
76456,110.136.166.128,success,1558430861
3464,123.125.71.35,success,1558430860
76456,110.136.166.128,success,1558430865
65322,50.150.204.184,success,1558430866
23565,207.241.237.225,fail,1558430862
8455,200.49.190.101,success,1558430867
8455,200.49.190.100,success,1558430865
8455,200.49.190.101,success,1558430869
8455,200.49.190.101,success,1558430872
32031,66.249.73.185,success,1558430875
12018,66.249.73.135,success,1558430874
12018,66.249.73.135,success,1558430879
12018,66.249.73.135,success,1558430881
21419,67.214.178.190,success,1558430882
21419,67.214.178.190,success,1558430880
23565,207.241.237.220,success,1558430881
2386,46.105.14.53,success,1558430883
23565,207.241.237.227,success,1558430884
83419,91.177.205.119,success,1558430881
83419,91.177.205.119,fail,1558430882
83419,91.177.205.119,success,1558430885
83419,91.177.205.119,fail,1558430886
83419,91.177.205.119,success,1558430884
83419,91.177.205.119,success,1558430886
4325,26.249.73.15,success,1558430888
2123,207.241.237.228,success,1558430887
21083,207.241.237.101,success,1558430889
13490,87.169.99.232,success,1558430886
93765,209.85.238.199,success,1558430890
93765,209.85.238.199,success,1558430892
三、Flink 恶意登录 Java版本
/*
* flink-cep实现恶意登录
* */
package com.cn.Logindetect;
import com.cn.Logindetect.beans.LoginEvent;
import com.cn.Logindetect.beans.LoginFailWarning;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.net.URL;
import java.util.List;
import java.util.Map;
/**
 * Flink CEP job that reports users whose logins fail three times in a row.
 *
 * <p>The job reads {@code /LoginLog.csv} from the classpath, turns every line into a
 * {@link LoginEvent}, keys the stream by user id and emits a {@link LoginFailWarning}
 * for each run of three consecutive {@code "fail"} events that fits inside a
 * five-second event-time window. Warnings are printed to standard output.
 */
public class LoginFail {

    /**
     * Builds the streaming topology and runs it.
     *
     * @param args command-line arguments (not used)
     * @throws IllegalStateException if {@code /LoginLog.csv} cannot be found on the classpath
     * @throws Exception if building or executing the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        // Set up the streaming execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the job parallelism.
        env.setParallelism(2);
        // Use event time, i.e. the timestamps carried by the records themselves.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // getResource returns null for a missing file; fail with a clear message
        // instead of a bare NullPointerException on resource.getPath().
        URL resource = LoginFail.class.getResource("/LoginLog.csv");
        if (resource == null) {
            throw new IllegalStateException("Resource /LoginLog.csv not found on the classpath");
        }

        DataStream<LoginEvent> loginEventStream = env.readTextFile(resource.getPath())
                .map(line -> {
                    // The four CSV columns are handed to LoginEvent positionally
                    // (presumably user id, ip, login state, timestamp -- confirm against LoginEvent).
                    String[] fields = line.split(",");
                    // Long.valueOf replaces the deprecated new Long(String); parsing is identical.
                    return new LoginEvent(Long.valueOf(fields[0]), fields[1], fields[2], Long.valueOf(fields[3]));
                })
                // Tolerate out-of-order records: the watermark trails the highest
                // timestamp seen so far by 3 seconds.
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<LoginEvent>(Time.seconds(3)) {
                    @Override
                    public long extractTimestamp(LoginEvent element) {
                        // Flink expects milliseconds; the input appears to be in seconds.
                        return element.getTimestamp() * 1000L;
                    }
                });

        // Define the pattern: three consecutive failed logins within 5 seconds.
        Pattern<LoginEvent, LoginEvent> loginFailPattern = Pattern.<LoginEvent>begin("failEvents").where(
                new SimpleCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent value) throws Exception {
                        return "fail".equals(value.getLoginState());
                    }
                }
        ).times(3).consecutive()
                .within(Time.seconds(5));

        // Apply the pattern per user: detection must run on each userId separately.
        PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventStream.keyBy(LoginEvent::getUserId), loginFailPattern);

        SingleOutputStreamOperator<LoginFailWarning> warningStream = patternStream.select(
                new PatternSelectFunction<LoginEvent, LoginFailWarning>() {
                    @Override
                    public LoginFailWarning select(Map<String, List<LoginEvent>> pattern) throws Exception {
                        // All events matched under the "failEvents" pattern name.
                        List<LoginEvent> failEvents = pattern.get("failEvents");
                        LoginEvent firstFailEvent = failEvents.get(0);
                        LoginEvent lastFailEvent = failEvents.get(failEvents.size() - 1);
                        return new LoginFailWarning(firstFailEvent.getUserId(), firstFailEvent.getTimestamp(), lastFailEvent.getTimestamp(), "login fail 3 times");
                    }
                }
        );

        // Print the warnings to the console.
        // To inspect the parsed input as well, also call loginEventStream.print().
        warningStream.print();
        env.execute("login fail detectwith cep job");
    }
}