Redisson实现延迟队列

一、添加POM依赖

            <!--redisson-->
            <dependency>
                <groupId>org.redisson</groupId>
                <artifactId>redisson-spring-boot-starter</artifactId>
                <version>3.12.5</version>
            </dependency>

二、增加redisson配置

package com.zensun.framework.config;

import cn.hutool.core.util.StrUtil;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.codec.JsonJacksonCodec;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.io.IOException;

/**
 * redis配置
 *
 * @author gmk
 */
@Configuration
@EnableCaching
public class RedisConfig extends CachingConfigurerSupport {

    @Value("${spring.redis.host}")
    private String host;
    @Value("${spring.redis.port}")
    private int port;
    @Value("${spring.redis.password}")
    private String password;
    @Value("${spring.redis.database}")
    private int database;


    @Bean(destroyMethod = "shutdown")
    public RedissonClient redisson() throws IOException {
        Config config = new Config();
        if (StrUtil.isNotBlank(password)) {
            config.useSingleServer().setAddress("redis://" + host + ":" + port).setPassword(password).setDatabase(database);
        } else {
            config.useSingleServer().setAddress("redis://" + host + ":" + port).setDatabase(database);
        }
        config.setCodec(new JsonJacksonCodec());
        return Redisson.create(config);
    }
}

三、编写队列服务类


import java.util.concurrent.TimeUnit;

/**
 * @Description 延迟队列服务接口 TODO
 * @Author yuqianwei
 * @Date 2020 2020/12/10 11:58
 */

public interface DelayQueueService<T> {

    /**
     * 推送数据
     *
     * @param data
     * @param queueName
     */
    void pushData(T data, String queueName);

    /**
     * 推送数据
     *
     * @param data
     * @param time
     * @param timeUnit
     * @param queueName
     */
    void pushData(T data, long time, TimeUnit timeUnit, String queueName);

    /**
     * 拉取数据
     *
     * @param queueName
     * @return
     */
    T pullData(String queueName);
}









import com.zensun.common.utils.spring.SpringUtils;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

/**
 * 
 * 每一个RedisDelayQueueServiceImpl对象 是一个队列通道  多个队列通道就用多个RedisDelayQueueServiceImpl对象
 * @Description redis延迟队列服务实现类 TODO
 * @Author yuqianwei
 * @Date 2020 2020/12/10 12:01
 */

public class RedisDelayQueueServiceImpl<T> implements DelayQueueService<T> {

    private static RedissonClient redissonClient;

    static {
        redissonClient = SpringUtils.getBean(RedissonClient.class);


    }

    private final String queueName;
    private Logger logger = LoggerFactory.getLogger(getClass());
    private RBlockingQueue<T> blockingQueue;
    private RDelayedQueue<T> delayedQueue;

    public RedisDelayQueueServiceImpl(String queueName) {
        this.queueName = queueName;
        this.blockingQueue = getBlockingQueue();
        this.delayedQueue = getDelayQueue(blockingQueue);
    }

    /**
     * 获取阻塞队列
     *
     * @return
     */
    private RBlockingQueue<T> getBlockingQueue() {
        return redissonClient.getBlockingQueue(queueName);
    }

    /**
     * 获取延迟队列
     *
     * @param blockingQueue
     * @return
     */
    private RDelayedQueue<T> getDelayQueue(RBlockingQueue<T> blockingQueue) {
        return redissonClient.getDelayedQueue(blockingQueue);
    }

    /**
     * 推送数据
     *
     * @param data
     * @param queueName
     */
    @Override
    public void pushData(T data, String queueName) {
        pushData(data, 0, TimeUnit.MILLISECONDS, queueName);
    }

    /**
     * 推送数据
     *
     * @param data
     * @param time
     * @param timeUnit
     * @param queueName
     */
    @Override
    public void pushData(T data, long time, TimeUnit timeUnit, String queueName) {
        delayedQueue.offerAsync(data, time < 0 ? 0 : time, timeUnit);
    }

    /**
     * 拉取数据
     *
     * @param queueName
     * @return
     */
    @Override
    public T pullData(String queueName) {

        T currentData = null;
        try {
            currentData = blockingQueue.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
            try {
                TimeUnit.MILLISECONDS.sleep(500);
            } catch (InterruptedException interruptedException) {
                interruptedException.printStackTrace();
            }
        }
        logger.info("获取到的数据 : {}", currentData);

        return currentData;
    }

}


四、DEMO

生产者:

        long currentTimeMillis = System.currentTimeMillis();
        long delayStartTime = advertInfo.getStartTime().getTime() - currentTimeMillis;
        //开始时间在现在之后
            try {
                delayToStartQueueService.pushData(String.valueOf(advertInfo.getAdvertId()), delayStartTime, TimeUnit.MILLISECONDS, Constants.ADVERT_TOSTART_QUEUEKEY);
                log.info("成功推送到队列 : {}, 广告信息id : {}, 延迟时间 : {} 秒", Constants.ADVERT_TOSTART_QUEUENAME, advertInfo.getAdvertId(), delayStartTime / 1000);
            } catch (Exception e) {
                e.printStackTrace();
                log.error("失败推送到队列 : {}, 广告信息id : {}", Constants.ADVERT_TOSTART_QUEUENAME, advertInfo.getAdvertId());
            }  

消费者:



import cn.hutool.core.util.StrUtil;
import com.zensun.business.domain.AdvertInfo;
import com.zensun.business.domain.dto.PatchAdvertInfo;
import com.zensun.business.queue.DelayQueueService;
import com.zensun.business.queue.RedisDelayQueueServiceImpl;
import com.zensun.business.service.IAdvertInfoService;
import com.zensun.common.constant.Constants;
import com.zensun.common.enums.domain.AdvertInfoEnum;
import com.zensun.common.utils.DateUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

/**
 * @Description 自动更改广告开始状态定时任务 TODO
 * @Author gmk
 * @Date 2020 2021/3/17 16:50
 */

@Slf4j
@Component
public class AutoNotStartToStartAdvertInfoStatusTask implements SmartInitializingSingleton {

    @Autowired
    private IAdvertInfoService advertInfoService;

    /**
     * 在所有单例bean实例化后 进行赋值, 否则有空指针错误, 后期优化
     * 见此方法 afterSingletonsInstantiated
     */
    private DelayQueueService<String> delayQueueService;

   //定时任务是用来扫描数据内 没有被加入到队列的数据 用来兜底
    /**
     * 广告更改状态
     //@Scheduled(fixedRate = SCHEDULE_RATE)
     public void autoOffSaleGoodsSource() {
     log.info("启动广告开始自动修改状态扫描--------------->");
     try {
     long currentTimeMillis = System.currentTimeMillis();
     Date now = new Date(currentTimeMillis);
     //扫描区间 扫描按照定时任务时间 只扫描离开始时间最近的一个区间
     // (如 定时任务每10分钟执行一次 只扫描现在时间+10分钟大于等于开始时间的数据 )
     //long nextTime = SCHEDULE_RATE + currentTimeMillis;
     long nextTime = currentTimeMillis;
     Date futureTime = new Date(nextTime);
     LambdaQueryWrapper<AdvertInfo> a = new LambdaQueryWrapper<>();
     a.eq(AdvertInfo::getAdvertStatus, AdvertInfoEnum.Status.NOTSTART.getCode())
     .apply("unix_timestamp(start_time) >= {0}", currentTimeMillis / 1000)
     .apply("unix_timestamp(start_time) <= {0}", nextTime / 1000);

     List<AdvertInfo> advertInfoList = advertInfoService.list(a);

     log.info("扫描到开始时间在 : {}与 : {}之间 的广告信息 : {} 条", DateUtils.parseDateToStr(DateUtils.YYYY_MM_DD_HH_MM_SS, now), DateUtils.parseDateToStr(DateUtils.YYYY_MM_DD_HH_MM_SS, futureTime), advertInfoList.size());

     if (CollectionUtils.isNotEmpty(advertInfoList)) {
     advertInfoList.forEach(advertInfo -> {
     long delayTime = advertInfo.getEndTime().getTime() - currentTimeMillis;
     try {
     // delayQueueService.pushData(String.valueOf(advertInfo.getAdvertId()), delayTime, TimeUnit.MILLISECONDS, queueName);
     //log.info("推送到队列 : {}, 开始时间即将和现在时间一样的广告信息id : {}, 延迟时间 : {} 秒", queueName, advertInfo.getAdvertId(), delayTime / 1000);
     } catch (Exception e) {
     //log.error("推送到队列 : {}, 开始时间即将和现在时间一样的广告信息id : {}, 延迟时间 : {} 秒, 发生异常 : {}",
     //        queueName, advertInfo.getAdvertId(), delayTime / 1000, e.getMessage(), e);
     }
     });
     }
     } catch (Exception e) {
     log.error("自动更改广告开始状态定时任务执行异常 : {}", e.getMessage(), e);
     }

     } */

    /**
     * 启动延迟队列监听
     */
    public void listenerDelayQueueStart() {
        Thread thread = new Thread(() -> {
            log.info("启动监听{}:的数据...", Constants.ADVERT_TOSTART_QUEUENAME);
            while (true) {
                try {
                    String data = delayQueueService.pullData(Constants.ADVERT_TOSTART_QUEUEKEY);
                    log.info("取到 : {}, 的数据 : {}", Constants.ADVERT_TOSTART_QUEUENAME, data);
                    if (StrUtil.isNotBlank(data)) {
                        Date nowDate = DateUtils.getNowDate();
                        Long advertId = Long.valueOf(data);
                        AdvertInfo byId = Optional.ofNullable(advertInfoService.getById(advertId)).orElse(new AdvertInfo());
                        //消费端做幂等性 避免重复消费
                        if (byId.getAdvertStatus() != null && AdvertInfoEnum.Status.NOTSTART.getCode().equals(byId.getAdvertStatus()) && DateUtils.compareTo(nowDate, byId.getStartTime()) == 0) {
                            //修改广告的状态
                            PatchAdvertInfo patchAdvertInfo = new PatchAdvertInfo();
                            patchAdvertInfo.setAdvertId(advertId);
                            patchAdvertInfo.setAdvertStatus(AdvertInfoEnum.Status.START.getCode());
                            advertInfoService.updateAdvertInfoStatusById(patchAdvertInfo);
                            log.info("队列执行成功:{},时间为:{},广告id : {}", Constants.ADVERT_TOSTART_QUEUENAME, DateUtils.getTime(), advertId);
                        } else {
                            log.info("队列执行失败:{},时间为:{},广告id : {},未获取到数据或者数据不符合消费规则", Constants.ADVERT_TOSTART_QUEUENAME, DateUtils.getTime(), advertId);
                        }

                    }
                } catch (Exception e) {
                    log.info("队列执行失败:{},时间为:{}", Constants.ADVERT_TOSTART_QUEUENAME, e.getMessage());
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException interruptedException) {
                        interruptedException.printStackTrace();
                    }
                }
            }
        });
        thread.setName(Constants.ADVERT_TOSTART_QUEUENAME + "-线程");
        thread.setDaemon(true);
        thread.start();
    }

    @Override
    public void afterSingletonsInstantiated() {
        delayQueueService = new RedisDelayQueueServiceImpl(Constants.ADVERT_TOSTART_QUEUEKEY);
        listenerDelayQueueStart();
    }
}

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,992评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,212评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,535评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,197评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,310评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,383评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,409评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,191评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,621评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,910评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,084评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,763评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,403评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,083评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,318评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,946评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,967评论 2 351

推荐阅读更多精彩内容