druid源码

2018-12-13

druid代码里有一个任务的概念Task
接口定义

/**
 * Represents a task that can run on a worker. The general contracts surrounding Tasks are:
 * <ul>
 * <li>Tasks must operate on a single datasource.</li>
 * <li>Tasks should be immutable, since the task ID is used as a proxy for the task in many locations.</li>
 * <li>Task IDs must be unique. This can be done by naming them using UUIDs or the current timestamp.</li>
 * <li>Tasks are each part of a "task group", which is a set of tasks that can share interval locks. These are
 * useful for producing sharded segments.</li>
 * <li>Tasks do not need to explicitly release locks; they are released upon task completion. Tasks may choose
 * to release locks early if they desire.</li>
 * </ul>
 */
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
    @JsonSubTypes.Type(name = "append", value = AppendTask.class),
    @JsonSubTypes.Type(name = "merge", value = MergeTask.class),
    @JsonSubTypes.Type(name = "kill", value = KillTask.class),
    @JsonSubTypes.Type(name = "move", value = MoveTask.class),
    @JsonSubTypes.Type(name = "archive", value = ArchiveTask.class),
    @JsonSubTypes.Type(name = "restore", value = RestoreTask.class),
    @JsonSubTypes.Type(name = "index", value = IndexTask.class),
    @JsonSubTypes.Type(name = "index_hadoop", value = HadoopIndexTask.class),
    @JsonSubTypes.Type(name = "hadoop_convert_segment", value = HadoopConverterTask.class),
    @JsonSubTypes.Type(name = "hadoop_convert_segment_sub", value = HadoopConverterTask.ConverterSubTask.class),
    @JsonSubTypes.Type(name = "index_realtime", value = RealtimeIndexTask.class),
    @JsonSubTypes.Type(name = "index_realtime_appenderator", value = AppenderatorDriverRealtimeIndexTask.class),
    @JsonSubTypes.Type(name = "noop", value = NoopTask.class),
    @JsonSubTypes.Type(name = "version_converter", value = ConvertSegmentBackwardsCompatibleTask.class), // Backwards compat - Deprecated
    @JsonSubTypes.Type(name = "version_converter_sub", value = ConvertSegmentBackwardsCompatibleTask.SubTask.class), // backwards compat - Deprecated
    @JsonSubTypes.Type(name = "convert_segment", value = ConvertSegmentTask.class),
    @JsonSubTypes.Type(name = "convert_segment_sub", value = ConvertSegmentTask.SubTask.class),
    @JsonSubTypes.Type(name = "same_interval_merge", value = SameIntervalMergeTask.class),
    @JsonSubTypes.Type(name = "compact", value = CompactionTask.class)
})
public interface Task
{
  /**
   * Returns ID of this task. Must be unique across all tasks ever created.
   *
   * @return task ID
   */
  String getId();

  /**
   * Returns group ID of this task. Tasks with the same group ID can share locks. If tasks do not need to share locks,
   * a common convention is to set group ID equal to task ID.
   *
   * @return task group ID
   */
  String getGroupId();

  /**
   * Returns task priority. The task priority is currently used only for prioritized locking, but, in the future, it can
   * be used for task scheduling, cluster resource management, etc.
   *
   * @return task priority
   *
   * @see Tasks for default task priorities
   */
  default int getPriority()
  {
    return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_TASK_PRIORITY);
  }

  /**
   * Returns a {@link TaskResource} for this task. Task resources define specific worker requirements a task may
   * require.
   *
   * @return {@link TaskResource} for this task
   */
  TaskResource getTaskResource();

  /**
   * Returns a descriptive label for this task type. Used for metrics emission and logging.
   *
   * @return task type label
   */
  String getType();

  /**
   * Get the nodeType for if/when this task publishes on zookeeper.
   *
   * @return the nodeType to use when publishing the server to zookeeper. null if the task doesn't expect to
   * publish to zookeeper.
   */
  String getNodeType();

  /**
   * Returns the datasource this task operates on. Each task can operate on only one datasource.
   */
  String getDataSource();

  /**
   * Returns query runners for this task. If this task is not meant to answer queries over its datasource, this method
   * should return null.
   *
   * @param <T> query result type
   *
   * @return query runners for this task
   */
  <T> QueryRunner<T> getQueryRunner(Query<T> query);

  /**
   * Returns an extra classpath that should be prepended to the default classpath when running this task. If no
   * extra classpath should be prepended, this should return null or the empty string.
   */
  String getClasspathPrefix();

  /**
   * Execute preflight actions for a task. This can be used to acquire locks, check preconditions, and so on. The
   * actions must be idempotent, since this method may be executed multiple times. This typically runs on the
   * coordinator. If this method throws an exception, the task should be considered a failure.
   * <p/>
   * This method must be idempotent, as it may be run multiple times per task.
   *
   * @param taskActionClient action client for this task (not the full toolbox)
   *
   * @return true if ready, false if not ready yet
   *
   * @throws Exception if the task should be considered a failure
   */
  boolean isReady(TaskActionClient taskActionClient) throws Exception;

  /**
   * Returns whether or not this task can restore its progress from its on-disk working directory. Restorable tasks
   * may be started with a non-empty working directory. Tasks that exit uncleanly may still have a chance to attempt
   * restores, meaning that restorable tasks should be able to deal with potentially partially written on-disk state.
   */
  boolean canRestore();

  /**
   * Asks a task to arrange for its "run" method to exit promptly. This method will only be called if
   * {@link #canRestore()} returns true. Tasks that take too long to stop gracefully will be terminated with
   * extreme prejudice.
   */
  void stopGracefully();

  /**
   * Execute a task. This typically runs on a worker as determined by a TaskRunner, and will be run while
   * holding a lock on our dataSource and implicit lock interval (if any). If this method throws an exception, the task
   * should be considered a failure.
   *
   * @param toolbox Toolbox for this task
   *
   * @return Some kind of finished status (isRunnable must be false).
   *
   * @throws Exception if this task failed
   */
  TaskStatus run(TaskToolbox toolbox) throws Exception;

  Map<String, Object> getContext();

  @Nullable
  default <ContextValueType> ContextValueType getContextValue(String key)
  {
    return getContext() == null ? null : (ContextValueType) getContext().get(key);
  }

  default <ContextValueType> ContextValueType getContextValue(String key, ContextValueType defaultValue)
  {
    final ContextValueType value = getContextValue(key);
    return value == null ? defaultValue : value;
  }
}

可见task都是可实例化为json的,这就是druid存储在meta数据库里的task信息

其中每个task运行都会返回TaskStatus这个类的实例,很实用

public class TaskStatus
{
  public static final int MAX_ERROR_MSG_LENGTH = 100;

  public static TaskStatus running(String taskId)
  {
    return new TaskStatus(taskId, TaskState.RUNNING, -1, null);
  }

  public static TaskStatus success(String taskId)
  {
    return new TaskStatus(taskId, TaskState.SUCCESS, -1, null);
  }

  public static TaskStatus success(String taskId, String errorMsg)
  {
    return new TaskStatus(taskId, TaskState.SUCCESS, -1, errorMsg);
  }

  public static TaskStatus failure(String taskId)
  {
    return new TaskStatus(taskId, TaskState.FAILED, -1, null);
  }

  public static TaskStatus failure(String taskId, String errorMsg)
  {
    return new TaskStatus(taskId, TaskState.FAILED, -1, errorMsg);
  }

  public static TaskStatus fromCode(String taskId, TaskState code)
  {
    return new TaskStatus(taskId, code, -1, null);
  }

  // The error message can be large, so truncate it to avoid storing large objects in zookeeper/metadata storage.
  // The full error message will be available via a TaskReport.
  private static String truncateErrorMsg(String errorMsg)
  {
    if (errorMsg != null && errorMsg.length() > MAX_ERROR_MSG_LENGTH) {
      return errorMsg.substring(0, MAX_ERROR_MSG_LENGTH) + "...";
    } else {
      return errorMsg;
    }
  }

  private final String id;
  private final TaskState status;
  private final long duration;
  private final String errorMsg;

  @JsonCreator
  protected TaskStatus(
      @JsonProperty("id") String id,
      @JsonProperty("status") TaskState status,
      @JsonProperty("duration") long duration,
      @JsonProperty("errorMsg") String errorMsg
  )
  {
    this.id = id;
    this.status = status;
    this.duration = duration;
    this.errorMsg = truncateErrorMsg(errorMsg);

    // Check class invariants.
    Preconditions.checkNotNull(id, "id");
    Preconditions.checkNotNull(status, "status");
  }

  @JsonProperty("id")
  public String getId()
  {
    return id;
  }

  @JsonProperty("status")
  public TaskState getStatusCode()
  {
    return status;
  }

  @JsonProperty("duration")
  public long getDuration()
  {
    return duration;
  }

  @JsonProperty("errorMsg")
  public String getErrorMsg()
  {
    return errorMsg;
  }

  /**
   * Signals that a task is not yet complete, and is still runnable on a worker. Exactly one of isRunnable,
   * isSuccess, or isFailure will be true at any one time.
   *
   * @return whether the task is runnable.
   */
  @JsonIgnore
  public boolean isRunnable()
  {
    return status == TaskState.RUNNING;
  }

  /**
   * Inverse of {@link #isRunnable}.
   *
   * @return whether the task is complete.
   */
  @JsonIgnore
  public boolean isComplete()
  {
    return !isRunnable();
  }

  /**
   * Returned by tasks when they spawn subtasks. Exactly one of isRunnable, isSuccess, or isFailure will
   * be true at any one time.
   *
   * @return whether the task succeeded.
   */
  @JsonIgnore
  public boolean isSuccess()
  {
    return status == TaskState.SUCCESS;
  }

  /**
   * Returned by tasks when they complete unsuccessfully. Exactly one of isRunnable, isSuccess, or
   * isFailure will be true at any one time.
   *
   * @return whether the task failed
   */
  @JsonIgnore
  public boolean isFailure()
  {
    return status == TaskState.FAILED;
  }

  public TaskStatus withDuration(long _duration)
  {
    return new TaskStatus(id, status, _duration, errorMsg);
  }

  @Override
  public String toString()
  {
    return Objects.toStringHelper(this)
        .add("id", id)
        .add("status", status)
        .add("duration", duration)
        .add("errorMsg", errorMsg)
        .toString();
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    TaskStatus that = (TaskStatus) o;
    return getDuration() == that.getDuration() &&
        java.util.Objects.equals(getId(), that.getId()) &&
        status == that.status &&
        java.util.Objects.equals(getErrorMsg(), that.getErrorMsg());
  }

  @Override
  public int hashCode()
  {
    return java.util.Objects.hash(getId(), status, getDuration(), getErrorMsg());
  }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,684评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,143评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,214评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,788评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,796评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,665评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,027评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,679评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,346评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,664评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,766评论 1 331
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,412评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,015评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,974评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,073评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,501评论 2 343