How does ES use it?
After a response has been sent, ES needs to trigger some of its own internal logic.
The entry point is sendResponse in Netty4HttpChannel.
@Override
public void sendResponse(HttpResponse response, ActionListener<Void> listener) {
    // Write the response to the Netty channel; the promise wraps the ES listener.
    channel.writeAndFlush(response, Netty4TcpChannel.addPromise(listener, channel));
}
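The channel then runs writeAndFlush, whose signature is:
ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);
ES passes its own ActionListener and the channel to addPromise. The channel is needed because the promise is created from the channel.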
/**
 * Creates a {@link ChannelPromise} for the given {@link Channel} and adds a listener that invokes the given {@link ActionListener}
 * on its completion.
 *
 * @param listener listener to invoke
 * @param channel channel
 * @return write promise
 */
public static ChannelPromise addPromise(ActionListener<Void> listener, Channel channel) {
    // Create a new promise from the channel.
    ChannelPromise writePromise = channel.newPromise();
    // Register a listener that forwards the outcome to the ES ActionListener.
    writePromise.addListener(f -> {
        if (f.isSuccess()) {
            // Success
            listener.onResponse(null);
        } else {
            // Failure
            final Throwable cause = f.cause();
            ExceptionsHelper.maybeDieOnAnotherThread(cause);
            if (cause instanceof Error) {
                listener.onFailure(new Exception(cause));
            } else {
                listener.onFailure((Exception) cause);
            }
        }
    });
    // Return the promise so it can be handed to Netty.
    return writePromise;
}
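The bridging pattern in addPromise can be tried without Netty or ES on the classpath. The following is only a minimal sketch using the JDK's CompletableFuture in place of ChannelPromise; SimpleListener, ListenerBridge and demo are hypothetical names invented for illustration and are not part of either library.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for ES's ActionListener; not the real interface. */
interface SimpleListener<T> {
    void onResponse(T value);
    void onFailure(Exception e);
}

final class ListenerBridge {
    /** Creates a future and wires its completion to the listener, in the same shape as addPromise. */
    static CompletableFuture<Void> addPromise(SimpleListener<Void> listener) {
        CompletableFuture<Void> promise = new CompletableFuture<>();
        promise.whenComplete((ignored, cause) -> {
            if (cause == null) {
                listener.onResponse(null);
            } else if (cause instanceof Exception) {
                listener.onFailure((Exception) cause);
            } else {
                // Wrap Errors so the listener only ever receives an Exception.
                listener.onFailure(new Exception(cause));
            }
        });
        return promise;
    }

    /** Completes a bridged future and reports which listener method fired. */
    static String demo(boolean succeed) {
        StringBuilder seen = new StringBuilder();
        CompletableFuture<Void> promise = addPromise(new SimpleListener<Void>() {
            @Override public void onResponse(Void v) { seen.append("onResponse"); }
            @Override public void onFailure(Exception e) { seen.append("onFailure: ").append(e.getMessage()); }
        });
        // Whoever completes the future never sees the listener; it only completes the future.
        if (succeed) {
            promise.complete(null);
        } else {
            promise.completeExceptionally(new IllegalStateException("boom"));
        }
        return seen.toString();
    }
}
```

The point of the design is decoupling: the I/O layer only knows about the promise, while the application layer only knows about its listener.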
Once Netty has finished the write, it calls setSuccess on the promise, which calls back into ES's ActionListener.
@Override
public Promise<V> setSuccess(V result) {
    if (setSuccess0(result)) {
        notifyListeners();
        return this;
    }
    throw new IllegalStateException("complete already: " + this);
}
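Eventually notifyListener0 is executed. The l it receives is the listener that ES registered in addPromise:
@SuppressWarnings({ "unchecked", "rawtypes" })
static void notifyListener0(Future future, GenericFutureListener l) {
    try {
        l.operationComplete(future);
    } catch (Throwable t) {
        if (logger.isWarnEnabled()) {
            logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
        }
    }
}
operationComplete is the callback that ES wrapped up; it is declared by Netty's GenericFutureListener interface: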
public interface GenericFutureListener<F extends Future<?>> extends EventListener {
    /**
     * Invoked when the operation associated with the {@link Future} has been completed.
     *
     * @param future the source {@link Future} which called this callback
     */
    void operationComplete(F future) throws Exception;
}
That was a simple walk through the call chain. Next, let's analyze the design of Netty's ChannelPromise.
ChannelPromise
extends ChannelFuture, Promise<Void>
Promise
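Promise extends Netty's own Future, which in turn extends the JDK Future. The JDK Future is too bare-bones, so Netty adds methods for registering listeners and for waiting.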
extends Future<V>
ChannelFuture
The result of an asynchronous Channel I/O operation.
All I/O operations in Netty are asynchronous. It means any I/O calls will return immediately with no guarantee that the requested I/O operation has been completed at the end of the call. Instead, you will be returned with a ChannelFuture instance which gives you the information about the result or status of the I/O operation.
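In other words: every I/O call returns immediately, with no guarantee that the requested I/O has finished by the time the call returns. Instead, the caller receives a ChannelFuture that reports the result or status of the operation.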
A ChannelFuture is either uncompleted or completed. When an I/O operation begins, a new future object is created. The new future is uncompleted initially - it is neither succeeded, failed, nor cancelled because the I/O operation is not finished yet. If the I/O operation is finished either successfully, with failure, or by cancellation, the future is marked as completed with more specific information, such as the cause of the failure. Please note that even failure and cancellation belong to the completed state.
In short: a ChannelFuture starts out uncompleted, and once the I/O operation succeeds, fails, or is cancelled it is marked completed and carries more specific information such as the failure cause. Failure and cancellation both count as completed.
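The state model described above can be illustrated with the JDK's CompletableFuture, which follows the same "uncompleted, or completed in one of three ways" idea. This is only an analogy sketch, not Netty code; FutureStates and describe are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

final class FutureStates {
    /** Names the state of a future using the vocabulary of the ChannelFuture Javadoc. */
    static String describe(CompletableFuture<?> future) {
        if (!future.isDone()) {
            return "uncompleted";
        }
        // Check cancellation first: a cancelled CompletableFuture also reports
        // isCompletedExceptionally() == true.
        if (future.isCancelled()) {
            return "completed by cancellation";
        }
        if (future.isCompletedExceptionally()) {
            return "completed with failure";
        }
        return "completed successfully";
    }
}
```

Calling complete, completeExceptionally, or cancel on a fresh future moves it out of the "uncompleted" state, and in all three cases isDone() becomes true.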
The implementation: DefaultPromise
First, look at the abstract parent class AbstractFuture. It has only two methods: get() and get() with a timeout.
@Override
public V get() throws InterruptedException, ExecutionException {
    // await(), cause() and getNow() are all implemented by the subclass.
    await();
    Throwable cause = cause();
    if (cause == null) {
        return getNow();
    }
    throw new ExecutionException(cause);
}
@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    // await with a timeout; returns false if the timeout elapsed first
    if (await(timeout, unit)) {
        Throwable cause = cause();
        if (cause == null) {
            return getNow();
        }
        throw new ExecutionException(cause);
    }
    throw new TimeoutException();
}
DefaultPromise
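Core fields
// Maximum nesting depth for listener notifications made inline on the event loop thread
private static final int MAX_LISTENER_STACK_DEPTH = 8;
// Sentinel stored in result when the promise succeeds with a null value
private static final Signal SUCCESS = Signal.valueOf(DefaultPromise.class.getName() + ".SUCCESS");
// Sentinel stored in result once the promise has been marked uncancellable
private static final Signal UNCANCELLABLE = Signal.valueOf(DefaultPromise.class.getName() + ".UNCANCELLABLE");
// Shared holder stored in result when the promise is cancelled
private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(new CancellationException());
// Executor on which listeners are notified
private final EventExecutor executor;
// The result: a value, one of the sentinels above, or a CauseHolder on failure
private volatile Object result;
// The listeners: either a single GenericFutureListener or a DefaultFutureListeners
private Object listeners;
// Listeners that were added after the promise had already completed
private LateListeners lateListeners;
// Number of threads currently blocked in await()
private short waiters;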
The cause() implementation
@Override
public Throwable cause() {
    Object result = this.result;
    if (result instanceof CauseHolder) {
        return ((CauseHolder) result).cause;
    }
    return null;
}
The await() implementation
@Override
public Promise<V> await() throws InterruptedException {
    if (isDone()) {
        return this;
    }
    if (Thread.interrupted()) {
        throw new InterruptedException(toString());
    }
    synchronized (this) {
        // Loop until the promise completes.
        while (!isDone()) {
            // Fail fast if called from the event loop thread, which would deadlock.
            checkDeadLock();
            incWaiters();
            try {
                // Object.wait(): release the monitor and block until notified.
                wait();
            } finally {
                decWaiters();
            }
        }
    }
    return this;
}
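The wait/notifyAll handshake behind await() can be reproduced with plain JDK monitors. The sketch below is a simplified illustration, not Netty's implementation: MiniWaitable and demo are hypothetical names, there is no deadlock check, and null is reserved to mean "not done".

```java
import java.util.Objects;

final class MiniWaitable {
    private volatile Object result; // null means "not done yet"
    private short waiters;          // threads blocked in await(); guarded by the monitor

    boolean isDone() {
        return result != null;
    }

    /** Completes once; returns false if the object was already completed. */
    boolean complete(Object value) {
        Objects.requireNonNull(value, "this sketch reserves null for 'not done'");
        synchronized (this) {
            if (isDone()) {
                return false;
            }
            result = value;
            if (waiters > 0) {
                notifyAll(); // wake every thread parked in await()
            }
        }
        return true;
    }

    Object await() throws InterruptedException {
        if (isDone()) {
            return result;
        }
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        synchronized (this) {
            while (!isDone()) { // the loop also guards against spurious wakeups
                waiters++;
                try {
                    wait(); // releases the monitor while blocked
                } finally {
                    waiters--;
                }
            }
        }
        return result;
    }

    /** Completes from a second thread while the calling thread blocks in await(). */
    static Object demo() {
        MiniWaitable w = new MiniWaitable();
        Thread completer = new Thread(() -> w.complete("done"));
        completer.start();
        try {
            Object r = w.await();
            completer.join();
            return r;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

Counting waiters lets the completing thread skip notifyAll() when nobody is blocked, which is the common case when callers rely on listeners instead of blocking.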
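The getNow() implementation
@Override
@SuppressWarnings("unchecked")
public V getNow() {
    // Read the result.
    Object result = this.result;
    // A failure (CauseHolder) or the SUCCESS sentinel means there is no value: return null.
    if (result instanceof CauseHolder || result == SUCCESS) {
        return null;
    }
    // Otherwise return the result itself.
    return (V) result;
}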
Why does result == SUCCESS return null?
Because setSuccess0 stores the SUCCESS sentinel when the result passed in is null:
if (result == null) {
    this.result = SUCCESS;
} else {
    this.result = result;
}
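The sentinel trick is easier to see in isolation. The sketch below assumes, as the code above suggests, that a null result field is what marks a promise as "not completed yet", so a successful completion with a null value needs a distinct marker. NullableResult is a hypothetical class written for illustration only.

```java
final class NullableResult<V> {
    // Sentinel meaning "completed successfully, and the value is null".
    private static final Object SUCCESS = new Object();

    private volatile Object result; // null means "not completed yet"

    void setSuccess(V value) {
        result = (value == null) ? SUCCESS : value;
    }

    boolean isDone() {
        return result != null;
    }

    @SuppressWarnings("unchecked")
    V getNow() {
        Object r = result;
        // Translate the sentinel back into the null the caller originally supplied.
        return (r == SUCCESS) ? null : (V) r;
    }
}
```

This matters for a ChannelPromise in particular, since it is a Promise<Void> and therefore always succeeds with null.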
The core of the core: the setSuccess method
In essence, when the underlying operation finishes and calls setSuccess, the registered callbacks are invoked.
@Override
public Promise<V> setSuccess(V result) {
    if (setSuccess0(result)) {
        notifyListeners();
        return this;
    }
    throw new IllegalStateException("complete already: " + this);
}
The very simple setSuccess0
private boolean setSuccess0(V result) {
    // Already completed: nothing to set, so the caller must not notify again.
    if (isDone()) {
        return false;
    }
    synchronized (this) {
        // Allow only once.
        if (isDone()) {
            return false;
        }
        if (result == null) {
            this.result = SUCCESS;
        } else {
            this.result = result;
        }
        // Wake up the threads blocked in await().
        if (hasWaiters()) {
            notifyAll();
        }
    }
    return true;
}
Next comes notifying the listeners
private void notifyListeners() {
    // This method doesn't need synchronization because:
    // 1) This method is always called after synchronized (this) block.
    //    Hence any listener list modification happens-before this method.
    // 2) This method is called only when 'done' is true. Once 'done'
    //    becomes true, the listener list is never modified - see add/removeListener()
    Object listeners = this.listeners;
    if (listeners == null) {
        return;
    }
    // The executor on which listeners are notified.
    EventExecutor executor = executor();
    // Called from the executor's own event loop thread: notify inline.
    if (executor.inEventLoop()) {
        // Track how deeply listener notifications are nested on this thread.
        final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
        final int stackDepth = threadLocals.futureListenerStackDepth();
        if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
            threadLocals.setFutureListenerStackDepth(stackDepth + 1);
            try {
                // listeners holds either several listeners or a single one.
                if (listeners instanceof DefaultFutureListeners) {
                    notifyListeners0(this, (DefaultFutureListeners) listeners);
                } else {
                    final GenericFutureListener<? extends Future<V>> l =
                            (GenericFutureListener<? extends Future<V>>) listeners;
                    notifyListener0(this, l);
                }
            } finally {
                this.listeners = null;
                threadLocals.setFutureListenerStackDepth(stackDepth);
            }
            // Done: the listeners were notified inline.
            return;
        }
    }
    // Called from an outside thread (or the nesting is too deep): hand off to the executor.
    if (listeners instanceof DefaultFutureListeners) {
        final DefaultFutureListeners dfl = (DefaultFutureListeners) listeners;
        // Notify all listeners from a task submitted to the executor.
        execute(executor, new Runnable() {
            @Override
            public void run() {
                notifyListeners0(DefaultPromise.this, dfl);
                DefaultPromise.this.listeners = null;
            }
        });
    } else {
        final GenericFutureListener<? extends Future<V>> l =
                (GenericFutureListener<? extends Future<V>>) listeners;
        // Notify the single listener from a task submitted to the executor.
        execute(executor, new Runnable() {
            @Override
            public void run() {
                notifyListener0(DefaultPromise.this, l);
                DefaultPromise.this.listeners = null;
            }
        });
    }
}
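The stack depth guard deserves a closer look: a listener may complete another promise, whose listener completes yet another, and so on, and notifying all of them inline would keep growing the call stack. The sketch below shows the idea with a thread-local counter and a plain queue standing in for the executor. DepthGuard, fire, drain and demo are hypothetical names, and the queue is an assumption made to keep the example JDK-only.

```java
import java.util.ArrayDeque;
import java.util.Queue;

final class DepthGuard {
    static final int MAX_DEPTH = 8; // same value as MAX_LISTENER_STACK_DEPTH above
    private static final ThreadLocal<Integer> DEPTH = ThreadLocal.withInitial(() -> 0);

    // Hypothetical stand-in for the executor's task queue.
    private final Queue<Runnable> deferred = new ArrayDeque<>();
    int maxObservedDepth = 0;

    /** Runs the callback inline while nesting is shallow, otherwise queues it. */
    void fire(Runnable callback) {
        int depth = DEPTH.get();
        if (depth < MAX_DEPTH) {
            DEPTH.set(depth + 1);
            maxObservedDepth = Math.max(maxObservedDepth, depth + 1);
            try {
                callback.run();
            } finally {
                DEPTH.set(depth); // restore the depth even if the callback throws
            }
            return;
        }
        deferred.add(callback);
    }

    /** Runs queued callbacks from a shallow stack, as an event loop would later on. */
    void drain() {
        Runnable next;
        while ((next = deferred.poll()) != null) {
            fire(next);
        }
    }

    /** Chains callbacks that each trigger the next; returns the deepest inline nesting seen. */
    static int demo(int links) {
        DepthGuard guard = new DepthGuard();
        int[] ran = {0};
        Runnable[] chain = new Runnable[1];
        chain[0] = () -> {
            if (++ran[0] < links) {
                guard.fire(chain[0]);
            }
        };
        guard.fire(chain[0]);
        guard.drain();
        if (ran[0] != links) {
            throw new IllegalStateException("lost callbacks");
        }
        return guard.maxObservedDepth;
    }
}
```

With a chain of 100 callbacks, every callback still runs, but the inline nesting never exceeds MAX_DEPTH; the rest are resumed from the queue.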
notifyListeners0
The two variants differ only in whether they notify a single listener or several.
static void notifyListener0(Future future, GenericFutureListener l) {
    try {
        l.operationComplete(future);
    } catch (Throwable t) {
        if (logger.isWarnEnabled()) {
            logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
        }
    }
}
private static void notifyListeners0(Future<?> future, DefaultFutureListeners listeners) {
    final GenericFutureListener<?>[] a = listeners.listeners();
    final int size = listeners.size();
    for (int i = 0; i < size; i ++) {
        notifyListener0(future, a[i]);
    }
}
Summary
Overall, the logic of this class is quite simple, but its design is really well done.