- The core problem Deadline solves is capping how long a client waits when a request hits an abnormal condition, which reduces unnecessary blocking.
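gRPC does not handle this with a traditional timeout; it uses a Deadline mechanism instead. The main difference is roughly as follows:
Start with a timeout example. Suppose the timeout is set to 200 milliseconds and the call has to go through three services, A, B and C. If the time has already run out while B is being processed, the client throws a TimeoutException. Note, however, that only the client reacts; the server side is still working!
At this point it no longer matters whether the server returns, because the client has already abandoned the call. The result is irrelevant, but the work still being done is a significant waste of resources.
Conclusion: this exposes the core problem Deadline has to solve, namely how to keep the timeout in sync on both ends and how to propagate it to the other (waiting) services.
The client installs a listener for itself; to show that the client and server behave consistently, in this walkthrough the EXCEED is reported by the server side.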
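To make the contrast with a per-hop timeout concrete, here is a minimal sketch in plain Java. It is not gRPC code, and the names DeadlineBudget, remainingMillis and isExpired are hypothetical. The idea it illustrates is that the deadline is fixed once as an absolute point in time, and every later hop derives its remaining budget from it. A fake clock value is passed in so the arithmetic is easy to follow.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical illustration, not gRPC code: one absolute deadline is shared by
// every hop, and each hop asks how much time is left before starting work.
final class DeadlineBudget {
    private final long deadlineNanos; // absolute point on the caller-supplied clock

    private DeadlineBudget(long deadlineNanos) {
        this.deadlineNanos = deadlineNanos;
    }

    // Fixes the deadline at "now + duration".
    static DeadlineBudget after(long duration, TimeUnit unit, long nowNanos) {
        return new DeadlineBudget(nowNanos + unit.toNanos(duration));
    }

    // Time left at the given instant; zero or negative means the deadline has passed.
    long remainingMillis(long nowNanos) {
        return TimeUnit.NANOSECONDS.toMillis(deadlineNanos - nowNanos);
    }

    boolean isExpired(long nowNanos) {
        return deadlineNanos - nowNanos <= 0;
    }
}

final class DeadlineBudgetDemo {
    public static void main(String[] args) {
        long start = 0L; // fake clock, in nanoseconds
        DeadlineBudget budget = DeadlineBudget.after(200, TimeUnit.MILLISECONDS, start);

        // Service A took 120 ms: B only gets what is left of the original 200 ms.
        long afterA = start + TimeUnit.MILLISECONDS.toNanos(120);
        System.out.println("budget left for B (ms): " + budget.remainingMillis(afterA));

        // Service B took another 120 ms: the deadline has passed, so C should not start.
        long afterB = afterA + TimeUnit.MILLISECONDS.toNanos(120);
        System.out.println("expired before C: " + budget.isExpired(afterB));
    }
}
```

With a plain timeout only the client knows that the 200 ms are gone; with a shared deadline every service can check the same budget and stop early.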
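Design approach
- When the client issues a request, the deadline carried in the request is bound to the context and a scheduled task is started, set to fire at the current time plus the deadline duration.
- The services run in dependency order. If the time allowed by the deadline is exceeded, the cancel task runs, which includes interrupting the current task and clearing all listeners.
- An exceed error is returned to signal the timeout, and the client stops waiting.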
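The steps above can be sketched with a ScheduledExecutorService. This is an illustration under stated assumptions, not the gRPC implementation: DeadlineCall, start, addListener and DeadlineCallDemo are hypothetical names, and interrupting the worker thread is left out to keep the sketch short.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: a call that cancels itself when its deadline fires.
final class DeadlineCall {
    private final List<Runnable> listeners = new CopyOnWriteArrayList<>();
    private final AtomicReference<Throwable> cause = new AtomicReference<>();
    private volatile ScheduledFuture<?> timer;

    private DeadlineCall() {}

    // Step 1: schedule the cancel task to fire at "now + deadline".
    static DeadlineCall start(long duration, TimeUnit unit, ScheduledExecutorService scheduler) {
        DeadlineCall call = new DeadlineCall();
        call.timer = scheduler.schedule(
            () -> { call.cancel(new TimeoutException("deadline exceeded")); }, duration, unit);
        return call;
    }

    void addListener(Runnable listener) {
        listeners.add(listener);
    }

    // Step 2: cancel at most once, notify the listeners, then clear them.
    boolean cancel(Throwable reason) {
        if (!cause.compareAndSet(null, reason)) {
            return false; // already cancelled
        }
        ScheduledFuture<?> t = timer;
        if (t != null) {
            t.cancel(false); // no effect if the timer is the caller
        }
        for (Runnable listener : listeners) {
            listener.run();
        }
        listeners.clear();
        return true;
    }

    // Step 3: the cause tells the waiting side why the call ended.
    Throwable cancellationCause() {
        return cause.get();
    }
}

final class DeadlineCallDemo {
    // Waits until the deadline listener has run and returns the recorded cause.
    static Throwable awaitCancellation(long deadlineMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            CountDownLatch notified = new CountDownLatch(1);
            DeadlineCall call = DeadlineCall.start(deadlineMillis, TimeUnit.MILLISECONDS, scheduler);
            call.addListener(notified::countDown);
            notified.await(5, TimeUnit.SECONDS);
            return call.cancellationCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitCancellation(50));
    }
}
```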
Setting the deadline on the client
stub.withDeadlineAfter(long duration, TimeUnit unit)
public final S withDeadlineAfter(long duration, TimeUnit unit) {
return build(channel, callOptions.withDeadlineAfter(duration, unit));
}
This method is defined in
public abstract class AbstractStub<S extends AbstractStub<S>>
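build is an abstract method; its implementation is in the code produced by the gRPC code generator.
The implementation from the example is shown below: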
@Override
protected GreeterStub build(io.grpc.Channel channel,
io.grpc.CallOptions callOptions) {
return new GreeterStub(channel, callOptions);
}
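It simply creates a new stub object from the current channel and callOptions. (Open questions: setting the deadline repeatedly creates a large number of stubs. Does that hurt performance? What was the original intent of this design, and why can the stub not be reused?)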
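One plausible reading of this design (an assumption; the source does not state the intent) is the usual immutable "with" pattern: the original stub is never modified, so it can be shared safely, and each call derives its own configured copy. Note also that in grpc-java withDeadlineAfter turns the duration into an absolute deadline at the moment it is called, so a stub that carries a deadline is normally derived per call and not reused. The sketch below uses a hypothetical SketchStub class, not the generated gRPC stub.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a stub plus its call options; not the gRPC classes.
final class SketchStub {
    private final String channel;     // stands in for io.grpc.Channel
    private final Long deadlineNanos; // absolute deadline, null means "no deadline"

    SketchStub(String channel) {
        this(channel, null);
    }

    private SketchStub(String channel, Long deadlineNanos) {
        this.channel = channel;
        this.deadlineNanos = deadlineNanos;
    }

    // Returns a new stub with an absolute deadline; the receiver is left untouched.
    SketchStub withDeadlineAfter(long duration, TimeUnit unit) {
        return new SketchStub(channel, System.nanoTime() + unit.toNanos(duration));
    }

    boolean hasDeadline() {
        return deadlineNanos != null;
    }
}

final class SketchStubDemo {
    public static void main(String[] args) {
        SketchStub base = new SketchStub("channel-1");
        SketchStub perCall = base.withDeadlineAfter(200, TimeUnit.MILLISECONDS);
        System.out.println("base has deadline: " + base.hasDeadline());
        System.out.println("per-call copy has deadline: " + perCall.hasDeadline());
    }
}
```

The per-call object here holds only two references, which suggests why creating one per call is usually cheap; whether that holds for a real workload is something to measure.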
Context
A Context is a linked-list-like structure: each Context records its parent Context, and the parent of the Root Context is null.
The Context of each thread is stored in a ThreadLocal.
final class ThreadLocalContextStorage extends Context.Storage {
/**
* Currently bound context.
*/
private static final ThreadLocal<Context> localContext = new ThreadLocal<Context>();
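The structure just described can be pictured with a small model. The sketch below is not io.grpc.Context; MiniContext and its members are hypothetical, and only the parent link, the per-thread slot and an attach/detach pair are modelled.

```java
// Hypothetical miniature of the described structure; not io.grpc.Context.
final class MiniContext {
    static final MiniContext ROOT = new MiniContext(null, "root");

    // Context currently bound to each thread; an empty slot means ROOT.
    private static final ThreadLocal<MiniContext> CURRENT = new ThreadLocal<>();

    final MiniContext parent; // null only for ROOT
    final String name;

    private MiniContext(MiniContext parent, String name) {
        this.parent = parent;
        this.name = name;
    }

    // Creates a child that remembers this context as its parent.
    MiniContext child(String childName) {
        return new MiniContext(this, childName);
    }

    static MiniContext current() {
        MiniContext bound = CURRENT.get();
        return bound == null ? ROOT : bound;
    }

    // Binds this context to the calling thread and returns the previous one.
    MiniContext attach() {
        MiniContext previous = current();
        CURRENT.set(this);
        return previous;
    }

    // Restores the context that was current before attach().
    void detach(MiniContext toAttach) {
        CURRENT.set(toAttach);
    }

    // Number of parent links between this context and ROOT.
    int depth() {
        int links = 0;
        for (MiniContext c = parent; c != null; c = c.parent) {
            links++;
        }
        return links;
    }
}

final class MiniContextDemo {
    public static void main(String[] args) {
        MiniContext request = MiniContext.ROOT.child("request").child("rpc");
        MiniContext previous = request.attach();
        try {
            System.out.println(MiniContext.current().name + " depth=" + request.depth());
        } finally {
            request.detach(previous); // always restore, even if the work throws
        }
    }
}
```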
Binding the context on the server
When the server starts, it first creates a ServerImpl:
public final class ServerImpl extends io.grpc.Server implements WithLogId
This class creates a Context for each incoming stream, in streamCreated:
@Override
public void streamCreated(
final ServerStream stream, final String methodName, final Metadata headers) {
final StatsTraceContext statsTraceCtx = Preconditions.checkNotNull(
stream.statsTraceContext(), "statsTraceCtx not present from stream");
final Context.CancellableContext context = createContext(stream, headers, statsTraceCtx);
.......}
The body of the createContext method is shown below:
private Context.CancellableContext createContext(
final ServerStream stream, Metadata headers, StatsTraceContext statsTraceCtx) {
Long timeoutNanos = headers.get(TIMEOUT_KEY);
Context baseContext = statsTraceCtx.serverFilterContext(rootContext);
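// If the client did not specify a timeout, create a context without a deadline. This is
// presumably for backward compatibility: Deadline was not supported originally, so Context
// was extended to cover it.
if (timeoutNanos == null) {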
return baseContext.withCancellation();
}
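// Again a watcher is started: once the deadline has passed, the context is cancelled
// automatically, which satisfies the cancellation-propagation requirement.
Context.CancellableContext context =
baseContext.withDeadlineAfter(timeoutNanos, NANOSECONDS, timeoutService);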
context.addListener(new Context.CancellationListener() {
@Override
public void cancelled(Context context) {
Status status = statusFromCancelled(context);
if (DEADLINE_EXCEEDED.getCode().equals(status.getCode())) {
// This should rarely get run, since the client will likely cancel the stream before
// the timeout is reached.
stream.cancel(status);
}
}
}, directExecutor());
return context;
}
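For orientation on where the value behind TIMEOUT_KEY comes from: in the gRPC over HTTP/2 protocol the client sends the remaining time in a grpc-timeout header, written as up to 8 ASCII digits followed by a one-letter unit (H, M, S, m, u, n). The sketch below converts such a value to nanoseconds. GrpcTimeoutSketch is a hypothetical helper written for illustration; it is not the marshaller grpc-java uses.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not grpc-java code: converts a grpc-timeout header value
// such as "200m" into nanoseconds.
final class GrpcTimeoutSketch {
    static long toNanos(String headerValue) {
        if (headerValue == null || headerValue.length() < 2 || headerValue.length() > 9) {
            throw new IllegalArgumentException("expected 1 to 8 digits plus a unit");
        }
        String digits = headerValue.substring(0, headerValue.length() - 1);
        for (int i = 0; i < digits.length(); i++) {
            char c = digits.charAt(i);
            if (c < '0' || c > '9') {
                throw new IllegalArgumentException("not a digit: " + c);
            }
        }
        long amount = Long.parseLong(digits);
        char unit = headerValue.charAt(headerValue.length() - 1);
        switch (unit) {
            case 'n': return amount;
            case 'u': return TimeUnit.MICROSECONDS.toNanos(amount);
            case 'm': return TimeUnit.MILLISECONDS.toNanos(amount);
            case 'S': return TimeUnit.SECONDS.toNanos(amount);
            case 'M': return TimeUnit.MINUTES.toNanos(amount);
            case 'H': return TimeUnit.HOURS.toNanos(amount);
            default: throw new IllegalArgumentException("unknown unit: " + unit);
        }
    }

    public static void main(String[] args) {
        System.out.println(toNanos("200m")); // 200 milliseconds expressed in nanoseconds
    }
}
```

Because the header carries a remaining duration and not a wall-clock time, the server rebuilds its own deadline from it on arrival, which is what the withDeadlineAfter call in createContext does.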
The server can also set up its own Context as needed; see public Context attach() and public void detach(Context toAttach) in the API.
As for how the Deadline is set, an official comment explains it very clearly:
/**
* If the parent deadline is before the given deadline there is no need to install the value
* or listen for its expiration as the parent context will already be listening for it.
*/
In other words, a new cancellation task is scheduled only when the newly set deadline is earlier than the deadline of the parent context.
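The rule can be written as a small decision function. The sketch below is an assumption-laden model and not the io.grpc.Context source: DeadlineNode and its ownsTimer flag are hypothetical, and deadlines are plain long values on an arbitrary clock.

```java
// Hypothetical model of the "earlier deadline wins" rule; not the gRPC source.
final class DeadlineNode {
    final DeadlineNode parent;
    final Long deadlineNanos; // effective deadline, null if none
    final boolean ownsTimer;  // true if this node had to schedule its own expiry task

    private DeadlineNode(DeadlineNode parent, Long deadlineNanos, boolean ownsTimer) {
        this.parent = parent;
        this.deadlineNanos = deadlineNanos;
        this.ownsTimer = ownsTimer;
    }

    static DeadlineNode root() {
        return new DeadlineNode(null, null, false);
    }

    DeadlineNode withDeadline(long requestedNanos) {
        if (deadlineNanos != null && deadlineNanos <= requestedNanos) {
            // The parent expires first (or at the same time): inherit its deadline
            // and rely on the timer that is already watching it.
            return new DeadlineNode(this, deadlineNanos, false);
        }
        // The requested deadline is earlier, so this node needs its own timer.
        return new DeadlineNode(this, requestedNanos, true);
    }
}

final class DeadlineNodeDemo {
    public static void main(String[] args) {
        DeadlineNode parent = DeadlineNode.root().withDeadline(100L);
        DeadlineNode later = parent.withDeadline(500L);  // parent is earlier
        DeadlineNode earlier = parent.withDeadline(50L); // child is earlier
        System.out.println(later.deadlineNanos + " ownsTimer=" + later.ownsTimer);
        System.out.println(earlier.deadlineNanos + " ownsTimer=" + earlier.ownsTimer);
    }
}
```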
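Client-side listener code. This part lives in clientStreamImpl; the call chain jumps around too much to follow here, so only the final cancellation code is pasted.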
/**
* Stores listener and executor pair. The client starts a listener task that is used to
* cancel this call.
*/
private class ExecutableListener implements Runnable {
private final Executor executor;
private final CancellationListener listener;
private ExecutableListener(Executor executor, CancellationListener listener) {
this.executor = executor;
this.listener = listener;
}
private void deliver() {
try {
executor.execute(this);
} catch (Throwable t) {
log.log(Level.INFO, "Exception notifying context listener", t);
}
}
@Override
public void run() {
listener.cancelled(Context.this);
}
}
Calling listener.cancelled(Context.this); inspects the current state and produces the corresponding status:
Throwable cancellationCause = context.cancellationCause();
if (cancellationCause == null) {
return Status.CANCELLED;
}
if (cancellationCause instanceof TimeoutException) {
return Status.DEADLINE_EXCEEDED
.withDescription(cancellationCause.getMessage())
.withCause(cancellationCause);
}
// The cancellation status (Status.CANCELLED or DEADLINE_EXCEEDED) is passed in as the
// reason and reported back to the caller.
@Override
protected void sendCancel(Status reason) {
synchronized (lock) {
if (cancelSent) {
return;
}
cancelSent = true;
if (pendingData != null) {
// stream is pending.
transport.removePendingStream(this); // release the stream that was still pending
// release holding data, so they can be GCed or returned to pool earlier.
requestHeaders = null;
for (PendingData data : pendingData) {
data.buffer.clear();
}
pendingData = null;
transportReportStatus(reason, true, new Metadata());
} else {
// If pendingData is null, start must have already been called, which means synStream has
// been called as well.
transport.finishStream(id(), reason, ErrorCode.CANCEL); // end the transfer
}
}
}
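The shape of sendCancel is worth isolating: a flag checked under a lock makes the cancel idempotent, and the branch on pendingData separates "never started" from "already on the wire". The sketch below reproduces only that shape. CancelOnceStream, start and events are hypothetical, and strings stand in for Status and the transport calls.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical reduction of the sendCancel pattern; not the gRPC transport code.
final class CancelOnceStream {
    private final Object lock = new Object();
    private final List<String> events = new ArrayList<>();
    private boolean cancelSent;
    private List<byte[]> pendingData = new ArrayList<>(); // non-null until start()

    // Marks the stream as started, after which nothing is buffered any more.
    void start() {
        synchronized (lock) {
            pendingData = null;
        }
    }

    void sendCancel(String reason) {
        synchronized (lock) {
            if (cancelSent) {
                return; // a second cancel is ignored
            }
            cancelSent = true;
            if (pendingData != null) {
                // Never started: drop the buffered data and report locally.
                pendingData.clear();
                pendingData = null;
                events.add("reported:" + reason);
            } else {
                // Already started: the remote side has to be told to stop.
                events.add("finished:" + reason);
            }
        }
    }

    List<String> events() {
        synchronized (lock) {
            return new ArrayList<>(events);
        }
    }
}

final class CancelOnceStreamDemo {
    public static void main(String[] args) {
        CancelOnceStream stream = new CancelOnceStream();
        stream.sendCancel("DEADLINE_EXCEEDED");
        stream.sendCancel("CANCELLED"); // ignored, cancel was already sent
        System.out.println(stream.events());
    }
}
```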