数据分析实践 | flink | 流程优化篇

0x01flink执行流程了解一下

流程如下:

flink执行流程
由一个Source数据处理,结果分发到四个窗口进行处理。

0x02表象:

flink需要优化,最先表现出来的现状就是:
窗口中使用metric体现出每秒的数据处理量很低,或停止。

1.代码中添加metric使用方法可参考:https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html
2.如果使用flink dashboard也可以使用metric�功能进行统计

此处以flink dashboard为例。

0x03问题点及优化:

1.数据反压

数据体现(背压(Backpressure)机制):
-> 一个window中数据处理的速率慢
-> 导致Source数据处理过程越来越慢
-> 再导致所有窗口处理越来越慢。
dashboard体现:

dashboard可以在背压这里看到HIGH时,则存在数据反压问题。


flink数据反压

反压逻辑:
若流程为A-B-C-D-E-F ,ABCDE出现反压(即这里status为high),则表示F处理流程导致E -> D-> C->B ->A 相继慢。

优化方式:

1.数据标记分流[详细代码见通用优化]
2.窗口优化[详细代码见通用优化]

2.数据倾斜

在多进程环境下:

数据体现:
-> 每个窗口中所有数据的分布不平均,某个窗口处理数据量太大导致速率慢。
-> 导致Source数据处理过程越来越慢
-> 再导致所有窗口处理越来越慢。
dashboard体现:
dashboard中Subtasks中打开每个窗口可以看到每个窗口进程的运行情况:
flink数据倾斜

如上图,数据分布很不均匀,导致部分窗口数据处理缓慢。

优化方式:

1.数据标记分流[详细代码见通用优化]
2.窗口优化[详细代码见通用优化]
3.在不影响逻辑的前提下,keyby对数据分流时选择较为均匀的数据。

3.消费滞后

尚未出现数据反压和数据倾斜的状况,但是flink的watermarks追不上实时时间,不能实时处理。

需单进程确认点:

1. flink读取的数据是否产生的及时。
2. 窗口Aggregate处理是否存在死循环或较慢的点
    (如:正则/redis/http等)
3. flink计算结果的输出处理慢。
    (如:使用.disablechain.addsink()后再在dashboard中查看窗口和输出分别处理的速率)

可优化点:

  1. 将窗口的处理逻辑优化的简单一些,将较长时间的处理放在数据处理部分或windowFunction部分。

4.需在窗口内做大量的外连情况,如redis/es等,redis连接过多会慢或直接报错。[2019.11.17更新]

解决方案:

1.可以在窗口外面申请全局redis连接池作为全局变量。

class MyProcessWindowFunction extends RichWindowFunction[Accumulator,String,String,TimeWindow] {
  @transient var config_redis = new JedisPoolConfig()
  config_redis.setMaxTotal(300)
  config_redis.setMaxWaitMillis (2*1000)

  @transient var jedisPool = new JedisPool(config_redis,"10.10.10.10.",1234,0,"yourpassword")
  @transient var client = Esinit() // 此处为es外联的申明
  @transient var log = LoggerFactory.getLogger(getClass)
  //其他的一些全局变量也可以在这里定义,如log
  LoginCheck_api.KeepSession() 
  //检查保持状态的函数也可以在这里处理,这样不会每个窗口都处理一遍。

  override def apply (key: String, window: TimeWindow, input: Iterable[Accumulator], out: Collector[String]): Unit = {
    ... 
    //窗口如果定义为null则重新做定义
    if(jedisPool==null){
      w_log = LoggerFactory.getLogger(getClass)
    
      config_redis = new JedisPoolConfig()
      config_redis.setMaxTotal(300)
      config_redis.setMaxWaitMillis (2*1000)
      jedisPool = new JedisPool(config_redis,"10.10.10.10.",1234,0,"yourpassword")
      LoginCheck_api.KeepSession()
    }
    if(client==null){
      client = Esinit()
    }
    ...

2.网络延时问题[2019.12.4更新]
场景:flink反压,且排查redis无太多慢查日志
检查提交集群对redis的延时情况,正常应该在0.099ms以内不会影响到程序的处理过程。

3.将对外操作放进单独多线程操作(如果上述两个问题都解决不了问题)[2019.12.4更新]
以redis举例:

import java.util.concurrent.{Callable, ExecutorService, Executors, FutureTask}

import redis.clients.jedis.{JedisPool, JedisPoolConfig}

object ThreadPool {
  var config_redis = new JedisPoolConfig()
  config_redis.setMaxTotal(500)
  config_redis.setMaxIdle(500)
  config_redis.setBlockWhenExhausted(false)
  config_redis.setMaxWaitMillis (1000)
  config_redis.setMinEvictableIdleTimeMillis(6000)
  config_redis.setTimeBetweenEvictionRunsMillis(3000)
  var jedisPool = new JedisPool(config_redis,"10.10.10.10",1234,0,"yourpassword")
  val threadPool:ExecutorService=Executors.newFixedThreadPool(500)
  def sadd(key:String,value:String):Int= {
    var r = 1
    try {
      val future=new FutureTask[String](new Callable[String] {
        override def call():String = {
          var isexists = 1L //sadd返回1为添加成功,0为已存在/添加不成功
          var jedis = jedisPool.getResource
          try{
            isexists = jedis.sadd(bolt_url,id_str)
          }catch{
            case e=>
          }finally {
            jedis.close()
          }
          return isexists.toString
        }
      })
      threadPool.execute(future)
      r = future.get().toInt //导出结果
      if(r==1){
      ...//逻辑操作
      }else{
      ...//逻辑操作
      }
    }finally {
//      threadPool.shutdown()
    }
    return r//可选择是否返回结果
  }

  def main (args: Array[String]): Unit = {
    var t =sadd("a","b")
    println(t) 
    threadPool.shutdown()
  }


}

而后在窗口中调用ThreadPool.sadd方法,获取到redis操作结果后的逻辑操作也可在窗口外进行,窗口只负责调度。

5.通用优化:

1.数据标记分流:

使用数据标记过滤进入窗口的数据,
而非使用filter,map等方式去筛选数据。
split分流 select选择分流. 

val frequency_ = Features.split(
        (s:Map[String,Any])=>
          s.get("method").get.toString  match{
          
            case "a"|"b"|"c"|
                  => List("str")
            case "1"|"2"
                  =>List("int")
            case _
                  =>List("normal")
          }

      )

val all = frequency_.select("str","int").assignTimestampsAndWatermarks(new TimestampExtractor())

all.keyby().aggregate()
      
      ...
      
     
    
Ps. https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/
  
2.窗口聚合计算

window apply窗口最后触发时进行一次性计算 aggregate来一条数据计算一次。

Ps.https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/
3.keyby关键词无法自行选择较均匀的情况下,
可以采用keyby(Random(20)+key)的形式进行分配窗口。

最好的方式:
原有DataStream中添加专门用于分窗口的字段,但是可能会影响你窗口聚合的结果。

def dealing_input(str):(String,String){
    val keyby_key = scala.util.Random.nextInt(20).toString+"-"+key
    return (data,keyby_key)
}

input.keyby(_._2).window().xxx
如何在处理完将随时数去掉请参考另一篇文章:
https://www.jianshu.com/p/1bca3c2758c1


遇坑待更新

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,589评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,615评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,933评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,976评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,999评论 6 393
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,775评论 1 307
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,474评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,359评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,854评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,007评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,146评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,826评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,484评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,029评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,153评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,420评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,107评论 2 356

推荐阅读更多精彩内容