java线程、线程池的使用

一、如何创建线程

1、将类声明为Thread的子类,重写Thread类的run方法,使用子类的实例调用start()方法启动线程。
class MyThread extends Thread {
  public void run(){
      //code
  }
}

MyThread m = new MyThread();
m.start();
2、让类实现Runnable接口,该类实现run方法,然后在Thread构造方法中传入Runnable接口的实现类,使用Thread对象调start()方法启动线程。
class MyThread implements Runnable {
  public void run(){
      //code
  }
}

MyThread m = new MyThread();
//new Thread(m).start();
Thread t = new Thread(m);
t.start();

实现Runnable接口,避免了继承Thread类的单继承局限性。

3、匿名内部类的方式
public class Test {
  public static void main(String[] args){
    new Thread(new Runnable(){
      public void run(){
        //code
      }
    }).start();
  }
}
4、让类实现Callable接口,该类实现call()方法,使用FutureTask类来包装Callable实现类的对象,然后再Thread构造方法中传入FutureTask对象,然后再使用Thread对象调start()方法启动线程。
public class MyThread implements Callable<String> {
    @Override
    public String call() throws Exception {
        return "data";
    }
}

public class Test{
    public static void main(String[] args){
        Callable<String> myThread = new MyThread();
        FutureTask<String> ft = new FutureTask<>(myThread);
        Thread t = new Thread(ft);
        t.start();
        String s = ft.get();
        System.out.println("callable线程返回值:" + s);
    }
}
打印结果:callable线程返回值:data

二、线程池

jdk中线程池通过线程池工厂类创建,再通过线程去执行任务方法。

  • Executors:线程池创建工厂类
  • ExecutorService:线程池类
  • Future:用来记录线程任务执行完毕后产生的结果
1、线程池的使用

使用Executors中的newFixedThreadPool()静态方法得到线程池对象,然后调用submit()方法提交线程执行任务。

submit(Runable task):下面的例子是提交一个Runnable的任务,返回的Future,该Future的get()方法在成功完成时返回null。

public class MyThread implements Runnable{ 
    //run方法 
}

public class ThreadPoolTest {
    public static void main(String[] args) {
        ExecutorService es = Executors.newFixedThreadPool(5);
        es.submit(new MyThread());
    }
}

submit(Runable task,T result):下面的例子是提交一个Runnable任务,返回的Future,该Future的get()方法在成功完成时将会返回给定的结果。 可以将传入的参数在run里处理并返回结果。

public class Data {
    int num = 1;
    String str = "threadData";
    //省略
}

public class MyThread implements Runnable{ 
    Data data;
    public void run(){
        Data.setStr("data")
    }
}

public class ThreadPoolTest {
    public static void main(String[] args)  throws ExecutionException, InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(5);
        Future<Data> runableResult = es.submit(new MyThread(data), data);
        System.out.println(runableResult.get());
    }
}
打印结果:Data{num=1, str='data'}

submit(Callable<T> task):下面的例子是提交一个Callable的任务,返回一个表示该任务的Future,然后通过Future中的get()方法可以获取到返回值。

public class MyThread implements Callable<String> {
    @Override
    public String call() throws Exception {
        return "aaa";
    }
}

public class ThreadPoolTest {
    public static void main(String[] args)  throws ExecutionException, InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(5);
        Future future = es.submit(new MyThread());
        System.out.println(future.get());
    }
}

打印的结果为:aaa
2、java中Executors创建的四种线程池
  • newCachedThreadPool()

创建一个可缓存的线程池。如果有新任务提交时,有空闲线程则直接处理任务,如果没有就创建新的线程处理任务,线程池不对线程池大小做限制。

newCachedThreadPool(ThreadFactory threadFactory):创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们,并在需要时使用提供的 ThreadFactory创建新线程。

注:也可以使用ThreadPoolExecutor创建个性化设置的线程池。这里讲的四种线程池看源码,里面都是使用ThreadPoolExecutor构建的。

ExecutorService es = Executors.newCachedThreadPool();
  • newFixedThreadPool(int nThreads)

创建一个固定大小的线程池。当线程达到线程池的最大值,再提交任务则进入队列中,等有线程空闲时,再从队列中取出任务继续执行。

newFixedThreadPool(int nThreads,ThreadFactory threadFactory)threadFactory是创建新线程时使用的工厂。

  • newScheduledThreadPool(int corePoolSize)

创建一个固定大小的线程池,支持定时及周期性任务执行。corePoolSize是池中所保存的线程数,即使线程是空闲的也包括在内。

newScheduledThreadPool(int corePoolSize,ThreadFactory threadFactory)threadFactory是创建新线程时使用的工厂。

下面代码中的schedule(Runnable<V> command,long delay,TimeUnit unit)方法的参数:第一个参数是要执行的任务,第二个是从现在开始延迟执行时间,unit是延迟参数的时间单位。

ThreadPoolRunnable tpr = new ThreadPoolRunnable();
ScheduledExecutorService es = Executors.newScheduledThreadPool(3);
es.schedule(tpr,3,TimeUnit.SECONDS);

scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)方法可以创建并执行一个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期。

  • newSingleThreadExecutor()
    创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序执行。这里的单线程执行指的是线程池内部,从线程池外看,提交任务到线程池时并没有阻塞,仍然是异步的。
3、为什么使用ThreadFactory及用法

使用ThreadFactory有几点好处:

  • 我们可以自己设置一个线程名,而不用使用默认的pool-thread-n这种名字。
  • 可以给线程设置成守护线程。
  • 可以设置优先级。
  • 可以处理为捕捉的异常。
    ThreadFactory使用方法:
MyRunnable:
public class MyRunnable implements Runnable{
    @Override
    public void run() {
        System.out.println("当前线程为:" + Thread.currentThread().getName());
    }
}
MyThreadFactory:
public class MyThreadFactory implements ThreadFactory {
    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r);
        thread.setName("threadFactory线程");
        return thread;
    }
}
public class Test {
    public static void main(String[] args) {
        MyThreadFactory tf = new MyThreadFactory();
        MyRunnable runnable = new MyRunnable();
        ExecutorService es = Executors.newFixedThreadPool(tf);
        es.submit(runnable);
    }
}

三、ThreadPoolExecutor用法

1、Executors创建的四种线程池可能会出现的问题

上面说了Executors创建的几种线程池,但是不建议使用Executors去创建线程池,这是为什么呢?

我们来看一下Executors创建的线程池源码:

  • Executors.newCachedThreadPool()
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}

源码中corePoolSize被设置为0,maximumPoolSize被设置为Integer.MAX_VALUE,keepAliveTime被设置为60秒,因为最大线程池是Integer.MAX_VALUE,可能会因为创建大量的线程导致OOM。

  • Executors.newScheduledThreadPool(n)
public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
}

其问题和newCachedThreadPool一样,源码中maximumPoolSize被设置为Integer.MAX_VALUE,所以可能会因为创建大量的线程导致OOM。

  • Executors.newFixedThreadPool(n)
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}

public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
}

源码中使用的是LinkedBlockingQueue无界队列作为线程池的工作队列,因为没有设置容量,其默认队列的容量是Integer.MAX_VALUE,所以也可能会堆积大量的请求,从而导致OOM。

  • Executors.newSingleThreadExecutor()
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
}

public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
}

newFixedThreadPool一样,因为使用LinkedBlockingQueue也可能堆积大量的请求,从而导致OOM。

注:如果使用Executors的静态方法创建ThreadPoolExecutor对象,可以通过使用Semaphore对任务的执行进行限流也可以避免出现OOM异常。

2、ThreadPoolExecutor配置:

有下面7个参数:
int corePoolSize:核心线程池大小
int maximumPoolSize:最大线程池大小
long keepAliveTime:非核心线程最大空闲时间
TimeUnit unit:时间单位
BlockingQueue<Runnable> workQueue:线程排队策略
RejectedExecutionHandler handler:拒绝策略
ThreadFactory threadFactory:线程工厂

  • corePoolSize和maximumPoolSize

ThreadPoolExecutor将根据corePoolSizemaximumPoolSize设置的边界自动调整线程池大小。

corePoolSize和maximumPoolSize:
当新任务在方法 execute(Runnable)中提交任务时,如果运行的线程少于corePoolSize时,哪怕线程池中有空闲状态的线程,也会创建一个新的线程来处理任务。如果运行的线程多于corePoolSize而少于maximumPoolSize,则仅当队列满时才创建新线程。如果设置corePoolSizemaximumPoolSize相同,则创建了固定大小的线程池。如果将 maximumPoolSize设置为基本的无界值(如 Integer.MAX_VALUE),则允许池适应任意数量的并发任务。在大多数情况下,核心和最大池大小仅基于构造来设置,不过也可以使用 setCorePoolSize(int)setMaximumPoolSize(int)进行动态更改。

按需构造
默认情况下,即使核心线程最初只是在新任务到达时才创建和启动的,也可以使用方法 prestartCoreThread()prestartAllCoreThreads()对其进行动态重写。如果构造带有非空队列的池,则可能希望预先启动线程。

创建新线程
使用ThreadFactory创建新线程,如果没有另外说明,则在同一个 ThreadGroup中一律使用 Executors.defaultThreadFactory()创建线程,这些线程具有相同的 NORM_PRIORITY 优先级和非守护进程状态。通过提供不同的 ThreadFactory,可以改变线程的名称、线程组、优先级、守护进程状态,等等。如果从 newThread 返回 null 时 ThreadFactory 未能创建线程,则执行程序将继续运行,但不能执行任何任务。

  • keepAliveTime线程存活时间

此参数是在池中当前有多于corePoolSize的线程起作用,这些多出的线程在空闲时间超过keepAliveTime时终止。目的是为了在池处于非活跃状态时减少资源消耗的方法。

可以使用setKeepAliveTime(long keepAliveTime,TimeUnit unit)方法动态更改此参数。使用Long.MAX_VALUETimeUnit.NANOSECONDS这两个参数时可以使空闲线程永远不会在关闭之前终止。

只要keepAliveTime值非0,allowCoreThreadTimeOut(boolean)方法可以将此超时策略应用于核心线程(corePool)。

  • TimeUnit时间单位

用于设置keepAliveTime设置的超时时间的单位。keepAliveTime:60L,TimeUnit.SECONDS代表:60秒。

注:TimeUnit.SECONDS.sleep(1); //相当于Thread.sleep(1000);
即可以使用TimeUnit里的sleep()方法来代替Thread.sleep()方法

  • BlockingQueue<Runnable>任务排队策略

所有的BlockingQueue都可用于传输和保持提交的任务。此队列的使用时与池大小进行交互,下面是排队时机:

  • 如果运行的线程少于corePoolSize,则 Executor始终首选添加新的线程,而不进行排队。
  • 如果运行的线程等于或多于corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程。
  • 如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。

主要有三种排队策略:

直接提交队列:
 直接提交队列的默认选项是SynchronousQueue,它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。

无界队列:
 使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。

有界队列:
 当使用有限的 maximumPoolSizes 时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。

  • RejectedExecutionHandler任务拒绝策略

Executo已经关闭,或Executor将有限边界用于最大线程数和工作对列容量,并且两者已饱和时,在execute方法中提交新任务将被拒绝。以上两种情况下,exeute方法都将调用RejectedExecutionHandlerrejectedExecution()方法。线程池提供了下面四种预定的拒绝策略:

中止策略(默认):ThreadPoolExecutor.AbortPolicy()
 线程数超过maximumPoolSize时,直接拒绝,抛出运行时RejectedExecutionException的异常。

调用者运行策略:ThreadPoolExecutor.CallerRunsPolicy()
 用调用者的线程来调用execute,除非executor被关闭,否则任务不会被丢弃。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。注意:当最大线程池过小时,此策略下,任务会交上层线程(即调用executor的线程)执行,导致上层线程既要处理其他任务,又要处理排队中的大量任务,如果遇到长连接,上层线程将长时间阻塞,出现故障。

抛弃旧任务策略:ThreadPoolExecutor.DiscardOldestPolicy()
 如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)。注意:如果此时阻塞队列使用PriorityBlockingQueue优先级队列,将会导致优先级最高的任务被抛弃,因此不建议将该种策略配合优先级队列使用。

直接丢弃策略:ThreadPoolExecutor.DiscardPolicy()
 线程数超过maximumPoolSize时,任务被直接被丢弃。和AbortPolicy一样,但不抛出异常。


写在最后:

  • 如果文章中有错误或是表达不准确的地方,欢迎大家评论中指正,以便我完善。
  • 文章我也会根据所学到新的知识不断更新。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,258评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,335评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,225评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,126评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,140评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,098评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,018评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,857评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,298评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,518评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,678评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,400评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,993评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,638评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,801评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,661评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,558评论 2 352