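The sqoop-client class
Introduction
Sqoop2 uses a client-server architecture. Sqoop2 users interact with the server through the sqoop-client class.
Sqoop-client lets the user:
- connect to the server,
- look up Connectors,
- create a Link,
- create a Job,
- submit a Job,
- retrieve information about a running Job.
These functions cover essentially everything a user needs during a data migration.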
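Concepts
Sqoop2 breaks a data migration task down into finer-grained concepts. It abstracts the data source, the transfer configuration, and the transfer task itself.
This abstraction yields four core concepts: Connector, Link, Job, and Driver.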
Connector
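A Connector is akin to a type: it describes the configuration that one kind of data source needs when it serves as the source or the destination of a transfer. The official documentation describes a Connector as follows: "The registered connector implementation will provide logic to read from and/or write to a data source that it represents. A connector can have one or more links associated with it."
For example, Sqoop2 currently provides: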
- Generic JDBC Connector
- MySqlJdbcConnector
- KiteConnector (Kite connector enables access to data in HDFS or HBase in diverse file formats (CSV, Avro and Parquet))
- KafkaConnector
- HdfsConnector
That is five connectors in total.
Each kind of data source has a corresponding Connector type.
The details of each Connector are covered later.
Link
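A Connector is tied to a type of data source, whereas a Link is tied to a concrete Job.
Take a concrete Job such as a MySQL -> HDFS migration. It needs Link1 for the data source (MySQL) and Link2 for the destination (HDFS).
A Link therefore holds the configuration for one specific data source, that is, the settings used when reading from it or writing to it.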
Driver
The official source code describes Driver as: "Sqoop driver that manages the job lifecycle".
The Driver supplies the remaining information needed to run a Job, such as the Map/Reduce configuration.
Job
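A Link defines the configuration for reading from or writing to one data source. A Job is the process of reading from one data source and writing to another.
A Job is therefore composed of a From Link, a To Link, and the Driver configuration.
The official documentation describes a Job as follows: Job: A sqoop job holds the From and To parts for transferring data from the From data source to the To data source. Both the From and the To are uniquely identified by their corresponding connector Link Ids.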
Thus the pre-requisite for creating a job is to first create the links as described above.
Once the linkIds for the From and To are given, then the job configs for the associated connector for the link object have to be filled. You can get the list of all the from and to job config/inputs using Display Config and Input Names For Connector for that connector. A connector can have one or more links. We then use the links in the From and To direction to populate the corresponding MFromConfig and MToConfig respectively.
In addition to filling the job configs for the From and the To representing the link, we also need to fill the driver configs that control the job execution engine environment. For example, if the job execution engine happens to be the MapReduce we will specify the number of mappers to be used in reading data from the From data source.
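The relationship between these concepts can be sketched in a few lines of Java. This is a simplified model for illustration only, not the Sqoop2 model classes: the names ConnectorSketch, LinkSketch, JobSketch, and ConceptDemo, as well as the config keys and values, are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a connector: a *type* of data source.
final class ConnectorSketch {
    final String name;

    ConnectorSketch(String name) {
        this.name = name;
    }
}

// Hypothetical stand-in for a link: one configured instance of a connector.
final class LinkSketch {
    final long id;
    final ConnectorSketch connector;
    final Map<String, String> config = new LinkedHashMap<>();

    LinkSketch(long id, ConnectorSketch connector) {
        this.id = id;
        this.connector = connector;
    }
}

// Hypothetical stand-in for a job: a From link, a To link, and driver settings.
final class JobSketch {
    final LinkSketch from;
    final LinkSketch to;
    final Map<String, String> driverConfig = new LinkedHashMap<>();

    JobSketch(LinkSketch from, LinkSketch to) {
        // Links must exist before a job can be created.
        if (from == null || to == null) {
            throw new IllegalArgumentException("a job needs both a From and a To link");
        }
        this.from = from;
        this.to = to;
    }

    String describe() {
        return from.connector.name + " -> " + to.connector.name;
    }
}

final class ConceptDemo {
    // Builds the MySQL -> HDFS example; all keys and values are made up.
    static JobSketch mysqlToHdfs() {
        ConnectorSketch jdbc = new ConnectorSketch("generic-jdbc-connector");
        ConnectorSketch hdfs = new ConnectorSketch("hdfs-connector");

        LinkSketch link1 = new LinkSketch(1L, jdbc);
        link1.config.put("connectionString", "jdbc:mysql://localhost/demo");

        LinkSketch link2 = new LinkSketch(2L, hdfs);
        link2.config.put("uri", "hdfs://localhost:8020");

        JobSketch job = new JobSketch(link1, link2);
        job.driverConfig.put("numExtractors", "3");
        return job;
    }

    public static void main(String[] args) {
        System.out.println(mysqlToHdfs().describe());
    }
}
```

The point of the split is that a connector is defined once per kind of data source, a link adds the settings for one concrete source or destination, and a job only refers to two existing links plus the driver settings.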
Sqoop-Client source code analysis
public class SqoopClient {

  /**
   * Underlying request object to fetch data from Sqoop server.
   */
  private SqoopResourceRequests resourceRequests;

  /**
   * True if user retrieved all connectors at once.
   */
  private boolean isAllConnectors;

  /**
   * All cached connectors.
   */
  private Map<Long, MConnector> connectors;

  /**
   * All cached config params for every registered connector in the sqoop system.
   */
  private Map<Long, ResourceBundle> connectorConfigBundles;

  /**
   * Cached driver.
   */
  private MDriver mDriver;

  /**
   * Cached driverConfig bundle.
   */
  private ResourceBundle driverConfigBundle;

  /**
   * Status flags used when updating the submission callback status
   */
  //TODO(https://issues.apache.org/jira/browse/SQOOP-1652): Why do we need a duplicate status enum in client when shell is using the server status?
  // NOTE: the getStatus method is on the job resource and this needs to be revisited
  private enum SubmissionStatus {
    SUBMITTED,
    UPDATED,
    FINISHED
  }

  public SqoopClient(String serverUrl) {
    resourceRequests = new SqoopResourceRequests();
    setServerUrl(serverUrl);
  }

  /**
   * Set new server URL.
   *
   * Setting new URL will also clear all caches used by the client.
   *
   * @param serverUrl Server URL
   */
  public void setServerUrl(String serverUrl) {
    resourceRequests.setServerUrl(serverUrl);
    clearCache();
  }

  /**
   * Set arbitrary request object.
   *
   * @param requests SqoopRequests object
   */
  public void setSqoopRequests(SqoopResourceRequests requests) {
    this.resourceRequests = requests;
    clearCache();
  }

  /**
   * Clear internal cache.
   */
  public void clearCache() {
    connectorConfigBundles = new HashMap<Long, ResourceBundle>();
    driverConfigBundle = null;
    connectors = new HashMap<Long, MConnector>();
    mDriver = null;
    isAllConnectors = false;
  }
  .......
}
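The source above shows that SqoopClient holds:
- a SqoopResourceRequests object, which handles communication with the server;
- connectors, a cache of connectors keyed by ID;
- isAllConnectors, a flag recording whether all connectors have been retrieved at once;
- connectorConfigBundles, the cached config resource bundle for each connector;
- mDriver, the cached Driver;
- driverConfigBundle, the cached config resource bundle for the Driver.
The SqoopClient constructor takes a URL string that is used to connect to the server.
It also clears the cached information, because setServerUrl calls clearCache.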
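The caching behaviour implied by these fields can be sketched as follows. The excerpt elides the getter methods, so this is only an assumption about how such a cache is typically used, not the actual SqoopClient implementation. ConnectorFetcher, CachingClientSketch, and CountingFetcher are hypothetical names, and connectors are reduced to plain strings to keep the sketch self-contained.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the request object that talks to the server.
interface ConnectorFetcher {
    String fetchOne(long id);        // one round trip for a single connector
    Map<Long, String> fetchAll();    // one round trip for every connector
}

final class CachingClientSketch {
    private final ConnectorFetcher fetcher;
    private String serverUrl;
    private Map<Long, String> connectors = new HashMap<>();
    private boolean isAllConnectors;

    CachingClientSketch(String serverUrl, ConnectorFetcher fetcher) {
        this.fetcher = fetcher;
        setServerUrl(serverUrl);
    }

    // Pointing the client at another server invalidates everything cached.
    void setServerUrl(String serverUrl) {
        this.serverUrl = serverUrl;
        clearCache();
    }

    String getServerUrl() {
        return serverUrl;
    }

    void clearCache() {
        connectors = new HashMap<>();
        isAllConnectors = false;
    }

    // Serve a single connector from the cache, fetching it on a miss.
    String getConnector(long id) {
        String cached = connectors.get(id);
        if (cached != null) {
            return cached;
        }
        String fetched = fetcher.fetchOne(id);
        connectors.put(id, fetched);
        return fetched;
    }

    // Fetch the full list only once; the flag marks the cache as complete.
    Map<Long, String> getConnectors() {
        if (!isAllConnectors) {
            connectors = new HashMap<>(fetcher.fetchAll());
            isAllConnectors = true;
        }
        return connectors;
    }
}

// Test double that counts how many times the "server" is contacted.
final class CountingFetcher implements ConnectorFetcher {
    int calls;

    public String fetchOne(long id) {
        calls++;
        return "connector-" + id;
    }

    public Map<Long, String> fetchAll() {
        calls++;
        Map<Long, String> all = new HashMap<>();
        all.put(1L, "connector-1");
        all.put(2L, "connector-2");
        return all;
    }
}
```

Clearing the cache whenever the URL or the request object changes avoids serving connectors that were registered on a different server.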
The remaining SqoopClient methods mainly retrieve connectors, Links, Jobs, and so on; they are not covered here.