基于典型的生产者-消费者方案。 (BlockingQueue)可以安全地与多个生产者和多个消费者一起使用 生产者负责数据的生产,而消费者则
负责
数据的消费。
如图所示 工人负责生产数据, 超市就像是消费模式 消费者从工厂获取商品 生产者和消费者之间存在一定比例。 它另外支持以下操作:在检索元素时等待队列变为非空,并在存储元素时等待队列中的空间变为可用。但还支持{@link java.util.Collection}接口。
作者重点使用 LinkedBlockingQueue
1.基于链表的阻塞队列,主要的核心方法如下,这些方法都支持中断,其中put方法和take方法我们都很熟悉的了,另外offer和poll两个方法其实对应的是put和take两个方法提供了阻塞超时机制
2.LinkedBlockingQueue的容量限制是可选的,如果在初始化时没有指定容量,那么默认使用int的最大值作为队列容量
@PostConstruct
PostConstruct批注用于需要依赖注入完成以执行任何初始化之后要执行的方法上。在类投入使用之前必须调用此方法。所有支持依赖注入的类都必须支持该释。即使该类不要求注入任何资源,也必须调用用PostConstruct注释的方法。此注释只能注释一种方法。应用PostConstruct批注的方法必须满足以下所有条件:
1.除了拦截器的情况外,该方法不得具有任何参数,在这种情况下,该方法将采用Interceptors规范定义的InvocationContext对象。2在拦截器类上定义的方法必须具有以下签名之一
2.1void <METHOD>(InvocationContext) 2.2 Object <METHOD>(InvocationContext) throws Exception
注意:PostConstruct拦截器方法不得引发应用程序异常,但如果除生命周期事件之外,同一拦截器方法还对业务或超时方法进行了干预,则可以声明它引发已检查的异常,包括java.lang.Exception。如果PostConstruct拦截器方法返回值,则容器将忽略它
3.在非拦截器类上定义的方法必须具有以下签名:
void <METHOD>
4.应用PostConstruct的方法可以是公共的,受保护的,私有的或私有的
5.除了应用程序客户端,该方法一定不能是静态的
6.方法可能是最终的
7.如果该方法抛出未经检查的异常,则该类不得投入使用,除非是在EJB可以处理异常甚至从异常中恢复的EJB的情况下。
以上是源码的翻译
总结
1.因为当调用构造函数时,bean还没有初始化-即没有注入依赖项。在@PostConstruct方法完全初始化bean,您可以使用依赖项。因为这是保证在bean生命周期中只调用一次此方法的契约。一个bean可能会在其内部工作过程中被容器多次实例化,但它保证@PostConstruct只会被调用一次。
2.要在依赖加载后,对象使用前执行,而且只执行一次
实现来了~~~~ 上代码
在Service编写
void recordJob(User job);
void delJob(User job);
@PostConstruct
void init();
@PostConstruct
void delInit();
在ServiceImpl中写
//定义一个容量为10000的阻塞队列,BlockingQueue线程安全可以多个生产者同时put
private BlockingQueuedataQueue =new LinkedBlockingQueue(10000);
//定义一个容量为10000的阻塞队列,BlockingQueue线程安全可以多个生产者同时put
private BlockingQueuedelDataQueue =new LinkedBlockingQueue(10000);
private Listlist =new ArrayList();
private ListdelList =new ArrayList();
//put任务的方法,供生产者调用
@Override
public void recordJob(User job) {
try {
dataQueue.put(job);
}catch (InterruptedException e) {
log.info("dataQueue,批量更新Job入队列异常");
Thread.currentThread().interrupt();
}
}
@Override
public void delJob(User job) {
try {
log.info("delDataQueue,批量删除Job入队列成功!Job --> Id :" + job.getUserId());
delDataQueue.put(job.getUserId().longValue());
}catch (InterruptedException e) {
log.info("delDataQueue,批量删除Job入队列异常");
Thread.currentThread().interrupt();
}
}
@PostConstruct
@Override
public void init() {
Thread thread =new Thread(() -> {
log.info("启动批量(新增或者更新)守护线程,启动时间{}", new Date(System.currentTimeMillis()));
while (Boolean.TRUE) {
User poll =null;
boolean pollTimeOut =false;
long startTime;
long endTime;
try {
// poll时设置超时时间为2秒
poll =dataQueue.poll(2, TimeUnit.SECONDS);
}catch (InterruptedException e) {
log.info("批量更新Job异常");
Thread.currentThread().interrupt();
}
if (null != poll) {
// poll到任务添加到List中
list.add(poll);
}else {
// poll超时,设置超时标志位
pollTimeOut =true;
}
// 如果任务List等于5000或poll超时且List中还有任务就批量更新
if (list.size() ==300 ||
(pollTimeOut && !CollectionUtils.isEmpty(list))) {
startTime = System.currentTimeMillis();
saveOrUpdateBatch(list);
log.info("Job任务批量更新{}条任务,耗时{}毫秒", list.size(),
System.currentTimeMillis() - startTime);
list.clear();
}
}
});
}
@PostConstruct
@Override
public void delInit() {
Thread thread =new Thread(() -> {
log.info("启动批量删除守护线程,启动时间{}", new Date(System.currentTimeMillis()));
while (Boolean.TRUE) {
Long poll =null;
boolean pollTimeOut =false;
long startTime;
long endTime;
try {
// poll时设置超时时间为2秒
poll =delDataQueue.poll(2, TimeUnit.SECONDS);
}catch (InterruptedException e) {
log.info("批量删除Job异常");
Thread.currentThread().interrupt();
}
if (null != poll) {
// poll到任务添加到List中
delList.add(poll);
}else {
// poll超时,设置超时标志位
pollTimeOut =true;
}
// 如果任务List等于5000或poll超时且List中还有任务就批量更新
if (delList.size() ==300 ||
(pollTimeOut && !CollectionUtils.isEmpty(delList))) {
startTime = System.currentTimeMillis();
removeByIds(delList);
log.info("Job任务批量删除{}条任务,耗时{}毫秒", delList.size(),
System.currentTimeMillis() - startTime);
delList.clear();
}
}
});
thread.setName("job-batchUpdate-deamon");
// 设置启动的线程为守护线程 直到jvm停了才停止
thread.setDaemon(true);
thread.start();
}
~~ 在需要的地方调用 recordJob () delJob() 方法存入队列