dataX中xxxreader和xxxwriter都属于插件,本文主要探讨这些插件是如何加载到JVM中去的。
Plugin的配置信息
dataX中运行一个Job所需要的所有插件的信息都会存放在LoadUtil#pluginRegisterCenter中
一个典型的mysql到mysql导入的获取到的插件信息如下:
- 包括插件的名字
- 插件入口类类名
- 插件的入口类和插件依赖的Jar包所在的路径
"plugin": {
"reader": {
"mysqlreader": {
"class": "com.alibaba.datax.plugin.reader.mysqlreader.MysqlReader",
"description": "useScene: prod. mechanism: Jdbc connection using the database, execute select sql, retrieve data from the ResultSet. warn: The more you know about the database, the less problems you encounter.",
"developer": "alibaba",
"name": "mysqlreader",
"path": "C:/Users/64371/Desktop/1111/datax\\plugin\\reader\\mysqlreader"
}
},
"writer": {
"mysqlwriter": {
"class": "com.alibaba.datax.plugin.writer.mysqlwriter.MysqlWriter",
"description": "useScene: prod. mechanism: Jdbc connection using the database, execute insert sql. warn: The more you know about the database, the less problems you encounter.",
"developer": "alibaba",
"name": "mysqlwriter",
"path": "C:/Users/64371/Desktop/1111/datax\\plugin\\writer\\mysqlwriter"
}
}
}
Plugin的配置信息读取
接下来看看Plugin信息是如何加载到LoadUtil#pluginRegisterCenter中的,解析整个作业配置的过程是在ConfigParser#parse这个方法中。
- 解析用户的配置文件
mysql2mysql.json
; - 解析核心配置类文件
$DATAX_HOME/conf/core.json
,这个类中提供了用户可能没有提供的一些配置的默认值; - 解析插件类的配置,主要逻辑是在ConfigParser#parsePluginConfig这个方法中,从
$DATAX_HOME/plugin/reader
和$DATAX_HOME/plugin/writer
中读取插件的配置文件。例如,对于mysqlreader这个插件,会将$DATAX_HOME/plugin/reader/mysqlreader/plugin.json
的内容读取进Configuration中,并将插件对应Jar包的路径也写入到Configuration中去。当Job所有的配置信息获取完成之后就会将的Configuration对象赋给LoadUtil#pluginRegisterCenter,之后就可以从这个对象中获取插件的信息了。
// ConfigParser#parse
public static Configuration parse(final String jobPath) {
Configuration configuration = ConfigParser.parseJobConfig(jobPath);
configuration.merge(
ConfigParser.parseCoreConfig(CoreConstant.DATAX_CONF_PATH),
false);
// todo config优化,只捕获需要的plugin
String readerPluginName = configuration.getString(
CoreConstant.DATAX_JOB_CONTENT_READER_NAME);
String writerPluginName = configuration.getString(
CoreConstant.DATAX_JOB_CONTENT_WRITER_NAME);
String preHandlerName = configuration.getString(
CoreConstant.DATAX_JOB_PREHANDLER_PLUGINNAME);
String postHandlerName = configuration.getString(
CoreConstant.DATAX_JOB_POSTHANDLER_PLUGINNAME);
Set<String> pluginList = new HashSet<String>();
pluginList.add(readerPluginName);
pluginList.add(writerPluginName);
if(StringUtils.isNotEmpty(preHandlerName)) {
pluginList.add(preHandlerName);
}
if(StringUtils.isNotEmpty(postHandlerName)) {
pluginList.add(postHandlerName);
}
try {
configuration.merge(parsePluginConfig(new ArrayList<String>(pluginList)), false);
}catch (Exception e){
//吞掉异常,保持log干净。这里message足够。
LOG.warn(String.format("插件[%s,%s]加载失败,1s后重试... Exception:%s ", readerPluginName, writerPluginName, e.getMessage()));
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
//
}
configuration.merge(parsePluginConfig(new ArrayList<String>(pluginList)), false);
}
return configuration;
}
插件加载
LoadUtil#pluginRegisterCenter有了值之后,每遇到一个插件就可以从LoadUtil#pluginRegisterCenter获取插件的配置然后将其加载到JVM中去了。对于mysqlreader来说加载的逻辑是在LoadUtil#loadJobPlugin中,LoadUtil#loadJobPlugin会调用LoadUtil#loadPluginClass,LoadUtil#loadPluginClass的功能是利用加载器加载指定的类,那么这个类加载器是怎么获得的呢?在dataX中一个插件会用一个类加载器进行加载,具体类加载器的是在LoadUti#getJarLoader中生成的。
可以看到dataX提供了一个JarLoader类,JarLoader继承自URLClassLoader,提供Jar隔离的加载机制,会把传入的路径、及其子路径、以及路径中的jar文件加入到class path。对于每个插件传入的路径就是上面提到的插件Jar包所在的目录,这样该JarLoader就能加载该目录下的所有jar包,最后有一点要说明的是每JarLoader
的父加载器是AppClassLoader(至少在dataX目前的环境中是这个),并没有违反父委托机制。
// LoadUtil#loadJobPlugin
public static AbstractJobPlugin loadJobPlugin(PluginType pluginType,
String pluginName) {
Class<? extends AbstractPlugin> clazz = LoadUtil.loadPluginClass(
pluginType, pluginName, ContainerType.Job);
try {
AbstractJobPlugin jobPlugin = (AbstractJobPlugin) clazz
.newInstance();
jobPlugin.setPluginConf(getPluginConf(pluginType, pluginName));
return jobPlugin;
} catch (Exception e) {
throw DataXException.asDataXException(
FrameworkErrorCode.RUNTIME_ERROR,
String.format("DataX找到plugin[%s]的Job配置.",
pluginName), e);
}
}
// LoadUti#getJarLoader
public static synchronized JarLoader getJarLoader(PluginType pluginType,
String pluginName) {
Configuration pluginConf = getPluginConf(pluginType, pluginName);
JarLoader jarLoader = jarLoaderCenter.get(generatePluginKey(pluginType,
pluginName));
if (null == jarLoader) {
String pluginPath = pluginConf.getString("path");
if (StringUtils.isBlank(pluginPath)) {
throw DataXException.asDataXException(
FrameworkErrorCode.RUNTIME_ERROR,
String.format(
"%s插件[%s]路径非法!",
pluginType, pluginName));
}
jarLoader = new JarLoader(new String[]{pluginPath});
jarLoaderCenter.put(generatePluginKey(pluginType, pluginName),
jarLoader);
}
return jarLoader;
}