dataX Plugin的类加载机制

dataX中xxxreader和xxxwriter都属于插件,本文主要探讨这些插件是如何加载到JVM中去的。

Plugin的配置信息

dataX中运行一个Job所需要的所有插件的信息都会存放在LoadUtil#pluginRegisterCenter中
一个典型的mysql到mysql导入的获取到的插件信息如下:

  • 包括插件的名字
  • 插件入口类类名
  • 插件的入口类和插件依赖的Jar包所在的路径
"plugin": {
        "reader": {
            "mysqlreader": {
                "class": "com.alibaba.datax.plugin.reader.mysqlreader.MysqlReader",
                "description": "useScene: prod. mechanism: Jdbc connection using the database, execute select sql, retrieve data from the ResultSet. warn: The more you know about the database, the less problems you encounter.",
                "developer": "alibaba",
                "name": "mysqlreader",
                "path": "C:/Users/64371/Desktop/1111/datax\\plugin\\reader\\mysqlreader"
            }
        },
        "writer": {
            "mysqlwriter": {
                "class": "com.alibaba.datax.plugin.writer.mysqlwriter.MysqlWriter",
                "description": "useScene: prod. mechanism: Jdbc connection using the database, execute insert sql. warn: The more you know about the database, the less problems you encounter.",
                "developer": "alibaba",
                "name": "mysqlwriter",
                "path": "C:/Users/64371/Desktop/1111/datax\\plugin\\writer\\mysqlwriter"
            }
        }
    }

Plugin的配置信息读取

接下来看看Plugin信息是如何加载到LoadUtil#pluginRegisterCenter中的,解析整个作业配置的过程是在ConfigParser#parse这个方法中。

  1. 解析用户的配置文件mysql2mysql.json
  2. 解析核心配置类文件$DATAX_HOME/conf/core.json,这个类中提供了用户可能没有提供的一些配置的默认值;
  3. 解析插件类的配置,主要逻辑是在ConfigParser#parsePluginConfig这个方法中,从$DATAX_HOME/plugin/reader$DATAX_HOME/plugin/writer中读取插件的配置文件。例如,对于mysqlreader这个插件,会将$DATAX_HOME/plugin/reader/mysqlreader/plugin.json的内容读取进Configuration中,并将插件对应Jar包的路径也写入到Configuration中去。当Job所有的配置信息获取完成之后就会将的Configuration对象赋给LoadUtil#pluginRegisterCenter,之后就可以从这个对象中获取插件的信息了。
// ConfigParser#parse
public static Configuration parse(final String jobPath) {
        Configuration configuration = ConfigParser.parseJobConfig(jobPath);

        configuration.merge(
                ConfigParser.parseCoreConfig(CoreConstant.DATAX_CONF_PATH),
                false);
        // todo config优化,只捕获需要的plugin
        String readerPluginName = configuration.getString(
                CoreConstant.DATAX_JOB_CONTENT_READER_NAME);
        String writerPluginName = configuration.getString(
                CoreConstant.DATAX_JOB_CONTENT_WRITER_NAME);

        String preHandlerName = configuration.getString(
                CoreConstant.DATAX_JOB_PREHANDLER_PLUGINNAME);

        String postHandlerName = configuration.getString(
                CoreConstant.DATAX_JOB_POSTHANDLER_PLUGINNAME);

        Set<String> pluginList = new HashSet<String>();
        pluginList.add(readerPluginName);
        pluginList.add(writerPluginName);

        if(StringUtils.isNotEmpty(preHandlerName)) {
            pluginList.add(preHandlerName);
        }
        if(StringUtils.isNotEmpty(postHandlerName)) {
            pluginList.add(postHandlerName);
        }
        try {
            configuration.merge(parsePluginConfig(new ArrayList<String>(pluginList)), false);
        }catch (Exception e){
            //吞掉异常,保持log干净。这里message足够。
            LOG.warn(String.format("插件[%s,%s]加载失败,1s后重试... Exception:%s ", readerPluginName, writerPluginName, e.getMessage()));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e1) {
                //
            }
            configuration.merge(parsePluginConfig(new ArrayList<String>(pluginList)), false);
        }

        return configuration;
}

插件加载

LoadUtil#pluginRegisterCenter有了值之后,每遇到一个插件就可以从LoadUtil#pluginRegisterCenter获取插件的配置然后将其加载到JVM中去了。对于mysqlreader来说加载的逻辑是在LoadUtil#loadJobPlugin中,LoadUtil#loadJobPlugin会调用LoadUtil#loadPluginClass,LoadUtil#loadPluginClass的功能是利用加载器加载指定的类,那么这个类加载器是怎么获得的呢?在dataX中一个插件会用一个类加载器进行加载,具体类加载器的是在LoadUti#getJarLoader中生成的。

可以看到dataX提供了一个JarLoader类,JarLoader继承自URLClassLoader,提供Jar隔离的加载机制,会把传入的路径、及其子路径、以及路径中的jar文件加入到class path。对于每个插件传入的路径就是上面提到的插件Jar包所在的目录,这样该JarLoader就能加载该目录下的所有jar包,最后有一点要说明的是每JarLoader
的父加载器是AppClassLoader(至少在dataX目前的环境中是这个),并没有违反父委托机制。

// LoadUtil#loadJobPlugin
public static AbstractJobPlugin loadJobPlugin(PluginType pluginType,
                                                  String pluginName) {
        Class<? extends AbstractPlugin> clazz = LoadUtil.loadPluginClass(
                pluginType, pluginName, ContainerType.Job);

        try {
            AbstractJobPlugin jobPlugin = (AbstractJobPlugin) clazz
                    .newInstance();
            jobPlugin.setPluginConf(getPluginConf(pluginType, pluginName));
            return jobPlugin;
        } catch (Exception e) {
            throw DataXException.asDataXException(
                    FrameworkErrorCode.RUNTIME_ERROR,
                    String.format("DataX找到plugin[%s]的Job配置.",
                            pluginName), e);
        }
}

// LoadUti#getJarLoader
public static synchronized JarLoader getJarLoader(PluginType pluginType,
                                                      String pluginName) {
        Configuration pluginConf = getPluginConf(pluginType, pluginName);

        JarLoader jarLoader = jarLoaderCenter.get(generatePluginKey(pluginType,
                pluginName));
        if (null == jarLoader) {
            String pluginPath = pluginConf.getString("path");
            if (StringUtils.isBlank(pluginPath)) {
                throw DataXException.asDataXException(
                        FrameworkErrorCode.RUNTIME_ERROR,
                        String.format(
                                "%s插件[%s]路径非法!",
                                pluginType, pluginName));
            }
            jarLoader = new JarLoader(new String[]{pluginPath});
            jarLoaderCenter.put(generatePluginKey(pluginType, pluginName),
                    jarLoader);
        }

        return jarLoader;
}
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。