flink框架开发5大步

flink框架开发5大步

[基于 DataStream API 实现欺诈检测](https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/try-flink/datastream/)

## 1. 设置执行环境

## 2. 设置数据源,取得数据流

## 3.分区、开窗、计算逻辑

## 4. 给数据流设置下家sink

## 5. 显式声明启动执行

```

public class FraudDetectionJob {

public static void main(String[] args) throws Exception {

//1、设置执行环境 env

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//2、添加数据源 source

DataStream<Transaction> transactions = env

// 通常数据源从外部系统例如 Apache Kafka、Rabbit MQ 或者 Apache Pulsar 接收数据,然后将数据送到 Flink 程序中

// 这里TransactionSource是一个无限循环生成信用卡模拟交易数据的数据源

// 每条交易数据包括了信用卡 ID (accountId),交易发生的时间 (timestamp) 以及交易的金额(amount)

.addSource(new TransactionSource())

// 设置env的name属性

.name("transactions");

//3、添加处理逻辑 process

DataStream<Alert> alerts = transactions

// 对数据流进行按id分区

// transactions 这个数据流包含了大量的用户交易数据,需要被划分到多个并发上进行欺诈检测处理。由于欺诈行为的发生是基于某一个账户的,所以,必须要要保证同一个账户的所有交易行为数据要被同一个并发的 task 进行处理。

.keyBy(Transaction::getAccountId)

// 给分区后的数据流处理绑定一个处理逻辑FraudDetector

.process(new FraudDetector())

.name("fraud-detector");

//4、给数据流添加 sink

// sink 会将 DataStream 写出到外部系统,例如 Apache Kafka、Cassandra 或者 AWS Kinesis 等。

// AlertSink 使用 INFO 的日志级别打印每一个 Alert 的数据记录,而不是将其写入持久存储,以便你可以方便地查看结果。

alerts

.addSink(new AlertSink())

.name("send-alerts");

//5、显式声明启动执行

env.execute("Fraud Detection");

}

}

```

## 核心处理逻辑对象

```

/**

* Skeleton code for implementing a fraud detector.

*/

public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {

    private static final long serialVersionUID = 1L;

    //定义的小额交易的最大值

    private static final double SMALL_AMOUNT = 1.00;

    //定义的大额交易的最小值

    private static final double LARGE_AMOUNT = 500.00;

    //定义的要做欺诈检测的两次交易时间间隔

    private static final long ONE_MINUTE = 60 * 1000;

    // ValueState 是一个包装类,类似于 Java 标准库里边的 AtomicReference 和 AtomicLong

    //声明一个瞬态(不会序列化)的记录检测内容的状态变量

    private transient ValueState<Boolean> flagState;

    //声明一个瞬态(不会序列化)的记录定时器的状态变量

    private transient ValueState<Long> timerState ;

    /**

    * (状态需要使用 open() 函数来注册状态。)

    * 注册声明是否为小额的标记状态

    * 注册声明记录定时器的状态

    *

    * @param parameters

    */

    @Override

    public void open(Configuration parameters) {

        ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>("flag", Types.BOOLEAN);

        flagState = getRuntimeContext().getState(flagDescriptor);

        ValueStateDescriptor<Long> timerFlagDescriptor = new ValueStateDescriptor<>("timerState", Types.LONG);

        timerState  = getRuntimeContext().getState(timerFlagDescriptor);

    }

    /**

    * 欺诈检测核心处理方法

    * 会在数据流的每一个数据对象上执行如下业务逻辑

    * @param transaction

    * @param context

    * @param collector

    * @throws Exception

    */

    @Override

    public void processElement(Transaction transaction, Context context, Collector<Alert> collector) throws Exception {

        // 获取当前key账号下的交易数额类型状态是否是小额

        // Get the current state for the current key

        Boolean lastTransactionWasSmall = flagState.value();

        if (lastTransactionWasSmall != null) {

            if (transaction.getAmount() > LARGE_AMOUNT ) {

                Alert alert = new Alert();

                alert.setId(transaction.getAccountId());

                collector.collect(alert);

            }

        }

        //使用过valueStatus后清空,以便下次使用

        // Clean up our state

        flagState.clear();

        //如果这次交易额是小额,将状态标记为true,为下一笔到来的交易检测做准备

        if (transaction.getAmount() < SMALL_AMOUNT) {

            flagState.update(true);

            // set the timer and timer state

            long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;

            //注册定时器到context

            context.timerService().registerProcessingTimeTimer(timer);

            //定时器状态变量记录定时时间

            timerState.update(timer);

        }

    }

    /**

    * 注册的定时器到时会执行的回调方法

    * @param timestamp

    * @param ctx

    * @param out

    * @throws Exception

    */

    @Override

    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) throws Exception {

        // remove flag after 1 minute

        timerState.clear();

        //移除前一个交易是小额交易的标记

        flagState.clear();

    }

    /**

    * 如果要取消定时器,你需要删除已经注册的定时器,并同时清空保存定时器的状态。

    * 你可以把这些逻辑封装到一个助手函数中,而不是直接调用 flagState.clear()。

    * @param ctx

    * @throws Exception

    */

    private void cleanUp(Context ctx) throws Exception {

        // delete timer

        Long timer = timerState.value();

        ctx.timerService().deleteProcessingTimeTimer(timer);

        // clean up all state

        timerState.clear();

        flagState.clear();

    }

}

```

动态规划一·基本概念2022-11-25

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容