思考问题:
1.怎么样实现并行计算?
答:设置并行度。多线程,不同任务放到不同线程上。
2.并行的任务,需要占用多少slot?
3.一个流处理程序,到底包含多少个任务?
一、TaskManager和Slot的关系介绍
process:进程
Treads:线程
- 图中算子并行度最大的(算子后面的中括号数字代表并行度)为6,需要至少6个slot。
- 默认情况下,Flink 允许子任务共享 slot,即使它们是不同任务的子任务。 这样的结果是,一个 slot 可以保存作业的整个管道。
- Task Slot 是静态的概念,是指 TaskManager 具有的并发执行能力
Flink中每一个worker(TaskManager)都是一个JVM进程,它可能会在独立的线程上执行一个Task。为了控制一个worker能接收多少个task,worker通过Task Slot来进行控制(一个worker至少有一个Task Slot)。
这里的Slot如何来理解呢?很多的文章中经常会和Spark框架进行类比,将Slot类比为Core,其实简单这么类比是可以的,可实际上,可以考虑下,当Spark申请资源后,这个Core执行任务时有可能是空闲的,但是这个时候Spark并不能将这个空闲下来的Core共享给其他Job使用,所以这里的Core是Job内部共享使用的。接下来我们再回想一下,之前在Yarn Session-Cluster模式时,其实是可以并行执行多个Job的,那如果申请两个Slot,而执行Job时,只用到了一个,剩下的一个怎么办?那我们自认而然就会想到可以将这个Slot给并行的其他Job,对吗?所以Flink中的Slot和Spark中的Core还是有很大区别的。
每个task slot表示TaskManager拥有资源的一个固定大小的子集。假如一个TaskManager有三个slot,那么它会将其管理的内存分成三份给各个slot。资源slot化意味着一个task将不需要跟来自其他job的task竞争被管理的内存,取而代之的是它将拥有一定数量的内存储备。需要注意的是,这里不会涉及到CPU的隔离,slot目前仅仅用来隔离task的受管理的内存。
二、并行度(parallelism)
每一个线程占用一个slot,上图一中任务合并为上图二所示(任务链,后续讲解),图中算子并行度最大的(算子后面的中括号数字代表并行度)为2,所以整个flink程序的并行度为2,所以只需要2个slot就可以跑起来。
所有的Flink程序都是由三部分组成的: Source 、Transformation 和 Sink。
Source 负责读取数据源,Transformation 利用各种算子进行处理加工,Sink 负责输出一个特定算子的 子任务(subtask)的个数被称之为其并行度(parallelism)。一般情况下,一个 stream 的并行度,可以认为就是其所有算子中最大的并行度。
Stream在算子之间传输数据的形式可以是one-to-one(forwarding)的模式也可以是redistributing的模式,具体是哪一种形式,取决于算子的种类。
One-to-one:
stream(比如在source和map operator之间)维护着分区以及元素的顺序。那意味着flatmap 算子的子任务看到的元素的个数以及顺序跟source 算子的子任务生产的元素的个数、顺序相同,map、fliter、flatMap等算子都是one-to-one的对应关系。类似于spark中的窄依赖
Redistributing:
stream(map()跟keyBy/window之间或者keyBy/window跟sink之间)的分区会发生改变。每一个算子的子任务依据所选择的transformation发送数据到不同的目标任务。例如,keyBy()基于hashCode重分区、broadcast和rebalance会随机重新分区,这些算子都会引起redistribute过程,而redistribute过程就类似于Spark中的shuffle过程。类似于spark中的宽依赖
图中:A4 代表 A任务有4个,C2表示C任务2个,以此类推
taskmanager.numberOfTaskSlots:3 每个taskmanager设置了并行度为3
设一共有3个TaskManager,每一个TaskManager中的分配3个TaskSlot,也就是每个TaskManager可以接收3个task,一共9个TaskSlot,如果我们设置parallelism.default=1,即运行程序默认的并行度为1,9个TaskSlot只用了1个,有8个空闲,因此,设置合适的并行度才能提高效率。
三、思考
假设当前可用的slot只有1个,任务有4个,slot不够用的时候,则会一直等待分配资源,直到超时报错。
- 解决方法1:调大集群资源(设置taskmanager的slot个数为4)
- 解决方法2:将当前的job的并行度调低
slot推荐设置为当前机器的核心数,假设cpu核心数为4核,则设置4。
slot占用数量与并行度最大的算子一致。