1.Flink简介:
Flink项目的理念是:“Apache Flink是为分布式、高性能、随时可用以及准确的流处理应用程序打造的开源的有状态的流处理框架”。
Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。Flink被设计在所有常见的集群环境中运行,以内存执行速度和任意规模来执行计算。
1.1 Flink的重要特点
1.1.1 事件驱动型(Event-driven)
事件驱动型应用是一类具有状态的应用,它从一个或多个事件流提取数据,并根据到来的事件触发计算、状态更新或其他外部动作。比较典型的就是以kafka为代表的消息队列几乎都是事件驱动型应用。(Flink的计算也是事件驱动型)
1.1.2 流与批的世界观
批处理的特点是有界、大量,非常适合需要访问全套记录才能完成的计算工作,一般用于离线统计。
流处理的特点是无界、实时, 无需针对整个数据集执行操作,而是对通过系统传输的每个数据项执行操作,一般用于实时统计。
在spark的世界观中,一切都是由批次组成的,离线数据是一个大批次,而实时数据是由一个一个无限的小批次组成的。
而在flink的世界观中,一切都是由流组成的,离线数据是有界限的流,实时数据是一个没有界限的流,这就是所谓的有界流和无界流。
无界数据流:
无界数据流有一个开始但是没有结束,它们不会在生成时终止并提供数据,必须连续处理无界流,也就是说必须在获取后立即处理event。对于无界数据流我们无法等待所有数据都到达,因为输入是无界的,并且在任何时间点都不会完成。处理无界数据通常要求以特定顺序(例如事件发生的顺序)获取event,以便能够推断结果完整性。
有界数据流:
有界数据流有明确定义的 开始和结束,可以在执行任何计算之前通过获取所有数据来处理有界流,处理有界流不需要有序获取,因为可以始终对有界数据集进行排序,有界流的处理也称为批处理。
2 Flink流处理核心编程
和其他所有的计算框架一样,Flink也有一些基础的开发步骤以及基础,核心的API,从开发步骤的角度来讲,主要分为四大部分
2.1 Environment
Flink Job在提交执行计算时,需要首先建立和Flink框架之间的联系,也就指的是当前的flink运行环境,只有获取了环境信息,才能将task调度到不同的taskManager执行。而这个环境对象的获取方式相对比较简单
// 批处理环境
ExecutionEnvironment benv = ExecutionEnvironment.getExecutionEnvironment();
// 流式数据处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
2.2 Source
Flink框架可以从不同的来源获取数据,将数据提交给框架进行处理, 我们将获取数据的来源称之为数据源(Source)。
准备工作
- 导入注解工具依赖, 方便生产POJO类
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.16</version>
<scope>provided</scope>
</dependency>
- 准备一个WaterSensor类方便演示
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* 水位传感器:用于接收水位数据
*
* id:传感器编号
* ts:时间戳
* vc:水位
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
//上面的这三个需要在下载lombok插件,然后插入依赖
public class WaterSensor {
private String id;
private Long ts;
private Integer vc;
从Java的集合中读取数据
一般情况下,可以将数据临时存储到内存中,形成特殊的数据结构后,作为数据源使用。这里的数据结构采用集合类型是比较普遍的。
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
import java.util.List;
public class Flink01_Source_Collection {
public static void main(String[] args) throws Exception {
List<WaterSensor> waterSensors = Arrays.asList(
new WaterSensor("ws_001", 1577844001L, 45),
new WaterSensor("ws_002", 1577844015L, 43),
new WaterSensor("ws_003", 1577844020L, 42));
// 1. 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
.fromCollection(waterSensors)
.print();
env.execute();
}
}
从文件读取数据
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Flink02_Source_File {
public static void main(String[] args) throws Exception {
// 1. 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
.readTextFile("input")
.print();
env.execute();
}
}
说明:
- 参数可以是目录也可以是文件
- 路径可以是相对路径也可以是绝对路径
- 相对路径是从系统属性user.dir获取路径: idea下是project的根目录, standalone模式下是集群节点根目录
- 也可以从hdfs目录下读取, 使用路径:hdfs://hadoop102:8020/...., 由于Flink没有提供hadoop相关依赖, 需要pom中添加相关依赖:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
从Socket读取数据
// 1. 创建流式执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 读取文件
DataStreamSource<String> lineDSS = env.socketTextStream("hadoop102", 9999);
// 3. 转换数据格式
SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDSS
.flatMap((String line, Collector<String> words) -> {
Arrays.stream(line.split(" ")).forEach(words::collect);
})
.returns(Types.STRING)
.map(word -> Tuple2.of(word, 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG));
// 4. 分组
KeyedStream<Tuple2<String, Long>, String> wordAndOneKS = wordAndOne
.keyBy(t -> t.f0);
// 5. 求和
SingleOutputStreamOperator<Tuple2<String, Long>> result = wordAndOneKS
.sum(1);
// 6. 打印
result.print();
// 7. 执行
env.execute();
从Kafka读取数据
添加相应的依赖
<**dependency**> <**groupId**>org.apache.flink</**groupId**> <**artifactId**>flink-connector-kafka_2.12</**artifactId**> <**version**>1.13.0</**version**> </**dependency**>
参考代码
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class Flink03_Source_Kafka {
public static void main(String[] args) throws Exception {
// 0.Kafka相关配置
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092");
properties.setProperty("group.id", "Flink01_Source_Kafka");
properties.setProperty("auto.offset.reset", "latest");
// 1. 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
.addSource(new FlinkKafkaConsumer<>("sensor", new SimpleStringSchema(), properties))
.print("kafka source");
env.execute();
}
}
开启kafka生产者,测试消费
kafka-console-producer.sh --broker-list hadoop102:9092 --topic sensor
自定义Source
大多数情况下,前面的数据源已经能够满足需要,但是难免会存在特殊情况的场合,所以flink也提供了能自定义数据源的方式.
public class Flink01_Source_Custom {
public static void main(String[] args) throws Exception {
//1.获取流的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//TODO 2.从自定义的数据源获取数据(自定义Source)
DataStreamSource<WaterSensor> streamSource = env.addSource(new MySource()).setParallelism(2);
streamSource.print();
env.execute();
}
// public static class MySource implements SourceFunction<WaterSensor>{
//如果要设置多并行度,则需要去实现多并行度接口
public static class MySource implements ParallelSourceFunction<WaterSensor>{
private Boolean isRunning = true;
/**
* 生成数据
* */
@Override
public void run(SourceContext<WaterSensor> ctx) throws Exception {
while (isRunning){
ctx.collect(new WaterSensor("sensor"+new Random().nextInt(100),System.currentTimeMillis(),
new Random().nextInt(100)));
Thread.sleep(1000);
}
}
/**
* 取消生成数据
* 一般在run方法中会有个while循环,通过此方法终止while循环
*/
@Override
public void cancel() {
isRunning = false;
}
}
}
自定义 SourceFunction:
1. 实现 SourceFunction相关接口
2. 重写两个方法:
run(): 主要逻辑
cancel(): 停止逻辑
如果希望 Source可以指定并行度,那么就 实现 ParallelSourceFunction 这个接口
2.3 Transform
转换算子可以把一个或多个DataStream转成一个新的DataStream.程序可以把多个复杂的转换组合成复杂的数据流拓扑。
2.3.1 map
作用
将数据流中的数据进行转换, 形成新的数据流,消费一个元素并产出一个元素
参数
lambda表达式或MapFunction实现类
返回
DataStream → DataStream
示例
得到一个新的数据流: 新的流的元素是原来流的元素的平方
匿名内部类对象
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Flink01_TransForm_Map_Anonymous {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
.fromElements(1, 2, 3, 4, 5)
.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer value) throws Exception {
return value * value;
}
})
.print();
env.execute();
}
}
Lambda表达式表达式
env
.fromElements(1, 2, 3, 4, 5)
.map(ele -> ele * ele)
.print();
静态内部类
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Flink01_TransForm_Map_StaticClass {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
.fromElements(1, 2, 3, 4, 5)
.map(new MyMapFunction())
.print();
env.execute();
}
public static class MyMapFunction implements MapFunction<Integer, Integer> {
@Override
public Integer map(Integer value) throws Exception {
return value * value;
}
}
}
Rich...Function类
所有Flink函数类都有其Rich版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。也有意味着提供了更多的,更丰富的功能。例如:RichMapFunction
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Flink01_TransForm_Map_RichMapFunction {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(5);
env
.fromElements(1, 2, 3, 4, 5)
.map(new MyRichMapFunction()).setParallelism(2)
.print();
env.execute();
}
public static class MyRichMapFunction extends RichMapFunction<Integer, Integer> {
// 默认生命周期方法, 初始化方法, 在每个并行度上只会被调用一次
@Override
public void open(Configuration parameters) throws Exception {
System.out.println("open ... 执行一次");
}
// 默认生命周期方法, 最后一个方法, 做一些清理工作, 在每个并行度上只调用一次
@Override
public void close() throws Exception {
System.out.println("close ... 执行一次");
}
@Override
public Integer map(Integer value) throws Exception {
System.out.println("map ... 一个元素执行一次");
return value * value;
}
}
}
- 默认生命周期方法, 初始化方法open(), 在每个并行度上只会被调用一次, 而且先被调用
- 默认生命周期方法, 最后一个方法close(), 做一些清理工作, 在每个并行度上只调用一次, 而且是最后被调用,但读文件时在每个并行度上调用两次。
- 运行时上下文getRuntimeContext()方法提供了函数的RuntimeContext的一些信息,例如函数执行的并行度,任务的名字,以及state状态. 开发人员在需要的时候自行调用获取运行时上下文对象.
2.3.2 flatMap
作用
消费一个元素并产生零个或多个元素
参数
FlatMapFunction实现类
返回
DataStream → DataStream
示例
匿名内部类写法
// 新的流存储每个元素的平方和3次方
env
.fromElements(1, 2, 3, 4, 5)
.flatMap(new FlatMapFunction<Integer, Integer>() {
@Override
public void flatMap(Integer value, Collector<Integer> out) throws Exception {
out.collect(value * value);
out.collect(value * value * value);
}
})
.print();
Lambda表达式写法
env
.fromElements(1, 2, 3, 4, 5)
.flatMap((Integer value, Collector<Integer> out) -> {
out.collect(value * value);
out.collect(value * value * value);
}).returns(Types.INT)
.print();
说明: 在使用Lambda表达式表达式的时候, 由于泛型擦除的存在, 在运行的时候无法获取泛型的具体类型, 全部当做Object来处理, 及其低效, 所以Flink要求当参数中有泛型的时候, 必须明确指定泛型的类型.
2.3.3 filter
作用
根据指定的规则将满足条件(true)的数据保留,不满足条件(false)的数据丢弃
参数
FlatMapFunction实现类
返回
DataStream → DataStream
示例
匿名内部类写法
// 保留偶数, 舍弃奇数
env
.fromElements(10, 3, 5, 9, 20, 8)
.filter(new FilterFunction<Integer>() {
@Override
public boolean filter(Integer value) throws Exception {
return value % 2 == 0;
}
})
.print();
Lambda表达式写法
env
.fromElements(10, 3, 5, 9, 20, 8)
.filter(value -> value % 2 == 0)
.print();
2.3.4 keyBy
作用
把流中的数据分到不同的分区中.具有相同key的元素会分到同一个分区中.一个分区中可以有多重不同的key.
在内部是使用的hash分区来实现的.
分组与分区的区别:
分组: 是一个逻辑上的划分,按照key进行区分,经过 keyby,同一个分组的数据肯定会进入同一个分区
分区: 下游算子的一个并行实例(等价于一个slot),同一个分区内,可能有多个分组
参数
Key选择器函数: interface KeySelector<IN, KEY>
注意: 什么值不可以作为KeySelector的Key:
1.没有覆写hashCode方法的POJO, 而是依赖Object的hashCode. 因为这样分组没有任何的意义: 每个元素都会得到一个独立无二的组. 实际情况是:可以运行, 但是分的组没有意义.
2.任何类型的数组
返回
DataStream → KeyedStream
public class Flink05_Transform_KeyBy {
public static void main(String[] args) throws Exception {
//1.获取流的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//2.从端口读取数据
DataStreamSource<String> streamSource = env.socketTextStream("hadoop102", 9999);
//3.使用map将从端口读进来的字符串转为JavaBean
SingleOutputStreamOperator<WaterSensor> map = streamSource.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] split = value.split(" ");
return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
}
});
//TODO 4.使用keyby将相同id的数据聚和到一块
//第一种方式:使用匿名实现类实现key的选择器
/* KeyedStream<WaterSensor, String> keyedStream = map.keyBy(new KeySelector<WaterSensor, String>() {
@Override
public String getKey(WaterSensor value) throws Exception {
return value.getId();
}
});*/
//第二种方式:定一个类实现KeySelector接口指定key
// KeyedStream<WaterSensor, String> keyedStream = map.keyBy(new MyKey());
//第三种方式:lambda表达式
// KeyedStream<WaterSensor, String> keyedStream = map.keyBy(r -> r.getId());
//第四中方式:通过属性名获取key 常用于pojo
KeyedStream<WaterSensor, Tuple> keyedStream = map.keyBy("id");
map.print("原始数据").setParallelism(2);
keyedStream.print("keyby");
env.execute();
}
public static class MyKey implements KeySelector<WaterSensor,String>{
@Override
public String getKey(WaterSensor value) throws Exception {
return value.getId();
}
}
}
总结:
第一种方式: 指定 位置索引,只能用于 Tuple 的数据类型
KeyedStream<WaterSensor, Tuple> sensorKS = sensorDS.keyBy(0);
第二种方式:指定 字段名字,适用于 POJO
KeyedStream<WaterSensor, Tuple> sensorKS = sensorDS.keyBy("id");
TODO 第三种方式(推荐):使用 KeySelector
KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(new KeySelector<WaterSensor, String>() {
@Override
public String getKey(WaterSensor value) throws Exception {
return value.getId();
}
});
2.3.5 shuffle
作用
把流中的元素随机打乱. 对同一个组数据, 每次只需得到的结果都不同.
参数
无
返回
DataStream → DataStream
public class Flink06_Transform_Shuffle {
public static void main(String[] args) throws Exception {
//1.获取流的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//2.从端口读取数据
DataStreamSource<String> streamSource = env.socketTextStream("hadoop102", 9999);
//3.使用map将从端口读进来的字符串转为JavaBean
SingleOutputStreamOperator<WaterSensor> map = streamSource.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] split = value.split(" ");
return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
}
}).setParallelism(2);
//TODO 4.使用shuffle将id随机的分到不同分区中
DataStream<WaterSensor> shuffle = map.shuffle();
map.print("原始数据").setParallelism(2);
shuffle.print("shuffle");
env.execute();
}
}
2.3.6 connect
作用
在某些情况下,我们需要将两个不同来源的数据流进行连接,实现数据匹配,比如订单支付和第三方交易信息,这两个信息的数据就来自于不同数据源,连接后,将订单支付和第三方交易信息进行对账,此时,才能算真正的支付完成。
Flink中的connect算子可以连接两个保持他们类型的数据流,两个数据流被connect之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。
参数
另外一个流
返回
DataStream[A], DataStream[B] -> ConnectedStreams[A,B]
public class Flink07_Transform_Connect {
public static void main(String[] args) throws Exception {
//1.获取流的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//2.从元素中获取数据,并创建两条流
DataStreamSource<Integer> integerDataStreamSource = env.fromElements(1, 2, 3, 4, 5,6);
DataStreamSource<String> strDStream = env.fromElements("a", "b", "c", "d", "e");
//TODO 3.使用connect连接两条流
ConnectedStreams<Integer, String> connect = integerDataStreamSource.connect(strDStream);
SingleOutputStreamOperator<String> streamOperator = connect.map(new CoMapFunction<Integer, String, String>() {
@Override
public String map1(Integer integer) throws Exception {
return integer * integer + "";
}
@Override
public String map2(String value) throws Exception {
return value + "aaa";
}
});
streamOperator.print();
env.execute();
}
}
注意:
- 两个流中存储的数据类型可以不同
- 只是机械的合并在一起, 内部仍然是分离的2个流
- 只能2个流进行connect, 不能有第3个参与
2.3.7 union
作用
对两个或者两个以上的DataStream进行union操作,产生一个包含所有DataStream元素的新DataStream
public class Flink08_Transform_Union {
public static void main(String[] args) throws Exception {
//1.获取流的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//2.从元素中获取数据,并创建两条流
DataStreamSource<String> intDStream = env.fromElements("1", "2", "3", "4", "5", "6");
DataStreamSource<String> strDStream = env.fromElements("a", "b", "c", "d", "e");
//TODO 3.使用union连接两条流
DataStream<String> unionDStream = intDStream.union(strDStream);
unionDStream.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
return value+"a";
}
}).print();
env.execute();
}
}
connect与 union 区别:
- union之前两个流的类型必须是一样,connect可以不一样
- connect只能操作两个流,union可以操作多个。
2.3.8 简单滚动聚合算子
常见的滚动聚合算子
sum, min, max, minBy, maxBy
作用
KeyedStream的每一个支流做聚合。执行完成后,会将聚合的结果合成一个流返回,所以结果都是DataStream
参数
如果流中存储的是POJO或者scala的样例类, 参数使用字段名
如果流中存储的是元组, 参数就是位置(基于0...)
返回
KeyedStream -> SingleOutputStreamOperator
public class Flink09_Transform_Max_MaxBy {
public static void main(String[] args) throws Exception {
//1.获取流的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//2.从端口读取数据
DataStreamSource<String> streamSource = env.socketTextStream("hadoop102", 9999);
//3.使用map将从端口读进来的字符串转为JavaBean
SingleOutputStreamOperator<WaterSensor> map = streamSource.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] split = value.split(" ");
return new WaterSensor(split[0], Long.parseLong(split[1]),Integer.parseInt(split[2]));
}
});
//4.先对相同的id进行聚合
KeyedStream<WaterSensor, Tuple> keyedStream = map.keyBy("id");
//TODO 5.使用聚和算子
SingleOutputStreamOperator<WaterSensor> max = keyedStream.max("vc");
SingleOutputStreamOperator<WaterSensor> maxBy = keyedStream.maxBy("vc", true);
SingleOutputStreamOperator<WaterSensor> maxByfalse = keyedStream.maxBy("vc", false);
//max.print("max");
//maxBy.print("maxBy");
maxByfalse.print("maxByfalse");
env.execute();
}
}
注意:
滚动聚合算子: 来一条,聚合一条
1、聚合算子在 keyby之后调用,因为这些算子都是属于 KeyedStream里的
2、聚合算子,作用范围,都是分组内。 也就是说,不同分组,要分开算。
3、max、maxBy的区别:
max:取指定字段的当前的最大值,如果有多个字段,其他非比较字段,以第一条为准
maxBy:取指定字段的当前的最大值,如果有多个字段,其他字段以最大值那条数据为准;
如果出现两条数据都是最大值,由第二个参数决定: true => 其他字段取 比较早的值; false => 其他字段,取最新的值
2.3.9 reduce
作用
一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。
为什么还要把中间值也保存下来? 考虑流式数据的特点: 没有终点, 也就没有最终的概念了. 任何一个中间的聚合结果都是值!
参数、
interface ReduceFunction<T>
返回
KeyedStream -> SingleOutputStreamOperator
public class Flink10_Transform_Reduce {
public static void main(String[] args) throws Exception {
//1.获取流的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//2.从端口读取数据
DataStreamSource<String> streamSource = env.socketTextStream("hadoop102", 9999);
//3.使用map将从端口读进来的字符串转为JavaBean
SingleOutputStreamOperator<WaterSensor> map = streamSource.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] split = value.split(" ");
return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
}
});
KeyedStream<WaterSensor, Tuple> keyedStream = map.keyBy("id");
//TODO 5.使用聚和算子reduce实现max功能
keyedStream.reduce(new ReduceFunction<WaterSensor>() {
@Override
public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
System.out.println("reducer......");
return new WaterSensor(value1.getId(),value1.getTs(),Math.max(value1.getVc(),value2.getVc()));
}
}).print();
env.execute();
}
}
注意:
1、 一个分组的第一条数据来的时候,不会进入reduce方法。
2、 输入和输出的 数据类型,一定要一样。
2.3.10 process
作用
process算子在Flink算是一个比较底层的算子,很多类型的流上都可以调用,可以从流中获取更多的信息(不仅仅数据本身)
public class Flink11_Transform_Process {
public static void main(String[] args) throws Exception {
//1.获取流的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//2.从端口读取数据
DataStreamSource<String> streamSource = env.socketTextStream("hadoop102", 9999);
//TODO 3.使用process将数据转为JavaBean
SingleOutputStreamOperator<WaterSensor> streamOperator = streamSource.process(new ProcessFunction<String, WaterSensor>() {
@Override
public void processElement(String value, Context ctx, Collector<WaterSensor> out) throws Exception {
String[] split = value.split(" ");
out.collect(new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2])));
}
});
KeyedStream<WaterSensor, Tuple> keyedStream = streamOperator.keyBy("id");
//TODO 4.使用Process将数据的VC做累加 实现类似Sum的功能
keyedStream.process(new KeyedProcessFunction<Tuple, WaterSensor, WaterSensor>() {
//定义一个累加器计算累加结果
private HashMap<String, Integer> map = new HashMap<>();
@Override
public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {
//判断map集合中是否有对应key的数据
if (map.containsKey(value.getId())){
//key存在
//取出上一次累加过后的vc和
Integer lastVc = map.get(value.getId());
//与当前的vc进行累加
int currentVcSum = lastVc + value.getVc();
//将当前的vc累加过后的结果重新写入map集合
map.put(value.getId(),currentVcSum);
//将数据发送至下游
out.collect(new WaterSensor(value.getId(),value.getTs(),currentVcSum));
}else {
//key不存在,证明这是第一条数据
//将这条数据存入map集合中
map.put(value.getId(),value.getVc());
out.collect(value);
}
}
}).print();
env.execute();
}
}
2.3.11 对流重新分区的几个算子
Ø KeyBy
先按照key分组, 按照key的双重hash来选择后面的分区
Ø shuffle
对流中的元素随机分区
Ø reblance
对流中的元素平均分布到每个区.当处理倾斜数据的时候, 进行性能优化
Ø rescale
同 rebalance一样, 也是平均循环的分布数据。但是要比rebalance更高效, 因为rescale不需要通过网络, 完全走的"管道"。
2.4 Sink
Sink有下沉的意思,在Flink中所谓的Sink其实可以表示为将数据存储起来的意思,也可以将范围扩大,表示将处理完的数据发送到指定的存储系统的输出操作.
之前我们一直在使用的print方法其实就是一种Sink
Flink内置了一些Sink, 除此之外的Sink需要用户自定义! 可以在官网上看https://flink.apache.org/
2.4.1 KafkaSink
Ø 需要添加Kafka Connector依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.75</version>
</dependency>
Ø 启动Kafka集群
Sink到Kafka的示例代码
public class Flink13_Sink_Kafka {
public static void main(String[] args) throws Exception {
//1.获取流的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//2.从端口读取数据
DataStreamSource<String> streamSource = env.socketTextStream("hadoop102", 9991);
//3.将数据转为json字符串
SingleOutputStreamOperator<String> map = streamSource.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
String[] split = value.split(" ");
WaterSensor waterSensor = new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
return JSONObject.toJSONString(waterSensor);
}
});
//TODO 4.将数据写入Kafka
map.addSink(new FlinkKafkaProducer<String>("hadoop102:9092","topic_sensor",new SimpleStringSchema()));
env.execute();
}
}
在linux启动一个消费者, 查看是否收到数据
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_sensor
2.4.2 RedisSink
Ø 需要添加Redis Connector依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.1.5</version>
</dependency>
Ø 启动Redis服务器
Ø Sink到Redis的示例代码
public class Flink01_Sink_Redis {
public static void main(String[] args) throws Exception {
//1.获取流的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//2.从端口读取数据
DataStreamSource<String> streamSource = env.socketTextStream("hadoop102", 9991);
//3.将读过来的数据转为JavaBean
SingleOutputStreamOperator<WaterSensor> streamOperator = streamSource.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] split = value.split(" ");
return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
}
});
//TODO 4.将数据写入Redis
FlinkJedisPoolConfig jedisPoolConfig = new FlinkJedisPoolConfig.Builder()
.setHost("hadoop102")
.setPort(6379)
.build();
streamOperator.addSink(new RedisSink<>(jedisPoolConfig, new RedisMapper<WaterSensor>() {
/**
* 指定redis的写入命令
* additionalKey是在使用hash或者sort Set的时候需要指定的,hash类型指的是redis的大key
* @return
*/
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.HSET,"1027");
}
/**
* 指定redis的key
* @param waterSensor
* @return
*/
@Override
public String getKeyFromData(WaterSensor waterSensor) {
return waterSensor.getId();
}
/**
* 指定存入redis的value
* @param waterSensor
* @return
*/
@Override
public String getValueFromData(WaterSensor waterSensor) {
return JSONObject.toJSONString(waterSensor);
}
}));
env.execute();
}
}
Redis查看是否收到数据
redis-cli --raw
注意:
发送了5条数据, redis中只有2条数据. 原因是hash的field的重复了, 后面的会把前面的覆盖掉
2.4.3 ElasticsearchSink
Ø 添加Elasticsearch Connector依赖
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-elasticsearch6 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_2.12</artifactId>
<version>1.13.0</version>
</dependency>
Ø 启动Elasticsearch集群
Ø Sink到Elasticsearch的示例代码
public class Flink02_Sink_ES {
public static void main(String[] args) throws Exception {
//1.获取流的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//2.从端口读取数据
DataStreamSource<String> streamSource = env.socketTextStream("hadoop102", 9991);
//3.将读过来的数据转为JavaBean
SingleOutputStreamOperator<WaterSensor> streamOperator = streamSource.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] split = value.split(" ");
return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
}
});
//TODO 4.将数据写入ES
ArrayList<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("hadoop102",9200));
httpHosts.add(new HttpHost("hadoop103",9200));
httpHosts.add(new HttpHost("hadoop104",9200));
ElasticsearchSink.Builder<WaterSensor> waterSensorBuilder = new ElasticsearchSink.Builder<>(httpHosts, new ElasticsearchSinkFunction<WaterSensor>() {
@Override
public void process(WaterSensor element, RuntimeContext ctx, RequestIndexer indexer) {
//指定要插入的索引,类型,名 docId
IndexRequest indexRequest = new IndexRequest("001_flinnk", "_doc", "001");
String jsonString = JSONObject.toJSONString(element);
//放入数据 显示声明为Json字符串
indexRequest.source(jsonString, XContentType.JSON);
//添加写入请求
indexer.add(indexRequest);
}
});
//因为读的是无界数据流,ES会默认将数据先缓存起来,如果要实现来一条写一条,则将这个参数设置为1, 注意:生产中不要这么设置
waterSensorBuilder.setBulkFlushMaxActions(1);
streamOperator.addSink(waterSensorBuilder.build());
env.execute();
}
}
Elasticsearch查看是否收到数据
注意
Ø 如果出现如下错误:
添加log4j2的依赖:
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.14.0</version>
</dependency>
2.4.4 自定义Sink
Ø 在mysql中创建数据库和表
Ø 导入Mysql驱动
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.49</version>
</dependency>
Ø 写到Mysql的自定义Sink示例代码
public class Flink03_Sink_Custom {
public static void main(String[] args) throws Exception {
//1.获取流的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//2.从端口读取数据
DataStreamSource<String> streamSource = env.socketTextStream("localhost", 9999);
//3.将读过来的数据转为JavaBean
SingleOutputStreamOperator<WaterSensor> waterSensorDStream = streamSource.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] split = value.split(",");
return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
}
});
//TODO 4.自定义Sink将数据写入Mysql
waterSensorDStream.addSink(new MySinkFun());
env.execute();
}
// public static class MySinkFun implements SinkFunction<WaterSensor>{
public static class MySinkFun extends RichSinkFunction<WaterSensor>{
private Connection connection;
private PreparedStatement pstm;
//使用复函数 声明周期方法优化连接
@Override
public void open(Configuration parameters) throws Exception {
System.out.println("创建连接");
//获取连接
connection = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/test?useSSL=false", "root", "000000");
//获取语句预执行者
pstm = connection.prepareStatement("insert into sensor values (?,?,?)");
}
@Override
public void invoke(WaterSensor value, Context context) throws Exception {
//给占位符赋值
pstm.setString(1, value.getId());
pstm.setLong(2, value.getTs());
pstm.setInt(3, value.getVc());
pstm.execute();
}
@Override
public void close() throws Exception {
System.out.println("关闭连接");
pstm.close();
connection.close();
}
}
}
2.4.5 JDBCSink
Ø 添加JDBC Connector依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>1.13.0</version>
</dependency>
Ø Sink到MySQL的示例代码
public class Flink04_Sink_JDBC {
public static void main(String[] args) throws Exception {
//1.获取流的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//2.从端口读取数据
DataStreamSource<String> streamSource = env.socketTextStream("localhost", 9999);
//3.将读过来的数据转为JavaBean
SingleOutputStreamOperator<WaterSensor> waterSensorDStream = streamSource.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] split = value.split(",");
return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
}
});
//TODO 4.通过JDBC方式将数据写入Mysql
SinkFunction<WaterSensor> sinkFunction = JdbcSink.<WaterSensor>sink("insert into sensor values(?,?,?)",
new JdbcStatementBuilder<WaterSensor>() {
@Override
public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {
preparedStatement.setString(1, waterSensor.getId());
preparedStatement.setLong(2, waterSensor.getTs());
preparedStatement.setInt(3, waterSensor.getVc());
}
},
new JdbcExecutionOptions.Builder()
//数据来一条写一条
.withBatchSize(1)
.build(),
new JdbcConnectionOptions
.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://hadoop102:3306/test?useSSL=false")
.withUsername("root")
.withPassword("000000")
.withDriverName(Driver.class.getName())
.build()
);
waterSensorDStream.addSink(sinkFunction);
env.execute();
}
}
2.5 执行模式(Execution Mode)
Flink在1.12.0上对流式API新增一项特性:可以根据你的使用情况和Job的特点, 可以选择不同的运行时执行模式(runtime execution modes)。
流式API的传统执行模式我们称之为STREAMING 执行模式, 这种模式一般用于无界流, 需要持续的在线处理。
1.12.0新增了一个BATCH执行模式, 这种执行模式在执行方式上类似于MapReduce框架. 这种执行模式一般用于有界数据,目的为了更像流批一体。
默认是使#用的STREAMING 执行模式。
2.5.1 配置BATH执行模式
执行模式有3个选择可配:
1 STREAMING(默认)
2 BATCH
3 AUTOMATIC
Ø 通过命令行配置
bin/flink run -Dexecution.runtime-mode=BATCH ...
Ø 通过代码配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
建议: 不要在运行时配置(代码中配置), 而是使用命令行配置, 引用这样会灵活: 同一个应用即可以用于无界数据也可以用于有界数据,无界数据不能使用Batch模式
2.5.2 有界数据用STREAMING和BATCH的区别
STREAMING模式下, 数据是来一条输出一次结果。
BATCH模式下, 数据处理完之后, 一次性输出结果。
下面展示WordCount的程序读取文件内容在不同执行模式下的执行结果对比:
Ø 流式模式
// 默认流式模式, 可以不用配置
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
Ø 批处理模式
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
Ø 自动模式
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
3 Flink流处理高阶编程
3.1 Flink的window机制
3.1.1 窗口概述
在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。在这种情况下,我们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。
流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而Window窗口是一种切割无限数据为有限块进行处理的手段。
在Flink中, 窗口(window)是处理无界流的核心. 窗口把流切割成有限大小的多个"存储桶"(bucket), 我们在这些桶上进行计算.
3.1.2 窗口的分类
窗口分为2类:
- 基于时间的窗口(时间驱动)
- 基于元素个数的(数据驱动)
基于时间的窗口
时间窗口包含一个开始时间戳(包括)和结束时间戳(不包括), 这两个时间戳一起限制了窗口的尺寸.
在代码中, Flink使用TimeWindow这个类来表示基于时间的窗口. 这个类提供了key查询开始时间戳和结束时间戳的方法, 还提供了针对给定的窗口获取它允许的最大时间差的方法(maxTimestamp())
时间窗口又分4种:
Ø 滚动窗口(Tumbling Windows)
滚动窗口有固定的大小, 窗口与窗口之间不会重叠也没有缝隙.比如,如果指定一个长度为5分钟的滚动窗口, 当前窗口开始计算, 每5分钟启动一个新的窗口.
滚动窗口能将数据流切分成不重叠的窗口,每一个事件只能属于一个窗口。
示例代码:
public class Flink10_Window_Time_Tumbling {
public static void main(String[] args) throws Exception {
//1.获取流的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//2.从端口获取数据
DataStreamSource<String> streamSource = env.socketTextStream("localhost", 9999);
//3.将数据转为Tuple
SingleOutputStreamOperator<Tuple2<String, Long>> wordToOneDStream = streamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
String[] split = value.split(" ");
for (String s : split) {
out.collect(Tuple2.of(s, 1L));
}
}
});
//4.将相同的单词聚合到同一个分区
KeyedStream<Tuple2<String, Long>, Tuple> keyedStream = wordToOneDStream.keyBy(0);
//TODO 5.开启一个基于时间的滚动窗口
WindowedStream<Tuple2<String, Long>, Tuple, TimeWindow> window = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
SingleOutputStreamOperator<String> process = window.process(new ProcessWindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() {
@Override
public void process(Tuple tuple, Context context, Iterable<Tuple2<String, Long>> elements, Collector<String> out) throws Exception {
String msg =
"窗口: [" + context.window().getStart() / 1000 + "," + context.window().getEnd() / 1000 + ") 一共有 "
+ elements.spliterator().estimateSize() + "条数据 ";
out.collect(msg);
}
});
process.print();
window.sum(1).print();
env.execute();
}
}
说明:
- 时间间隔可以通过: Time.milliseconds(x), Time.seconds(x), Time.minutes(x),等等来指定.
- 我们传递给window函数的对象叫窗口分配器.
Ø 滑动窗口(Sliding Windows)
与滚动窗口一样, 滑动窗口也是有固定的长度. 另外一个参数我们叫滑动步长, 用来控制滑动窗口启动的频率.
所以, 如果滑动步长小于窗口长度, 滑动窗口会重叠. 这种情况下, 一个元素可能会被分配到多个窗口中
例如, 滑动窗口长度10分钟, 滑动步长5分钟, 则, 每5分钟会得到一个包含最近10分钟的数据.
示例代码:
public class Flink11_Window_Time_Slidling {
public static void main(String[] args) throws Exception {
//1.获取流的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//2.从端口获取数据
DataStreamSource<String> streamSource = env.socketTextStream("localhost", 9999);
//3.将数据转为Tuple
SingleOutputStreamOperator<Tuple2<String, Long>> wordToOneDStream = streamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
String[] split = value.split(" ");
for (String s : split) {
out.collect(Tuple2.of(s, 1L));
}
}
});
//4.将相同的单词聚合到同一个分区
KeyedStream<Tuple2<String, Long>, Tuple> keyedStream = wordToOneDStream.keyBy(0);
//TODO 5.开启一个基于时间的滑动窗口
WindowedStream<Tuple2<String, Long>, Tuple, TimeWindow> window = keyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(6), Time.seconds(3)));
SingleOutputStreamOperator<String> process = window.process(new ProcessWindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() {
@Override
public void process(Tuple tuple, Context context, Iterable<Tuple2<String, Long>> elements, Collector<String> out) throws Exception {
String msg =
"窗口: [" + context.window().getStart() / 1000 + "," + context.window().getEnd() / 1000 + ") 一共有 "
+ elements.spliterator().estimateSize() + "条数据 ";
out.collect(msg);
}
});
process.print();
window.sum(1).print();
env.execute();
}
}
Ø 会话窗口(Session Windows)
会话窗口分配器会根据活动的元素进行分组. 会话窗口不会有重叠, 与滚动窗口和滑动窗口相比, 会话窗口也没有固定的开启和关闭时间.
如果会话窗口有一段时间没有收到数据, 会话窗口会自动关闭, 这段没有收到数据的时间就是会话窗口的gap(间隔)
我们可以配置静态的gap, 也可以通过一个gap extractor 函数来定义gap的长度. 当时间超过了这个gap, 当前的会话窗口就会关闭, 后序的元素会被分配到一个新的会话窗口
示例代码:
- 静态gap
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
- 动态gap
.window(ProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<Tuple2<String, Long>>() {
@Override
public long extract(Tuple2<String, Long> element) { // 返回 gap值, 单位毫秒
return element.f0.length() * 1000;
}
}))
创建原理:
因为会话窗口没有固定的开启和关闭时间, 所以会话窗口的创建和关闭与滚动, 滑动窗口不同. 在Flink内部, 每到达一个新的元素都会创建一个新的会话窗口, 如果这些窗口彼此相距比较定义的gap小, 则会对他们进行合并. 为了能够合并, 会话窗口算子需要合并触发器和合并窗口函数: ReduceFunction, AggregateFunction, or ProcessWindowFunction
Ø 全局窗口(Global Windows)
全局窗口分配器会分配相同key的所有元素进入同一个 Global window. 这种窗口机制只有指定自定义的触发器时才有用. 否则, 不会做任务计算, 因为这种窗口没有能够处理聚集在一起元素的结束点.
示例代码:
.window(GlobalWindows.create());
基于元素个数的窗口
按照指定的数据条数生成一个Window,与时间无关
分2类:
Ø 滚动窗口
默认的CountWindow是一个滚动窗口,只需要指定窗口大小即可,当元素数量达到窗口大小时,就会触发窗口的执行。
实例代码
.countWindow(3)
说明:那个窗口先达到3个元素, 哪个窗口就关闭. 不影响其他的窗口.
Ø 滑动窗口
滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是window_size,一个是sliding_size。下面代码中的sliding_size设置为了2,也就是说,每收到两个相同key的数据就计算一次,每一次计算的window范围最多是3个元素。
实例代码
.countWindow(3, 2)
3.1.3 Window Function
前面指定了窗口的分配器, 接着我们需要来指定如何计算, 这事由window function来负责. 一旦窗口关闭, window function 去计算处理窗口中的每个元素.
window function 可以是ReduceFunction,AggregateFunction,or ProcessWindowFunction中的任意一种.
ReduceFunction,AggregateFunction更加高效, 原因就是Flink可以对到来的元素进行增量聚合 . ProcessWindowFunction (全窗口函数)可以得到一个包含这个窗口中所有元素的迭代器, 以及这些元素所属窗口的一些元数据信息.
ProcessWindowFunction不能被高效执行的原因是Flink在执行这个函数之前, 需要在内部缓存这个窗口上所有的元素
增量聚合函数是来一条计算一条,而全窗口函数则是等到数据都到了再做计算做一次计算。
Ø ReduceFunction(增量聚合函数----不会改变数据的类型)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.reduce(new ReduceFunction<Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
System.out.println(value1 + " ----- " + value2);
// value1是上次聚合的结果. 所以遇到每个窗口的第一个元素时, 这个函数不会进来
return Tuple2.of(value1.f0, value1.f1 + value2.f1);
}
})
Ø AggregateFunction(增量聚合函数----可以改变数据的类型)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.aggregate(new AggregateFunction<Tuple2<String, Long>, Long, Long>() {
// 创建累加器: 初始化中间值
@Override
public Long createAccumulator() {
System.out.println("createAccumulator");
return 0L;
}
// 累加器操作
@Override
public Long add(Tuple2<String, Long> value, Long accumulator) {
System.out.println("add");
return accumulator + value.f1;
}
// 获取结果
@Override
public Long getResult(Long accumulator) {
System.out.println("getResult");
return accumulator;
}
// 累加器的合并: 只有会话窗口才会调用
@Override
public Long merge(Long a, Long b) {
System.out.println("merge");
return a + b;
}
})
Ø ProcessWindowFunction(全窗口函数)
.process(new ProcessWindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, String, TimeWindow>() {
// 参数1: key 参数2: 上下文对象 参数3: 这个窗口内所有的元素 参数4: 收集器, 用于向下游传递数据
@Override
public void process(String key,
Context context,
Iterable<Tuple2<String, Long>> elements,
Collector<Tuple2<String, Long>> out) throws Exception {
System.out.println(context.window().getStart());
long sum = 0L;
for (Tuple2<String, Long> t : elements) {
sum += t.f1;
}
out.collect(Tuple2.of(key, sum));
}
})
全窗口函数应用场景:可以求百分之多少的数据或者求平均数这种需要把全部数据拿到之后再求的场景。
3.2 Keyed vs Non-Keyed Windows
其实, 在用window前首先需要确认应该是在keyBy后的流上用, 还是在没有keyBy的流上使用.
在keyed streams上使用窗口, 窗口计算被并行的运用在多个task上, 可以认为每个分组都有自己单独窗口. 正如前面的代码所示.
在non-keyed stream上使用窗口,无论并行度设置的是几窗口的并行度都是1, 所有的窗口逻辑只能在一个单独的task上执行.
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
需要注意的是: 非key分区的流, 即使把并行度设置为大于1 的数, 窗口也只能在某个分区上使用
3.3 Flink中的时间语义与WaterMark
3.3.1 Flink中的时间语义
在Flink的流式操作中, 会涉及不同的时间概念
处理时间(process time)
处理时间是指的执行操作的各个设备的时间
对于运行在处理时间上的流程序, 所有的基于时间的操作(比如时间窗口)都是使用的设备时钟.比如, 一个长度为1个小时的窗口将会包含设备时钟表示的1个小时内所有的数据. 假设应用程序在 9:15am分启动, 第1个小时窗口将会包含9:15am到10:00am所有的数据, 然后下个窗口是10:00am-11:00am, 等等
处理时间是最简单时间语义, 数据流和设备之间不需要做任何的协调. 他提供了最好的性能和最低的延迟. 但是, 在分布式和异步的环境下, 处理时间没有办法保证确定性, 容易受到数据传递速度的影响: 事件的延迟和乱序
在使用窗口的时候, 如果使用处理时间, 就指定时间分配器为处理时间分配器
事件时间(event time)
事件时间是指的这个事件发生的时间.
在event进入Flink之前, 通常被嵌入到了event中, 一般作为这个event的时间戳存在.
在事件时间体系中, 时间的进度依赖于数据本身, 和任何设备的时间无关. 事件时间程序必须制定如何产生Event Time Watermarks(水印) . 在事件时间体系中, 水印是表示时间进度的标志(作用就相当于现实时间的时钟).
在理想情况下,不管事件时间何时到达或者他们的到达的顺序如何, 事件时间处理将产生完全一致且确定的结果. 事件时间处理会在等待无序事件(迟到事件)时产生一定的延迟。由于只能等待有限的时间,因此这限制了确定性事件时间应用程序的可使用性。
假设所有数据都已到达,事件时间操作将按预期方式运行,即使在处理无序或迟到的事件或重新处理历史数据时,也会产生正确且一致的结果。例如,每小时事件时间窗口将包含带有事件时间戳的所有记录,该记录落入该小时,无论它们到达的顺序或处理时间。
在使用窗口的时候, 如果使用事件时间, 就指定时间分配器为事件时间分配器
注意:
在1.12之前默认的时间语义是处理时间, 从1.12开始, Flink内部已经把默认的语义改成了事件时间
3.3.2 哪种时间更重要
(1)按上映时间看处理时间重要
(2)按剧情的顺序观看电影事件事件重要
3.3.3 Flink中的WaterMark
支持event time的流式处理框架需要一种能够测量event time 进度的方式.比如, 一个窗口算子创建了一个长度为1小时的窗口,那么这个算子需要知道事件时间已经到达了这个窗口的关闭时间, 从而在程序中去关闭这个窗口.
事件时间可以不依赖处理时间来表示时间的进度.例如,在程序中,即使处理时间和事件时间有相同的速度, 事件时间可能会轻微的落后处理时间.另外一方面,使用事件时间可以在几秒内处理已经缓存在Kafka中多周的数据, 这些数据可以照样被正确处理,就像实时发生的一样能够进入正确的窗口.
这种在Flink中去测量事件时间的进度的机制就是watermark(水印). watermark作为数据流的一部分在流动, 并且携带一个时间戳t.
一个Watermark(t)表示在这个流里面事件时间已经到了时间t, 意味着此时, 流中不应该存在这样的数据: 他的时间戳t2<=t (时间比较旧或者等于时间戳)
总结:
1、 衡量事件时间的进展。
2、 是一个特殊的时间戳,生成之后随着流的流动而向后传递。
3、 用来处理数据乱序的问题。
4、 触发窗口等得计算、关闭。
5、 单调递增的(时间不能倒退)。
6、 Flink认为,小于Watermark时间戳的数据处理完了,不应该再出现。
Ø 有序流中的水印
在下面的这个图中, 事件是有序的(生成数据的时间和被处理的时间顺序是一致的), watermark是流中一个简单的周期性的标记。
有序场景:
1、 底层调用的也是乱序的Watermark生成器,只是乱序程度传了一个0ms。
2、 Watermark = maxTimestamp – outOfOrdernessMills – 1ms
= maxTimestamp – 0ms – 1ms
=>事件时间 – 1ms
Ø乱序流中的水印
在下图中, 按照他们时间戳来看, 这些事件是乱序的, 则watermark对于这些乱序的流来说至关重要.
通常情况下, 水印是一种标记, 是流中的一个点, 所有在这个时间戳(水印中的时间戳)前的数据应该已经全部到达. 一旦水印到达了算子, 则这个算子会提高他内部的时钟的值为这个水印的值.
乱序场景:
1、 什么是乱序 => 时间戳大的比时间戳小的先来
2、 乱序程度设置多少比较合适?
a) 经验值 => 对自身集群和数据的了解,大概估算。
b) 对数据进行抽样。
c) 肯定不会设置为几小时,一般设为 秒 或者 分钟。
3、 Watermark = maxTimestamp – outOfOrdernessMills – 1ms
=>当前最大的事件时间 – 乱序程度(等待时间)- 1ms
3.3.4 Flink中如何产生水印
在 Flink 中, 水印由应用程序开发人员生成, 这通常需要对相应的领域有 一定的了解。完美的水印永远不会错:在特殊情况下(例如非乱序事件流),最近一次事件的时间戳就可能是完美的水印。时间戳小于水印标记时间的事件不会再出现
启发式水印则相反,它只估计时间,因此有可能出错, 即迟到的事件 (其时间戳小于水印标记时间)晚于水印出现。针对启发式水印, Flink 提供了处理迟到元素的机制。
设定水印通常需要用到领域知识。举例来说,如果知道事件的迟到时间不会超过 5 秒, 就可以将水印标记时间设为收到的最大时间戳减去 5 秒。 另 一种做法是,采用一个 Flink 作业监控事件流,学习事件的迟到规律,并以此构建水印生成模型。
3.3.5 EventTime和WaterMark的使用
Flink内置了两个WaterMark生成器:
- Monotonously Increasing Timestamps(时间戳单调增长:其实就是允许的延迟为0)
WatermarkStrategy.forMonotonousTimestamps();
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));
public class Flink10_Chapter07_OrderedWaterMark {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
SingleOutputStreamOperator<WaterSensor> stream = env
.socketTextStream("hadoop102", 9999) // 在socket终端只输入毫秒级别的时间戳
.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] datas = value.split(",");
return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
}
});
// 创建水印生产策略
WatermarkStrategy<WaterSensor> wms = WatermarkStrategy
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // // 最大容忍的延迟时间
.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { // 指定时间戳
@Override
public long extractTimestamp(WaterSensor element, long recordTimestamp) {
return element.getTs() * 1000;
}
});
stream
.assignTimestampsAndWatermarks(wms) // 指定水印和时间戳
.keyBy(WaterSensor: :getId)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
@Override
public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
String msg = "当前key: " + key
+ "窗口: [" + context.window().getStart() / 1000 + "," + context.window().getEnd()/1000 + ") 一共有 "
+ elements.spliterator().estimateSize() + "条数据 ";
out.collect(msg);
}
})
.print();
env.execute();
}
}
3.3.6 自定义WatermarkStrategy
有2种风格的WaterMark生产方式: periodic(周期性) and punctuated(间歇性).都需要继承接口: WatermarkGenerator
Ø 周期性
public class Flink09_Window_EventTime_Tumbling_CustomerPeriod {
public static void main(String[] args) throws Exception {
//1.获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//2.读取端口数据并转为javaBean
SingleOutputStreamOperator<WaterSensor> waterSensorDS = env.socketTextStream("localhost", 9999)
.map(date -> {
String[] words = date.split(" ");
return new WaterSensor(words[0], Long.parseLong(words[1]), Integer.parseInt(words[2]));
});
//3.自定义WatermarkStrategy
SingleOutputStreamOperator<WaterSensor> waterSensorSingleOutputStreamOperator = waterSensorDS.assignTimestampsAndWatermarks(new WatermarkStrategy<WaterSensor>() {
//生成自定义的watermark
@Override
public WatermarkGenerator<WaterSensor> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new Myperiod(2000L);
}
}.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {//提取数据的时间戳
@Override
public long extractTimestamp(WaterSensor element, long recordTimestamp) {
return element.getTs() * 1000L;
}
}));
//4.按照id分组
KeyedStream<WaterSensor, String> keyedStream = waterSensorSingleOutputStreamOperator.keyBy(date -> date.getId());
//5.开窗
WindowedStream<WaterSensor, String, TimeWindow> window = keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(5)));
//6.计算总和
window.sum("vc").print();
env.execute();
}
//自定义周期性的Watermark生成器
public static class Myperiod implements WatermarkGenerator<WaterSensor>{
//最大时间戳
private Long maxTs;
//最大延迟时间
private Long maxDelay;
//构造方法
public Myperiod(Long maxDelay) {
this.maxDelay = maxDelay;
this.maxTs = Long.MIN_VALUE + this.maxDelay + 1;
}
//当数据来的时候调用
@Override
public void onEvent(WaterSensor event, long eventTimestamp, WatermarkOutput output) {
maxTs = Math.max(eventTimestamp, maxTs);
}
//周期性调用
@Override
public void onPeriodicEmit(WatermarkOutput output) {
System.out.println("生成Watermark");
output.emitWatermark(new Watermark(maxTs-maxDelay-1L));
}
}
}
Ø 间歇性
public class Flink12_Chapter07_punctuated {
public static void main(String[] args) throws Exception {
// 省略....
//自定义间歇性的Watermark生成器
public static class Myperiod implements WatermarkGenerator<WaterSensor>{
//最大时间戳
private Long maxTs;
//最大延迟时间
private Long maxDelay;
public Myperiod(Long maxDelay) {
this.maxDelay = maxDelay;
this.maxTs = Long.MIN_VALUE + this.maxDelay + 1;
}
//当数据来的时候调用---间歇性
@Override
public void onEvent(WaterSensor event, long eventTimestamp, WatermarkOutput output) {
//获取当前数据中最大的时间戳并赋值
System.out.println("生成Watermark");
maxTs = Math.max(eventTimestamp, maxTs);
output.emitWatermark(new Watermark(maxTs-maxDelay-1));
}
//周期性调用
@Override
public void onPeriodicEmit(WatermarkOutput output) {
}
}
}
3.3.7 多并行度下WaterMark的传递
总结: 1.多并行度的条件下, 向下游传递WaterMark的时候是以广播的方式传递的2.总是以最小的那个WaterMark为准! 木桶原理!3.并且当watermark值没有增长的时候不会向下游传递,注意:生成不变。
3.4 窗口允许迟到的数据
已经添加了wartemark之后, 仍有数据会迟到怎么办? Flink的窗口, 也允许迟到数据.
当触发了窗口计算后, 会先计算当前的结果, 但是此时并不会关闭窗口.以后每来一条迟到数据, 则触发一次这条数据所在窗口计算(增量计算).
那么什么时候会真正的关闭窗口呢? wartermark 超过了窗口结束时间+等待时间
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))
注意:
允许迟到只能运用在event time上
3.5 侧输出流(sideOutput)
3.5.1 处理窗口关闭之后的迟到数据
允许迟到数据, 窗口也会真正的关闭, 如果还有迟到的数据怎么办? Flink提供了一种叫做侧输出流的来处理关窗之后到达的数据.
public class Flink13_Chapter07_SideOutput {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
System.out.println(env.getConfig());
SingleOutputStreamOperator<WaterSensor> stream = env
.socketTextStream("hadoop102", 9999) // 在socket终端只输入毫秒级别的时间戳
.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] datas = value.split(",");
return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
}
});
// 创建水印生产策略
WatermarkStrategy<WaterSensor> wms = WatermarkStrategy
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 最大容忍的延迟时间
.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { // 指定时间戳
@Override
public long extractTimestamp(WaterSensor element, long recordTimestamp) {
return element.getTs() * 1000;
}
});
SingleOutputStreamOperator<String> result = stream
.assignTimestampsAndWatermarks(wms)
.keyBy(WaterSensor::getId)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))
.sideOutputLateData(new OutputTag<WaterSensor>("side_1") {
}) // 设置侧输出流
.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
@Override
public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
String msg = "当前key: " + key
+ " 窗口: [" + context.window().getStart() / 1000 + "," + context.window().getEnd() / 1000 + ") 一共有 "
+ elements.spliterator().estimateSize() + "条数据" +
"watermark: " + context.currentWatermark();
out.collect(context.window().toString());
out.collect(msg);
}
});
result.print();
result.getSideOutput(new OutputTag<WaterSensor>("side_1"){}).print();
env.execute();
}
}
允许迟到数据+侧输出流作用:
尽量快速提供一个近似准确结果,为了保证时效性,然后加上允许迟到数据+侧输出流得到最终的数据,这样也不用维护大量的窗口,性能也就会好很多。
3.5.2 使用侧输出流把一个流拆成多个流
split算子可以把一个流分成两个流, 从1.12开始已经被移除了. 官方建议我们用侧输出流来替换split算子的功能.
需求: 采集监控传感器水位值,将水位值高于5cm的值输出到side output
SingleOutputStreamOperator<WaterSensor> result =
env
.socketTextStream("hadoop102", 9999) // 在socket终端只输入毫秒级别的时间戳
.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] datas = value.split(",");
return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
}
})
.keyBy(ws -> ws.getTs())
.process(new KeyedProcessFunction<Long, WaterSensor, WaterSensor>() {
@Override
public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {
out.collect(value);
if (value.getVc() > 5) { //水位大于5的写入到侧输出流
ctx.output(new OutputTag<WaterSensor>("警告") {}, value);
}
}
});
result.print("主流");
result.getSideOutput(new OutputTag<WaterSensor>("警告"){}).print("警告");