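Scenario: we need to create a job that runs asynchronously. The job moves through several states, and each state requires different processing.
Implementation: build a producer-consumer model inside the service. When a job is created, it is given an initial state and placed on a queue, where a consumer picks it up and runs the business logic. After a step succeeds, the job's state is updated and the job is put back on the queue to wait for the next round of consumption.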
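Implementation 1: wait && notify
The most basic and simplest approach is the wait && notify mechanism.
While the queue holds data, the producer thread blocks, and the consumer wakes it after consuming. Conversely, while the queue is empty, the consumer thread blocks, and the producer wakes it after adding data. The wait && notify mechanism is simple enough, but it is inflexible and its concurrency is poor, since every operation contends for the same monitor, so it does not meet the needs of a real workload.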
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class WaitNotifyDemo {
    // Holds the data produced by the producer
    static List<String> list = new ArrayList<>();

    public static void main(String[] args) {
        new Thread(() -> {
            while (true) {
                synchronized (list) {
                    // While the list still holds data, wait until it has been consumed.
                    // A loop (not an if) re-checks the condition after a spurious wake-up.
                    while (list.size() != 0) {
                        try {
                            list.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    // The list is empty: produce a value and add it
                    try {
                        Thread.sleep(5000);
                        list.add(UUID.randomUUID().toString());
                        list.notify();
                        System.out.println(Thread.currentThread().getName() + list);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }, "Producer thread A ").start();
        new Thread(() -> {
            while (true) {
                synchronized (list) {
                    // While the list is empty, wait until notified that data has arrived
                    while (list.size() == 0) {
                        try {
                            list.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    // Data is available: read it
                    System.out.println(Thread.currentThread().getName() + list);
                    list.notify();
                    // Done reading: remove the current UUID
                    list.clear();
                }
            }
        }, "Consumer thread B ").start();
    }
}
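The example above handles a single slot. As a minimal sketch of the same wait/notify idea generalized to a buffer with capacity N, the following assumes a hypothetical `BoundedBuffer` class (not part of the original article) that guards both conditions with `while` loops and uses `notifyAll` so that producers and consumers waiting on the same monitor are both woken.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical bounded buffer built on wait/notifyAll (illustration only).
class BoundedBuffer<T> {
    private final Queue<T> items = new ArrayDeque<>();
    private final int capacity;

    BoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    // Blocks while the buffer is full.
    public synchronized void put(T item) throws InterruptedException {
        while (items.size() == capacity) { // re-check after every wake-up
            wait();
        }
        items.add(item);
        notifyAll(); // wake consumers waiting for data
    }

    // Blocks while the buffer is empty.
    public synchronized T take() throws InterruptedException {
        while (items.isEmpty()) {
            wait();
        }
        T item = items.remove();
        notifyAll(); // wake producers waiting for space
        return item;
    }
}

class BoundedBufferDemo {
    // Runs one producer and one consumer over a buffer of capacity 2
    // and returns the values the consumer received, in order.
    static List<Integer> run(int count) {
        BoundedBuffer<Integer> buffer = new BoundedBuffer<>(2);
        List<Integer> received = new ArrayList<>();
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            for (int i = 0; i < count; i++) {
                received.add(buffer.take());
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(5));
    }
}
```

With a single producer and a single consumer the values arrive in insertion order. The locking is still one monitor for both ends, which is the concurrency limitation described above.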
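Implementation 2: BlockingQueue
BlockingQueue gives the simplest code. The core idea is to encapsulate concurrency and capacity control inside the buffer itself.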
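import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 1; i <= 10; i++) {
            queue.put("message " + i);
        }
        System.out.println("Messages currently in the queue: " + queue.size());
        new Thread(() -> {
            try {
                System.out.println("Sleeping");
                Thread.sleep(10000);
                for (int i = 1; i <= 10; i++) {
                    queue.put("new message " + i);
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing the exception
                Thread.currentThread().interrupt();
            }
        }).start();
        int nThreads = 1;
        ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
        for (int i = 0; i < nThreads; i++) {
            executorService.submit(() -> {
                while (true) {
                    System.out.println("Consumer");
                    String poll;
                    try {
                        // Blocks until a message is available
                        poll = queue.take();
                    } catch (InterruptedException e) {
                        // Stop this consumer when interrupted
                        Thread.currentThread().interrupt();
                        return;
                    }
                    // Parenthesized: without the parentheses, + binds tighter than ==
                    System.out.println("Consumer: " + (poll == null ? "" : poll));
                    System.out.println(Thread.currentThread().getName());
                }
            });
        }
    }
}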
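The demo produces data with put and consumes it with take. Both methods can block the calling thread: take blocks while the queue is empty, and put blocks while the queue is at capacity (which for the no-argument constructor used here is Integer.MAX_VALUE). Here is the source of each: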
2.1 put()
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // Acquire the put lock interruptibly
    putLock.lockInterruptibly();
    try {
        // Unlike offer(e, timeout, unit), this waits indefinitely
        while (count.get() == capacity) {
            // After an element is removed, a signal wakes one thread waiting on notFull
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}
2.2 take()
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        // Dequeue, then decrement the count
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            // Elements remain, so wake another waiting take
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        // The queue was full before this removal, so wake a waiting put
        signalNotFull();
    return x;
}
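As the source shows, take is essentially the mirror image of put. It is also easy to see that the wait && notify version was really just a hand-rolled imitation of a BlockingQueue, so using BlockingQueue directly is a sound choice.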
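To make the capacity control concrete, here is a small sketch using a bounded `LinkedBlockingQueue` together with the timed `offer` and `poll` variants, which give up after a timeout instead of waiting indefinitely like `put` and `take`. The class name `BoundedQueueDemo`, the capacity of 1, and the 100 ms timeouts are assumptions chosen for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class BoundedQueueDemo {
    // Returns a comma-separated trace of four queue operations.
    static String run() {
        // Capacity 1: the second insert finds the queue full
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        StringBuilder trace = new StringBuilder();
        try {
            // Succeeds immediately: the queue has a free slot
            trace.append(queue.offer("a"));
            // Queue is full and nobody consumes, so this times out and returns false
            trace.append(',').append(queue.offer("b", 100, TimeUnit.MILLISECONDS));
            // Returns the head element without waiting
            trace.append(',').append(queue.poll(100, TimeUnit.MILLISECONDS));
            // Queue is empty and nobody produces, so this times out and returns null
            trace.append(',').append(queue.poll(100, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return trace.toString();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "true,false,a,null"
    }
}
```

Whether a full queue should block the producer (`put`) or be reported to the caller (`offer`) depends on the workload; the article's service uses an unbounded queue, so neither case arises there.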
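In practice:
Use @PostConstruct to start the worker threads when the service starts. exportRunnerListener loops over asyncJobRunner() to consume the queue; take() blocks the thread while the queue is empty, which avoids wasting resources on polling. Before starting the consumers, we load the unfinished jobs from the database so that jobs are not lost when the service restarts.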
// Not final: the pool is assigned in the @PostConstruct method, not in a constructor
private ExecutorService executorService;
private static final LinkedBlockingQueue<String> jobQueue = new LinkedBlockingQueue<>();
...
@PostConstruct
public void NeoExportRunner() {
    int nThreads = 3;
    // On every restart, find the unfinished jobs
    exportJobRepository.findByStatusIn(Arrays.asList(PENDING, LOADED, UPLOADED, NOTIFIED))
            .orElse(new ArrayList<>()).forEach(
                    // Put them back on the queue
                    o -> jobQueue.add(o.getId())
            );
    this.executorService = Executors.newFixedThreadPool(nThreads);
    for (int i = 0; i < nThreads; i++) {
        executorService.execute(this::exportRunnerListener);
    }
}
public void exportRunnerListener() {
    while (true) {
        log.info("asyncJobRunner is working... {}", Thread.currentThread().getName());
        try {
            // Consumer
            asyncJobRunner();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop this worker
            Thread.currentThread().interrupt();
            return;
        } catch (Exception e) {
            // Pass the exception as the last argument so the stack trace is logged
            log.error("asyncJobRunner failed", e);
        }
    }
}
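The listener loop above runs until its thread is interrupted. As a hedged sketch of how such workers could be stopped cleanly, the following plain-Java example (no Spring, with a hypothetical `WorkerShutdownDemo` class and a counter standing in for the real job handling) starts three workers that block on `take()`, waits until all jobs are processed, and then calls `shutdownNow()`, which interrupts the blocked workers.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class WorkerShutdownDemo {
    // Processes `jobs` items with three workers, then shuts the pool down.
    // Returns the number of processed items, or -1 if the pool did not terminate.
    static int run(int jobs) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicInteger processed = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(jobs);
        int nThreads = 3;
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);
        for (int i = 0; i < nThreads; i++) {
            pool.execute(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        queue.take(); // blocks while the queue is empty
                        processed.incrementAndGet(); // stand-in for real work
                        done.countDown();
                    } catch (InterruptedException e) {
                        // Restore the flag so the loop condition ends the worker
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }
        boolean terminated = false;
        try {
            for (int i = 0; i < jobs; i++) {
                queue.put("job-" + i);
            }
            done.await(5, TimeUnit.SECONDS);
            pool.shutdownNow(); // interrupts workers blocked in take()
            terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return terminated ? processed.get() : -1;
    }

    public static void main(String[] args) {
        System.out.println(run(10));
    }
}
```

In a Spring service the shutdown call would typically live in a lifecycle callback; that wiring is left out here because the original does not show it.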
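asyncJobRunner() is the task dispatcher. It takes a message from the queue and runs the business logic that matches the job's state. When a step finishes, the job's state is updated and the job is written back to the queue, so the next consumption handles the next state. This is how the state transitions are driven.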
public static void asyncJobRunner() throws InterruptedException {
    // Consume from jobQueue; take() blocks until an id is available and never returns null
    exportJobRepository.findById(jobQueue.take()).ifPresent(job -> {
        switch (job.getStatus()) {
            case PENDING:
                loadData(job);
                log.info("Job has been loaded. job -> {}", job);
                break;
            case LOADED:
                upload(job);
                log.info("Job has been uploaded. job -> {}", job);
                break;
            case UPLOADED:
                notify(job);
                log.info("Job has been notified. job -> {}", job);
                break;
            case FAILED:
                retry(job);
                log.info("Job has failed and will be retried. job -> {}", job);
                break;
            case NOTIFIED:
                finish(job);
                log.info("Job has finished. job -> {}", job);
                break;
            case FINISHED:
                log.warn("Finished job should not appear in job queue, check for logical error. job -> {}", job);
                break;
            case CANCELED:
                log.info("Job has been canceled. Nothing to do here. job -> {}", job);
                break;
            default:
                log.error("Unrecognized job status. job -> {}", job);
        }
    });
}
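The dispatch-and-requeue cycle can be seen in isolation with a small in-memory sketch. It assumes a hypothetical `StateFlowDemo` class in which a `Map` stands in for `exportJobRepository` and each handler does nothing except advance the status; the real `loadData`, `upload`, `notify`, and `finish` methods are not shown in the original.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

class StateFlowDemo {
    enum Status { PENDING, LOADED, UPLOADED, NOTIFIED, FINISHED }

    // In-memory stand-in for the job repository (assumption for this sketch)
    static final Map<String, Status> store = new ConcurrentHashMap<>();
    static final BlockingQueue<String> jobQueue = new LinkedBlockingQueue<>();

    // Takes one job id, advances the job by one state,
    // and re-enqueues it unless it has reached FINISHED.
    static Status step() throws InterruptedException {
        String id = jobQueue.take();
        Status next;
        switch (store.get(id)) {
            case PENDING:
                next = Status.LOADED;
                break;
            case LOADED:
                next = Status.UPLOADED;
                break;
            case UPLOADED:
                next = Status.NOTIFIED;
                break;
            default:
                next = Status.FINISHED;
        }
        store.put(id, next);
        if (next != Status.FINISHED) {
            jobQueue.add(id); // back on the queue for the next state
        }
        return next;
    }

    // Drives a single job from PENDING to FINISHED and records each state reached.
    static List<Status> run() {
        store.put("job-1", Status.PENDING);
        jobQueue.add("job-1");
        List<Status> seen = new ArrayList<>();
        try {
            while (!jobQueue.isEmpty()) {
                seen.add(step());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [LOADED, UPLOADED, NOTIFIED, FINISHED]
    }
}
```

Because each state is handled in a separate pass through the queue, several jobs interleave naturally: a long upload for one job does not hold back the load step of another once more than one worker is consuming.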
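A word on the retry mechanism. When a job throws an exception in some state and the step does not succeed, the retry mechanism compensates once the job is FAILED. On an exception, the current state is saved in LastStatus, the state is set to FAILED, and the retry count is recorded. The next time we take this job from the queue its state is FAILED, so retry is called; it writes the state at the time of failure back to the job and puts the job on the queue, and the next consumption resumes from that step.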
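public static void retry(ExportJob job) {
    // Retry only while the retry count is below the maximum
    if (job.getRetry() < maximumRetry) {
        job.setStatus(job.getLastStatus());
        job.setRetry(job.getRetry() + 1);
        // Persist the restored status and retry count; otherwise the next
        // findById would read FAILED again and the count would never grow
        exportJobRepository.saveAndFlush(job);
        jobQueue.add(job.getId());
    } else {
        // save db -> error status
        log.error("Max retry exceeds. job -> {}", job);
    }
}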
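To check the retry bookkeeping, here is a minimal in-memory sketch. The `RetryDemo` class, the `markFailed` helper, the `ERROR` status, and the limit of 3 are all assumptions for illustration; the original only shows `retry` and describes the failure path in prose.

```java
class RetryDemo {
    enum Status { PENDING, FAILED, ERROR }

    static class Job {
        Status status = Status.PENDING;
        Status lastStatus;
        int retry;
    }

    static final int MAXIMUM_RETRY = 3; // assumed limit

    // Called when a step throws: remember the failing state, then mark the job FAILED.
    static void markFailed(Job job) {
        job.lastStatus = job.status;
        job.status = Status.FAILED;
    }

    // Restores the failing state and counts the retry.
    // Returns true if the job should be re-queued, false if retries are exhausted.
    static boolean retry(Job job) {
        if (job.retry < MAXIMUM_RETRY) {
            job.status = job.lastStatus;
            job.retry++;
            return true;
        }
        job.status = Status.ERROR; // hypothetical terminal status
        return false;
    }

    // Simulates a step that always fails and counts the attempts made
    // before the job is given up: one initial attempt plus MAXIMUM_RETRY retries.
    static int attemptsUntilGiveUp() {
        Job job = new Job();
        int attempts = 0;
        while (true) {
            attempts++;      // the step runs and throws
            markFailed(job);
            if (!retry(job)) {
                return attempts;
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(attemptsUntilGiveUp()); // prints 4
    }
}
```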
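addJob() and cancelJob() are called from business code to create and cancel jobs. Cancellation here is not real-time: it is only observed the next time the job is taken from the queue. Adjust the code to the actual business scenario.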
public static String addJob(String id, String type, String channel) {
    ExportJob exportJob = ExportJob.builder()
            .channel(channel)
            .jobId(id)
            // Initial state of a new job
            .status(PENDING)
            .createAt(sdf.format(System.currentTimeMillis()))
            .type(type)
            .id(UUID.randomUUID().toString().replace("-", ""))
            .retry(0)
            .build();
    ExportJob saved = exportJobRepository.saveAndFlush(exportJob);
    jobQueue.add(saved.getId());
    return saved.getId();
}
public static String cancelJob(String id) {
    canceled.add(id);
    exportJobRepository.findById(id).ifPresent(job -> {
        // Mark the job as canceled
        job.setStatus(CANCELED);
        exportJobRepository.saveAndFlush(job);
    });
    return id;
}
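One way the `canceled` set could be consulted is at dequeue time, which also illustrates why cancellation is not real-time: a canceled id is only skipped when a consumer next takes it from the queue. The sketch below is an assumption about how that check might look, using a hypothetical `CancelDemo` class; the original does not show where `canceled` is read.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

class CancelDemo {
    // Thread-safe set of canceled job ids
    static final Set<String> canceled = ConcurrentHashMap.newKeySet();
    static final BlockingQueue<String> jobQueue = new LinkedBlockingQueue<>();

    static void cancelJob(String id) {
        canceled.add(id);
    }

    // Drains the queue, skipping ids that were canceled while they waited.
    // Returns the ids that were actually processed.
    static List<String> drain() {
        List<String> processed = new ArrayList<>();
        String id;
        while ((id = jobQueue.poll()) != null) {
            if (canceled.remove(id)) {
                continue; // canceled: skip it and forget the marker
            }
            processed.add(id); // stand-in for the real state handling
        }
        return processed;
    }

    static List<String> run() {
        jobQueue.add("a");
        jobQueue.add("b");
        jobQueue.add("c");
        cancelJob("b"); // canceled while still queued
        return drain();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [a, c]
    }
}
```

A job whose step is already running when `cancelJob` is called is not affected by this check; stopping it mid-step would need cooperation from the step itself.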