Executors
public class MyThreadPool {
public static void main(String[] args) throws InterruptedException {
ExecutorService service = new ThreadPoolExecutor(5, 10,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),new MyThreadFactory("mypool-" ));
// ExecutorService service = Executors.newFixedThreadPool(5);
for (int i = 0; i < 6; i++) {
service.execute(() -> {
try {
System.out.println(Thread.currentThread().getName());
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
System.out.println(service);
service.shutdown();
System.out.println(service.isTerminated());
System.out.println(service.isShutdown());
TimeUnit.SECONDS.sleep(5);
System.out.println(service.isTerminated());
System.out.println(service.isShutdown());
System.out.println(service);
}
}
class MyThreadFactory implements ThreadFactory{
private String threadName;
private final AtomicInteger index = new AtomicInteger(1);
MyThreadFactory(String threadName){
this.threadName = threadName;
}
@Override
public Thread newThread(Runnable r) {
return new Thread(r,threadName + index.getAndAdd(1));
}
}
WorkStealingPool
public class WorkStealingPoolTest {
/**
* 线程数
*/
private static final int threads = 10;
/**
* 用于计算线程是否执行完毕
*/
CountDownLatch countDownLatch = new CountDownLatch(10);
/**
* newFixedThreadPool execute
*
* @throws ExecutionException
* @throws InterruptedException
*/
@Test
public void test1() throws InterruptedException {
System.out.println("---start---");
ExecutorService executorService = Executors.newWorkStealingPool();
for (int i = 0; i < threads; i++) {
executorService.execute(() -> {
try {
System.out.println(Thread.currentThread().getName());
} catch (Exception e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await();
System.out.println("---end---");
}
/**
* newFixedThreadPool submit Callable
*
* @throws ExecutionException
* @throws InterruptedException
*/
@Test
public void test3() throws ExecutionException, InterruptedException {
System.out.println("--- start ---");
ExecutorService service = Executors.newWorkStealingPool();
for (int i = 0; i < threads; i++) {
FutureTask<?> futureTask = new FutureTask<>(() -> Thread.currentThread().getName());
service.submit(futureTask);
System.out.println(futureTask.get());
}
System.out.println("--- end ---");
System.out.println(Runtime.getRuntime().availableProcessors());
}
}
ForkJoinPool
public class ForkJoinTest {
static int[] nums = new int[1000000];
static final int MAX_NUM = 50000;
static Random r = new Random();
static {
for (int i = 0; i < nums.length; i++) {
nums[i] = r.nextInt(100);
}
// 第一种
System.out.println(Arrays.stream(nums).sum());
}
static class AddTask extends RecursiveAction {
int start, end;
AddTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected void compute() {
if (end - start <= MAX_NUM) {
long sum = 0L;
for (int i = start; i < end; i++) {
sum += nums[i];
}
System.out.println("from:" + start + " to:" + end);
} else {
int middle = start + (end-start)/2;
AddTask subTask1 = new AddTask(start,middle);
AddTask subTask2 = new AddTask(middle,end);
subTask1.fork();
subTask2.fork();
}
}
}
public static void main(String[] args) {
new AddTask(0,nums.length).compute();
try {
// 因为forkjoin 是守护线程,所以需要用阻塞方法,等待forkjoin完成。
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public class ForkJoinTest2 {
static int[] nums = new int[1000000];
static final int MAX_NUM = 50000;
static Random r = new Random();
static {
for (int i = 0; i < nums.length; i++) {
nums[i] = r.nextInt(100);
}
// 第一种
System.out.println(Arrays.stream(nums).sum());
}
static class AddTask extends RecursiveTask<Long> {
int start, end;
AddTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if (end -start <= MAX_NUM) {
long sum = 0L;
for (int i = start; i < end; i++) {
sum += nums[i];
}
System.out.println("from: " + start + " to:" + end);
return sum;
}
int middle = start + (end - start) / 2;
AddTask subTask1 = new AddTask(start,middle);
AddTask subTask2 = new AddTask(middle, end);
subTask1.fork();
subTask2.fork();
return subTask1.join() + subTask2.join();
}
}
public static void main(String[] args) throws IOException {
ForkJoinPool fjp = new ForkJoinPool();
AddTask task = new AddTask(0,nums.length);
fjp.execute(task);
long result = task.join();
System.out.println(result);
System.in.read();
}
}