Spring Boot定时任务在分布式环境下的轻量级解决方案

       Spring Boot提供了一个叫做Spring Task的任务调度工具,支持注解和配置文件形式,支持Cron表达式,使用简单且功能强大。正好在项目中使用到了这个工具,并且遇到了问题,现把遇到的问题以及解决方案与大家分享,欢迎批评指正!

一、问题背景

       首先介绍一下如何在Spring Boot项目中使用定时任务(Spring Task),本文以Maven项目为例,使用dubbo框架,使用MySQL数据库和MyBatis ORM,以基于注解(annotation)的方式来实现。
       在Spring Boot项目中,我们可以很优雅的使用注解来实现定时任务,首先创建项目,导入依赖:

<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
  </dependency>
  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
  </dependency>
</dependencies>

       启动类中加入@EnableScheduling让注解@Scheduled生效。基于注解的@Scheduled默认为单线程,开启多个定时任务时,任务的执行和调度时机会受上一个任务执行时间的影响。庆幸地是,Spring Task支持多线程异步执行,只需简单地使用config配置类的方式添加相应的配置即可。新建一个AsyncTaskConfig类,以便使Spring Task多线程异步执行方式生效,同时对其进行相应的配置。AsyncTaskConfig类如下:

@Configuration
@EnableAsync
public class AsyncTaskConfig {
     /*
    此处成员变量应该使用@Value从配置中读取,此处仅为演示目的。
     */
    private int corePoolSize = 5;
    private int maxPoolSize = 10;
    private int queueCapacity = 5;
    
@Bean
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.initialize();
        return executor;
    }
}

注解说明如下:
@Configuration:表明该类是一个配置类
@EnableAsync:开启异步事件的支持
       然后在定时任务的类或者方法上添加@Async,之后每一个定时任务都会跑在不同的线程中。
       OK!Spring Task就是这么简单!到此为止,定时任务在本地执行一点问题都没有!
       本以为大功告成,谁知在测试环境部署的时候出了问题!同一个定时任务被执行了多次,造成数据库中数据混乱(由于业务保密需要,具体不详述)。为什么会这样呢?突然想起原来测试环境部署在多个节点之上,每个节点都会在相同的时间执行相同的任务。因为dubbo服务部署在不同节点的JVM里面,而不同节点的JVM之间并没有通信机制,都是各自独立的,所以每个节点都会根据定时任务的配置来执行同样的任务。考虑问题还是不周全啊!
       顺便多说一句,这里强烈建议测试环境(test env)要和生产环境(product env)完全一致,这样才能尽早发现问题,以免在生产环境中造成不必要的损失,那可以真金白银哦!

二、解决方案

       前面已经找到了问题的根源,那接下来我们就要考虑如何解决这个问题。Spring Task并不是为分布式环境设计的,在分布式环境下,这种定时任务是不支持集群配置的,如果部署到多个节点上,各个节点之间并没有任何协调通讯机制,因为集群的节点之间是不会共享任务信息的,每个节点上的任务都会按时执行。
       当然可以使用比较重量级的分布式定时任务框架,比如:Elastic Job。还有一种比较直接的方案就是把定时任务分离出来,成为一个单独的服务,只在一个节点上部署。但是这样既要修改项目工程又得改变部署方式,故未采用这种方案。
       另一种想法是在全局缓存中设置一个全局锁,拿到这个锁的节点执行相应的任务。不过这样就得依赖Redis,我们还是希望项目能够比较独立的运行,减少对其他服务的依赖。
       最终我们选择了用数据库+乐观锁的方式来解决任务互斥访问的问题。大致的思路是这样的,声明一把全局的“锁”作为互斥量,哪个应用服务器拿到这把“锁”,就有执行任务的权利,未拿到“锁”的应用服务器不进行任何任务相关的操作。另外就是这把“锁”最好还能在下次任务执行时间点前失效。
       具体实现中,我们在MySQL中新建了一张表job_locks,其中每一条记录代表了一个定时任务的全局锁,同时这条记录也存储了任务当前的状态(status字段,运行还是空闲),另外每条记录还有一个version字段来标识版本,以便实现乐观锁机制。当定时任务触发的时刻,集群中的节点同时读取该条数据,将version字段的值一同读出,然后再更新该条数据,将任务状态由空闲更新为运行,同时对version值加一。提交更新的时候,判断数据库表对应记录的当前版本信息与取出来的version值进行比对,如果数据库表当前版本号与取出来的version值相等,则予以更新,否则更新失败。在这种竞争的情况下,只有一个节点可以成功更新数据库记录,我们认为它获取了全局锁,只有它可以执行定时任务,那些更新数据库记录失败的节点则认为未能获取全局锁,不再执行定时任务。任务执行完毕之后,获取全局锁的节点要释放全局锁,即将对应数据库记录的status字段由运行改为空闲,以便下次继续竞争全局锁。

三、使用方法

       本小结来对如何使用该方案进行详细介绍。
       首先在Mysql数据库中新建job_locks表,对应一个定时任务新增一条记录。

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for job_locks
-- ----------------------------
DROP TABLE IF EXISTS `job_locks`;
CREATE TABLE `job_locks` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT 'job名称',
  `status` tinyint(4) DEFAULT '0' COMMENT 'job状态:0-空闲,1-运行',
  `description` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT 'job描述',
  `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间',
  `version` bigint(20) NOT NULL DEFAULT '0' COMMENT '版本',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

-- ----------------------------
-- Records of job_locks
-- ----------------------------
BEGIN;
INSERT INTO `job_locks` VALUES (1, 'job-name', 0, 'job描述 ', '2019-05-19 21:22:38', '2019-05-19 21:22:38', 0);
COMMIT;

SET FOREIGN_KEY_CHECKS = 1;

注意事项:id为主键且自增。

       在Mybatis的Mapper XML(JobLocksMapper.xml)文件中新增如下两个update语句,我是使用Mybatis Generator(简称MBG)来生成实体类,Mapper接口类,Mapper XML文件的。

 <update id="requireLock" parameterType=" Entity.JobLocks">
    <![CDATA[
        update job_locks
        set status = 1, version=version + 1
        where id = #{id,jdbcType=INTEGER} and version =#{version,jdbcType=BIGINT} and status = 0
]]>
  </update>
  <update id="releaseLock" parameterType=" Entity.JobLocks">
    <![CDATA[
        update job_locks
        set status = 0
        where id = #{id,jdbcType=INTEGER} and status = 1
    ]]>
  </update>

注意事项:更新时主键id作为第一个查询条件,这样保证更新时使用行锁而不是表锁。因为MySQL的innoDB引擎是支持行锁的,但是行锁建立在索引之上。

       同时在Mapper接口类(JobLocksMapper.java)中增加两个接口,分别对应两个update语句,如下所示:

int requireLock(JobLocks record);
int releaseLock(JobLocks record);

       接下来在Dubbo框架内新建JobLocksService.java接口定义以及实现类JobLocksServiceImpl.java。
JobLocksService.java文件:

package <yourpackage>.;

import com.alibaba.dubbo.rpc.protocol.rest.support.ContentType;
import <yourpackage>.Entity.JobLocks;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

@Path("jobLocksService")
@Consumes({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML, MediaType.TEXT_HTML, MediaType.APPLICATION_XHTML_XML})
@Produces({ContentType.APPLICATION_JSON_UTF_8, ContentType.TEXT_XML_UTF_8})
public interface JobLocksService {

    @POST
    @Path(value = "selectByPrimaryKey")
    JobLocks selectByPrimaryKey(Integer id);

    @POST
    @Path(value = "requireLock")
    int requireLock(JobLocks record);

    @POST
    @Path(value = "releaseLock")
    int releaseLock(JobLocks record);
}

JobLocksServiceImpl.java文件:

package <yourpackage>;

import lombok.extern.slf4j.Slf4j;
import <yourpackage>.Entity.JobLocks;
import <yourpackage>.Repository.JobLocksMapper;
import <yourpackage>.Service.JobLocksService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class JobLocksServiceImpl implements JobLocksService {

    @Autowired
    JobLocksMapper jobLocksMapper;

    @Override
    public JobLocks selectByPrimaryKey(Integer id) {
        return jobLocksMapper.selectByPrimaryKey(id);
    }

    @Override
    public int requireLock(JobLocks record) {
        return jobLocksMapper.requireLock(record);
    }

    @Override
    public int releaseLock(JobLocks record) {
        return jobLocksMapper.releaseLock(record);
    }
}

       在Provider和Consumer中配置JobLocksService,通过Dubbo实现服务的调用与消费。

       最后按照下面的例子中的方式进行调用。

@Component
@Slf4j
public class ScheduledUtil {

    @Autowired
    JobLocksService jobLocksService;

    @Scheduled(cron = "0/5 * * * * *")
    @Async
    public void scheduled(){
        JobLocks jobLocks = null;
        log.info("Current Thread:" + Thread.currentThread().getName());

        
        // 查询id=1的记录,对应一个定时任务
        jobLocks = jobLocksService.selectByPrimaryKey(1);
        if(null != jobLocks && jobLocks.getStatus() == 0) {
            int result = jobLocksService.requireLock(jobLocks);
            if(result == 1) {   // get the lock successfully
                try{
                    log.info(Thread.currentThread().getName() + " got the lock of job " + jobLocks.getName());
                    
                    // 在此执行你的定时任务

                    // 用来客服集群中多个节点的时间漂移问题
                    Thread.sleep(10 * 1000);
                     
                    log.info(jobLocks.getName() + " is done!");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    if(null != jobLocks)
                        jobLocksService.releaseLock(jobLocks);
                }
            }
        }   
    }
}

需要强调和注意的事项如下:

  1. requireLock和releaseLock必须成对使用;

  2. releaseLock必须放在finally语句块中,以保证锁能够被释放;

  3. 集群中的节点可能存在时间不同步的现象,所以可以酌情在定时任务执行完成之后添加一定时间的sleep,以客服节点服务器的时间漂移问题。

    2019年5月21日星期二 于团结湖瑞辰国际中心

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,084评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,623评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,450评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,322评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,370评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,274评论 1 300
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,126评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,980评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,414评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,599评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,773评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,470评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,080评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,713评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,852评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,865评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,689评论 2 354

推荐阅读更多精彩内容