flink框架开发5大步
[基于 DataStream API 实现欺诈检测](https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/try-flink/datastream/)
## 1. 设置执行环境
## 2. 设置数据源,取得数据流
## 3.分区、开窗、计算逻辑
## 4. 给数据流设置下家sink
## 5. 显式声明启动执行
```
public class FraudDetectionJob {
public static void main(String[] args) throws Exception {
//1、设置执行环境 env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2、添加数据源 source
DataStream<Transaction> transactions = env
// 通常数据源从外部系统例如 Apache Kafka、Rabbit MQ 或者 Apache Pulsar 接收数据,然后将数据送到 Flink 程序中
// 这里TransactionSource是一个无限循环生成信用卡模拟交易数据的数据源
// 每条交易数据包括了信用卡 ID (accountId),交易发生的时间 (timestamp) 以及交易的金额(amount)
.addSource(new TransactionSource())
// 设置env的name属性
.name("transactions");
//3、添加处理逻辑 process
DataStream<Alert> alerts = transactions
// 对数据流进行按id分区
// transactions 这个数据流包含了大量的用户交易数据,需要被划分到多个并发上进行欺诈检测处理。由于欺诈行为的发生是基于某一个账户的,所以,必须要要保证同一个账户的所有交易行为数据要被同一个并发的 task 进行处理。
.keyBy(Transaction::getAccountId)
// 给分区后的数据流处理绑定一个处理逻辑FraudDetector
.process(new FraudDetector())
.name("fraud-detector");
//4、给数据流添加 sink
// sink 会将 DataStream 写出到外部系统,例如 Apache Kafka、Cassandra 或者 AWS Kinesis 等。
// AlertSink 使用 INFO 的日志级别打印每一个 Alert 的数据记录,而不是将其写入持久存储,以便你可以方便地查看结果。
alerts
.addSink(new AlertSink())
.name("send-alerts");
//5、显式声明启动执行
env.execute("Fraud Detection");
}
}
```
## 核心处理逻辑对象
```
/**
* Skeleton code for implementing a fraud detector.
*/
public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
private static final long serialVersionUID = 1L;
//定义的小额交易的最大值
private static final double SMALL_AMOUNT = 1.00;
//定义的大额交易的最小值
private static final double LARGE_AMOUNT = 500.00;
//定义的要做欺诈检测的两次交易时间间隔
private static final long ONE_MINUTE = 60 * 1000;
// ValueState 是一个包装类,类似于 Java 标准库里边的 AtomicReference 和 AtomicLong
//声明一个瞬态(不会序列化)的记录检测内容的状态变量
private transient ValueState<Boolean> flagState;
//声明一个瞬态(不会序列化)的记录定时器的状态变量
private transient ValueState<Long> timerState ;
/**
* (状态需要使用 open() 函数来注册状态。)
* 注册声明是否为小额的标记状态
* 注册声明记录定时器的状态
*
* @param parameters
*/
@Override
public void open(Configuration parameters) {
ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>("flag", Types.BOOLEAN);
flagState = getRuntimeContext().getState(flagDescriptor);
ValueStateDescriptor<Long> timerFlagDescriptor = new ValueStateDescriptor<>("timerState", Types.LONG);
timerState = getRuntimeContext().getState(timerFlagDescriptor);
}
/**
* 欺诈检测核心处理方法
* 会在数据流的每一个数据对象上执行如下业务逻辑
* @param transaction
* @param context
* @param collector
* @throws Exception
*/
@Override
public void processElement(Transaction transaction, Context context, Collector<Alert> collector) throws Exception {
// 获取当前key账号下的交易数额类型状态是否是小额
// Get the current state for the current key
Boolean lastTransactionWasSmall = flagState.value();
if (lastTransactionWasSmall != null) {
if (transaction.getAmount() > LARGE_AMOUNT ) {
Alert alert = new Alert();
alert.setId(transaction.getAccountId());
collector.collect(alert);
}
}
//使用过valueStatus后清空,以便下次使用
// Clean up our state
flagState.clear();
//如果这次交易额是小额,将状态标记为true,为下一笔到来的交易检测做准备
if (transaction.getAmount() < SMALL_AMOUNT) {
flagState.update(true);
// set the timer and timer state
long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
//注册定时器到context
context.timerService().registerProcessingTimeTimer(timer);
//定时器状态变量记录定时时间
timerState.update(timer);
}
}
/**
* 注册的定时器到时会执行的回调方法
* @param timestamp
* @param ctx
* @param out
* @throws Exception
*/
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) throws Exception {
// remove flag after 1 minute
timerState.clear();
//移除前一个交易是小额交易的标记
flagState.clear();
}
/**
* 如果要取消定时器,你需要删除已经注册的定时器,并同时清空保存定时器的状态。
* 你可以把这些逻辑封装到一个助手函数中,而不是直接调用 flagState.clear()。
* @param ctx
* @throws Exception
*/
private void cleanUp(Context ctx) throws Exception {
// delete timer
Long timer = timerState.value();
ctx.timerService().deleteProcessingTimeTimer(timer);
// clean up all state
timerState.clear();
flagState.clear();
}
}
```
动态规划一·基本概念2022-11-25