Deploying StreamSets
- Configure the SDC_CONF, SDC_DATA, SDC_LOG, SPARK_SUBMIT_YARN_COMMAND, etc. variables:
vim libexec/sdc-env.sh // edit the environment variables
export SDC_CONF=/home/bigdata/data/sdc/conf
export SDC_DATA=/home/bigdata/data/sdc/data
export SDC_LOG=/home/bigdata/log/sdc
# Configure the home directories for Java, Hadoop, and Spark
export JAVA_HOME=/usr/java/jdk-release
export HADOOP_HOME=/home/bigdata/app/hadoop-release
export HADOOP_COMMAND=$HADOOP_HOME/bin/hadoop
export YARN_COMMAND=$HADOOP_HOME/bin/yarn
export SPARK_HOME=/home/bigdata/app/spark-release
export SPARK_SUBMIT_YARN_COMMAND=$SPARK_HOME/bin/spark-submit
export SDC_JAVA_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=10991 ${SDC_JAVA_OPTS} "
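The SDC_JAVA_OPTS line above exposes an unauthenticated JMX endpoint on port 10991. As a quick sanity check after starting SDC, a small client can connect and read the standard MBeans; the following is only an illustrative sketch (sdc-host is a placeholder, and 10991 simply mirrors the jmxremote.port value configured above):

import java.lang.management.ManagementFactory
import javax.management.remote.{JMXConnectorFactory, JMXServiceURL}

object SdcJmxCheck {
  def main(args: Array[String]): Unit = {
    // sdc-host is a placeholder; 10991 mirrors -Dcom.sun.management.jmxremote.port above
    val url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://sdc-host:10991/jmxrmi")
    val connector = JMXConnectorFactory.connect(url)
    try {
      val mbsc = connector.getMBeanServerConnection
      println(s"registered MBeans: ${mbsc.getMBeanCount}")
      // read heap usage through the standard Memory MXBean as a liveness check
      val mem = javax.management.JMX.newMXBeanProxy(
        mbsc,
        new javax.management.ObjectName(ManagementFactory.MEMORY_MXBEAN_NAME),
        classOf[java.lang.management.MemoryMXBean])
      println(s"heap used: ${mem.getHeapMemoryUsage.getUsed} bytes")
    } finally {
      connector.close()
    }
  }
}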
- Raise the limits on open files and I/O handles:
ulimit -a // show all current resource limits
ulimit -n // show the current open-file limit
ulimit -n 32768 // temporary, for this session only: set the open-file maximum
// Permanently raise the machine's open-file limit:
vi /etc/security/limits.conf
sdc soft nofile 32768
sdc hard nofile 32768
Building DataCollector
// Step 1: build and install the api and plugin-api modules first
git clone git@git.envisioncn.com:arch/datacollector-plugin-api.git datacollector-plugin-api;
cd datacollector-plugin-api;
mvn clean install -DskipTests
git clone git@git.envisioncn.com:arch/datacollector-api.git datacollector-api;
cd datacollector-api;
mvn clean install -DskipTests
// Step 2: once both modules above are installed, build datacollector
git clone git@git.envisioncn.com:arch/datacollector.git datacollector;
cd datacollector;
mvn clean package -DskipTests -DskipRat -Drelease
DataCollector build errors and fixes
enos-spark version issue: change every occurrence of the original 2.3.2.3.1** to 2.3.2;
[INFO] StreamSets Data Collector UI ....................... FAILURE [ 5.416 s]
[INFO] StreamSets Data Collector Docs ..................... SKIPPED
[ERROR] Failed to execute goal com.github.trecloux:yeoman-maven-plugin:0.4:build (run-grunt) on project streamsets-datacollector-ui: Error during : cmd /c node --version: Process exited with an error: 1 (Exit value: 1) -> [Help 1]
The UI module's yeoman-maven-plugin shells out to node --version, so this failure usually means Node.js is missing from the build machine's PATH (the cmd /c prefix indicates a Windows build host); installing Node.js, or skipping the UI module, normally resolves it.
Spark job launch commands
The SDC _cluster-manager script
Starting: SystemProcess: /home/app/stream/streamset/streamset-3.3.2_tag0529/libexec/_cluster-manager start
--master yarn
--deploy-mode cluster
--executor-memory 1024m
--executor-cores 1
--num-executors 1
--archives /home/app/appdata/streamset/data/temp/cluster-pipeline-ClusterOriginKafkaToTrashTestClusterDSource62e2fe99-d7f8-4f4d-8ba8-38282095bafe-0/staging/libs.tar.gz,/home/app/appdata/streamset/data/temp/cluster-pipeline-ClusterOriginKafkaToTrashTestClusterDSource62e2fe99-d7f8-4f4d-8ba8-38282095bafe-0/staging/etc.tar.gz,/home/app/appdata/streamset/data/temp/cluster-pipeline-ClusterOriginKafkaToTrashTestClusterDSource62e2fe99-d7f8-4f4d-8ba8-38282095bafe-0/staging/resources.tar.gz
--files /home/app/appdata/streamset/data/temp/cluster-pipeline-ClusterOriginKafkaToTrashTestClusterDSource62e2fe99-d7f8-4f4d-8ba8-38282095bafe-0/staging/log4j.properties
--jars /home/app/stream/streamset/streamset-3.3.2_tag0529/libexec/bootstrap-libs/main/streamsets-datacollector-bootstrap-3.3.2-SNAPSHOT.jar,/home/app/stream/streamset/streamset-3.3.2_tag0529/streamsets-libs/streamsets-datacollector-edh-cluster-kafka_0_10-spark_2_0-lib/lib/kafka_2.11-0.10.0.1.jar,/home/app/stream/streamset/streamset-3.3.2_tag0529/streamsets-libs/streamsets-datacollector-edh-cluster-kafka_0_10-spark_2_0-lib/lib/spark-streaming-kafka-0-10_2.11-2.0.3-EDH-0u1-SNAPSHOT.jar,/home/app/stream/streamset/streamset-3.3.2_tag0529/streamsets-libs/streamsets-datacollector-edh-cluster-kafka_0_10-spark_2_0-lib/lib/kafka-clients-0.10.2.1.jar,/home/app/stream/streamset/streamset-3.3.2_tag0529/streamsets-libs/streamsets-datacollector-edh-cluster-kafka_0_10-spark_2_0-lib/lib/metrics-core-2.2.0.jar,/home/app/stream/streamset/streamset-3.3.2_tag0529/container-lib/streamsets-datacollector-container-3.3.2-SNAPSHOT.jar,/home/app/stream/streamset/streamset-3.3.2_tag0529/container-lib/streamsets-datacollector-common-3.3.2-SNAPSHOT.jar,/home/app/stream/streamset/streamset-3.3.2_tag0529/api-lib/streamsets-datacollector-api-3.3.2-SNAPSHOT.jar,/home/app/stream/streamset/streamset-3.3.2_tag0529/libexec/bootstrap-libs/cluster/streamsets-datacollector-cluster-bootstrap-3.3.2-SNAPSHOT.jar
--conf spark.running.mode=yarn
--conf spark.streaming.dynamicAllocation.maxExecutors=3
--conf spark.driver.extraJavaOptions=-Duser.home=.
--conf spark.executor.extraJavaOptions=-javaagent:./streamsets-datacollector-bootstrap-3.3.2-SNAPSHOT.jar -Duser.home=. -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -Dlog4j.debug
--conf spark.executorEnv.JAVA_HOME=/home/app/java/jdk1.8.0_65
--conf spark.streaming.kafka.consumer.poll.max.retries=5
--conf spark.yarn.appMasterEnv.JAVA_HOME=/home/app/java/jdk1.8.0_65
--conf spark.driver.extraJavaOptions=-Xdebug -Xrunjdwp:transport=dt_socket,server=n,suspend=n,address=192.168.51.1:35053 --conf spark.executor.extraJavaOptions=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=35054
--name StreamSets Data Collector: Cluster_OriginKafka_To_Trash_TestClusterDSource
--class com.streamsets.pipeline.BootstrapClusterStreaming
/home/app/stream/streamset/streamset-3.3.2_tag0529/libexec/bootstrap-libs/cluster/streamsets-datacollector-cluster-bootstrap-api-3.3.2-SNAPSHOT.jar
The SDC spark-submit command
// broken down into its arguments
spark-submit --master yarn
--deploy-mode cluster
--executor-memory 1024m
--executor-cores 1
--num-executors 2
--archives /home/app/appdata/streamset/data/temp/cluster-pipeline-ClusterOriginKafkaToTrashTestMultiWorkerPerf90ece481-843c-4e5e-86fe-d17911ccbc5c-0/staging/libs.tar.gz,/home/app/appdata/streamset/data/temp/cluster-pipeline-ClusterOriginKafkaToTrashTestMultiWorkerPerf90ece481-843c-4e5e-86fe-d17911ccbc5c-0/staging/etc.tar.gz,/home/app/appdata/streamset/data/temp/cluster-pipeline-ClusterOriginKafkaToTrashTestMultiWorkerPerf90ece481-843c-4e5e-86fe-d17911ccbc5c-0/staging/resources.tar.gz
--files /home/app/appdata/streamset/data/temp/cluster-pipeline-ClusterOriginKafkaToTrashTestMultiWorkerPerf90ece481-843c-4e5e-86fe-d17911ccbc5c-0/staging/log4j.properties
--jars /home/app/stream/streamset/streamset-3.3.2_tag0529/libexec/bootstrap-libs/main/streamsets-datacollector-bootstrap-3.3.2-SNAPSHOT.jar,/home/app/stream/streamset/streamset-3.3.2_tag0529/streamsets-libs/streamsets-datacollector-edh-cluster-kafka_0_10-spark_2_0-lib/lib/kafka_2.11-0.10.0.1.jar,/home/app/stream/streamset/streamset-3.3.2_tag0529/streamsets-libs/streamsets-datacollector-edh-cluster-kafka_0_10-spark_2_0-lib/lib/kafka-clients-0.10.2.1.jar,/home/app/stream/streamset/streamset-3.3.2_tag0529/streamsets-libs/streamsets-datacollector-edh-cluster-kafka_0_10-spark_2_0-lib/lib/metrics-core-2.2.0.jar,/home/app/stream/streamset/streamset-3.3.2_tag0529/streamsets-libs/streamsets-datacollector-edh-cluster-kafka_0_10-spark_2_0-lib/lib/spark-streaming-kafka-0-10_2.11-2.1.0.jar,/home/app/stream/streamset/streamset-3.3.2_tag0529/container-lib/streamsets-datacollector-container-3.3.2-SNAPSHOT.jar,/home/app/stream/streamset/streamset-3.3.2_tag0529/container-lib/streamsets-datacollector-common-3.3.2-SNAPSHOT.jar,/home/app/stream/streamset/streamset-3.3.2_tag0529/api-lib/streamsets-datacollector-api-3.3.2-SNAPSHOT.jar,/home/app/stream/streamset/streamset-3.3.2_tag0529/libexec/bootstrap-libs/cluster/streamsets-datacollector-cluster-bootstrap-3.3.2-SNAPSHOT.jar
--conf spark.running.mode=yarn
--conf spark.streaming.dynamicAllocation.maxExecutors=16
--conf spark.driver.extraJavaOptions=-Duser.home=.
--conf 'spark.executor.extraJavaOptions=-javaagent:./streamsets-datacollector-bootstrap-3.3.2-SNAPSHOT.jar -Duser.home=. -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -Dlog4j.debug'
--conf spark.executorEnv.JAVA_HOME=/home/app/java/jdk1.8.0_65
--conf spark.streaming.kafka.consumer.poll.max.retries=5
--conf spark.yarn.appMasterEnv.JAVA_HOME=/home/app/java/jdk1.8.0_65
--conf 'spark.driver.extraJavaOptions=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=35053'
--name 'StreamSets Data Collector: Cluster_OriginKafka_To_Trash_TestMultiWorkerPerf'
--class com.streamsets.pipeline.BootstrapClusterStreaming
/home/app/stream/streamset/streamset-3.3.2_tag0529/libexec/bootstrap-libs/cluster/streamsets-datacollector-cluster-bootstrap-api-3.3.2-SNAPSHOT.jar
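For comparison, the same submission can be expressed programmatically via Spark's SparkLauncher API. This is only an illustrative sketch of how the flags above map onto that API, not what SDC itself does (SDC builds the spark-submit command through its _cluster-manager script); the paths are copied from the command above and the jar list is abbreviated:

import org.apache.spark.launcher.SparkLauncher

object SubmitBootstrapClusterSketch {
  def main(args: Array[String]): Unit = {
    // install dir copied from the command above; adjust for the actual host
    val sdcHome = "/home/app/stream/streamset/streamset-3.3.2_tag0529"

    val launcher = new SparkLauncher()
      .setMaster("yarn")
      .setDeployMode("cluster")
      .setAppName("StreamSets Data Collector: Cluster_OriginKafka_To_Trash_TestMultiWorkerPerf")
      .setMainClass("com.streamsets.pipeline.BootstrapClusterStreaming")
      .setAppResource(s"$sdcHome/libexec/bootstrap-libs/cluster/streamsets-datacollector-cluster-bootstrap-api-3.3.2-SNAPSHOT.jar")
      .setConf("spark.executor.memory", "1024m")                         // --executor-memory
      .setConf("spark.executor.cores", "1")                              // --executor-cores
      .setConf("spark.executor.instances", "2")                          // --num-executors
      .setConf("spark.running.mode", "yarn")
      .setConf("spark.streaming.kafka.consumer.poll.max.retries", "5")
      // each --jars entry is added with addJar(); --files and --archives map to
      // spark.yarn.dist.files / spark.yarn.dist.archives (list abbreviated here)
      .addJar(s"$sdcHome/libexec/bootstrap-libs/main/streamsets-datacollector-bootstrap-3.3.2-SNAPSHOT.jar")

    val process = launcher.launch()  // spawns a spark-submit child process
    val exitCode = process.waitFor()
    println(s"spark-submit exited with $exitCode")
  }
}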
spark_conf.properties inside the spark-submit staging archive
During submission, Spark creates a local zip archive named
__spark_conf__158***.zip in a local temp directory;
it mainly contains the Hadoop, YARN, and Spark configuration files.
The Spark-related properties file inside it, spark_conf.properties, looks like this:
spark.yarn.dist.files=file\:/home/app/appdata/streamset/data/temp/cluster-pipeline-ClusterOriginKafkaToKafkaT41271c14-12a8-4761-8f79-4901d995cd88-0/staging/log4j.properties
spark.yarn.cache.visibilities=PRIVATE,PRIVATE,PRIVATE,PRIVATE,PRIVATE,PRIVATE,PRIVATE,PRIVATE,PRIVATE,PRIVATE,PRIVATE,PRIVATE,PRIVATE,PRIVATE,PRIVATE
spark.streaming.dynamicAllocation.maxExecutors=16
spark.streaming.blockInterval=300000
spark.yarn.cache.timestamps=1589782852008,1589782852187,1589782852646,1589782852763,1589782852806,1589782852864,1589782853161,1589782853222,1589782853316,1589782853371,1589782853420,1589782853450,1589782859142,1589782859604,1589782860068
spark.streaming.dynamicAllocation.scalingInterval=10000
spark.executor.memory=1024m
spark.yarn.dist.archives=file\:/home/app/appdata/streamset/data/temp/cluster-pipeline-ClusterOriginKafkaToKafkaT41271c14-12a8-4761-8f79-4901d995cd88-0/staging/libs.tar.gz,file\:/home/app/appdata/streamset/data/temp/cluster-pipeline-ClusterOriginKafkaToKafkaT41271c14-12a8-4761-8f79-4901d995cd88-0/staging/etc.tar.gz,file\:/home/app/appdata/streamset/data/temp/cluster-pipeline-ClusterOriginKafkaToKafkaT41271c14-12a8-4761-8f79-4901d995cd88-0/staging/resources.tar.gz
spark.yarn.cache.confArchive=hdfs\://ldsver51\:9000/user/app/.sparkStaging/application_1589778130925_0004/__spark_conf__.zip
spark.yarn.cache.sizes=208179347,85717,34568,5151376,951041,82123,219810,1643501,78429,161631,10314,1772,202462143,16315,470
spark.driver.memory=1G
spark.submit.deployMode=cluster
spark.yarn.appMasterEnv.JAVA_HOME=/home/app/java/jdk1.8.0_65
spark.executorEnv.JAVA_HOME=/home/app/java/jdk1.8.0_65
spark.yarn.secondary.jars=streamsets-datacollector-bootstrap-3.3.2-SNAPSHOT.jar,kafka_2.11-0.10.0.1.jar,kafka-clients-0.10.2.1.jar,metrics-core-2.2.0.jar,spark-streaming-kafka-0-10_2.11-2.2.0.jar,streamsets-datacollector-container-3.3.2-SNAPSHOT.jar,streamsets-datacollector-common-3.3.2-SNAPSHOT.jar,streamsets-datacollector-api-3.3.2-SNAPSHOT.jar,streamsets-datacollector-cluster-bootstrap-3.3.2-SNAPSHOT.jar
spark.streaming.backpressure.enabled=true
spark.streaming.dynamicAllocation.enabled=false
spark.streaming.kafka.consumer.poll.max.retries=5
spark.master=yarn
spark.yarn.cache.filenames=hdfs\://ldsver51\:9000/user/app/.sparkStaging/application_1589778130925_0004/__spark_libs__1360858079026319589.zip\#__spark_libs__,hdfs\://ldsver51\:9000/user/app/.sparkStaging/application_1589778130925_0004/streamsets-datacollector-cluster-bootstrap-api-3.3.2-SNAPSHOT.jar\#__app__.jar,hdfs\://ldsver51\:9000/user/app/.sparkStaging/application_1589778130925_0004/streamsets-datacollector-bootstrap-3.3.2-SNAPSHOT.jar\#streamsets-datacollector-bootstrap-3.3.2-SNAPSHOT.jar,hdfs\://ldsver51\:9000/user/app/.sparkStaging/application_1589778130925_0004/kafka_2.11-0.10.0.1.jar\#kafka_2.11-0.10.0.1.jar,hdfs\://ldsver51\:9000/user/app/.sparkStaging/application_1589778130925_0004/kafka-clients-0.10.2.1.jar\#kafka-clients-0.10.2.1.jar,hdfs\://ldsver51\:9000/user/app/.sparkStaging/application_1589778130925_0004/metrics-core-2.2.0.jar\#metrics-core-2.2.0.jar,hdfs\://ldsver51\:9000/user/app/.sparkStaging/application_1589778130925_0004/spark-streaming-kafka-0-10_2.11-2.2.0.jar\#spark-streaming-kafka-0-10_2.11-2.2.0.jar,hdfs\://ldsver51\:9000/user/app/.sparkStaging/application_1589778130925_0004/streamsets-datacollector-container-3.3.2-SNAPSHOT.jar\#streamsets-datacollector-container-3.3.2-SNAPSHOT.jar,hdfs\://ldsver51\:9000/user/app/.sparkStaging/application_1589778130925_0004/streamsets-datacollector-common-3.3.2-SNAPSHOT.jar\#streamsets-datacollector-common-3.3.2-SNAPSHOT.jar,hdfs\://ldsver51\:9000/user/app/.sparkStaging/application_1589778130925_0004/streamsets-datacollector-api-3.3.2-SNAPSHOT.jar\#streamsets-datacollector-api-3.3.2-SNAPSHOT.jar,hdfs\://ldsver51\:9000/user/app/.sparkStaging/application_1589778130925_0004/streamsets-datacollector-cluster-bootstrap-3.3.2-SNAPSHOT.jar\#streamsets-datacollector-cluster-bootstrap-3.3.2-SNAPSHOT.jar,hdfs\://ldsver51\:9000/user/app/.sparkStaging/application_1589778130925_0004/log4j.properties\#log4j.properties,hdfs\://ldsver51\:9000/user/app/.sparkStaging/application_1589778130925_0004/libs.tar.gz\#libs.tar.gz,hdfs\://ldsver51\:9000/user/app/.sparkStaging/application_1589778130925_0004/etc.tar.gz\#etc.tar.gz,hdfs\://ldsver51\:9000/user/app/.sparkStaging/application_1589778130925_0004/resources.tar.gz\#resources.tar.gz
spark.executor.cores=1
spark.yarn.cache.types=ARCHIVE,FILE,FILE,FILE,FILE,FILE,FILE,FILE,FILE,FILE,FILE,FILE,ARCHIVE,ARCHIVE,ARCHIVE
spark.executor.instances=1
spark.executor.extraJavaOptions=-Xdebug -Xrunjdwp\:transport\=dt_socket,server\=y,suspend\=n,address\=35054 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.ssl\=false -Dcom.sun.management.jmxremote.authenticate\=false -Dcom.sun.management.jmxremote.port\=10992
spark.running.mode=yarn
spark.app.name=StreamSets Data Collector\: Cluster_OriginKafka_To_KafkaT
spark.driver.extraJavaOptions=-Xdebug -Xrunjdwp\:transport\=dt_socket,server\=y,suspend\=n,address\=35053 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.ssl\=false -Dcom.sun.management.jmxremote.authenticate\=false -Dcom.sun.management.jmxremote.port\=10991
spark.yarn.dist.jars=file\:/home/app/stream/streamset/streamset-3.3.2_tag0529/libexec/bootstrap-libs/main/streamsets-datacollector-bootstrap-3.3.2-SNAPSHOT.jar,file\:/home/app/stream/streamset/streamset-3.3.2_tag0529/streamsets-libs/streamsets-datacollector-edh-cluster-kafka_0_10-spark_2_0-lib/lib/kafka_2.11-0.10.0.1.jar,file\:/home/app/stream/streamset/streamset-3.3.2_tag0529/streamsets-libs/streamsets-datacollector-edh-cluster-kafka_0_10-spark_2_0-lib/lib/kafka-clients-0.10.2.1.jar,file\:/home/app/stream/streamset/streamset-3.3.2_tag0529/streamsets-libs/streamsets-datacollector-edh-cluster-kafka_0_10-spark_2_0-lib/lib/metrics-core-2.2.0.jar,file\:/home/app/stream/streamset/streamset-3.3.2_tag0529/streamsets-libs/streamsets-datacollector-edh-cluster-kafka_0_10-spark_2_0-lib/lib/spark-streaming-kafka-0-10_2.11-2.2.0.jar,file\:/home/app/stream/streamset/streamset-3.3.2_tag0529/container-lib/streamsets-datacollector-container-3.3.2-SNAPSHOT.jar,file\:/home/app/stream/streamset/streamset-3.3.2_tag0529/container-lib/streamsets-datacollector-common-3.3.2-SNAPSHOT.jar,file\:/home/app/stream/streamset/streamset-3.3.2_tag0529/api-lib/streamsets-datacollector-api-3.3.2-SNAPSHOT.jar,file\:/home/app/stream/streamset/streamset-3.3.2_tag0529/libexec/bootstrap-libs/cluster/streamsets-datacollector-cluster-bootstrap-3.3.2-SNAPSHOT.jar
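To verify what really gets shipped in that archive, its entries can simply be listed. A minimal sketch, assuming the archive has been copied to a local path and that the Spark properties entry is named __spark_conf__.properties (the name visible in the YARN driver command further below; the dump above calls it spark_conf.properties):

import java.util.zip.ZipFile
import scala.collection.JavaConverters._

object InspectSparkConfArchive {
  def main(args: Array[String]): Unit = {
    // placeholder path: point this at a locally copied __spark_conf__*.zip
    val zipPath = if (args.nonEmpty) args(0) else "/tmp/__spark_conf__.zip"
    val zip = new ZipFile(zipPath)
    try {
      // list every packed config file (hadoop/yarn *.xml, log4j.properties, the Spark properties file, ...)
      zip.entries().asScala.foreach(e => println(f"${e.getSize}%10d  ${e.getName}"))

      // dump the Spark properties entry if present
      Option(zip.getEntry("__spark_conf__.properties")).foreach { entry =>
        val in = zip.getInputStream(entry)
        val props = new java.util.Properties()
        try props.load(in) finally in.close()
        props.stringPropertyNames().asScala.toSeq.sorted
          .foreach(k => println(s"$k=${props.getProperty(k)}"))
      }
    } finally {
      zip.close()
    }
  }
}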
The YARN driver (ApplicationMaster) launch command
exec /bin/bash -c "
$JAVA_HOME/bin/java -server
-Xmx1024m
-Djava.io.tmpdir=$PWD/tmp
'-Xdebug'
'-Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=35053'
-Dspark.yarn.app.container.log.dir=/home/app/appdata/hadoop/dataDir/yarn-logs/userlogs/application_1588486642253_0015/container_1588486642253_0015_01_000001
org.apache.spark.deploy.yarn.ApplicationMaster
--class 'com.streamsets.pipeline.BootstrapClusterStreaming'
--jar file:/home/app/stream/streamset/streamset-3.3.2_tag0529/libexec/bootstrap-libs/cluster/streamsets-datacollector-cluster-bootstrap-api-3.3.2-SNAPSHOT.jar
--properties-file $PWD/__spark_conf__/__spark_conf__.properties
1> /home/app/appdata/hadoop/dataDir/yarn-logs/userlogs/application_1588486642253_0015/container_1588486642253_0015_01_000001/stdout
2> /home/app/appdata/hadoop/dataDir/yarn-logs/userlogs/application_1588486642253_0015/container_1588486642253_0015_01_000001/stderr
"
The YARN executor launch command
exec /bin/bash -c
"$JAVA_HOME/bin/java -server
-Xmx1024m
'-Xdebug'
'-Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=35054'
-Djava.io.tmpdir=$PWD/tmp
'-Dspark.driver.port=39341'
'-Dspark.ui.port=0'
-Dspark.yarn.app.container.log.dir=/home/app/appdata/hadoop/dataDir/yarn-logs/userlogs/application_1588486642253_0015/container_1588486642253_0015_01_000002
-XX:OnOutOfMemoryError='kill %p'
org.apache.spark.executor.CoarseGrainedExecutorBackend
--driver-url spark://CoarseGrainedScheduler@192.168.51.152:39341
--executor-id 1
--hostname ldsver52
--cores 1
--app-id application_1588486642253_0015
--user-class-path file:$PWD/__app__.jar
--user-class-path file:$PWD/streamsets-datacollector-bootstrap-3.3.2-SNAPSHOT.jar
--user-class-path file:$PWD/kafka_2.11-0.10.0.1.jar
--user-class-path file:$PWD/kafka-clients-0.10.2.1.jar
--user-class-path file:$PWD/metrics-core-2.2.0.jar
--user-class-path file:$PWD/spark-streaming-kafka-0-10_2.11-2.1.0.jar
--user-class-path file:$PWD/streamsets-datacollector-container-3.3.2-SNAPSHOT.jar
--user-class-path file:$PWD/streamsets-datacollector-common-3.3.2-SNAPSHOT.jar
--user-class-path file:$PWD/streamsets-datacollector-api-3.3.2-SNAPSHOT.jar
--user-class-path file:$PWD/streamsets-datacollector-cluster-bootstrap-3.3.2-SNAPSHOT.jar
1>/home/app/appdata/hadoop/dataDir/yarn-logs/userlogs/application_1588486642253_0015/container_1588486642253_0015_01_000002/stdout
2>/home/app/appdata/hadoop/dataDir/yarn-logs/userlogs/application_1588486642253_0015/container_1588486642253_0015_01_000002/stderr
"
hadoop_shell_errorcode=$?
How Spark Streaming starts up and executes BootstrapClusterStreaming.main():
ApplicationMaster.main(){
SparkHadoopUtil.get.runAsSparkUser { () =>
ApplicationMaster master = new ApplicationMaster(amArgs, new YarnRMClient);
master.run(){
if(isClusterMode){
ApplicationMaster.runDriver(securityMgr);{
# Launches the user application (e.g. RecordWindowAggr.main()) and, when it returns, finishes the AppMaster with finish(SUCCEEDED)
userClassThread:Thread = startUserApplication(){
// the class holding the Spark application's driver main() entry point, e.g. BootstrapClusterStreaming.main()
val mainMethod = userClassLoader.loadClass(args.userClass).getMethod("main", classOf[Array[String]])
userThread:Thread = new Thread(){ run(){
mainMethod.invoke(userArgs.toArray) #RecordWindowAggr.main()
}}
userThread.setName("Driver") // so a thread named "Driver" should be visible inside the AppMaster process
userThread.start();{//
userThread.run(){
mainMethod.invoke(userArgs.toArray);{//BootstrapClusterStreaming.main()
// the main execution logic of BootstrapClusterStreaming
BootstrapClusterStreaming.main(){
binding = SparkStreamingBindingFactory.build(BootstrapCluster.getProperties());{
return bindingFactory[Kafka010SparkStreamingBindingFactory].create(){new Kafka010SparkStreamingBinding();};
}
binding.init();{//SparkStreamingBindingFactory.init()
Configuration hadoopConf = new SparkHadoopUtil().newConfiguration(conf);
offsetHelper = new SdcClusterOffsetHelper(checkPointPath, hdfs, Utils.getKafkaMaxWaitTime(properties));
JavaStreamingContextFactory javaStreamingContextFactory = getStreamingContextFactory();
ssc = javaStreamingContextFactory.create();{//JavaStreamingContextFactoryImpl.create()
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(duration));
return createDStream(jssc, props);{//Kafka010SparkStreamingBindingFactory.createDStream()
JavaInputDStream<ConsumerRecord<byte[], byte[]>> stream = KafkaUtils.createDirectStream(); // create the Kafka direct DStream
// Driver logic: the Kafka RDD is consumed, and inside mapPartitions each batch is handed over to the SDC framework for processing via the dataChannel.dataQueue queue;
Driver$.MODULE$.foreach(stream.dstream(), KafkaOffsetManagerImpl.get());{
dstream.foreachRDD(rdd => {
rdd.mapPartitions(iterator => {
val batch = iterator.map({ pair => new Pair(pair._1, pair._2)});
ClusterFunctionProvider.getClusterFunction.startBatch(batch).asInstanceOf[java.util.Iterator[Record]].asScala;
}
offsetManager.saveOffsets(rdd)
})
}
stream.foreachRDD(new CommitOffset(stream));
return jssc;
}
}
ssc.checkpoint(rddCheckpointDir.toString());
}
BootstrapCluster.createTransformers(binding.getStreamingContext().sparkContext());
binding.startContext();
binding.awaitTermination();
}
}
}
}
return userThread;
}
userClassThread[Thread].join(); // wait for the BootstrapClusterStreaming.main() thread to complete
}
}
}
}
}
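Stripped of the SDC plumbing, the Driver$.foreach logic traced above follows the standard spark-streaming-kafka-0-10 direct-stream pattern: create a direct stream, do the per-partition work inside foreachRDD/mapPartitions, then record the consumed offsets. Below is a minimal standalone sketch of that pattern; the broker, topic and group id are placeholders, and the per-partition count plus commitAsync stand in for SDC's ClusterFunctionProvider.startBatch() and KafkaOffsetManagerImpl/CommitOffset:

import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object DirectKafkaDriverSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("direct-kafka-driver-sketch")
    val ssc = new StreamingContext(conf, Seconds(5))

    // placeholders: broker list, consumer group and topic
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker:9092",
      "key.deserializer" -> classOf[ByteArrayDeserializer],
      "value.deserializer" -> classOf[ByteArrayDeserializer],
      "group.id" -> "sketch-group",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // equivalent of the KafkaUtils.createDirectStream() call in createDStream() above
    val stream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte]](
      ssc, PreferConsistent, Subscribe[Array[Byte], Array[Byte]](Seq("topic"), kafkaParams))

    stream.foreachRDD { rdd =>
      // SDC hands the partition iterator to ClusterFunctionProvider.getClusterFunction.startBatch();
      // counting records per partition stands in for that here
      val processed = rdd.mapPartitions(it => Iterator(it.size)).collect().sum
      println(s"processed $processed records in this batch")

      // record the consumed offsets, the role played by KafkaOffsetManagerImpl / CommitOffset in SDC
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}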