SDC KafkaDSource: Source Code and Internals

KafkaDSource has two concrete runtime classes:

  • StandaloneKafkaSource extends BaseKafkaSource[extends BaseSource[extends BaseStage<Source.Context>[implements Stage<C>] implements Source] implements OffsetCommitter ]
  • ClusterKafkaSource extends BaseKafkaSource implements OffsetCommitter, ClusterSource, ErrorListener

KafkaDSource: the StandaloneKafkaSource implementation

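KafkaDSource flow overview
  1. Depending on the execution mode, select the matching factory: StandaloneKafkaSourceFactory or ClusterKafkaSourceFactory (both are constructed up front in createSource()).
  2. Use that factory to create the concrete Kafka source, and hold it as the delegate of DelegatingKafkaSource.
  3. Run the Kafka-related initialization: validate the ZooKeeper and Kafka broker settings, then create the KafkaConsumer.
  4. Wrap kafkaConsumer.poll() in a KafkaConsumerRunner and run it on a scheduled thread (20 ms fixed delay) that pulls records into recordQueue.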
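
Steps 1 and 2 can be sketched in isolation. The following is a minimal illustration, not the SDC code: DelegatingSourceSketch, its nested ExecutionMode enum, and the Source interface are hypothetical stand-ins, and the two Supplier arguments play the role of the standalone and cluster factories. Only the selection rule (preview or non-cluster mode picks the standalone source) is taken from the trace that follows.

```java
import java.util.function.Supplier;

// Hypothetical stand-ins; none of these types are the real SDC classes.
class DelegatingSourceSketch {

    // Stand-in for the execution modes named in the trace.
    enum ExecutionMode { STANDALONE, CLUSTER_BATCH, CLUSTER_YARN_STREAMING, CLUSTER_MESOS_STREAMING }

    // Stand-in for the source interface; a single method keeps the sketch short.
    interface Source { String describe(); }

    private final Supplier<Source> standaloneFactory;
    private final Supplier<Source> clusterFactory;
    private Source delegate;

    // Both factories are handed over up front; neither source exists yet.
    DelegatingSourceSketch(Supplier<Source> standaloneFactory, Supplier<Source> clusterFactory) {
        this.standaloneFactory = standaloneFactory;
        this.clusterFactory = clusterFactory;
    }

    // The delegate is chosen only at init time, once mode and preview flag are known.
    void init(ExecutionMode mode, boolean preview) {
        boolean isClusterMode = mode == ExecutionMode.CLUSTER_BATCH
                || mode == ExecutionMode.CLUSTER_YARN_STREAMING
                || mode == ExecutionMode.CLUSTER_MESOS_STREAMING;
        delegate = (preview || !isClusterMode) ? standaloneFactory.get() : clusterFactory.get();
    }

    // Every later call is simply forwarded to the chosen delegate.
    String describe() { return delegate.describe(); }

    public static void main(String[] args) {
        DelegatingSourceSketch source =
                new DelegatingSourceSketch(() -> () -> "standalone", () -> () -> "cluster");
        source.init(ExecutionMode.CLUSTER_YARN_STREAMING, true); // preview wins over cluster mode
        System.out.println(source.describe());
    }
}
```

Deferring the choice to init() is what lets a single stage definition serve both modes: the pipeline only ever sees the delegating wrapper.
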
originStage.init(Info,Context){ // DStage.init(Info info, C context)
    if(stage == null) {
        stage:Stage<C> = createStage(){ // implemented by DClusterSourceOffsetCommitter.createStage()
            Stage<Source.Context> result = super[DSourceOffsetCommitter].createStage(){ // DSourceOffsetCommitter.createStage()
                source:Source = (Source) super[DSource].createStage(){ // DSource.createStage()
                    return createSource();{ // abstract method, implemented by KafkaDSource.createSource()
                        KafkaDSource.createSource(){
                            // Create both factories, StandaloneKafkaSourceFactory and ClusterKafkaSourceFactory, and pass them to DelegatingKafkaSource
                            this.delegatingKafkaSource = new DelegatingKafkaSource(new StandaloneKafkaSourceFactory(kafkaConfigBean),
                                new ClusterKafkaSourceFactory(kafkaConfigBean));
                            return delegatingKafkaSource;   
                        }
                    }
                }
                
                offsetCommitter = (OffsetCommitter) source;
                return source;
            }
            
            LOG.info("Created source of type: {}", source);
                * Created source of type: com.bigdata.sdc.origin.kafka.DelegatingKafkaSource@2ce6c6ec
            clusterSource = (ClusterSource) source;
            return result;
        }
    }
    
    List<ConfigIssue> issues = stage.init(info, context){ // BaseStage.init(Info info, C context)
        issues.addAll(init(){ // DelegatingKafkaSource.init()
            boolean isClusterMode = (getContext().getExecutionMode()==ExecutionMode.CLUSTER_BATCH 
                || getContext().getExecutionMode()==ExecutionMode.CLUSTER_YARN_STREAMING 
                || getContext().getExecutionMode()==ExecutionMode.CLUSTER_MESOS_STREAMING);
            
            // In standalone mode or in preview, create the standalone source
            if (getContext().isPreview()|| !isClusterMode ){
                delegate = standaloneKafkaSourceFactory.create(){
                    BaseKafkaSource baseKafkaSource = new StandaloneKafkaSource(conf){
                        super(conf){ // new BaseKafkaSource(){}
                            this.conf = conf;
                            
                            SdcKafkaValidationUtilFactory kafkaUtilFactory= SdcKafkaValidationUtilFactory.getInstance(){
                                return FactoriesBean.getKafkaValidationUtilFactory(){
                                    static FactoriesBean factoriesBean;
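                                    // Uses the Java SPI mechanism (ServiceLoader): the implementation class is read from the file named after the FactoriesBean class under resources/META-INF/services; the static block below throws unless exactly one implementation is found.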
                                    static ServiceLoader<FactoriesBean> factoriesBeanLoader = ServiceLoader.load(FactoriesBean.class);
                                    static{
                                        for (FactoriesBean bean : factoriesBeanLoader) {
                                            LOG.info("Found FactoriesBean loader {}", bean.getClass().getName());
                                            factoriesBean = bean;
                                            serviceCount++;
                                        }
                                        if (serviceCount != 1) {
                                            throw new RuntimeException(Utils.format("Unexpected number of FactoriesBean: {} instead of 1", serviceCount));
                                        }
                                    }
                                    
                                    return factoriesBean.createSdcKafkaValidationUtilFactory(){ // Kafka11FactoriesBean.createSdcKafkaValidationUtilFactory()
                                        return new Kafka09ValidationUtilFactory(); // presumably the Kafka 0.9 and 0.11 consumer APIs are the same, so the code still relies on the Kafka09 factory
                                    }
                                }
                            }
                            
                            kafkaValidationUtil = kafkaUtilFactory.create(){ // Kafka09ValidationUtilFactory.create()
                                return new KafkaValidationUtil09(); //class KafkaValidationUtil09 extends BaseKafkaValidationUtil implements SdcKafkaValidationUtil
                            }
                        }
                    }
                    return baseKafkaSource;
                }
            } 
            // Only in cluster mode and outside preview is the cluster source created
            else{ 
                delegate = clusterKafkaSourceFactory.create();
            }
            
            List<ConfigIssue> issues=delegate.init(getInfo(), getContext()){ // BaseStage.init(Info info, C context)
                issues.addAll(init(){ // abstract method, implemented by StandaloneKafkaSource.init()
                    
                    /** Important step 1: Kafka initialization:
                    *   - validate the brokers:          kafkaValidationUtil.validateKafkaBrokerConnectionString();
                    *   - validate the ZooKeeper config: kafkaValidationUtil.validateZkConnectionString();
                    *   - create the KafkaConsumer:      SdcKafkaConsumerFactory.create(settings).create();
                    */
                    List<ConfigIssue> issues = super[BaseKafkaSource].init(){ // BaseKafkaSource.init()
                        errorRecordHandler = new DefaultErrorRecordHandler(getContext());
                        if (conf.topic == null || conf.topic.isEmpty()) {
                            issues.add(getContext().createConfigIssue());
                        }
                        if (conf.maxWaitTime < 1) {
                            issues.add(getContext().createConfigIssue());
                        }
                        
                        conf.init(getContext(), issues);
                        
                        conf.dataFormatConfig.init(getContext(),conf.dataFormat);
                        
                        // Create the parser factory for the data consumed from Kafka
                        parserFactory = conf.dataFormatConfig.getParserFactory();
                        
                        // Validate broker config
                        List<HostAndPort> kafkaBrokers = kafkaValidationUtil.validateKafkaBrokerConnectionString(){// BaseKafkaValidationUtil.validateKafkaBrokerConnectionString()
                            String[] brokers = connectionString.split(",");
                            for (String broker : brokers) {
                                kafkaBrokers.add(HostAndPort.fromString(broker));
                            }
                            return kafkaBrokers;
                        }
                        
                        int partitionCount = kafkaValidationUtil.getPartitionCount();
                        if (partitionCount < 1) {
                            issues.add(getContext().createConfigIssue());
                        }else {
                            originParallelism = partitionCount; // origin parallelism = number of partitions of the topic
                        }
                        
                        // Validate zookeeper config
                        kafkaValidationUtil.validateZkConnectionString(){ // BaseKafkaValidationUtil.validateZkConnectionString()
                            int off = connectString.indexOf('/');
                            if (off >= 0) {
                                connectString = connectString.substring(0, off);
                            }
                            String brokers[] = connectString.split(",");
                            for(String broker : brokers) {
                                kafkaBrokers.add(HostAndPort.fromString(broker));
                            }
                            return kafkaBrokers;
                        }
                        
                        //consumerGroup
                        if (conf.consumerGroup == null || conf.consumerGroup.isEmpty()) {
                            issues.add(getContext().createConfigIssue());
                        }
                        
                        // Validate connecting to Kafka. Important step 1: actually create the KafkaConsumer
                        if (issues.isEmpty()) {
                            Map<String, Object> kafkaConsumerConfigs = new HashMap<>();
                            kafkaConsumerConfigs.putAll(conf.kafkaConsumerConfigs);
                            kafkaConsumerConfigs.put(KafkaConstants.KEY_DESERIALIZER_CLASS_CONFIG, conf.keyDeserializer.getKeyClass());
                            kafkaConsumerConfigs.put(KafkaConstants.VALUE_DESERIALIZER_CLASS_CONFIG, conf.valueDeserializer.getValueClass());
                            kafkaConsumerConfigs.put(KafkaConstants.CONFLUENT_SCHEMA_REGISTRY_URL_CONFIG,conf.dataFormatConfig.schemaRegistryUrls);
                            ConsumerFactorySettings settings = new ConsumerFactorySettings(zk,bootstrapServers,topic,maxWaitTime,kafkaConsumerConfigs,consumerGroup,batchSize);
                            
                            // Create SDC's base KafkaConsumer wrapper class: BaseKafkaConsumer09
                            SdcKafkaConsumerFactory kafkaConsumerFactory= SdcKafkaConsumerFactory.create(settings){
                                SdcKafkaConsumerFactory kafkaConsumerFactory = FactoriesBean.getKafkaConsumerFactory(){
                                    // factoriesBean is initialized by the static block shown above
                                    factoriesBean.createSdcKafkaConsumerFactory(){ // Kafka11FactoriesBean.createSdcKafkaConsumerFactory()
                                        return new Kafka11ConsumerFactory();
                                    }
                                }
                                kafkaConsumerFactory.init(settings){ // Kafka11ConsumerFactory.init()
                                    this.settings = settings;
                                }
                                return kafkaConsumerFactory;
                            }
                            
                            kafkaConsumer = kafkaConsumerFactory.create(){ // Kafka11ConsumerFactory.create()
                                return new KafkaConsumer11(bootStrapServers,topic,consumerGroup,kafkaConsumerConfigs){
                                    super(bootStrapServers, topic, consumerGroup, kafkaConsumerConfigs, context, batchSize){// new KafkaConsumer09()
                                        super(topic, context, batchSize){ // new BaseKafkaConsumer09()
                                            this.recordQueue = new ArrayBlockingQueue<>(batchSize); // batchSize = 20, the value of KafkaConfigBean.maxBatchSize
                                            this.executorService = new ScheduledThreadPoolExecutor(1);
                                            this.rebalanceInProgress = new AtomicBoolean(false);
                                            this.needToCallPoll = new AtomicBoolean(false);
                                        }
                                        this.bootStrapServers = bootStrapServers;
                                        this.consumerGroup = consumerGroup;
                                    }
                                }
                            }
                            
                            /** Important step: actually create the KafkaConsumer:
                            *   - create the official KafkaConsumer
                            *   - subscribe to the single topic
                            *   - fetch all partitions of that topic
                            */
                            kafkaConsumer.validate(issues, getContext()){ // BaseKafkaConsumer09.validate()
                                createConsumer(){ // BaseKafkaConsumer09.createConsumer()
                                    configureKafkaProperties(kafkaConsumerProperties){
                                        // Defaults to false: enable.auto.commit is turned off, so offsets are managed by SDC itself
                                        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, AUTO_COMMIT_ENABLED_DEFAULT);
                                        if (this.context.isPreview()) {
                                            // In preview mode, auto.offset.reset=earliest
                                            props.setProperty(KafkaConstants.AUTO_OFFSET_RESET_CONFIG, KafkaConstants.AUTO_OFFSET_RESET_PREVIEW_VALUE);
                                        }
                                        addUserConfiguredProperties(props);
                                    }
                                    kafkaConsumer = new KafkaConsumer<>(kafkaConsumerProperties);
                                }
                                
                                // Subscribe to the topic; subscribe() enables automatic consumer rebalancing, with this object as the rebalance listener
                                subscribeConsumer(){
                                    kafkaConsumer.subscribe(Collections.singletonList(topic), this);
                                }
                                // Fetch all partitions of the topic
                                kafkaConsumer.partitionsFor(topic);
                            }
                            
                        }
                        return issues;
                    }
                    
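                    /** Important step 2: run KafkaConsumerRunner.run() on a scheduled thread (20 ms fixed delay; each poll() waits up to 100 ms).
                    *   It acts as the producer that keeps pulling data from the Kafka server into the consumer.recordQueue queue.
                    */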
                    if (issues.isEmpty()){
                        // In preview mode, the maximum wait time per batch is fixed at 1 second
                        if (getContext().isPreview()) conf.maxWaitTime = 1000;
                        
                        kafkaConsumer.init(){ // BaseKafkaConsumer09.init()
                            kafkaConsumerRunner = new KafkaConsumerRunner(this:BaseKafkaConsumer09){ // KafkaConsumerRunner implements Runnable()
                                @Override public void run(){
                                    ConsumerRecords<String, byte[]> poll;
                                    synchronized (consumer.pollCommitMutex){
                                        poll = consumer.kafkaConsumer.poll(CONSUMER_POLLING_WINDOW_MS); // fixed at 100 ms
                                        consumer.needToCallPoll.set(false);
                                        
                                        LOG.trace("Read {} messages from Kafka",poll.count(),consumer.recordQueue.size());
                                        for(ConsumerRecord<String, byte[]> r : poll) {
                                            if(consumer.needToCallPoll.get()) {
                                                consumer.recordQueue.clear();
                                                return;
                                            }
                                            
                                            boolean isQueueOK = putConsumerRecord(r){
                                                try{
                                                    // Note: ArrayBlockingQueue<ConsumerRecord<String, byte[]>> recordQueue = new ArrayBlockingQueue<>(batchSize), so put() blocks while the queue is full
                                                    consumer.recordQueue.put(record); 
                                                    return true;
                                                }catch (InterruptedException e) {
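                                                    // Restore the interrupt flag on the current thread, i.e. the poll thread that runs kafkaConsumerRunner.run() every 20 ms to pull data from Kafka; this only sets the flag, execution continues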
                                                    Thread.currentThread().interrupt();
                                                    return false;
                                                }
                                            }
                                            
                                            if (! isQueueOK) { return;}
                                        }
                                    }
                                }
                            }
                            
                            executorService.scheduleWithFixedDelay(kafkaConsumerRunner, 0, 20, TimeUnit.MILLISECONDS);
                            isInited = true;
                        }
                        LOG.info("Successfully initialized Kafka Consumer");
                            * Successfully initialized Kafka Consumer
                        
                        // Open question: what is this lineage event used for?
                        getContext().publishLineageEvent( getContext().createLineageEvent(LineageEventType.ENTITY_READ));
                        
                    }
                });
            }
            
            return issues;
        });
        
        if (requiresSuperInit && !superInitCalled) {
             issues.add(context.createConfigIssue(null, null, Errors.API_20));
        }
        return issues;
    }
    
    return issues;
    
}
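
The hand-off set up in important step 2 is a bounded producer/consumer queue: a single scheduled thread fills recordQueue, and the pipeline thread drains it one record at a time. Below is a minimal sketch of that pattern under stated assumptions. QueueBackedConsumerSketch is a hypothetical class, the Iterator stands in for kafkaConsumer.poll(), and the chunk size of 5 per run is arbitrary. The 20 ms fixed delay, the blocking put(), the 500 ms queue poll, and the batch loop condition mirror the values shown in the trace.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the poller-thread plus bounded-queue pattern; not the SDC class.
class QueueBackedConsumerSketch {
    private final BlockingQueue<String> recordQueue;
    private final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1);
    private final Iterator<String> upstream; // stand-in for kafkaConsumer.poll(); touched only by the poller thread

    QueueBackedConsumerSketch(Iterator<String> upstream, int batchSize) {
        this.upstream = upstream;
        this.recordQueue = new ArrayBlockingQueue<>(batchSize); // bounded: put() blocks while full
    }

    // Start the poller: each run moves a small chunk of records into the queue.
    void init() {
        executor.scheduleWithFixedDelay(() -> {
            try {
                for (int i = 0; i < 5 && upstream.hasNext(); i++) {
                    recordQueue.put(upstream.next());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and end this run
            }
        }, 0, 20, TimeUnit.MILLISECONDS);
    }

    // Build one batch: stop at maxBatchSize records or once maxWaitMillis has elapsed.
    List<String> produce(int maxBatchSize, long maxWaitMillis) {
        List<String> batch = new ArrayList<>();
        long startTime = System.currentTimeMillis();
        try {
            while (batch.size() < maxBatchSize && startTime + maxWaitMillis > System.currentTimeMillis()) {
                String next = recordQueue.poll(500, TimeUnit.MILLISECONDS); // null if nothing arrived in time
                if (next != null) {
                    batch.add(next);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // return whatever was collected so far
        }
        return batch;
    }

    // Stop the poller; shutdownNow() interrupts a put() that is blocked on a full queue.
    void destroy() {
        executor.shutdownNow();
    }

    public static void main(String[] args) {
        List<String> records = new ArrayList<>();
        for (int i = 0; i < 50; i++) {
            records.add("r" + i);
        }
        QueueBackedConsumerSketch consumer = new QueueBackedConsumerSketch(records.iterator(), 20);
        consumer.init();
        System.out.println(consumer.produce(20, 2000).size());
        consumer.destroy();
    }
}
```

Because the queue capacity equals the batch size, the poller can run at most one batch ahead of the pipeline, which bounds memory without any explicit flow-control code.
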

DSource.produce(): processing logic for a single batch

SourceRunner.runProduce(offset,maxBatchSize){
    BatchMakerImpl batchMaker = new BatchMakerImpl(((Source.Context) getContext()).getOutputLanes()){
        if(outputLanes.size() == 1) {
            singleLaneOutput = outputLanes.iterator().next();
        }else{
            singleLaneOutput = null;
        }
        
        for(String lane : outputLanes) {
            laneToRecordsMap.put(lane, new ArrayList<Record>());
        }
    }
    
    String newOffset = getStage().produce(lastOffset, maxBatchSize, batchMaker){ // DSource.produce()
        return getSource().produce(lastSourceOffset, maxBatchSize, batchMaker){ // DelegatingKafkaSource.produce()
            return delegate.produce(lastSourceOffset, maxBatchSize, batchMaker){ // StandaloneKafkaSource.produce()
                int recordCounter = 0;
                // Use the smaller of the configured conf.maxBatchSize and the maxBatchSize passed in (by the runner?) as the current batch size
                int batchSize = conf.maxBatchSize > maxBatchSize ? maxBatchSize : conf.maxBatchSize;
                
                while (recordCounter < batchSize && (startTime + conf.maxWaitTime) > System.currentTimeMillis()) {
                    
                    MessageAndOffset message = kafkaConsumer.read(){ // BaseKafkaConsumer11.read() -> calls the inherited BaseKafkaConsumer09.read()
                        gaugeMap.put(REBALANCE_IN_PROGRESS, rebalanceInProgress.get());
                        gaugeMap.put(WAITING_ON_POLL, needToCallPoll.get());
                        // On rebalancing or if we need to call poll first, there is no point to read any messages from the buffer
                        // Inside AbstractCoordinator.ensureActiveGroup(), when needsJoinPrepare == true (at initialization or after initiateJoinGroup()), rebalanceInProgress is set to true
                        // In commit(), when KafkaConsumer.commitSync() (syncing offsets, to ZooKeeper?) fails with CommitFailedException, needToCallPoll is set to true
                        if(rebalanceInProgress.get() || needToCallPoll.get()) {
                          // Small back off to give Kafka time to rebalance or us to get a chance to call poll()
                          try {
                            Thread.sleep(500);
                          } catch (InterruptedException e) {
                            // Not really important to us
                            Thread.currentThread().interrupt();
                          }

                          // Return no message
                          LOG.debug(
                              "Generating empty batch since the consumer is not ready to consume (rebalance={}, needToPoll={})",
                              rebalanceInProgress.get(),
                              needToCallPoll.get()
                          );
                          return null;
                        }

                        ConsumerRecord<String, byte[]> next;
                        try {
                           // If no record is available within the given time return null
                           next = recordQueue.poll(500, TimeUnit.MILLISECONDS);
                        } catch (InterruptedException e) {
                          Thread.currentThread().interrupt();
                          throw createReadException(e);
                        }
                        MessageAndOffset messageAndOffset = null;
                        if(next != null) {
                          updateEntry(next);
                          messageAndOffset = new MessageAndOffset(next.value(), next.offset(), next.partition());
                        }
                        return messageAndOffset;
                        
                        
                    }
                    
                    
                    if (message != null) {
                        // Call the parent class method BaseKafkaSource.processKafkaMessageDefault() to convert the byte[] message into Records
                        records = processKafkaMessageDefault(message.getPartition(),message.getOffset(),getMessageID(message),(byte[]) message.getPayload()){
                            BaseKafkaSource.processKafkaMessageDefault(partition,offset,messageId,payload){
                                if (payload == null) { // when the message value is null, send an error record to the pipeline
                                    Record record = getContext().createRecord(messageId);
                                    errorRecordHandler.onError(record,messageId);
                                }
                                return ;
                            }
                            
                            // This actually creates a JsonCharDataParser, which in turn wraps a Jackson ReaderBasedJsonParser
                            DataParser parser = parserFactory.getParser(messageId, payload){//WrapperDataParserFactory.getParser()
                                DataParser parser = this.factory.getParser(id, data){
                                    return this.getParser(id, data, 0, data.length){//DataParserFactory.getParser()
                                        return this.getParser(id, (InputStream)(new ByteArrayInputStream(data, offset, len)), (String)"0"){//DataParserFactory.getParser()
                                            return this.createParser(id, this.createReader(is), Long.parseLong(offset)){// JsonDataParserFactory.getParser(String id, InputStream is, String offset)
                                                return new JsonCharDataParser(this.getSettings().getContext(), id, reader, offset, ((JsonMode)this.getSettings().getMode(JsonMode.class)).getFormat(), this.getSettings().getMaxRecordLen()){
                                                    this.context = context;
                                                    this.readerId = readerId;
                                                    this.maxObjectLen = maxObjectLen;
                                                    this.parser = ((ContextExtensions)context).createJsonObjectReader(reader, readerOffset, maxObjectLen, mode, Object.class){//ProtoContext.createJsonObjectReader()
                                                        return JsonWriterReaderFactory.createObjectReader(reader, initialPosition, mode, objectClass, maxObjectLen){//JsonWriterReaderFactory.createObjectReader()
                                                            return new OverrunJsonObjectReaderImpl(new OverrunReader(reader, OverrunReader.getDefaultReadLimit(), false, false), initialPosition, maxObjectLen, mode, objectClass){
                                                                super(reader, initialPosition, mode, objectClass, objectMapper){
                                                                    this.reader = reader;
                                                                    if (mode == Mode.MULTIPLE_OBJECTS) {
                                                                        if (initialPosition > 0L) {
                                                                            IOUtils.skipFully(reader, initialPosition);
                                                                            this.posCorrection += initialPosition;
                                                                        }

                                                                        if (reader.markSupported()) {
                                                                            reader.mark(64);
                                                                            int count = 0;
                                                                            byte firstByte = -1;
                                                                            while(count++ < 64 && (firstByte = (byte)reader.read()) != -1 && firstByte <= 32) {
                                                                            }
                                                                            if (firstByte > 32) {
                                                                                this.firstNonSpaceChar = firstByte;
                                                                            }
                                                                            reader.reset();
                                                                        }
                                                                    }

                                                                    this.jsonParser = this.getObjectMapper().getFactory().createParser(reader){
                                                                        IOContext ctxt = this._createContext(r, false);
                                                                        return this._createParser(this._decorate(r, ctxt), ctxt){
                                                                            return new ReaderBasedJsonParser(ctxt, this._parserFeatures, r, this._objectCodec, this._rootCharSymbols.makeChild(this._factoryFeatures));
                                                                        }
                                                                    }
                                                                    if (mode == Mode.ARRAY_OBJECTS && initialPosition > 0L) {
                                                                        this.fastForwardJsonParser(initialPosition);
                                                                    }
                                                                                                                    
                                                                }
                                                                
                                                                this.countingReader = (OverrunReader)this.getReader();
                                                            }
                                                        }
                                                    }
                                                }
                                            }
                                        }
                                    }
                                }
                                
                                return new WrapperDataParserFactory.WrapperDataParser(parser);
                            }
                            
                            // Conversion chain: string -> Map (JSON); Map -> Record
                            Record record = parser.parse(){ // WrapperDataParser.parse()
                                return this.dataParser.parse(){// JsonCharDataParser.parse()
                                    long offset = this.parser.getReaderPosition(){
                                        long maxAsOffset= Math.max(this.jsonParser.getTokenLocation().getCharOffset(), 0L);
                                        return this.mode == Mode.ARRAY_OBJECTS ? maxAsOffset : maxAsOffset + this.posCorrection;
                                    }
                                    /**
                                    * The Object json holds an EnforcerMap instance: class EnforcerMap extends LinkedHashMap;
                                    * a JSON list becomes an EnforcerList: EnforcerList extends ArrayList
                                    */
                                    
                                    Object json = this.parser.read(){ //JsonObjectReaderImpl.read()
                                        switch(this.mode) {
                                            case ARRAY_OBJECTS:
                                                value = this.readObjectFromArray();
                                                break;
                                            case MULTIPLE_OBJECTS:
                                                value = this.readObjectFromStream(){ //OverrunJsonObjectReaderImpl.readObjectFromStream()
                                                    this.countingReader.resetCount();
                                                    this.limitOffset = this.getJsonParser().getCurrentLocation().getCharOffset() + (long)this.maxObjectLen;
                                                    TL.set(this);
                                                    var1 = super.readObjectFromStream(){// JsonObjectReaderImpl.readObjectFromStream()
                                                        if (this.nextToken != null) {
                                                            value = this.jsonParser.readValueAs(this.getExpectedClass()){//JsonParser.readValueAs(Class<T> valueType==Object.class)
                                                                return this._codec(){
                                                                    ObjectCodec c = this.getCodec();
                                                                    if (c == null) {
                                                                        throw new IllegalStateException("No ObjectCodec defined for parser, needed for deserialization");
                                                                    }else{
                                                                        return c;
                                                                    }
                                                                }
                                                                    .readValue(this, valueType){//ObjectMapper.readValue(JsonParser p, Class<T> valueType)
                                                                        return this._readValue(this.getDeserializationConfig(), p, this._typeFactory.constructType(valueType));{// ObjectMapper._readValue(DeserializationConfig cfg, JsonParser p, JavaType valueType)
                                                                            
                                                                            JsonToken t = this._initForReading(p);
                                                                            DefaultDeserializationContext ctxt;
                                                                            Object result;
                                                                            if (t == JsonToken.VALUE_NULL) {
                                                                                ctxt = this.createDeserializationContext(p, cfg);
                                                                                result = this._findRootDeserializer(ctxt, valueType).getNullValue(ctxt);
                                                                            } else if (t != JsonToken.END_ARRAY && t != JsonToken.END_OBJECT) {
                                                                                ctxt = this.createDeserializationContext(p, cfg);
                                                                                JsonDeserializer<Object> deser = this._findRootDeserializer(ctxt, valueType);
                                                                                if (cfg.useRootWrapping()) {
                                                                                    result = this._unwrapAndDeserialize(p, ctxt, cfg, valueType, deser);
                                                                                } else {
                                                                                    result = deser.deserialize(p, ctxt);{//UntypedObjectDeserializer.deserialize()
                                                                                        switch(p.getCurrentTokenId()) {
                                                                                            case 1,2,5: // JsonTokenId: START_OBJECT(1), END_OBJECT(2), FIELD_NAME(5)
                                                                                                if (this._mapDeserializer != null) {
                                                                                                    return this._mapDeserializer.deserialize(p, ctxt);{//OverrunJsonObjectReaderImpl.deserialize()
                                                                                                        return (Map)jp.readValueAs(OverrunJsonObjectReaderImpl.EnforcerMap.class);{
                                                                                                            // JsonParser.readValueAs() recurses to parse the child nodes.
                                                                                                            // Open question: since the JSON is parsed recursively, node by node, would specifying a concrete target type make this much faster?
                                                                                                        }
                                                                                                    }
                                                                                                }
                                                                                            case 3: // JsonTokenId: START_ARRAY(3); this branch is omitted in this trace
                                                                                        }
                                                                                    }
                                                                                }
                                                                            } else {
                                                                                result = null;
                                                                            }

                                                                            p.clearCurrentToken();
                                                                            return result;
                                                                        }
                                                                }
                                                            }
                                                            this.nextToken = this.jsonParser.nextToken();
                                                            if (this.nextToken == null) {
                                                                ++this.posCorrection;
                                                            }
                                                        }
                                                    }
                                                }   
                                                break;
                                        }
                                        return value;
                                    }
                                    
                                    /**
                                    * jsonToField() is called recursively to turn the elements of the parsed Map into basic types (Integer/Double/String, ...), wrapping each one in a Field.
                                    */
                                    if (json != null) {
                                        record = this.createRecord(offset, json);{//JsonCharDataParser.createRecord()
                                            Record record = this.context.createRecord(this.readerId + "::" + offset);
                                            record.set(this.jsonToField(json, offset){// JsonCharDataParser.jsonToField(Object json, long offset)
                                                
                                                Field field;
                                                if (json == null) {
                                                  field = Field.create(Field.Type.STRING, null);
                                                } else if (json instanceof List) {
                                                  List jsonList = (List) json;
                                                  List<Field> list = new ArrayList<>(jsonList.size());
                                                  for (Object element : jsonList) {
                                                      // Recursively call jsonToField() on each element of the list
                                                    list.add(jsonToField(element, offset));
                                                  }
                                                  field = Field.create(list);
                                                } else if (json instanceof Map) {
                                                  Map<String, Object> jsonMap = (Map<String, Object>) json;
                                                  Map<String, Field> map = new LinkedHashMap<>();
                                                  for (Map.Entry<String, Object> entry : jsonMap.entrySet()) {
                                                    // Recursively call jsonToField() on each entry value of the map
                                                    map.put(entry.getKey(), jsonToField(entry.getValue(), offset));
                                                  }
                                                  field = Field.create(map);
                                                } else if (json instanceof String) {
                                                  field = Field.create((String) json);
                                                } else if (json instanceof Boolean) {
                                                  field = Field.create((Boolean) json);
                                                } 
                                                // Integer, Long, Double, Float, byte[], etc. are cast and wrapped the same way (branches omitted in this trace)
                                                else if (json instanceof BigDecimal) {
                                                  field = Field.create((BigDecimal) json);
                                                } else if (json instanceof BigInteger) {
                                                  field = Field.create(new BigDecimal((BigInteger) json));
                                                } else {
                                                  throw new DataParserException(Errors.JSON_PARSER_01, readerId, offset, json.getClass().getSimpleName());
                                                }
                                                return field;
                                                
                                            });
                                            return record;
                                        }
                                    }
                                    return record;
                                }
                            }
                            
                            while (record != null) {
                                record.getHeader().setAttribute(HeaderAttributeConstants.TOPIC, conf.topic);
                                record.getHeader().setAttribute(HeaderAttributeConstants.PARTITION, partition);
                                record.getHeader().setAttribute(HeaderAttributeConstants.OFFSET, String.valueOf(offset));
                                records.add(record);
                                record = parser.parse();
                            }
                            
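                            if (conf.produceSingleRecordPerMessage) {
                                // Collect the root field of every parsed record into one LIST field set on the first record
                                List<Field> list = new ArrayList<>();
                                records.forEach(r -> list.add(r.get()));
                                Record first = records.get(0);
                                first.set(Field.create(Field.Type.LIST, list));
                            }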
                            return records;
                        }
                        
                        // In Preview mode, keep only as many records as still fit into the batch
                        if (getContext().isPreview() && recordCounter + records.size() > batchSize) {
                            records = records.subList(0, batchSize - recordCounter);
                        }
                        for (Record record : records) {
                            batchMaker.addRecord(record);
                        }
                        recordCounter += records.size();
                    }
                }
                return lastSourceOffset;
            }
        }
    }
    
    // Once the whole KafkaSource.produce() call has finished, commit newOffset
    if (getStage() instanceof OffsetCommitter) {
        ((OffsetCommitter)getStage()).commit(newOffset){//DSourceOffsetCommitter.commit(offset)
            offsetCommitter.commit(offset){//DelegatingKafkaSource.commit()
                delegate.commit(offset){
                    // Standalone mode:
                    StandaloneKafkaSource.commit(offset){// the internal KafkaConsumer09 commits offsets read from its own cache, not the offset passed in from outside
                        kafkaConsumer.commit(){//KafkaConsumer11.commit() -> KafkaConsumer09.commit()
                            
                            synchronized (pollCommitMutex) {
                              // While rebalancing there is no point for us to commit offset since it's not allowed operation
                              if(rebalanceInProgress.get()) {
                                LOG.debug("Kafka is rebalancing, not commiting offsets");
                                return;
                              }
                              if(needToCallPoll.get()) {
                                LOG.debug("Waiting on poll to be properly called before continuing.");
                                return;
                              }

                              try {
                                if(topicPartitionToOffsetMetadataMap.isEmpty()) {
                                  LOG.debug("Skipping committing offsets since we haven't consume anything.");
                                  return;
                                }
                                // topicPartitionToOffsetMetadataMap: Map<TopicPartition, OffsetAndMetadata>, tracks the latest offset of each partition
                                kafkaConsumer.commitSync(topicPartitionToOffsetMetadataMap){
                                    acquire();
                                    try{
                                        coordinator.commitOffsetsSync(new HashMap<>(offsets), Long.MAX_VALUE){//ConsumerCoordinator.commitOffsetsSync()
                                            
                                            invokeCompletedOffsetCommitCallbacks();
                                            if (offsets.isEmpty())
                                                return true;

                                            long now = time.milliseconds();
                                            long startMs = now;
                                            long remainingMs = timeoutMs;
                                            do {
                                                if (coordinatorUnknown()) {
                                                    if (!ensureCoordinatorReady(now, remainingMs))
                                                        return false;

                                                    remainingMs = timeoutMs - (time.milliseconds() - startMs);
                                                }

                                                RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
                                                client.poll(future, remainingMs);

                                                if (future.succeeded()) {
                                                    if (interceptors != null)
                                                        interceptors.onCommit(offsets);
                                                    return true;
                                                }
                                                
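                                                // Reaching this point means the offset commit failed; it is either retried or aborted by throwing.
                                                // If a rebalance caused a CommitFailedException, BaseKafkaConsumer09.needToCallPoll is set to true and recordQueue.clear() is called.
                                                // The next time BaseKafkaConsumer09.read() pulls (one) record from recordQueue, it first waits 500ms and returns no data (why? to let the KafkaConsumerRunner thread poll again, or to wait for the rebalance?).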
                                                if (!future.isRetriable())
                                                    throw future.exception();
                                                time.sleep(retryBackoffMs);
                                                now = time.milliseconds();
                                                remainingMs = timeoutMs - (now - startMs);
                                            } while (remainingMs > 0);

                                            return false;
                                            
                                            
                                        }
                                    }finally{
                                        release();
                                    }
                                }
                                
                              } catch(CommitFailedException ex) {
                                LOG.warn("Can't commit offset to Kafka: {}", ex.toString(), ex);
                                needToCallPoll.set(true);
                                recordQueue.clear();
                              } finally {
                                topicPartitionToOffsetMetadataMap.clear();
                              }
                            }
                            
                        }
                    }
                }
            }
        }
    }
}
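
The recursive conversion done by jsonToField() in the trace above can be tried in isolation. The following is only a minimal sketch under stated assumptions: SketchField and JsonToFieldSketch are hypothetical stand-ins invented for illustration, not the SDC Field or JsonCharDataParser classes, and the numeric branches are collapsed into LONG and DOUBLE for brevity, which the real parser does not necessarily do.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the SDC Field class: a type tag plus a value.
final class SketchField {
    enum Type { STRING, BOOLEAN, LONG, DOUBLE, DECIMAL, LIST, MAP }

    final Type type;
    final Object value;

    SketchField(Type type, Object value) {
        this.type = type;
        this.value = value;
    }
}

final class JsonToFieldSketch {
    // Recursively wraps a parsed JSON tree (Map / List / scalars) into SketchField nodes.
    static SketchField jsonToField(Object json) {
        if (json == null) {
            // Mirrors the trace: a JSON null becomes a STRING field holding null.
            return new SketchField(SketchField.Type.STRING, null);
        } else if (json instanceof List) {
            List<SketchField> list = new ArrayList<>();
            for (Object element : (List<?>) json) {
                list.add(jsonToField(element)); // recurse into each element
            }
            return new SketchField(SketchField.Type.LIST, list);
        } else if (json instanceof Map) {
            // LinkedHashMap keeps the key order of the incoming map.
            Map<String, SketchField> map = new LinkedHashMap<>();
            for (Map.Entry<?, ?> e : ((Map<?, ?>) json).entrySet()) {
                map.put(String.valueOf(e.getKey()), jsonToField(e.getValue()));
            }
            return new SketchField(SketchField.Type.MAP, map);
        } else if (json instanceof String) {
            return new SketchField(SketchField.Type.STRING, json);
        } else if (json instanceof Boolean) {
            return new SketchField(SketchField.Type.BOOLEAN, json);
        } else if (json instanceof Integer || json instanceof Long) {
            // Simplification of this sketch: both integer widths become LONG.
            return new SketchField(SketchField.Type.LONG, ((Number) json).longValue());
        } else if (json instanceof Float || json instanceof Double) {
            // Simplification of this sketch: both float widths become DOUBLE.
            return new SketchField(SketchField.Type.DOUBLE, ((Number) json).doubleValue());
        } else if (json instanceof BigDecimal) {
            return new SketchField(SketchField.Type.DECIMAL, json);
        } else if (json instanceof BigInteger) {
            return new SketchField(SketchField.Type.DECIMAL, new BigDecimal((BigInteger) json));
        }
        throw new IllegalArgumentException(
            "Unsupported JSON type: " + json.getClass().getSimpleName());
    }
}
```

Calling JsonToFieldSketch.jsonToField() on a Map that contains a nested List yields a MAP node whose values are themselves typed nodes. This is the same shape the trace builds before handing the result to record.set().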


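The retry loop inside commitOffsetsSync() in the trace above (attempt, check for success, rethrow if not retriable, back off, recompute the remaining time) can be sketched without any Kafka dependency. Everything below is an assumption made for illustration: CommitRetrySketch, Outcome and commitWithRetry are hypothetical names, and the Supplier stands in for sending the commit request and polling its future.

```java
import java.util.function.Supplier;

final class CommitRetrySketch {
    // Hypothetical result of a single commit attempt.
    enum Outcome { SUCCESS, RETRIABLE_FAILURE, FATAL_FAILURE }

    // Returns true if an attempt succeeds before timeoutMs elapses, false on timeout
    // or interruption, and throws if an attempt fails in a non-retriable way.
    static boolean commitWithRetry(Supplier<Outcome> attempt, long timeoutMs, long retryBackoffMs) {
        long startMs = System.currentTimeMillis();
        long remainingMs = timeoutMs;
        do {
            Outcome outcome = attempt.get();
            if (outcome == Outcome.SUCCESS) {
                return true;
            }
            if (outcome == Outcome.FATAL_FAILURE) {
                // Corresponds to "if (!future.isRetriable()) throw future.exception();"
                throw new IllegalStateException("commit failed and is not retriable");
            }
            try {
                Thread.sleep(retryBackoffMs); // back off before the next attempt
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
            remainingMs = timeoutMs - (System.currentTimeMillis() - startMs);
        } while (remainingMs > 0);
        return false;
    }
}
```

In the trace the caller passes Long.MAX_VALUE as the timeout, so in practice the loop ends only on success or on a non-retriable error. That is why the CommitFailedException handler in KafkaConsumer09.commit() is the path that sets needToCallPoll and clears recordQueue.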