【NIFI】2.NIFI开发人员指南


1.介绍

本开发人员指南的目的是为读者提供了解Apache NiFi扩展的开发方式所需的信息,并帮助他们解释开发组件背后的思考过程。它提供了对用于开发扩展的API的介绍和说明。但是,由于本指南旨在补充API的JavaDoc,而不是替换它们,因此并未详细介绍API中的每种方法。本指南还假定读者熟悉Java 7和Apache Maven。

2.NiFi组件

NiFi提供了几个扩展点,使开发人员能够向应用程序添加功能以满足他们的需求。以下列表对最常见的扩展点进行了高级描述:

  • 处理器(Processer)
    Processor接口的机制是通过NiFi暴露访问FlowFile,和其属性和内容 。Processor是构成NiFi数据流的基本模块。Processor可以完成以下任务:
    创建FlowFiles,读取FlowFile内容,编写FlowFile内容,读取FlowFile属性,更新FlowFile属性,提取数据,出口数据,路线数据,提取数据,修改资料
  • ReportingTask
    ReportingTask接口的机制是允许将度量标准,监视信息和内部NiFi状态发布到外部端点,例如日志文件,电子邮件和远程Web服务。
  • ControllerService
    ControllerService在单个JVM中跨处理器,其他ControllerService和ReportingTask提供共享状态和功能。一个示例用例可能包括将非常大的数据集加载到内存中。通过在ControllerService中执行此工作,数据可以一次加载并通过此服务公开给所有处理器,而不需要许多不同的处理器自己加载数据集。
  • FlowFilePrioritizer
    FlowFilePrioritizer接口提供了一种机制,通过该机制,可以对队列中的FlowFile进行优先级排序或排序,以便可以按对特定用例最有效的顺序处理FlowFiles。
  • 授权提供者
    AuthorityProvider负责确定应授予给定用户哪些特权和角色(如果有)。

3.处理器API

处理器是NiFi中使用最广泛的组件。处理器是唯一有权访问以创建,删除,修改或检查FlowFiles(数据和属性)的组件。
使用Java的ServiceLoader机制加载和实例化所有处理器。这意味着所有处理器都必须遵守以下规则:
处理器必须具有默认构造函数。
处理器的JAR文件必须包含META-INF / services目录中名为的条目 org.apache.nifi.processor.Processor。这是一个文本文件,其中每一行都包含处理器的完全限定的类名。
尽管Processor是可以直接实现的接口,但是这样做org.apache.nifi.processor.AbstractProcessor几乎是很少见的,因为几乎所有处理器实现都是该类的基类。本AbstractProcessor类提供的功能的显著量,这使得开发的处理器更容易,更方便的任务。对于本文档,我们将AbstractProcessor在处理Processor API时重点关注该类。
并发注释
NiFi是一个高度并发的框架。这意味着所有扩展都必须是线程安全的。如果不熟悉用Java编写并发软件,强烈建议您熟悉Java并发性原理。

3.1支持API

为了理解Processor API,我们必须首先(至少在较高层次上)理解几个支持的类和接口,下面将对其进行讨论。

3.1.1 FlowFile

FlowFile是一种逻辑概念,它使一条数据与该数据的一组属性相关联。这些属性包括FlowFile的唯一标识符,其名称,大小以及任何其他特定于流的值。虽然FlowFile的内容和属性可以更改,但FlowFile对象是不可变的。通过ProcessSession可以对FlowFile进行修改。

FlowFiles的核心属性在org.apache.nifi.flowfile.attributes.CoreAttributes枚举中定义。你会看到最常用的属性是filename,path和uuid。括号中的字符串是CoreAttributes枚举中属性的值以及它在UI / API中的显示方式。

  • 文件名(filename):FlowFile的文件名。文件名不应包含任何目录结构。
  • UUID(uuid):分配给此FlowFile的通用唯一标识符,用于将FlowFile与系统中的其他FlowFile区分开。
  • 路径(path):FlowFile的路径指示FlowFile所属的相对目录,并且不包含文件名。
    绝对路径(absolute.path):FlowFile的绝对路径指示FlowFile所属的绝对目录,并且不包含文件名。
  • 优先级(priority):表示FlowFile优先级的数值。
  • MIME类型(mime.type):此FlowFile的MIME类型。
  • 丢弃原因(discard.reason):指定丢弃FlowFile的原因。
  • 备用标识符(alternate.identifier):表示除FlowFile的UUID之外的已知标识符,该标识符引用该FlowFile。

Additional Common Attributes

3.1.2 ProcessSession

ProcessSession,通常简称为“session”,提供了一种机制,通过它可以实现FlowFiles创建,销毁,检查,克隆以及将其转移到其他处理器。此外,ProcessSession提供了一种机制,用于通过添加或删除属性或修改FlowFile的内容来创建修改版本的FlowFiles。ProcessSession还公开了一种发出来源事件的机制,该机制提供了跟踪FlowFile的传输路径和历史记录的功能。在一个或多个FlowFiles上执行操作后,可以提交或回滚ProcessSession。

3.1.3 ProcessContext

ProcessContext在处理器与框架之间的提供了桥梁。它提供了处理器怎样进行当前配置和允许处理器执行特定框架的任务的信息,例如产生其资源,以便框架可以调度其他处理器运行,而无需消耗不必要的资源。

3.1.4 PropertyDescriptor

PropertyDescriptor定义一个可以由Processor,ReportingTask或ControllerService使用的属性。属性的定义包括其名称,属性描述,可选的默认值,验证逻辑,以及有关是否需要该属性才能使Processor有效的指示符。通过实例化该类的实例PropertyDescriptor.Builder ,调用适当的方法以填充有关属性的详细信息并最终调用该build方法来创建PropertyDescriptor 。

3.1.5 Validator

PropertyDescriptor必须指定一个或多个验证器,这些验证器可用于确保用户输入的属性值有效。如果验证器指示属性值无效,组件无法运行或者说在属性变为有效之前不能使用。如果未指定验证器,则该组件将被视为无效,并且NiFi将报告该属性不受支持。

3.1.6 ValidationContext

验证属性值时,ValidationContext用于获取ControllerServices,创建PropertyValue对象,以及使用表达式语言编译和评估属性值。

3.1.7 PropertyValue

所有返回给Processor的属性值都以PropertyValue对象的形式返回。该对象具有方便的方法,可以将值从字符串转换为其他形式,例如数字和时间段,以及提供用于评估表达式语言的API。

3.1.8 Relationship

Relationship定义了FlowFile可能从处理器传输到的路由。Relationships通过实例化Relationship.Builder 创建,调用适当的方法来填充Relationship的详细信息,最后调用该build()方法 。

3.1.9 StateManager

StateManager为处理器,报告任务和控制器服务提供了一种易于存储和检索状态的机制。该API与ConcurrentHashMap相似,但是每个操作都需要一个Scope。Scope表明是在局部还是在整个集群范围内检索/存储。有关更多信息,请参见 状态管理器部分。

3.1.10 ProcessorInitializationContext

创建处理器后,它的initialize()方法将被调用,同时创建InitializationContext对象。该对象向处理器暴露在整个处理器的生命周期内都不会改变的配置,例如处理器的唯一标识符。

3.1.11 组件日志

鼓励处理器通过该ComponentLog接口处理其日志记录 ,而不是由第三方记录器的实例获得。这是因为通过ComponentLog进行的日志记录允许框架将超出可配置严重性级别的日志消息呈现给用户界面,允许在发生重要事件时通知监视数据流的人员。此外,它为所有的处理器提供一种统一的日志格式,通过在调试模式下记录堆栈跟踪并在日志消息中提供处理器的唯一标识符。

3.2 AbstractProcessor API

由于绝大多数处理器将通过继承AbstractProcessor来创建,因此我们将在本节中研究它的抽象类。AbstractProcessor提供了一些处理器开发人员感兴趣的方法。如下:

3.2.1 Processor Initialization

当要创建处理器时,在其他方法被调用之前,AbstractProcessor 的init()方法将被调用。该方法只有一个参数,类型为 ProcessorInitializationContext。上下文对象为处理器提供了ComponentLog,处理器的唯一标识符和ControllerServiceLookup,可用于与已配置的ControllerServices进行交互。每个这样的对象是由AbstractProcessor存储,并且可以由子类经由getLogger,getIdentifier和 getControllerServiceLookup方法分别获得。

3.2.2 Exposing Processor’s Relationships

为了使处理器将FlowFile传输到新的目的地进行后续处理,处理器必须首先能够向框架公开其当前支持的所有关系。这允许应用程序的用户通过在处理器之间创建连接并为这些连接分配适当的关系来将处理器彼此连接。
处理器通过覆盖getRelationships方法公开有效的关系集 。这个方法没有参数,并返回Set的Relationship 对象。对于大多数处理器,此Set将是静态的,但是其他处理器将根据用户配置动态生成Set。对于那些Set是静态的Processor,建议在Processor的构造函数或init方法中创建一个不可变的Set并返回该值,而不是动态生成Set。这种模式使其更干净的代码和更好的性能。

3.2.3 Exposing Processor’s Properties

大多数处理器在使用前都需要一定数量的用户配置。处理器支持的属性通过getSupportedPropertyDescriptors方法公开给框架 。这个方法没有参数,并返回List的 PropertyDescriptor对象。列表中对象的顺序很重要,因为它决定了在用户界面中呈现属性的顺序。
甲PropertyDescriptor目的是通过创建一个新的实例构造PropertyDescriptor.Builder对象,调用构建器的适当的方法,并最终调用build方法。
尽管此方法涵盖了大多数用例,但有时还是希望允许用户配置名称未知的其他属性。这可以通过重写getSupportedDynamicPropertyDescriptor方法来实现 。此方法以String作为其唯一参数,该参数指示属性的名称。该方法返回一个PropertyDescriptor对象,该 对象可用于验证属性名称和值。从此方法返回的任何PropertyDescriptor都应构建isDynamic,并将PropertyDescriptor.Builder类中的值设置为true 。AbstractProcessor的默认行为是不允许任何动态创建的属性。

3.2.4 Validating Processor Properties

如果处理器的配置无效,则无法启动它。可以通过在PropertyDescriptor上设置Validator或通过PropertyDescriptor.Builder的allowableValues方法来限制属性的允许值,来实现Processor属性的验证identifiesControllerService。
但是,有时仅验证处理器的属性还不够。为此,AbstractProcessor公开了一个customValidate方法。该方法采用type的单个参数ValidationContext。此方法的返回值是一个Collection的 ValidationResult描述验证过程中发现的任何问题的对象。仅应返回isValid方法返回的那些ValidationResult对象 false。仅当所有属性均根据其关联的“验证器”和“允许值”有效时,才会调用此方法。即,仅当所有属性本身都有效时,才调用此方法,并且此方法允许对处理器的配置进行整体验证。

3.2.5 Responding to Changes in Configuration

有时需要让处理器在其属性发生更改时迅速做出反应。该onPropertyModified 方法允许处理器做到这一点。当用户更改处理器的属性值时,onPropertyModified将为每个修改后的属性调用该 方法。该方法采用三个参数:PropertyDescriptor(指示修改了哪个属性),旧值和新值。如果该属性没有先前的值,则第二个参数为null。如果删除了该属性,则第三个参数为null。重要的是要注意,无论值是否有效,都将调用此方法。仅在实际修改值时才调用此方法,而不是在用户更新Processor而不更改其值时调用此方法。在调用此方法时,可以确保调用此方法的线程是处理器中当前正在执行代码的唯一线程,除非处理器本身创建了自己的线程。

3.2.6 Performing the Work

当处理器有工作要做时,安排它onTrigger通过框架调用其方法来这样做。该方法有两个参数:a ProcessContext和a ProcessSession。该onTrigger方法的第一步通常是通过get在ProcessSession上调用方法之一来获取要在其上执行工作的FlowFile 。对于从外部来源将数据导入NiFi的处理器,将跳过此步骤。然后,处理器可以自由检查FlowFile属性。添加,删除或修改属性;读取或修改FlowFile内容;并将FlowFiles传输到适当的Relationships。

3.2.7 When Processors are Triggered

onTrigger仅当计划运行处理器并且该处理器存在工作时,才会调用处理器的方法。如果满足以下任何条件,则据说存在处理器的工作:
以处理器为目的地的连接在其队列中至少有一个FlowFile
处理器没有传入的连接
处理器带有@TriggerWhenEmpty批注
存在一些因素,这些因素将在onTrigger调用Processor 方法时起作用。首先,除非用户将处理器配置为运行,否则不会触发处理器。如果安排处理器运行,则框架会定期(该时间段由用户在用户界面中配置)检查处理器是否有工作要做,如上所述。如果是这样,框架将检查处理器的下游目标。如果处理器的任何出站连接已满,则默认情况下不会安排处理器运行。
但是,@TriggerWhenAnyDestinationAvailable可以将注释添加到处理器的类。在这种情况下,将更改需求,以使只有一个下游目标必须“可用”(如果连接队列未满,则将目标视为“可用”),而不是要求所有下游目标均可用。
@TriggerSerially 注释也与处理器调度有关。使用此注释的处理器永远不会有多个线程onTrigger同时运行该方法。但是,必须指出的是,执行代码的线程可能会因调用而改变。因此,仍必须注意确保处理器是线程安全的!

3.3 生命周期

NiFi API通过使用Java注解提供生命周期支持。该org.apache.nifi.annotation.lifecycle软件包包含一些用于生命周期管理的注释。以下注释可以应用于NiFi组件中的Java方法,以指示框架何时应调用这些方法。为了讨论组件生命周期,我们将NiFi组件定义为Processor,ControllerServices或ReportingTask。

3.3.1 @OnAdded

3.3.2@OnEnabled

3.3.3 @OnRemoved

3.3.4 @OnScheduled

3.3.5 @OnUnscheduled

3.3.6 @OnStopped

3.3.7 @OnShutdown

3.4 组件通知

3.4.1 @OnPrimaryNodeStateChange

3.5 Restricted

受限组件是一种组件,可以用来执行操作员通过NiFi REST API / UI提供的任意未经消毒的代码,也可以用于使用NiFi OS凭据在NiFi主机系统上获取或更改数据。这些组件可能会由其他经过授权的NiFi用户使用,以超出应用程序的预期用途,提升特权,或者可能公开有关NiFi进程或主机系统内部的数据。所有这些功能都应被视为特权,管理员应意识到这些功能,并为一部分受信任的用户显式启用它们。
可以使用@Restricted批注标记处理器,控制器服务或报告任务。这将导致该组件被视为受限组件,并要求将用户明确添加到可以访问受限组件的用户列表中。一旦允许用户访问受限制的组件,将在允许所有其他权限的情况下允许他们创建和修改那些组件。如果无法访问受限制的组件,则用户仍然会知道这些类型的组件的存在,但是即使拥有足够的权限,也无法创建或修改它们。

3.6

3.6.1 范围

3.6.2 存储和检索状态

3.6.3 单元测试

3.7 报告处理器活动


4. 记录组件

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,490评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,581评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,830评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,957评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,974评论 6 393
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,754评论 1 307
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,464评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,357评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,847评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,995评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,137评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,819评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,482评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,023评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,149评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,409评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,086评论 2 355

推荐阅读更多精彩内容