connector启动后会将share/java下目录为 confluent-common、kafka-serde-tools、monitoring-interceptors、kafka-connect-*路径设置到CLASSPATH中,程序运行起来后目录下的jar会加载到内存中。
程序运行入口
bin/connect-distributed.sh config/connect-distributed.properties
public static void main(String[] args) throws Exception {
String workerPropsFile = args[0];
Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap();
Time time = Time.SYSTEM;
Plugins plugins = new Plugins(workerProps);//初始化connector Converter Transformation类插件加载器
plugins.compareAndSwapWithDelegatingLoader();
DistributedConfig config = new DistributedConfig(workerProps);//加载配置信息
//实例化RestServer,服务端ip和端口从配置文件获取(0.0.0.0:7583),接受连接,生成对象server
//1)实例化对象 jetty httpserver,对外提供REST风格的API 接受http请求,然后调用服务Handler使用一个ThreadPool接受Post、Get等请求,交于Handler处,用到了NIO接受请求。
RestServer rest = new RestServer(config);
URI advertisedUrl = rest.advertisedUrl();
String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();//offset后台存储
offsetBackingStore.configure(config);
Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore);//实例化Worker,worker在一组线程里动态运行一系列的tasks来执行source or sink任务,每个task都有一个固定的线程,worker充当了个容器作用,
StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, worker.getInternalValueConverter());
statusBackingStore.configure(config);//状态后台存储
ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(worker.getInternalValueConverter(), config);//配置后台存储
DistributedHerder herder = new DistributedHerder(config, time, worker, statusBackingStore, configBackingStore,
advertisedUrl.toString());
final Connect connect = new Connect(herder, rest);
try {
connect.start();
} catch (Exception e) {
log.error("Failed to start Connect", e);
connect.stop();
}
// Shutdown will be triggered by Ctrl-C or via HTTP shutdown request
connect.awaitStop();
}
本文先介绍/connector/{connector}/config,根据传入的配置信息新建connector的流程,这个流程主要是生成connector的示例,并生成connector的过程,后面会将task的启动
生成的流程步骤在herder.start()