Netflix Conductor源码分析--Client层源码分析

一、Client层总体介绍
在正式介绍Client层源码前,我们先来看一下如何在client端与server端通信,demo代码如下:

TaskClient taskClient = new TaskClient();
taskClient.setRootURI("http://localhost:8080/api/");        //Point this to the server API
int threadCount = 2;            //number of threads used to execute workers.  To avoid starvation, should be same or more than number of workers
Worker worker1 = new OrderWorker("order");
Worker worker2 = new PaymentWorker("payment");
//Create WorkflowTaskCoordinator
WorkflowTaskCoordinator.Builder builder = new WorkflowTaskCoordinator.Builder();
WorkflowTaskCoordinator coordinator = builder.withWorkers(worker1, worker2).withThreadCount(threadCount).withTaskClient(taskClient).build();
//Start for polling and execution of the tasks
coordinator.init();

代码说明:

1、第一步需要创建TaskClient类并设置server端的API URL路径以便客户端能够与服务端通信。

2、创建任务工作者Worker对象,具体的任务是由Worker来执行。

3、将Worker对象传入WorkerflowTaskCoordinator对象中,WorkerflowTaskCoordinator负责启动线程池来执行Worker任务,同时维护与server端的心跳以及最新任务数据的拉取操作。

通过阅读上述代码引出了几个类名称的解释:

  • WorkerflowTaskCoordinator:工作流的协调者,负责管理Task Worker的线程池以及和服务端的通信。
  • TaskClient:conductor的任务管理客户端类,负责从server端轮询任务以及更新任务状态等。
  • Builder:用于创建WorkerflowTaskCoordinator实例的建造类。

这三个类的类图如图1-1所示,从图中可以看到类的依赖、组合等关系。

图1-1

图1-1展示是Client层最核心的三个类的依赖关系,我们接下来的源码解析就是围绕这三个类来展开。

整个Client模块的包结构和关键类如图1-2所示:


图1-2

其中:

  • config包是关于Client的一些配置类
  • exceptions包是自定义的client异常类
  • http包是与服务端通信的基础类,包括基础基类ClientBase,还有元数据、负载、客户端任务,工作流等通信类
  • task包主要包括工作流协调者和工作流任务统计类
  • worker包主要包括Worker工作者接口类

二、Client层源码执行的全流程解析

我们拿文章 深入浅出Netflix Conductor使用 中介绍的案例来讲解源码流程(文章中包括了任务、工作流的DSL定义以及如何使用),流程图形表示如图1-3所示:

图1-3

这张图的含义非常简单,用户走下单流程到order模块,如果下单成功则走payment支付模块进行支付,如果下单失败则走失败模块进行重试等操作。

在Swagger界面上输入如下参数启动工作流,如图1-4


图1-4

启动的过程实际上是通过Swagger API接口调用server端的相关类,而client端则是通过拉取的方式来得到需要自己执行任务的通知和输入参数。

启动完工作流之后Client端的代码进入WorkerflowTaskCoordinator中的init方法,代码如下所示:

public synchronized void init() {
    if(threadCount == -1) {
        threadCount = workers.size();
    }
    logger.info("Initialized the worker with {} threads", threadCount);
    this.workerQueue = new LinkedBlockingQueue<Runnable>(workerQueueSize);
    AtomicInteger count = new AtomicInteger(0);
    this.executorService = new ThreadPoolExecutor(threadCount, threadCount,
            0L, TimeUnit.MILLISECONDS,
            workerQueue,
            (runnable) -> {
                Thread thread = new Thread(runnable);
                thread.setName(workerNamePrefix + count.getAndIncrement());
                return thread;
            });
    this.scheduledExecutorService = Executors.newScheduledThreadPool(workers.size());
  
   //定时轮询server状态策略,默认每隔1秒进行轮询,根据任务名获取当前任务信息
    workers.forEach(worker -> {
        scheduledExecutorService.scheduleWithFixedDelay(()->pollForTask(worker), worker.getPollingInterval(), worker.getPollingInterval(), TimeUnit.MILLISECONDS);
    });
}

代码说明:

这段代码通过JDK中的scheduledExecutorService.scheduleWithFixedDelay方法每隔一秒对server端进行轮询,轮询任务的方法是pollForTask,代码如下:

private void pollForTask(Worker worker) {
    if(eurekaClient != null && !eurekaClient.getInstanceRemoteStatus().equals(InstanceStatus.UP)) {
        logger.debug("Instance is NOT UP in discovery - will not poll");
        return;
    }
    if(worker.paused()) {
        WorkflowTaskMetrics.incrementTaskPausedCount(worker.getTaskDefName());
        logger.debug("Worker {} has been paused. Not polling anymore!", worker.getClass());
        return;
    }
    String domain = Optional.ofNullable(PropertyFactory.getString(worker.getTaskDefName(), DOMAIN, null))
            .orElse(PropertyFactory.getString(ALL_WORKERS, DOMAIN, null));
    logger.debug("Polling {}, domain={}, count = {} timeout = {} ms", worker.getTaskDefName(), domain, worker.getPollCount(), worker.getLongPollTimeoutInMS());
    List<Task> tasks = Collections.emptyList();
    try{
        // get the remaining capacity of worker queue to prevent queue full exception
        int realPollCount = Math.min(workerQueue.remainingCapacity(), worker.getPollCount());
        if (realPollCount <= 0) {
            logger.warn("All workers are busy, not polling. queue size = {}, max = {}", workerQueue.size(), workerQueueSize);
            return;
        }
        //获取当前客户端的任务名称
        String taskType = worker.getTaskDefName();
        //根据当前客户端的任务名称从server端的状态机获取是否有自己要执行的任务,如果有任务则获取执行,只能获取一次。
        tasks = getPollTimer(taskType)
                .record(() -> taskClient.batchPollTasksInDomain(taskType, domain, worker.getIdentity(), realPollCount, worker.getLongPollTimeoutInMS()));
        incrementTaskPollCount(taskType, tasks.size());
        logger.debug("Polled {}, domain {}, received {} tasks in worker - {}", worker.getTaskDefName(), domain, tasks.size(), worker.getIdentity());
    } catch (Exception e) {
        WorkflowTaskMetrics.incrementTaskPollErrorCount(worker.getTaskDefName(), e);
        logger.error("Error when polling for tasks", e);
    }
    //根据获取的任务列表,以线程的方式启动执行任务
    for (Task task : tasks) {
        try {
            executorService.submit(() -> {
                try {
                    logger.debug("Executing task {}, taskId - {} in worker - {}", task.getTaskDefName(), task.getTaskId(), worker.getIdentity());
                    //这步就是执行用户自定义的任务逻辑
                    execute(worker, task);
                } catch (Throwable t) {
                    //执行失败,置任务状态为失败,并将失败结果返回到server端
                    task.setStatus(Task.Status.FAILED);
                    TaskResult result = new TaskResult(task);
                    handleException(t, result, worker, task);
                }
            });
        } catch (RejectedExecutionException e) {
            WorkflowTaskMetrics.incrementTaskExecutionQueueFullCount(worker.getTaskDefName());
            logger.error("Execution queue is full, returning task: {}", task.getTaskId(), e);
            returnTask(worker, task);
        }
    }
}

代码说明:

每隔一秒从服务端的(tasks/poll/batch/{taskType})获取当前需要执行的任务列表,任务只能获取一次不能重新获取。然后将任务通过异步线程的方式启动执行,每一个任务都是由用户自定义的逻辑实现,任务的返回值被封装到了TaskResult类中,execute方法的内容如下所示:

private void execute(Worker worker, Task task) {
    String taskType = task.getTaskDefName();
    try {
        if(!worker.preAck(task)) {
            logger.debug("Worker decided not to ack the task {}, taskId = {}", taskType, task.getTaskId());
            return;
        }
        if (!taskClient.ack(task.getTaskId(), worker.getIdentity())) {
            WorkflowTaskMetrics.incrementTaskAckFailedCount(worker.getTaskDefName());
            logger.error("Ack failed for {}, taskId = {}", taskType, task.getTaskId());
            returnTask(worker, task);
            return;
        }
    } catch (Exception e) {
        logger.error(String.format("ack exception for task %s, taskId = %s in worker - %s", task.getTaskDefName(), task.getTaskId(), worker.getIdentity()), e);
        WorkflowTaskMetrics.incrementTaskAckErrorCount(worker.getTaskDefName(), e);
        returnTask(worker, task);
        return;
    }
    com.google.common.base.Stopwatch stopwatch = com.google.common.base.Stopwatch.createStarted();
    TaskResult result = null;
    try {
        //前面大部分都是做监控和统计功能的,在这里不细说
        //这段代码是真正执行用户Task任务的代码,执行完后返回值被封装为TaskResult对象
        logger.debug("Executing task {} in worker {} at {}", task, worker.getClass().getSimpleName(), worker.getIdentity());
        result = worker.execute(task);
        result.setWorkflowInstanceId(task.getWorkflowInstanceId());
        result.setTaskId(task.getTaskId());
        result.setWorkerId(worker.getIdentity());
    } catch (Exception e) {
        logger.error("Unable to execute task {}", task, e);
        if (result == null) {
            task.setStatus(Task.Status.FAILED);
            result = new TaskResult(task);
        }
        handleException(e, result, worker, task);
    } finally {
        stopwatch.stop();
        WorkflowTaskMetrics.getExecutionTimer(worker.getTaskDefName())
                .record(stopwatch.elapsed(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
    }
    logger.debug("Task {} executed by worker {} at {} with status {}", task.getTaskId(), worker.getClass().getSimpleName(), worker.getIdentity(), task.getStatus());
    //更新任务状态,成功或者失败
    updateWithRetry(updateRetryCount, task, result, worker);
}

代码说明:

通过worker.execute方法执行用户定义的任务逻辑,不管是否成功都执行updatewithRetry方法更新server端的任务状态和任务执行返回结果。

访问的URL是/api/tasks。

三、完整流程时序图


image.png
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,029评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,395评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,570评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,535评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,650评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,850评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,006评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,747评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,207评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,536评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,683评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,342评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,964评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,772评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,004评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,401评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,566评论 2 349

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,633评论 18 139
  • 毕业四年,一场婚礼把我们散落四方的兄弟聚在一起。烟雨蒙蒙,我们一起城头走一走,静静的城楼,和我的心境刚好契合。一场...
    小陈流浪记阅读 223评论 0 1
  • 中秋节的晚上,吃完晚饭,点点就着急了,要下楼。说是要去找嫦娥。还给我讲了嫦娥和后羿的故事。因为今天是阴天,没有月亮...
    初夏雨_5d40阅读 126评论 0 0
  • 今天开始书法课正式假期上课了,由于前几天你去济南耽误了两天,以后你认真练习,把耽误的课补上。 ...
    孙岑瑶阅读 63评论 0 0
  • 使用前的准备 ( 一 ) 更新系统 命令:sudo apt-get updatesudo apt-get upgr...
    仰望星空的流浪猫阅读 5,364评论 0 2