nifi源码剖析-Processor调用处理过程

从AbstractProcessor方法开始

public abstract class AbstractProcessor extends AbstractSessionFactoryProcessor {
    @Override
    public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
        final ProcessSession session = sessionFactory.createSession(); // 创建StandardProcessSession
        try {
            onTrigger(context, session);
            session.commit();  // checkout + commit 
        } catch (final Throwable t) {
            getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, t});
            session.rollback(true);
            throw t;
        }
    }
    // 具体到Processor的onTrigger方法
    public abstract void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException;
}

首先进入每个的Processor的onTrigger()方法,该方法里先会调用session.write()方法,然后调用session.transfer方法
write方法里构建流

StandardFlowFileQueue: 队列的putAll方法

调度的循环开始: TimerDrivenSchedulingAgent->doSchedule()方法(只是一种策略)

FlowController里设置调度策略和对应agent的关系的map(StandardProcessScheduler)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 135,268评论 19 139
  • 从三月份找实习到现在,面了一些公司,挂了不少,但最终还是拿到小米、百度、阿里、京东、新浪、CVTE、乐视家的研发岗...
    时芥蓝阅读 42,418评论 11 349
  • 1. Java基础部分 基础部分的顺序:基本语法,类相关的语法,内部类的语法,继承相关的语法,异常的语法,线程的语...
    子非鱼_t_阅读 31,839评论 18 399
  • 一个IoSession的I/O事件是注册在一个Selector对象上,并且每个Processor线程只轮询一个Se...
    Mars_M阅读 919评论 0 2
  • 我本以为它是可口的点心、没想到吃下去它竟然不如一个蹄膀美味。 我的良心 如...
    暗的黑走进光的阳阅读 259评论 0 0