通过阅读官方文档,整理一些前面demo
中没有用到但是很有用的东西。排序不分先后。
2018.11.1补充
Flink 基于窗口处理流数据时,如果消息延时,数据会落错窗口,最终导致计算结果错误。对于消息延时不敏感的应用可以忽略,如果消息延时敏感,推荐使用event 事件窗口。但是event 窗口在某些场景依然有问题。Flink 提供了水印机制。推荐阅读这两篇博文,作者图文集合,写得很详细。
Flink事件时间处理和水印 --这里介绍了消息延时带来的问题和解决方案
Flink流计算编程--watermark(水位线)简介 --标题说明了一切
(1)
创建StreamExecutionEnvironment
对象有多种方式,如下所示。但是通常用默认方式就可以,它可以根据所处环境自动做出正确的选择。
//默认
StreamExecutionEnvironment.getExecutionEnvironment();
//从本地环境创建
StreamExecutionEnvironment.createLocalEnvironment();
//远程创建
StreamExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles);
(2)
累加器的使用
//创建累加器对象
private IntCounter numLines = new IntCounter();
//在 RichFunction 的open 方法中使用
getRuntimeContext().addAccumulator("num-lines", this.numLines);
//任何你需要使用的地方
this.numLines.add(1);
//从执行变量的返回结果中获取结果
myJobExecutionResult.getAccumulatorResult("num-lines");
(3)
DataStream 延迟控制
。默认情况下,元素不会逐个传输到网络上(这会导致不必要的网络流量),但会被缓冲。要控制吞吐量和延迟,可以使用env.setBufferTimeout(timeoutMillis)
设置缓冲区填充的最长等待时间。默认是100ms
。请注意,如果设置setBufferTimeout(-1)
表示缓冲区满了才刷新,如果想尽可能的缩小延迟,可以设置一个接近于0
但不要等于0
的数,因为它可能导致严重的性能下降。
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setBufferTimeout(timeoutMillis);
env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
(4)
Flink
提供了从java
集合中获取数据源的接口,这样可以方便在本地测试。请注意,从java
集合获取数据源,并发度只能设置为1
,且数据元素必须可序列化。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
// Create a DataStream from a list of elements
DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);
// Create a DataStream from any Java collection
List<Tuple2<String, Integer>> data = ...
DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);
// Create a DataStream from an Iterator
Iterator<Long> longIt = ...
DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);
(5)
DateSet.project
函数。该函数用于元祖数据集,从元组中选择字段的子集。如下
DataSet<Tuple3<Integer, Double, String>> in = // [...]
DataSet<Tuple2<String, Integer>> out = in.project(2,0);
(6)
MinBy / MaxBy
函数。该函数用于从元祖数据集中找出指定的字段的最小/最大值组成的新元祖集合。
DataSet<Tuple3<Integer, Double, String>> in = // [...]
// a DataSet with a single tuple with minimum values for the Integer and String fields.
DataSet<Tuple3<Integer, Double, String>> out = in.minBy(0, 2);
// a DataSet with one tuple for each group with the minimum value for the Double field.
DataSet<Tuple3<Integer, Double, String>> out2 = in.groupBy(2).minBy(1);
(7)
使用构造函数或withParameters(Configuration)
方法将参数传递给函数。
//使用构造函数
DataSet<Integer> toFilter = env.fromElements(1, 2, 3);
toFilter.filter(new MyFilter(2));
private static class MyFilter implements FilterFunction<Integer> {
private final int limit;
public MyFilter(int limit) {
this.limit = limit;
}
@Override
public boolean filter(Integer value) throws Exception {
return value > limit;
}
}
//使用withParameters(Configuration)
DataSet<Integer> toFilter = env.fromElements(1, 2, 3);
Configuration config = new Configuration();
config.setInteger("limit", 2);
toFilter.filter(new RichFilterFunction<Integer>() {
private int limit;
@Override
public void open(Configuration parameters) throws Exception {
limit = parameters.getInteger("limit", 0);
}
@Override
public boolean filter(Integer value) throws Exception {
return value > limit;
}
}).withParameters(config);
此外,还可以为全局设置参数
Configuration conf = new Configuration();
conf.setString("mykey","myvalue");
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(conf);
(8)
DataStream
转换 Table
时指定process time
的两种方式。
第一种在之前的demo
中有介绍,即扩展一个额外字段,并添加.proctime
指定,且改字段必须放在最后。
Table table = tEnv.fromDataStream(stream, "a, b, extra.proctime");
第二种,使用TableSource
,如下
// define a table source with a processing attribute
public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {
@Override
public TypeInformation<Row> getReturnType() {
String[] names = new String[] {"a" , "b"};
TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
return Types.ROW(names, types);
}
@Override
public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
// create stream
DataStream<Row> stream = ...;
return stream;
}
@Override
public String getProctimeAttribute() {
// field with this name will be appended as a third field
return "extra";
}
}
// register table source
tEnv.registerTableSource("tableName", new UserActionSource());
WindowedTable windowedTable = tEnv
.scan("tableName")
.window(Tumble.over("10.minutes").on("extra").as("win"));