第五章 并行模式与算法

5.1 探讨单例模式

一种对象创建模式,用于产生一个对象的具体实例,确保系统中一个类只产生一个实例。
好处:
1.对于频繁使用的对象,可以省略new操作花费的时间。
2.由于new操作次数减少,系统内存使用频率降低,减轻GC压力,缩短GC停顿时间。
单例的实现

public class Singleton{
  public static int STATUS = 1;
  private Singleton(){  //构造方法为私有方法
    System.out.println("Singleyon is create");
}

  private static Singleton instance = new Singleton();
  public static Singleton getInstance(){
    return instance;  
}
}

当在任何地方引用STATUS会导致instance实例被创建, 但类初始化只有一次,因此实例instance只会被创建一次。
下面通过锁机制使得只会在第一次使用instance时创建对象:

public class LazySingleton{
    private LazySingleton(){
      System.println("LazySingleton is create");
}
    private static LazySingleton instance = null;
    public static synchronized LazySingleton getInstance(){
      if(instance == null)
          new LazySingleton();
      return instance;
}
}

这种方法好处:充分实现了加载延迟,只在真正需要时创建对象。
坏处:并发环境下加锁,竞争激烈的场合对性能产生一定影响。

双重检查模式:

public class StaticSingleton(){
    private StaticSingleton(){
      System.out.println("StaticSingleton is create");
}
private static class SingletonHolder{
      private static StaticSingleton instance = new StaticSingleton();
}

private static StaticSingleton getInstance(){
      return SingletonHolder.instance;
}
}

5.2 不变模式

核心思想:一个对象一旦被创建,它的内部状态将永远不会改变。
使用场景:
1.对象创建后,其内部状态和数据不再发生任何变化。
2.对象需要被共享,被多线程频繁访问。

java中不变模式的实现:
1.去除setter方法及所有修改自身属性的方法。
2.将所有属性设置成私有,并用final标记,确保其不可修改。
3.确保自类可以重载修改它的行为。
4.有一个可以完整创建对象的构造函数。

5.3 生产者-消费者模式

实现类图

生产者线程实现:

package chapter5.PC;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Producer implements Runnable {
    private volatile boolean isRunning = true;
    private BlockingQueue<PCData> queue;                             //内存缓存区
    private static AtomicInteger count = new AtomicInteger();        //总数,原子操作
    private static final int SLEEPTIME = 1000;

    public Producer(BlockingQueue<PCData> queue) {
        this.queue = queue;
    }

    public void run() {
        PCData data = null;
        Random r = new Random();

        System.out.println("start producer id="+Thread.currentThread().getId());
        try {
            while (isRunning) {
                Thread.sleep(r.nextInt(SLEEPTIME));
                data = new PCData(count.incrementAndGet());
                System.out.println(data+" is put into queue");
                if (!queue.offer(data, 2, TimeUnit.SECONDS)) {
                    System.err.println("failed to put data:" + data);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }
    public void stop() {
        isRunning = false;
    }
}

消费者实现:从BlockingQueue中取出PCData对象

package chapter5.PC;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Producer implements Runnable {
    private volatile boolean isRunning = true;
    private BlockingQueue<PCData> queue;                             //内存缓存区
    private static AtomicInteger count = new AtomicInteger();        //总数,原子操作
    private static final int SLEEPTIME = 1000;

    public Producer(BlockingQueue<PCData> queue) {
        this.queue = queue;
    }

    public void run() {
        PCData data = null;
        Random r = new Random();

        System.out.println("start producer id="+Thread.currentThread().getId());
        try {
            while (isRunning) {
                Thread.sleep(r.nextInt(SLEEPTIME));
                data = new PCData(count.incrementAndGet());
                System.out.println(data+" is put into queue");
                if (!queue.offer(data, 2, TimeUnit.SECONDS)) {
                    System.err.println("failed to put data:" + data);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }
    public void stop() {
        isRunning = false;
    }
}

PCData对象作为生产者和消费者之间的共享数据模型:

package chapter5.PC;

public final class PCData {
    private  final int intData;
    public PCData(int d){
        intData=d;
    }
    public PCData(String d){
        intData=Integer.valueOf(d);
    }
    public int getData(){
        return intData;
    }
    @Override
    public String toString(){
        return "data:"+intData;
    }
}

主函数中,创建3个生产者3个消费者,让他们协同工作。

package chapter5.PC;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class PCMain {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<PCData> queue = new LinkedBlockingQueue<PCData>(10);
        Producer producer1 = new Producer(queue);
        Producer producer2 = new Producer(queue);
        Producer producer3 = new Producer(queue);
        Consumer consumer1 = new Consumer(queue);
        Consumer consumer2 = new Consumer(queue);
        Consumer consumer3 = new Consumer(queue);
        ExecutorService service = Executors.newCachedThreadPool();
        service.execute(producer1);
        service.execute(producer2);
        service.execute(producer3);
        service.execute(consumer1);
        service.execute(consumer2);
        service.execute(consumer3);
        Thread.sleep(10 * 1000);
        producer1.stop();
        producer2.stop();
        producer3.stop();
        Thread.sleep(3000);
        service.shutdown();
    }
}
运行结果

5.4 高性能生产者-消费者:无锁的实现

5.4.1 无锁的缓存框架:Disruptor

能极大的提高系统性能。
使用环型队列,队列大小为2的整数倍。
PCData对象:

package chapter5.disruptor;

public class PCData
{
    private long value;

    public void set(long value)
    {
        this.value = value;
    }
    public long get(){
        return value;
    }
}

消费者实现来自Disrupter框架的WorkHandler接口。onEvent()方法为框架的回调方法。
PCData对象的工厂类:

package chapter5.disruptor;

import com.lmax.disruptor.EventFactory;

public class PCDataFactory implements EventFactory<PCData>
{
    public PCData newInstance()
    {
        return new PCData();
    }
}

生产者类

package chapter5.disruptor;

import java.nio.ByteBuffer;

import com.lmax.disruptor.RingBuffer;

public class Producer
{
    private final RingBuffer<PCData> ringBuffer;

    public Producer(RingBuffer<PCData> ringBuffer)
    {
        this.ringBuffer = ringBuffer;
    }

    public void pushData(ByteBuffer bb)
    {
        long sequence = ringBuffer.next();  //得到下一个可用的序列号
        try
        {
            PCData event = ringBuffer.get(sequence); // 通过序列号得到对象实体
            event.set(bb.getLong(0)); 
        }
        finally
        {
            ringBuffer.publish(sequence); //发布数据
        }
    }
}

其中RingBuffer为环形缓冲区,pushData()将传入的ByteBuffer对象中数据提取出来,并装载到环形缓冲区。
主函数

package chapter5.disruptor;

import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

public class PCMain
{
    public static void main(String[] args) throws Exception
    {
        Executor executor = Executors.newCachedThreadPool();
        PCDataFactory factory = new PCDataFactory();
        // ringBuffer的大小必须是2的整数倍
        int bufferSize = 1024;
        Disruptor<PCData> disruptor = new Disruptor<PCData>(factory,
                bufferSize,
                executor,
                ProducerType.MULTI,
                new BlockingWaitStrategy()
        );
       
        disruptor.handleEventsWithWorkerPool(
                new Consumer(),
                new Consumer(),
                new Consumer(),
                new Consumer());
        disruptor.start();

        RingBuffer<PCData> ringBuffer = disruptor.getRingBuffer();
        Producer producer = new Producer(ringBuffer);
        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++)
        {
            bb.putLong(0, l);
            producer.pushData(bb);
            Thread.sleep(100);
            System.out.println("add data "+l);
        }
    }
}
运行结果

5.4.3 Disrupter框架的4种策略
BlockingWaitStrategy:默认策略。使用锁和条件进行数据的监控和线程的唤醒,最节省CPU,但在高并发中性能下降明显。

SleepWaitStrategy:它会在循环中不断等待数据。会进行自旋等待,如果不成功,使用Thread.yield()让出CPU,并进行线程休眠,以确保不占用太多CPU数据。
这种策略延迟较高,但对生产者影响较小。适合场景为异步日志等等。

YieldingWaitStrategy:消费者线程会不断循环监控缓存区的变化。消费者线程变成了一个内部执行了Thread.yeild()方法的死循环。最后有多于消费者线程的CPU核数。这种策略用于低延迟的场合。

BusySpinWaitStrategy:消费者线程尽最大努力监控缓存区变化。会占用所有CPU资源,CPU的物理核数必须大于消费者线程核数,否则可能会无法工作。这种策略适用于对延迟要求非常高的系统。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容