Java中的延迟任务与周期任务的执行方式(读书笔记)

Timer

代码实现

此处的代码是《Java并发编程实战》中的源码

package edu.wyn.concurrent.chapter6;

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;

public class OutofTime {

    public static void main(String[] args) throws InterruptedException {
        Timer timer = new Timer();

        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                throw new RuntimeException("test");
            }
        }, 1);
        TimeUnit.SECONDS.sleep(1);
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                throw new RuntimeException("test");
            }
        }, 1);
        TimeUnit.SECONDS.sleep(5);
    }
}

运行结果

image.png

存在问题

1、Timer在执行所有的定时任务时只会创建一个线程。如果某个任务的执行时间过长,那么将会破坏其他TimerTask的定时精确性(可能会快速连续执行或者丢弃任务)。
2、TimerTask如果抛出了一个未检查的异常,Timer会终止终止定时线程

ScheduledThreadPoolExecutor

使用代码

改造原书中的代码

package edu.wyn.concurrent.chapter6;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledThreadPoolDemo {

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        executor.schedule((Runnable) () -> {
            System.out.println("1-运行线程为:" + Thread.currentThread().getName());
            throw new RuntimeException("test");
        }, 1, TimeUnit.SECONDS);

        TimeUnit.SECONDS.sleep(1);
        executor.schedule((Runnable) () -> {
            System.out.println("2-运行线程为:" + Thread.currentThread().getName());
        }, 1, TimeUnit.SECONDS);
        TimeUnit.SECONDS.sleep(5);
        executor.shutdown();
    }
}

运行结果

image.png

即使第一个任务出现了异常,第二个任务还是正常开始执行了

利用DelayQueue自定义实现

此处的代码是参考材料中的,并非笔者实现,写在此处主要是想自己记录下,如有侵权请联系笔者删除

package edu.wyn.concurrent.chapter6;


import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;

class DelayedTask implements Runnable, Delayed {

    private static int counter = 0;

    private final int id = counter++;


    //延迟时间
    private final int delta;

    //触发时间
    private final long trigger;

    protected static List<DelayedTask> sequence = new ArrayList<>();

    public DelayedTask(int delayMilli) {
        this.delta = delayMilli;
        //转换为nanoTime
        this.trigger = System.nanoTime() + TimeUnit.NANOSECONDS.convert(delayMilli, TimeUnit.MILLISECONDS);
        sequence.add(this);
    }

    @Override
    public void run() {
        System.out.println(this);
    }

    @Override
    public String toString() {
        return String.format("[%1$-4d]", delta) + "task=" + id;
    }

    public String summary() {
        return "(" + id + ":" + delta + ")";
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(trigger-System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        DelayedTask that = (DelayedTask) o;
        if (trigger < that.trigger) {
            return -1;
        }
        if (trigger > that.trigger) {
            return 1;
        }
        return 0;
    }

    public static class EndSentinel extends DelayedTask {
        private ExecutorService executorService;
        public EndSentinel(int delay, ExecutorService executorService) {
            super(delay);
            this.executorService = executorService;
        }

        @Override
        public void run() {
            for (DelayedTask task : sequence) {
                System.out.print(task.summary() + " ");
            }
            System.out.println();
            System.out.println(this + "调用shutdownNow");
            executorService.shutdownNow();
        }
    }
}

class DelayedTaskConsumer implements Runnable {

    private final DelayQueue<DelayedTask> queue;

    DelayedTaskConsumer(DelayQueue<DelayedTask> queue) {
        this.queue = queue;
    }


    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                queue.take().run();
            }
        } catch (InterruptedException e) {
            System.out.println("DelayedTaskConsumer消费结束");
            throw new RuntimeException(e);
        }
    }
}
public class DelayedQueueDemo {

    public static void main(String[] args) {
        Random random = new Random(400);
        ExecutorService executorService = Executors.newCachedThreadPool();
        DelayQueue<DelayedTask> queue = new DelayQueue<>();
        for (int i = 0; i < 20; i++) {
            queue.add(new DelayedTask(random.nextInt(5000)));
        }
        queue.add(new DelayedTask.EndSentinel(5000, executorService));
        executorService.execute(new DelayedTaskConsumer(queue));
    }
}

参考材料

Java并发编程实战-第二部分 结构化并发应用程序

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容