Overview of the segment creation process
- The Peon process is started by the MiddleManager process.
- It exposes an HTTP endpoint that receives the raw data.
- Incoming rows are merged (rolled up) in memory; once the row count reaches maxRowsInMemory (50,000 in the current cluster configuration) or the intermediatePersistPeriod (10 minutes) elapses, the in-memory data is serialized into a segment file and persisted to local disk.
- When the time window closes, the task stops accepting data and merges the segment files created in the previous step into one large segment.
- The newly created segment is handed off to the Historical nodes via HDFS.
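The two persist triggers in the third step boil down to a simple check. Below is a minimal sketch of that decision, using hypothetical helper and parameter names; the real logic lives in the plumber code shown later in this section.
// Minimal sketch of the persist trigger described above (hypothetical names, not Druid source).
static boolean shouldPersist(int rowsInMemory, long lastPersistMillis,
                             int maxRowsInMemory, long intermediatePersistPeriodMillis)
{
  // Persist when either the row-count threshold or the time threshold is crossed.
  return rowsInMemory >= maxRowsInMemory
      || System.currentTimeMillis() >= lastPersistMillis + intermediatePersistPeriodMillis;
}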
Starting the Peon process
After the MiddleManager receives a task assigned by the Overlord, it creates a thread that builds and executes the JVM command for the peon, including the classpath, on-heap and off-heap memory settings, and the Druid port (used for the peon's external query endpoint; the port allocated here led to a port-already-in-use bug), along with preparation work such as the task JSON configuration.
After the JVM command is launched, the thread waits until the task completes.
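A minimal sketch of how such a child JVM might be forked is shown below. All concrete values (heap sizes, port, entry-point arguments) are illustrative assumptions; in Druid this work is done by the ForkingTaskRunner and the exact command depends on the version and configuration.
import java.io.File;
import java.util.Arrays;
import java.util.List;

public class PeonForkSketch
{
  // Fork a peon JVM for one task directory and return the child process.
  public static Process forkPeon(File taskDir) throws Exception
  {
    List<String> command = Arrays.asList(
        "java",
        "-cp", System.getProperty("java.class.path"),      // classpath handed down to the peon
        "-Xmx2g", "-XX:MaxDirectMemorySize=4g",             // heap / off-heap sizes (example values)
        "-Ddruid.port=8100",                                // query port allocated for this peon (example)
        "io.druid.cli.Main", "internal", "peon",            // peon entry point (version dependent)
        new File(taskDir, "task.json").getAbsolutePath(),   // task JSON written by the MiddleManager
        new File(taskDir, "status.json").getAbsolutePath()  // where the peon writes its final status
    );
    // The MiddleManager thread then blocks on the returned process until the task finishes.
    return new ProcessBuilder(command).redirectErrorStream(true).start();
  }
}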
The ingestion-related startup steps of the Peon process are:
- Create the HTTP endpoint that receives raw data; the current cluster is configured with EventReceiverFirehoseFactory.
- Create and start the ExecutorLifecycle, which builds the RealtimeIndexTask from the task JSON configuration, creates a lock file to lock the task directory, and hands the task to the ThreadPoolTaskRunner.
@LifecycleStart
public void start() throws InterruptedException
{
final File taskFile = Preconditions.checkNotNull(taskExecutorConfig.getTaskFile(), "taskFile");
final File statusFile = Preconditions.checkNotNull(taskExecutorConfig.getStatusFile(), "statusFile");
final InputStream parentStream = Preconditions.checkNotNull(taskExecutorConfig.getParentStream(), "parentStream");
try {
task = jsonMapper.readValue(taskFile, Task.class);
log.info(
"Running with task: %s",
jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(task)
);
}
catch (IOException e) {
throw Throwables.propagate(e);
}
// Avoid running the same task twice on the same machine by locking the task base directory.
final File taskLockFile = taskConfig.getTaskLockFile(task.getId());
try {
synchronized (this) {
if (taskLockChannel == null && taskLockFileLock == null) {
taskLockChannel = FileChannel.open(
taskLockFile.toPath(),
StandardOpenOption.CREATE,
StandardOpenOption.WRITE
);
log.info("Attempting to lock file[%s].", taskLockFile);
final long startLocking = System.currentTimeMillis();
final long timeout = DateTimes.utc(startLocking).plus(taskConfig.getDirectoryLockTimeout()).getMillis();
while (taskLockFileLock == null && System.currentTimeMillis() < timeout) {
taskLockFileLock = taskLockChannel.tryLock();
if (taskLockFileLock == null) {
Thread.sleep(100);
}
}
if (taskLockFileLock == null) {
throw new ISE("Could not acquire lock file[%s] within %,dms.", taskLockFile, timeout - startLocking);
} else {
log.info("Acquired lock file[%s] in %,dms.", taskLockFile, System.currentTimeMillis() - startLocking);
}
} else {
throw new ISE("Already started!");
}
}
}
catch (IOException e) {
throw Throwables.propagate(e);
}
if (taskExecutorConfig.isParentStreamDefined()) {
// Spawn monitor thread to keep a watch on parent's stdin
// If stdin reaches eof, the parent is gone, and we should shut down
parentMonitorExec.submit(
new Runnable()
{
@Override
public void run()
{
try {
while (parentStream.read() != -1) {
// Toss the byte
}
}
catch (Exception e) {
log.error(e, "Failed to read from stdin");
}
// Kind of gross, but best way to kill the JVM as far as I know
log.info("Triggering JVM shutdown.");
System.exit(2);
}
}
);
}
// Won't hurt in remote mode, and is required for setting up locks in local mode:
try {
if (!task.isReady(taskActionClientFactory.create(task))) {
throw new ISE("Task[%s] is not ready to run yet!", task.getId());
}
}
catch (Exception e) {
throw new ISE(e, "Failed to run task[%s] isReady", task.getId());
}
statusFuture = Futures.transform(
taskRunner.run(task),
new Function<TaskStatus, TaskStatus>()
{
@Override
public TaskStatus apply(TaskStatus taskStatus)
{
try {
log.info(
"Task completed with status: %s",
jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(taskStatus)
);
final File statusFileParent = statusFile.getParentFile();
if (statusFileParent != null) {
statusFileParent.mkdirs();
}
jsonMapper.writeValue(statusFile, taskStatus);
return taskStatus;
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
);
}
ThreadPoolTaskRunner
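The ThreadPoolTaskRunner used above runs the task inside the peon itself: it submits the task to an executor and returns a ListenableFuture<TaskStatus>, which the status callback in ExecutorLifecycle.start() consumes. A simplified sketch of that pattern (not the actual Druid class; Task, TaskToolbox and TaskStatus are the Druid task types, their imports omitted):
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.Executors;

class TaskRunnerSketch
{
  private final ListeningExecutorService exec =
      MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());

  // Run the task on the executor and hand back a future with its final status.
  ListenableFuture<TaskStatus> run(final Task task, final TaskToolbox toolbox)
  {
    return exec.submit(() -> task.run(toolbox));
  }
}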
Data ingestion
After the Peon process starts, the task is executed in the RealtimeIndexTask.run() method.
- RealtimePlumber.startJob() performs the task's initialization, loads any existing temporary segment files from disk, and starts the persist, merge, and segment handoff threads.
- EventReceiverFirehose receives the data rows and OnheapIncrementalIndex builds the incremental index (see the sketch after this list).
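A condensed sketch of the ingestion loop inside RealtimeIndexTask.run(); window-period handling, committers and error handling are omitted, so treat it as the general shape rather than the exact source.
// Condensed sketch of the RealtimeIndexTask ingestion loop (simplified).
plumber.startJob();                                   // load persisted hydrants, start background threads
while (firehose.hasMore()) {
  final InputRow inputRow = firehose.nextRow();       // blocks on the EventReceiverFirehose queue
  if (inputRow == null) {
    continue;                                         // unparseable row that was thrown away
  }
  final int numRows = plumber.add(inputRow, committerSupplier);
  if (numRows == -1) {
    metrics.incrementThrownAway();                    // no sink for this timestamp (outside the window)
  } else {
    metrics.incrementProcessed();
  }
}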
Starting the task process
Ingesting data into OnheapIncrementalIndex
EventReceiverFirehose exposes the HTTP endpoint /push-events, which receives batches of data submitted by Tranquility.
EventReceiverFirehose handles the data as follows:
the received events are parsed, a parse failure fails the request immediately, and the parsed rows are put into a blocking queue.
The queue size defaults to 100,000; if the task thread does not consume the queue fast enough, the endpoint blocks.
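For reference, a client such as Tranquility posts a JSON array of flat event maps to this endpoint. Below is a minimal hand-rolled client sketch; the host, port, chat-handler path and service name are assumptions that depend on the deployment.
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class PushEventsSketch
{
  public static void main(String[] args) throws Exception
  {
    // One batch = a JSON array of event maps (timestamp, dimensions, metrics).
    String body = "[{\"timestamp\":\"2018-01-01T00:00:00Z\",\"dim1\":\"a\",\"metric1\":1}]";
    // Hypothetical URL; the real path is exposed through the peon's chat handler.
    URL url = new URL("http://peon-host:8100/druid/worker/v1/chat/serviceName/push-events");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("POST");
    conn.setRequestProperty("Content-Type", "application/json");
    conn.setDoOutput(true);
    try (OutputStream out = conn.getOutputStream()) {
      out.write(body.getBytes(StandardCharsets.UTF_8));
    }
    // On success the handler responds with {"eventCount": <number of events>}.
    System.out.println("HTTP " + conn.getResponseCode());
  }
}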
@POST
@Path("/push-events")
@Consumes({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
@Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
public Response addAll(
InputStream in,
@Context final HttpServletRequest req
)
{
ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper;
Optional<Response> producerSequenceResponse = checkProducerSequence(req, reqContentType, objectMapper);
if (producerSequenceResponse.isPresent()) {
return producerSequenceResponse.get();
}
CountingInputStream countingInputStream = new CountingInputStream(in);
Collection<Map<String, Object>> events = null;
try {
events = objectMapper.readValue(
countingInputStream, new TypeReference<Collection<Map<String, Object>>>()
{
}
);
}
catch (IOException e) {
return Response.serverError().entity(ImmutableMap.<String, Object>of("error", e.getMessage())).build();
}
finally {
bytesReceived.addAndGet(countingInputStream.getCount());
}
log.debug("Adding %,d events to firehose: %s", events.size(), serviceName);
final List<InputRow> rows = Lists.newArrayList();
for (final Map<String, Object> event : events) {
// Might throw an exception. We'd like that to happen now, instead of while adding to the row buffer.
rows.addAll(parser.parseBatch(event));
}
try {
addRows(rows);
return Response.ok(
objectMapper.writeValueAsString(ImmutableMap.of("eventCount", events.size())),
contentType
).build();
}
catch (InterruptedException e) {
...;
}
}
public void addRows(Iterable<InputRow> rows) throws InterruptedException
{
for (final InputRow row : rows) {
boolean added = false;
while (!closed && !added) {
added = buffer.offer(row, 500, TimeUnit.MILLISECONDS);
if (!added) {
long currTime = System.currentTimeMillis();
long lastTime = lastBufferAddFailMsgTime.get();
if (currTime - lastTime > 10000 && lastBufferAddFailMsgTime.compareAndSet(lastTime, currTime)) {
log.warn("Failed to add event to buffer with current size [%s] . Retrying...", buffer.size());
}
}
}
if (!added) {
throw new IllegalStateException("Cannot add events to closed firehose!");
}
}
}
RealtimeIndexTask holds an instance of the firehose described above, consumes its buffered queue, and hands the rows to the RealtimePlumber:
1. Look up the sink for the row's timestamp. Each interval corresponds to one sink, and a peon process usually has only one sink.
   A sink holds multiple FireHydrants: one of them serves queries and receives the incrementally ingested data, the others only serve queries.
2. Add the row to the sink's current FireHydrant, whose OnheapIncrementalIndex builds the incremental index.
3. Check the number of rows already in the current FireHydrant; if it reaches the configured maxRowsInMemory, or the processing time exceeds the configured intermediatePersistPeriod, persist the current FireHydrant's data to disk.
public int add(InputRow row, Supplier<Committer> committerSupplier) throws IndexSizeExceededException
{
long messageTimestamp = row.getTimestampFromEpoch();
final Sink sink = getSink(messageTimestamp);
metrics.reportMessageMaxTimestamp(messageTimestamp);
if (sink == null) {
return -1;
}
final IncrementalIndexAddResult addResult = sink.add(row, false);
if (config.isReportParseExceptions() && addResult.getParseException() != null) {
throw addResult.getParseException();
}
if (!sink.canAppendRow() || System.currentTimeMillis() > nextFlush) {
persist(committerSupplier.get());
}
return addResult.getRowCount();
}
A sink contains the data of the segment that will eventually be produced. A full segment is too large to keep entirely in memory, so while the segment is being built the data is persisted to disk once it reaches a certain size, producing a series of small intermediate segment files, each represented by a FireHydrant. A FireHydrant whose data has already been persisted to disk only serves queries; the sink also holds the current FireHydrant, which both serves queries and keeps receiving new rows.
If the interval containing the row's timestamp has no corresponding sink, a new Sink object is created, and the current FireHydrant is created as part of creating the Sink:
private FireHydrant makeNewCurrIndex(long minTimestamp, DataSchema schema)
{
final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
.withMinTimestamp(minTimestamp)
.withTimestampSpec(schema.getParser())
.withQueryGranularity(schema.getGranularitySpec().getQueryGranularity())
.withDimensionsSpec(schema.getParser())
.withMetrics(schema.getAggregators())
.withRollup(schema.getGranularitySpec().isRollup())
.build();
//Build the OnheapIncrementalIndex object
final IncrementalIndex newIndex = new IncrementalIndex.Builder()
.setIndexSchema(indexSchema)
.setReportParseExceptions(reportParseExceptions)
.setMaxRowCount(maxRowsInMemory)
.buildOnheap();
final FireHydrant old;
synchronized (hydrantLock) {
if (writable) {
old = currHydrant;
int newCount = 0;
int numHydrants = hydrants.size();
if (numHydrants > 0) {
FireHydrant lastHydrant = hydrants.get(numHydrants - 1);
newCount = lastHydrant.getCount() + 1;
if (!indexSchema.getDimensionsSpec().hasCustomDimensions()) {
Map<String, ColumnCapabilitiesImpl> oldCapabilities;
if (lastHydrant.hasSwapped()) {
oldCapabilities = Maps.newHashMap();
ReferenceCountingSegment segment = lastHydrant.getIncrementedSegment();
try {
QueryableIndex oldIndex = segment.asQueryableIndex();
for (String dim : oldIndex.getAvailableDimensions()) {
dimOrder.add(dim);
oldCapabilities.put(dim, (ColumnCapabilitiesImpl) oldIndex.getColumn(dim).getCapabilities());
}
}
finally {
segment.decrement();
}
} else {
IncrementalIndex oldIndex = lastHydrant.getIndex();
dimOrder.addAll(oldIndex.getDimensionOrder());
oldCapabilities = oldIndex.getColumnCapabilities();
}
newIndex.loadDimensionIterable(dimOrder, oldCapabilities);
}
}
currHydrant = new FireHydrant(newIndex, newCount, getSegment().getIdentifier());
if (old != null) {
numRowsExcludingCurrIndex.addAndGet(old.getIndex().size());
}
//Add the newly created FireHydrant to the Sink
hydrants.add(currHydrant);
} else {
// Oops, someone called finishWriting while we were making this new index.
newIndex.close();
throw new ISE("finishWriting() called during swap");
}
}
return old;
}
The actual addition of a data row is completed in the addToFacts() method of OnheapIncrementalIndex, shown below.
OnheapIncrementalIndex can be thought of as a map whose key is the time column plus the dimension columns (TimeAndDims) and whose value is the metric columns. When a new row arrives, the key (TimeAndDims) is used to look up the corresponding Aggregator objects, and these aggregators accumulate the row's metric values. A null dimension value is treated as a distinct value and gets its own entry in the dimension dictionary encoding.
During aggregation, a ColumnSelectorFactory is used to read each row's metric values. Unlike query time, where column values are read through a cursor, here the row is handed over via a ThreadLocal: before each aggregation, IncrementalIndex puts the InputRow into the ThreadLocal (see the sketch below). Presumably this design, rather than passing the metric values into the aggregators directly, exists because metric values need type conversion and other transformations before aggregation, and that logic lives in the ColumnSelector; IncrementalIndex then only has to publish the row to the ThreadLocal, and each Aggregator only has to read the value it aggregates from its ColumnSelector.
RollupFactsHolder (a skip list) maps dimension values (TimeAndDims) to row indexes, and the aggregators array maps row indexes to the per-row array of metric aggregators.
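A minimal sketch of that ThreadLocal hand-off (simplified; the real doAggregate also collects parse-exception messages):
// Simplified sketch of the ThreadLocal hand-off: publish the row for the aggregators'
// column selectors, aggregate, then clear it (not the exact Druid source).
private void aggregateRow(Aggregator[] aggs, ThreadLocal<InputRow> rowContainer, InputRow row)
{
  rowContainer.set(row);          // selectors created in factorizeAggs read the row from this ThreadLocal
  try {
    for (Aggregator agg : aggs) {
      synchronized (agg) {
        agg.aggregate();          // pulls the metric value for this row through its ColumnSelector
      }
    }
  }
  finally {
    rowContainer.set(null);       // clear the slot so the row can be garbage collected
  }
}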
protected AddToFactsResult addToFacts(
AggregatorFactory[] metrics,
boolean deserializeComplexMetrics,
boolean reportParseExceptions,
InputRow row,
AtomicInteger numEntries,
TimeAndDims key,
ThreadLocal<InputRow> rowContainer,
Supplier<InputRow> rowSupplier,
boolean skipMaxRowsInMemoryCheck
) throws IndexSizeExceededException
{
List<String> parseExceptionMessages;
final int priorIndex = facts.getPriorIndex(key);
Aggregator[] aggs;
if (TimeAndDims.EMPTY_ROW_INDEX != priorIndex) {
aggs = concurrentGet(priorIndex);
parseExceptionMessages = doAggregate(metrics, aggs, rowContainer, row);
} else {
aggs = new Aggregator[metrics.length];
factorizeAggs(metrics, aggs, rowContainer, row);
parseExceptionMessages = doAggregate(metrics, aggs, rowContainer, row);
final int rowIndex = indexIncrement.getAndIncrement();
concurrentSet(rowIndex, aggs);
// Last ditch sanity checks
if (numEntries.get() >= maxRowCount
&& facts.getPriorIndex(key) == TimeAndDims.EMPTY_ROW_INDEX
&& !skipMaxRowsInMemoryCheck) {
throw new IndexSizeExceededException("Maximum number of rows [%d] reached", maxRowCount);
}
final int prev = facts.putIfAbsent(key, rowIndex);
if (TimeAndDims.EMPTY_ROW_INDEX == prev) {
numEntries.incrementAndGet();
} else {
// We lost a race
aggs = concurrentGet(prev);
parseExceptionMessages = doAggregate(metrics, aggs, rowContainer, row);
// Free up the misfire
concurrentRemove(rowIndex);
// This is expected to occur ~80% of the time in the worst scenarios
}
}
return new AddToFactsResult(numEntries.get(), parseExceptionMessages);
}
If the number of rows already in the current FireHydrant reaches the configured maxRowsInMemory, or the elapsed time exceeds the configured intermediatePersistPeriod, the current FireHydrant's data is persisted to disk. In the add() method shown earlier, the row-count threshold corresponds to the sink.canAppendRow() check and the time threshold to the nextFlush check.