Tars Java客户端-原理篇

概述

客户端两种应用场景

  • 已发布的Tars服务作为客户端调用其他Tars服务
  • 独立的应用程序访问Tars服务的接口,通常测试使用

主要流程

获取ServantProxy实例的流程

Communicator.stringToProxy((Class<T> clazz, String objName)


ServantProxyFactory.getServantProxy()


ObjectProxyFactory.getObjectProxy()
{
1、初始化servantProxyConfig,依据Comunicator的配置,和obj的名称
createServantProxyConfig
2、更新Endpoint,// 从主控获取服务对应的Endpoint信息,ip和端口
ObjectProxyFactory.updateServantEndpoints(ServantProxyConfig cfg)
3、创建LoadBalance,现在只支持一种默认的负载均衡策略
createLoadBalance(servantProxyConfig)
4、创建protocolInvoker,
createProtocolInvoker(api, objName, servantProxyConfig)
5、上述信息初始化一个新的ObjectProxy实例
new ObjectProxy<T>(api, objName, servantProxyConfig, loadBalance, protocolInvoker, communicator)
}


ObjectProxyFactory.createProtocolInvoker()
1、创建编解码类实例Codec,目前只支持tars协议,若接口类带有@Servant注释,即为tars协议

2、初始化TarsProtocolInvoker
protocolInvoker = new TarsProtocolInvoker<T>(api, servantProxyConfig, new ServantProtocolFactory(codec), communicator.getThreadPoolExecutor());


ServantProtocolInvoker(Class<T> api, ServantProxyConfig config, ProtocolFactory protocolFactory, ThreadPoolExecutor threadPoolExecutor)

ServantProtocolInvoker.initInvoker()
1、从objectname中解析出来所有服务地址列表,不同的服务用冒号分隔
AppName.ServerName.ObjName@tcp -h 172.0.1.1 -t 60000 -p 12345:tcp -h 172.0.1.1 -t 60000 -p 12346

2、每个URL创建Invoker
TarsProtocolInvoker.create()


TarsProtocolInvoker.create();
1、通过url创建客户端
getClients(url)
1.1 根据配置的连接数量,初始化多个ServantClient,每个ServantClient对应一个Session,也就是一个连接
initClient
2、初始化TarsInvoker

Rpc函数调用流程

根据proxy,method,参数调用函数
ObjectProxy.invoke(Object proxy, Method method, Object[] args)


1、将proxy,method,arg组装成InvokeContext
InvokeContext context = protocolInvoker.createContext(proxy, method, args);

2、通过负载均衡选择合适的invoker
Invoker<T> invoker = loadBalancer.select(protocolInvoker.getInvokers(), context);

3、调用invoke执行函数
invoker.invoke(context);


TarsInvoker.doInvokeServant(ServantInvokeContext inv)

1、根据方法名可以知道是同步调用还是异步调用
boolean isAsync = TarsHelper.isAsync(inv.getMethodName());

2、异步调用,调用对应的函数后返回
invokeWithAsync(method, inv.getArguments(), inv.getAttachments());
return null;

3、同步调用,调用对应的函数返回结果
TarsServantResponse response = invokeWithSync(method, inv.getArguments(), inv.getAttachments());
return response.getResult();


TarsInvoker.invokeWithSync()

1、构造Request请求包
TarsServantRequest request = new TarsServantRequest(client.getIoSession());

2、发送请求
client.invokeWithSync(request);


ServantClient.invokeWithSync()

1、检查连接,若连接没有建立或者已经断开,则主动发起连接
ensureConnected();

2、创建一个Ticket,Ticket用户线程间通信,接收完Response后通知调用方,或者触发回调函数。
ticket = TicketManager.createTicket(request, session, this.syncTimeout);

3、发送请求,若为异步操作,则不会再等待返回结果
current.write(request);

4、等待返回结果
ticket.await(this.syncTimeout, TimeUnit.MILLISECONDS)

5、获取Response
response = ticket.response();

类及类之间的关系

Proxy相关类

image.png
  • ObjectProxy实际是实现了InvocationHandler接口和ServantProxy接口,最主要的是实现了InvocationHandler的invoke()方法,以及保存了支持invoke的相关变量。
  • ObjectProxyFactory 作为ObjectProxy的工厂类,完成ObjectProxy实例的创建。
  • ServantProxyFactory 作为客户端调用远程方法的Proxy实例的创建和缓存,其中
    ConcurrentHashMap<String, Object> cache
    管理了objName,与实现接口Proxy的对象实例,Rpc调用时使用该实例直接调用的。
    private <T> Object createProxy(Class<T> clazz, ObjectProxy<T> objectProxy) {
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[] { clazz, ServantProxy.class }, objectProxy);
    }

是有ObjectProxy类实现了InvocationHandler接口,函数调用的操作是ObjectProxy实现invoke()函数完成的。

  • 总结
    ObjectProxyFactory 和ObjectProxyFactory 容易混淆,从名称根本不知道两者职责的区别。其实ObjectProxy和JAVA SDK的Proxy名称搞的比较相近的原因。如果将ObjectProxy改为ServantInvocationHandler感觉会清晰一些。

另外,Communicator中保存了,ServantProxyFactory和ObjectProxyFactory实例,而ServantProxyFactory和ObjectProxyFactory也保存了Communicator的实例,这种互相引用会导致类之间的关系不清晰。可以简单的提取Comunicator中用户的Context信息组成一个独立的类,其他使用这些信息的类,引用这个类获取相关信息。

Invoker相关类

image.png
  • Invoker 实现了invoke(),通过调用ServantClient进行网络通信,实现远程方法调用。
  • ProtocolInvoker 维护了多个连接到不同服务器的Invoker,方法调用时通过LoadBalance从中选出一个invoker发送请求。
  • 总结 感觉ProtocolInvoker,名字应该改为InvokerPool更形象一些。

Communicator相关类

  • CommunicatorFactory
ConcurrentHashMap<Object, Communicator> CommunicatorMap = new ConcurrentHashMap<Object, Communicator>();
Communicator communicator = null;

Communicator通过一个Map管理所有Communicator。同时CommunicatorFactory维护了一个默认的Communicator,Tars服务初始化的时候默认会初始化这个Communicator。

  • Communicator

对应的locator或者config,即一个Tars主控制器,一个Tars环境

client的配置文件格式如下:

<tars>
    <application>
        <client>
            locator=tars.tarsregistry.QueryObj@tcp -h 192.168.10.115 -p 17890
            sync-invoke-timeout=20000
            async-invoke-timeout=20000
            refresh-endpoint-interval=60000
            stat=tars.tarsstat.StatObj
            property=tars.tarsproperty.PropertyObj
            report-interval=60000
            modulename=TestApp.HelloJavaServer
        </client>
    </application>
</tars>

所以使用Tars服务中部署的方式Communicator通常是全局唯一,统一采用服务发布中模板的client配置

···
private volatile String id;
private volatile CommunicatorConfig communicatorConfig;
private volatile ThreadPoolExecutor threadPoolExecutor;
private final ServantProxyFactory servantProxyFactory = new ServantProxyFactory(this);
private final ObjectProxyFactory objectProxyFactory = new ObjectProxyFactory(this);

private final QueryHelper queryHelper = new QueryHelper(this);
private final StatHelper statHelper = new StatHelper(this);

private final ReentrantLock lock = new ReentrantLock();
private final AtomicBoolean inited = new AtomicBoolean(false);

···

其他类

  • ClientPoolManager
    private final static ConcurrentHashMap<CommunicatorConfig, ThreadPoolExecutor> clientThreadPoolMap = new ConcurrentHashMap<CommunicatorConfig, ThreadPoolExecutor>();
    private final static ConcurrentHashMap<ServantProxyConfig, SelectorManager> selectorsMap = new ConcurrentHashMap<ServantProxyConfig, SelectorManager>();

管理了两个线程池

  • clientThreadPoolMap : 管理所有业务线程池,每个Communicator一个业务线程池
  • SelectorManager: 管理所有网络连接池,每个proxy一个网络连接池,这些网络连接都是连接的一个服务端地址。

参考资料

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。