Apache Flink 学习笔记(五)

通过阅读官方文档,整理一些前面demo中没有用到但是很有用的东西。排序不分先后。

2018.11.1补充

Flink 基于窗口处理流数据时,如果消息延时,数据会落错窗口,最终导致计算结果错误。对于消息延时不敏感的应用可以忽略,如果消息延时敏感,推荐使用event 事件窗口。但是event 窗口在某些场景依然有问题。Flink 提供了水印机制。推荐阅读这两篇博文,作者图文集合,写得很详细。
Flink事件时间处理和水印 --这里介绍了消息延时带来的问题和解决方案
Flink流计算编程--watermark(水位线)简介 --标题说明了一切

(1)

创建StreamExecutionEnvironment 对象有多种方式,如下所示。但是通常用默认方式就可以,它可以根据所处环境自动做出正确的选择。

//默认
StreamExecutionEnvironment.getExecutionEnvironment();
//从本地环境创建
StreamExecutionEnvironment.createLocalEnvironment();
//远程创建
StreamExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles);

(2)

累加器的使用

//创建累加器对象
private IntCounter numLines = new IntCounter();

//在 RichFunction 的open 方法中使用
getRuntimeContext().addAccumulator("num-lines", this.numLines);

//任何你需要使用的地方
this.numLines.add(1);

//从执行变量的返回结果中获取结果
myJobExecutionResult.getAccumulatorResult("num-lines");

(3)

DataStream 延迟控制。默认情况下,元素不会逐个传输到网络上(这会导致不必要的网络流量),但会被缓冲。要控制吞吐量和延迟,可以使用env.setBufferTimeout(timeoutMillis)设置缓冲区填充的最长等待时间。默认是100ms。请注意,如果设置setBufferTimeout(-1)表示缓冲区满了才刷新,如果想尽可能的缩小延迟,可以设置一个接近于0但不要等于0的数,因为它可能导致严重的性能下降。

LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setBufferTimeout(timeoutMillis);

env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);

(4)

Flink 提供了从java 集合中获取数据源的接口,这样可以方便在本地测试。请注意,从java 集合获取数据源,并发度只能设置为1,且数据元素必须可序列化。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

// Create a DataStream from a list of elements
DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);

// Create a DataStream from any Java collection
List<Tuple2<String, Integer>> data = ...
DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);

// Create a DataStream from an Iterator
Iterator<Long> longIt = ...
DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);

(5)

DateSet.project函数。该函数用于元祖数据集,从元组中选择字段的子集。如下

DataSet<Tuple3<Integer, Double, String>> in = // [...]
DataSet<Tuple2<String, Integer>> out = in.project(2,0);

(6)

MinBy / MaxBy函数。该函数用于从元祖数据集中找出指定的字段的最小/最大值组成的新元祖集合。

DataSet<Tuple3<Integer, Double, String>> in = // [...]
// a DataSet with a single tuple with minimum values for the Integer and String fields.
DataSet<Tuple3<Integer, Double, String>> out = in.minBy(0, 2);
// a DataSet with one tuple for each group with the minimum value for the Double field.
DataSet<Tuple3<Integer, Double, String>> out2 = in.groupBy(2).minBy(1);

(7)

使用构造函数或withParameters(Configuration)方法将参数传递给函数。

//使用构造函数
DataSet<Integer> toFilter = env.fromElements(1, 2, 3);
toFilter.filter(new MyFilter(2));
private static class MyFilter implements FilterFunction<Integer> {
  private final int limit;
  public MyFilter(int limit) {
    this.limit = limit;
  }

  @Override
  public boolean filter(Integer value) throws Exception {
    return value > limit;
  }
}
//使用withParameters(Configuration)
DataSet<Integer> toFilter = env.fromElements(1, 2, 3);
Configuration config = new Configuration();
config.setInteger("limit", 2);
toFilter.filter(new RichFilterFunction<Integer>() {
    private int limit;
    @Override
    public void open(Configuration parameters) throws Exception {
      limit = parameters.getInteger("limit", 0);
    }

    @Override
    public boolean filter(Integer value) throws Exception {
      return value > limit;
    }
}).withParameters(config);

此外,还可以为全局设置参数

Configuration conf = new Configuration();
conf.setString("mykey","myvalue");
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(conf);

(8)

DataStream 转换 Table时指定process time 的两种方式。

第一种在之前的demo中有介绍,即扩展一个额外字段,并添加.proctime指定,且改字段必须放在最后。

Table table = tEnv.fromDataStream(stream, "a, b, extra.proctime");

第二种,使用TableSource,如下

// define a table source with a processing attribute
public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {

    @Override
    public TypeInformation<Row> getReturnType() {
        String[] names = new String[] {"a" , "b"};
        TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
        return Types.ROW(names, types);
    }

    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
        // create stream 
        DataStream<Row> stream = ...;
        return stream;
    }

    @Override
    public String getProctimeAttribute() {
        // field with this name will be appended as a third field 
        return "extra";
    }
}

// register table source
tEnv.registerTableSource("tableName", new UserActionSource());

WindowedTable windowedTable = tEnv
    .scan("tableName")
    .window(Tumble.over("10.minutes").on("extra").as("win"));
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • Swift1> Swift和OC的区别1.1> Swift没有地址/指针的概念1.2> 泛型1.3> 类型严谨 对...
    cosWriter阅读 13,797评论 1 32
  • ORA-00001: 违反唯一约束条件 (.) 错误说明:当在唯一索引所对应的列上键入重复值时,会触发此异常。 O...
    我想起个好名字阅读 10,792评论 0 9
  • MySQL逻辑架构 下面是一幅MySQL各组件之间如何协同工作的架构图,有助于我们深入理解MySQL服务器。 如图...
    骑小猪看流星阅读 10,235评论 2 135
  • http://www.jianshu.com/p/fbe6a654604c
    SmallTwo阅读 1,313评论 0 0
  • 科罗拉多大峡谷位于美国亚利桑那州(Arizona)西北部,科罗拉多高原西南部。大峡谷全长446千米,平均宽度16千...
    小强_5941阅读 3,762评论 1 0