SDC: Deployment and Start/Stop Scripts

Deploying StreamSets

  1. Configure SDC_CONF, SDC_DATA, SDC_LOG, SPARK_SUBMIT_YARN_COMMAND, and related variables:

vim libexec/sdc-env.sh   # edit the environment variables
export SDC_CONF=/home/bigdata/data/sdc/conf
export SDC_DATA=/home/bigdata/data/sdc/data
export SDC_LOG=/home/bigdata/log/sdc

# Configure the Java, Hadoop, and Spark home directories
export JAVA_HOME=/usr/java/jdk-release

export HADOOP_HOME=/home/bigdata/app/hadoop-release
export HADOOP_COMMAND=$HADOOP_HOME/bin/hadoop
export YARN_COMMAND=$HADOOP_HOME/bin/yarn

export SPARK_HOME=/home/bigdata/app/spark-release
export SPARK_SUBMIT_YARN_COMMAND=$SPARK_HOME/bin/spark-submit

export SDC_JAVA_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=10991 ${SDC_JAVA_OPTS} "
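
After editing sdc-env.sh, the directories referenced by SDC_CONF, SDC_DATA, and SDC_LOG have to exist and be writable by the user running SDC. A minimal sketch of preparing them and starting the collector in the foreground (assuming the stock bin/streamsets launcher shipped in the tarball; the paths are the ones configured above):

# create the directories referenced by sdc-env.sh
mkdir -p /home/bigdata/data/sdc/conf /home/bigdata/data/sdc/data /home/bigdata/log/sdc

# confirm the variables are picked up, then start SDC in the foreground
source libexec/sdc-env.sh
echo "SDC_CONF=$SDC_CONF SDC_DATA=$SDC_DATA SDC_LOG=$SDC_LOG"
bin/streamsets dc
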
  2. Increase the maximum number of open files (file descriptors):

ulimit -a          # show all current resource limits
ulimit -n          # show the current open-file limit
ulimit -n 32768    # session-only change: raise the open-file limit

# Permanently raise the open-file limit on this host
vi /etc/security/limits.conf
sdc soft nofile    32768
sdc hard nofile    32768
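
The limits.conf change only applies to new login sessions of the sdc user; a quick way to confirm it took effect:

# open a fresh session as the sdc user and check both limits
su - sdc -c 'ulimit -n'     # soft limit, expect 32768
su - sdc -c 'ulimit -Hn'    # hard limit, expect 32768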

Building DataCollector

# Step 1: build and install the datacollector-plugin-api and datacollector-api modules first
git clone git@git.envisioncn.com:arch/datacollector-plugin-api.git datacollector-plugin-api;
cd datacollector-plugin-api;
mvn clean install -DskipTests

git clone git@git.envisioncn.com:arch/datacollector-api.git datacollector-api;
cd datacollector-api;
mvn clean install -DskipTests


# Step 2: once both modules above are installed, build datacollector itself
git clone git@git.envisioncn.com:arch/datacollector.git datacollector;
cd datacollector;
mvn clean package -DskipTests -DskipRat -Drelease
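
If the build succeeds, the runnable distribution is assembled by the dist module. As a sketch only: the dist/target path and the exact tarball name are assumptions that depend on the version being built.

# locate the assembled distribution and unpack it to the install location used above
ls dist/target/*.tgz
tar -xzf dist/target/streamsets-datacollector-*.tgz -C /home/bigdata/app/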

DataCollector Build Errors and Fixes

  1. enos-spark version problem: change every occurrence of the original 2.3.2.3.1** version to 2.3.2.

[INFO] StreamSets Data Collector UI ....................... FAILURE [  5.416 s]
[INFO] StreamSets Data Collector Docs ..................... SKIPPED

[ERROR] Failed to execute goal com.github.trecloux:yeoman-maven-plugin:0.4:build (run-grunt) on project streamsets-datacollector-ui: Error during : cmd /c node --version: Process exited with an error: 1 (Exit value: 1) -> [Help 1]
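
The UI failure above is raised by yeoman-maven-plugin shelling out to node (the cmd /c prefix shows this build ran on a Windows host), so it usually means Node.js is missing from PATH rather than a code problem. A hedged check and workaround; the '!ui' module selector is an assumption about the UI module's directory name:

# confirm node and npm resolve from the shell that runs Maven; install Node.js if they do not
node --version
npm --version

# alternatively, skip the UI module for a backend-only build (hypothetical module path)
mvn clean package -DskipTests -DskipRat -Drelease -pl '!ui'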

Spark Job Launch Scripts

The SDC _cluster-manager script

Starting: SystemProcess: /home/app/stream/streamset/streamset-3.3.2_tag0529/libexec/_cluster-manager start 
--master yarn 
--deploy-mode cluster 
--executor-memory 1024m 
--executor-cores 1 
--num-executors 1 
--archives /home/app/appdata/streamset/data/temp/cluster-pipeline-ClusterOriginKafkaToTrashTestClusterDSource62e2fe99-d7f8-4f4d-8ba8-38282095bafe-0/staging/libs.tar.gz,/home/app/appdata/streamset/data/temp/cluster-pipeline-ClusterOriginKafkaToTrashTestClusterDSource62e2fe99-d7f8-4f4d-8ba8-38282095bafe-0/staging/etc.tar.gz,/home/app/appdata/streamset/data/temp/cluster-pipeline-ClusterOriginKafkaToTrashTestClusterDSource62e2fe99-d7f8-4f4d-8ba8-38282095bafe-0/staging/resources.tar.gz 

--files /home/app/appdata/streamset/data/temp/cluster-pipeline-ClusterOriginKafkaToTrashTestClusterDSource62e2fe99-d7f8-4f4d-8ba8-38282095bafe-0/staging/log4j.properties 

--jars /home/app/stream/streamset/streamset-3.3.2_tag0529/libexec/bootstrap-libs/main/streamsets-datacollector-bootstrap-3.3.2-SNAPSHOT.jar,/home/app/stream/streamset/streamset-3.3.2_tag0529/streamsets-libs/streamsets-datacollector-edh-cluster-kafka_0_10-spark_2_0-lib/lib/kafka_2.11-0.10.0.1.jar,/home/app/stream/streamset/streamset-3.3.2_tag0529/streamsets-libs/streamsets-datacollector-edh-cluster-kafka_0_10-spark_2_0-lib/lib/spark-streaming-kafka-0-10_2.11-2.0.3-EDH-0u1-SNAPSHOT.jar,/home/app/stream/streamset/streamset-3.3.2_tag0529/streamsets-libs/streamsets-datacollector-edh-cluster-kafka_0_10-spark_2_0-lib/lib/kafka-clients-0.10.2.1.jar,/home/app/stream/streamset/streamset-3.3.2_tag0529/streamsets-libs/streamsets-datacollector-edh-cluster-kafka_0_10-spark_2_0-lib/lib/metrics-core-2.2.0.jar,/home/app/stream/streamset/streamset-3.3.2_tag0529/container-lib/streamsets-datacollector-container-3.3.2-SNAPSHOT.jar,/home/app/stream/streamset/streamset-3.3.2_tag0529/container-lib/streamsets-datacollector-common-3.3.2-SNAPSHOT.jar,/home/app/stream/streamset/streamset-3.3.2_tag0529/api-lib/streamsets-datacollector-api-3.3.2-SNAPSHOT.jar,/home/app/stream/streamset/streamset-3.3.2_tag0529/libexec/bootstrap-libs/cluster/streamsets-datacollector-cluster-bootstrap-3.3.2-SNAPSHOT.jar 

--conf spark.running.mode=yarn 
--conf spark.streaming.dynamicAllocation.maxExecutors=3 
--conf spark.driver.extraJavaOptions=-Duser.home=. 
--conf spark.executor.extraJavaOptions=-javaagent:./streamsets-datacollector-bootstrap-3.3.2-SNAPSHOT.jar -Duser.home=. -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -Dlog4j.debug 

--conf spark.executorEnv.JAVA_HOME=/home/app/java/jdk1.8.0_65 
--conf spark.streaming.kafka.consumer.poll.max.retries=5 
--conf spark.yarn.appMasterEnv.JAVA_HOME=/home/app/java/jdk1.8.0_65 
--conf spark.driver.extraJavaOptions=-Xdebug -Xrunjdwp:transport=dt_socket,server=n,suspend=n,address=192.168.51.1:35053 
--conf spark.executor.extraJavaOptions=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=35054 
--name StreamSets Data Collector: Cluster_OriginKafka_To_Trash_TestClusterDSource 
--class com.streamsets.pipeline.BootstrapClusterStreaming

 /home/app/stream/streamset/streamset-3.3.2_tag0529/libexec/bootstrap-libs/cluster/streamsets-datacollector-cluster-bootstrap-api-3.3.2-SNAPSHOT.jar    
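
_cluster-manager is a small shell wrapper under libexec/; it resolves the spark-submit binary from the SPARK_SUBMIT_YARN_COMMAND variable configured in sdc-env.sh and forwards the argument list above to it. A quick way to confirm that mapping on a given install (a sketch; it just greps for the variable name):

grep -n "SPARK_SUBMIT" libexec/sdc-env.sh libexec/_cluster-manager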

SDC spark-submit command

# The same launch, broken down as a plain spark-submit invocation:
spark-submit --master yarn 
--deploy-mode cluster 
--executor-memory 1024m 
--executor-cores 1 
--num-executors 2

 --archives /home/app/appdata/streamset/data/temp/cluster-pipeline-ClusterOriginKafkaToTrashTestMultiWorkerPerf90ece481-843c-4e5e-86fe-d17911ccbc5c-0/staging/libs.tar.gz,/home/app/appdata/streamset/data/temp/cluster-pipeline-ClusterOriginKafkaToTrashTestMultiWorkerPerf90ece481-843c-4e5e-86fe-d17911ccbc5c-0/staging/etc.tar.gz,/home/app/appdata/streamset/data/temp/cluster-pipeline-ClusterOriginKafkaToTrashTestMultiWorkerPerf90ece481-843c-4e5e-86fe-d17911ccbc5c-0/staging/resources.tar.gz 

--files /home/app/appdata/streamset/data/temp/cluster-pipeline-ClusterOriginKafkaToTrashTestMultiWorkerPerf90ece481-843c-4e5e-86fe-d17911ccbc5c-0/staging/log4j.properties 

--jars /home/app/stream/streamset/streamset-3.3.2_tag0529/libexec/bootstrap-libs/main/streamsets-datacollector-bootstrap-3.3.2-SNAPSHOT.jar,/home/app/stream/streamset/streamset-3.3.2_tag0529/streamsets-libs/streamsets-datacollector-edh-cluster-kafka_0_10-spark_2_0-lib/lib/kafka_2.11-0.10.0.1.jar,/home/app/stream/streamset/streamset-3.3.2_tag0529/streamsets-libs/streamsets-datacollector-edh-cluster-kafka_0_10-spark_2_0-lib/lib/kafka-clients-0.10.2.1.jar,/home/app/stream/streamset/streamset-3.3.2_tag0529/streamsets-libs/streamsets-datacollector-edh-cluster-kafka_0_10-spark_2_0-lib/lib/metrics-core-2.2.0.jar,/home/app/stream/streamset/streamset-3.3.2_tag0529/streamsets-libs/streamsets-datacollector-edh-cluster-kafka_0_10-spark_2_0-lib/lib/spark-streaming-kafka-0-10_2.11-2.1.0.jar,/home/app/stream/streamset/streamset-3.3.2_tag0529/container-lib/streamsets-datacollector-container-3.3.2-SNAPSHOT.jar,/home/app/stream/streamset/streamset-3.3.2_tag0529/container-lib/streamsets-datacollector-common-3.3.2-SNAPSHOT.jar,/home/app/stream/streamset/streamset-3.3.2_tag0529/api-lib/streamsets-datacollector-api-3.3.2-SNAPSHOT.jar,/home/app/stream/streamset/streamset-3.3.2_tag0529/libexec/bootstrap-libs/cluster/streamsets-datacollector-cluster-bootstrap-3.3.2-SNAPSHOT.jar

--conf spark.running.mode=yarn 
--conf spark.streaming.dynamicAllocation.maxExecutors=16 
--conf spark.driver.extraJavaOptions=-Duser.home=. 
--conf 'spark.executor.extraJavaOptions=-javaagent:./streamsets-datacollector-bootstrap-3.3.2-SNAPSHOT.jar -Duser.home=. -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -Dlog4j.debug' 
--conf spark.executorEnv.JAVA_HOME=/home/app/java/jdk1.8.0_65 
--conf spark.streaming.kafka.consumer.poll.max.retries=5 
--conf spark.yarn.appMasterEnv.JAVA_HOME=/home/app/java/jdk1.8.0_65 
--conf 'spark.driver.extraJavaOptions=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=35053' 
--name 'StreamSets Data Collector: Cluster_OriginKafka_To_Trash_TestMultiWorkerPerf' 
--class com.streamsets.pipeline.BootstrapClusterStreaming 

/home/app/stream/streamset/streamset-3.3.2_tag0529/libexec/bootstrap-libs/cluster/streamsets-datacollector-cluster-bootstrap-api-3.3.2-SNAPSHOT.jar
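
Once the command above is accepted, the pipeline runs as a normal YARN application and can be checked from the YARN CLI (the application id shown here is the one that appears in the spark_conf.properties dump below):

# list running applications and inspect the one launched by SDC
yarn application -list -appStates RUNNING
yarn application -status application_1589778130925_0004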


spark_conf.properties inside the spark-submit submission

Extracted contents of __spark_conf__158841583956203155.zip

When the job is submitted, spark-submit creates a local zip archive named
__spark_conf__158***.zip in a local temp directory.
The archive mainly contains the Hadoop, YARN, and Spark configuration files;
the Spark-specific properties file, spark_conf.properties, has the following contents:

spark.yarn.dist.files=file\:/home/app/appdata/streamset/data/temp/cluster-pipeline-ClusterOriginKafkaToKafkaT41271c14-12a8-4761-8f79-4901d995cd88-0/staging/log4j.properties
spark.yarn.cache.visibilities=PRIVATE,PRIVATE,PRIVATE,PRIVATE,PRIVATE,PRIVATE,PRIVATE,PRIVATE,PRIVATE,PRIVATE,PRIVATE,PRIVATE,PRIVATE,PRIVATE,PRIVATE
spark.streaming.dynamicAllocation.maxExecutors=16
spark.streaming.blockInterval=300000
spark.yarn.cache.timestamps=1589782852008,1589782852187,1589782852646,1589782852763,1589782852806,1589782852864,1589782853161,1589782853222,1589782853316,1589782853371,1589782853420,1589782853450,1589782859142,1589782859604,1589782860068
spark.streaming.dynamicAllocation.scalingInterval=10000
spark.executor.memory=1024m
spark.yarn.dist.archives=file\:/home/app/appdata/streamset/data/temp/cluster-pipeline-ClusterOriginKafkaToKafkaT41271c14-12a8-4761-8f79-4901d995cd88-0/staging/libs.tar.gz,file\:/home/app/appdata/streamset/data/temp/cluster-pipeline-ClusterOriginKafkaToKafkaT41271c14-12a8-4761-8f79-4901d995cd88-0/staging/etc.tar.gz,file\:/home/app/appdata/streamset/data/temp/cluster-pipeline-ClusterOriginKafkaToKafkaT41271c14-12a8-4761-8f79-4901d995cd88-0/staging/resources.tar.gz
spark.yarn.cache.confArchive=hdfs\://ldsver51\:9000/user/app/.sparkStaging/application_1589778130925_0004/__spark_conf__.zip
spark.yarn.cache.sizes=208179347,85717,34568,5151376,951041,82123,219810,1643501,78429,161631,10314,1772,202462143,16315,470
spark.driver.memory=1G
spark.submit.deployMode=cluster
spark.yarn.appMasterEnv.JAVA_HOME=/home/app/java/jdk1.8.0_65
spark.executorEnv.JAVA_HOME=/home/app/java/jdk1.8.0_65
spark.yarn.secondary.jars=streamsets-datacollector-bootstrap-3.3.2-SNAPSHOT.jar,kafka_2.11-0.10.0.1.jar,kafka-clients-0.10.2.1.jar,metrics-core-2.2.0.jar,spark-streaming-kafka-0-10_2.11-2.2.0.jar,streamsets-datacollector-container-3.3.2-SNAPSHOT.jar,streamsets-datacollector-common-3.3.2-SNAPSHOT.jar,streamsets-datacollector-api-3.3.2-SNAPSHOT.jar,streamsets-datacollector-cluster-bootstrap-3.3.2-SNAPSHOT.jar
spark.streaming.backpressure.enabled=true
spark.streaming.dynamicAllocation.enabled=false
spark.streaming.kafka.consumer.poll.max.retries=5
spark.master=yarn
spark.yarn.cache.filenames=hdfs\://ldsver51\:9000/user/app/.sparkStaging/application_1589778130925_0004/__spark_libs__1360858079026319589.zip\#__spark_libs__,hdfs\://ldsver51\:9000/user/app/.sparkStaging/application_1589778130925_0004/streamsets-datacollector-cluster-bootstrap-api-3.3.2-SNAPSHOT.jar\#__app__.jar,hdfs\://ldsver51\:9000/user/app/.sparkStaging/application_1589778130925_0004/streamsets-datacollector-bootstrap-3.3.2-SNAPSHOT.jar\#streamsets-datacollector-bootstrap-3.3.2-SNAPSHOT.jar,hdfs\://ldsver51\:9000/user/app/.sparkStaging/application_1589778130925_0004/kafka_2.11-0.10.0.1.jar\#kafka_2.11-0.10.0.1.jar,hdfs\://ldsver51\:9000/user/app/.sparkStaging/application_1589778130925_0004/kafka-clients-0.10.2.1.jar\#kafka-clients-0.10.2.1.jar,hdfs\://ldsver51\:9000/user/app/.sparkStaging/application_1589778130925_0004/metrics-core-2.2.0.jar\#metrics-core-2.2.0.jar,hdfs\://ldsver51\:9000/user/app/.sparkStaging/application_1589778130925_0004/spark-streaming-kafka-0-10_2.11-2.2.0.jar\#spark-streaming-kafka-0-10_2.11-2.2.0.jar,hdfs\://ldsver51\:9000/user/app/.sparkStaging/application_1589778130925_0004/streamsets-datacollector-container-3.3.2-SNAPSHOT.jar\#streamsets-datacollector-container-3.3.2-SNAPSHOT.jar,hdfs\://ldsver51\:9000/user/app/.sparkStaging/application_1589778130925_0004/streamsets-datacollector-common-3.3.2-SNAPSHOT.jar\#streamsets-datacollector-common-3.3.2-SNAPSHOT.jar,hdfs\://ldsver51\:9000/user/app/.sparkStaging/application_1589778130925_0004/streamsets-datacollector-api-3.3.2-SNAPSHOT.jar\#streamsets-datacollector-api-3.3.2-SNAPSHOT.jar,hdfs\://ldsver51\:9000/user/app/.sparkStaging/application_1589778130925_0004/streamsets-datacollector-cluster-bootstrap-3.3.2-SNAPSHOT.jar\#streamsets-datacollector-cluster-bootstrap-3.3.2-SNAPSHOT.jar,hdfs\://ldsver51\:9000/user/app/.sparkStaging/application_1589778130925_0004/log4j.properties\#log4j.properties,hdfs\://ldsver51\:9000/user/app/.sparkStaging/application_1589778130925_0004/libs.tar.gz\#libs.tar.gz,hdfs\://ldsver51\:9000/user/app/.sparkStaging/application_1589778130925_0004/etc.tar.gz\#etc.tar.gz,hdfs\://ldsver51\:9000/user/app/.sparkStaging/application_1589778130925_0004/resources.tar.gz\#resources.tar.gz
spark.executor.cores=1
spark.yarn.cache.types=ARCHIVE,FILE,FILE,FILE,FILE,FILE,FILE,FILE,FILE,FILE,FILE,FILE,ARCHIVE,ARCHIVE,ARCHIVE
spark.executor.instances=1
spark.executor.extraJavaOptions=-Xdebug -Xrunjdwp\:transport\=dt_socket,server\=y,suspend\=n,address\=35054 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.ssl\=false -Dcom.sun.management.jmxremote.authenticate\=false -Dcom.sun.management.jmxremote.port\=10992
spark.running.mode=yarn
spark.app.name=StreamSets Data Collector\: Cluster_OriginKafka_To_KafkaT
spark.driver.extraJavaOptions=-Xdebug -Xrunjdwp\:transport\=dt_socket,server\=y,suspend\=n,address\=35053 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.ssl\=false -Dcom.sun.management.jmxremote.authenticate\=false -Dcom.sun.management.jmxremote.port\=10991
spark.yarn.dist.jars=file\:/home/app/stream/streamset/streamset-3.3.2_tag0529/libexec/bootstrap-libs/main/streamsets-datacollector-bootstrap-3.3.2-SNAPSHOT.jar,file\:/home/app/stream/streamset/streamset-3.3.2_tag0529/streamsets-libs/streamsets-datacollector-edh-cluster-kafka_0_10-spark_2_0-lib/lib/kafka_2.11-0.10.0.1.jar,file\:/home/app/stream/streamset/streamset-3.3.2_tag0529/streamsets-libs/streamsets-datacollector-edh-cluster-kafka_0_10-spark_2_0-lib/lib/kafka-clients-0.10.2.1.jar,file\:/home/app/stream/streamset/streamset-3.3.2_tag0529/streamsets-libs/streamsets-datacollector-edh-cluster-kafka_0_10-spark_2_0-lib/lib/metrics-core-2.2.0.jar,file\:/home/app/stream/streamset/streamset-3.3.2_tag0529/streamsets-libs/streamsets-datacollector-edh-cluster-kafka_0_10-spark_2_0-lib/lib/spark-streaming-kafka-0-10_2.11-2.2.0.jar,file\:/home/app/stream/streamset/streamset-3.3.2_tag0529/container-lib/streamsets-datacollector-container-3.3.2-SNAPSHOT.jar,file\:/home/app/stream/streamset/streamset-3.3.2_tag0529/container-lib/streamsets-datacollector-common-3.3.2-SNAPSHOT.jar,file\:/home/app/stream/streamset/streamset-3.3.2_tag0529/api-lib/streamsets-datacollector-api-3.3.2-SNAPSHOT.jar,file\:/home/app/stream/streamset/streamset-3.3.2_tag0529/libexec/bootstrap-libs/cluster/streamsets-datacollector-cluster-bootstrap-3.3.2-SNAPSHOT.jar
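
The conf archive itself is uploaded to the YARN staging directory (spark.yarn.cache.confArchive above) and only exists while the application does; while it is there, it can be pulled back and inspected to confirm exactly which configuration the ApplicationMaster will see:

# fetch and list the staged conf archive (path taken from spark.yarn.cache.confArchive)
hdfs dfs -get hdfs://ldsver51:9000/user/app/.sparkStaging/application_1589778130925_0004/__spark_conf__.zip .
unzip -l __spark_conf__.zip    # hadoop/yarn *.xml files plus the spark properties file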

YARN Driver (ApplicationMaster) command

exec /bin/bash -c "

$JAVA_HOME/bin/java -server 
-Xmx1024m 
-Djava.io.tmpdir=$PWD/tmp 
'-Xdebug' 
'-Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=35053' 
-Dspark.yarn.app.container.log.dir=/home/app/appdata/hadoop/dataDir/yarn-logs/userlogs/application_1588486642253_0015/container_1588486642253_0015_01_000001 

org.apache.spark.deploy.yarn.ApplicationMaster 

--class 'com.streamsets.pipeline.BootstrapClusterStreaming' 
--jar file:/home/app/stream/streamset/streamset-3.3.2_tag0529/libexec/bootstrap-libs/cluster/streamsets-datacollector-cluster-bootstrap-api-3.3.2-SNAPSHOT.jar 
--properties-file $PWD/__spark_conf__/__spark_conf__.properties 

1> /home/app/appdata/hadoop/dataDir/yarn-logs/userlogs/application_1588486642253_0015/container_1588486642253_0015_01_000001/stdout 
2> /home/app/appdata/hadoop/dataDir/yarn-logs/userlogs/application_1588486642253_0015/container_1588486642253_0015_01_000001/stderr

"

YARN Executor launch script

exec /bin/bash -c 
"$JAVA_HOME/bin/java -server 
-Xmx1024m 
'-Xdebug' 
'-Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=35054' 
-Djava.io.tmpdir=$PWD/tmp 
'-Dspark.driver.port=39341' 
'-Dspark.ui.port=0' 
-Dspark.yarn.app.container.log.dir=/home/app/appdata/hadoop/dataDir/yarn-logs/userlogs/application_1588486642253_0015/container_1588486642253_0015_01_000002 
-XX:OnOutOfMemoryError='kill %p' 
org.apache.spark.executor.CoarseGrainedExecutorBackend 
--driver-url spark://CoarseGrainedScheduler@192.168.51.152:39341 
--executor-id 1 
--hostname ldsver52 
--cores 1 
--app-id application_1588486642253_0015 
--user-class-path file:$PWD/__app__.jar 
--user-class-path file:$PWD/streamsets-datacollector-bootstrap-3.3.2-SNAPSHOT.jar 
--user-class-path file:$PWD/kafka_2.11-0.10.0.1.jar 
--user-class-path file:$PWD/kafka-clients-0.10.2.1.jar 
--user-class-path file:$PWD/metrics-core-2.2.0.jar 
--user-class-path file:$PWD/spark-streaming-kafka-0-10_2.11-2.1.0.jar 
--user-class-path file:$PWD/streamsets-datacollector-container-3.3.2-SNAPSHOT.jar 
--user-class-path file:$PWD/streamsets-datacollector-common-3.3.2-SNAPSHOT.jar 
--user-class-path file:$PWD/streamsets-datacollector-api-3.3.2-SNAPSHOT.jar 
--user-class-path file:$PWD/streamsets-datacollector-cluster-bootstrap-3.3.2-SNAPSHOT.jar 

1>/home/app/appdata/hadoop/dataDir/yarn-logs/userlogs/application_1588486642253_0015/container_1588486642253_0015_01_000002/stdout 

2>/home/app/appdata/hadoop/dataDir/yarn-logs/userlogs/application_1588486642253_0015/container_1588486642253_0015_01_000002/stderr

"hadoop_shell_errorcode=$?

How Spark Streaming starts and executes BootstrapClusterStreaming.main():

ApplicationMaster.main(){
    SparkHadoopUtil.get.runAsSparkUser { () =>
        ApplicationMaster master = new ApplicationMaster(amArgs, new YarnRMClient);
        master.run(){
            if(isClusterMode){
                ApplicationMaster.runDriver(securityMgr);{
                    // Run the user application (here BootstrapClusterStreaming.main()) and then finish the AppMaster with finish(SUCCEEDED)
                    userClassThread:Thread = startUserApplication(){
                        // The class holding the Spark driver's main() entry point, e.g. BootstrapClusterStreaming.main()
                        val mainMethod = userClassLoader.loadClass(args.userClass).getMethod("main", classOf[Array[String]])
                        userThread:Thread = new Thread(){ run(){
                            mainMethod.invoke(userArgs.toArray) // BootstrapClusterStreaming.main()
                        }}
                        userThread.setName("Driver") // so a thread named "Driver" should be visible inside the AppMaster process
                        userThread.start();{
                            userThread.run(){
                                mainMethod.invoke(userArgs.toArray);{ // BootstrapClusterStreaming.main()
                                    // Main execution logic of BootstrapClusterStreaming
                                    BootstrapClusterStreaming.main(){
                                        binding = SparkStreamingBindingFactory.build(BootstrapCluster.getProperties());{
                                            return bindingFactory[Kafka010SparkStreamingBindingFactory].create(){ new Kafka010SparkStreamingBinding(); };
                                        }
                                        binding.init();{ // SparkStreamingBindingFactory.init()
                                            Configuration hadoopConf = new SparkHadoopUtil().newConfiguration(conf);
                                            offsetHelper = new SdcClusterOffsetHelper(checkPointPath, hdfs, Utils.getKafkaMaxWaitTime(properties));
                                            JavaStreamingContextFactory javaStreamingContextFactory = getStreamingContextFactory();
                                            ssc = javaStreamingContextFactory.create();{ // JavaStreamingContextFactoryImpl.create()
                                                JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(duration));
                                                return createDStream(jssc, props);{ // Kafka010SparkStreamingBindingFactory.createDStream()
                                                    JavaInputDStream<ConsumerRecord<byte[], byte[]>> stream = KafkaUtils.createDirectStream(); // create the Kafka DStream
                                                    // Driver logic: the KafkaRDD is consumed inside mapPartitions and handed to the SDC framework through the dataChannel.dataQueue queue
                                                    Driver$.MODULE$.foreach(stream.dstream(), KafkaOffsetManagerImpl.get());{
                                                        dstream.foreachRDD(rdd => {
                                                            rdd.mapPartitions(iterator => {
                                                                val batch = iterator.map({ pair => new Pair(pair._1, pair._2) });
                                                                ClusterFunctionProvider.getClusterFunction.startBatch(batch).asInstanceOf[java.util.Iterator[Record]].asScala;
                                                            })
                                                            offsetManager.saveOffsets(rdd)
                                                        })
                                                    }
                                                    stream.foreachRDD(new CommitOffset(stream));
                                                    return jssc;
                                                }
                                            }
                                            ssc.checkpoint(rddCheckpointDir.toString());
                                        }
                                        BootstrapCluster.createTransformers(binding.getStreamingContext().sparkContext());
                                        binding.startContext();
                                        binding.awaitTermination();
                                    }
                                }
                            }
                        }
                        return userThread;
                    }
                    userClassThread[Thread].join(); // wait for the BootstrapClusterStreaming.main() thread to finish
                }
            }
        }
    }
}
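
The userThread.setName("Driver") step above can be verified while a cluster pipeline is running: the ApplicationMaster JVM should contain a thread named "Driver" that is executing BootstrapClusterStreaming.main(). A quick check with the JDK tools on the AM host:

# find the ApplicationMaster JVM and dump its threads
jps -lm | grep ApplicationMaster
jstack <am-pid> | grep -A 3 '"Driver"'    # the thread created by startUserApplication()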
