Flink中实用的小知识点整理

目录

1、Flink使用WaterMark处理乱序事件
2、累加器和计数器
3、Window使用
4、流的切分和合并
5、任务链
6、Flink消费kafka数据起始offset配置
7、Flink消费kafka数据,消费者offset提交配置
8、数据源
9、数据存放
10、运行时环境的区别
11、keyedStream中进行聚合操作

一.Flink使用WaterMark处理乱序事件[1]

watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用watermark机制结合window来实现。

流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的。虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、背压等原因,导致乱序的产生(out-of-order或者说late element)

但是对于late element,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是watermark。

关于水位线机制这篇博客讲的比较通俗易懂,Flink流计算编程--watermark(水位线)简介

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

//将TimeCharactersistic设置为EventTime。
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val text = env.socketTextStream("localhost", 9999)

// 指定watemark的实现
text.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator ) 

自定义的实现AssignerWithPeriodicWatermarks接口的类

class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
    //最大允许的乱序时间是10s,告诉Flink希望消息最多有10s的延迟,每个窗口仅在waterMark通过时被处理
    val maxOutOfOrderness = 10000L;
    var currentMaxTimestamp: Long; 

    //extractTimestamp方法是从数据本身中提取EventTime
    override def extractTimestamp(element: String, previousElementTimestamp: Long): Long = {
        val timestamp = element.getCreationTime()
        currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
        timestamp;
    }
   //getCurrentWatermar方法,是用currentMaxTimestamp - maxOutOfOrderness来获取的
    override def getCurrentWatermark(): Watermark = {
        new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}
使用WaterMark是,window的触发时机?

1.首先
window的设定无关数据本身,而是系统定义好了的。如果window大小是3秒,那么1分钟内会把window划分为如下的形式:

[00:00:00,00:00:03)
[00:00:03,00:00:06)
...
[00:00:57,00:01:00)

2.其次
输入的数据中,根据自身的Event Time,将数据划分到不同的window中,如果window中有数据,则当watermark时间>=Event Time时,就符合了window触发的条件了,最终决定window触发,还是由数据本身的Event Time所属的window中的window_end_time决定。

3.最终
window的触发要符合以下几个条件:

  • watermark时间 >= window_end_time
  • [window_start_time,window_end_time) 中有数据存在

4.注意

  • watermark是一个全局的值,不是某一个key下的值,所以即使不是同一个key的数据,其warmark也会增加。
  • 对于late element太多的数据而言,由于Event Time < watermark时间,所以来一条就触发一个window。
  • 也可以使用AllowedLateness功能设置消息的最大允许时间来解决处理延迟的消息。

二.累加器和计数器[2]

Flink和Spark一样,都提供了累加器供我们使用,它们大多用于一些计数,计算一些指标的场景。
计数器也是一种累加器,它是最简单的累加器,作计数功能使用。在Flink类部,内置了很多计数器,比如IntCounter,LongCounter和DoubleCounter等等。

那么如何使用累加器呢?主要分为下面的几部:

  • 第一步:在自定义的转换操作里创建累加器对象:private IntCounter numLines = new IntCounter();
  • 第二步:注册累加器对象,通常是在rich function的open()方法中。这里你还需要定义累加器的名字getRuntimeContext().addAccumulator(“num-lines”, this.numLines);
  • 第三步:在operator函数的任何地方使用累加器,包括在open()和close()方法中this.numLines.add(1);
  • 第四步:结果存储在JobExecutionResult里:JobExecutionResult jobExecutionResult =env.execute("Accumulator"); jobExecutionResult .getAccumulatorResult("num-lines");

如果需要可以选择实现Accumulator或者SimpleAccumulator来自定义累加器。

  • Accumulator<V, R>是最灵活的:它定义了需要进行累加的值的类型V以及最后结果的类型R
  • SimpleAccumulator则是在进行累计数据类型和返回的数据类型一致的情况下使用的,例如计数器。
/**
  * Flink的累加器使用
  */
object flinkBatch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.fromElements("Hello Jason What are you doing Hello world")
    val counts = text
      .flatMap(_.toLowerCase.split(" "))
      .map(new RichMapFunction[String, String] {
        //创建累加器
        val acc = new IntCounter()
        override def open(parameters: Configuration): Unit = {
          super.open(parameters)
          //注册累加器
          getRuntimeContext.addAccumulator("accumulator", acc)
        }
        override def map(in: String): String = {
          //使用累加器
          this.acc.add(1)
          in
        }
      }).map((_,1))
      .groupBy(0)
      .sum(1)
    counts.writeAsText("d:/test.txt/").setParallelism(1)
    val res = env.execute("Accumulator Test")
    //获取累加器的结果
    val num = res.getAccumulatorResult[Int]("accumulator")
    println(num)
  }
}

三.Window使用[3]

在KeyedStream中进行使用,根据某个特征针对每个key用windows进行分组

dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5)));

dataStream.keyBy(0).timeWindow(Time.seconds(5),Time.seconds(2));

四.流的切分和合并[4]

1.将流分割成多个流

SplitStream split = dataStream.split(new OutputSelector() {
    @Override
    public Iterable select(Integer value) {
        List output = new ArrayList();
        if (value % 2 == 0) {
            output.add("even");
        }
        else {
            output.add("odd");
        }
        return output;
    }
});

从split stream中选择一个流

// SplitStream split;
DataStream even = split.select("even");
DataStream odd = split.select("odd");
DataStream all = split.select("even","odd");

2.合并多个数据流成一个新的数据流

dataStream.union(otherStream1, otherStream2, ...);

五.任务链[5]

把很多转化操作的任务链接在一起放到同一个thread中执行,可以获得更好的性能。使用 StreamExecutionEnvironment.disableOperatorChaining()可以在整个job中去除某个链节点。
(1)Start new chain

omeStream.filter(...).map(...).startNewChain().map(...);

(2)Disable chaining

someStream.map(...).disableChaining();

(3)Set slot sharing group

someStream.filter(...).slotSharingGroup("name");

六.Flink消费kafka数据起始offset配置[6]

Flink读取Kafka数据确定开始位置有以下几种设置方式:

1.从topic的最早offset位置开始处理数据,如果kafka中保存有消费者组的消费位置将被忽略。
flinkKafkaConsumer.setStartFromEarliest()

2.从topic的最新offset位置开始处理数据,如果kafka中保存有消费者组的消费位置将被忽略。
flinkKafkaConsumer.setStartFromLatest()

3.从指定的时间戳(毫秒)开始消费数据,Kafka中每个分区中数据大于等于设置的时间戳的数据位置将被当做开始消费的位置。如果kafka中保存有消费者组的消费位置将被忽略。
flinkKafkaConsumer.setStartFromTimestamp(…)

4.默认的设置。根据代码中设置的group.id设置的消费者组,去kafka中或者zookeeper中找到对应的消费者
offset位置消费数据。如果没有找到对应的消费者组的位置,那么将按照auto.offset.reset设置的策略读取offset。
flinkKafkaConsumer.setStartFromGroupOffsets()

七.Flink消费kafka数据,消费者offset提交配置[7]

Flink提供了消费kafka数据的offset如何提交给Kafka的配置。注意,Flink并不依赖提交给Kafka或者zookeeper中的offset来保证容错。提交的offset只是为了外部来查询监视kafka数据消费的情况。配置offset的提交方式取决于是否为job设置开启checkpoint。可以使用env.enableCheckpointing(5000)来设置开启checkpoint。

1.关闭checkpoint
如果禁用了checkpoint,那么offset位置的提交取决于Flink读取kafka客户端的配置,例如enable.auto.commit 配置是否开启自动提交offset, auto.commit.interval.ms决定自动提交offset的周期。

2.开启checkpoint
如果开启了checkpoint,那么当checkpoint保存状态完成后,将checkpoint中保存的offset位置提交到kafka。这样保证了Kafka中保存的offset和checkpoint中保存的offset一致,可以通过配置setCommitOffsetsOnCheckpoints(true/false)来配置是否将checkpoint中的offset提交到kafka中(默认是true)。如果使用这种方式,那么properties中配置的kafka offset自动提交参数enable.auto.commit和周期提交参数auto.commit.interval.ms参数将被忽略。

八.数据源[8]

StreamExecutionEnvironment提供的一些访问数据源的接口
1.基于文件的数据源

readTextFile(path)
readFile(fileInputFormat, path)
readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)

2.基于Socket的数据源

socketTextStream  //Linux中启动Socket端口 nc -lk 9999

3.基于Collection的数据源

fromCollection(Collection)
fromCollection(Iterator, Class)
fromElements(T ...)
fromParallelCollection(SplittableIterator, Class)
generateSequence(from, to)

4.addSource

添加新的源函数,例如从kafka中读取数据,

// SimpleStringSchema 指定key的格式
FlinkKafkaConsumer011<String> consumer011 = new FlinkKafkaConsumer011<>(
            "flink-topic",
            new SimpleStringSchema(), 
            props);

env.addSource(consumer);

九.数据存放[9]

writeAsText() / 以字符串的形式逐行写入文件
writeAsCsv(...) / 将元组写出以逗号分隔的csv文件。注意:只能作用到元组数据上。
print() / printToErr() / 控制台直接输出结果,调用对象的toString()方法得到输出结果
writeUsingOutputFormat() / FileOutputFormat
writeToSocket
addSink  / 自定义接收函数或使用FlinkKafkaProducer将数据写入到Kafka

十.运行时环境的区别[10]

创建StreamExecutionEnvironment 对象有多种方式,如下所示。但是通常用默认方式就可以,它可以根据所处环境自动做出正确的选择。

//默认
StreamExecutionEnvironment.getExecutionEnvironment();
//从本地环境创建
StreamExecutionEnvironment.createLocalEnvironment();
//远程创建
StreamExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles);

十一.keyedStream中进行聚合操作[11]

对于Key的指定方式可以参考我的这篇文章 Flink中指定Key的几种方式

keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");

这个脚注怎么去掉???


  1. Flink使用WaterMark处理乱序事件

  2. 累加器和计数器

  3. Window使用

  4. 流的切分和合并

  5. 任务链

  6. Flink消费kafka数据起始offset配置

  7. Flink消费kafka数据,消费者offset提交配置

  8. 数据源

  9. 数据存放

  10. 运行时环境的区别

  11. keyedStream中进行聚合操作

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,222评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,455评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,720评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,568评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,696评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,879评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,028评论 3 409
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,773评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,220评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,550评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,697评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,360评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,002评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,782评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,010评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,433评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,587评论 2 350