并发编程艺术-8

本文主要介绍的是Java 并发编程的里面几个工具类: CountDownLatch, CyclicBarrier, Semaphore, Exchanger, 分析以及使用介绍。

(1) CountDownLatch 类用一个继承了AQS 抽象类作为内部类,实现了让一个线程或者多个线程等待到达到某一个条件,源码里面使用State 来记录,当state 等于 0 时,所有等待线程都被释放。

A code CountDownLatch is a versatile synchronization tool and can be used for a number of purposes. A
CountDownLatch initialized with a count of one serves as a simple on/off latch, or gate: all threads invoking await wait at the gate until it is opened by a thread invoking countDown. A CountDownLatch initialized to N can be used to make one thread wait until N threads have completed some action, or some action has been completed N times.

Example :

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * 
 * @author Eric
 *
 */
public class CountDownLatchUsage {

    public static void main(String[] args) throws InterruptedException {

        CountDownLatch latch = new CountDownLatch(2);

        new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    latch.await();
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                System.out.println("This is the thread one");

            }

        }).start();

        new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    latch.await();
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                System.out.println("This is the thread Two");

            }

        }).start();

        System.out.println("Going to release the latch");
        latch.countDown();//state = 1
        TimeUnit.SECONDS.sleep(10);
        latch.countDown();//state = 0

        System.out.println("The end");

    }

}

(2) CyclicBarrier 用于一些线程等待其他线程到达同一个栅栏点,最后一个线程到达之前,所有的线程都被阻塞,而且所有等待线程释放后,CyclicBarrier 可以被复用。

Example :

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierUsage {

    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {

        CyclicBarrier barrier = new CyclicBarrier(2, new Runnable() {

            @Override
            public void run() {
                System.out.println("Let's go!!!");

            }
        });

        new Thread(new Runnable() {

            @Override
            public void run() {
                System.out.println("Thread 1 is waiting");
                try {
                    barrier.await();
                    System.out.println("Thread 1 is running");
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

            }

        }).start();

        new Thread(new Runnable() {

            @Override
            public void run() {

                System.out.println("Thread 2 is waiting");
                try {
                    barrier.await();
                    System.out.println("Thread 2 is running");
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

            }

        }).start();

        while (Thread.activeCount() > 1) {
            Thread.yield();
        }
    }

}

(3) Semaphore 用来维护一系列许可证,经常用来限制线程的数目,用于资源的访问,同样在内部有一个继承了AQS的Sync,用于公平NonfairSync和FairSync,两者的区别是

公平锁(首先检查等待队列中是否已有线程在等待)

protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

非公平锁: 直接检查是否能获取许可,可以的话直接运行,否者进入等待队列。

        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

Example :

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreUsage {

    public static void main(String[] args) {
        Semaphore sem = new Semaphore(2, true);
        
        
        new Thread(new Runnable(){

            @Override
            public void run() {
                
                try {
                    sem.acquire(1);
                    System.out.println("This is Thread 1, get the permit");
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }finally{
                    sem.release();
                }
                
                System.out.println("This is Thread 1");
            }
            
        }).start();
        
        new Thread(new Runnable(){

            @Override
            public void run() {

                
                try {
                    sem.acquire(1);
                    System.out.println("This is Thread 2, get the permit");
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }finally{
                    sem.release();
                }
                
                System.out.println("This is Thread 2");
                
            
            }
            
        }).start();
        
        new Thread(new Runnable(){

            @Override
            public void run() {

                
                try {
                    sem.acquire(1);
                    System.out.println("This is Thread 3, get the permit");
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }finally{
                    sem.release();
                }
                
                System.out.println("This is Thread 3");
                
            
            }
            
        }).start();
        
        new Thread(new Runnable(){

            @Override
            public void run() {

                
                try {
                    sem.acquire(1);
                    System.out.println("This is Thread 4, get the permit");
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }finally{
                    sem.release();
                }
                
                System.out.println("This is Thread 4");
                
            }
            
        }).start();
        
        new Thread(new Runnable(){

            @Override
            public void run() {
                try {
                    sem.acquire(1);
                    System.out.println("This is Thread 5, get the permit");
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }finally{
                    sem.release();
                }
                
                System.out.println("This is Thread 5");
            
            }
            
        }).start();
        
        new Thread(new Runnable(){

            @Override
            public void run() {

                try {
                    sem.acquire(1);
                    System.out.println("This is Thread 6, get the permit");
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }finally{
                    sem.release();
                }
                System.out.println("This is Thread 6");
            }   
        }).start();
    }

}

(4) Exchanger: 用来实现线程之间在同步点交换数据,用于基因算法以及管道的设计。

A synchronization point at which threads can pair and swap elements
within pairs. Each thread presents some object on entry to the
{@link #exchange exchange} method, matches with a partner thread,
and receives its partner's object on return. An Exchanger may be
viewed as a bidirectional form of a {@link SynchronousQueue}.
Exchangers may be useful in applications such as genetic algorithms
and pipeline designs.

核心算法:

   for (;;) {
   if (slot is empty) {                       // offer
        place item in a Node;
       if (can CAS slot from empty to node) {
          wait for release;
         return matching item in node;
       }
     }
     else if (can CAS slot from node to empty) { // release
       get the item in node;
     set matching item in node;
       release waiting thread;
      }
      // else retry on CAS failure
    }

Example :

class FillAndEmpty {
    Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
    DataBuffer initialEmptyBuffer = ... a made-up type
   DataBuffer initialFullBuffer = ...
 
    class FillingLoop implements Runnable {
      public void run() {
        DataBuffer currentBuffer = initialEmptyBuffer;
        try {
          while (currentBuffer != null) {
            addToBuffer(currentBuffer);
            if (currentBuffer.isFull())
              currentBuffer = exchanger.exchange(currentBuffer);
         }
       } catch (InterruptedException ex) { ... handle ... }
      }
    }
 
    class EmptyingLoop implements Runnable {
      public void run() {
      DataBuffer currentBuffer = initialFullBuffer;
        try {
          while (currentBuffer != null) {
            takeFromBuffer(currentBuffer);
          if (currentBuffer.isEmpty())
             currentBuffer = exchanger.exchange(currentBuffer);
       }
        } catch (InterruptedException ex) { ... handle ...}
      }
    }

    void start() {
    new Thread(new FillingLoop()).start();
    new Thread(new EmptyingLoop()).start();
    }
 }}

Exception one(会一直waiting,找找原因):


import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ExchangerUsage {

    
    private static Exchanger<String> data = new Exchanger<String>();
    
    
    public static void main(String[] args) {
        
        ExchangerServer server = new ExchangerServer();
        ExchangerClient client = new ExchangerClient();
        
        server.run();
        client.run();
    }
    
    
    
    private static class ExchangerServer implements Runnable{

        @Override
        public void run() {
            String A = "A";
                    try {
                        try {
                            System.out.println("Data from client" + data.exchange(A,5,TimeUnit.SECONDS));
                        } catch (TimeoutException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                
            }
            
        }
        
        
        
        
    }
    
    
    
    private static class ExchangerClient implements Runnable{

        @Override
        public void run() {

            String B = "B";
                    try {
                        try {
                            System.out.println("Data from client" + data.exchange(B,5,TimeUnit.SECONDS));
                        } catch (TimeoutException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                
            }
            
        
        }
    }

}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,837评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,551评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,417评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,448评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,524评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,554评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,569评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,316评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,766评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,077评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,240评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,912评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,560评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,176评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,425评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,114评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,114评论 2 352

推荐阅读更多精彩内容