dubbo-重要组件
/**
* Invocation. (API, Prototype, NonThreadSafe)
*
* Encapsulates the information and arguments of a request, and carries the
* Invoker that the request is associated with.
*/
public interface Invocation {
// Name of the method being invoked.
String getMethodName();
// Parameter types of the method being invoked.
Class<?>[] getParameterTypes();
// Argument values passed to the method.
Object[] getArguments();
// All attachments carried with this invocation, as string key/value pairs.
Map<String, String> getAttachments();
// Looks up a single attachment by key.
String getAttachment(String key);
// Looks up a single attachment by key; presumably falls back to defaultValue
// when the key is absent — TODO confirm against the implementation.
String getAttachment(String key, String defaultValue);
// The invoker associated with this invocation.
Invoker<?> getInvoker();
}
/**
* RpcInvocation.
*
* Base implementation of Invocation. The class is Serializable; the invoker
* field is declared transient, so it is left out of default Java serialization.
*
* NOTE(review): the constructors below are listed as signatures only; their
* bodies are elided in this excerpt.
*/
public class RpcInvocation implements Invocation, Serializable {
private String methodName;
private Class<?>[] parameterTypes;
private Object[] arguments;
private Map<String, String> attachments;
// transient: not written out with the rest of the invocation.
private transient Invoker<?> invoker;
public RpcInvocation()
// Copy-style constructors taking an existing Invocation (bodies not shown).
public RpcInvocation(Invocation invocation, Invoker<?> invoker)
public RpcInvocation(Invocation invocation)
// Constructors taking a reflective Method plus arguments (bodies not shown).
public RpcInvocation(Method method, Object[] arguments)
public RpcInvocation(Method method, Object[] arguments, Map<String, String> attachment)
// Constructors taking the method name and parameter types explicitly (bodies not shown).
public RpcInvocation(String methodName, Class<?>[] parameterTypes, Object[] arguments)
public RpcInvocation(String methodName, Class<?>[] parameterTypes, Object[]
arguments,Map<String,String> attachments)
public RpcInvocation(String methodName, Class<?>[] parameterTypes, Object[] arguments, Map<String,
String> attachments, Invoker<?> invoker)
}
/**
* DecodeableRpcInvocation.
*
* Extends RpcInvocation with one extra capability: decoding the request
* entity. It keeps the channel, request, input stream and serialization type
* it was constructed with, plus a volatile hasDecoded flag.
*
* NOTE(review): the constructor and the decode(Channel, InputStream) method
* are listed as signatures only, and decode() has an empty body in this
* excerpt — the real decoding logic is not shown here.
*/
public class DecodeableRpcInvocation extends RpcInvocation implements Codec, Decodeable {
private static final Logger log = LoggerFactory.getLogger(DecodeableRpcInvocation.class);
private Channel channel;
private byte serializationType;
private InputStream inputStream;
private Request request;
// Presumably guards against decoding the same payload twice — TODO confirm in the full source.
private volatile boolean hasDecoded;
public DecodeableRpcInvocation(Channel channel, Request request, InputStream is, byte id)
// No-argument decode entry point (body elided in this excerpt).
public void decode() throws Exception { }
// Decode entry point taking the channel and the input stream explicitly (body not shown).
@Override
public Object decode(Channel channel, InputStream input) throws IOException
}
/**
*一个节点
*/
interface Node extends Node
URL getUrl();
boolean isAvailable();
void destroy();
/**
 * Invoker.
 *
 * <p>An invoker is what {@code Protocol.refer(Class<T> type, URL url)} produces.
 * It is a {@link Node}: implementations in this file override {@code getUrl()},
 * {@code isAvailable()} and {@code destroy()} in addition to the two methods
 * declared here.
 *
 * @param <T> the service interface type this invoker fronts
 */
interface Invoker<T> extends Node {

    /**
     * @return the service interface this invoker fronts
     */
    Class<T> getInterface();

    /**
     * Performs the invocation.
     *
     * @param invocation the request to execute
     * @return the result of the invocation
     * @throws RpcException if the invocation fails
     */
    Result invoke(Invocation invocation) throws RpcException;
}
/**
* Base implementation of Invoker.
*
* invoke() is the public entry point and hands the call over to the abstract
* doInvoke(), which concrete subclasses implement.
*
* NOTE(review): this is an abridged outline — the class header has no type
* parameter or braces, several members are signatures without bodies, and the
* 'destroyed' field read by isDestroyed() is not declared in this excerpt.
*/
abstract class AbstractInvoker implements Invoker<T>
// Constructors (bodies not shown).
public AbstractInvoker(Class<T> type, URL url)
public AbstractInvoker(Class<T> type, URL url, String[] keys)
public AbstractInvoker(Class<T> type, URL url, Map<String, String> attachment)
// Presumably builds the attachment map from the given URL parameter keys — body elided, TODO confirm.
private static Map<String, String> convertAttachment(URL url, String[] keys) { }
@Override
public Class<T> getInterface()
@Override
public URL getUrl() { }
@Override
public boolean isAvailable() { }
protected void setAvailable(boolean available)
@Override
public void destroy() { }
// Reads the 'destroyed' flag via get(); the field itself is not declared in this excerpt.
public boolean isDestroyed() {return destroyed.get(); }
// Renders as "<interface> -> <url>", with an empty string when the URL is null.
@Override
public String toString() { return getInterface() + " -> " + (getUrl() == null ? "" : getUrl().toString()); }
// Public entry point: delegates to doInvoke().
// NOTE(review): the parameter is named 'inv' but the body passes 'invocation', which is not
// declared here; doInvoke() also declares Throwable while invoke() only declares RpcException.
// Both look like artifacts of abridging the original method — confirm against the full source.
@Override
public Result invoke(Invocation inv) throws RpcException { return doInvoke(invocation); }
// Hook implemented by concrete invokers to perform the actual call.
protected abstract Result doInvoke(Invocation invocation) throws Throwable;
/**
* Wraps an Invoker together with a URL.
*
* invoke() forwards the call unchanged to the wrapped invoker.
*
* NOTE(review): abridged outline — the class braces and the constructor body
* are not shown in this excerpt.
*/
public class InvokerWrapper<T> implements Invoker<T>
// The invoker that calls are forwarded to.
private final Invoker<T> invoker;
// The URL held alongside the wrapped invoker.
private final URL url;
public InvokerWrapper(Invoker<T> invoker, URL url)
// Pure delegation to the wrapped invoker.
@Override
public Result invoke(Invocation invocation) throws RpcException { return invoker.invoke(invocation); }
/**
* InvokerWrapper subclass that additionally keeps its own reference to the
* wrapped invoker and can hand back the innermost one.
*
* NOTE(review): declared 'public static', so this is a nested class; the
* enclosing class and this class's braces are not shown in this excerpt.
*/
public static class InvokerDelegete<T> extends InvokerWrapper<T>
// Own copy of the wrapped invoker reference (the parent's field is private).
private final Invoker<T> invoker;
public InvokerDelegete(Invoker<T> invoker, URL url) {
super(invoker, url);
this.invoker = invoker;
}
// Unwraps recursively: while the wrapped invoker is itself an InvokerDelegete,
// keep descending; return the first invoker that is not an InvokerDelegete.
public Invoker<T> getInvoker() { if (invoker instanceof InvokerDelegete) {
return ((InvokerDelegete<T>) invoker).getInvoker();
} else {
return invoker;}}
/**
* Extends AbstractInvoker: callers go through the parent's invoke(), which in
* turn calls this subclass's doInvoke().
*
* Per the author's notes, Protocol refer -> getClients(url) is where the Netty
* connection to the server is established; that code is not part of this excerpt.
*
* NOTE(review): the four-argument constructor only calls super(...) here and
* does not assign the final fields, and doInvoke() has an empty body — both
* are abridged.
*/
public class DubboInvoker<T> extends AbstractInvoker<T> {
private final ExchangeClient[] clients;
private final AtomicPositiveInteger index = new AtomicPositiveInteger();
private final String version;
private final ReentrantLock destroyLock = new ReentrantLock();
private final Set<Invoker<?>> invokers;
// Convenience constructor: same as the four-argument form with a null invoker set.
public DubboInvoker(Class<T> serviceType, URL url, ExchangeClient[] clients) {
this(serviceType, url, clients, null);}
// Passes the interface, group, token and timeout keys to the AbstractInvoker(type, url, keys) constructor.
public DubboInvoker(Class<T> serviceType, URL url, ExchangeClient[] clients, Set<Invoker<?>> invokers) {
super(serviceType, url, new String[]{Constants.INTERFACE_KEY, Constants.GROUP_KEY, Constants.TOKEN_KEY, Constants.TIMEOUT_KEY});}
// Subclass hook called from AbstractInvoker.invoke(); body elided in this excerpt.
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
}
}
/**
* Implements Invoker for a cluster of invokers: invoke() is the entry point and
* delegates to the abstract doInvoke(invocation, invokers, loadbalance) that
* concrete cluster strategies implement. Candidate invokers come from the
* Directory via list().
*
* NOTE(review): abridged outline — the two-argument constructor has an empty
* body (the final 'directory' field is not assigned here), select/doSelect/
* reselect are signatures only, and invoke() references 'invokers' and
* 'loadbalance' that are not declared in this excerpt.
*/
public abstract class AbstractClusterInvoker<T> implements Invoker<T> {
// Source of candidate invokers; see list().
protected final Directory<T> directory;
private volatile Invoker<T> stickyInvoker = null;
// Delegates to the two-argument constructor with the directory's own URL.
public AbstractClusterInvoker(Directory<T> directory) {
this(directory, directory.getUrl());
}
public AbstractClusterInvoker(Directory<T> directory, URL url) {
}
// Invoker selection helpers taking the load balancer, the candidates and the
// already-selected invokers (bodies not shown).
protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException
private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException
private Invoker<T> reselect(LoadBalance loadbalance, Invocation invocation,
List<Invoker<T>> invokers, List<Invoker<T>> selected, boolean availablecheck)
throws RpcException
// Entry point: hands the invocation to the strategy-specific doInvoke().
public Result invoke(final Invocation invocation) throws RpcException {
return doInvoke(invocation, invokers, loadbalance); }
// Strategy hook implemented by concrete cluster invokers.
protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers,
LoadBalance loadbalance) throws RpcException;
// Asks the directory for the invokers applicable to this invocation.
protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
List<Invoker<T>> invokers = directory.list(invocation);
return invokers;
}
}
/**
* 继承AbstractClusterInvoker AbstractClusterInvoker父类会被调用 invoke( doinvoke)
*
*/
public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class);
public FailoverClusterInvoker(Directory<T> directory) {
super(directory);
}
@Override
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyinvokers = invokers;
checkInvokers(copyinvokers, invocation);
int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
// retry loop.
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
Result result = invoker.invoke(invocation);
return result;
} catch (RpcException e) {
if (e.isBiz()) { // biz exception.
throw e;
}
}
}
}
/**
* Directory.
*
* A Node that, for a given invocation, lists the invokers of one service type.
*/
public interface Directory<T> extends Node {
/**
* Get the service type.
*
* @return service type.
*/
Class<T> getInterface();
/**
* List the invokers applicable to the given invocation.
*
* @param invocation the invocation to list invokers for
* @return invokers
* @throws RpcException if the invokers cannot be listed
*/
List<Invoker<T>> list(Invocation invocation) throws RpcException;
}
/**
 * Skeleton implementation of {@link Directory}.
 *
 * <p>{@link #list(Invocation)} is the template method: it rejects calls on a
 * destroyed directory, obtains the raw invoker list from the subclass via
 * {@link #doList(Invocation)}, and then passes it through the configured routers.
 */
public abstract class AbstractDirectory<T> implements Directory<T> {

    private final URL url;
    private volatile boolean destroyed = false;
    private volatile URL consumerUrl;
    private volatile List<Router> routers;

    /**
     * Creates a directory with no routers.
     *
     * @param url directory URL, must not be null
     */
    public AbstractDirectory(URL url) {
        this(url, null);
    }

    /**
     * Creates a directory whose consumer URL is the directory URL itself.
     *
     * @param url     directory URL, must not be null
     * @param routers routers to install, passed to {@code setRouters}
     */
    public AbstractDirectory(URL url, List<Router> routers) {
        this(url, url, routers);
    }

    /**
     * @param url         directory URL, must not be null
     * @param consumerUrl consumer URL stored alongside the directory URL
     * @param routers     routers to install, passed to {@code setRouters}
     * @throws IllegalArgumentException if {@code url} is null
     */
    public AbstractDirectory(URL url, URL consumerUrl, List<Router> routers) {
        if (null == url) {
            throw new IllegalArgumentException("url == null");
        }
        this.url = url;
        this.consumerUrl = consumerUrl;
        setRouters(routers);
    }

    /**
     * Lists the invokers for an invocation, after routing.
     *
     * @param invocation the invocation to list invokers for
     * @return the invokers from {@link #doList(Invocation)}, filtered by every applicable router
     * @throws RpcException if this directory has already been destroyed
     */
    @Override
    public List<Invoker<T>> list(Invocation invocation) throws RpcException {
        if (destroyed) {
            throw new RpcException("Directory already destroyed .url: " + getUrl());
        }

        List<Invoker<T>> candidates = doList(invocation);

        // Read the volatile field once so the emptiness check and the loop see the same list.
        final List<Router> routerSnapshot = this.routers;
        if (routerSnapshot == null || routerSnapshot.isEmpty()) {
            return candidates;
        }

        for (Router router : routerSnapshot) {
            // A router is applied only when it has no URL or its URL has the runtime flag set.
            if (router.getUrl() != null && !router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
                continue;
            }
            candidates = router.route(candidates, getConsumerUrl(), invocation);
        }
        return candidates;
    }

    /**
     * Subclass hook that supplies the unrouted invoker list.
     *
     * @param invocation the invocation to list invokers for
     * @return invokers before routing
     * @throws RpcException if the invokers cannot be listed
     */
    protected abstract List<Invoker<T>> doList(Invocation invocation) throws RpcException;
}
/**
 * {@link Directory} implementation whose invoker list is driven by registry notifications.
 *
 * <p>{@link #notify(List)} sorts the pushed URLs into router, configurator and provider URLs,
 * rebuilds the overridden directory URL and refreshes the invokers.
 * {@link #doList(Invocation)} then serves invokers out of {@code methodInvokerMap}.
 *
 * <p>NOTE(review): this excerpt calls {@code toRouters}, {@code refreshInvoker} and
 * {@code mergeUrl}, none of which are shown here; they are assumed to be defined in the
 * unabridged class.
 */
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {

    // Used by notify() and toInvokers(); declared the same way as in the other classes of this file.
    private static final Logger logger = LoggerFactory.getLogger(RegistryDirectory.class);

    private static final ConfiguratorFactory configuratorFactory =
            ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).getAdaptiveExtension();

    private final String serviceKey; // Initialization at construction time, assertion not null
    private final Class<T> serviceType; // Initialization at construction time, assertion not null
    private final Map<String, String> queryMap; // Initialization at construction time, assertion not null
    private final URL directoryUrl;
    private final String[] serviceMethods;
    private final boolean multiGroup;
    private Protocol protocol; // Initialization at the time of injection, the assertion is not null
    private Registry registry; // Initialization at the time of injection, the assertion is not null
    private volatile boolean forbidden = false;
    private volatile URL overrideDirectoryUrl; // Initialization at construction time, assertion not null, and always assign non null value
    private volatile List<Configurator> configurators; // The initial value is null and the midway may be assigned to null, please use the local variable reference
    private volatile Map<String, Invoker<T>> urlInvokerMap;
    private volatile Map<String, List<Invoker<T>>> methodInvokerMap; // The initial value is null and the midway may be assigned to null, please use the local variable reference
    private volatile Set<URL> cachedInvokerUrls; // The initial value is null and the midway may be assigned to null, please use the local variable reference

    /**
     * @param serviceType the service interface, must not be null
     * @param url         registry URL; its service key must be non-empty and its
     *                    {@code Constants.REFER_KEY} parameter holds the encoded reference parameters
     * @throws IllegalArgumentException if {@code serviceType} is null or the URL has no service key
     */
    public RegistryDirectory(Class<T> serviceType, URL url) {
        super(url);
        if (serviceType == null) {
            throw new IllegalArgumentException("service type is null.");
        }
        if (url.getServiceKey() == null || url.getServiceKey().length() == 0) {
            throw new IllegalArgumentException("registry serviceKey is null.");
        }
        this.serviceType = serviceType;
        this.serviceKey = url.getServiceKey();
        this.queryMap = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
        // The directory URL is the registry URL re-pathed to the service interface, carrying only
        // the reference parameters (minus the monitor key).
        this.overrideDirectoryUrl = this.directoryUrl = url.setPath(url.getServiceInterface()).clearParameters().addParameters(queryMap).removeParameter(Constants.MONITOR_KEY);
        String group = directoryUrl.getParameter(Constants.GROUP_KEY, "");
        this.multiGroup = group != null && ("*".equals(group) || group.contains(","));
        String methods = queryMap.get(Constants.METHODS_KEY);
        this.serviceMethods = methods == null ? null : Constants.COMMA_SPLIT_PATTERN.split(methods);
    }

    /**
     * Converts override URLs into a sorted list of configurators.
     *
     * <p>A URL with the empty protocol discards everything and stops processing. A URL that
     * carries no parameter other than anyhost discards what was collected so far and
     * processing continues with the next URL.
     *
     * @param urls override URLs, may be null or empty
     * @return sorted configurators; an empty list when {@code urls} is null or empty
     */
    public static List<Configurator> toConfigurators(List<URL> urls) {
        if (urls == null || urls.isEmpty()) {
            return Collections.emptyList();
        }
        List<Configurator> configurators = new ArrayList<Configurator>(urls.size());
        for (URL url : urls) {
            if (Constants.EMPTY_PROTOCOL.equals(url.getProtocol())) {
                configurators.clear();
                break;
            }
            Map<String, String> override = new HashMap<String, String>(url.getParameters());
            //The anyhost parameter of override may be added automatically, it can't change the judgement of changing url
            override.remove(Constants.ANYHOST_KEY);
            if (override.isEmpty()) {
                configurators.clear();
                continue;
            }
            configurators.add(configuratorFactory.getConfigurator(url));
        }
        Collections.sort(configurators);
        return configurators;
    }

    public void setProtocol(Protocol protocol) {
        this.protocol = protocol;
    }

    /**
     * Handles a registry push.
     *
     * <p>Each URL is classified by its category parameter or protocol as a router, configurator
     * or provider URL; anything else is logged and ignored. Non-empty configurator and router
     * sets replace the current ones, the override directory URL is recomputed from
     * {@code directoryUrl}, and the invokers are refreshed from the provider URLs.
     *
     * @param urls the URLs pushed by the registry
     */
    @Override
    public synchronized void notify(List<URL> urls) {
        List<URL> invokerUrls = new ArrayList<URL>();
        List<URL> routerUrls = new ArrayList<URL>();
        List<URL> configuratorUrls = new ArrayList<URL>();
        for (URL url : urls) {
            String protocol = url.getProtocol();
            String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
            if (Constants.ROUTERS_CATEGORY.equals(category)
                    || Constants.ROUTE_PROTOCOL.equals(protocol)) {
                routerUrls.add(url);
            } else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
                    || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
                configuratorUrls.add(url);
            } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
                invokerUrls.add(url);
            } else {
                logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
            }
        }
        // configurators: only replaced when this push actually carried some
        if (!configuratorUrls.isEmpty()) {
            this.configurators = toConfigurators(configuratorUrls);
        }
        // routers: only replaced when this push actually carried some
        if (!routerUrls.isEmpty()) {
            List<Router> routers = toRouters(routerUrls);
            if (routers != null) { // null - do nothing
                setRouters(routers);
            }
        }
        List<Configurator> localConfigurators = this.configurators; // local reference
        // merge override parameters, always starting again from the pristine directory URL
        this.overrideDirectoryUrl = directoryUrl;
        if (localConfigurators != null && !localConfigurators.isEmpty()) {
            for (Configurator configurator : localConfigurators) {
                this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
            }
        }
        // providers
        refreshInvoker(invokerUrls);
    }

    /**
     * Turn urls into invokers, and if url has been refer, will not re-reference.
     *
     * <p>Provider URLs are skipped when their protocol is not accepted by the reference-side
     * protocol setting, is the empty protocol, or has no Protocol extension. Remaining URLs are
     * merged, de-duplicated by their full string, and mapped to an invoker that is either
     * reused from the current {@code urlInvokerMap} or newly referred.
     *
     * @param urls provider URLs, may be null or empty
     * @return invokers keyed by merged URL string; empty when {@code urls} is null or empty
     */
    private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
        Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
        if (urls == null || urls.isEmpty()) {
            return newUrlInvokerMap;
        }
        Set<String> keys = new HashSet<String>();
        String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
        // Split once up front: the accepted-protocol list does not change per provider URL.
        String[] acceptProtocols = (queryProtocols != null && queryProtocols.length() > 0)
                ? queryProtocols.split(",")
                : null;
        for (URL providerUrl : urls) {
            // If protocol is configured at the reference side, only the matching protocol is selected
            if (acceptProtocols != null && !isAcceptedProtocol(acceptProtocols, providerUrl.getProtocol())) {
                continue;
            }
            if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
                continue;
            }
            if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
                logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost()
                        + ", supported protocol: " + ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
                continue;
            }
            URL url = mergeUrl(providerUrl);
            String key = url.toFullString(); // The parameter urls are sorted
            if (!keys.add(key)) { // Repeated url
                continue;
            }
            // The cache key is the merged URL string, so a change on the provider side yields a new key and a fresh refer
            Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
            Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
            if (invoker == null) { // Not in the cache, refer again
                try {
                    // An explicit "disabled" flag wins; otherwise fall back to "enabled" (default true).
                    boolean enabled = url.hasParameter(Constants.DISABLED_KEY)
                            ? !url.getParameter(Constants.DISABLED_KEY, false)
                            : url.getParameter(Constants.ENABLED_KEY, true);
                    if (enabled) {
                        invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
                    }
                } catch (Throwable t) {
                    // Best effort: one provider failing to refer must not prevent the others from being used.
                    logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
                }
            }
            if (invoker != null) { // Cached or newly referred: carry it into the new map
                newUrlInvokerMap.put(key, invoker);
            }
        }
        return newUrlInvokerMap;
    }

    /** Returns whether {@code protocol} equals one of the reference-side accepted protocols. */
    private static boolean isAcceptedProtocol(String[] acceptProtocols, String protocol) {
        for (String acceptProtocol : acceptProtocols) {
            if (protocol.equals(acceptProtocol)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Looks up the invokers for an invocation in {@code methodInvokerMap}.
     *
     * <p>Lookup order: {@code method.firstArg} (only when the first argument is a String or an
     * enum), then the method name, then {@code Constants.ANY_VALUE}, then the first entry of
     * the map.
     *
     * @param invocation the invocation to list invokers for
     * @return the matching invokers, or an empty list when nothing matches
     * @throws RpcException with {@code FORBIDDEN_EXCEPTION} when the directory is forbidden
     */
    @Override
    public List<Invoker<T>> doList(Invocation invocation) {
        if (forbidden) {
            // 1. No service provider 2. Service providers are disabled
            throw new RpcException(RpcException.FORBIDDEN_EXCEPTION,
                    "No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " + NetUtils.getLocalHost()
                            + " use dubbo version " + Version.getVersion() + ", please check status of providers(disabled, not registered or in blacklist).");
        }
        List<Invoker<T>> invokers = null;
        Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
        if (localMethodInvokerMap != null && !localMethodInvokerMap.isEmpty()) {
            String methodName = RpcUtils.getMethodName(invocation);
            Object[] args = RpcUtils.getArguments(invocation);
            if (args != null && args.length > 0 && args[0] != null
                    && (args[0] instanceof String || args[0].getClass().isEnum())) {
                invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter
            }
            if (invokers == null) {
                invokers = localMethodInvokerMap.get(methodName);
            }
            if (invokers == null) {
                invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
            }
            if (invokers == null) {
                Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
                if (iterator.hasNext()) {
                    invokers = iterator.next();
                }
            }
        }
        return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
    }
}
本篇总结:Dubbo 作为一个底层通信框架,代码架构中包含大量的包装(Wrapper)设计模式和组件间的相互依赖。下面简述各个组件的作用和依赖关系。
Invocation - RpcInvocation - DecodeableRpcInvocation
1. Invocation 是请求实体信息的接口,包含被调用的方法名、参数类型、参数值等基础信息;RpcInvocation 是 Invocation 的基础实现类;DecodeableRpcInvocation 在其基础上多了 decode 方法,用于服务端接收请求时把字节流反序列化到自身对象的字段上,变成一个可读(readable)的对象。同时 Invocation 还持有 Invoker 的引用(getInvoker),该 invoker 字段在 RpcInvocation 中被声明为 transient,因此通常不会被序列化。
Invoker - AbstractInvoker - DubboInvoker
2. Dubbo 内部组件之间的相互引用特别多、也很复杂,只要记清楚:Invoker 是对需要调用的服务目标(target)类的一层封装,由框架负责创建。
AbstractInvoker 在 invoke 方法里调用子类实现的 doInvoke 方法。
例如 DubboInvoker 继承了 AbstractInvoker,所以 DubboInvoker 拥有 invoke 方法;调用 invoke 时会
转而调用子类的 doInvoke,doInvoke 在当前 Invoker 内使用 ExchangeClient 进行远程调用,并返回一个 Result。
Directory - AbstractDirectory - RegistryDirectory
3. Directory 即“目录”,也可以理解为字典,采用与上面相同的设计模式(接口 - 抽象基类 - 具体实现)。RegistryDirectory 内部维护了一个 Map<String, List<Invoker<T>>> methodInvokerMap,doList 根据 Invocation 中的方法名和参数从中筛选出合适的 Invoker 列表。