十一 .Java并发工具

Java中的锁

锁是一种线程同步机制,类似同步块,但是锁比Java的同步块更复杂。锁(以及其他更高级的同步机制)是使用synchronized块创建的,因此我们不能完全摆脱synchronized关键字。

从Java 5开始,软件包java.util.concurrent.locks包含多个锁实现,因此你不必自己去实现锁。但是你仍然需要知道如何使用它们,知道它们实现背后的理论仍然是有用的。有关更多详细信息,请参阅我在java.util.concurrent.locks.Lock 界面上的教程。

一个简单的锁

让我们先看一个java同步块代码:

public class Counter{
    private int count = 0;

    public int inc(){
        synchronized(this){
            return ++count;
        }
    }
}

注意inc()方法中的synchronized(this)块。这个同步块确保一次只有一个线程能执行return ++count 。同步块中的代码可能更高级更复杂,但简单的++count就足以表达重点。

Counter类可以使用Lock编写,而不是synchronized块:

public class Counter{
    private Lock lock = new Lock();
    private int count = 0;

    public int inc(){
        lock.lock();
        int newCount = ++count;
        lock.unlock();
        return newCount;
    }
}

lock()方法锁定了Lock实例,以便阻止所有调用lock()的线程,直到unlock()执行。

这是一个简单的Lock实现:

public class Lock{
    private boolean isLocked = false;

    public synchronized void lock() throws InterruptedException{
        while(isLocked){
            wait();
        }
        isLocked = true;
    }

    public synchronized void unlock(){
        isLocked = false;
        notify();
    }
}

注意while(isLocked)循环,它被称为“自旋锁”。自旋锁和方法wait()notify()Thread Signaling 线程通信一文中详细介绍过。当isLocked为true时,调用lock()的线程在wait()方法上停止运行并等待。为了防止线程在没有接到notify()调用的情况下从wait()调用中意外返回(也被称为Spurious Wakeup),线程会重新检查isLocked条件以查看是否可以安全进行,而不是仅仅假设被唤醒就意味着继续下去是安全的。如果isLocked为false,则线程退出while(isLocked)循环,并设置isLocked为true,以锁定Lock实例阻止其他线程调用lock()

当线程执行完了在临界区中的代码(lock()unlock()之间的代码),线程调用unlock()。执行unlock()设置isLocked为false,并通知(唤醒)在lock()方法的wait()调用中等待的其中一个线程(如果有的话)。

锁重入

Java中的同步块是可重入的。这意味着,如果Java线程进入了同步的代码块,因此也锁定了这个同步块的监视器对象,那么该线程可以进入在此监视器对象上同步的其他Java代码块。这是一个例子:

public class Reentrant{
    public synchronized outer(){
        inner();
    }

    public synchronized inner(){
        //do something
    }
}

请注意outer()inner()方法被声明为synchronized,这在Java中相当于一个synchronized(this)块。如果一个线程调用了outer(),那么在outer()内部调用inner()没有问题 ,因为这两个方法(或块)在同一个监视器对象(“t​​his”)上同步。如果线程已经在监视器对象上持有锁,则它可以进入在同一监视器对象上同步的所有块,这称为重入。

前面展示的锁实现是不可重入的。如果我们像下面那样重写Reentrant类,线程调用outer()时将被阻塞在inner()方法里面lock.lock()上。

public class Reentrant2{
    Lock lock = new Lock();

    public outer(){
        lock.lock();
        inner();
        lock.unlock();
    }

    public synchronized inner(){
        lock.lock();
        //do something
        lock.unlock();
    }
}

线程调用outer()将首先锁定Lock实例,然后调用inner()。在inner()方法内部,线程将再次尝试锁定Lock实例。此时将会失败(意味着线程将被阻止),因为Lock实例已在outer()方法中被锁定。

当我们查看lock()实现时 ,线程在没有调用unlock()的前提下第二次调用lock()时被阻塞的原因很明显:

public class Lock{
    boolean isLocked = false;

    public synchronized void lock() throws InterruptedException{
        while(isLocked){
            wait();
        }
        isLocked = true;
    }
    
    // other methods
}

while循环(自旋锁)中的条件决定了是否线程被允许退出lock()方法。在这里条件isLocked必须是false时才允许,不管什么线程尝试锁定它。

为了使Lock类可以重入,我们需要进行一些小改动:

public class Lock{
    boolean isLocked = false;
    Thread  lockedBy = null;
    int     lockedCount = 0;

    public synchronized void lock() throws InterruptedException{
        Thread callingThread = Thread.currentThread();
        while(isLocked && lockedBy != callingThread){
            wait();
        }
        isLocked = true;
        lockedCount++;
        lockedBy = callingThread;
    }

    public synchronized void unlock(){
        if(Thread.curentThread() == this.lockedBy){
            lockedCount--;
            if(lockedCount == 0){
                isLocked = false;
                notify();
            }
        }
    }

}

注意while循环(自旋锁)现在将锁定Lock实例的线程纳入了考虑范畴内。如果锁被解锁(isLocked= false)或者调用线程是锁定Lock实例的线程,则while循环将不会执行,调用lock()的线程也能够退出该方法。

此外,我们需要计算锁被同一个线程锁定的次数。否则,只需要调用unlock()一次就会释放锁,即使锁被多次加锁也是如此。我们不希望锁被释放,直到锁定它的线程执行与lock()调用相同数量的unlock()调用。

这个Lock类现在是可以重入的。

公平锁

Java的synchronized块不保证尝试进入它的线程被授予访问权限的顺序。因此,如果许多线程经常竞争访问同一个同步块,则存在一个或多个线程永远不被授予访问权限的风险—— 访问总是被授予其他线程,这被称为饥饿。为了避免这种情况,Lock应该是公平的。由于本文中Lock的实现在内部使用同步块,因此它不保证公平性。饥饿和公平在“ 饥饿与公平 ”一文中有更详细的讨论 。

从finally子句调用unlock()

当使用Lock保护临界区,并且临界区可能抛出异常时,从finally-clause内部调用unlock()方法是很关键的。这样做可确保Lock解锁,以便其他线程可以锁定它。这是一个例子:

lock.lock();
try {
  //do critical section code, which may throw exception
} finally {
  lock.unlock();
}

这个小模式确保Lock在临界区的代码中抛出异常时解锁。如果unlock()不从finally-clause 内部调用,并且临界区抛出异常,那么Lock将永远保持锁定状态,从而导致在Lock实例上调用lock()的所有线程永远停止运行。

Java中的读写锁

读/写锁是比Locks in Java中出现的Lock实现更复杂的锁。想象一下,你有一个应用程序读取和写入一些资源,但写入它的操作并不像读取它那么多。我们知道读取相同资源的两个线程不会导致彼此出现问题,因此多个想要读取资源的线程能够同时读取这个资源。但是,如果有一个线程想要对资源进行写入,则不能有其他读取或写入操作同时进行。要解决多个读取但只允许一个线程写入的问题,你需要一个读/写锁。

Java 5已经在java.util.concurrent 包中附带读/写锁实现。即便如此,了解其实施背后的理论仍然有用。

读/写锁Java实现

首先,让我们总结一下获取资源的读写访问权限的条件:

读权限 没有线程正在写入,并且没有线程已经请求了写权限。
写权限 如果没有线程正在读或写

如果线程想要读取资源,只要没有线程正在写入它,并且没有线程已经请求了对资源的写访问权就没问题。我们假设写请求比读请求更重要,因此优先考虑写访问请求。此外,如果读取是最常发生的事情,并且我们没有优先考虑写入,则可能发生饥饿。请求写访问权限的线程将一直被阻塞,直到所有读线程都释放了ReadWriteLock。如果新线程不断获取读访问权限,则等待写访问的线程将无限期地被阻塞,从而导致饥饿。因此,如果当前没有线程已经锁定ReadWriteLock 进行写入或为了写入请求加锁,才能让读线程获取读取权限。

当没有线程正在读取或写入资源时,想要写入资源的线程可以获取权限。除非你希望保证请求写访问的线程之间的公平性,否则有多少个请求写访问权的线程或者以什么顺序请求无关紧要。

考虑到这些简单的规则,我们可以像这样实现ReadWriteLock

public class ReadWriteLock{
    private int readers       = 0;
    private int writers       = 0;
    private int writeRequests = 0;

    public synchronized void lockRead() throws InterruptedException{
        while(writers > 0 || writeRequests > 0){
            wait();
        }
        readers++;
    }

    public synchronized void unlockRead(){
        readers--;
        notifyAll();
    }

    public synchronized void lockWrite() throws InterruptedException{
        writeRequests++;

        while(readers > 0 || writers > 0){
            wait();
        }
        writeRequests--;
        writers++;
    }

    public synchronized void unlockWrite() throws InterruptedException{
        writers--;
        notifyAll();
    }
}

ReadWriteLock有两个加锁方法和两个解锁方法。一种加锁和解锁方法用于读访问,另一种用于写访问。

读访问的规则在lockRead()方法中实现。除非存在一个已获取写锁的线程,或者一个或多个已经请求了写访问权限的线程,否则所有读线程都能获得读访问权。

写访问的规则在lockWrite()方法中实现。想要进行写访问的线程通过请求写访问权限(writeRequests++)开始,然后它将检查它是否可以真的获得写访问权限。如果没有已经获取了读锁的线程,并且没有已经获取了写锁的线程,则此线程可以获得写访问权,请求写访问权限的线程数无关紧要。

值得注意的是,无论unlockRead()还是unlockWrite()方法都调用notifyAll(),而不是notify()。为了解释原因,请想象以下情况:

在ReadWriteLock中,有正在等待获取读锁的线程和等待获取写锁的线程。如果被notify()唤醒的线程是读线程,它将继续等待,因为已经有线程请求了写锁。但是,没有任何在等待获取写锁的线程被唤醒,所以不会发生任何其他事情,没有线程可以同时既获取读锁也获取写锁。因此需要通过调用noftifyAll()唤醒所有等待的线程并检查它们是否可以获得所需的锁。

调用notifyAll()还有另一个优势。如果多个线程正在等待读锁并且没有在等待写锁的,此时unlockWrite()被调用,则所有等待读锁的线程都能够立即获取读锁 —— 而不是一个一个获取。

读/写锁重入

前面展示的ReadWriteLock类不是可重入的。如果持有写锁的线程再次请求获取写锁,它将被阻塞,因为已经有了一个写线程 —— 它自己本身。此外,考虑这种情况:

  1. 线程1获取了读锁。
  2. 线程2请求写锁但由于有一个读线程而被阻塞。
  3. 线程1再次请求读锁(重新进入锁),但由于存在写请求而被阻止

在这种情况下,之前的ReadWriteLock会锁死 —— 类似于死锁的情况,任何请求读锁或写锁的线程都将被阻塞。

要使ReadWriteLock可重入,必须进行一些更改。读锁和写锁的重入将分开处理。

读重入

为了使ReadWriteLock的读锁可以重入,我们将首先建立读锁重入的规则:

  • 一个线程获取读锁,在线程有资格获得读锁(没有写线程或写请求),或者它已经持有了读锁(不管是否有写请求存在)的情况下。

要确定某个线程是否具有读锁,每个获取了读锁的线程的引用以及它已获取读锁的次数都要保存在一个Map中。在确定是否可以授予读访问权时,将在该Map中查询是否有调用线程的引用。以下是lockRead()unlockRead()方法在更改后的样子:

public class ReadWriteLock{
  private Map<Thread, Integer> readingThreads = new HashMap<Thread, Integer>();
  private int writers        = 0;
  private int writeRequests  = 0;

  public synchronized void lockRead() throws InterruptedException{
      Thread callingThread = Thread.currentThread();
      while(!canGrantReadAccess(callingThread)){
          wait();                                                                   
      }

      readingThreads.put(callingThread, (getAccessCount(callingThread) + 1));
  }

  public synchronized void unlockRead(){
      Thread callingThread = Thread.currentThread();
      int accessCount = getAccessCount(callingThread);
      if(accessCount == 1){ 
          readingThreads.remove(callingThread); 
      } else { 
          readingThreads.put(callingThread, (accessCount -1)); 
      }
      notifyAll();
  }

  private boolean canGrantReadAccess(Thread callingThread){
      if(writers > 0)             return false;
      if(isReader(callingThread)) return true;
      if(writeRequests > 0)       return false;
      return true;
  }

  private int getReadAccessCount(Thread callingThread){
      Integer accessCount = readingThreads.get(callingThread);
      if(accessCount == null) return 0;
      return accessCount.intValue();
  }

  private boolean isReader(Thread callingThread){
      return readingThreads.get(callingThread) != null;
  }

}

正如你所看到的,只有当前没有线程正在写入资源时才能获取读锁。此外,如果调用线程已持有读锁,则优先于任何writeRequests。

写重入

仅当线程已经持有写锁时才能够写重入。以下是lockWrite()unlockWrite()方法的修改版本:

public class ReadWriteLock{
    private Map<Thread, Integer> readingThreads = new HashMap<Thread, Integer>();
    private int writeAccesses    = 0;
    private int writeRequests    = 0;
    private Thread writingThread = null;

  public synchronized void lockWrite() throws InterruptedException{
      writeRequests++;
      Thread callingThread = Thread.currentThread();
      while(! canGrantWriteAccess(callingThread)){
          wait();
      }
      writeRequests--;
      writeAccesses++;
      writingThread = callingThread;
  }

  public synchronized void unlockWrite() throws InterruptedException{
      writeAccesses--;
      if(writeAccesses == 0){
          writingThread = null;
      }
      notifyAll();
  }

  private boolean canGrantWriteAccess(Thread callingThread){
      if(hasReaders())             return false;
      if(writingThread == null)    return true;
      if(!isWriter(callingThread)) return false;
      return true;
  }

  private boolean hasReaders(){
      return readingThreads.size() > 0;
  }

  private boolean isWriter(Thread callingThread){
      return writingThread == callingThread;
  }
}

注意在确定调用线程是否可以获得写锁时,现在考虑了正在持有写锁的线程。

Read to Write 重入

有时,具有读锁的线程也需要获取写锁。此线程必须是唯一的读者,才允许这种情况发生。要实现这一点,writeLock()方法应该稍微改变一下:

public class ReadWriteLock{
    private Map<Thread, Integer> readingThreads = new HashMap<Thread, Integer>();
    private int writeAccesses    = 0;
    private int writeRequests    = 0;
    private Thread writingThread = null;

  public synchronized void lockWrite() throws InterruptedException{
      writeRequests++;
      Thread callingThread = Thread.currentThread();
      while(! canGrantWriteAccess(callingThread)){
          wait();
      }
      writeRequests--;
      writeAccesses++;
      writingThread = callingThread;
  }

  public synchronized void unlockWrite() throws InterruptedException{
      writeAccesses--;
      if(writeAccesses == 0){
          writingThread = null;
      }
      notifyAll();
  }

  private boolean canGrantWriteAccess(Thread callingThread){
      //注意此处的修改
      if(isOnlyReader(callingThread))    return true;
      if(hasReaders())                   return false;
      if(writingThread == null)          return true;
      if(!isWriter(callingThread))       return false;
      return true;
  }

  private boolean hasReaders(){
      return readingThreads.size() > 0;
  }

  private boolean isWriter(Thread callingThread){
      return writingThread == callingThread;
  }

  private boolean isOnlyReader(Thread thread){
      return readers == 1 && readingThreads.get(callingThread) != null;
  }
}

现在ReadWriteLock类是read-to-write可重入的。

Write to Read 重入

有时,持有写锁的线程也需要获取读锁。一个写线程应该永远都能获取读锁如果发起了读锁请求。如果一个线程持有了写锁,则其他线程都不能获取读锁写锁,因此这是并不危险。以下是canGrantReadAccess()方法的更改:

public class ReadWriteLock{

    private boolean canGrantReadAccess(Thread callingThread){
        if(isWriter(callingThread)) return true;
        if(writingThread != null)   return false;
        if(isReader(callingThread)  return true;
        if(writeRequests > 0)       return false;
        return true;
    }
}

完整的可重入读写锁

以下是完整的可重入ReadWriteLock实现。我对代码进行了一些重构,使它们更容易阅读。

public class ReadWriteLock {
    private Map<Thread, Integer> readingThreads = new HashMap<Thread, Integer>();
    private int writeAccesses = 0;
    private int writeRequests = 0;
    private Thread writingThread = null;

    public synchronized void lockRead() throws InterruptedException {
        Thread callingThread = Thread.currentThread();
        while (!canGrantReadAccess(callingThread)) {
            wait();
        }

        readingThreads.put(callingThread, (getReadAccessCount(callingThread) + 1));
    }

    /**
     * 获取读锁的条件
     * 1. 没有线程正在写,没有写请求
     * 2. 读锁可重入
     * 3. 写-读转换
     * @param callingThread 调用线程
     * @return true 如果满足条件
     */
    private boolean canGrantReadAccess(Thread callingThread) {
        if (isWriter(callingThread)) return true;
        if (hasWriter())             return false;
        if (isReader(callingThread)) return true;
        if (hasWriteRequests())      return false;
        return true;
    }

    public synchronized void unlockRead() {
        Thread callingThread = Thread.currentThread();
        if (!isReader(callingThread)) {
            throw new IllegalMonitorStateException("Calling Thread does not hold a read lock" 
                + "on this ReadWriteLock");
        }
        int accessCount = getReadAccessCount(callingThread);
        if (accessCount == 1) {
            readingThreads.remove(callingThread);
        } else {
            readingThreads.put(callingThread, (accessCount - 1));
        }
        notifyAll();
    }

    public synchronized void lockWrite() throws InterruptedException {
        writeRequests++;
        Thread callingThread = Thread.currentThread();
        while (!canGrantWriteAccess(callingThread)) {
            wait();
        }
        writeRequests--;
        writeAccesses++;
        writingThread = callingThread;
    }

    public synchronized void unlockWrite() throws InterruptedException {
        if (!isWriter(Thread.currentThread())) {
            throw new IllegalMonitorStateException("Calling Thread does not" +
                    " hold the write lock on this ReadWriteLock");
        }
        writeAccesses--;
        if (writeAccesses == 0) {
            writingThread = null;
        }
        notifyAll();
    }

    /**
     * 获取读锁的条件
     * 1. 没有线程正在读或者写
     * 2. 写锁可重入
     * 3. 读-写转换
     * @param callingThread 调用线程
     * @return true 如果条件满足
     */
    private boolean canGrantWriteAccess(Thread callingThread) {
        if (isOnlyReader(callingThread)) return true;
        if (hasReaders()) return false;
        if (writingThread == null) return true;
        if (!isWriter(callingThread)) return false;
        return true;
    }

    private int getReadAccessCount(Thread callingThread) {
        Integer accessCount = readingThreads.get(callingThread);
        if (accessCount == null) return 0;
        return accessCount.intValue();
    }

    private boolean hasReaders() {
        return readingThreads.size() > 0;
    }

    private boolean isReader(Thread callingThread) {
        return readingThreads.get(callingThread) != null;
    }

    private boolean isOnlyReader(Thread callingThread) {
        return readingThreads.size() == 1 &&
                readingThreads.get(callingThread) != null;
    }

    private boolean hasWriter() {
        return writingThread != null;
    }

    private boolean isWriter(Thread callingThread) {
        return writingThread == callingThread;
    }

    private boolean hasWriteRequests() {
        return this.writeRequests > 0;
    }
}

从finally子句调用unlock()

当使用ReadWriteLock保护临界区,并且临界区可能抛出异常时,在finally-clause 内调用readUnlock()和writeUnlock()方法很重要。这样做可以确保ReadWriteLock解锁,以便其他线程可以锁定它。这是一个例子:

lock.lockWrite();
try{
  //do critical section code, which may throw exception
} finally {
    lock.unlockWrite();
}

这个小模式确保ReadWriteLock在l临界区的代码中抛出异常时解锁。如果unlockWrite()没有从finally-clause内调用,并且临界区抛出异常,那么ReadWriteLock将永远保持写入锁定状态,从而导致在ReadWriteLock实例上调用lockRead()lockWrite()的所有线程永远停止。

重入锁死

重入锁死是一种类似于死锁嵌套监视器锁死的情况。在之前的读/写锁中也介绍了重入锁死。

如果线程再次获取不可重入的LockReadWriteLock或其他不可重入的同步器,则可能会发生重入锁死。可重入意味着已经持有锁的线程可以重新获得它,Java的同步块是可重入的。因此,以下代码可以正常工作:

public class Reentrant{
    public synchronized outer(){
        inner();
    }

    public synchronized inner(){
        //do something
    }
}

以下的Lock实现不可重入:

public class Lock{
    private boolean isLocked = false;

    public synchronized void lock() throws InterruptedException{
        while(isLocked){
            wait();
        }
        isLocked = true;
    }

    public synchronized void unlock(){
        isLocked = false;
        notify();
    }
}

如果一个线程在两次调用lock()之间没有调用unlock(),则第二次调用lock()将阻塞,发生了重入锁死。

为避免重入锁死,你有两种选择:

  1. 避免编写再次加锁的代码
  2. 使用重入锁

哪种选择最适合你的项目取决于你的具体情况。重入锁通常没有非重入锁性能好,并且它们更难实现,但在你的情况下这可能不是问题。使用可重入锁是否会让你的代码更简单也必须根据具体情况确定。

Semaphores

Semaphores(信号量)是一种线程同步结构,可用于在线程之间发送信号以避免信号丢失问题,或者像一样保护临界区。Java 5在java.util.concurrent包中附带了信号量的实现,因此你不必自己实现信号量。尽管如此,了解其实现和使用背后的理论仍然很有用。

简单的Semaphore

这是一个简单的Semaphore实现:

public class Semaphore {
    private boolean signal = false;

    public synchronized void take() {
        this.signal = true;
        this.notify();
    }

    public synchronized void release() throws InterruptedException{
        while(!this.signal) wait();
        this.signal = false;
    }
}

take()方法发送一个在Semaphore内部存储的信号。release()方法等待信号,当接收到信号,并且标志被清除时,release()方法退出。

使用这样的信号量可以避免信号丢失。你可以使用take()来代替notify()release()代替wait()。如果take()调用发生在release()调用之前,调用线程仍然知道take()被调用,因为信号内部存储在变量中。这与wait()notify()方法不一样。

当使用信号量发送信号时,take()release()这两个名称可能会显得有点奇怪。这些名称源于使用信号量作为锁,在这种情况下,它们更有意义。

使用信号量发送信号

以下是两个线程使用Semaphore相互发信号的简化示例:

Semaphore semaphore = new Semaphore();
SendingThread sender = new SendingThread(semaphore);
ReceivingThread receiver = new ReceivingThread(semaphore);

receiver.start();
sender.start();


public class SendingThread {
    Semaphore semaphore = null;

    public SendingThread(Semaphore semaphore){
        this.semaphore = semaphore;
    }

    public void run(){
        while(true){
            //do something, then signal
            this.semaphore.take();
        }
    }
}

public class RecevingThread {
    Semaphore semaphore = null;

    public ReceivingThread(Semaphore semaphore){
        this.semaphore = semaphore;
    }

    public void run(){
        while(true){
            this.semaphore.release();
            //receive signal, then do something...
        }
    }
}

计数信号量

在上一节的Semaphore实现中没有计算通过take()的方法调用发送给它的信号数量。我们可以改变Semaphore的实现使其这样做,这称为计数信号量。这是计数信号量的简单实现:

public class CountingSemaphore {
    private int signals = 0;

    public synchronized void take() {
        this.signals++;
        this.notify();
    }

    public synchronized void release() throws InterruptedException{
        while(this.signals == 0) wait();
        this.signals--;
    }
}

有界信号量

CoutingSemaphore中没有指定存储信号数量上限。我们可以将信号量实现更改为具有上限,如下所示:

public class BoundedSemaphore {
    private int signals = 0;
    private int bound   = 0;

    public BoundedSemaphore(int upperBound){
        this.bound = upperBound;
    }

    public synchronized void take() throws InterruptedException{
        while(this.signals == bound) wait();
        this.signals++;
        this.notify();
    }

    public synchronized void release() throws InterruptedException{
        while(this.signals == 0) wait();
        this.signals--;
        this.notify();
    }
}

注意如果信号的数量等于上限,take()方法现在将阻塞。如果BoundedSemaphore 已达到其信号数量上限,直到有一个线程调用release()后 ,才允许调用take()的线程继续传递其信号。

使用信号量作为锁

可以使用有界信号量作为锁。为此,请将上限设置为1,并调用take()release()保护临界区。这是一个例子:

BoundedSemaphore semaphore = new BoundedSemaphore(1);

semaphore.take();
try{
  //critical section
} finally {
    semaphore.release();
}

与作为信号的例子相反,方法take()release()现在由同一个线程调用。由于只允许一个线程获取信号量,因此调用take()的所有其他线程都将被阻塞,直到release()被调用。调用release()永远不会阻塞,因为take()总是先被调用。

你还可以使用有界信号量来限制允许进入临界区的线程数。例如,在上面的示例中,如果将BoundedSemaphore的限制设置为5 ,会发生什么?一次允许5个线程进入临界区。但是,你必须确保这5个线程的操作不会发生冲突,否则应用程序将出错。

relase()方法从finally块内部调用,以确保即使临界区抛出异常也会调用它。

阻塞队列

阻塞队列是一个队列,当你尝试从队列中出队并且队列为空时,或者你尝试将项插入队列且队列已满时会阻塞。尝试从空队列中出队的线程被阻塞,直到某个其他线程将项插入队列。尝试将项插入满队列的线程将被阻塞,直到某个其他线程在队列中腾出空间,方法是将一个或多个项出队或完全清除队列。

这是一个图表,显示了通过阻塞队列协作的两个线程:

|
A BlockingQueue with one thread putting into it, and another thread taking from it.

Java 5在java.util.concurrent包中附带了阻塞队列实现。你可以在我的java.util.concurrent.BlockingQueue教程中了解该类。即使Java 5带有阻塞队列实现,了解其实现背后的理论也很有用。

阻塞队列实现

阻塞队列的实现看起来类似于有界信号量。这是一个阻塞队列的简单实现:

public class BlockingQueue {
    private List queue = new LinkedList();
    private int  limit = 10;

    public BlockingQueue(int limit){
        this.limit = limit;
    }

    public synchronized void enqueue(Object item) throws InterruptedException  {
        while(this.queue.size() == this.limit) {
            wait();
        }
        if(this.queue.size() == 0) {
            notifyAll();
        }
        this.queue.add(item);
    }

    public synchronized Object dequeue() throws InterruptedException{
        while(this.queue.size() == 0){
            wait();
        }
        if(this.queue.size() == this.limit){
            notifyAll();
        }
        return this.queue.remove(0);
    }
}

注意notifyAll()仅在队列的大小等于其范围边界(0或极限)时由enqueue()dequeue()调用。如果在调用enqueue()dequeue()时队列大小不等于边界,则没有线程在等待将项排队或出列。

线程池

当你需要限制应用程序中同时运行的线程数时,线程池非常有用。启动一个新线程会产生性能开销,并且也要为其堆栈等分配一些内存。

可以将需要并发执行的任务提交给线程池,而不是为其启动一个新线程。只要线程池有空闲的工作线程,任务就会被分配给其中一个然后执行。在线程池内部,任务被插入到阻塞队列中,而工作线程从阻塞队列中取出任务。当一个新任务插入到阻塞队列时,一个空闲线程将其出队并执行它。而线程池中的其余空闲线程将被阻塞,等待任务。

线程池通常用于多线程服务器,通过网络到达服务器的每个连接都被包装为任务然后传递给线程池。线程池中的工作线程将同步处理连接上的请求。稍后将详细介绍如何在Java中实现多线程服务器。

Java 5在java.util.concurrent包中内置了线程池的实现,因此你不必实现自己的线程池。你可以在我的java.util.concurrent.ExecutorService教程中阅读有关它的更多信息 。不管怎样,了解一下线程池的实现仍然很有用。

这是一个简单的线程池实现。请注意,此实现使用我自己的BlockingQueue类,如我的阻塞队列教程中所述。在现实生活中,你可以使用Java的内置阻塞队列之一。

public class ThreadPool {
    private BlockingQueue taskQueue = null;
    private List<PoolThread> threads = new ArrayList<PoolThread>();
    private boolean isStopped = false;

    public ThreadPool(int noOfThreads, int maxNoOfTasks){
        taskQueue = new BlockingQueue(maxNoOfTasks);

        for(int i=0; i<noOfThreads; i++){
            threads.add(new PoolThread(taskQueue));
        }
        for(PoolThread thread : threads){
            thread.start();
        }
    }

    public synchronized void execute(Runnable task) throws Exception{
        if(this.isStopped) throw new IllegalStateException("ThreadPool is stopped");
        this.taskQueue.enqueue(task);
    }

    public synchronized void stop(){
        this.isStopped = true;
        for(PoolThread thread : threads){
           thread.doStop();
        }
    }
}
public class PoolThread extends Thread {
    private BlockingQueue taskQueue = null;
    private boolean       isStopped = false;

    public PoolThread(BlockingQueue queue){
        taskQueue = queue;
    }

    public void run(){
        while(!isStopped()){
            try{
                Runnable runnable = (Runnable) taskQueue.dequeue();
                runnable.run();
            } catch(Exception e){
                //log or otherwise report exception,
                //but keep pool thread alive.
            }
        }
    }

    public synchronized void doStop(){
        isStopped = true;
        this.interrupt(); //break pool thread out of dequeue() call.
    }

    public synchronized boolean isStopped(){
        return isStopped;
    }
}

线程池实现由两部分组成。一个ThreadPool类,它是线程池的公共接口,以及一个PoolThread类实现执行任务的线程。

要执行一个任务,请使用一个Runnable实现作为调用ThreadPool.execute(Runnable r)方法的参数。这个Runnable在线程池内进入阻塞队列,等待被出队执行。

Runnable会被一个空闲的PoolThread出队并执行,你可以在PoolThread.run()方法中看到这一点。执行完成后,PoolThread 循环并且再次尝试将任务出队,直到停止。

要停止ThreadPool,需要调用ThreadPool.stop()方法。停止信号在内部的isStopped成员变量上记录,然后通过在每个工作线程上调用doStop()来停止线程池中的线程。注意execute()方法将抛出IllegalStateException如果execute()stop()调用后被调用。

线程将在完成当前执行的所有任务后停止。注意PoolThread.doStop()中的this.interrupt()调用,这可以确保在taskQueue.dequeue()中的wait()调用阻塞的线程能跳出wait()调用,抛出InterruptedException然后离开dequeue()方法调用。此异常在PoolThread.run()方法中被捕获,报告,然后进入下一个循环检查isStopped变量。由于isStopped现在是true,PoolThread.run()将退出并且线程死亡。

Compare and Swap

Compare and swap 是设计并发算法时使用的技术。compare and swap将预期值与变量的具体值进行比较,如果变量的具体值等于预期值,则将变量的值交换为新变量。可能听起来有点复杂,但一旦理解它实际上相当简单,所以让我再详细说明一下这个主题,下文称其为CAS。

Compare And Swap 支持什么情形

发生在程序和并发算法中非常常见的模式是“check then act(检查然后操作)”模式。当代码首先检查变量的值然后基于该值进行操作时,会发生check then act模式。这是一个简单的例子:

class MyLock {
    private boolean locked = false;

    public boolean lock() {
        if(!locked) {
            locked = true;
            return true;
        }
        return false;
    }
}

如果要在多线程应用程序中使用此代码,则会出现许多错误,但请暂时忽略它。

如你所见,lock()方法首先检查成员变量locked是否等于false(检查),如果它是就设置locked为true(然后执行)。

如果多个线程可以访问同一个MyLock实例,上面的lock()无法保证能够正常工作。如果线程A检查locked值并且看到它是false,线程B也可以在完全相同的时间检查locked值。或者,实际上,是在线程A设置locked为false之前的任何时间。因此,线程A和线程B都能发现locked为false,然后两者都将基于该信息工作。

要在多线程应用程序中正常工作,“check then act”操作必须是原子的。原子意味着“检查”和“执行”两个动作都作为原子(不可分割)代码块执行。任何开始执行执行块的线程都能执行完此块而不受其他线程的干扰。没有其他线程可以一起同时执行原子块。

使用synchronized关键字将前面的lock()方法代码示例转换为原子代码块:

class MyLock {
    private boolean locked = false;

    public synchronized boolean lock() {
        if(!locked) {
            locked = true;
            return true;
        }
        return false;
    }
}

现在lock()方法是同步的,因此在同一个MyLock实例上一次只能有一个线程能执行它 ,该lock()方法实际上是原子的。

原子lock()方法实际上是"compare and swap"的一个例子。lock()方法将locked变量与预期值false进行比较 ,如果locked实际值等于false,则将变量的值 交换true

Compare And Swap原子操作

现代CPU具有对原子CAS操作的内置支持。从Java 5开始,你可以通过java.util.concurrent.atomic包中的一些新原子类访问CPU中的这些函数。

下面是一个示例,说明如何使用AtomicBoolean类实现前面的lock()方法 :

public static class MyLock {
    private AtomicBoolean locked = new AtomicBoolean(false);

    public boolean lock() {
        return locked.compareAndSet(false, true);
    }
}

注意locked变量不再是boolean类型而是AtomicBoolean类型。这个类具有一个compareAndSet()方法,将AtomicBoolean实例的值与期望值进行比较,如果是期望值,则将该值与新值交换。在这种情况下,它将locked的值和false进行比较,如果是false,则设置AtomicBoolean的新值为true

如果成功交换了值,则compareAndSet()方法返回true,否则返回false

建议使用Java 5+附带的CAS特性而不是实现自己的,因为Java 5+中内置的CAS功能允许你利用CPU底层的CAS功能,这使你的CAS代码执行的更快。

线程应用实例

等待超时模式

开发人员经常会遇到这样的方法调用场景,调用一个方法时等待一段时间(一般来说时给定一个时间段),如果该方法能够在给定的时间段内得到结果,那么结果将立即返回,反之,超时返回默认结果。

假设超时时间段是T,那么可以推断出在当前时间now+T只会就会超时。
定义如下变量:

  • 等待持续时间: REMAINING = T
  • 超时时间: FUTURE = now + T

这时候只需要wait(REMAINING)即可,在wait(REMAINING)返回后会将执行:REMAINING = FUTURE - now。如果REMAINING小于等于0,表示已经超时,直接退出,否则继续执行wait(REMAINING)

上述描述等待超时模式的伪代码如下。

public synchronized Object get(long mills) throws InterruptedException {
        long future = System.currentTimeMillis() + mills;
        long remaining = mills;
        while ((result == null) && remaining > 0) {
            wait(remaining);
            remaining = future - System.currentTimeMillis();
        }
        return result;
    }

可以看出,等待超时模式就是在等待/通知范式基础上增加了超时控制,这使得该模式相比原有范式更具有灵活性,因为即使方法执行时间过长,也不会“永久”阻塞调用者,而是会按调用者的要求“按时”返回。

简单的数据库连接池示例

我们使用等待超时模式来构造一个简单的数据库连接池,在示例中模拟从连接池中获取,使用和释放连接的过程,而客户端获取连接的过程被设定为等待超时模式,也就是在1000ms内如果无法获取到可用连接,将会返回客户端一个null值。

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ConnectionPoolTest {
    private static ConnectionPool pool = new ConnectionPool(10);
    private static CountDownLatch start = new CountDownLatch(1);
    private static CountDownLatch end;

    public static void main(String[] args) throws Exception {
        int threadCount = 50;
        end = new CountDownLatch(threadCount);

        int count = 20;
        AtomicInteger got = new AtomicInteger();
        AtomicInteger notGot = new AtomicInteger();
        for (int i = 0; i < threadCount; ++i) {
            Thread thread = new Thread(new ConnectionRunner(count, got, notGot), 
                      "ConnectionRunnerThread");
            thread.start();
        }
        start.countDown();
        end.await();
        System.out.println("total invoke: " + (threadCount * count));
        System.out.println("got connection: " + got);
        System.out.println("not got connection: " + notGot);
    }

    private static class ConnectionRunner implements Runnable {
        private int count;
        private AtomicInteger got;
        private AtomicInteger notGot;

        public ConnectionRunner(int count, AtomicInteger got, AtomicInteger notGot) {
            this.count = count;
            this.got = got;
            this.notGot = notGot;
        }

        @Override
        public void run() {
            try {
                start.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
            while (count > 0) {
                try {
                    Connection connection = pool.fetchConnection(1000);
                    if (connection != null) {
                        try {
                            connection.createStatement();
                            connection.commit();
                        } finally {
                            pool.releaseConnection(connection);
                            got.incrementAndGet();
                        }
                    } else {
                        notGot.incrementAndGet();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    count--;
                }
            }
            end.countDown();
        }
    }
}

class ConnectionPool {
    private final LinkedList<Connection> pool = new LinkedList<>();

    ConnectionPool(int initialSize) {
        if (initialSize > 0) {
            for (int i = 0; i < initialSize; ++i) {
                pool.addLast(ConnectionDriver.createConnection());
            }
        }
    }

    /**
     * 释放指定连接,将其归还给线程池
     *
     * @param connection 连接
     */
    void releaseConnection(Connection connection) {
        if (connection != null) {
            synchronized (pool) {
                pool.addLast(connection);
                pool.notifyAll();
            }
        }
    }

    /**
     * 获取连接,提供超时控制,如果<=0,则一直等待直到获取连接,否则超过指定时间仍未获取连接则返回null
     *
     * @param mills 超时时间
     * @return connection
     * @throws Exception InterruptedException
     */
    Connection fetchConnection(long mills) throws Exception {
        synchronized (pool) {
            if (mills <= 0) {
                while (pool.isEmpty()) {
                    pool.wait();
                }
                return pool.removeFirst();
            } else {
                long future = System.currentTimeMillis() + mills;
                long remaining = mills;
                while (pool.isEmpty() && remaining > 0) {
                    pool.wait(remaining);
                    remaining = future - System.currentTimeMillis();
                }
                Connection result = null;
                if (!pool.isEmpty()) {
                    result = pool.removeFirst();
                }
                return result;
            }
        }
    }
}

/**
 * 由于java.sql.Connection是一个接口,最终的实现是由数据库驱动提供方来实现的,
 * 考虑到这只是个示例,我们通过动态代理构造一个Connection。
 */
class ConnectionDriver {
    /**
     * 通过动态代理创建Connection,实现为调用commit()方法时休眠100ms
     *
     * @return Connection
     */
    static Connection createConnection() {
        return (Connection) Proxy.newProxyInstance(ConnectionDriver.class.getClassLoader(), 
                  new Class<?>[]{Connection.class}, new ConnectionHandler());
    }

    static class ConnectionHandler implements InvocationHandler {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if ("commit".equals(method.getName())) {
                TimeUnit.MILLISECONDS.sleep(100);
            }
            return null;
        }
    }
}

上述示例使用CountDownLatch来确保ConnectionRunnerThread能够同时开始执行,并且在全部结束后,才使得main线程从等待状态返回。当前设定的场景是10个线程同时运行获取连接池中的连接,通过调节线程数量来观察未获取到连接的情况。(机器CPU:i5-6300hq,16G内存,实际输出可能不同)

线程数量 总获取次数 获取到次数 未获取到次数 未获取到比率
10 200 200 0 0%
20 400 393 7 1.75%
30 600 559 41 6.83%
40 800 712 88 11%
50 1000 820 180 18%

从表中数据可以看出,在资源一定的情况下,随着客户端线程的逐步增加,客户端出现超时无法获取连接的比率不断变高。虽然客户端线程在这种超时获取模式下会出现连接无法获取的情况,但是他能保证客户端线程不会一直挂在连接获取的操作上,而是“按时”返回,并告知客户端连接获取出现问题,是系统的一种自我保护机制。数据库连接池的设计也可以复用到其他的资源获取的场景,针对昂贵资源的获取都应该加以超时限制。

线程池技术及其示例

对于服务端的程序,经常面对的是客户端传入的短小(执行时间短,工作内容较为单一)任务,需要客户端快速处理并返回结果。如果客户端每次接收到一个任务,创建一个线程,然后进行执行,这在原型阶段是个不错的选择,但是面对成千上万的任务递交进服务器时,如果还是采取一个任务一个线程的方式,那么将会创建数以万计的线程,这不是一个好的选择。因为这使得操作系统频繁进行线程上下文切换,无故增加系统负载,而线程的创建和消亡也都是要消耗系统资源的,也无疑浪费了系统资源。

线程池技术能够很好解决这个问题,它预先创建了若干数量的线程,并且不能由用户直接对线程的创建进行控制,在这个前提下重复使用固定或较为固定数目的线程来完成任务的执行。这样做的好处是,一方面消除了频繁创建和消亡线程的系统资源开销,另一方面面对过量任务的提交能够平缓的劣化。

下面看一个简单的线程池接口定义:

package util;

public interface ThreadPool<T extends Runnable> {
    /**
     * 执行一个job
     * @param job 线程
     */
    void execute(T job);

    /**
     * 关闭线程池
     */
    void shutdown();

    /**
     * 增加工作线程数量
     * @param num 数量
     */
    void addWorkers(int num);

    /**
     * 减少工作线程数量
     * @param num 数量
     */
    void removeWorkers(int num);

    /**
     * 获取正在等待执行的任务数量
     * @return num
     */
    int getJobSize();
}

客户端可以通过execute(T)方法将任务提交到线程池执行,而客户端自身不用等待任务完成。除了execute(T)方法外,线程池接口提供了增大/减小工作者线程以及关闭线程池的方法。这里工作者线程代表一个重复执行任务的线程,而每个由客户端提交的任务都将进入到工作队列中等待工作者线程的处理。

接下来时线程池接口的默认实现:

package util;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class DefaultThreadPool<T extends Runnable> implements ThreadPool<T> {
    /** 线程池最大限制数 */
    private static final int MAX_WORKER_NUMBERS = 10;
    /** 线程池最小限制数 */
    private static final int MIN_WORKER_NUMBERS = 1;
    /** 线程池默认线程数量 */
    private static final int DEFAULT_WORKER_NUMBERS = 5;
    /** 工作列表 */
    private final LinkedList<T> jobs = new LinkedList<>();
    /** 工作者列表 */
    private final List<Worker> workers = Collections.synchronizedList(new ArrayList<>());
    /** 工作者线程数量 */
    private int workerNum = DEFAULT_WORKER_NUMBERS;
    /** 线程编号 */
    private AtomicLong threadNum = new AtomicLong();

    public DefaultThreadPool() {
        initializeWorkers(DEFAULT_WORKER_NUMBERS);
    }

    public DefaultThreadPool(int num) {
        workerNum = num > 10 ? MAX_WORKER_NUMBERS : num <=0 ? MIN_WORKER_NUMBERS : num;
        initializeWorkers(workerNum);
    }

    @Override
    public void execute(T job) {
        if(job !=null) {
            synchronized (jobs) {
                jobs.addLast(job);
                jobs.notify();
            }
        }
    }

    @Override
    public void shutdown() {
        workers.forEach(Worker::shutdown);
    }

    @Override
    public void addWorkers(int num) {
        synchronized (workers) {
            if(num + workerNum > MAX_WORKER_NUMBERS) {
                num = MAX_WORKER_NUMBERS - workerNum;
            }
            initializeWorkers(num);
            workerNum += num;
        }
    }

    @Override
    public void removeWorkers(int num) {
        if(num > workerNum) {
            throw new IllegalArgumentException("beyond workNum");
        }
        int count=0;
        while (count<num) {
            Worker worker = workers.get(0);
            if(workers.remove(worker)) {
                worker.shutdown();
                count++;
            }
        }
    }

    @Override
    public int getJobSize() {
        return jobs.size();
    }

    private void initializeWorkers(int num) {
        for(int i=0;i<num;++i) {
            Worker worker = new Worker();
            workers.add(worker);
            Thread thread = new Thread(worker, "ThreadPool-Worker-" + 
                      threadNum.incrementAndGet());
            thread.start();
        }
    }

    private class Worker implements Runnable {
        private volatile boolean running = true;

        @Override
        public void run() {
            T job;
            while (running) {
                synchronized (jobs) {
                    while (jobs.isEmpty()) {
                        try {
                            jobs.wait();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    job = jobs.removeFirst();
                }
                if(job !=null) {
                    try {
                        job.run();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }

        public void shutdown() {
            running = false;
        }
    }
}

从线程池的实现可以看到,当客户端调用execute(T)方法时,会不断向任务列表jobs中添加任务,而每个工作者线程会不断从jobs中取出一个Job进行执行,当jobs为空时,工作者线程进入等待状态。

增加一个job后,对工作队列jobs调用其notify()方法而不是notifyAll()方法,因为能够确定有工作者线程被唤醒,这是使用notify()方法将会比notifyAll()方法获得更小的开销(避免将等待队列中的线程全都移动到阻塞队列中)。

一个基于线程池技术的简单Web服务器

目前的浏览器都支持多线程访问,比如说在请求一个html页面的时候,页面中包含的图片资源,样式资源会被浏览器并发的获取,这样用户就不会遇到一直等到一个图片完全下载完才能继续查看文字内容的尴尬情况。

如果Web服务器是单线程的,多线程的浏览器也没有用武之地,因为服务器还是一个请求一个请求的按顺序处理。因此,大部分Web服务器都是支持并发访问的。常用的Java Web服务器,如Tomcat、Jetty,在其处理请求的过程中都使用到了线程池技术。

下面通过前面的线程池来构造一个简单的Web服务器,这个Web服务器用来处理http请求,目前只能处理简单的文本和jpg图片内容。这个Web服务器使用main线程不断接受客户端的socket连接,将连接以及请求提交给线程池处理,这样使得Web服务器能够同时处理多个客户端请求。

import util.DefaultThreadPool;
import util.ThreadPool;

import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;

public class SimpleHttpServer {
    /**
     * 处理Http请求的线程池
     */
    private static ThreadPool<HttpRequestHandler> threadPool = new DefaultThreadPool<>();
    /**
     * 服务器根路径
     */
    private static String basePath = "F:/ftpshare/";
    private static ServerSocket serverSocket;
    /**
     * 服务器监听端口
     */
    private static int port = 8080;

    public static void setPort(int port) {
        if (port > 0) {
            SimpleHttpServer.port = port;
        }
    }

    public static void setBasePath(String basePath) {
        if (basePath != null && new File(basePath).exists() && new File(basePath).isDirectory()) {
            SimpleHttpServer.basePath = basePath;
        }
    }

    /**
     * 启动服务器
     * @throws Exception IOException by ServerSocket#accept()
     */
    public static void start() throws Exception {
        serverSocket = new ServerSocket(port);
        Socket socket;
        while ((socket = serverSocket.accept()) != null) {
            //接受到一个客户端Socket,生成一个HttpRequestHandler提交给线程池执行
            threadPool.execute(new HttpRequestHandler(socket));
        }
        serverSocket.close();
    }

    public static void main(String[] args) throws Exception {
        SimpleHttpServer.start();
    }

    private static class HttpRequestHandler implements Runnable {
        private Socket socket;

        HttpRequestHandler(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            String line;
            PrintWriter out = null;
            try (BufferedReader reader = new BufferedReader(new 
                                  InputStreamReader(socket.getInputStream()))) {
                out = new PrintWriter(socket.getOutputStream());
                while (true) {
                    String header = reader.readLine();
                    //根据相对路径计算出绝对路径
                    String filePath = basePath + header.split(" ")[1];
                    //如果请求资源后缀为jpg或者ico,则读取资源并输出
                    if (filePath.endsWith("jpg") || filePath.endsWith("ico")) {
                        try (var in = new FileInputStream(filePath);
                             var baos = new ByteArrayOutputStream()) {
                            int i = 0;
                            while ((i = in.read()) != -1) {
                                baos.write(i);
                            }
                            byte[] array = baos.toByteArray();
                            out.println("HTTP/1.1 200 OK");
                            out.println("Server: Molly");
                            out.println("Content-Type: image/jpeg");
                            out.println("Content-Length: " + array.length);
                            out.println();
                            socket.getOutputStream().write(array, 0, array.length);
                        } catch (Exception e) {
                            out.println("HTTP/1.1 500");
                            out.println("");
                            out.flush();

                        }
                    } else {
                        try (var br = new BufferedReader(new InputStreamReader(new 
                                  FileInputStream(filePath)))) {
                            out.println("HTTP/1.1 200 OK");
                            out.println("Server: Molly");
                            out.println("Content-Type: text/html; charset=UTF-8");
                            out.println(" ");
                            while ((line = br.readLine()) != null) {
                                out.println(line);
                            }
                            out.println("");
                        } catch (Exception e) {
                            out.println("HTTP/1.1 500");
                            out.println("");
                            out.flush();
                        }
                    }
                    out.flush();
                }
            } catch (Exception e) {
                if (out != null) {
                    out.println("HTTP/1.1 500");
                    out.println("");
                    out.flush();
                }
                e.printStackTrace();
            } finally {
                close(out, socket);
            }
        }

    }

    /**
     * 关闭流或者Socket
     * @param closeables 资源列表
     */
    private static void close(Closeable... closeables) {
        if (closeables != null) {
            for (Closeable closeable : closeables) {
                try {
                    if (closeable != null) {
                        closeable.close();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

接下来,通过一个测试对比来认识线程池技术带来服务器吞吐量的提高。我们准备了一个简单的html界面,内容如下:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>test</title>
</head>
<body>
  <h1>第一张图片</h1>
  <img src="1.jpg" align="middle" />
  <h1>第二张图片</h1>
  <img src="2.jpg" align="middle" />
  <h1>第三张图片</h1>
  <img src="3.jsp" align="middle" />
</body>
</html>

将SimpleHttpServer的根目录设定到该html页面所在目录,并启动,通过Apache HTTP server benchmarking tool(version 2.3)来测试不同线程数下,SimpleHttpServer的吞吐量表现。

测试场景是5000次请求,分10个线程并发执行,测试内容主要考察响应时间(越短越好)和每秒查询的数量(越高越好),结果如下:(机器CPU:i7-3635QM ,8G内存)

ab测试命令: ./ab -n 5000 -c 10 http://localhost:8080/xxxxx

线程池线程数量 1 5 10
响应时间(ms) 0.352 0.246 0.163
每秒查询数量 3076 4065 6123
测试完成时间(s) 1.625 1.230 0.816

可以看到,随着线程池中线程数量增加,SimpleHttpServer的吞吐量不断增大,响应时间不断变小,线程池的作用非常明显。但是,线程池中的线程数量并不是越多越好,具体数量需要评估每个任务处理时间,以及当前计算机的处理器能力和数量。使用线程过少无法发挥处理器性能,使用线程数量过多将会增加系统开销,起到反作用。

其他

关于Java并发工具的更多信息,可以查看java.util.concurrent包。在j.u.c包源码解析中也会进行介绍分析。jenkov的教程也包含了大部分j.u.c包的工具类: http://tutorials.jenkov.com/java-util-concurrent/index.html

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,761评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,953评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,998评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,248评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,130评论 4 356
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,145评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,550评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,236评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,510评论 1 291
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,601评论 2 310
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,376评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,247评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,613评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,911评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,191评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,532评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,739评论 2 335

推荐阅读更多精彩内容