一、启用
配置XxlJobConfig
-
示例代码:
@Configuration public class XxlJobConfig { /**配置参数 - 实例化XxlJobSpringExecutor所需要的参数 **/ @Value("${xxl.job.admin.addresses}") private String adminAddresses; ...... /** 启动xxl后续过程,需要实例化XxlJobSpringExecutor **/ @Bean(initMethod = "start", destroyMethod = "destroy") public XxlJobSpringExecutor xxlJobExecutor() { ...... } }
-
这里关注@Bean initMethod destroyMethod
需要注意的是:
单实例bean:容器启动时创建对象
多实例bean:每次获取时创建对象
初始化:
对象创建完成,赋值完成,调用初始化方法
销毁:
单实例:容器关闭时调用
多实例:容器不会销毁,只能手动调用销毁方法
-
这里我们进入类 : XxlJobSpringExecutor
public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware { @Override public void start() throws Exception { /** 重点看这个方法 **/ // init JobHandler Repository initJobHandlerRepository(applicationContext); // refresh GlueFactory GlueFactory.refreshInstance(1); /** 之后跟踪父类方法 **/ // super start super.start(); } /** 初始化本地JobHandler存储 **/ private void initJobHandlerRepository(ApplicationContext applicationContext){ if (applicationContext == null) { return; } /** 拿到spring上下文,获取注解了JobHandler的实例 **/ Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(JobHandler.class); if (serviceBeanMap!=null && serviceBeanMap.size()>0) { for (Object serviceBean : serviceBeanMap.values()) { if (serviceBean instanceof IJobHandler){ /** 这里拿到我们自定义的JobHandler名称**/ String name = serviceBean.getClass().getAnnotation(JobHandler.class).value(); IJobHandler handler = (IJobHandler) serviceBean; /** 判断本地缓存是否已经存在将要注册的名称 **/ if (loadJobHandler(name) != null) { throw new RuntimeException("xxl-job jobhandler naming conflicts."); } /** 调用父类(XxlJobExecutor)的方法,缓存servie到本地 **/ registJobHandler(name, handler); } } } } /** 获取spring上下文 **/ private static ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } }
-
父类解析 - XxlJobExecutor (由于这个类过长,我们逐个分析)
/** 用于缓存JobHandler的map **/ private static ConcurrentHashMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>(); /** 缓存JobHandler信息到本地缓存 **/ public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){ logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler); return jobHandlerRepository.put(name, jobHandler); } /** 根据自定义的JobHandler名称获取JobHandler实例**/ public static IJobHandler loadJobHandler(String name){ return jobHandlerRepository.get(name); }
上面过程结束,本地的缓存就结束了,也就结束了启用的过程,下一步我们将继续跟踪后面的过程,注册;
二、注册
首先看 XxlJobExecutor 中的start()方法和destroy() :
// ---------------------- start + stop ----------------------
public void start() throws Exception {
// init logpath - 日志路径初始化,由于logPath可以默认为空,且不是我们讨论主要流程,这里先不看他
XxlJobFileAppender.initLogPath(logPath);
// init admin-client
initAdminBizList(adminAddresses, accessToken);
// init JobLogFileCleanThread
JobLogFileCleanThread.getInstance().start(logRetentionDays);
// init TriggerCallbackThread
TriggerCallbackThread.getInstance().start();
// init executor-server
port = port>0?port: NetUtil.findAvailablePort(9999);
ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();
initRpcProvider(ip, port, appName, accessToken);
}
针对start()方法中的 initAdminBizList 调用:
/** **/
private static List<AdminBiz> adminBizList;
/** adminAddresses:http://192.168.1.33:8099/xxl-job-admin **/
/** accessToken: 没有使用**/
private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
if (adminAddresses!=null && adminAddresses.trim().length()>0) {
for (String address: adminAddresses.trim().split(",")) {
if (address!=null && address.trim().length()>0) {
String addressUrl = address.concat(AdminBiz.MAPPING);
/** 组装adminBiz实例 - 这里用了动态代理 **/
AdminBiz adminBiz = (AdminBiz) new XxlRpcReferenceBean(NetEnum.JETTY, Serializer.SerializeEnum.HESSIAN.getSerializer(), CallType.SYNC,AdminBiz.class, null, 10000, addressUrl, accessToken, null).getObject();
if (adminBizList == null) {
adminBizList = new ArrayList<AdminBiz>();
}
adminBizList.add(adminBiz);
}
}
}
}
AdminBiz:
private NetEnum netType; - JETTY
private Serializer serializer; - HESSIAN
private CallType callType; - SYNC
private Class<?> iface; - AdminBiz
private String version;
private long timeout;
private String address; - http://192.168.1.33:8099/xxl-job-admin/api
private String accessToken; -
private XxlRpcInvokeCallback invokeCallback; - null
client : Client client
重点关注:initRpcProvider()方法
private void initRpcProvider(String ip, int port, String appName, String accessToken) throws Exception {
// init invoker factory -- 此处暂时不用
xxlRpcInvokerFactory = new XxlRpcInvokerFactory();
// init, provider factory
String address = IpUtil.getIpPort(ip, port);
Map<String, String> serviceRegistryParam = new HashMap<String, String>();
serviceRegistryParam.put("appName", appName);
serviceRegistryParam.put("address", address);
xxlRpcProviderFactory = new XxlRpcProviderFactory();
xxlRpcProviderFactory.initConfig(NetEnum.JETTY, Serializer.SerializeEnum.HESSIAN.getSerializer(), ip, port, accessToken, ExecutorServiceRegistry.class, serviceRegistryParam);
// add services
xxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), null, new ExecutorBizImpl());
// start - 此处调用
xxlRpcProviderFactory.start();
}
类:XxlRpcProviderFactory
public void start() throws Exception {
// start server
server = netType.serverClass.newInstance();
server.setStartedCallback(new BaseCallback() { // serviceRegistry started
@Override
public void run() throws Exception {
// start registry
if (serviceRegistryClass != null) {
serviceRegistry = serviceRegistryClass.newInstance();
/** 此处调用执行具体的 ExecutorServiceRegistry - start()
ExecutorRegistryThread - start() 方法
**/
serviceRegistry.start(serviceRegistryParam);
if (serviceData.size() > 0) {
String ipPort = IpUtil.getIpPort(ip, port);
for (String serviceKey :serviceData.keySet()) {
serviceRegistry.registry(serviceKey, ipPort);
}
}
}
}
});
server.setStopedCallback(new BaseCallback() { // serviceRegistry stoped
@Override
public void run() {
// stop registry
if (serviceRegistry != null) {
if (serviceData.size() > 0) {
String ipPort = IpUtil.getIpPort(ip, port);
for (String serviceKey :serviceData.keySet()) {
serviceRegistry.remove(serviceKey, ipPort);
}
}
serviceRegistry.stop();
serviceRegistry = null;
}
}
});
server.start(this);
}
再看: ExecutorRegistryThread - start() - 每隔30s发一次的注册信息,就在这个方法里面完成
public void start(final String appName, final String address){
// valid
if (appName==null || appName.trim().length()==0) {
logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, appName is null.");
return;
}
if (XxlJobExecutor.getAdminBizList() == null) {
logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null.");
return;
}
registryThread = new Thread(new Runnable() {
@Override
public void run() {
// registry
while (!toStop) {
try {
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, address);
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
try {
/** 重点关注这个调用 在调用adminBiz.registry的时候,会触发 initAdminBizList 中 XxlRpcReferenceBean 的 getObject()方法的调用 ,下面给出这个方法**/
ReturnT<String> registryResult = adminBiz.registry(registryParam);
if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
registryResult = ReturnT.SUCCESS;
logger.info(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
break;
} else {
logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
}
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
try {
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
} catch (InterruptedException e) {
logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
}
}
// registry remove
try {
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, address);
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
try {
ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);
if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
registryResult = ReturnT.SUCCESS;
logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
break;
} else {
logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
}
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
logger.info(">>>>>>>>>>> xxl-job, executor registry thread destory.");
}
});
registryThread.setDaemon(true);
registryThread.start();
}
public Object getObject() {
return Proxy.newProxyInstance(Thread.currentThread()
.getContextClassLoader(), new Class[] { iface },
new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String className = method.getDeclaringClass().getName();
// filter method like "Object.toString()"
if (Object.class.getName().equals(className)) {
logger.info(">>>>>>>>>>> xxl-rpc proxy class-method not support [{}.{}]", className, method.getName());
throw new XxlRpcException("xxl-rpc proxy class-method not support");
}
// address
String address = routeAddress();
if (address==null || address.trim().length()==0) {
throw new XxlRpcException("xxl-rpc reference bean["+ className +"] address empty");
}
// request
XxlRpcRequest xxlRpcRequest = new XxlRpcRequest();
xxlRpcRequest.setRequestId(UUID.randomUUID().toString());
xxlRpcRequest.setCreateMillisTime(System.currentTimeMillis());
xxlRpcRequest.setAccessToken(accessToken);
xxlRpcRequest.setClassName(className);
xxlRpcRequest.setMethodName(method.getName());
xxlRpcRequest.setParameterTypes(method.getParameterTypes());
xxlRpcRequest.setParameters(args);
/** 由于默认调用这个逻辑,就给出了这个逻辑 **/
// send
if (CallType.SYNC == callType) {
try {
// future set
XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(xxlRpcRequest, null);
/** 重点看这个逻辑,进行远程调用的 接着看 通信和序列化**/
// do invoke
client.asyncSend(address, xxlRpcRequest);
// future get
XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
if (xxlRpcResponse.getErrorMsg() != null) {
throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
}
return xxlRpcResponse.getResult();
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-job, invoke error, address:{}, XxlRpcRequest{}", address, xxlRpcRequest);
throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
} finally{
// remove-InvokerFuture
XxlRpcFutureResponseFactory.removeInvokerFuture(xxlRpcRequest.getRequestId());
}
}
});
}
三、通信&序列化
通篇都是再说注册的过程,这里通信和序列化,通信框架的部分,由于默认使用的Jetty,我们就把Jetty的部分拿出来看下:JettyClient - postRequestAsync()
private void postRequestAsync(String address, XxlRpcRequest xxlRpcRequest) throws Exception {
// reqURL
String reqURL = address;
if (!address.toLowerCase().startsWith("http")) {
reqURL = "http://" + address; // IP:PORT, need parse to url
}
/** 框架默认的使用Hession进行的序列化 **/
// serialize request
byte[] requestBytes = xxlRpcReferenceBean.getSerializer().serialize(xxlRpcRequest);
// httpclient
HttpClient httpClient = getJettyHttpClient();
// request
Request request = httpClient.newRequest(reqURL);
request.method(HttpMethod.POST);
request.timeout(xxlRpcReferenceBean.getTimeout() + 500, TimeUnit.MILLISECONDS); // async, not need timeout
request.content(new BytesContentProvider(requestBytes));
/** 调用send方法,发送消息到远程服务端注册,到此结束 **/
// invoke
request.send(new BufferingResponseListener() {
/** 这里处理 调用完成的结果 **/
@Override
public void onComplete(Result result) {
} catch (Exception e){
});
}
结束语:
- 注解的定义和解析
- RPC框架支持
- 动态代理的使用
- 工厂模式的使用