一个结合数据库的任务队列管理方案(iOS实现)

前言

在开发中如果碰到需要执行一些耗时比较长的任务,但是又要保证任务不能丢失,比如执行过程中由于某种原因app发生了crash,需要在下次app启动后的适当时机重新执行任务的情况,该如何解决?

解决方案

  • 假设我们需要在后台执行的任务类型有上传图片的任务A,发送聊天消息的任务B,视频编辑处理的任务C,所有耗时比较久并且重要性很高,需要保证不能丢失的任务都可以抽象为一个TaskModel。这个TaskModel保留了任务执行需要的基本信息,以及用于队列管理的必要属性。方案的主要思路就是维护一个用于管理TaskModel的串行任务队列TaskQueueService(后面简称为TaskQueue),这个TaskQueue结合数据库对外提供管理TaskModel的各类方法,比如AddTask等,在一个任务需要被执行时,就会通过TaskQueue的add方法添加到队列中,并且同时将TaskModel持久化到数据库中,保证任务不会丢失。然后TaskQueue根据自己的某种规则从数据库中取出TaskModel,对任务进行调度执行。

类结构说明

  • 我们抽象出任务的基类为TaskModel,需要执行的具体任务类型A、B、C为继承TaskModel的类TaskModelA、TaskModelB、TaskModelC,然后在各自的类中添加业务需要的基本信息,以及根据需要重写基类的方法。TaskModel结构如下类图所示:


    TaskModel.png

taskID为自增Id,当task被添加到队列,写入数据库中时会自动加1。
status表示任务执行的状态,是一个枚举值,枚举类型分为init、running、suspend、finish、fail、remove六种状态,在taskmodel刚被写入数据中时是init的状态,在taskModel执行失败后会将状态置为fail。
priority表示Task的优先级,默认值为low,在taskQueue中可以根据task的优先级对高优先级的任务优先调度执行。
customID用于对task做某种标记,方便从数据库查询。
className存储具体任务的类型如TaskModelA。
data内存储TaskModel归档后的data数据
runCount用于存储task执行的次数,可以用于控制重试次数

run()方法是子类继承TaskModel后必须实现的方法,里面写任务执行的逻辑
prepareForAddToQueue()方法是在run之前会执行的方法,如果重写了该方法,那么在taskQueue执行task之前会先调用prepare方法,返回为yes才会继续执行task的run方法,prepare方法可以用于对task的一些校验。
retryNextTime()方法通过调用taskQueue的retryTask方法将自身的status重置为init状态,那么在taskQueue执行接下来的任务时,就会重新执行到该task。
suspend()方法是将自身的status改为suspend挂起状态,那么taskQueue在执行接下来的task时,由于只会取status为init的task执行,就不会执行到suspend状态的task。当需要重新执行已挂起的task时,调用retry方法就可以重新将该task添加到执行队列中。

  • TaskQueueService是我们维护的任务队列,它是一个单例对象。TaskQueueService的结构如下所示:


    TaskQueueService.png

taskSignal用于在task执行改变状态时对外发送信号,在对应的controller中可以通过taskSignal传出的task做一些ui或者业务逻辑。
runningTask表示当前队列正在执行的taskModel
suspendTasks保存了所有被挂起的taskModel

  • runNextTask()方法为TaskQueue最核心的方法,该方法中用异步执行通过某种规则从数据库中取出的最合适的taskModel。

架构图

主要架构图.png
  • 如上图所示,假设所有的task都为相同优先级,TaskQueue的runNextTask方法中取从数据库筛选出来status为init状态并且根据taskId排序,拿到最先进入的一个task去执行。在task执行完成后,又会触发runNextTask方法,从而继续从数据库读取下一个最合适的taskModel去执行任务。

实现

TaskModel
  • 首先TaskModel是需要存储在数据库中的,结合上一章WCDB的使用,我们将TaskModel继承自RSModel类,以支持数据库写入。TaskModel的.h文件十分简洁,主要是一些关键属性的暴露和task操作方法的抽象:
#import "RSModel.h"
typedef NS_ENUM(NSInteger ,RSTaskQueueTaskModelStatus) {
    RSTaskQueueTaskModelStatusInit,
    RSTaskQueueTaskModelStatusRunning,
    RSTaskQueueTaskModelStatusSuspend,
    RSTaskQueueTaskModelStatusFinish,
    RSTaskQueueTaskModelStatusFail,
    RSTaskQueueTaskModelStatusRemove,
};
typedef NS_ENUM(NSInteger ,RSTaskQueueTaskModelPriority) {
    RSTaskQueueTaskModelPriorityLow,
    RSTaskQueueTaskModelPriorityMiddle,
    RSTaskQueueTaskModelPriorityHigh,
};
@interface RSTaskQueueTaskModel : RSModel
@property (nonatomic, assign) NSInteger taskId;
@property (nonatomic, assign) RSTaskQueueTaskModelStatus status;
@property (nonatomic, assign) RSTaskQueueTaskModelPriority priority;
@property (nonatomic, strong) NSString *customId;
@property (nonatomic, strong) NSString *customType;
@property (nonatomic, strong) NSString *className;
@property (nonatomic, strong) NSData *data;
@property (nonatomic, assign) NSInteger runCount;
-(void)run;//任务运行主入口,子类需要实现它
-(void)suspend;//执行异步操作时调用,可以挂起当前任务,防止任务队列被阻塞
-(void)pop;//任务执行成功需要显式调用,将任务从队列中移除
-(void)fail;//任务执行失败需要显式调用, 修改任务的状态
-(void)retryNextTime;//重新入队,等待重试
-(BOOL)prepareForAddToQueue;//任务被插入任务队列前,自动调用,子类可以重载它。返回失败则任务不用被加入任务队列
-(BOOL)remove;//从任务队列中移除任务
@end

TaskModel的.m文件需要定义类文件中绑定到数据库表的字段以及主键的设置、默认值的设置以及约束等。

#import "RSTaskQueueTaskModel+WCTTableCoding.h"
#import "RSTaskQueueTaskModel.h"
#import <WCDB/WCDB.h>
#import "RSDBService.h"
#import "RSTaskQueueService.h"
#import <YYModel.h>

@implementation RSTaskQueueTaskModel

WCDB_IMPLEMENTATION(RSTaskQueueTaskModel)
WCDB_SYNTHESIZE(RSTaskQueueTaskModel, taskId)
WCDB_SYNTHESIZE(RSTaskQueueTaskModel, customId)
WCDB_SYNTHESIZE(RSTaskQueueTaskModel, customType)
WCDB_SYNTHESIZE(RSTaskQueueTaskModel, className)
WCDB_SYNTHESIZE(RSTaskQueueTaskModel, data)
WCDB_SYNTHESIZE(RSTaskQueueTaskModel, runCount)
WCDB_SYNTHESIZE_DEFAULT(RSTaskQueueTaskModel, status, RSTaskQueueTaskModelStatusInit)
WCDB_SYNTHESIZE_DEFAULT(RSTaskQueueTaskModel, priority, RSTaskQueueTaskModelPriorityLow)
WCDB_PRIMARY_ASC_AUTO_INCREMENT(RSTaskQueueTaskModel, taskId)
//设置status和priority的默认值,以及taskId自增
-(instancetype)init {
    self = [super init];
    if (self) {
        static dispatch_once_t token;
        dispatch_once(&token, ^{
            [RSTaskQueueTaskModel createDBTable];
        });
        self.isAutoIncrement = YES;
        self.runCount = 0;
    }
    return self;
}
-(instancetype)initWithCoder:(NSCoder *)aDecoder {
    self = [super initWithCoder:aDecoder];
    if (self) {
        
    }
    return self;
}

+(void)createDBTable {
    if ([[RSDBService db] createTableAndIndexesOfName:NSStringFromClass([RSTaskQueueTaskModel class]) withClass:[RSTaskQueueTaskModel class]]) {
        NSLog(@"creat table RSTaskQueueTaskModel success");
    } else {
        NSLog(@"creat table RSTaskQueueTaskModel fail");
    }
}

-(BOOL)prepareForAddToQueue {
    self.className = NSStringFromClass([self class]);
    self.data = [self yy_modelToJSONData];
//这里使用YYModel将model转换为data存入数据库
    return YES;
}

-(void)run {
    
}
-(void)suspend {
    [[RSTaskQueueService shareInstance] suspendTask:self];
}
-(void)pop {
    [[RSTaskQueueService shareInstance] popTask:self];
}
-(void)retryNextTime {
    [[RSTaskQueueService shareInstance] retryTask:self];
}

-(void)fail {
    [[RSTaskQueueService shareInstance] failTask:self];
}
-(BOOL)remove {
    return [[RSTaskQueueService shareInstance] removeTask:self];
}
//以上的操作方法实现实际上就是调用TaskQueue的方法,传入参数为本身,所以重点还是在TaskQueue对任务的调度。TaskModel只是一个支持写入数据库的对Task抽象的Model
@end
TaskQueueService

TaskQueueService的.h基本和类图中的属性方法一致,按照实际需求多写了几个从数据库查询taskModel的接口。

#import <Foundation/Foundation.h>
#import "RSTaskQueueTaskModel.h"
@interface RSTaskQueueService : NSObject
@property (nonatomic, strong) RACSubject *taskSignal;
@property (nonatomic, strong)  RSTaskQueueTaskModel *runingTask;
@property (nonatomic, strong)  NSMutableArray<RSTaskQueueTaskModel *> *suspendTasks;
+(RSTaskQueueService *)shareInstance;

-(BOOL)addTask:(RSTaskQueueTaskModel *)task;
-(BOOL)retryTask:(RSTaskQueueTaskModel *)task;
-(void)runNextTask;
-(void)popTask:(RSTaskQueueTaskModel *)task;
-(BOOL)failTask:(RSTaskQueueTaskModel *)task;
-(BOOL)removeTask:(RSTaskQueueTaskModel *)task;
//-(void)finishTaskWaitForNextTime:(RSTaskQueueTaskModel *)task;
-(BOOL)suspendTask:(RSTaskQueueTaskModel *)task;
-(void)resetAllTasks;

-(void)retryAllFailTasks;

- (BOOL)deleteTaskWithClassName:(NSString *)className customId:(NSString*)customId;

-(NSArray<RSTaskQueueTaskModel *>*)getTasksClassName:(NSString *)className customId:(NSString*)customId;
-(NSArray<RSTaskQueueTaskModel *>*)getTasksClassName:(NSString *)className status:(RSTaskQueueTaskModelStatus)status;
-(NSArray<RSTaskQueueTaskModel *>*)getTasksClassName:(NSString *)className;

-(BOOL)isAllTaskFinish;
@end

在.m中,需要实现taskQueue的单例初始化方法,通过调用[RSTaskQueueService shareInstance]来获取taskQueue。

+(RSTaskQueueService *)shareInstance {
    static dispatch_once_t once;
    static id sharedInstance;
    dispatch_once(&once, ^{
        sharedInstance = [[RSTaskQueueService alloc] init];
    });
    return sharedInstance;
}

-(instancetype)init {
    self = [super init];
    if (self) {
        self.runingTask = nil;
        self.taskSignal = [RACSubject subject];
        self.suspendTasks = [[NSMutableArray alloc] init];
    }
    return self;
}

在一个任务TaskModel需要被执行是,会调用TaskQueue的addTask方法,如果该任务是符合执行规定的,那么就会将任务写入TaskModel数据库,同时对外发送一个taskSignal信号,以同步当前task的状态,然后调用runNextTask方法去做任务的调度执行。

-(BOOL)addTask:(RSTaskQueueTaskModel *)task {
    if ([task prepareForAddToQueue]) {
        @synchronized(self) {
            BOOL result = [[RSDBService db] insertObject:task into:NSStringFromClass([RSTaskQueueTaskModel class])];
            if (result) {
                [self.taskSignal sendNext:task];
                [self runNextTask];
            }
            return result;
        }
    }
    return NO;
}

在runNextTask方法中,首先是开辟了一个异步线程,然后判断是否有task正在执行,如果有的话就返回,没有的话从数据库中读取最合适的taskModel来跑。如何判断是否是最合适的筛选条件,也就是task调度的方法了。在这里选取的是status为init并且taskId最小,也就是最先加入的任务来执行。

-(void)runNextTask {
    [RSUtils dispatch_async_background:^{
        @synchronized(self) {
            if (self.runingTask) {
                //有任务在执行中
                return;
            }
            NSArray *tmp = [[RSDBService db] getObjectsOfClass:[RSTaskQueueTaskModel class] fromTable:NSStringFromClass([RSTaskQueueTaskModel class]) where:RSTaskQueueTaskModel.status==RSTaskQueueTaskModelStatusInit orderBy:RSTaskQueueTaskModel.taskId.order(WCTOrderedAscending)];
            if ([tmp count] > 0) {
                RSLogInfo(@"RSTaskQueueService task count:%ld", tmp.count);
                RSTaskQueueTaskModel *task = [tmp firstObject];
                Class clazz = NSClassFromString(task.className);
                RSTaskQueueTaskModel * object = [[clazz alloc] init];
                [object yy_modelSetWithJSON:task.data];
                [object setTaskId:task.taskId];
                [object setStatus:RSTaskQueueTaskModelStatusRunning];
                object.runCount = object.runCount + 1;
               BOOL dbResult = [[RSDBService db] updateRowsInTable:DBTableName(RSTaskQueueTaskModel) onProperties:{RSTaskQueueTaskModel.status, RSTaskQueueTaskModel.runCount} withObject:object where:RSTaskQueueTaskModel.taskId==object.taskId];
                if (dbResult) {
                    self.runingTask = object;
                    [self.taskSignal sendNext:object];
                    RSLogInfo(@"RSTaskQueueService run taskId:%d", object.taskId);
                    [object run];
                } else {
                    RSLogError(@"RSTaskQueueService update task status fail taskId:%d", object.taskId);
                }
            } else {
                RSLogInfo(@"RSTaskQueueService is empty");
            }
        }
    }];
}

resetAllTasks是将所有未完成的任务重置的方法,主要用于app初始化的时候调用,以重置之前未执行成功的tasks。

-(void)resetAllTasks {
    @synchronized(self) {
        self.runingTask = nil;
        [self.suspendTasks removeAllObjects];
        RSTaskQueueTaskModel *task = [RSTaskQueueTaskModel new];
        task.status = RSTaskQueueTaskModelStatusInit;
        [[RSDBService db] updateRowsInTable:NSStringFromClass([RSTaskQueueTaskModel class]) onProperty:RSTaskQueueTaskModel.status withObject:task where:RSTaskQueueTaskModel.status==RSTaskQueueTaskModelStatusRunning||RSTaskQueueTaskModel.status==RSTaskQueueTaskModelStatusSuspend];
    }
}

retryAllFailTasks方法将所有执行失败的task重新执行,原理还是将status状态由fail改为init状态写入数据库,下一次taskQueue开始run的时候就会考虑执行到这些task了。

-(void)retryAllFailTasks {
    NSArray *tmp = [[RSDBService db] getObjectsOfClass:[RSTaskQueueTaskModel class] fromTable:NSStringFromClass([RSTaskQueueTaskModel class]) where:RSTaskQueueTaskModel.status==RSTaskQueueTaskModelStatusFail orderBy:RSTaskQueueTaskModel.taskId.order(WCTOrderedAscending)];
    for (RSTaskQueueTaskModel *task in tmp) {
        Class clazz = NSClassFromString(task.className);
        RSTaskQueueTaskModel * object = [[clazz alloc] init];
        [object yy_modelSetWithJSON:task.data];
        [object setTaskId:task.taskId];
        [object setStatus:task.status];
        [object retryNextTime];
    }
}

其他的方法就不列出了,代码都差不多,核心点还是在于结合数据库修改taskModel的状态,然后taskQueue在执行runNextTask的时候就会自动执行最合适的task。

结束

其实整个方案的架构并不复杂,通俗易懂,主要是在于实现以及解决应用中的问题。方案是可以根据需求随时修改的,没有最好的方案,只有最合适的方案。当然文中的taskQueueService还有更多优化的空间,有不合理的地方大家可以提出,一起交流,共同进步。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容