前言
XxlJob 2.0.1
版本源码,其实在去年这个时候我已经看完了,并且做了很详细的注释。但是由于自己太懒了,没有写成博客分享。俗话说好记性不如烂笔头,于是乎我挑出几个源码实现中我认为不错的知识点并且结合自己的见解,来分享一波。
因为篇幅过于长,简书不让发,要分成上下节!
源码思考
- 1.
executor
端是开启一个jettyServer
,其中配置了JettyServerHandler
。将请求交给XxlRpcResponse xxlRpcResponse = xxlRpcProviderFactory.invokeService(xxlRpcRequest);
详细可以看JettyServerHandler
中的handle
函数
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
if ("/services".equals(target)) { // services mapping
StringBuffer stringBuffer = new StringBuffer("<ui>");
for (String serviceKey: xxlRpcProviderFactory.getServiceData().keySet()) {
stringBuffer.append("<li>").append(serviceKey).append(": ").append(xxlRpcProviderFactory.getServiceData().get(serviceKey)).append("</li>");
}
stringBuffer.append("</ui>");
writeResponse(baseRequest, response, stringBuffer.toString().getBytes());
return;
} else { // default remoting mapping
// request parse
XxlRpcRequest xxlRpcRequest = null;
try {
xxlRpcRequest = parseRequest(request);
} catch (Exception e) {
writeResponse(baseRequest, response, ThrowableUtil.toString(e).getBytes());
return;
}
// invoke
XxlRpcResponse xxlRpcResponse = xxlRpcProviderFactory.invokeService(xxlRpcRequest);
// response-serialize + response-write
byte[] responseBytes = xxlRpcProviderFactory.getSerializer().serialize(xxlRpcResponse);
writeResponse(baseRequest, response, responseBytes);
}
}
executor
端通过上文中xxlRpcProviderFactory.invokeService(xxlRpcRequest);
函数执行本地暴露的服务方法
/**
* invoke service
*
* @param xxlRpcRequest
* @return
*/
public XxlRpcResponse invokeService(XxlRpcRequest xxlRpcRequest) {
// make response
XxlRpcResponse xxlRpcResponse = new XxlRpcResponse();
xxlRpcResponse.setRequestId(xxlRpcRequest.getRequestId());
// match service bean
String serviceKey = makeServiceKey(xxlRpcRequest.getClassName(), xxlRpcRequest.getVersion());
Object serviceBean = serviceData.get(serviceKey);
// valid
if (serviceBean == null) {
xxlRpcResponse.setErrorMsg("The serviceKey["+ serviceKey +"] not found.");
return xxlRpcResponse;
}
if (System.currentTimeMillis() - xxlRpcRequest.getCreateMillisTime() > 3*60*1000) {
xxlRpcResponse.setErrorMsg("The timestamp difference between admin and executor exceeds the limit.");
return xxlRpcResponse;
}
if (accessToken!=null && accessToken.trim().length()>0 && !accessToken.trim().equals(xxlRpcRequest.getAccessToken())) {
xxlRpcResponse.setErrorMsg("The access token[" + xxlRpcRequest.getAccessToken() + "] is wrong.");
return xxlRpcResponse;
}
// invoke
try {
Class<?> serviceClass = serviceBean.getClass();
String methodName = xxlRpcRequest.getMethodName();
Class<?>[] parameterTypes = xxlRpcRequest.getParameterTypes();
Object[] parameters = xxlRpcRequest.getParameters();
Method method = serviceClass.getMethod(methodName, parameterTypes);
method.setAccessible(true);
Object result = method.invoke(serviceBean, parameters);
/*FastClass serviceFastClass = FastClass.create(serviceClass);
FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
Object result = serviceFastMethod.invoke(serviceBean, parameters);*/
xxlRpcResponse.setResult(result);
} catch (Throwable t) {
logger.error("xxl-rpc provider invokeService error.", t);
xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(t));
}
return xxlRpcResponse;
}
executor
端服务提供者维护在serviceData
中, serviceData
是map
结构。key
是服务提供者className
拼接版本号, value
则是服务提供者本身实例。
// ---------------------- server invoke ----------------------
/**
* init local rpc service map
*/
private Map<String, Object> serviceData = new HashMap<String, Object>();
public Map<String, Object> getServiceData() {
return serviceData;
}
admin
端其实是没有开启jettyServer
服务器,可以通过XxlJobDynamicScheduler
的
initRpcProvider()
一看究竟。
// ---------------------- init + destroy ----------------------
public void start() throws Exception {
// valid
Assert.notNull(scheduler, "quartz scheduler is null");
// init i18n
initI18n();
// admin registry monitor run
/*
启动自动注册线程,获取类型为自动注册的执行器信息,完成机器的自动注册与发现
*/
JobRegistryMonitorHelper.getInstance().start();
// admin monitor run
/**
* 启动失败日志监控线程
*/
JobFailMonitorHelper.getInstance().start();
// admin-server
/**
* 暴露AdminBiz服务,并设置jettyServerHandler
*/
initRpcProvider();
logger.info(">>>>>>>>> init xxl-job admin success.");
}
// ---------------------- admin rpc provider (no server version) ----------------------
private static JettyServerHandler jettyServerHandler;
private void initRpcProvider(){
// init
XxlRpcProviderFactory xxlRpcProviderFactory = new XxlRpcProviderFactory();
xxlRpcProviderFactory.initConfig(NetEnum.JETTY, Serializer.SerializeEnum.HESSIAN.getSerializer(), null, 0, XxlJobAdminConfig.getAdminConfig().getAccessToken(), null, null);
// add services
xxlRpcProviderFactory.addService(AdminBiz.class.getName(), null, XxlJobAdminConfig.getAdminConfig().getAdminBiz());
// jetty handler
jettyServerHandler = new JettyServerHandler(xxlRpcProviderFactory);
}
private void stopRpcProvider() throws Exception {
new XxlRpcInvokerFactory().stop();
}
public static void invokeAdminService(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
jettyServerHandler.handle(null, new Request(null, null), request, response);
}
admin
端通过API
服务暴露出自己的服务。比如执行器注册服务,任务结果回调服务。admin
端服务暴露都是通过JobApiController
实现的,来达到和executor端类似的效果,请求交给JettyServerHandler
处理, 然后通过xxlRpcProviderFacotry
调用本地方法。
@Controller
public class JobApiController implements InitializingBean {
@Override
public void afterPropertiesSet() throws Exception {
}
@RequestMapping(AdminBiz.MAPPING)
@PermessionLimit(limit=false)
public void api(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
XxlJobDynamicScheduler.invokeAdminService(request, response);
}
}
XxlRpcReferenceBean
这个factoryBean
中的getObject
方法用于创建代理对象,
代理逻辑中过滤掉了非业务方法,也就是Object类中的方法。只是将目标类的方法名,参数,类名等信息包装成XxlRpcRequest
,通过JettyClient
发送给调度中心。调度中心的接口地址为"admin端的ip/api"。调度中心的API
接口拿到请求之后通过参数里面的类名,方法,参数,版本号等信息反射出来一个服务实例,调用invoke
执行方法,也就是上文中提到的JobApiController
。
每个XxlRpcReferenceBean
对象中都会初始化一个JettyClient
对象。感觉这样做有点耗性能,需要优化啊。毕竟创建一个JettyClient
对象开销并不小,也许你使用操作不恰当会造成OOM
。之前有一位朋友就是对每一个请求都创建了一个HttpClient
,这样由于创建每一个HttpClient
实例的时候都会调用evictExpireConnections
,造成有多少请求就会创建多少个定时线程,最后造成系统OOM
。所以建议这里最好采用单例或者将JettyClient
缓存起来。
// ---------------------- initClient ----------------------
Client client = null;
private void initClient() {
try {
client = netType.clientClass.newInstance();
client.init(this);
} catch (InstantiationException | IllegalAccessException e) {
throw new XxlRpcException(e);
}
}
创建发起RPC请求的代理对象
public Object getObject() {
return Proxy.newProxyInstance(Thread.currentThread()
.getContextClassLoader(), new Class[] { iface },
new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String className = method.getDeclaringClass().getName();
// filter method like "Object.toString()"
if (Object.class.getName().equals(className)) {
logger.info(">>>>>>>>>>> xxl-rpc proxy class-method not support [{}.{}]", className, method.getName());
throw new XxlRpcException("xxl-rpc proxy class-method not support");
}
// address
String address = routeAddress();
if (address==null || address.trim().length()==0) {
throw new XxlRpcException("xxl-rpc reference bean["+ className +"] address empty");
}
// request
XxlRpcRequest xxlRpcRequest = new XxlRpcRequest();
xxlRpcRequest.setRequestId(UUID.randomUUID().toString());
xxlRpcRequest.setCreateMillisTime(System.currentTimeMillis());
xxlRpcRequest.setAccessToken(accessToken);
xxlRpcRequest.setClassName(className);
xxlRpcRequest.setMethodName(method.getName());
xxlRpcRequest.setParameterTypes(method.getParameterTypes());
xxlRpcRequest.setParameters(args);
// send
if (CallType.SYNC == callType) {
try {
// future set
XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(xxlRpcRequest, null);
// do invoke
client.asyncSend(address, xxlRpcRequest);
// future get
XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
if (xxlRpcResponse.getErrorMsg() != null) {
throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
}
return xxlRpcResponse.getResult();
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-job, invoke error, address:{}, XxlRpcRequest{}", address, xxlRpcRequest);
throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
} finally{
// remove-InvokerFuture
XxlRpcFutureResponseFactory.removeInvokerFuture(xxlRpcRequest.getRequestId());
}
} else if (CallType.FUTURE == callType) {
// thread future set
XxlRpcInvokeFuture invokeFuture = null;
try {
// future set
invokeFuture = new XxlRpcInvokeFuture(new XxlRpcFutureResponse(xxlRpcRequest, null));
XxlRpcInvokeFuture.setFuture(invokeFuture);
// do invoke
client.asyncSend(address, xxlRpcRequest);
return null;
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-job, invoke error, address:{}, XxlRpcRequest{}", address, xxlRpcRequest);
// remove-InvokerFuture
invokeFuture.stop();
throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
}
} else if (CallType.CALLBACK == callType) {
// get callback
XxlRpcInvokeCallback finalInvokeCallback = invokeCallback;
XxlRpcInvokeCallback threadInvokeCallback = XxlRpcInvokeCallback.getCallback();
if (threadInvokeCallback != null) {
finalInvokeCallback = threadInvokeCallback;
}
if (finalInvokeCallback == null) {
throw new XxlRpcException("xxl-rpc XxlRpcInvokeCallback(CallType="+ CallType.CALLBACK.name() +") cannot be null.");
}
try {
// future set
XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(xxlRpcRequest, finalInvokeCallback);
client.asyncSend(address, xxlRpcRequest);
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-job, invoke error, address:{}, XxlRpcRequest{}", address, xxlRpcRequest);
// future remove
XxlRpcFutureResponseFactory.removeInvokerFuture(xxlRpcRequest.getRequestId());
throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
}
return null;
} else if (CallType.ONEWAY == callType) {
client.asyncSend(address, xxlRpcRequest);
return null;
} else {
throw new XxlRpcException("xxl-rpc callType["+ callType +"] invalid");
}
}
});
}
这里只讲下SYNC
模式的请求,XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(xxlRpcRequest, null);
创建XxlRpcFutureResponse
对象(Future任务执行的结果), 这里的invokeCallback
回调是null
。
public XxlRpcFutureResponse(XxlRpcRequest request, XxlRpcInvokeCallback invokeCallback) {
this.request = request;
this.invokeCallback = invokeCallback;
// set-InvokerFuture
XxlRpcFutureResponseFactory.setInvokerFuture(request.getRequestId(), this);
}
当我们获取Future
执行结果时, XxlRpcFutureResponse
中的done
变量如果是false
, 一直阻塞线程(还有超时机制), 除非有调用setResponse(XxlRpcResponse response)
方法
使done
变量为true
, 并获取锁,调用lock.notifyAll()
, 唤醒线程,并返回执行结果。
在JettyClient
中发送请求,调用asyncSend
方法
@Override
public void asyncSend(String address, XxlRpcRequest xxlRpcRequest) throws Exception {
// do invoke
postRequestAsync(address, xxlRpcRequest);
}
在postRequestAsync
方法拿到RPC
请求执行结果,我们应该通知future
,唤醒等待。
// deserialize response
XxlRpcResponse xxlRpcResponse = (XxlRpcResponse) xxlRpcReferenceBean.getSerializer().deserialize(responseBytes, XxlRpcResponse.class);
// notify response
XxlRpcFutureResponseFactory.notifyInvokerFuture(xxlRpcResponse.getRequestId(), xxlRpcResponse);
在初始化XxlRpcFutureResponse
中, 我们有调用setInvokerFuture
方法将消息和XxlRpcFutureResponse
结果维护起来。
当在JettyClient
执行完请求获取结果时, 调用notifyInvokerFuture
方法设置XxlRpcFutureRespons
e中的xxlRpcResponse
属性
也就是真实的执行结果。
JettyClient
方法请求虽然是异步的, 但是这里还是同步阻塞获取执行结果。
public class XxlRpcFutureResponseFactory {
private static ConcurrentMap<String, XxlRpcFutureResponse> futureResponsePool = new ConcurrentHashMap<String, XxlRpcFutureResponse>();
public static void setInvokerFuture(String requestId, XxlRpcFutureResponse futureResponse){
// TODO,running future method-isolation and limit
futureResponsePool.put(requestId, futureResponse);
}
public static void removeInvokerFuture(String requestId){
futureResponsePool.remove(requestId);
}
public static void notifyInvokerFuture(String requestId, XxlRpcResponse xxlRpcResponse){
XxlRpcFutureResponse futureResponse = futureResponsePool.get(requestId);
if (futureResponse != null) {
futureResponse.setResponse(xxlRpcResponse);
futureResponsePool.remove(requestId);
}
}
}
- 2.
XxlJob
的核心包括XxlRpc
,上文大致提到过其简单的流程。如果我们想要实现一个RPC
,应该要做些什么呢?比如序列化,压缩算法,协议,动态代理,服务注册,加密,网络编码,连接管理,健康检测,负载均衡,优雅启停机,异常重试,业务分组以及熔断限流等等。由于之前写过一个基于Netty
简单的RPC
框架,因此可以通过和XxlRpc
对比来查漏补缺。(这里不会讲Netty
,只会粗略讲一下Rpc
代理)
netty-rpc-client
端扫描需要代理服务的接口并且修改BeanDefinition
初始化的方式。在Spring
容器中实例化一个对象的方式有:Supplier
,FactoryBean
,指定FactoryMethodName
和FactoryBeanName
,Constructor
等等。
@Slf4j
public class ClassPathRpcScanner extends ClassPathBeanDefinitionScanner {
private RpcFactoryBean<?> rpcFactoryBean = new RpcFactoryBean<Object>();
private Class<? extends Annotation> annotationClass;
public void setAnnotationClass(Class<? extends Annotation> annotationClass) {
this.annotationClass = annotationClass;
}
public ClassPathRpcScanner(BeanDefinitionRegistry registry) {
super(registry);
}
@Override
protected Set<BeanDefinitionHolder> doScan(String... basePackages) {
Set<BeanDefinitionHolder> beanDefinitions = super.doScan(basePackages);
if (CollectionUtils.isEmpty(beanDefinitions)) {
logger.warn("No RPC mapper was found in '"
+ Arrays.toString(basePackages)
+ "' package. Please check your configuration");
} else {
processBeanDefinitions(beanDefinitions);
}
return beanDefinitions;
}
public void registerFilter() {
boolean acceptAllInterfaces = true;
if (this.annotationClass != null) {
addIncludeFilter(new AnnotationTypeFilter(this.annotationClass));
acceptAllInterfaces = false;
}
if (acceptAllInterfaces) {
addIncludeFilter(new TypeFilter() {
@Override
public boolean match(MetadataReader metadataReader, MetadataReaderFactory metadataReaderFactory) throws IOException {
return true;
}
});
}
// exclude package-info.java
addExcludeFilter(new TypeFilter() {
@Override
public boolean match(MetadataReader metadataReader, MetadataReaderFactory metadataReaderFactory) throws IOException {
String className = metadataReader.getClassMetadata().getClassName();
return className.endsWith("package-info");
}
});
}
private void processBeanDefinitions(Set<BeanDefinitionHolder> beanDefinitionHolders) {
GenericBeanDefinition genericBeanDefinition = null;
for (BeanDefinitionHolder holder : beanDefinitionHolders) {
genericBeanDefinition = (GenericBeanDefinition) holder.getBeanDefinition();
// genericBeanDefinition.getConstructorArgumentValues().addGenericArgumentValue(genericBeanDefinition.getBeanClassName());
// genericBeanDefinition.setBeanClass(this.rpcFactoryBean.getClass());
genericBeanDefinition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);
/**
* Bean的初始化就可以通过Supplier.get() 和 设置factoryBean.getObject 有异曲同工之妙
*/
// try {
// genericBeanDefinition.setInstanceSupplier(new RpcSupplier<>(Class.forName(genericBeanDefinition.getBeanClassName())));
// } catch (Exception ex) {
// throw new RuntimeException(ex);
// }
/**
*
* 指定factoryMethodName,FactoryBeanName
* 当我们能找到无参 就先执行无参方法,最后执行有参的方法。方法的参数来源于ConstructorArgumentValue
*
* 这里设置独一份的。 我们如果设置了FactoryMethodName, 要注入的类型要和Method ReturnType要匹配起来
* 不然会报错。这个customFactoryBean不能设置成泛型
*/
/**
* DefaultListableBeanFactory:findAutowireCandidates
* DefaultListableBeanFactory:doGetBeanNamesForType
* AbstractAutowiredCapableBeanFactory:determineTargetType
* AbstractAutowiredCapableBeanFactory:getTypeForFactoryMethod
*/
genericBeanDefinition.getConstructorArgumentValues().addGenericArgumentValue(genericBeanDefinition.getBeanClassName());
genericBeanDefinition.setFactoryMethodName("getObject");
genericBeanDefinition.setFactoryBeanName("customFactoryBean");
// genericBeanDefinition.setFactoryBeanName();
// genericBeanDefinition.setFactoryMethodName();
log.info("ClassPathRpcScanner设置GenericBeanDefinition:{}", genericBeanDefinition);
log.info("ClassPathRpcScanner设置BeanDefinitionHolder:{}", holder);
}
}
@Override
protected boolean isCandidateComponent(AnnotatedBeanDefinition beanDefinition) {
return beanDefinition.getMetadata().isInterface() && beanDefinition.getMetadata().isIndependent();
}
}
指定FactoryBeanName和FactoryMethodName创建代理对象
public class CustomFactoryBean<T> {
@Autowired
private RpcFactory<T> rpcFactory;
public <T> T getObject(Class<T> rpcInterface) {
return (T) Proxy.newProxyInstance(rpcInterface.getClassLoader(), new Class[] {rpcInterface}, this.rpcFactory);
}
public static void main(String[] args) {
Class factoryClass = ClassUtils.getUserClass(CustomFactoryBean.class);
Method[] candidates = ReflectionUtils.getUniqueDeclaredMethods(factoryClass);
Method getObject = null;
for (Method method : candidates) {
if ("getObject".equalsIgnoreCase(method.getName())) {
getObject = method;
}
}
System.out.println(getObject);
/**
* 测试泛型
*/
System.out.println(getObject.getTypeParameters().length);
System.out.println(getObject.getTypeParameters());
System.out.println(Arrays.asList(getObject.getParameterTypes()));
}
}
FactoryBean
创建代理对象
public class RpcFactoryBean<T> implements FactoryBean<T> {
private Class<T> rpcInterface;
@Autowired
private RpcFactory<T> rpcFactory;
public RpcFactoryBean() {
}
public RpcFactoryBean(Class<T> rpcInterface) {
this.rpcInterface = rpcInterface;
}
@Nullable
@Override
public T getObject() throws Exception {
return getRpc();
}
@Nullable
@Override
public Class<?> getObjectType() {
return this.rpcInterface;
}
@Override
public boolean isSingleton() {
return (3&1) == 1;
}
public <T> T getRpc() {
/**
* 这一步不要写成了rpcInterface.getInterfaces()
*/
return (T) Proxy.newProxyInstance(rpcInterface.getClassLoader(), new Class[] {this.rpcInterface}, this.rpcFactory);
}
public static void main(String[] args) {
System.out.println(InfoUserService.class.getInterfaces());
}
}
Supplier
方式创建代理对象
public class RpcSupplier<T> implements Supplier<T> {
private Class<T> rpcInterface;
public RpcSupplier(Class<T> rpcInterface) {
this.rpcInterface = rpcInterface;
}
/**
* 这里不用注入的方式, 因为RpcSupplier没有被Spring容器托管
*/
private RpcFactory<T> rpcFactory = null;
@Override
public T get() {
ApplicationContext context = SpringApplicationContextUtil.getApplicationContext();
if (context != null) {
rpcFactory = context.getBean(RpcFactory.class);
}
return (T) Proxy.newProxyInstance(rpcInterface.getClassLoader(), new Class[] {this.rpcInterface}, this.rpcFactory);
}
}
InvocationHandler
@Component
@Slf4j
public class RpcFactory<T> implements InvocationHandler {
@Autowired(required = false)
private NettyClient nettyClient = new NettyClient();
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Request request = new Request();
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParameters(args);
request.setParameterType(method.getParameterTypes());
request.setId(IdUtil.getId());
Object result = nettyClient.send(request);
Class<?> returnType = method.getReturnType();
Response response = JSON.parseObject(result.toString(), Response.class);
if (response.getCode() == 1) {
throw new RuntimeException(response.getErrorMsg());
}
if (returnType.isPrimitive() || String.class.isAssignableFrom(returnType)) {
return response.getData();
} else if (Collection.class.isAssignableFrom(returnType)) {
return JSONArray.parseArray(response.getData().toString(), Object.class);
} else if (Map.class.isAssignableFrom(returnType)) {
return JSON.parseObject(response.getData().toString(), Map.class);
} else {
Object data = response.getData();
if (data != null) {
return JSON.parseObject(data.toString(), returnType);
} else {
return null;
}
}
}
}
Rpc
扫描配置,指定要扫描的路径和服务接口。
@Component
public class RpcScannerConfigurer implements BeanDefinitionRegistryPostProcessor {
private static final String BASE_PACKAGE = "com.cmazxiaoma.springcloud.netty.service";
@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
ClassPathRpcScanner classPathRpcScanner = new ClassPathRpcScanner(registry);
classPathRpcScanner.setAnnotationClass(RpcService.class);
classPathRpcScanner.registerFilter();
classPathRpcScanner.scan(StringUtils.tokenizeToStringArray(BASE_PACKAGE, ConfigurableApplicationContext.CONFIG_LOCATION_DELIMITERS));
}
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
}
public static void main(String[] args) {
System.out.println(Arrays.asList(StringUtils.tokenizeToStringArray("com.cmazxiaoma.springcloud.netty.service", ConfigurableApplicationContext.CONFIG_LOCATION_DELIMITERS)));
}
}
在我们netty-rpc-client
, 我们扫描所有被@RpcService
注解的接口, 为这些接口
创建BeanDefinition
, 指定其BeanClass
是我们的factory
类, 指定其接口的类型。
我们在这个factory
类,给服务接口做动态代理,其InvocationHandler
中的invoke
函数就会发起rpc调用。
那我们怎么引用服务者提供者接口呢?
在Controller
类中通过@Autowired
注入服务提供者接口即可。
再看XxlJob
的实现
比如我们在Controller
中 把服务接口打上@XxlRpcReference
注解, 可以设置timeout
,version
,序列化算法等等等属性
然后Spring
容器在这个服务接口依赖的bean
实例化(postProcessAfterInstantiation)的时候, 会为被@XxlRpcReference注解的服务接口字段,创建XxlRpcReferenceBean
, 同时把引用赋值给这些服务接口。
XxlRpcReferenceBean
也是一个工厂类, 内部也做了动态代理。
我觉得这些XxlRpcReferenceBean
最好缓存起来,不然10次引用服务接口,就要创建10
次对象(内部还要创建JettyClient
对象)。
public class XxlRpcSpringInvokerFactory extends InstantiationAwareBeanPostProcessorAdapter implements InitializingBean,DisposableBean, BeanFactoryAware {
private Logger logger = LoggerFactory.getLogger(XxlRpcSpringInvokerFactory.class);
// ---------------------- config ----------------------
private Class<? extends ServiceRegistry> serviceRegistryClass; // class.forname
private Map<String, String> serviceRegistryParam;
public void setServiceRegistryClass(Class<? extends ServiceRegistry> serviceRegistryClass) {
this.serviceRegistryClass = serviceRegistryClass;
}
public void setServiceRegistryParam(Map<String, String> serviceRegistryParam) {
this.serviceRegistryParam = serviceRegistryParam;
}
// ---------------------- util ----------------------
private XxlRpcInvokerFactory xxlRpcInvokerFactory;
@Override
public void afterPropertiesSet() throws Exception {
// start invoker factory
xxlRpcInvokerFactory = new XxlRpcInvokerFactory(serviceRegistryClass, serviceRegistryParam);
xxlRpcInvokerFactory.start();
}
@Override
public boolean postProcessAfterInstantiation(final Object bean, final String beanName) throws BeansException {
ReflectionUtils.doWithFields(bean.getClass(), new ReflectionUtils.FieldCallback() {
@Override
public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException {
if (field.isAnnotationPresent(XxlRpcReference.class)) {
// valid
Class iface = field.getType();
if (!iface.isInterface()) {
throw new XxlRpcException("xxl-rpc, reference(XxlRpcReference) must be interface.");
}
XxlRpcReference rpcReference = field.getAnnotation(XxlRpcReference.class);
// init reference bean
XxlRpcReferenceBean referenceBean = new XxlRpcReferenceBean(
rpcReference.netType(),
rpcReference.serializer().getSerializer(),
rpcReference.callType(),
iface,
rpcReference.version(),
rpcReference.timeout(),
rpcReference.address(),
rpcReference.accessToken(),
null
);
Object serviceProxy = referenceBean.getObject();
// set bean
field.setAccessible(true);
field.set(bean, serviceProxy);
logger.info(">>>>>>>>>>> xxl-rpc, invoker factory init reference bean success. serviceKey = {}, bean.field = {}.{}",
XxlRpcProviderFactory.makeServiceKey(iface.getName(), rpcReference.version()), beanName, field.getName());
}
}
});
return super.postProcessAfterInstantiation(bean, beanName);
}
@Override
public void destroy() throws Exception {
// stop invoker factory
xxlRpcInvokerFactory.stop();
}
private BeanFactory beanFactory;
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
}
}
在我们netty-rpc-server端, 扫描@RpcService
维护到一个map
里面, key
是接口类型,value
是服务提供者实现类。当消费者发起rpc
调用时, 需要调用服务提供者实现类的method
时, 就减少了反射服务提供者实现类的性能开销。这里是不是可以把服务实现类的所有method
都可以缓存起来? 我觉得有利有弊,可以通过CPU,内存,维护缓存的成本这3
个维度决策。综合考虑,还是不要缓存起来较好。一个method
反射次数超过15
次,会交给MethodAccessorImpl
处理,会在内存中生成对应的字节码,并调用ClassDefiner.defineClass
创建对应的class
对象,性能会得到一定的提升。
Java
反射效率低的原因:
Method#invoke 方法会对参数做封装和解封操作:我们可以看到,invoke 方法的参数是 Object[] 类型,也就是说,如果方法参数是简单类型的话,需要在此转化成 Object 类型,例如 long ,在 javac compile 的时候 用了Long.valueOf() 转型,也就大量了生成了Long 的 Object, 同时 传入的参数是Object[]数值,那还需要额外封装object数组。而在上面 MethodAccessorGenerator#emitInvoke 方法里我们看到,生成的字节码时,会把参数数组拆解开来,把参数恢复到没有被 Object[] 包装前的样子,同时还要对参数做校验,这里就涉及到了解封操作。因此,在反射调用的时候,因为封装和解封,产生了额外的不必要的内存浪费,当调用次数达到一定量的时候,还会导致 GC。
需要检查方法可见性。我们会发现,反射时每次调用都必须检查方法的可见性(在 Method.invoke 里)
需要校验参数:反射时也必须检查每个实际参数与形式参数的类型匹配性(在NativeMethodAccessorImpl.invoke0 里或者生成的 Java 版 MethodAccessor.invoke 里)
反射方法难以内联:Method invoke 就像是个独木桥一样,各处的反射调用都要挤过去,在调用点上收集到的类型信息就会很乱,影响内联程序的判断,使得 Method.invoke() 自身难以被内联到调用方。(方法内联指的是在即时编译过程中遇到方法调用时,直接编译目标方法的方法体,并替换原方法调用。这样就不再需要像调用方法那样的压栈,出栈,传参了)
JIT 无法优化:因为反射涉及到动态加载的类型,所以无法进行优化。
回到正轨,我们这一套实现和Xxl-Rpc
的实现方式大同小异。
public class XxlRpcSpringProviderFactory extends XxlRpcProviderFactory implements ApplicationContextAware, InitializingBean,DisposableBean {
// ---------------------- config ----------------------
private String netType = NetEnum.JETTY.name();
private String serialize = Serializer.SerializeEnum.HESSIAN.name();
private String ip = IpUtil.getIp(); // for registry
private int port = 7080; // default port
private String accessToken;
private Class<? extends ServiceRegistry> serviceRegistryClass; // class.forname
private Map<String, String> serviceRegistryParam;
// set
public void setNetType(String netType) {
this.netType = netType;
}
public void setSerialize(String serialize) {
this.serialize = serialize;
}
public void setIp(String ip) {
this.ip = ip;
}
public void setPort(int port) {
this.port = port;
}
public void setAccessToken(String accessToken) {
this.accessToken = accessToken;
}
public void setServiceRegistryClass(Class<? extends ServiceRegistry> serviceRegistryClass) {
this.serviceRegistryClass = serviceRegistryClass;
}
public void setServiceRegistryParam(Map<String, String> serviceRegistryParam) {
this.serviceRegistryParam = serviceRegistryParam;
}
// util
private void prepareConfig(){
// prepare config
NetEnum netTypeEnum = NetEnum.autoMatch(netType, null);
Serializer.SerializeEnum serializeEnum = Serializer.SerializeEnum.match(serialize, null);
Serializer serializer = serializeEnum!=null?serializeEnum.getSerializer():null;
if (port <= 0) {
throw new XxlRpcException("xxl-rpc provider port["+ port +"] is unvalid.");
}
if (NetUtil.isPortUsed(port)) {
throw new XxlRpcException("xxl-rpc provider port["+ port +"] is used.");
}
// init config
super.initConfig(netTypeEnum, serializer, ip, port, accessToken, serviceRegistryClass, serviceRegistryParam);
}
// ---------------------- util ----------------------
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(XxlRpcService.class);
if (serviceBeanMap!=null && serviceBeanMap.size()>0) {
for (Object serviceBean : serviceBeanMap.values()) {
// valid
if (serviceBean.getClass().getInterfaces().length ==0) {
throw new XxlRpcException("xxl-rpc, service(XxlRpcService) must inherit interface.");
}
// add service
XxlRpcService xxlRpcService = serviceBean.getClass().getAnnotation(XxlRpcService.class);
String iface = serviceBean.getClass().getInterfaces()[0].getName();
String version = xxlRpcService.version();
super.addService(iface, version, serviceBean);
}
}
// TODO,addServices by api + prop
}
@Override
public void afterPropertiesSet() throws Exception {
this.prepareConfig();
super.start();
}
@Override
public void destroy() throws Exception {
super.stop();
}
}
- 3.
XxlJob
对Groovy
的支持,同时还支持注入Spring
中的bean
.(画外音:Zuul
使用Grovvy
定义动态过滤器的时候,删除Grovvy
文件并不能从当前运行的api网关中移除这个过滤器,只能将shouldFilter
返回false
。目前过滤器无法注入Spring
中的bean
)。
public class GlueFactory {
private static GlueFactory glueFactory = new GlueFactory();
public static GlueFactory getInstance(){
return glueFactory;
}
public static void refreshInstance(int type){
if (type == 0) {
glueFactory = new GlueFactory();
} else if (type == 1) {
glueFactory = new SpringGlueFactory();
}
}
/**
* groovy class loader
*/
private GroovyClassLoader groovyClassLoader = new GroovyClassLoader();
/**
* load new instance, prototype
*
* @param codeSource
* @return
* @throws Exception
*/
public IJobHandler loadNewInstance(String codeSource) throws Exception{
if (codeSource!=null && codeSource.trim().length()>0) {
Class<?> clazz = groovyClassLoader.parseClass(codeSource);
if (clazz != null) {
Object instance = clazz.newInstance();
if (instance!=null) {
if (instance instanceof IJobHandler) {
this.injectService(instance);
return (IJobHandler) instance;
} else {
throw new IllegalArgumentException(">>>>>>>>>>> xxl-glue, loadNewInstance error, "
+ "cannot convert from instance["+ instance.getClass() +"] to IJobHandler");
}
}
}
}
throw new IllegalArgumentException(">>>>>>>>>>> xxl-glue, loadNewInstance error, instance is null");
}
/**
* inject service of bean field
*
* @param instance
*/
public void injectService(Object instance) {
// do something
}
}
public class SpringGlueFactory extends GlueFactory {
private static Logger logger = LoggerFactory.getLogger(SpringGlueFactory.class);
/**
* inject action of spring
* @param instance
*/
@Override
public void injectService(Object instance){
if (instance==null) {
return;
}
if (XxlJobSpringExecutor.getApplicationContext() == null) {
return;
}
Field[] fields = instance.getClass().getDeclaredFields();
for (Field field : fields) {
if (Modifier.isStatic(field.getModifiers())) {
continue;
}
Object fieldBean = null;
// with bean-id, bean could be found by both @Resource and @Autowired, or bean could only be found by @Autowired
if (AnnotationUtils.getAnnotation(field, Resource.class) != null) {
try {
Resource resource = AnnotationUtils.getAnnotation(field, Resource.class);
if (resource.name()!=null && resource.name().length()>0){
fieldBean = XxlJobSpringExecutor.getApplicationContext().getBean(resource.name());
} else {
fieldBean = XxlJobSpringExecutor.getApplicationContext().getBean(field.getName());
}
} catch (Exception e) {
}
if (fieldBean==null ) {
fieldBean = XxlJobSpringExecutor.getApplicationContext().getBean(field.getType());
}
} else if (AnnotationUtils.getAnnotation(field, Autowired.class) != null) {
Qualifier qualifier = AnnotationUtils.getAnnotation(field, Qualifier.class);
if (qualifier!=null && qualifier.value()!=null && qualifier.value().length()>0) {
fieldBean = XxlJobSpringExecutor.getApplicationContext().getBean(qualifier.value());
} else {
fieldBean = XxlJobSpringExecutor.getApplicationContext().getBean(field.getType());
}
}
if (fieldBean!=null) {
field.setAccessible(true);
try {
field.set(instance, fieldBean);
} catch (IllegalArgumentException e) {
logger.error(e.getMessage(), e);
} catch (IllegalAccessException e) {
logger.error(e.getMessage(), e);
}
}
}
}
}
- 4.
Shiro
通过实现DestructionAwareBeanPostProcessor
完成对Bean生命周期的掌握,异曲同工之妙的还有ApplicationListenerDetector
等等等。
我觉得xxl-job
可以用这个更加优雅的方式来完成对bean
的生命周期的托管。
public class LifecycleBeanPostProcessor implements DestructionAwareBeanPostProcessor, PriorityOrdered {
/**
* Private internal class log instance.
*/
private static final Logger log = LoggerFactory.getLogger(LifecycleBeanPostProcessor.class);
/**
* Order value of this BeanPostProcessor.
*/
private int order;
/**
* Default Constructor.
*/
public LifecycleBeanPostProcessor() {
this(LOWEST_PRECEDENCE);
}
/**
* Constructor with definable {@link #getOrder() order value}.
*
* @param order order value of this BeanPostProcessor.
*/
public LifecycleBeanPostProcessor(int order) {
this.order = order;
}
/**
* Calls the <tt>init()</tt> methods on the bean if it implements {@link org.apache.shiro.util.Initializable}
*
* @param object the object being initialized.
* @param name the name of the bean being initialized.
* @return the initialized bean.
* @throws BeansException if any exception is thrown during initialization.
*/
public Object postProcessBeforeInitialization(Object object, String name) throws BeansException {
if (object instanceof Initializable) {
try {
if (log.isDebugEnabled()) {
log.debug("Initializing bean [" + name + "]...");
}
((Initializable) object).init();
} catch (Exception e) {
throw new FatalBeanException("Error initializing bean [" + name + "]", e);
}
}
return object;
}
/**
* Does nothing - merely returns the object argument immediately.
*/
public Object postProcessAfterInitialization(Object object, String name) throws BeansException {
// Does nothing after initialization
return object;
}
/**
* Calls the <tt>destroy()</tt> methods on the bean if it implements {@link org.apache.shiro.util.Destroyable}
*
* @param object the object being initialized.
* @param name the name of the bean being initialized.
* @throws BeansException if any exception is thrown during initialization.
*/
public void postProcessBeforeDestruction(Object object, String name) throws BeansException {
if (object instanceof Destroyable) {
try {
if (log.isDebugEnabled()) {
log.debug("Destroying bean [" + name + "]...");
}
((Destroyable) object).destroy();
} catch (Exception e) {
throw new FatalBeanException("Error destroying bean [" + name + "]", e);
}
}
}
/**
* Order value of this BeanPostProcessor.
*
* @return order value.
*/
public int getOrder() {
// LifecycleBeanPostProcessor needs Order. See https://issues.apache.org/jira/browse/SHIRO-222
return order;
}
/**
* Return true only if <code>bean</code> implements Destroyable.
* @param bean bean to check if requires destruction.
* @return true only if <code>bean</code> implements Destroyable.
* @since 1.4
*/
@SuppressWarnings("unused")
public boolean requiresDestruction(Object bean) {
return (bean instanceof Destroyable);
}
}
- 在
Dubbo
中,Invoke
是一个关键组件。在服务者和消费者之间都充当服务调用的一个职责,有点像xxl-job
中的XxlRpcProviderFactory
。
- 在
Dubbo
中的ProxyFactory
,getInvoker
方法用于在服务提供者端,将服务的具体实现类转换成Invoker
。getProxy
方法用于在消费者端,将invoker
转换成客户端需要的接口。在服务提供者ServiceConfig
和消费者ReferenceConfig
中,都会对proxyFactory
通过ExtensionLoader
扩展机制生成适配类ProxyFactory$Adaptive
。这个适配类会根据URL
的ProxyFactory
参数选择对应的实现类进行操作。
/**
* ProxyFactory. (API/SPI, Singleton, ThreadSafe)
*/
@SPI("javassist")
public interface ProxyFactory {
/**
* create proxy.
*
* @param invoker
* @return proxy
*/
@Adaptive({Constants.PROXY_KEY})
<T> T getProxy(Invoker<T> invoker) throws RpcException;
/**
* create invoker.
*
* @param <T>
* @param proxy
* @param type
* @param url
* @return invoker
*/
@Adaptive({Constants.PROXY_KEY})
<T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
}
ReferenceBean
创建的代理对象,就要经过InvokerInovationHandler
处理。首先会经过AbstractInvoker
中的public Result invoke(Invocation inv) throws RpcException
,然后再交给其子类doInvoke(invocation)
实现。
public class InvokerInvocationHandler implements InvocationHandler {
private final Invoker<?> invoker;
public InvokerInvocationHandler(Invoker<?> handler) {
this.invoker = handler;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
}
可以看看DubboInvoker
中的doInvoke
实现,发起远程调用。
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
ResponseFuture future = currentClient.request(inv, timeout);
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else {
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
服务提供者收到请求时,会回调此方法。最终会通过AbstractProxyInvoker
调用到上文提到过 ProxyFactory
生成的Invoker
对象。
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
@Override
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv);
// need to consider backward-compatibility if it's a callback
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || methodsStr.indexOf(",") == -1) {
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods) {
if (inv.getMethodName().equals(method)) {
hasMethod = true;
break;
}
}
}
if (!hasMethod) {
logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
+ " not found in callback service interface ,invoke will be ignored."
+ " please update the api interface. url is:"
+ invoker.getUrl()) + " ,invocation is :" + inv);
return null;
}
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
return invoker.invoke(inv);
}
throw new RemotingException(channel, "Unsupported request: "
+ (message == null ? null : (message.getClass().getName() + ": " + message))
+ ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
AbstractProxyInvoker
最终会调用服务提供者类中的方法。
@Override
public Result invoke(Invocation invocation) throws RpcException {
try {
return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
} catch (InvocationTargetException e) {
return new RpcResult(e.getTargetException());
} catch (Throwable e) {
throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
public class JavassistProxyFactory extends AbstractProxyFactory {
@Override
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}
我分析的比较简陋,毕竟这篇文章可不能让Dubbo
喧宾夺主。对于xxl-Rpc
在消费者扫描服务接口做动态代理,在提供者扫描服务实例并维护的实现,我们对此可以看下Dubbo
在这一块是怎么实现的。(在Dubbo
中,服务实例维护主要依托Exporter
。它是服务暴露数据结构,规定了通常暴露某个协议的服务时,exporter
具有获取该服务可执行对象Invoker
的能力,以及取消暴露的能力。Invoker
转化为Exporter
时服务暴露的关键, 将该协议下的Exporter
维护到exporterMap
中将完成整个服务的暴露,往往每一种协议都会生成与之对应的Exporter
。Exporter
本文不会讲到,有兴趣的可以看下DubboProtocol
,RegistryProtocol
)
欲知后事如何,且听下回分解
万万没想到一个xxl-job源码分析,竟然能引发这么多血案!(下)
尾言
万万没想到,一个知识点竟然能引发这么多血案!溜了溜了,俯卧撑搞起。