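Flink parallelism can be set at four levels. In order of priority from highest to lowest: operator API > execution environment (env) > client (-p) > configuration file. Parallelism must not exceed the number of slots. Choosing a suitable parallelism improves processing efficiency.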
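As a rough illustration of that precedence, here is a minimal plain-Python sketch. It is a model, not Flink code: `effective_parallelism` and `check_slots` are hypothetical helpers, and the fallback value of 1 for the configuration file is an assumption made for the example.

```python
def effective_parallelism(operator=None, env=None, client=None, config_default=1):
    """Return the first parallelism that is set, checking the highest-priority level first."""
    # Order mirrors the note above: operator API > env > client (-p) > config file.
    for value in (operator, env, client):
        if value is not None:
            return value
    return config_default


def check_slots(parallelism, available_slots):
    """Reject a parallelism that is larger than the number of slots."""
    if parallelism > available_slots:
        raise ValueError(
            f"parallelism {parallelism} exceeds available slots {available_slots}"
        )
    return parallelism


if __name__ == "__main__":
    print(effective_parallelism(operator=2, env=4, client=8))  # operator level wins
    print(effective_parallelism(env=4, client=8))              # env beats -p
    print(effective_parallelism())                             # falls back to config file
```

In this model a value set on the operator always wins, and the configuration file only applies when nothing else is set.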
If Flink's parallelism is greater than the number of Kafka partitions, some subtasks will sit idle. For a fix, see:
http://yangaidi.com/2019/05/24/flink%E6%B6%88%E8%B4%B9kafka%E6%97%B6%E5%87%BA%E7%8E%B0%E6%95%B0%E6%8D%AE%E5%80%BE%E6%96%9C%E7%9A%84%E5%8E%9F%E5%9B%A0%E5%92%8C%E5%A4%84%E7%90%86%E6%96%B9%E5%BC%8F/
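To see why subtasks go idle, here is a simplified plain-Python model of partition assignment. It assumes each partition goes to exactly one source subtask, round-robin from index 0; the real connector's start offset may differ, and `assign_partitions` and `idle_subtasks` are hypothetical names used only for this sketch.

```python
def assign_partitions(num_partitions, parallelism):
    """Map each Kafka partition to one source subtask (simplified round-robin)."""
    assignment = {subtask: [] for subtask in range(parallelism)}
    for partition in range(num_partitions):
        assignment[partition % parallelism].append(partition)
    return assignment


def idle_subtasks(assignment):
    """Return the subtasks that received no partition and therefore read nothing."""
    return [subtask for subtask, parts in assignment.items() if not parts]


if __name__ == "__main__":
    # Hypothetical numbers: 4 partitions consumed with parallelism 6.
    assignment = assign_partitions(num_partitions=4, parallelism=6)
    print(assignment)
    print(idle_subtasks(assignment))
```

In this model, setting the source parallelism to at most the partition count leaves no subtask without a partition.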
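rescale performs better than rebalance (rebalance redistributes data globally over the network), but it cannot fully resolve data skew in a stream.
If one Flink task keeps throwing errors, backpressure across the whole job becomes very high and checkpoints keep failing. See https://www.jianshu.com/p/539587047ad4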
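The rescale versus rebalance comparison above can be illustrated with a small plain-Python model. This is a sketch under simplifying assumptions, not Flink's implementation: each upstream subtask round-robins starting at its first target channel, the downstream parallelism is a multiple of the upstream parallelism, and the record counts are made up for the example.

```python
def rebalance(upstream_counts, downstream_parallelism):
    """Each upstream subtask round-robins over ALL downstream subtasks."""
    totals = [0] * downstream_parallelism
    for count in upstream_counts:
        for i in range(count):
            totals[i % downstream_parallelism] += 1
    return totals


def rescale(upstream_counts, downstream_parallelism):
    """Each upstream subtask round-robins over its own SUBSET of downstream subtasks."""
    upstream_parallelism = len(upstream_counts)
    if downstream_parallelism % upstream_parallelism != 0:
        raise ValueError("this sketch assumes downstream is a multiple of upstream")
    group = downstream_parallelism // upstream_parallelism
    totals = [0] * downstream_parallelism
    for upstream, count in enumerate(upstream_counts):
        for i in range(count):
            totals[upstream * group + i % group] += 1
    return totals


if __name__ == "__main__":
    skewed_upstream = [900, 100]  # hypothetical records per upstream subtask
    print(rebalance(skewed_upstream, 4))
    print(rescale(skewed_upstream, 4))
```

With a skewed upstream, the rebalance model evens out the load across all four downstream subtasks, while the rescale model only evens it out inside each group, so the skew between groups remains.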
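Each parallel instance of an operator is a subtask. This creates a problem: a Flink TaskManager runs every task in its own thread, so a large number of subtasks brings a lot of thread-switching overhead, which in turn hurts throughput. To reduce this, Flink chains subtasks together. The task produced by chaining is then treated as a single scheduling unit and runs in one thread. In the original figure there are 6 scheduling units in total, so 6 threads are needed to complete the job.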
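The effect of chaining on thread count can be sketched with a plain-Python model. The pipeline below is hypothetical and is not the one from the figure. The chaining rule is also simplified: an operator is chained to its predecessor only when the edge is a forward connection and both have the same parallelism. Real Flink checks more conditions.

```python
def count_threads(operators, chaining=True):
    """Count scheduling units (threads) for a linear pipeline.

    operators: list of (name, parallelism, edge_from_previous), where the edge
    is "forward", "hash", or "rebalance", and None for the first operator.
    """
    threads = 0
    for index, (_name, parallelism, edge) in enumerate(operators):
        chained = (
            chaining
            and index > 0
            and edge == "forward"
            and operators[index - 1][1] == parallelism
        )
        # A chained operator shares the threads of its predecessor.
        if not chained:
            threads += parallelism
    return threads


if __name__ == "__main__":
    pipeline = [
        ("source", 2, None),
        ("map", 2, "forward"),    # can chain with source
        ("window", 2, "hash"),    # keyBy breaks the chain
        ("sink", 1, "rebalance"), # parallelism change breaks the chain
    ]
    print(count_threads(pipeline, chaining=True))
    print(count_threads(pipeline, chaining=False))
```

For this hypothetical pipeline the model needs fewer threads with chaining than without, because source and map share a thread per parallel instance.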
2019-11-25 19:04:35,989 INFO org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Deleting existing instance base directory /export11/hadoop/yarn/local/usercache/xcardata/appcache/application_1571902955229_72425/flink-io-0f70d123-659f-4d4a-8377-0e8b8820129b/job_c2b9ef1d6bb94e5bb71b85c16ce77b7f_op_WindowOperator_47d89856a1cf553f16e7063d953b7d42__3_8__uuid_1408db12-99ce-4e56-8b31-f4d190b9bfe4.
2019-11-25 19:04:35,990 INFO org.apache.flink.runtime.taskmanager.Task - Window(GlobalWindows(), PurgingTrigger, ScalaProcessWindowFunctionWrapper) (3/8) (92e2021161941cd65e0e010c1bdf8697) switched from RUNNING to FAILED.
java.lang.NullPointerException
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager 'shyt-hadoop-4024.xxx.com.cn/xx.xx.xx.xx:28212'. This might indicate that the remote task manager was lost.
at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:136)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:377)
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:342)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1429)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:947)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:822)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
at java.lang.Thread.run(Thread.java:748)
- Check whether data skew exists
Reference:
http://yangaidi.com/2019/07/01/keyBy%E5%88%86%E7%BB%84%E9%94%AE%E5%87%8F%E5%B0%91%E4%BD%86%E6%95%88%E7%8E%87%E5%8D%B4%E9%99%8D%E4%BD%8E%E6%82%96%E8%AE%BA%E2%80%94%E2%80%94flink%E7%AA%97%E5%8F%A3%E8%81%9A%E5%90%88%E6%97%B6%E6%95%B0%E6%8D%AE%E5%80%BE%E6%96%9C%E5%88%86%E6%9E%90/
- Kafka and Flink periodically build up a message backlog; the problem is related to the checkpoint interval
References:
Flink Kafka connector documentation: https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
http://yangaidi.com/2019/06/06/%E8%AE%B0%E4%B8%80%E6%AC%A1flink%E5%A4%84%E7%90%86kafka%E6%B6%88%E6%81%AF%E6%97%B6%E7%9A%84%E5%91%A8%E6%9C%9F%E6%80%A7%E5%A0%86%E7%A7%AF%E9%97%AE%E9%A2%98/
- Checkpoint state that is too large may also be caused by using ProcessWindowFunction, since it is a stateful operator
Reference:
https://stackoverflow.com/questions/53943989/flink-window-operator-checkpointing
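For the last item above, one way to picture why window state can grow large is the plain-Python model below. It is a sketch, not Flink code. It assumes a window that buffers every element until it fires, which is how a ProcessWindowFunction used on its own is generally understood to behave, and compares it with a window that keeps only a running accumulator. `BufferingWindow` and `IncrementalWindow` are hypothetical names.

```python
class BufferingWindow:
    """Keeps every element until the window fires; state grows with input size."""

    def __init__(self):
        self.elements = []

    def add(self, value):
        self.elements.append(value)

    def state_size(self):
        return len(self.elements)

    def fire(self):
        return sum(self.elements)


class IncrementalWindow:
    """Keeps only a running sum; state stays at one value per window."""

    def __init__(self):
        self.accumulator = 0

    def add(self, value):
        self.accumulator += value

    def state_size(self):
        return 1

    def fire(self):
        return self.accumulator


if __name__ == "__main__":
    buffering, incremental = BufferingWindow(), IncrementalWindow()
    for value in range(10_000):
        buffering.add(value)
        incremental.add(value)
    print(buffering.state_size(), incremental.state_size())
    print(buffering.fire() == incremental.fire())
```

Both windows produce the same result in this model, but the buffering one has to checkpoint every element it holds, so its state grows with the amount of data per window.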