什么是线程拒绝策略
当线程池达到饱和状态时,新提交的任务需要一种处理方法,这也就出现了拒绝策略。Java中提供了四种默认的拒绝策略,CallerRunsPolicy ,AbortPolicy,DiscardPolicy,DiscardOldestPolicy。我们也可以通过实现RejectedExecutionHandler接口来定义自己的拒绝策略。
拒绝策略发生在什么时候
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 当检测到线程池不是在运行状态时,将会执行拒绝策略。
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 当添加线程任务失败时,将会执行拒绝策略。
else if (!addWorker(command, false))
reject(command);
}
从代码中可以看到reject(command)方法,是有两处地方被调用的。代码中也有标注:
1、当检测到线程池不是在运行状态时,将会执行拒绝策略。
2、当检测到线程池不是在运行状态时,将会执行拒绝策略。
四种拒绝策略
CallerRunsPolicy
/**
* A handler for rejected tasks that runs the rejected task
* directly in the calling thread of the {@code execute} method,
* unless the executor has been shut down, in which case the task
* is discarded.
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// 判断线程池当前的状态,如果不是关闭状态,则直接运行该方法。
if (!e.isShutdown()) {
r.run();
}
}
}
AbortPolicy
/**
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// 直接抛出异常
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
DiscardPolicy :不做任何处理
/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
DiscardOldestPolicy:判断当前线程池的状态,如果不是shutdown,则将队列中最老的任务弹出,加入信息任务。
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
其他拒绝策略
我们来看一下Dubbo中,拒绝策略是怎么实现的,可以参考到我们的项目中。
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.common.threadpool.support;
import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.JVMUtil;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Abort Policy.
* Log warn info when abort.
*/
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);
private final String threadName;
private final URL url;
private static volatile long lastPrintTime = 0;
private static Semaphore guard = new Semaphore(1);
public AbortPolicyWithReport(String threadName, URL url) {
this.threadName = threadName;
this.url = url;
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// 打印了线程的名字,Pool Size:线程池的大小,active:活跃数,core:核心线程数,max:最大线程数,largest:线程池中达到过的最大值。线程池当前的状态,shutdown,isTerminated,isTerminating。请求URL地址。
String msg = String.format("Thread pool is EXHAUSTED!" +
" Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
" Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
url.getProtocol(), url.getIp(), url.getPort());
logger.warn(msg);
// 打印线程堆栈信息
dumpJStack();
throw new RejectedExecutionException(msg);
}
private void dumpJStack() {
long now = System.currentTimeMillis();
//dump every 10 minutes 限制两个堆栈信息打印的时间不能超过10分钟。
if (now - lastPrintTime < 10 * 60 * 1000) {
return;
}
// 通过Semaphore来控制同一时间只有一个线程在打印信息。
if (!guard.tryAcquire()) {
return;
}
// 将打印堆栈信息,放到一个单独的线程池中来执行。
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
public void run() {
String dumpPath = url.getParameter(Constants.DUMP_DIRECTORY, System.getProperty("user.home"));
SimpleDateFormat sdf;
String OS = System.getProperty("os.name").toLowerCase();
// window system don't support ":" in file name
if(OS.contains("win")){
sdf = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
}else {
sdf = new SimpleDateFormat("yyyy-MM-dd_HH:mm:ss");
}
String dateStr = sdf.format(new Date());
FileOutputStream jstackStream = null;
try {
// 设置输出打印文件的路径,文件名称。
jstackStream = new FileOutputStream(new File(dumpPath, "Dubbo_JStack.log" + "." + dateStr));
// 将堆栈信息写入文件中。
JVMUtil.jstack(jstackStream);
} catch (Throwable t) {
logger.error("dump jstack error", t);
} finally {
guard.release();
if (jstackStream != null) {
try {
jstackStream.flush();
jstackStream.close();
} catch (IOException e) {
}
}
}
lastPrintTime = System.currentTimeMillis();
}
});
}
}
我们再来看一下线程的堆栈信息是怎么获取的?
public class JVMUtil {
public static void jstack(OutputStream stream) throws Exception {
ThreadMXBean threadMxBean = ManagementFactory.getThreadMXBean();
// 通过ThreadMxBean获的所有线程信息,方法中的两个true,true分别代表
Monitor锁:通常synchronized(this)类型的锁,Synchronized锁:ReentrantLock锁和ReentrantReadWriteLock锁。
for (ThreadInfo threadInfo : threadMxBean.dumpAllThreads(true, true)) {
stream.write(getThreadDumpString(threadInfo).getBytes());
}
}
该方法主要是将ThreadInfo中的线程信息打印出来,线程状态,线程名称,线程持有锁的状态,线程堆栈信息。
private static String getThreadDumpString(ThreadInfo threadInfo) {
StringBuilder sb = new StringBuilder("\"" + threadInfo.getThreadName() + "\"" +
" Id=" + threadInfo.getThreadId() + " " +
threadInfo.getThreadState());
if (threadInfo.getLockName() != null) {
sb.append(" on " + threadInfo.getLockName());
}
if (threadInfo.getLockOwnerName() != null) {
sb.append(" owned by \"" + threadInfo.getLockOwnerName() +
"\" Id=" + threadInfo.getLockOwnerId());
}
if (threadInfo.isSuspended()) {
sb.append(" (suspended)");
}
if (threadInfo.isInNative()) {
sb.append(" (in native)");
}
sb.append('\n');
int i = 0;
StackTraceElement[] stackTrace = threadInfo.getStackTrace();
MonitorInfo[] lockedMonitors = threadInfo.getLockedMonitors();
for (; i < stackTrace.length && i < 32; i++) {
StackTraceElement ste = stackTrace[i];
sb.append("\tat " + ste.toString());
sb.append('\n');
if (i == 0 && threadInfo.getLockInfo() != null) {
Thread.State ts = threadInfo.getThreadState();
switch (ts) {
case BLOCKED:
sb.append("\t- blocked on " + threadInfo.getLockInfo());
sb.append('\n');
break;
case WAITING:
sb.append("\t- waiting on " + threadInfo.getLockInfo());
sb.append('\n');
break;
case TIMED_WAITING:
sb.append("\t- waiting on " + threadInfo.getLockInfo());
sb.append('\n');
break;
default:
}
}
for (MonitorInfo mi : lockedMonitors) {
if (mi.getLockedStackDepth() == i) {
sb.append("\t- locked " + mi);
sb.append('\n');
}
}
}
if (i < stackTrace.length) {
sb.append("\t...");
sb.append('\n');
}
LockInfo[] locks = threadInfo.getLockedSynchronizers();
if (locks.length > 0) {
sb.append("\n\tNumber of locked synchronizers = " + locks.length);
sb.append('\n');
for (LockInfo li : locks) {
sb.append("\t- " + li);
sb.append('\n');
}
}
sb.append('\n');
return sb.toString();
}
}