1. NiFi主界面
NiFi启动后,我们可以使用浏览器访问用户界面http://localhost:8080/nifi。可以通过修改“nifi.properties”,编辑http端口,默认端口是8080.在用户界面(UI)中可以设计和监视数据流程。
主界面中有多个工具,可以创建、管理数据流程。
全局菜单包含如下菜单项:
2. 添加一个"Processor"
我们可以通过在画布上添加"Processor"来开始创建数据流程。将“Processor”图标()由组件状态栏的最左边拖到画布的中间(类似于方格纸的背景),然后将其放在此处。 当放开鼠标后会弹出一个“Add Proccesor”对话框:
NiFi当前版本有288Processor。做为一个简单的例子,这里只将本地磁盘中的文件导入NiFi。当开发人员创建处理器时,开发人员可以为该处理器分配“标签”。当开发人员开发一个“Processor”时,可以为这个Processor设置一个或多个标签(Tag)。这些标签可以被视为关键字。可以通过在对话框右上角的“过滤器(Filter)”框中键入关键字,来按这些标签或关键字名称进行过滤。例如,输入关键字“file”(必须输入英文)界面将显示一些用于处理文件的Processor。同样,通过输入“local”进行过滤也会很快缩小列表的范围。当选中列表中的一个Processor,将在对话框底部附近看到对“processor”的简短描述。可以概括的了解到“processor”的功能。 “GetFile Processor”的功能是"从磁盘目录中的文件创建FlowFiles。 NiFi会忽略至少没有读取权限的文件"。最后点击“add”按钮,Processor将被添加到画布中。
添加完成界面显示如下:
3. 配置“Processor”
双击画布中的“Proccessor“或者使用”Context menu“的”configure“菜单向都能进入,processor的配置对话框。
“configuration processor”对话框的“setting”页签,配置一些基本属性,每个processor都有相同的配置项。
“configuration processor”对话框的“scheduling”页签,配置一些任务调度属性,每个processor都有相同的配置项。
“configuration processor”对话框的“properties”页签,配置一些"processor"的属性,属性列表的内容取决于processor的类型。粗体属性是必须配置的属性。如果有必须属性没有配置则Processor不能启动。将“Input Directory”的属性设置为:“/Users/laofeng/Downloads/apps/data_in”,当processor启动后会扫描这个目录,发现新文件后会创建一个FlowFile发送到下游Processor。
有许多属性已经有了默认值,如果没有特别需要可以不修改这些默认值。将鼠标悬停在“?”上,会显示属性的说明性和是否支持表达式语言。
“configuration processor”对话框的“comments”页签,输入一些注释信息,类似代码里注释的作用。
4.连接Processor
每个处理器都有一组定义的“Relationships(关系)”,可以将数据发送到这些关系。当Processor将FlowFile的处理完成后,会将其传输到这些关系之一。这使用户可以根据处理结果来配置如何处理FlowFiles。例如,许多“Processor”定义两个关系:success(成功)和fail(失败)。然后,如果"processor"能够成功处理数据,数据将沿着“success”关系的connection流转到下游“Processor”。如果处理器由于某种原因无法处理数据,数据将沿着“fail”关系的connection流转下游“processor”,也可以自动关闭关系(Automatically Terminate Relationships),此种情况下数据将会被直接丢弃。
现在,我们可以将“GetFile Processor”的连接到到“LogAttribute Processor”。用鼠标悬停在“GetFile Processor”上,并且Connection Icon(
我们可以将该图标从“GetFile Processor”拖到“LogAttribute Processor”。将弹出一个对话框“Create Connection”,选择要为此连接包括的关系。因为GetFile只有一个关系成功,所以将自动为选中它。
单击“设置”选项卡,可以提供一些选项来配置此连接的行为方式:
在此界面上可以为“Connection”命名,否则将会以“relationship“的名称命名,本例中将会被命名为"success". Id,不需要设置,系统会生成一个UUID。“FlowFile Expiration”,队列中数据超时时间,默认值“0 sec”代表用不超时。
当上游“processor”生产数据快,下游“processor”生产数据慢时,数据在Connection队列中不断积累,迟早会导致系统崩溃。“backpressure thresholds” 背压阈值,允许我们设置一个量,当队列中数据达到一定规模时,将通知上游“processor”暂定执行,以释放Connection队列中的压力,避免系统崩溃。当队列中FlowFile达到一定数量时或者FlowFile占用空间超过一定大小时,启动背压。如图:队列对象数超过10000个启动背压,数据超过1G存储空间启动背压。
“Load Balance Strategy ”,配置负载均衡策略。
最后,在对话框的右侧有优先级排序器(Selected Prioritizers ),通过配置此Prioritizers的实现,可以控制如何对队列数据进行排序。可以将优先级从“Available Prioritizers ”列表拖到“Selected Prioritizers ”列表中,以激活Prioritizers。可以同时使用多个Prioritizers,以使FlowFile优先级计算更加精确。
5.启动流程
在正式运行这个dataflow(数据流程)之前,还要将“log LogAttribute processor”的“success relationship”关闭。
此时整个dataflow(数据流程)处于停止行状态,“proccsor name”旁边显示一个红色的小正方(
启动processor的两种方法:
-
选中processor,在operate面板中点击“start”按钮(
)
-
使用processor的右键菜单,使用“Start”菜单项
启动后“proccsor name”旁边显示一个绿色的小三角形(
停止流程
停止processor的两种方法:
-
选中processor,在operate面板中点击“stop”按钮(
)
-
使用processor的右键菜单,使用“Stop”菜单项。当processor处于运行状态时,右键菜单“start”菜单项将会变成“stop”菜单项
6.验证路程
向“/Users/laofeng/Downloads/apps/data_in”目录投入文件,在ui上监控流程运行。
复制一个图片global-menu.png,大小约32,770 字节。执行速度非常快没有几乎观察不到中间过程。
- “LogAttribute processor“将FlowFile的属性,输出到了nifi-app.log中,如下所示:
2020-11-16 22:35:24,656 INFO [Timer-Driven Process Thread-7] o.a.n.processors.standard.LogAttribute LogAttribute[id=d1325141-0175-1000-444c-61b0bd3447e5] logging for flow file StandardFlowFileRecord[uuid=aca7f068-914a-49d4-9fe9-e9549cbaaed1,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1605449326182-2, container=default, section=2], offset=179510, length=32770],offset=0,name=global-menu.png,size=32770]
--------------------------------------------------
Standard FlowFile Attributes
Key: 'entryDate'
Value: 'Mon Nov 16 22:35:24 CST 2020'
Key: 'lineageStartDate'
Value: 'Mon Nov 16 22:35:24 CST 2020'
Key: 'fileSize'
Value: '32770'
FlowFile Attribute Map Content
Key: 'absolute.path'
Value: '/Users/laofeng/Downloads/apps/data_in/'
Key: 'file.creationTime'
Value: '2020-11-16T17:37:34+0800'
Key: 'file.group'
Value: 'staff'
Key: 'file.lastAccessTime'
Value: '2020-11-16T21:37:28+0800'
Key: 'file.lastModifiedTime'
Value: '2020-11-16T17:37:34+0800'
Key: 'file.owner'
Value: 'laofeng'
Key: 'file.permissions'
Value: 'rw-r--r--'
Key: 'filename'
Value: 'global-menu.png'
Key: 'path'
Value: '/'
Key: 'uuid'
Value: 'aca7f068-914a-49d4-9fe9-e9549cbaaed1'
--------------------------------------------------
将文件原有的所有属性均处理成了FlowFile的属性,写入了日志文件。
7. Processor更多信息
由于每个processor都有多个不同的属性和关系,因此记住288个processor如何使用非常具有挑战性的。 为了解决这个问题,可以右键单击processor,然后选择“view usage”菜单项。 这将会弹出一个窗口显示当前processor的文档。只能说NiFi考虑周到。
弹出的文档窗口如下所示: