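RepositoryManager class
The RepositoryManager class manages the RepositoryProvider. Its main job is to read the RepositoryProvider configuration from the configuration file "server/conf/sqoop.properties" and to instantiate the RepositoryProvider. When the configuration changes, it updates the RepositoryProvider accordingly.
The initialization code of RepositoryManager is as follows: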
    public synchronized void initialize(boolean immutableRepository) {
        MapContext context = SqoopConfiguration.getInstance().getContext();
        // Copy every property under the "sysprop." prefix into the JVM system properties.
        Map repoSysProps = context.getNestedProperties("org.apache.sqoop.repository.sysprop.");
        LOG.info("Setting system properties: " + repoSysProps);
        Iterator repoProviderClassName = repoSysProps.entrySet().iterator();
        while(repoProviderClassName.hasNext()) {
            Entry repoProviderClass = (Entry)repoProviderClassName.next();
            System.setProperty((String)repoProviderClass.getKey(), (String)repoProviderClass.getValue());
        }
        // Read the provider class name; it must be configured and non-empty.
        String repoProviderClassName1 = context.getString("org.apache.sqoop.repository.provider");
        if(repoProviderClassName1 != null && repoProviderClassName1.trim().length() != 0) {
            if(LOG.isTraceEnabled()) {
                LOG.trace("Repository provider: " + repoProviderClassName1);
            }
            // Load the provider class and instantiate it reflectively.
            Class repoProviderClass1 = ClassUtils.loadClass(repoProviderClassName1);
            if(repoProviderClass1 == null) {
                throw new SqoopException(RepositoryError.REPO_0001, repoProviderClassName1);
            } else {
                try {
                    this.provider = (RepositoryProvider)repoProviderClass1.newInstance();
                } catch (Exception var7) {
                    throw new SqoopException(RepositoryError.REPO_0001, repoProviderClassName1, var7);
                }
                this.provider.initialize(context);
                // Create or upgrade the repository unless it is marked immutable.
                if(!immutableRepository) {
                    LOG.info("Creating or updating respository at bootup");
                    this.provider.getRepository().createOrUpgradeRepository();
                }
                if(!this.provider.getRepository().isRepositorySuitableForUse()) {
                    throw new SqoopException(RepositoryError.REPO_0002);
                } else {
                    // Register a listener so that configuration changes reach this manager.
                    SqoopConfiguration.getInstance().getProvider().registerListener(new CoreConfigurationListener(this));
                    LOG.info("Repository Manager initialized: OK");
                }
            }
        } else {
            throw new SqoopException(RepositoryError.REPO_0001, "org.apache.sqoop.repository.provider");
        }
    }
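The first two steps of initialize (copying prefixed properties into the system properties and creating the provider from its class name) can be tried in isolation. The following is a minimal sketch, not Sqoop code: nestedProperties is a hypothetical stand-in for what getNestedProperties appears to do in the listing (collect the keys under a prefix, with the prefix removed), Provider and InMemoryProvider are hypothetical types, and the property values are made up.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for RepositoryProvider; not a Sqoop type.
interface Provider {
    String name();
}

// Hypothetical implementation, looked up by class name below.
class InMemoryProvider implements Provider {
    public String name() {
        return "in-memory";
    }
}

class ProviderLoadingSketch {
    // Returns the entries whose key starts with prefix, with the prefix removed.
    static Map<String, String> nestedProperties(Map<String, String> config, String prefix) {
        Map<String, String> nested = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : config.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                nested.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
        return nested;
    }

    // Loads a class by name and creates an instance through its no-argument constructor.
    static Provider loadProvider(String className) {
        if (className == null || className.trim().isEmpty()) {
            throw new IllegalStateException("provider class name is not configured");
        }
        try {
            Class<?> providerClass = Class.forName(className.trim());
            return (Provider) providerClass.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalStateException("cannot instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> config = new LinkedHashMap<>();
        // Illustrative entries only; the values are made up for this sketch.
        config.put("org.apache.sqoop.repository.sysprop.example.log.file", "repo.log");
        config.put("org.apache.sqoop.repository.provider", InMemoryProvider.class.getName());

        Map<String, String> sysProps =
            nestedProperties(config, "org.apache.sqoop.repository.sysprop.");
        sysProps.forEach(System::setProperty);

        Provider provider = loadProvider(config.get("org.apache.sqoop.repository.provider"));
        System.out.println(System.getProperty("example.log.file") + " " + provider.name());
    }
}
```

The sketch calls getDeclaredConstructor().newInstance() rather than Class.newInstance(), which the listing uses and which has been deprecated since Java 9.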
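RepositoryProvider class
The RepositoryProvider class gives access to the configuration (the context) and to the repository.
One implementation in the system is the JdbcRepositoryProvider class.
JdbcRepositoryProvider holds the configuration (JdbcRepositoryContext), the repository (JdbcRepository), and the class that carries out the repository operations (JdbcRepositoryHandler).
Its central method is the one that initializes the handler: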
    private void initializeRepositoryHandler() {
        // Load and instantiate the configured handler class.
        String jdbcHandlerClassName = this.repoContext.getHandlerClassName();
        Class handlerClass = ClassUtils.loadClass(jdbcHandlerClassName);
        if(handlerClass == null) {
            throw new SqoopException(RepositoryError.JDBCREPO_0001, jdbcHandlerClassName);
        } else {
            try {
                this.handler = (JdbcRepositoryHandler)handlerClass.newInstance();
            } catch (Exception var9) {
                throw new SqoopException(RepositoryError.JDBCREPO_0001, jdbcHandlerClassName, var9);
            }
            // The connection URL and the driver class name must both be configured.
            String connectUrl = this.repoContext.getConnectionUrl();
            if(connectUrl != null && connectUrl.trim().length() != 0) {
                String jdbcDriverClassName = this.repoContext.getDriverClass();
                if(jdbcDriverClassName != null && jdbcDriverClassName.trim().length() != 0) {
                    Class driverClass = ClassUtils.loadClass(jdbcDriverClassName);
                    if(driverClass == null) {
                        throw new SqoopException(RepositoryError.JDBCREPO_0003, jdbcDriverClassName);
                    } else {
                        try {
                            this.driver = (Driver)driverClass.newInstance();
                        } catch (Exception var8) {
                            throw new SqoopException(RepositoryError.JDBCREPO_0003, jdbcDriverClassName, var8);
                        }
                        // Build the connection pool and wrap it in a pooling DataSource.
                        Properties jdbcProps = this.repoContext.getConnectionProperties();
                        DriverManagerConnectionFactory connFactory = new DriverManagerConnectionFactory(connectUrl, jdbcProps);
                        this.connectionPool = new GenericObjectPool();
                        this.connectionPool.setMaxActive(this.repoContext.getMaximumConnections());
                        this.statementPool = new GenericKeyedObjectPoolFactory((KeyedPoolableObjectFactory)null);
                        // The result is not stored: the object is created only to set up the pool.
                        new PoolableConnectionFactory(connFactory, this.connectionPool, this.statementPool, this.handler.validationQuery(), false, false, this.repoContext.getTransactionIsolation().getCode());
                        this.dataSource = new PoolingDataSource(this.connectionPool);
                        this.txFactory = new JdbcRepositoryTransactionFactory(this.dataSource);
                        // Hand the data source to the context, then create the handler-backed repository.
                        this.repoContext.initialize(this.dataSource, this.txFactory);
                        this.handler.initialize(this.repoContext);
                        this.repository = new JdbcRepository(this.handler, this.repoContext);
                        LOG.info("JdbcRepositoryProvider initialized");
                    }
                } else {
                    throw new SqoopException(RepositoryError.JDBCREPO_0003);
                }
            } else {
                throw new SqoopException(RepositoryError.JDBCREPO_0002);
            }
        }
    }
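The order of the checks in initializeRepositoryHandler is easier to see when the nesting is flattened into guard clauses. The following is only a sketch of that order, under the assumption that each error code means what the listing suggests; JdbcSettingsCheck and its methods are hypothetical names, the codes are returned as plain strings, and only class loading is checked, not instantiation.

```java
class JdbcSettingsCheck {
    static boolean isBlank(String value) {
        return value == null || value.trim().isEmpty();
    }

    // True if the named class can be found on the classpath.
    static boolean isLoadable(String className) {
        if (isBlank(className)) {
            return false;
        }
        try {
            Class.forName(className.trim());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    // Returns the error code of the first failing check, or "OK" when all pass.
    static String firstProblem(String handlerClass, String connectUrl, String driverClass) {
        if (!isLoadable(handlerClass)) {
            return "JDBCREPO_0001"; // handler class missing or not loadable
        }
        if (isBlank(connectUrl)) {
            return "JDBCREPO_0002"; // connection URL not configured
        }
        if (!isLoadable(driverClass)) {
            return "JDBCREPO_0003"; // driver class missing or not loadable
        }
        return "OK";
    }

    public static void main(String[] args) {
        // java.lang.Object stands in for real handler and driver classes.
        System.out.println(firstProblem("java.lang.Object", "", "java.lang.Object"));
    }
}
```

With a loadable handler class and an empty URL, as in main, the first failing check is the URL, so the sketch reports JDBCREPO_0002.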
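JdbcRepository class
The JdbcRepository class represents a repository that is accessed over JDBC. It holds two objects:
    private final JdbcRepositoryHandler handler;
    private final JdbcRepositoryContext repoContext;
JdbcRepositoryContext holds the related configuration, and JdbcRepositoryHandler carries out the repository operations (the proxy design pattern is used here: JdbcRepository forwards the work to the handler).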
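The split between a repository that forwards calls and a handler that does the work can be sketched as follows. This is an illustration under assumptions, not the Sqoop implementation: RecordingTx, LinkHandler and DelegatingRepository are hypothetical types, and wrapping each call in a transaction is an assumption suggested by the txFactory that the provider passes to the context.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical transaction that records what happened to it.
class RecordingTx {
    final List<String> events = new ArrayList<>();

    void begin() { events.add("begin"); }
    void commit() { events.add("commit"); }
    void rollback() { events.add("rollback"); }
}

// Hypothetical handler contract: the storage-specific work.
interface LinkHandler {
    List<String> findLinks(RecordingTx tx);
}

// Hypothetical repository: owns the transaction boundary and delegates the work.
class DelegatingRepository {
    private final LinkHandler handler;

    DelegatingRepository(LinkHandler handler) {
        this.handler = handler;
    }

    List<String> findLinks(RecordingTx tx) {
        tx.begin();
        try {
            List<String> links = handler.findLinks(tx);
            tx.commit();
            return links;
        } catch (RuntimeException e) {
            // Undo the work if the handler fails, then let the caller see the error.
            tx.rollback();
            throw e;
        }
    }
}

class DelegationSketch {
    public static void main(String[] args) {
        RecordingTx tx = new RecordingTx();
        DelegatingRepository repository =
            new DelegatingRepository(t -> Arrays.asList("link-1", "link-2"));
        System.out.println(repository.findLinks(tx) + " " + tx.events);
    }
}
```

Keeping the transaction handling in one place means a handler for another database only has to supply the queries.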
JdbcRepositoryContext class
The JdbcRepositoryContext class reads its settings from the configuration file (server/conf/sqoop.properties), including the driver, user name, and password.
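Reading such settings from a properties file can be sketched with java.util.Properties. The key prefix "org.apache.sqoop.repository.jdbc." and the key names below are assumptions modeled on the keys that appear in the RepositoryManager listing, so check them against your own sqoop.properties; JdbcSettingsSketch is a hypothetical class and the sample values are made up.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

class JdbcSettingsSketch {
    // Assumed key prefix; verify it against your sqoop.properties.
    static final String PREFIX = "org.apache.sqoop.repository.jdbc.";

    final String driver;
    final String url;
    final String user;
    final String password;

    JdbcSettingsSketch(Properties props) {
        this.driver = props.getProperty(PREFIX + "driver");
        this.url = props.getProperty(PREFIX + "url");
        this.user = props.getProperty(PREFIX + "user");
        // A missing password is treated as empty in this sketch.
        this.password = props.getProperty(PREFIX + "password", "");
    }

    // Parses properties-file text into a settings object.
    static JdbcSettingsSketch parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new JdbcSettingsSketch(props);
    }

    public static void main(String[] args) {
        // Made-up sample values, for illustration only.
        String text = String.join("\n",
            PREFIX + "driver=org.example.Driver",
            PREFIX + "url=jdbc:example:repo",
            PREFIX + "user=sa",
            PREFIX + "password=");
        JdbcSettingsSketch settings = parse(text);
        System.out.println(settings.driver + " " + settings.url + " " + settings.user);
    }
}
```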
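JdbcRepositoryHandler abstract class
JdbcRepositoryHandler is the class that actually performs the repository operations, such as querying and creating connectors and links.
The implementation of JdbcRepositoryHandler in the system is the CommonRepositoryHandler class.
CommonRepositoryHandler runs SQL queries against the database and returns the results to the caller.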