这里源码主要学习服务的发布和引用
为什么要有服务的发布和引用
- Sofa中不同模块之间的spring context上下文是相互隔离的
- 两个模块之间的bean不能通过spring的依赖注入来直接调用
- Sofa通过将bean注册为服务,进行发布和引用,用来解决不同模块之间调用的问题。
- JVM 服务发布和引用:解决一个 SOFA 应用内部不同模块之间的调用问题(单机调用)
- RPC 服务发布和引用:解决多个 SOFA 应用之间不同模块的调用问题(跨机调用)
基本功能
VM服务发布和引用
用于同一个Sofa应用不同模块之间服务的相互调用,有XML配置和注解两个方式
1.XML配置
服务发布
- 创建接口和实现类:
public interface JvmService(){
String get();
}
public Class JvmServiceImpl extend JvmService(){
@Overrride
public String get(){
return "Jvm";
}
}
- 在Spring xml配置文件中,将实现类声明为一个bean
<bean id="jvmService" class="cn.test.impl.JvmServiceImpl"/>
- 将bean注册为sofa服务,本工程中的其他模块就可以引用了。
<sofa:service ref="jvmService" interface="cn.test.JvmService"/>
服务引用
- 在Spring xml配置文件中,引用上述发布出来的sofa服务
<sofa:reference id="jvmService" interface="cn.test.JvmService"/>
- 代码中直接用Spring注解
@Autowired
自动注入就可以使用
@Autowired
private JvmService jvmService;
2.注解
服务发布
- 创建接口和实现类,接口一样,实现类上打上
@SofaService
注解
@SofaService
public class JvmServiceImpl implements JvmService {
@Override
public String get() {
return "JVM";
}
}
- 将实现类注册为Spring Bean,和上面一样
服务引用
- 使用的时候,直接使用
@SofaReference
注解,运行的时候会将上面注册的sofa服务注入到此处的变量中。
@SofaReference
private JvmService jvmService;
RPC服务发布和引用
用于跨机调用不同Sofa应用之间服务的发布和引用,目前只支持XML配置方式
服务发布
- 创建接口和实现类RpcService和RpcServiceImpl,和以上JVM服务一样
- 使用XML的方式,在Spring xml配置文件中将实现类注册成为RPC服务(只能通过XML的方式配置,不能使用注解)
<sofa:service ref="rpcService " interface="cn.test.RpcService">
<sofa:binding.tr/><!-- 加上这句,表示这个服务被注册为基于taobao remoteing的RPC服务 -->
</sofa:service>
服务引用
- 在Spring xml配置文件中,将RPC服务引入过来
<sofa:reference id="rpcService" interface="cn.test.RpcService">
<sofa:binding.tr/><!-- 加上这句,表示要引入的这个服务是基于taobao remoteing的RPC服务 -->
</sofa:reference>
- 代码中直接用Spring注解
@Autowired
自动注入就可以使用
@Autowired
private RpcService rpcService;
使用总结
- @SofaService和@SofaReference只能在JVM服务发布和引用中使用,RPC服务的发布和引用必须采用XML的方式进行配置
- 因此平时尽量采用XML的方式进行服务的发布和引用,RPC服务就是在JVM的基础之上,增加<sofa:binding.tr/>即可。
源码分析
sofa4源代码地址:http://gitlab.alipay-inc.com/jiuzhou-middleware/sofa4
sofa-rpc源代码地址:http://gitlab.alipay-inc.com/jiuzhou-middleware/sofa-rpc
服务发布和引用
标签解析:生成bean
1.在xml中引入sofa命名空间
// sofa命名空间
xmlns:sofa="http://schema.alipay.com/sofa/schema/service"
// 该命名空间的唯一标示
xsi:schemaLocation="http://schema.alipay.com/sofa/schema/service
http://schema.alipay.com/sofa/sofa-service-4-0-0.xsd"
2.通知Spring加载sofa标签
sofa标签是通过Spring来进行解析和加载的,我们需要做的是告诉Spring如何解析这个标签,通知Spring的方式就是通过spring.schemas和spring.handlers两个文件来完成的。在sofa-runtime-service-x.x.x.jar这个jar包下的META-INF中定义了这两个文件:
http\://schema.alipay.com/sofa/common/sofa-service-4-0-0.xsd=com/alipay/sofa/service/config/common/sofa-service.xsd
http\://schema.alipay.com/sofa/schema/service=com.alipay.sofa.runtime.spring.SofaNamespaceHandler
可以看出,sofa:service和sofa:reference标签的解析是通过com.alipay.sofa.runtime.spring.SofaNamespaceHandler这个类来完成的(实际上这个类并没有进行解析,只是为相应标签注册解析器)。
3.解析sofa:service和sofa:reference标签
可以看出,SofaNamespaceHandler首先获得这个appname下的所有BeanDefinitionParser,将其注册到spring中,然后这些BeanDefinitionParser会根据不同的tagName对其进行解析。
public class SofaNamespaceHandler extends NamespaceHandlerSupport {
...//
public void init() {
registerBeanDefinitionParsers();
registerBeanDefinitionDecorators();
}
private void registerBeanDefinitionParsers() {
BeanDefinitionParserRegistry beanDefinitionParserRegistry = SofaFrameworkHolder.injector
.getInstance(PluginServiceManager.class).getBeanDefinitionParserRegistry(appName);
Set<BeanDefinitionParser> beanDefinitionParsers = beanDefinitionParserRegistry
.getBeanDefinitionParsers();
if (beanDefinitionParsers == null) {
return;
}
for (BeanDefinitionParser beanDefinitionParser : beanDefinitionParsers) {
if (!(beanDefinitionParser instanceof TagNameSupport)) {
continue;
}
String tagName = ((TagNameSupport) beanDefinitionParser).supportTagName();
registerBeanDefinitionParser(tagName, beanDefinitionParser);
}
}
}
- 服务发布的标签解析器
服务发布的标签是<sofa:service...>,也就是tagName是service,对应的解析器是由ServiceDefinitionParser这个类完成的。解析的结果是一个对应的springbean,也就是getBeanClass方法的返回值,这里是ServiceFactoryBean。
package com.alipay.sofa.runtime.service.spring;
public class ServiceDefinitionParser extends AbstractContractDefinitionParser {
private static final String REF = "ref";
private static final String BEAN_ID = "beanId";
private static final ServiceDefinitionParser instance = new ServiceDefinitionParser();
private ServiceDefinitionParser() { }
public static ServiceDefinitionParser getInstance() { return instance;}
@Override
protected void doParseInternal(Element element, ParserContext parserContext,
BeanDefinitionBuilder builder) {
String ref = element.getAttribute(REF);
builder.addPropertyReference(REF, ref);
builder.addPropertyValue(BEAN_ID, ref);
}
@Override
protected Class getBeanClass(Element element) {
return ServiceFactoryBean.class;
}
// 配置 <sofa:service> 的时候是不需要配置 id 属性的,所以我们必须给生成一个
@Override
protected boolean shouldGenerateIdAsFallback() { return true;}
@Override
public String supportTagName() { return "service";}
}
- 服务引用的标签解析器
服务引用的标签解析是由类ReferenceDefinitionParser完成的,解析的结果是ReferenceFactoryBean。
package com.alipay.sofa.runtime.service.spring;
public class ReferenceDefinitionParser extends AbstractContractDefinitionParser {
...//
@Override
protected Class getBeanClass(Element element) {
return ReferenceFactoryBean.class;
}
@Override
public String supportTagName() {
return "reference";
}
}
服务发布:生成&注册组件
服务发布解析结果是生成ServiceFactoryBean,这是一个工厂类,根据getObject方法来生成service对象实例,对象生成时会触发doAfterPropertiesSet进行初始化,生成一个sofa服务组件对象componentInfo,并通过register方法将组件注册到sofa上下文sofaRuntimeContext中。
package com.alipay.sofa.runtime.service.spring;
public class ServiceFactoryBean extends AbstractContractFactoryBean {
protected Object ref;
protected String beanId;
protected Service service;
@Override
protected void doAfterPropertiesSet() throws Exception {
// Issue #62 判断 Bean 的实现类上是否有 @SofaService 的注解
if (hasSofaServiceAnnotation()) {
throw new ServiceRuntimeException(
"Bean " + beanId + " of type " + ref.getClass()
+ " has already annotated by @SofaService,"
+ " can not be registered using xml. Please check it.");
}
Implementation implementation = new DefaultImplementation();
implementation.setTarget(ref);
service = buildService();
if (bindings.size() == 0) { bindings.add(new JvmBinding());}
for (Binding binding : bindings) { service.addBinding(binding);}
ComponentInfo componentInfo = new ServiceComponent(implementation, service,
sofaRuntimeContext);
sofaRuntimeContext.getComponentManager().register(componentInfo);
}
...//
@Override
public Object getObject() throws Exception {
return service;
}
...//
}
查看ComponentManager的register方法实现,可以看到依次调用服务组件的register(),resolve(),activate()三个方法,把组件对象塞到全局的registry中,程序运行中,所有模块都可以通过registry获得想要的服务。
public class ComponentManagerImpl implements ComponentManager {
@Override
public synchronized void register(ComponentInfo componentInfo) {
_register(componentInfo);
}
private ComponentInfo _register(ComponentInfo ci) {
ComponentName name = ci.getName();
if (isRegistered(name)) {
LOGGER.error("Component was already registered: " + name);
return getComponentInfo(name);
}
try {
ci.register();
} catch (Exception e) {
LOGGER.error("Failed to register component: " + ci.getName(), e);
return null;
}
LOGGER.info("Registering component: " + ci.getName());
try {
registry.put(ci.getName(), ci);
if (ci.resolve()) {
// 组册到类型容器中
_typeRegistry(ci);
boolean isLazyActivate;
if (ci instanceof AbstractComponent) {
isLazyActivate = ((AbstractComponent) ci).isLazyActivateInternal();
} else {
isLazyActivate = ci.isLazyActivate();
}
if (!isLazyActivate) {
ci.activate();
} else {
lazyComponents.put(ci.getName(), ci);
}
}
} catch (Exception e) {
ci.exception(e);
LOGGER.error("Failed to create the component " + ci.getName(), e);
}
return ci;
}
}
查看register(),resolve(),active()三个方法的实现,可以看出register和resolve两个方法只是简单的改变一下组件的状态
UNREGISTERED(0, "撤销注册"), REGISTERED(0, "已注册"), RESOLVED(0, "已解析"), ACTIVATED(0, "已激活");
public abstract class AbstractComponent implements ComponentInfo {
@Override
public void register() {
if (componentStatus != ComponentStatus.UNREGISTERED) {
return;
}
componentStatus = ComponentStatus.REGISTERED;
}
@Override
public boolean resolve() {
if (componentStatus != ComponentStatus.REGISTERED) {
return false;
}
componentStatus = ComponentStatus.RESOLVED;
return true;
}
@Override
public void activate() throws ServiceRuntimeException {
if (componentStatus != ComponentStatus.RESOLVED) {
return;
}
if (!checkImplementation()) {
return;
}
Object target = this.implementation.getTarget();
if (target instanceof ComponentLifeCycle) {
((ComponentLifeCycle) target).activate();
} else {
invokeWithArgs("activate");
}
componentStatus = ComponentStatus.ACTIVATED;
}
}
ServiceComponent重写的active方法如下,可以看到activate会获取到服务的 Binding 集,然后迭代加载每一个 Binding 对应 BindingAdapter,再调用 outBinding 对外暴露服务。
public class ServiceComponent extends AbstractComponent {
@Override
public void activate() throws ServiceRuntimeException {
activateBinding();
super.activate();
}
private void activateBinding() {
Object target = service.getTarget();
if (target == null) {
throw new ServiceRuntimeException(
"Must contains the target object whiling registering Service.");
}
// 存在绑定。
if (service.hasBinding()) {
Set<Binding> bindings = service.getBindings();
Iterator<Binding> it = bindings.iterator();
boolean allPassed = true;
while (it.hasNext()) {
Binding binding = it.next();
BindingAdapter<Binding> bindingAdapter = this.bindingAdapterManager.loadAdapter(binding.getBindingType());
if (bindingAdapter == null) {
...//异常
}
Object outBindingResult;
SofaLogger.BINDING_LOG.info(" <<Out Binding [" + binding.getBindingType() + "] Begins - " + service);
try {
outBindingResult = bindingAdapter.outBinding(service, binding, target, getContext());
} catch (Throwable t) {
allPassed = false;
SofaLogger.BINDING_LOG.error(" <<Out binding [" + binding.getBindingType() + "] for [" + service + "] occur exception, ", t);
continue;
}
if (outBindingResult != null && !Boolean.FALSE.equals(outBindingResult)) {
SofaLogger.BINDING_LOG.info(" <<Out Binding [" + binding.getBindingType() + "] Ends - " + service);
} else {
binding.setHealthy(false);
SofaLogger.BINDING_LOG.info(" <<Out Binding [" + binding.getBindingType() + "] Fails, Don't publish service - " + service);
}
}
}
SofaLogger.BINDING_LOG.info("Register Service - " + service);
}
}
JVM服务的Adater是JvmBindingAdapter,对于JVM服务来说是内部调用不需要暴露给外部,因此JVM服务的outBinding返回的是null,同一应用下其他模块调用该服务直接查本地注册中心registry即可。
public class JvmBindingAdapter implements BindingAdapter<JvmBinding> {
@Override
public Object outBinding(Object contract, JvmBinding binding, Object target,
SofaRuntimeContext sofaRuntimeContext) {
return null;
}
}
Rpc服务的Adapter是RpcBindingAdapter,对于RPC服务需要暴露给外部,因此RpcBindingAdapter的outBinding方法中会将服务推送到远程注册中心中。这样其他应用就可以通过注册中心调用该服务。
public abstract class RpcBindingAdapter<T extends RpcBinding> implements BindingAdapter<T> {
@Override
public Object outBinding(Object contract, T binding, Object target,
SofaRuntimeContext sofaRuntimeContext) {
if (!judgeBinding(contract, binding, target, sofaRuntimeContext)) {
return Boolean.FALSE;
}
String uniqueName = getUniqueName((Contract) contract);
ServiceMetadata metadata = getServiceMetadata(uniqueName);
if (metadata == null) {
if (notNeedPublishServiceSet.contains(uniqueName)) {
return Boolean.FALSE;
} else {
throw new ServiceRuntimeException(LogCodes.getLog(LogCodes.INFO_SERVICE_METADATA_IS_NULL, uniqueName));
}
}
try {
// 将服务推送到远程注册中心
this.getProcessService().provide(metadata);
} catch (Exception e) {
throw new ServiceRuntimeException(LogCodes.getLog(LogCodes.ERROR_PROXY_PUBLISH_FAIL), e);
}
return Boolean.TRUE;
}
}
服务引用:找到&注册组件
标签解析的结果是生成ReferenceFactoryBean,Reference的目标就是把sofa runtime context的一个组件变成spring中的一个bean,核心代码在于proxy = ReferenceRegisterHelper.registerReference(reference, sofaRuntimeContext,applicationContext);这一句,这句代码会从SOFA上下文中,拿到服务对应的代理对象。getObject方法会会返回该对象proxy。
package com.alipay.sofa.runtime.spring.factory;
public class ReferenceFactoryBean extends AbstractContractFactoryBean {
private Object proxy;
protected boolean localFirst = true;
protected boolean jvmService;
@Override
protected void doAfterPropertiesSet() throws Exception {
Reference reference = buildReference();
Assert
.isTrue(bindings.size() <= 1,
"Found more than one binding in <sofa:reference/>, <sofa:reference/> can only have one binding.");
if (bindings.size() == 0) {
bindings.add(new JvmBinding());
}
reference.addBinding(bindings.get(0));
proxy = ReferenceRegisterHelper.registerReference(reference, sofaRuntimeContext);
}
@Override
public Object getObject() throws Exception {
return proxy;
}
}
深入看到registerReference方法来看它是如何拿到服务对应的对象的。主要做了两个事:
- 给RPC服务引用自动增加一个JVM绑定,也就是一个RPC服务默认会有JVM和RPC两个binding,以达到优先本地JVM调用
- sofa的componentManager将该reference注册成一个组件,注册之后生成引用服务的代理对象。
package com.alipay.sofa.runtime.service.helper;
public class ReferenceRegisterHelper {
public static Object registerReference(Reference reference,
SofaRuntimeContext sofaRuntimeContext) {
Binding binding = (Binding) reference.getBindings().toArray()[0];
if (reference.jvmService() && binding.getBindingType().equals(JvmBinding.JVM_BINDING_TYPE)) {
throw new ServiceRuntimeException(
"jvm-service=\"true\" can not be used with JVM binding.");
}
// 优先本地调用
if (!binding.getBindingType().equals(JvmBinding.JVM_BINDING_TYPE)
&& isLocalFirst(reference, sofaRuntimeContext)) {
reference.addBinding(new JvmBinding());
}
ComponentManager componentManager = sofaRuntimeContext.getComponentManager();
ReferenceComponent referenceComponent = new ReferenceComponent(reference,
new DefaultImplementation(), sofaRuntimeContext);
// 保证若不存在,则注册为原子操作
// 去掉此处 ReferenceRegisterHelper 的锁,此处的锁不需要,ComponentManager 中已带锁,背景见 http://gitlab.alipay-inc.com/jiuzhou-middleware/sofa4/issues/257
// 如果component已注册,返回已注册的component
if (componentManager.isRegistered(referenceComponent.getName())) {
return componentManager.getComponentInfo(referenceComponent.getName())
.getImplementation().getTarget();
}
ComponentInfo componentInfo = componentManager.registerAndGet(referenceComponent);
return componentInfo.getImplementation().getTarget();
}
}
再查看ComponentManager的registerAndGet方法,可以看到也是调用了之间注册组件的_register方法。register() -> resolve() -> activate() ,其中register和resolve也只是改变状态,代理对象的生成在active方法中完成。
public class ComponentManagerImpl implements ComponentManager {
@Override
public void register(ComponentInfo componentInfo) {
_register(componentInfo);
}
}
引用组件ReferenceComponent的active实现如下,代理对象的生成与binding有很大关系,不同类型的binding会生成不同类型的代理对象(JVM和RPC)。如果reference上只有一个binding,这只要使用这个binding生成代理对象即可;如果有多个binding,则优先使用jvm binding来生成本地调用的代理对象,其他类型的binding生成的远程调用的代理对象作为jvm binding生成的对象的备选,这样做也是为了优先本地调用,如果本地代理对象不存在,则会使用远程代理对象。
public class ReferenceComponent extends AbstractComponent {
@Override
public void activate() throws ServiceRuntimeException {
if (reference.hasBinding()) {
Binding candidate = null;
Set<Binding> bindings = reference.getBindings();
// 如果binding个数大于1,说明设置了jvm优先调用
if (bindings.size() == 1) {
candidate = bindings.iterator().next();
} else if (bindings.size() > 1) {
// 有多个说明设置了local-first=true,要求本地调用优先
Object backupProxy = null;
for (Binding binding : bindings) {
if ("jvm".equals(binding.getName())) {
candidate = binding;
} else {
backupProxy = createProxy(reference, binding);
}
}
if (candidate != null) {
// 一般不会出现 candidate 为 null 的情况,一个 Reference 的绑定,至少会有一个 JVM 绑定
((JvmBinding) candidate).setBackupProxy(backupProxy);
}
}
Object proxy = null;
// 生成代理对象
if (candidate != null) {
proxy = createProxy(reference, candidate);
}
if (proxy != null) {
try {
// Spring xml 引用服务, 具体参考 http://gitlab.alipay-inc.com/alipay-sofa/sofa-dynamic-module-runtime/issues/6
// 这里直接 new 空的 ApplicationContext 单纯为了兼容 XTS 使用的老接口
InterfaceMode interfaceMode = reference.getInterfaceMode();
if (interfaceMode.equals(InterfaceMode.spring)) {
proxy = postProcessReferenceProxy(reference, proxy,
new GenericApplicationContext());
} else {
proxy = postProcessReferenceProxy(reference, proxy, null);
}
} catch (Throwable e) {
SofaLogger.BINDING_LOG.error(
"Failed to invoke post process reference, use origin proxy instead.", e);
}
}
this.implementation = new DefaultImplementation();
implementation.setTarget(proxy);
}
publishAsService(reference, implementation.getTarget());
super.activate();
latch.countDown();
}
}
那么,如何通过binding来生成代理对象呢?就是通过createProxy中调用不同bindingAdapter的inBinding方法,借助于动态代理技术进行生成。
private Object createProxy(Reference reference, Binding binding) {
BindingAdapter<Binding> bindingAdapter = this.bindingAdapterManager.loadAdapter(binding
.getBindingType());
if (bindingAdapter == null) {
...// 异常
}
SofaLogger.BINDING_LOG.info(" >>In Binding [" + binding.getBindingType() + "] Begins - " + reference);
Object proxy = null;
try {
proxy = bindingAdapter.inBinding(reference, binding, sofaRuntimeContext);
} finally {
SofaLogger.BINDING_LOG.info(" >>In Binding [" + binding.getBindingType() + "] Ends - " + reference);
}
return proxy;
}
- JVM的binding,会使用JvmBindingAdapter来生成代理对象,生成的代理对象的InvocationHandler为JvmServiceInvoker。当调用代理对象的方法时,就会触发JvmServiceInvoker的doInvoke,内部会找到代理对象对应的目标对象进行调用。
public class JvmBindingAdapter implements BindingAdapter<JvmBinding> {
public Object inBinding(Object contract, JvmBinding binding,
SofaRuntimeContext sofaRuntimeContext) {
return createServiceProxy((Contract) contract, binding, sofaRuntimeContext);
}
protected Object createServiceProxy(Contract contract, JvmBinding binding,
SofaRuntimeContext sofaRuntimeContext) {
...//
ClassLoader oldClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(newClassLoader);
// 生成代理对象
ServiceProxy handler = new JvmServiceInvoker(contract, binding, sofaRuntimeContext);
...
}
}
}
- RPC的binding,会调用RpcBindingAdapter来生成代理对象,生成代理对象的Handler为SofaServiceProxy。当调用代理对象的方法时,就会触发SofaServiceProxy的doInvoke,进而发起RPC调用
总结图
总结
解析
- 对标签注册相应解析器:com.alipay.sofa.runtime.spring.SofaNamespaceHandler
(绑定tagName和beanDefinitionParser ) - 对相应标签进行解析
- <sofa:service>:com.alipay.sofa.runtime.spring.parser.ServiceDefinitionParser
- <sofa:reference>:com.alipay.sofa.runtime.spring.parser.ReferenceDefinitionParser
- 标签解析过程
- 父类AbstractContractDefinitionParser的doParse方法:解析service和reference的共有属性
- 对应parser自身的doParseInternal方法:解析私有熟悉
- 标签解析结果
- <sofa:service>:com.alipay.sofa.runtime.spring.factory.ServiceFactoryBean
- <sofa:reference>:com.alipay.sofa.runtime.spring.factory.ReferenceFactoryBean
生成组件
服务发布组件ServiceComponent:在ServiceFactoryBean的doAfterPropertiesSet中生成并注册组件
服务引用组件ReferenceComponent:在ReferenceFactoryBean的doAfterPropertiesSet中生成并注册组件
服务发布注册组件
目的:将服务的bean注册到Sofa Runtime Context中
服务组件管理接口:ComponentManager
注册中心:registry(componentManager的一个实例)
注册组件经历三个过程:register(),resolve(),activate()
-
activate() 获取到服务的Binding集,加载每一个Binding对应BindingAdapter,调用outBinding对外暴露服务
JVM服务发布的Adapter:JvmBindingAdapter,outBinding返回null(不需要暴露)
RPC服务发布的Adapter:RpcBindingAdapter,将服务注册到注册中心。
服务引用注册组件
目的:将Sofa Runtime Context的一个服务注册成spring的一个bean
-
拿到服务对应的对象:ReferenceRegisterHelper的registerReference
给RPC服务引用自动增加一个JVM绑定,以达到优先本地JVM调
调用componentManager的registerAndGet方法注册并生成引用服务的代理对象
-
生成代理对象:ReferenceComponent的active方法
如果有多个binding,优先使用jvm binding来生成本地调用的代理对象
JvmBindingAdapter生成代理对象的handler:JvmServiceInvoker
RpcBindingAdapter生成代理对象的handler:SofaServiceProxy