JAVA同步类容器线程是否安全以及介绍:
1,ArrayList,LinkedList线程不安全
Vector(同步容器)线程安全,对应的并发容器是 CopyOnWriteArrayList(同样线程安全)
2,HashMap线程不安全
Hashtable(同步容器)线程安全,对应的并发容器是 ConcurrentHashMap(同样线程安全)
3,Queue下的阻塞队列和非阻塞队列:
ConcurrentLinkedQueue高性能,高并发,线程安全的非阻塞队列。
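下面是阻塞队列:ArrayBlockingQueue(有界)、LinkedBlockingQueue(可选有界,默认无界)、PriorityBlockingQueue、DelayQueue、SynchronousQueue。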
多线程设计模式
1,future模式
代码demo
package com.wjb.demo.futuredemo;
/**
 * Created by wjb on 2018/1/10.
 *
 * <p>Common contract of the Future-pattern demo: an object that can hand back
 * the result of a request as a string.
 */
public interface Data {

    /**
     * Returns the result of the request.
     *
     * @return the result string
     */
    String getRequest();
}
=================================================
package com.wjb.demo.futuredemo;
/**
 * Created by wjb on 2018/1/10.
 *
 * <p>Client side of the Future-pattern demo: hands out a placeholder
 * immediately and loads the real data on a separate thread.
 */
public class FutureClient {

    /**
     * Starts building a {@code RealData} on a new thread and returns a
     * {@code FutureData} wrapper without waiting for it.
     *
     * @param queryString the query passed to the {@code RealData} constructor
     * @return the wrapper that the background thread fills in once the real data exists
     */
    public Data request(String queryString) {
        // The wrapper is returned right away; it carries no data yet.
        FutureData placeholder = new FutureData();
        Runnable loader = () -> placeholder.setRealData(new RealData(queryString));
        Thread loaderThread = new Thread(loader);
        loaderThread.start();
        return placeholder;
    }
}
===========================================
package com.wjb.demo.futuredemo;
/**
 * Created by wjb on 2018/1/10.
 *
 * <p>Placeholder returned by the Future-pattern demo. It is handed out before
 * the real data exists; {@link #getRequest()} blocks until
 * {@link #setRealData(RealData)} has been called.
 *
 * <p>Both methods synchronize on this instance.
 */
public class FutureData implements Data {

    private RealData realData;
    private boolean isReady = false;

    /**
     * Installs the real data and wakes every thread waiting in {@link #getRequest()}.
     * Only the first call has an effect; later calls are ignored.
     *
     * @param realData the loaded data to delegate to
     */
    public synchronized void setRealData(RealData realData) {
        // Already loaded: keep the first value.
        if (isReady) {
            return;
        }
        this.realData = realData;
        isReady = true;
        // notifyAll rather than notify: several threads may be blocked in
        // getRequest(), and notify would wake only one of them, leaving the
        // others waiting forever.
        notifyAll();
    }

    /**
     * Blocks until the real data has been installed, then returns its result.
     * The wait is not abandoned on interruption; instead the interrupt is
     * remembered and the thread's interrupt status is restored before returning.
     *
     * @return the value of {@code realData.getRequest()}
     */
    @Override
    public synchronized String getRequest() {
        boolean interrupted = false;
        try {
            // Loop guards against spurious wake-ups.
            while (!isReady) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // Keep waiting for the data, but do not lose the interrupt.
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        // Data is loaded: delegate to the real object.
        return realData.getRequest();
    }
}
=================================================
package com.wjb.demo.futuredemo;
/**
 * Created by wjb on 2018/1/10.
 *
 * <p>The "real" data of the Future-pattern demo. Construction is deliberately
 * slow: the constructor sleeps to stand in for an expensive query.
 */
public class RealData implements Data {

    /** How long the constructor sleeps to simulate the query, in milliseconds. */
    private static final long QUERY_DELAY_MILLIS = 10000;

    private String result;

    /**
     * Logs the query, sleeps for {@code QUERY_DELAY_MILLIS}, logs completion
     * and stores a fixed result string.
     *
     * @param queryString the query text; only used in the log line
     */
    public RealData(String queryString) {
        System.out.println("根据" + queryString + "查询操作,一系列的操作省略。。。");
        try {
            Thread.sleep(QUERY_DELAY_MILLIS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("操作完成");
        this.result = "查询结果";
    }

    /**
     * @return the result string stored by the constructor
     */
    @Override
    public String getRequest() {
        return this.result;
    }
}
================================================
package com.wjb.demo.futuredemo;
/**
 * Created by wjb on 2018/1/10.
 * Multithreading Future pattern example.
 */
public class Main {

    /**
     * Sends one request through {@code FutureClient}, prints two progress
     * lines, then prints the result returned by {@code getRequest()}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // What comes back here is only a placeholder; the data is not loaded yet.
        Data pending = new FutureClient().request("查询请求");
        System.out.println("请求发送成功");
        System.out.println("数据正在处理中,此时可以做其它操作");
        System.out.println(pending.getRequest());
    }
}
2,Master-Worker模式:
示例demo:
package com.wjb.demo.master_worker;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * Created by wjb on 2018/1/10.
 *
 * <p>Master of the Master-Worker demo. It owns the task queue, the worker
 * threads and the map that workers write their finished tasks into.
 */
public class Master {

    /** Pending tasks; a non-blocking queue shared with the worker. */
    private final ConcurrentLinkedQueue<Task> queue = new ConcurrentLinkedQueue<>();

    /** Worker threads keyed by their index rendered as a string. */
    private final Map<String, Thread> map = new HashMap<>();

    /** Tasks that workers have finished, shared with the worker. */
    private final ConcurrentHashMap<String, Task> resultMap = new ConcurrentHashMap<>();

    /**
     * Wires the worker to this master's queue and result map and creates
     * {@code count} threads that all run the same {@code worker} instance.
     * The threads are not started here; see {@link #execute()}.
     *
     * @param worker the runnable executed by every worker thread
     * @param count  number of worker threads to create
     */
    public Master(Worker worker, int count) {
        worker.setMap(resultMap);
        worker.setQueue(queue);
        for (int i = 0; i < count; i++) {
            map.put(Integer.toString(i), new Thread(worker));
        }
    }

    /**
     * Adds a task to the pending queue.
     *
     * @param task the task to enqueue
     */
    public void submit(Task task) {
        queue.add(task);
    }

    /** Starts every worker thread. */
    public void execute() {
        for (Thread workerThread : map.values()) {
            workerThread.start();
        }
    }

    /**
     * Reports whether all worker threads have terminated.
     *
     * @return {@code true} when every worker thread is in state
     *         {@code TERMINATED} (also when there are no workers), {@code false} otherwise
     */
    public boolean isComplete() {
        for (Thread workerThread : map.values()) {
            if (workerThread.getState() != Thread.State.TERMINATED) {
                return false;
            }
        }
        return true;
    }
}
=================================================
package com.wjb.demo.master_worker;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * Created by wjb on 2018/1/10.
 *
 * <p>Worker of the Master-Worker demo. It drains tasks from a shared queue,
 * sleeps one second per task to simulate work, and records each finished task
 * in a shared result map keyed by the task id.
 */
public class Worker implements Runnable {

    private ConcurrentLinkedQueue<Task> queue;
    private ConcurrentHashMap<String, Task> map;

    /**
     * @param queue the shared queue this worker polls tasks from
     */
    public void setQueue(ConcurrentLinkedQueue<Task> queue) {
        this.queue = queue;
    }

    /**
     * @param map the shared map this worker stores finished tasks in
     */
    public void setMap(ConcurrentHashMap<String, Task> map) {
        this.map = map;
    }

    /**
     * Processes tasks until the queue is empty. If the thread is interrupted
     * while working on a task, the task is put back on the queue, the
     * interrupt status is restored and the worker stops.
     */
    @Override
    public void run() {
        Task task;
        while ((task = queue.poll()) != null) {
            try {
                // Simulated processing time.
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // The task was not finished: hand it back so it is not lost,
                // then honour the interrupt instead of carrying on.
                queue.add(task);
                Thread.currentThread().interrupt();
                return;
            }
            map.put(Integer.toString(task.getId()), task);
        }
    }
}
================================================
package com.wjb.demo.master_worker;
/**
 * Created by wjb on 2018/1/10.
 *
 * <p>Plain mutable task bean of the Master-Worker demo, holding an id and a price.
 */
public class Task {

    private int id;
    private int price;

    /** @return the task id */
    public int getId() {
        return this.id;
    }

    /** @param id the task id */
    public void setId(int id) {
        this.id = id;
    }

    /** @return the task price */
    public int getPrice() {
        return this.price;
    }

    /** @param price the task price */
    public void setPrice(int price) {
        this.price = price;
    }
}
=================================================
package com.wjb.demo.master_worker;
import java.util.Random;
/**
 * Created by wjb on 2018/1/10.
 * Multithreading Master-Worker pattern example.
 */
public class Main {

    /** Pause between two completion checks, in milliseconds. */
    private static final long POLL_INTERVAL_MILLIS = 10;

    /**
     * Creates a master with 10 worker threads, submits 100 tasks with random
     * prices, starts the workers and prints the elapsed time once all worker
     * threads have terminated.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Master master = new Master(new Worker(), 10);
        Random random = new Random();
        for (int i = 0; i < 100; i++) {
            Task task = new Task();
            task.setId(i);
            task.setPrice(random.nextInt());
            master.submit(task);
        }
        master.execute();
        long start = System.currentTimeMillis();
        // Poll with a short sleep rather than spinning, which would keep a
        // core fully busy for the whole run.
        while (!master.isComplete()) {
            try {
                Thread.sleep(POLL_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
        long end = System.currentTimeMillis();
        System.out.println("耗时:" + (end - start));
    }
}
3,生产者消费者模式:
示例demo
package com.wjb.demo.provider_consumer;
import java.util.concurrent.*;
/**
 * Created by wjb on 2018/1/11.
 *
 * <p>Producer-consumer demo: three providers and three consumers share one
 * blocking queue and run on a cached thread pool.
 */
public class Main {

    /**
     * Runs the providers for three seconds, stops them, then shuts the pool
     * down, interrupting whatever is still running after a grace period.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Unbounded blocking queue: no limit on the number of elements.
        BlockingQueue<Data> queue = new LinkedBlockingQueue<Data>();
        // Cached pool: creates threads on demand without an upper bound, creates
        // none while there is no work, and keeps idle threads for 60s by default.
        ExecutorService threadPool = Executors.newCachedThreadPool();
        Provider p1 = new Provider(queue);
        Provider p2 = new Provider(queue);
        Provider p3 = new Provider(queue);
        Consumer c1 = new Consumer(queue);
        Consumer c2 = new Consumer(queue);
        Consumer c3 = new Consumer(queue);
        threadPool.execute(p1);
        threadPool.execute(p2);
        threadPool.execute(p3);
        threadPool.execute(c1);
        threadPool.execute(c2);
        threadPool.execute(c3);
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // Stop the producers even if the sleep was interrupted.
            p1.stop();
            p2.stop();
            p3.stop();
        }
        threadPool.shutdown();
        try {
            // Give the consumers time to drain the queue, then interrupt tasks
            // that are still running (e.g. blocked waiting for more data).
            // Tasks must honour interruption for this to end them.
            if (!threadPool.awaitTermination(5, TimeUnit.SECONDS)) {
                threadPool.shutdownNow();
            }
        } catch (InterruptedException e) {
            threadPool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
==============================================
package com.wjb.demo.provider_consumer;
/**
 * Created by wjb on 2018/1/11.
 *
 * <p>Mutable item passed from providers to consumers: an id and a name.
 */
public class Data {

    private int id;
    private String name;

    /**
     * @param id   the item id
     * @param name the item name
     */
    public Data(int id, String name) {
        setId(id);
        setName(name);
    }

    /** @return the item id */
    public int getId() {
        return this.id;
    }

    /** @param id the item id */
    public final void setId(int id) {
        this.id = id;
    }

    /** @return the item name */
    public String getName() {
        return this.name;
    }

    /** @param name the item name */
    public final void setName(String name) {
        this.name = name;
    }
}
===========================================
package com.wjb.demo.provider_consumer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Created by wjb on 2018/1/11.
 *
 * <p>Producer of the producer-consumer demo. Roughly once per second it creates
 * a {@code Data} item with a globally increasing id and offers it to the
 * shared queue, until {@link #stop()} is called or the thread is interrupted.
 */
public class Provider implements Runnable {

    private final BlockingQueue<Data> queue;

    /** Cleared by {@link #stop()}; volatile so the running thread sees the change. */
    private volatile boolean isRunning = true;

    /** Id generator shared by all providers. */
    private static final AtomicInteger count = new AtomicInteger();

    /**
     * @param queue the shared queue produced items are offered to
     */
    public Provider(BlockingQueue<Data> queue) {
        this.queue = queue;
    }

    /**
     * Produces items until stopped. An interrupt restores the thread's
     * interrupt status and ends the loop.
     */
    @Override
    public void run() {
        while (isRunning) {
            try {
                Thread.sleep(1000);
                int id = count.incrementAndGet();
                Data data = new Data(id, Integer.toString(id));
                System.out.println("当前线程:" + Thread.currentThread().getName() + "生产数据ID是:" + id);
                // Wait up to two seconds for room in the queue.
                if (!queue.offer(data, 2, TimeUnit.SECONDS)) {
                    System.out.println("数据放入队列失败");
                }
            } catch (InterruptedException e) {
                // Continuing would make every following sleep fail at once.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Asks the producer to stop after its current iteration. */
    public void stop() {
        this.isRunning = false;
    }
}
===============================================
package com.wjb.demo.provider_consumer;
import java.util.concurrent.BlockingQueue;
/**
 * Created by wjb on 2018/1/11.
 *
 * <p>Consumer of the producer-consumer demo. It takes items from the shared
 * queue, sleeps one second per item to simulate work, and prints the item id.
 */
public class Consumer implements Runnable {

    private final BlockingQueue<Data> queue;

    /** Cleared by {@link #stop()}; volatile so the running thread sees the change. */
    private volatile boolean isRunning = true;

    /**
     * @param queue the shared queue items are taken from
     */
    public Consumer(BlockingQueue<Data> queue) {
        this.queue = queue;
    }

    /**
     * Consumes items until stopped. An interrupt (for example while blocked
     * waiting for an item) restores the thread's interrupt status and ends the loop.
     */
    @Override
    public void run() {
        while (isRunning) {
            try {
                Data data = queue.take();
                Thread.sleep(1000);
                System.out.println("消费的数据ID是:" + data.getId());
            } catch (InterruptedException e) {
                // Looping again would just block in take() and make the
                // consumer impossible to shut down.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Asks the consumer to stop. The flag is checked between items, so a
     * consumer currently blocked on an empty queue only notices it after the
     * next item arrives; interrupt the thread to stop it immediately.
     */
    public void stop() {
        this.isRunning = false;
    }
}
Executor框架
使用有界队列示例demo
package com.wjb.demo.threadpoolexecutor;
/**
 * Created by wjb on 2018/1/11.
 *
 * <p>Runnable task of the thread-pool demo: it sleeps five seconds and then
 * prints the executing thread's name and the task id.
 */
public class Task implements Runnable {

    private int id;
    private String name;

    /**
     * @param id   the task id
     * @param name the task name
     */
    public Task(int id, String name) {
        this.id = id;
        this.name = name;
    }

    /** @return the task id */
    public int getId() {
        return id;
    }

    /** @param id the task id */
    public void setId(int id) {
        this.id = id;
    }

    /** @return the task name */
    public String getName() {
        return name;
    }

    /** @param name the task name */
    public void setName(String name) {
        this.name = name;
    }

    /**
     * Sleeps five seconds, then prints which thread ran this task. If the
     * sleep is interrupted nothing is printed and the thread's interrupt
     * status is restored.
     */
    @Override
    public void run() {
        try {
            Thread.sleep(5000);
            System.out.println(Thread.currentThread().getName() + "执行任务ID" + this.id);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Readable form used when the task is printed, e.g. by a rejection handler.
     *
     * @return {@code Task{id=<id>, name='<name>'}}
     */
    @Override
    public String toString() {
        return "Task{id=" + id + ", name='" + name + "'}";
    }
}
=============================================
package com.wjb.demo.threadpoolexecutor;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * Created by wjb on 2018/1/11.
 * Custom thread pool: how a bounded work queue is used.
 */
public class ThreadPoolExecutorDemo {

    /**
     * Submits six tasks to a pool with one core thread, at most two threads
     * and a queue of capacity three, so that the sixth task is rejected.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        /*
         * Task 1 is run by the first (core) thread. Tasks 2, 3 and 4 are parked
         * in the queue. When task 5 arrives the queue is full; because the
         * current thread count is below the maximum, a new thread is created to
         * run it. Task 6 finds the queue full and the pool at its maximum, so the
         * rejection policy is applied. The JDK default is AbortPolicy, which
         * throws; this demo installs MyRejected instead so the rejected task is
         * only printed.
         */
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(3), new MyRejected());
        Task task1 = new Task(1, "任务1");
        Task task2 = new Task(2, "任务2");
        Task task3 = new Task(3, "任务3");
        Task task4 = new Task(4, "任务4");
        Task task5 = new Task(5, "任务5");
        Task task6 = new Task(6, "任务6");
        pool.execute(task1);
        pool.execute(task2);
        pool.execute(task3);
        pool.execute(task4);
        pool.execute(task5);
        pool.execute(task6);
        pool.shutdown();
    }
}
============================================
package com.wjb.demo.threadpoolexecutor;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
/**
 * Created by wjb on 2018/1/11.
 * Custom rejection policy. Handle the rejected task however is appropriate here;
 * in a real system the data must not be lost, so it would be put in a cache or
 * preserved some other way.
 */
public class MyRejected implements RejectedExecutionHandler {

    /**
     * Prints the rejected task's string form to standard output.
     *
     * @param r        the task that could not be accepted
     * @param executor the pool that rejected it; not used
     */
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        String description = r.toString();
        System.out.println(description);
    }
}
=============================================
执行结果:
Task{id=6, name='任务6'}
pool-1-thread-1执行任务ID1
pool-1-thread-2执行任务ID5
pool-1-thread-1执行任务ID3
pool-1-thread-2执行任务ID2
pool-1-thread-1执行任务ID4
使用无界队列示例DEMO
package com.wjb.demo.threadpoolexecutor;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Created by wjb on 2018/1/11.
 * Thread pool: behaviour with an unbounded work queue.
 */
public class ThreadPoolExecutor2 implements Runnable {

    private int id;
    private String name;

    /** Counter shared by all tasks; incremented each time a task starts running. */
    private static AtomicInteger count = new AtomicInteger();

    /** @return the id field */
    public int getId() {
        return this.id;
    }

    /** @param id the id field */
    public void setId(int id) {
        this.id = id;
    }

    /** @return the name field */
    public String getName() {
        return this.name;
    }

    /** @param name the name field */
    public void setName(String name) {
        this.name = name;
    }

    /**
     * Takes the next number from the shared counter, prints it with the
     * executing thread's name, then sleeps two seconds. The number is a local
     * value; the {@code id} field is not touched.
     */
    @Override
    public void run() {
        try {
            int sequence = count.incrementAndGet();
            String threadName = Thread.currentThread().getName();
            System.out.println(threadName + "执行的任务ID:" + sequence);
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * Submits 20 tasks to a pool of 5 threads backed by an unbounded queue,
     * prints the queue size after one second, waits two more seconds and
     * shuts the pool down.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        /*
         * Each incoming task gets a new thread until corePoolSize threads exist;
         * after that no more threads are created, which is why the maximum is
         * normally set equal to the core size here. Every further task is parked
         * in the queue. An unbounded queue never rejects a task until memory
         * runs out.
         */
        ExecutorService executor = new ThreadPoolExecutor(5, 5, 120, TimeUnit.SECONDS, queue);
        int submitted = 0;
        while (submitted < 20) {
            executor.execute(new ThreadPoolExecutor2());
            submitted++;
        }
        try {
            Thread.sleep(1000);
            int waiting = queue.size();
            System.out.println("队列大小" + waiting);
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        executor.shutdown();
    }
}