线程池(一):阻塞队列

线程

线程的状态

Java线程在运行的声明周期中可能会处于6种不同的状态,这6种线程状态分别为如下所示。
• New:新创建状态。线程被创建,还没有调用 start 方法,在线程运行之前还有一些基础工作要做。
• Runnable:可运行状态。一旦调用start方法,线程就处于Runnable状态。一个可运行的线程可能正在 运行也可能没有运行,这取决于操作系统给线程提供运行的时间。
• Blocked:阻塞状态。表示线程被锁阻塞,它暂时不活动。
• Waiting:等待状态。线程暂时不活动,并且不运行任何代码,这消耗最少的资源,直到线程调度器 重新激活它。
• Timed waiting:超时等待状态。和等待状态不同的是,它是可以在指定的时间自行返回的。
• Terminated:终止状态。表示当前线程已经执行完毕。导致线程终止有两种情况:第一种就是run方 法执行完毕正常退出;第二种就是因为一个没有捕获的异常而终止了run方法,导致线程进入终止状态。

image

线程创建后,调用 Thread 的 start 方法,开始进入运行状态,
当线程执行wait 方法后, 线程进入等待状态,
进入等待状态的线程需要其他线程通知才能返回运行状态。
超时等待相当于在等待状 态加上了时间限制,如果超过时间限制,则线程返回运行状态。
当线程调用到同步方法时,如果线程没有 获得锁则进入阻塞状态,当阻塞状态的线程获取到锁时则重新回到运行状态。
当线程执行完毕或者遇到意外异常终止时,都会进入终止状态。

创建线程的三种方法

  1. 创建类,继承Thread类,重写run() 方法
  2. 创建类,实现Runnable接口,实现run()方法,然后Thread的构造中,传入此类
  3. 直接创建Thread的实例,调用Start方法
  4. 实现Callable 接口,实现call()方法
public class CallableTest {
    public static class ThreadCallAble implements Callable {

        @Override
        public String call() throws Exception {
            return "hello world";
        }
    }
    public static void main(String args[]) {
        ThreadCallAble mCall = new ThreadCallAble();
        ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();
        Future mFuture = singleThreadPool.submit(mCall);
        try {
            Log.d("hh", "mFuture.get() = " + mFuture.get());
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}

(1)Callable可以在任务接受后提供一个返回值,Runnable无法提供这个功能。
(2)Callable中的call()方法可以抛出异常,而Runnable的run()方法不能抛出异常。
(3)运行Callable可以拿到一个Future对象,Future对象表示异步计算的结果,它提供了检查计算是否 完成的方法。由于线程属于异步计算模型,因此无法从别的线程中得到函数的返回值,在这种情况下就可 以使用 Future 来监视目标线程调用 call()方法的情况。但调用 Future的get()方法以获取结果时,当前 线程就会阻塞,直到call()方法返回结果。

线程的终止

当线程的run方法执行完毕,或者在方法中出现没有捕获的异常时,线程将终止。
在Java早期版本中有 一个stop方法,其他线程可以调用它终止线程,但是这个方法现在已经被弃用了。
interrupt 方法可以用来请 求中断线程。当一个线程调用 interrupt 方法时,线程的中断标识位将被置位(中断标识位为true),线程会 不时地检测这个中断标识位,以判断线程是否应该被中断。要想知道线程是否被置位,可以调用 Thread.currentThread().isInterrupted()
调用Thread.interrupted()来对中断标识位进行复位。
如果一个线程被阻塞,就无法检测中 断状态。如果一个线程处于阻塞状态,线程在检查中断标识位时如果发现中断标识位为true,则会在阻塞方 法调用处抛出InterruptedException异常,并且在抛出异常前将线程的中断标识位复位,即重新设置为 false。
需要注意的是被中断的线程不一定会终止,中断线程是为了引起线程的注意,被中断的线程可以决定如何 去响应中断。所以如果要终止进程,就需要退出run方法

线程 同步

在多线程应用中,两个或者两个以上的线程需要 共享对同一个数据的存取。
synchronized
volatile

阻塞队列

阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元 素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素
阻塞队列有两个常见的阻塞场景,它们分别是:
(1)当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列。
(2)当队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中有空的 位置,线程被自动唤醒。
支持以上两种阻塞场景的队列被称为阻塞队列。

BlockingQueue的核心方法 放入数据:

  • offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里。即如果BlockingQueue可以 容纳,则返回true,否则返回false。(本方法不阻塞当前执行方法的线程。)
  • offer(E o,long timeout,TimeUnit unit):可以设定等待的时间。如果在指定的时间内还不能往队列 中加入BlockingQueue,则返回失败。 * put(anObject):将anObject加到BlockingQueue里。如果BlockQueue没有空间,则调用此方法的线程 被阻断,直到BlockingQueue里面有空间再继续。 获取数据:
  • poll(time):取走 BlockingQueue 里排在首位的对象。若不能立即取出,则可以等 time参数规定的时 间。取不到时返回null。
  • poll(long timeout,TimeUnit unit):从BlockingQueue中取出一个队首的对象。如果在指定时间内, 队列一旦有数据可取,则立即返回队列中的数据;否则直到时间超时还没有数据可取,返回失败。
  • take():取走BlockingQueue里排在首位的对象。若BlockingQueue为空,则阻断进入等待状态,直 到 BlockingQueue有新的数据被加入。
  • drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数)。通 过该方法,可以提升获取数据的效率;无须多次分批加锁或释放锁。

在Java中提供了7个阻塞队列,它们分别如下所示。

• ArrayBlockingQueue:由数组结构组成的 有界 阻塞队列。
• LinkedBlockingQueue:由链表结构组成的 有界 阻塞队列。
• PriorityBlockingQueue:支持优先级排序的 无界 阻塞队列。
• DelayQueue:使用优先级队列实现的 无界 阻塞队列。
• SynchronousQueue:不存储元素的阻塞队列。
• LinkedTransferQueue:由链表结构组成的 无界 阻塞队列。
• LinkedBlockingDeque:由链表结构组成的 双向 阻塞队列

import java.util.concurrent.ArrayBlockingQueue;

/**
 * @author 付影影
 * @desc 阻塞队列
 * @date 2019/10/15
 */
public class BlockingQueueTest {
    private int queueSize = 10;
    private ArrayBlockingQueue<Integer> mArrayBlockingQueue = new ArrayBlockingQueue<>(queueSize);

    public static void main(String[] args) {
        BlockingQueueTest mBlockingQueue = new BlockingQueueTest();
        Consumer consumer = mBlockingQueue.new Consumer();
        Producer producer = mBlockingQueue.new Producer();
        consumer.start();
        producer.start();

    }

    /**
     * 使用阻塞队列实现 生产者和消费者模式,无需考虑同步和线程间同步的问题
     */

    class Consumer extends Thread {
        @Override
        public void run() {
            while (true) {
                try {
                    mArrayBlockingQueue.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    class Producer extends Thread {
        @Override
        public void run() {
            while (true) {
                try {
                    mArrayBlockingQueue.put(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

线程池

在编程中经常会使用线程来异步处理任务,但是每个线程的创建和销毁都需要一定的开销。如果每次 执行一个任务都需要开一个新线程去执行,则这些线程的创建和销毁将消耗大量的资源;并且线程都是“各 自为政”的,很难对其进行控制,更何况有一堆的线程在执行。这时就需要线程池来对线程进行管理

可以通过ThreadPoolExecutor来创建一个线程池,

ThreadPoolExecutor类一共有4个构造方法。其中,拥 有最多参数的构造方法如下所示:
这些参数的作用如下所示。
corePoolSize:核心线程数。默认情况下线程池是空的,只有任务提交时才会创建线程。如果当前运 行的线程数少于corePoolSize,则创建新线程来处理任务;如果等于或者多于corePoolSize,则不再创建。如 果调用线程池的prestartAllcoreThread方法,线程池会提前创建并启动所有的核心线程来等待任务。
maximumPoolSize:线程池允许创建的最大线程数。如果任务队列满了并且线程数小于 maximumPoolSize时,则线程池仍旧会创建新的线程来处理任务。
keepAliveTime:非核心线程闲置的超时时间。超过这个时间则回收。如果任务很多,并且每个任务 的执行事件很短,则可以调大keepAliveTime来提高线程的利用率。另外,如果设置 allowCoreThreadTimeOut属性为true时,keepAliveTime也会应用到核心线程上,
TimeUnit:keepAliveTime参数的时间单位。可选的单位有天(DAYS)、小时(HOURS)、分钟 (MINUTES)、秒(SECONDS)、毫秒(MILLISECONDS)等。
workQueue:任务队列。如果当前线程数大于corePoolSize,则将任务添加到此任务队列中。该任务 队列是BlockingQueue类型的,也就是阻塞队列。
ThreadFactory:线程工厂。可以用线程工厂给每个创建出来的线程设置名字。一般情况下无须设置 该参数。
RejectedExecutionHandler:饱和策略。这是当任务队列和线程池都满了时所采取的应对策略,默认 是AbordPolicy,表示无法处理新任务,并抛出RejectedExecutionException异常。
此外还有3种策略,它们分 别如下。
(1)CallerRunsPolicy:用调用者所在的线程来处理任务。此策略提供简单的反馈控制机制,能够减缓 新任务的提交速度。 (2)DiscardPolicy:不能执行的任务,并将该任务删除。
(3)DiscardOldestPolicy:丢弃队列最近的任务,并执行当前的任务。

线程池执行流程

image

(1)如果线程池中的线程数未达到核心线程数,则创建核心线程处理任务。
(2)如果线程数大于或者等于核心线程数,则将任务加入任务队列,线程池中的空闲线程会不断地从 任务队列中取出任务进行处理。
(3)如果任务队列满了,并且线程数没有达到最大线程数,则创建非核心线程去处理任务。
(4)如果线程数超过了最大线程数,则执行饱和策略

线程池的种类

通过直接或者间接地配置ThreadPoolExecutor的参数可以创建不同类型的ThreadPoolExecutor,其中有 4 种线程池比较常用,它们分别是 FixedThreadPool、CachedThreadPool、SingleThreadExecutor和 ScheduledThreadPool

/**
 *线程池
 */
class ThreadPoolActivity : AppCompatActivity() {
    lateinit var fixThreadPool: ExecutorService
    lateinit var cacheThreadPool: ExecutorService
    lateinit var singleThreadPool: ExecutorService
    lateinit var scheduledThreadPool: ExecutorService
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_thread_pool)
        //固定核心线程和最大线程个数,阻塞队列无界
        fixThreadPool = Executors.newFixedThreadPool(5)
        //没有核心线程 最大线程个数不限制,阻塞队列不存储
        cacheThreadPool = Executors.newCachedThreadPool(object : ThreadFactory {
            var count = 0
            override fun newThread(r: Runnable?): Thread {
                Log.d("hh", "新开的线程: newThreadPool_$count")
                val thread = Thread(r, "newThreadPool_${count++}")
                thread.setUncaughtExceptionHandler { t, e ->
                    //处理非正常的线程中止,多线程中通过trycatch试图捕获线程的异常是不可取的
                    Log.d("hh", t.name)
                    e.printStackTrace()
                }
                return thread
            }
        })

        //核心线程和最大线程个数 只有一个,阻塞队列无界
        singleThreadPool = Executors.newSingleThreadExecutor()
        //定时任务的线程池
        scheduledThreadPool = Executors.newScheduledThreadPool(5)

        tvFixThreadPool.setOnClickListener {
            for (i in 0..10) {
                cacheThreadPoolExecutor(Runnable {
                    Log.d("hh", "i = $i")
                })
            }
        }

        ivDownImg.setOnClickListener {
            //downLoadImg("https://y3.cnliveimg.com:8080/image/itv/2016/1112/7e151ac5d7f84f58a4653e879c973192_161409_100.jpg")
            arrayBlockingQueue()
        }
    }

    /**
     * 线程池 调用
     */
    private fun cacheThreadPoolExecutor(r: Runnable) {
        cacheThreadPool.execute(r)
    }

    /**
     * 下载图片
     */
    private fun downLoadImg(path: String) {
        //它有3个泛型参数,分别为Params、Progress和Result,
        // 其中Params为 参数类型,
        // Progress为后台任务执行进度的类型,
        // Result为返回结果的类型。
        // 如果不需要某个参数,可以将 其设置为Void类型
        val asynctask =
                object : AsyncTask<String, Int, Bitmap>() {

                    //在主线程中执行。一般在任务执行前做准备工作,比如对 UI 做一些标记。
                    override fun onPreExecute() {
                        Log.d("hh", "onPreExecute")
                    }

                    //在线程池中执行。在 onPreExecute方法执行后运行,用来执 行较为耗时的操作。在执行过程中可以调用publishProgress(Progress...values)来更新进度信息。
                    override fun doInBackground(vararg params: String?): Bitmap? {
                        Log.d("hh", "doInBackground")
                        val url = URL(params[0])
                        var bitmap: Bitmap? = null
                        /* val inputStream = url.openStream()
                         val bitmap = BitmapFactory.decodeStream(inputStream)*/

                        //开启连接
                        /* val conn = url.openConnection() as HttpURLConnection
                         //设置连接超时
                         conn.connectTimeout = 5000
                         //设置请求方式
                         conn.requestMethod = "GET"
                         //conn.connect();
                         if (conn.responseCode === 200) {
                             val inputStream = conn.getInputStream()
                             bitmap = BitmapFactory.decodeStream(inputStream)
                             inputStream.close()
                         }
                         conn.disconnect()*/

                        val client = OkHttpClient()
                        //获取请求对象
                        val request = Request.Builder().url(url).build()

                        //获取响应体
                        val body = client.newCall(request).execute().body()

                        //获取流
                        val inputStream = body!!.byteStream()
                        //转化为bitmap

                        return BitmapFactory.decodeStream(inputStream)
                    }

                    //:在主线程中执行。当后台任务执行完成后,它会被执行。 doInBackground方法得到的结果就是返回的result的值。此方法一般做任务执行后的收尾工作,比如更新UI 和数据。
                    override fun onPostExecute(result: Bitmap?) {
                        Log.d("hh", "onPostExecute")
                        ivDownImg.setImageBitmap(result)
                    }

                    //在主线程中执行。当调用 publishProgress(Progress...values)时,此方法会将进度更新到UI组件上
                    override fun onProgressUpdate(vararg values: Int?) {
                        Log.d("hh", "onProgressUpdate")
                    }

                }.execute(path)
    }

    fun arrayBlockingQueue() {
        val arrayBlockingQueue = ArrayBlockingQueue<String>(128)
        arrayBlockingQueue.put("hello")
        arrayBlockingQueue.put("world")
        arrayBlockingQueue.offer("hello")
        arrayBlockingQueue.offer("shadow")
        Log.d("hh", "first = ${arrayBlockingQueue.first()}")
        Log.d("hh", "poll = ${arrayBlockingQueue.poll()}")
    }
}

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,128评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,316评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,737评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,283评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,384评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,458评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,467评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,251评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,688评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,980评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,155评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,818评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,492评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,142评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,382评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,020评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,044评论 2 352

推荐阅读更多精彩内容