NiFi采用了Zero-Master聚类模式。集群中的每个节点都对数据执行相同的任务,但是每个节点都使用不同的数据集。其中一个节点会自动选择(通过Apache ZooKeeper)作为集群协调器。然后集群中的所有节点将向此节点发送心跳/状态信息,并且此节点负责断开一段时间内没有心跳的节点。另外,当新节点选择加入集群时,新节点必须首先连接到当前选择的集群协调器,以获得最新的流。如果集群协调器确定允许节点加入(基于其配置的防火墙文件),则当前流被提供给该节点,并且该节点能够加入集群,假定该节点的flow拷贝与副本协调员提供的副本是一致的。如果节点的flow配置版本与群集协调器版本不同,则节点将不加入群集。
NiFi管理员或数据流管理器(DFM)可能会发现,在单个服务器上使用NiFi的一个实例不足以处理它们要处理的数据量。 所以,一个解决方案是在多个NiFi服务器上运行相同的数据流。 但是,这会产生管理问题,因为每次DFM想要更改或更新数据流时,都必须在每个服务器上进行这些更改,然后分别监视每个服务器。 通过对NiFi服务器做集群,可以增强的处理能力,并且通过一个单独的接口管理数据的变化和监控数据流。 集群允许DFM对于每一次更改仅有一次,并且将更改复制到集群的所有节点。 通过单个接口,DFM还可以监视所有节点的健康和状态。
NiFi集群是独一无二的,有其自己的术语。 在设置群集之前,了解以下术语非常重要。
NiFi集群协调器(NiFi Cluster Coordinator):NiFi集群集群协调器是NiFi集群中的节点,负责执行管理允许哪些节点在集群中,并为新加入的节点提供最新的flow 。 当DataFlow Manager管理集群中的数据流时,他们可以通过集群中任何节点的用户界面进行此操作。 所做的任何更改都会复制到群集中的所有节点。
节点(node):每个群集由一个或多个节点组成。 节点进行实际的数据处理。
主节点(Primary Node):每个集群都有一个主节点。 在这个节点上,可以运行“Isolated Processors(隔离的处理器)”(见下文)。 ZooKeeper会自动选择一个主节点。 如果该节点由于任何原因从集群断开连接,则将自动选择新的主节点。 用户可以通过查看用户界面的集群管理页面来确定当前选择哪个节点作为主节点。
隔离的处理器(Isolated Processors):在一个NiFi集群中,相同的数据流在所有节点上运行。因此,流中的每个组件都在每个节点上运行。但是,有些情况下DFM不希望每个处理器都在每个节点上运行。最常见的情况使用一个没啥扩展能力的协议与外部服务进行通信的处理器。例如,GetSFTP处理器从远程目录中提取,如果群集中的每个节点上运行的GetSFTP处理器同时尝试从相同的远程目录中提取,则可能存在竞争冲突。因此,DFM可以将主节点上的GetSFTP配置为独立运行,这意味着它只能在该节点上运行。它可以引入数据,并通过适当的数据流配置,在集群中的其余节点间进行负载平衡。请注意,虽然此功能存在,但仅使用独立的NiFi实例来提取数据并将其提供给群集也很常见。这取决于可用的资源情况以及管理员如何决定配置集群。
心跳(Heartbeats):节点通过“心跳”将其健康和状态传达给当前选定的集群协调员,让协调员知道他们仍然连接到集群并正常工作。缺省情况下,节点每5秒发出一次心跳,如果集群协调器在40秒内没有收到节点的心跳,则由于“缺少心跳”而断开节点。 (5秒设置可在nifi.properties文件中配置,请参阅本文档的“系统属性”部分以获取更多信息。)集群协调器断开节点的原因是协调器需要确保集群中的每个节点是同步的,并且如果一个节点没有定期发送心跳,协调器不能确定它仍然与群集的其余部分同步。如果在40秒之后,节点确实发送了新的心跳,协调器将自动请求节点重新加入集群,包括重新验证节点的数据流。由于缺少心跳而导致的断开以及一旦接收到心跳,重新连接都将在用户界面中的DFM中展现。
集群内的通信
如上所述,节点通过心跳与集群协调器进行通信。 当集群协调器被选定时,它将使用其连接信息更新Apache ZooKeeper中一个大家都知道的ZNode,以便节点知道往哪里发送心跳。 如果其中一个节点出现故障,集群中的其他节点将不会自动获取缺少节点的负载。 DFM可以为故障转移意外事件配置数据流; 但是,这取决于数据流设计,不会自动发生。
当DFM对数据流进行更改时,接收到更改流的请求的节点将这些更改传送到所有节点,并等待每个节点作出响应,表明它已对其本地流进行了更改。
处理断开的节点
DFM可以手动从集群断开一个节点。但是,如果某个节点由于其他原因(例如由于缺少心跳)而断开连接,则集群协调器将在用户界面上显示公告。在解决断开连接的节点问题之前,DFM将不能对数据流进行任何更改。 DFM或管理员将需要解决该节点的问题,并在解决它之前,数据流可能会发生变化。但是,值得注意的是,节点断开并不意味着它不工作,这可能有几个原因,包括节点由于网络问题而无法与群集协调器通信。
有些情况下即使节点未连接到群集,DFM仍然希望继续对流进行更改。在这种情况下,DFM可能会选择通过“集群管理”对话框完全从集群中删除节点。一旦被移除,节点就不能重新加入群集,直到它被重新启动。
流选举(Flow Election)
当一个集群首次启动时,NiFi必须确定哪个节点具有“正确”版本的流。这是通过对每个节点具有的流进行投票来完成的。当一个节点试图连接到一个集群时,它为集群协调器提供了一个本地流(local flow)的副本。如果尚未选择“正确的”流量,则将该节点的流量与每个其他节点的流量进行比较。如果另一个节点的流量匹配这个流量,则为此流量投票。如果没有与其他节点有相同的流量,那么这个流量将被添加候选的流量池中。经过一段时间后(通过nifi.cluster.flow.election.max.wait.time属性进行配置)或者某些节点数已投票(通过设置nifi.cluster.flow.election.max进行配置)。一个流量被选为流量的“正确”副本。所有具有不兼容流的节点将从群集中断开连接,而具有兼容流的节点将继承群集的流。选举是根据“大众投票”进行的,但要注意的是,除非所有流量都是空的,否则赢家将永远不会成为"empty flow"。这允许管理员删除节点的flow.xml.gz文件并重新启动节点,知道节点的流不会被投票为“正确的”流,除非找不到其他流。
基本群集设置
本节介绍由三个NiFi实例组成的简单三节点非安全集群的设置。
对于每个实例,需要更新nifi.properties文件中的一些属性。 特别是Web和Clustering属性应该根据您的情况进行评估并进行相应的调整。 所有的属性都在本指南的System Properties 部分描述; 但是在本节中,我们将重点介绍为最小简单群集设置的属性。
对于所有三个实例,可以使用默认设置保留集群公共属性。 但是,请注意,如果更改这些设置,则必须在集群中的每个实例上设置相同的设置。
对于每个节点,要配置的最小属性如下所示:
在“Web属性”部分下,设置您希望节点在其上运行的http或https端口。 另外,请考虑是否需要设置http或https host 属性。
在State Management部分下,将nifi.state.management.provider.cluster属性设置为Cluster State Provider的标识符。 确保已经在state-management.xml文件中配置了Cluster State Provider。 有关更多信息,请参阅"状态管理State Management"。
在“ Cluster Node属性”下,设置以下内容:
nifi.cluster.is.node - 将其设置为true。
nifi.cluster.node.address - 将其设置为节点的标准主机名。 如果留空,则默认为“localhost”。
nifi.cluster.node.protocol.port - 将其设置为高于1024的开放端口(任何更低的值都需要root权限)。
nifi.cluster.node.protocol.threads - 用于与集群中的其他节点通信的线程数。 此属性默认为10.线程池用于将请求复制到所有节点,并且线程池将永远不会少于此线程数。 它将根据需要增长到由nifi.cluster.node.protocol.max.threads属性设置的最大值。
nifi.cluster.node.protocol.max.threads - 用于与群集中的其他节点通信的最大线程数。 此属性默认为50.线程池用于向所有节点发出复制请求,并且线程池将具有由nifi.cluster.node.protocol.threads属性配置的“核心”大小。 但是,如有必要,线程池将使活动线程的数量增加到由此属性设置的限制。
nifi.zookeeper.connect.string - 连接到Apache ZooKeeper所需的连接字符串。 这是一个逗号分隔的hostname:port对列表。 例如,localhost:2181,localhost:2182,localhost:2183。
nifi.zookeeper.root.node - 应该在ZooKeeper中使用的根ZNode。 ZooKeeper提供了一个用于存储数据的目录结构。 这个结构中的每个目录都被称为ZNode。 这表示应该用于存储数据的根ZNode或目录。 默认值是/ root。 正确设置这一点非常重要,因为NiFi实例试图加入哪个集群由其连接的ZooKeeper实例和指定的ZooKeeper根节点决定。
nifi.cluster.flow.election.max.wait.time - 指定在将流选为“正确”流之前等待的时间。 如果已投票的节点数量等于nifi.cluster.flow.election.max.candidates属性指定的数量,则集群不会等待这么久。 默认值是5分钟。 请注意,第一次投票后,时间就会开始。
nifi.cluster.flow.election.max.candidates - 指定集群中提前选择流需要的节点数量。 这使得集群中的节点可以避免在开始处理之前等待很长时间,如果我们达到集群中至少这个数量的节点。
现在,可以启动群集。 启动实例的顺序并不重要。 连接到其中一个节点的URL,用户界面应与以下内容类似:
故障排除
如果遇到问题,并且您的集群不能按照描述的方式工作,请调查节点上的nifi-app.log和nifi-user.log文件。 如果需要,可以通过编辑conf / logback.xml文件将日志级别更改为DEBUG。 具体来说,在下面一行(而不是“INFO”)中设置level =“DEBUG”:
<logger name="org.apache.nifi.web.api.config" level="INFO" additivity="false">
<appender-ref ref="USER_FILE"/>
</ logger >