Dubbo异步调用
Dubbo原生支持异步调用,但其中依然有坑。
异步调用依赖传递性
- 问题表现:如果consumer-A异步调用provider-B,而provider-B本身又调用了provider-C。当provider-B调用provider-C时,会变成异步。
- 问题原因:是否异步调用取决于RpcContext中async的值,其次才是服务本身的配置。当A调用B时,会把async=true传给B的RpcContext;B调用C时,虽然服务本身async=false,但RpcContext中async=true,自然也就成了异步调用
- 解决方式:参考:Dubbo异步方法调用里有个坑
异步回调
- 问题表现:Dubbo原生的异步回调,执行feature.get()当远程调用有值就返回,而不是当远程调用有值并且回调执行完毕再返回。
- 问题原因:可能设计的时候就是这么设计的,不认为需要等调执行完毕再返回。但对我们的代码书写造成了很大的不便,需要自己写代码去判断回调是否执行完毕。
- 解决方式:写几个工具类解决此问题。另外Dubbo原生的调用方式还是偏复杂(具体如何使用Dubbo的原生异步回调,请参阅这里:在Dubbo中实现更简单易用的异步),可以对其进行简化。
简化后的调用方式:
//DubboUtils.async()为调用异步工具类
//demoService.echo()为调用Dubbo服务的echo()方法
//DubboCallback.apply()为远程调用返回后的回调
DubboUtils.async(demoService.echo(String echo), new DubboCallback<String>() {
@Override
public void apply(String response) {
//此处写具体的回调内容
}
}).done();
工具类如下,或者你可以直接引用我的项目,地址:GitHub:xydonne/dubbo-async-utils:
/**
* @author DonneyYoung
*/
public class DubboFuture<T> implements Future<T> {
private Future<T> future;
private DubboCallback<T> callback;
public DubboFuture(Future<T> future) {
this.future = future;
}
public DubboFuture(Future<T> future, DubboCallback<T> callback) {
this.future = future;
this.callback = callback;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return future.cancel(mayInterruptIfRunning);
}
@Override
public boolean isCancelled() {
return future.isCancelled();
}
@Override
public boolean isDone() {
return future.isDone();
}
@Override
public T get() throws InterruptedException, ExecutionException {
return future.get();
}
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return future.get(timeout, unit);
}
public T done() {
if (null != callback)
callback.waitUntilDone();
try {
return get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e.getMessage(), e);
}
}
public T done(long timeout, TimeUnit unit) {
if (null != callback)
callback.waitUntilDone(unit.toMillis(timeout));
try {
return get(timeout, unit);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new RuntimeException(e.getMessage(), e);
}
}
}
/**
* @author DonneyYoung
*/
public abstract class DubboCallback<T> implements ResponseCallback {
protected volatile boolean done = false;
public abstract void apply(T response);
@Override
@SuppressWarnings("unchecked")
public void done(Object result) {
try {
this.apply((T) ((Result) result).getValue());
} finally {
synchronized (this) {
done = true;
this.notifyAll();
}
}
}
@Override
public void caught(Throwable throwable) {
synchronized (this) {
done = true;
this.notifyAll();
}
}
public void waitUntilDone() {
waitUntilDone(0);
}
public void waitUntilDone(long timeout) {
synchronized (this) {
while (!done) {
try {
this.wait(timeout);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
}
/**
* Dubbo服务工具类
*
* @author DonneyYoung
*/
public class DubboUtils {
@SuppressWarnings("unchecked")
public static <E> DubboFuture<E> async(E response) {
return new DubboFuture<>((Future<E>) RpcContext.getContext().getFuture());
}
@SuppressWarnings("unchecked")
public static <E> DubboFuture<E> async(E response, DubboCallback<E> responseCallback) {
((FutureAdapter) RpcContext.getContext().getFuture()).getFuture().setCallback(responseCallback);
return new DubboFuture<>((Future<E>) RpcContext.getContext().getFuture(), responseCallback);
}
}
转载注明出处,我就不和你计较。
by Donney Young
http://www.jianshu.com/p/1ca2026500f8