Real-time tasks: how the Peon process creates segments

Overview of the segment creation process

  • The Peon process is started by the middle manager process.
  • It exposes an HTTP endpoint that receives raw data.
  • Incoming rows are merged in memory; when the row count reaches maxRowsInMemory (50,000 in the current cluster configuration) or the intermediatePersistPeriod (10 minutes) elapses, the in-memory data is serialized into a segment file and persisted to local disk.
  • Once the time window closes, the task stops accepting data and merges the segment files created in the previous step into one large segment.
  • The newly created segment is handed off to historical nodes via HDFS.

Starting the Peon process

After the middleManager receives a task assigned by the overlord, it creates a thread that prepares and runs the JVM command for the peon: the classpath, heap and off-heap memory settings, the druid port (used for the peon's external query interface; the way this port is allocated has caused a port-already-in-use bug), the task JSON configuration, and so on.
Once the JVM command has been launched, the current thread blocks until the task completes.
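The forking is done by the middle manager's ForkingTaskRunner. A rough sketch of the kind of command it assembles is shown below; every path, memory setting and port value is purely illustrative, and the exact flags and arguments vary with the Druid version and cluster configuration.

// Rough sketch of the kind of command the middle manager assembles when forking a
// peon JVM. All paths, memory settings and the port are illustrative.
import java.util.Arrays;

public class ForkPeonSketch
{
  public static void main(String[] args) throws Exception
  {
    ProcessBuilder pb = new ProcessBuilder(Arrays.asList(
        "java",
        "-cp", "/opt/druid/lib/*",            // classpath passed down from the middle manager
        "-Xms1g", "-Xmx1g",                   // heap size for the peon
        "-XX:MaxDirectMemorySize=2g",         // off-heap (direct) memory for processing buffers
        "-Ddruid.port=8100",                  // port the peon serves queries on (source of the port-conflict bug)
        "io.druid.cli.Main", "internal", "peon",
        "/tmp/task/task.json",                // task definition written by the middle manager
        "/tmp/task/status.json"               // where the peon writes its final TaskStatus
    ));
    Process peon = pb.start();
    int exitCode = peon.waitFor();            // the middle manager thread blocks here until the task finishes
    System.out.println("peon exited with " + exitCode);
  }
}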
The ingestion-related startup steps of the Peon process are:

  • Create the HTTP endpoint that receives raw data; the current cluster is configured with EventReceiverFirehoseFactory.
  • Create and run the ExecutorLifecycle: based on the JSON configuration it builds the RealtimeIndexTask, creates a lock file to lock the task, and hands the task over to the ThreadPoolTaskRunner.
@LifecycleStart
public void start() throws InterruptedException
{
  final File taskFile = Preconditions.checkNotNull(taskExecutorConfig.getTaskFile(), "taskFile");
  final File statusFile = Preconditions.checkNotNull(taskExecutorConfig.getStatusFile(), "statusFile");
  final InputStream parentStream = Preconditions.checkNotNull(taskExecutorConfig.getParentStream(), "parentStream");
 
  try {
    task = jsonMapper.readValue(taskFile, Task.class);
 
    log.info(
        "Running with task: %s",
        jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(task)
    );
  }
  catch (IOException e) {
    throw Throwables.propagate(e);
  }
 
  // Avoid running the same task twice on the same machine by locking the task base directory.
 
  final File taskLockFile = taskConfig.getTaskLockFile(task.getId());
 
  try {
    synchronized (this) {
      if (taskLockChannel == null && taskLockFileLock == null) {
        taskLockChannel = FileChannel.open(
            taskLockFile.toPath(),
            StandardOpenOption.CREATE,
            StandardOpenOption.WRITE
        );
 
        log.info("Attempting to lock file[%s].", taskLockFile);
        final long startLocking = System.currentTimeMillis();
        final long timeout = DateTimes.utc(startLocking).plus(taskConfig.getDirectoryLockTimeout()).getMillis();
        while (taskLockFileLock == null && System.currentTimeMillis() < timeout) {
          taskLockFileLock = taskLockChannel.tryLock();
          if (taskLockFileLock == null) {
            Thread.sleep(100);
          }
        }
 
        if (taskLockFileLock == null) {
          throw new ISE("Could not acquire lock file[%s] within %,dms.", taskLockFile, timeout - startLocking);
        } else {
          log.info("Acquired lock file[%s] in %,dms.", taskLockFile, System.currentTimeMillis() - startLocking);
        }
      } else {
        throw new ISE("Already started!");
      }
    }
  }
  catch (IOException e) {
    throw Throwables.propagate(e);
  }
 
  if (taskExecutorConfig.isParentStreamDefined()) {
    // Spawn monitor thread to keep a watch on parent's stdin
    // If stdin reaches eof, the parent is gone, and we should shut down
    parentMonitorExec.submit(
        new Runnable()
        {
          @Override
          public void run()
          {
            try {
              while (parentStream.read() != -1) {
                // Toss the byte
              }
            }
            catch (Exception e) {
              log.error(e, "Failed to read from stdin");
            }
 
            // Kind of gross, but best way to kill the JVM as far as I know
            log.info("Triggering JVM shutdown.");
            System.exit(2);
          }
        }
    );
  }
 
  // Won't hurt in remote mode, and is required for setting up locks in local mode:
  try {
    if (!task.isReady(taskActionClientFactory.create(task))) {
      throw new ISE("Task[%s] is not ready to run yet!", task.getId());
    }
  }
  catch (Exception e) {
    throw new ISE(e, "Failed to run task[%s] isReady", task.getId());
  }
 
  statusFuture = Futures.transform(
      taskRunner.run(task),
      new Function<TaskStatus, TaskStatus>()
      {
        @Override
        public TaskStatus apply(TaskStatus taskStatus)
        {
          try {
            log.info(
                "Task completed with status: %s",
                jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(taskStatus)
            );
 
            final File statusFileParent = statusFile.getParentFile();
            if (statusFileParent != null) {
              statusFileParent.mkdirs();
            }
            jsonMapper.writeValue(statusFile, taskStatus);
 
            return taskStatus;
          }
          catch (Exception e) {
            throw Throwables.propagate(e);
          }
        }
      }
  );
}

ThreadPoolTaskRunner

runner.png

Data ingestion

After the Peon process starts, the task itself is executed in the RealtimeIndexTask.run() method.

  • RealtimePlumber.startJob() performs the task's initialization: it loads any existing temporary segment files from disk and starts the persist, merge, and segment hand-off threads.
  • EventReceiverFirehose receives the data rows and OnheapIncrementalIndex builds the incremental index (a simplified sketch of this loop follows the list).
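The real RealtimeIndexTask.run() also deals with committers, graceful shutdown and error reporting; the sketch below only shows the core loop, with the surrounding class and fields assumed, and package names taken from the pre-Apache io.druid tree.

// Minimal sketch of the core loop of a realtime task; NOT the actual
// RealtimeIndexTask.run() source.
import com.google.common.base.Supplier;
import io.druid.data.input.Committer;
import io.druid.data.input.Firehose;
import io.druid.data.input.InputRow;
import io.druid.segment.realtime.plumber.Plumber;

class IngestLoopSketch
{
  static void run(Firehose firehose, Plumber plumber, Supplier<Committer> committerSupplier) throws Exception
  {
    plumber.startJob();                         // load persisted hydrants, start persist/merge/hand-off threads
    try {
      while (firehose.hasMore()) {              // rows come out of EventReceiverFirehose's blocking queue
        InputRow row = firehose.nextRow();
        int numRows = plumber.add(row, committerSupplier);
        if (numRows == -1) {
          // the row's timestamp is outside every sink's interval, so it is thrown away
        }
      }
    }
    finally {
      plumber.persist(committerSupplier.get()); // persist whatever is still only in memory
      plumber.finishJob();                      // merge hydrants into one segment and hand it off
    }
  }
}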

Task startup

Ingesting data into OnheapIncrementalIndex

EventReceiverFirehose exposes the HTTP endpoint /push-events, which receives the batched data submitted by Tranquility.
EventReceiverFirehose handles that data as follows:
it parses the received events, fails the request immediately on a parse error, and appends the parsed rows to a blocking queue.
The queue size defaults to 100,000; if the task thread does not drain the queue fast enough, the endpoint blocks (a minimal client-side sketch appears after the handler code below).

@POST
@Path("/push-events")
@Consumes({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
@Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
public Response addAll(
    InputStream in,
    @Context final HttpServletRequest req
)
{
  ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper;
 
  Optional<Response> producerSequenceResponse = checkProducerSequence(req, reqContentType, objectMapper);
  if (producerSequenceResponse.isPresent()) {
    return producerSequenceResponse.get();
  }
 
  CountingInputStream countingInputStream = new CountingInputStream(in);
  Collection<Map<String, Object>> events = null;
  try {
    events = objectMapper.readValue(
        countingInputStream, new TypeReference<Collection<Map<String, Object>>>()
        {
        }
    );
  }
  catch (IOException e) {
    return Response.serverError().entity(ImmutableMap.<String, Object>of("error", e.getMessage())).build();
  }
  finally {
    bytesReceived.addAndGet(countingInputStream.getCount());
  }
  log.debug("Adding %,d events to firehose: %s", events.size(), serviceName);
 
  final List<InputRow> rows = Lists.newArrayList();
  for (final Map<String, Object> event : events) {
    // Might throw an exception. We'd like that to happen now, instead of while adding to the row buffer.
    rows.addAll(parser.parseBatch(event));
  }
 
  try {
    addRows(rows);
    return Response.ok(
        objectMapper.writeValueAsString(ImmutableMap.of("eventCount", events.size())),
        contentType
    ).build();
  }
  catch (InterruptedException e) {
    ...;
  }
}
  
public void addRows(Iterable<InputRow> rows) throws InterruptedException
{
  for (final InputRow row : rows) {
    boolean added = false;
    while (!closed && !added) {
      added = buffer.offer(row, 500, TimeUnit.MILLISECONDS);
      if (!added) {
        long currTime = System.currentTimeMillis();
        long lastTime = lastBufferAddFailMsgTime.get();
        if (currTime - lastTime > 10000 && lastBufferAddFailMsgTime.compareAndSet(lastTime, currTime)) {
          log.warn("Failed to add event to buffer with current size [%s] . Retrying...", buffer.size());
        }
      }
    }
 
    if (!added) {
      throw new IllegalStateException("Cannot add events to closed firehose!");
    }
  }
}
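For completeness, the caller (normally Tranquility) simply POSTs a JSON array of event objects to this endpoint. The sketch below is only illustrative: the host, port and the chat-handler path prefix in the URL are placeholders, since the actual service name and port are assigned per task.

// Illustrative client: POST a JSON array of events to the firehose endpoint.
// The URL is a placeholder; in practice Tranquility discovers the per-task
// service name and location on its own.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class PushEventsClientSketch
{
  public static void main(String[] args) throws Exception
  {
    String body = "[{\"timestamp\": \"2018-01-01T00:00:00Z\", \"dim1\": \"a\", \"metric1\": 1}]";

    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("http://peon-host:8100/druid/worker/v1/chat/my-service/push-events"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(body))
        .build();

    HttpResponse<String> response = HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.ofString());

    // On success the handler returns {"eventCount": N}, as seen in addAll() above.
    System.out.println(response.statusCode() + " " + response.body());
  }
}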

RealtimeIndexTask holds the firehose instance described above, drains the buffered queue, and hands the rows to the RealtimePlumber, which processes them as follows:
1. Based on the row's timestamp, look up the sink. Each interval maps to one sink, so a peon process normally has just one; a sink contains multiple FireHydrants, one of which serves queries while also ingesting new data, while the rest only serve queries.
2. Add the row to the sink's current FireHydrant, whose OnheapIncrementalIndex builds the incremental index.
3. Check the number of rows already in the current FireHydrant: if it has reached maxRowsInMemory, or the elapsed time exceeds intermediatePersistPeriod, persist the current FireHydrant's data to disk.

public int add(InputRow row, Supplier<Committer> committerSupplier) throws IndexSizeExceededException
{
  long messageTimestamp = row.getTimestampFromEpoch();
  final Sink sink = getSink(messageTimestamp);
  metrics.reportMessageMaxTimestamp(messageTimestamp);
  if (sink == null) {
    return -1;
  }
 
  final IncrementalIndexAddResult addResult = sink.add(row, false);
  if (config.isReportParseExceptions() && addResult.getParseException() != null) {
    throw addResult.getParseException();
  }
 
  if (!sink.canAppendRow() || System.currentTimeMillis() > nextFlush) {
    persist(committerSupplier.get());
  }
 
  return addResult.getRowCount();
}

A sink holds the data of the segment that will eventually be produced. A segment is fairly large and is not suitable to keep entirely in memory, so while the segment is being built, data is persisted to disk once a threshold is reached; this produces a series of small segment files, each represented by a FireHydrant. A FireHydrant whose data has already been persisted to disk only serves queries, while the sink also keeps a current FireHydrant that both serves queries and keeps absorbing new rows.
If the interval containing the row's timestamp has no sink yet, a new Sink object is created, and creating the Sink also creates its current FireHydrant.
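Conceptually, a sink can be modeled roughly as below; the class and field names are simplified stand-ins, not Druid's actual Sink and FireHydrant classes.

// Conceptual model of a Sink as described above; simplified, not Druid's real classes.
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class FireHydrantSketch
{
  // wraps either an in-memory incremental index or one small persisted segment file
}

class SinkSketch
{
  final long intervalStart;   // one sink per interval (millis, inclusive)
  final long intervalEnd;     // (millis, exclusive)

  // all hydrants of this sink; those already persisted to disk only serve queries
  final List<FireHydrantSketch> hydrants = new CopyOnWriteArrayList<>();

  // the writable hydrant (also kept in the list above): serves queries and absorbs new rows
  volatile FireHydrantSketch currHydrant;

  SinkSketch(long intervalStart, long intervalEnd)
  {
    this.intervalStart = intervalStart;
    this.intervalEnd = intervalEnd;
  }
}

The makeNewCurrIndex() method below is what creates this writable current FireHydrant: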

private FireHydrant makeNewCurrIndex(long minTimestamp, DataSchema schema)
{
  final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
      .withMinTimestamp(minTimestamp)
      .withTimestampSpec(schema.getParser())
      .withQueryGranularity(schema.getGranularitySpec().getQueryGranularity())
      .withDimensionsSpec(schema.getParser())
      .withMetrics(schema.getAggregators())
      .withRollup(schema.getGranularitySpec().isRollup())
      .build();
  // Create the OnheapIncrementalIndex
  final IncrementalIndex newIndex = new IncrementalIndex.Builder()
      .setIndexSchema(indexSchema)
      .setReportParseExceptions(reportParseExceptions)
      .setMaxRowCount(maxRowsInMemory)
      .buildOnheap();
 
  final FireHydrant old;
  synchronized (hydrantLock) {
    if (writable) {
      old = currHydrant;
      int newCount = 0;
      int numHydrants = hydrants.size();
      if (numHydrants > 0) {
        FireHydrant lastHydrant = hydrants.get(numHydrants - 1);
        newCount = lastHydrant.getCount() + 1;
        if (!indexSchema.getDimensionsSpec().hasCustomDimensions()) {
          Map<String, ColumnCapabilitiesImpl> oldCapabilities;
          if (lastHydrant.hasSwapped()) {
            oldCapabilities = Maps.newHashMap();
            ReferenceCountingSegment segment = lastHydrant.getIncrementedSegment();
            try {
              QueryableIndex oldIndex = segment.asQueryableIndex();
              for (String dim : oldIndex.getAvailableDimensions()) {
                dimOrder.add(dim);
                oldCapabilities.put(dim, (ColumnCapabilitiesImpl) oldIndex.getColumn(dim).getCapabilities());
              }
            }
            finally {
              segment.decrement();
            }
          } else {
            IncrementalIndex oldIndex = lastHydrant.getIndex();
            dimOrder.addAll(oldIndex.getDimensionOrder());
            oldCapabilities = oldIndex.getColumnCapabilities();
          }
          newIndex.loadDimensionIterable(dimOrder, oldCapabilities);
        }
      }
      currHydrant = new FireHydrant(newIndex, newCount, getSegment().getIdentifier());
      if (old != null) {
        numRowsExcludingCurrIndex.addAndGet(old.getIndex().size());
      }
      // Add the newly created FireHydrant to the sink
      hydrants.add(currHydrant);
    } else {
      // Oops, someone called finishWriting while we were making this new index.
      newIndex.close();
      throw new ISE("finishWriting() called during swap");
    }
  }
 
  return old;
}

Rows are ultimately folded in by the addToFacts method of the OnheapIncrementalIndex (source below).
The OnheapIncrementalIndex can be thought of as a map whose key is the timestamp plus the dimension columns (TimeAndDims) and whose value is the metric columns. When a new row arrives, its key (TimeAndDims) is used to locate the corresponding Aggregator objects; a null dimension value is simply treated as one more value in the dimension's dictionary encoding. The Aggregators then fold the row into the running totals.
During aggregation, a ColumnSelectorFactory is used to read each row's metric values. Unlike queries, which read column values through a cursor, here the row is handed over via a ThreadLocal: before each aggregation the IncrementalIndex puts the InputRow into the ThreadLocal. My guess is that this design (rather than passing the metric values into the aggregator directly) exists because metric values need type conversion and other massaging before aggregation, and that logic lives in the ColumnSelector; this way the IncrementalIndex only has to put the row into the ThreadLocal, and the Aggregator only has to pull the values it needs from the ColumnSelector.
RollupFactsHolder (a skip list) records the mapping from dimension values to row index, and aggregators maps the row index to the array of metric aggregators (a simplified conceptual sketch follows the source).

protected AddToFactsResult addToFacts(
    AggregatorFactory[] metrics,
    boolean deserializeComplexMetrics,
    boolean reportParseExceptions,
    InputRow row,
    AtomicInteger numEntries,
    TimeAndDims key,
    ThreadLocal<InputRow> rowContainer,
    Supplier<InputRow> rowSupplier,
    boolean skipMaxRowsInMemoryCheck
) throws IndexSizeExceededException
{
  List<String> parseExceptionMessages;
  final int priorIndex = facts.getPriorIndex(key);
 
  Aggregator[] aggs;
 
  if (TimeAndDims.EMPTY_ROW_INDEX != priorIndex) {
    aggs = concurrentGet(priorIndex);
    parseExceptionMessages = doAggregate(metrics, aggs, rowContainer, row);
  } else {
    aggs = new Aggregator[metrics.length];
    factorizeAggs(metrics, aggs, rowContainer, row);
    parseExceptionMessages = doAggregate(metrics, aggs, rowContainer, row);
 
    final int rowIndex = indexIncrement.getAndIncrement();
    concurrentSet(rowIndex, aggs);
 
    // Last ditch sanity checks
    if (numEntries.get() >= maxRowCount
        && facts.getPriorIndex(key) == TimeAndDims.EMPTY_ROW_INDEX
        && !skipMaxRowsInMemoryCheck) {
      throw new IndexSizeExceededException("Maximum number of rows [%d] reached", maxRowCount);
    }
    final int prev = facts.putIfAbsent(key, rowIndex);
    if (TimeAndDims.EMPTY_ROW_INDEX == prev) {
      numEntries.incrementAndGet();
    } else {
      // We lost a race
      aggs = concurrentGet(prev);
      parseExceptionMessages = doAggregate(metrics, aggs, rowContainer, row);
      // Free up the misfire
      concurrentRemove(rowIndex);
      // This is expected to occur ~80% of the time in the worst scenarios
    }
  }
 
  return new AddToFactsResult(numEntries.get(), parseExceptionMessages);
}
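To make the two-level mapping concrete, the following is a deliberately simplified model of the rollup structure; the class and member names are illustrative, and Druid's real RollupFactsHolder and aggregator storage are considerably more involved.

// Simplified conceptual model of the rollup structure: facts maps a TimeAndDims-like
// key to a row index, and aggregators maps that row index to the running metric totals.
// Not Druid's actual implementation.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;

class RollupIndexSketch
{
  // keys stay sorted, which is why a skip list is used
  private final ConcurrentSkipListMap<String, Integer> facts = new ConcurrentSkipListMap<>();

  // row index -> running totals, one slot per metric (a single longSum metric here)
  private final Map<Integer, long[]> aggregators = new ConcurrentHashMap<>();

  private final AtomicInteger rowCounter = new AtomicInteger();

  void add(String timeAndDimsKey, long metricValue)
  {
    Integer rowIndex = facts.get(timeAndDimsKey);
    if (rowIndex == null) {
      // new key: allocate a row and its aggregators, then try to claim the key
      int candidate = rowCounter.getAndIncrement();
      aggregators.put(candidate, new long[]{0L});
      Integer prev = facts.putIfAbsent(timeAndDimsKey, candidate);
      if (prev != null) {
        aggregators.remove(candidate);  // lost the race: free the misfired row, like addToFacts() above
        rowIndex = prev;
      } else {
        rowIndex = candidate;
      }
    }
    aggregators.get(rowIndex)[0] += metricValue;  // the real code synchronizes inside the Aggregator
  }
}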

If the number of rows already in the current FireHydrant reaches the configured maxRowsInMemory, or the elapsed time exceeds the configured intermediatePersistPeriod, the current FireHydrant's data is persisted to disk.

Task hand-off

任务移交.png