2018-12-13
Druid 代码里用 Task 接口来表示“任务”这一概念。
接口定义如下:
/**
 * Represents a task that can run on a worker. The general contracts surrounding Tasks are:
 * <ul>
 * <li>Tasks must operate on a single datasource.</li>
 * <li>Tasks should be immutable, since the task ID is used as a proxy for the task in many locations.</li>
 * <li>Task IDs must be unique. This can be done by naming them using UUIDs or the current timestamp.</li>
 * <li>Tasks are each part of a "task group", which is a set of tasks that can share interval locks. These are
 * useful for producing sharded segments.</li>
 * <li>Tasks do not need to explicitly release locks; they are released upon task completion. Tasks may choose
 * to release locks early if they desire.</li>
 * </ul>
 *
 * <p>Implementations are (de)serialized polymorphically by Jackson: the {@code type} JSON property carries the
 * logical name that selects one of the concrete classes registered in {@code @JsonSubTypes} below.
 */
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
    @JsonSubTypes.Type(name = "append", value = AppendTask.class),
    @JsonSubTypes.Type(name = "merge", value = MergeTask.class),
    @JsonSubTypes.Type(name = "kill", value = KillTask.class),
    @JsonSubTypes.Type(name = "move", value = MoveTask.class),
    @JsonSubTypes.Type(name = "archive", value = ArchiveTask.class),
    @JsonSubTypes.Type(name = "restore", value = RestoreTask.class),
    @JsonSubTypes.Type(name = "index", value = IndexTask.class),
    @JsonSubTypes.Type(name = "index_hadoop", value = HadoopIndexTask.class),
    @JsonSubTypes.Type(name = "hadoop_convert_segment", value = HadoopConverterTask.class),
    @JsonSubTypes.Type(name = "hadoop_convert_segment_sub", value = HadoopConverterTask.ConverterSubTask.class),
    @JsonSubTypes.Type(name = "index_realtime", value = RealtimeIndexTask.class),
    @JsonSubTypes.Type(name = "index_realtime_appenderator", value = AppenderatorDriverRealtimeIndexTask.class),
    @JsonSubTypes.Type(name = "noop", value = NoopTask.class),
    @JsonSubTypes.Type(name = "version_converter", value = ConvertSegmentBackwardsCompatibleTask.class), // Backwards compat - Deprecated
    @JsonSubTypes.Type(name = "version_converter_sub", value = ConvertSegmentBackwardsCompatibleTask.SubTask.class), // backwards compat - Deprecated
    @JsonSubTypes.Type(name = "convert_segment", value = ConvertSegmentTask.class),
    @JsonSubTypes.Type(name = "convert_segment_sub", value = ConvertSegmentTask.SubTask.class),
    @JsonSubTypes.Type(name = "same_interval_merge", value = SameIntervalMergeTask.class),
    @JsonSubTypes.Type(name = "compact", value = CompactionTask.class)
})
public interface Task
{
  /**
   * Returns ID of this task. Must be unique across all tasks ever created.
   *
   * @return task ID
   */
  String getId();

  /**
   * Returns group ID of this task. Tasks with the same group ID can share locks. If tasks do not need to share locks,
   * a common convention is to set group ID equal to task ID.
   *
   * @return task group ID
   */
  String getGroupId();

  /**
   * Returns task priority. The task priority is currently used only for prioritized locking, but, in the future, it can
   * be used for task scheduling, cluster resource management, etc.
   *
   * <p>The default implementation reads {@code Tasks.PRIORITY_KEY} from the task context and falls back to
   * {@code Tasks.DEFAULT_TASK_PRIORITY} when the context has no value for it.
   *
   * @return task priority
   *
   * @see Tasks for default task priorities
   */
  default int getPriority()
  {
    return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_TASK_PRIORITY);
  }

  /**
   * Returns a {@link TaskResource} for this task. Task resources define specific worker requirements a task may
   * require.
   *
   * @return {@link TaskResource} for this task
   */
  TaskResource getTaskResource();

  /**
   * Returns a descriptive label for this task type. Used for metrics emission and logging.
   *
   * @return task type label
   */
  String getType();

  /**
   * Get the nodeType for if/when this task publishes on zookeeper.
   *
   * @return the nodeType to use when publishing the server to zookeeper. null if the task doesn't expect to
   * publish to zookeeper.
   */
  String getNodeType();

  /**
   * Returns the datasource this task operates on. Each task can operate on only one datasource.
   *
   * @return name of the datasource
   */
  String getDataSource();

  /**
   * Returns query runners for this task. If this task is not meant to answer queries over its datasource, this method
   * should return null.
   *
   * @param query the query to be answered
   * @param <T> query result type
   *
   * @return query runners for this task
   */
  <T> QueryRunner<T> getQueryRunner(Query<T> query);

  /**
   * Returns an extra classpath that should be prepended to the default classpath when running this task. If no
   * extra classpath should be prepended, this should return null or the empty string.
   *
   * @return classpath prefix, or null / empty string for none
   */
  String getClasspathPrefix();

  /**
   * Execute preflight actions for a task. This can be used to acquire locks, check preconditions, and so on. This
   * method, and therefore every action it performs, must be idempotent, as it may be run multiple times per task.
   * This typically runs on the coordinator. If this method throws an exception, the task should be considered a
   * failure.
   *
   * @param taskActionClient action client for this task (not the full toolbox)
   *
   * @return true if ready, false if not ready yet
   *
   * @throws Exception if the task should be considered a failure
   */
  boolean isReady(TaskActionClient taskActionClient) throws Exception;

  /**
   * Returns whether or not this task can restore its progress from its on-disk working directory. Restorable tasks
   * may be started with a non-empty working directory. Tasks that exit uncleanly may still have a chance to attempt
   * restores, meaning that restorable tasks should be able to deal with potentially partially written on-disk state.
   *
   * @return true if the task is restorable
   */
  boolean canRestore();

  /**
   * Asks a task to arrange for its "run" method to exit promptly. This method will only be called if
   * {@link #canRestore()} returns true. Tasks that take too long to stop gracefully will be terminated with
   * extreme prejudice.
   */
  void stopGracefully();

  /**
   * Execute a task. This typically runs on a worker as determined by a TaskRunner, and will be run while
   * holding a lock on our dataSource and implicit lock interval (if any). If this method throws an exception, the task
   * should be considered a failure.
   *
   * @param toolbox Toolbox for this task
   *
   * @return Some kind of finished status (isRunnable must be false).
   *
   * @throws Exception if this task failed
   */
  TaskStatus run(TaskToolbox toolbox) throws Exception;

  /**
   * Returns the context of this task: a map of free-form key/value settings that is read through
   * {@link #getContextValue(String)} and {@link #getContextValue(String, Object)}. The default implementations of
   * those methods treat a null return value the same way as a context without the requested key.
   *
   * @return the task context, possibly null
   */
  Map<String, Object> getContext();

  /**
   * Looks up a single entry of {@link #getContext()}.
   *
   * <p>The stored value is cast to the type expected by the caller without a runtime check. A value of a different
   * type therefore does not fail here; it surfaces as a {@link ClassCastException} where the caller uses the result.
   *
   * @param key context key to look up
   * @param <ContextValueType> type the caller expects the value to have
   *
   * @return the value stored under {@code key}, or null if the context is null or holds no value for the key
   */
  @Nullable
  @SuppressWarnings("unchecked") // the context is untyped by design; the caller states the type it expects
  default <ContextValueType> ContextValueType getContextValue(String key)
  {
    // Read the context exactly once so that the null check and the lookup are guaranteed to see the same map.
    final Map<String, Object> context = getContext();
    return context == null ? null : (ContextValueType) context.get(key);
  }

  /**
   * Looks up a single entry of {@link #getContext()}, substituting a default when there is no value.
   *
   * @param key context key to look up
   * @param defaultValue value to return when {@link #getContextValue(String)} yields null
   * @param <ContextValueType> type the caller expects the value to have
   *
   * @return the value stored under {@code key}, or {@code defaultValue} if that value is null or absent
   */
  default <ContextValueType> ContextValueType getContextValue(String key, ContextValueType defaultValue)
  {
    final ContextValueType value = getContextValue(key);
    return value == null ? defaultValue : value;
  }
}
可见 Task 都可以序列化为 JSON(通过 @JsonTypeInfo/@JsonSubTypes 按 type 字段区分具体实现类),这就是 Druid 存储在元数据库(metadata storage)里的 task 信息
其中每个 task 的 run 方法执行结束后都会返回一个 TaskStatus 实例,用来描述任务的状态(任务 ID、状态码、耗时、错误信息),很实用。该类定义如下:
/**
 * Immutable value object describing the state of a task: the task ID, its {@link TaskState}, a duration and an
 * optional error message. Instances are created through the static factories (or by Jackson through the
 * annotated constructor) and are serialized with the JSON properties {@code id}, {@code status},
 * {@code duration} and {@code errorMsg}.
 */
public class TaskStatus
{
  /** Maximum number of characters of an error message that is kept before "..." is appended. */
  public static final int MAX_ERROR_MSG_LENGTH = 100;

  // Duration handed to the constructor by every static factory below; withDuration yields a copy with another value.
  private static final long DEFAULT_DURATION = -1;

  /**
   * Creates a status in state {@code RUNNING} without an error message.
   *
   * @param taskId ID of the task, must not be null
   */
  public static TaskStatus running(String taskId)
  {
    return fromCode(taskId, TaskState.RUNNING);
  }

  /**
   * Creates a status in state {@code SUCCESS} without an error message.
   *
   * @param taskId ID of the task, must not be null
   */
  public static TaskStatus success(String taskId)
  {
    return fromCode(taskId, TaskState.SUCCESS);
  }

  /**
   * Creates a status in state {@code SUCCESS} that nevertheless carries a message.
   *
   * @param taskId   ID of the task, must not be null
   * @param errorMsg message to attach; truncated to {@link #MAX_ERROR_MSG_LENGTH} characters plus "..."
   */
  public static TaskStatus success(String taskId, String errorMsg)
  {
    return new TaskStatus(taskId, TaskState.SUCCESS, DEFAULT_DURATION, errorMsg);
  }

  /**
   * Creates a status in state {@code FAILED} without an error message.
   *
   * @param taskId ID of the task, must not be null
   */
  public static TaskStatus failure(String taskId)
  {
    return fromCode(taskId, TaskState.FAILED);
  }

  /**
   * Creates a status in state {@code FAILED} with an error message.
   *
   * @param taskId   ID of the task, must not be null
   * @param errorMsg message to attach; truncated to {@link #MAX_ERROR_MSG_LENGTH} characters plus "..."
   */
  public static TaskStatus failure(String taskId, String errorMsg)
  {
    return new TaskStatus(taskId, TaskState.FAILED, DEFAULT_DURATION, errorMsg);
  }

  /**
   * Creates a status in an arbitrary state without an error message.
   *
   * @param taskId ID of the task, must not be null
   * @param code   state of the task, must not be null
   */
  public static TaskStatus fromCode(String taskId, TaskState code)
  {
    return new TaskStatus(taskId, code, DEFAULT_DURATION, null);
  }

  // The error message can be large, so it is cut down to avoid storing large objects in zookeeper/metadata
  // storage. The full error message will be available via a TaskReport.
  // A message that was already cut (MAX_ERROR_MSG_LENGTH characters + "...") maps to itself, so passing a status
  // through the constructor again does not change it.
  private static String truncateErrorMsg(String errorMsg)
  {
    if (errorMsg == null || errorMsg.length() <= MAX_ERROR_MSG_LENGTH) {
      return errorMsg;
    }
    return errorMsg.substring(0, MAX_ERROR_MSG_LENGTH) + "...";
  }

  private final String id;
  private final TaskState status;
  private final long duration;
  private final String errorMsg;

  /**
   * Creates a status; also used by Jackson for deserialization.
   *
   * @param id       ID of the task, must not be null
   * @param status   state of the task, must not be null
   * @param duration duration to record
   * @param errorMsg optional message; truncated to {@link #MAX_ERROR_MSG_LENGTH} characters plus "..."
   *
   * @throws NullPointerException if {@code id} or {@code status} is null
   */
  @JsonCreator
  protected TaskStatus(
      @JsonProperty("id") String id,
      @JsonProperty("status") TaskState status,
      @JsonProperty("duration") long duration,
      @JsonProperty("errorMsg") String errorMsg
  )
  {
    // Class invariants: id and status are mandatory (id is checked first).
    this.id = Preconditions.checkNotNull(id, "id");
    this.status = Preconditions.checkNotNull(status, "status");
    this.duration = duration;
    this.errorMsg = truncateErrorMsg(errorMsg);
  }

  /**
   * @return ID of the task this status belongs to
   */
  @JsonProperty("id")
  public String getId()
  {
    return id;
  }

  /**
   * @return state of the task; serialized under the JSON property {@code status}
   */
  @JsonProperty("status")
  public TaskState getStatusCode()
  {
    return status;
  }

  /**
   * @return the recorded duration
   */
  @JsonProperty("duration")
  public long getDuration()
  {
    return duration;
  }

  /**
   * @return the (possibly truncated) error message, or null if none was given
   */
  @JsonProperty("errorMsg")
  public String getErrorMsg()
  {
    return errorMsg;
  }

  /**
   * Signals that a task is not yet complete, and is still runnable on a worker.
   *
   * @return whether the state is {@code RUNNING}
   */
  @JsonIgnore
  public boolean isRunnable()
  {
    return TaskState.RUNNING == status;
  }

  /**
   * Inverse of {@link #isRunnable}.
   *
   * @return whether the task is complete.
   */
  @JsonIgnore
  public boolean isComplete()
  {
    return !isRunnable();
  }

  /**
   * Signals that a task completed successfully.
   *
   * @return whether the state is {@code SUCCESS}
   */
  @JsonIgnore
  public boolean isSuccess()
  {
    return TaskState.SUCCESS == status;
  }

  /**
   * Signals that a task completed unsuccessfully.
   *
   * @return whether the state is {@code FAILED}
   */
  @JsonIgnore
  public boolean isFailure()
  {
    return TaskState.FAILED == status;
  }

  /**
   * Returns a copy of this status that differs only in its duration.
   *
   * @param _duration duration to record in the copy
   */
  public TaskStatus withDuration(long _duration)
  {
    return new TaskStatus(id, status, _duration, errorMsg);
  }

  @Override
  public String toString()
  {
    return Objects
        .toStringHelper(this)
        .add("id", id)
        .add("status", status)
        .add("duration", duration)
        .add("errorMsg", errorMsg)
        .toString();
  }

  /**
   * Two statuses are equal when they are of the same class and agree on duration, ID, state and error message.
   */
  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    final TaskStatus other = (TaskStatus) o;
    if (getDuration() != other.getDuration()) {
      return false;
    }
    if (!java.util.Objects.equals(getId(), other.getId())) {
      return false;
    }
    if (status != other.status) {
      return false;
    }
    return java.util.Objects.equals(getErrorMsg(), other.getErrorMsg());
  }

  @Override
  public int hashCode()
  {
    return java.util.Objects.hash(getId(), status, getDuration(), getErrorMsg());
  }
}