Oozie 使用 CallableQueueService 来异步执行操作;
特点:
Callables can be queued for immediate execution or for delayed execution (some time in the future).
加入执行队列的任务可能是可以立即被吊起的,也可能是未来某个时间才触发的。
Callables are consumed from the queue for execution based on their priority.
执行线程池根据 任务的执行时间和任务的优先级别来选取任务吊起。
When the queues (for immediate execution and for delayed execution) are full, the callable queue service stops queuing callables.
执行线程池的任务队列大小可配置,当到达队列最大值,线程池将不再接收任务。
参数说明:
int maxCallableConcurrency:并发数控制
Map<String, AtomicInteger> activeCallables:每种类型的任务的实时并发数;
Map<String, Date> uniqueCallables:标识在队列中的任务,并且还没有开始执行完成的,任务加入队列前,需要和改map的任务做重复检查,用来防止任务的重复提交,执行。
int interruptMapMaxSize:
ConcurrentHashMap<String, Set<XCallable<?>>> interruptCommandsMap:
int queueSize:任务队列大小
ThreadPoolExecutor executor:任务执行器
线程池选取的队列是oozie自定义的队列 PriorityDelayQueue:
特点:
根据队列中元素的延时时间以及其执行优先级出队列:
实现策略:
PriorityDelayQueue 中为每个优先级别的任务设置一个 延时队列 DelayQueue
因为使用的是jdk自带的延时队列 DelayQueue,可以保证的是如果任务在该队列中的延时时间满足条件,我们
通过poll()方法即可得到满足延时条件的任务,如果 poll()得到的是null,说明该队列的中任务没有满足时间条件的任务。
如何编排多个优先级的队列:
每次从PriorityDelayQueue去选取任务,都优先从最高优先级的队列来poll出任务,如果最高的优先级队列中没有满足条件的任务,则次优先级队列poll出任务,如果仍未获取
将按照队列优先等级以此类推。
饿死现象:假如高优先级中的任务在每次获取的时候都满足条件,这样容易将低优先级的队列中满足条件的任务活活饿死,为了防止这种情况的产生,在每次选取任务之前,遍历
低优先级队列任务,如果任务早已经满足出队列条件,如果超时时间超过了我们设定的最大值,我们会为这个任务提高优先级,将这个任务优先级加一,添加到上个优先级队列中进行
排队。
优化队列 PollablePriorityDelayQueue:
特点:
在从队列中选取任务的时候,先判断满足时间的任务是否满足并发等限制,如果满足再从队列中取出,而不是像PriorityDelayQueue那样,先取出如果不满足并发等限制,再将该任务重新放置回去。
任务类型:
使用线程池异步执行任务,任务和任务之间是无序的,针对具体的业务场景,可能执行的单元是需要串序执行的。oozie中封装了 CompositeCallable 和 一般的 XCallable的任务
类型,前者是XCallable的一个集合,它能保证的是这个集合里面的XCallable是顺序执行的。