Flink 使用介绍相关文档目录
前言
本篇接Ambari集成组件的方法。只有概述是不够的,本篇是Ambari集成Flink组件的实战内容。
本项目GitHub地址:paul8263/flink-ambari (github.com)
Ambari组件集成项目目录结构
FLINK
├── alerts.json -- 告警配置文件,用于standalone模式检测Flink JobManager web ui端口是否可访问
├── configuration -- Flink自身配置文件配置项模板。Ambari展示配置页面使用
│ ├── flink-conf.xml
│ ├── flink-log4j-cli.xml
│ ├── flink-log4j-console.xml
│ ├── flink-log4j-session.xml
│ ├── flink-log4j.xml
│ ├── flink-standalone.xml -- 自己定制的配置项,是否启用standalone模式,决定了是否启用运行状态检查
│ ├── flink-yarn-session.xml -- 自己定制的配置项,通过Ambari界面启动yarn-session时候的参数值
│ ├── masters.xml -- 对应standalone模式的masters文件
│ └── workers.xml -- 同上
├── metainfo.xml -- 组件元数据信息
├── package
│ ├── scripts -- Ambari 运维脚本
│ │ ├── flink_master.py -- Flink JobManager节点/客户端维护脚本
│ │ ├── flink_slave.py -- Flink TaskManager节点维护脚本
│ │ ├── flink_utils.py -- Flink 运维脚本封装
│ │ ├── __init__.py
│ │ ├── params.py -- Flink配置参数变量,从上面configuration中获取
│ │ ├── repoin.py -- 读取HDP软件源的URL
│ │ ├── service_check.py -- 运行服务检查的逻辑所在
│ │ └── yarn_utils.py -- 封装操作Yarn的相关命令
│ └── templates
│ └── flink-conf.yaml.j2 -- flink-conf.yaml配置文件模板
└── quicklinks
└── quicklinks.json -- Flink组件Ambari展示页面右侧的快速链接。Standalone模式支持点击后跳转到Flink管理页面
配置项编写
在编写前先介绍配置文件组织形式和Ambari组件配置页面展示形式的对应关系。Ambari组件配置页面包含多个配置组(卷展栏),每个配置组包含多个配置项。这里一个配置组对应着一个configuration
中的xml
配置项模板。如果组件的配置项有很多,最好是通过多个配置项模板的方式,将这些配置项归类,便于管理。
对于Flink组件而言,它包含多个配置文件。我们按照配置项所在的配置文件,将它们分组归类。
我们选择Flink源代码包中flink-conf.yaml
默认包含的配置项,添加到Flink组件的配置页面中。这些配置项是较为常用的。
下面讲解配置项的模板的编写方式。由于配置项非常多,不可能一一列举,这里只选择几个有代表性的分析。完整的配置项代码请参考GitHub。
对于文本类型的配置项,可以使用如下方式定义:
<property>
<name>jobmanager_rpc_address</name>
<value>localhost</value>
<display-name>jobmanager.rpc.address</display-name>
<description>The config parameter defining the network address to connect to for communication with the job manager. This value is only interpreted in setups where a single JobManager with static name or address exists (simple standalone setups, or container setups with dynamic service name resolution). It is not used in many high-availability setups, when a leader-election service (like ZooKeeper) is used to elect and discover the JobManager leader from potentially multiple standby JobManagers.</description>
</property>
这里定义了一个配置项jobmanager_rpc_address
,在页面上显示为jobmanager.rpc.address
,默认值为localhost
。description
这一大段内容为配置项的描述。鼠标悬停在配置页面该配置项的时候会展示description
的内容。
对于数字类型的配置项,尤其是带单位提示的,可以按照如下配置。例如:
<property>
<name>jobmanager_memory_process_size</name>
<value>1600</value>
<display-name>jobmanager.memory.process.size</display-name>
<description>The total process memory size for the JobManager.</description>
<value-attributes>
<type>int</type>
<minimum>0</minimum>
<maximum>32768</maximum>
<unit>MB</unit>
<increment-step>256</increment-step>
<overridable>false</overridable>
</value-attributes>
</property>
我们可以定义额外的value-attributes
项目,其中:
- type: 配置项的参数类型,int表示整数型
- minimum: 允许的最小值
- maximum: 允许的最大值
- unit: 单位
- increment-step: 步进值
- overridable: 官网无明确解释,字面意思是是否可覆盖
接下来还有一个问题,遇到这些固定配置项之外的我们怎么办。上面的配置项在页面展示为单行文本框input。为了便于用于填写多行的自定义配置,我们可以定义一个在页面展示为textarea
的配置项。例如:
<property>
<name>custom_properties</name>
<value>
# This is the place for your custom properties.
# e.g.
# your_key: your_value
</value>
<display-name>Custom properties</display-name>
<description>The place for your custom properties</description>
<value-attributes>
<type>content</type>
</value-attributes>
</property>
这里的关键是指定type
为content
,从而实现将默认的单行文本框替换为多行的textarea。
只支持on Yarn的Flink部署
这种情况下Flink是无服务无状态的客户端模式。即无需再Ambari中作为服务启动,仅仅需要安装在需要提交Flink作业的节点上。这种无服务无状态的组件在Ambari中称之为CLIENT
(客户端)。metainfo.xml
文件中component
配置如下所示:
<component>
<name>FLINK_CLIENT</name>
<displayName>Flink Client</displayName>
<category>CLIENT</category>
<cardinality>1+</cardinality>
<versionAdvertised>true</versionAdvertised>
<commandScript>
<script>scripts/flink_master.py</script>
<scriptType>PYTHON</scriptType>
<timeout>1200</timeout>
</commandScript>
<logs>
<log>
<logId>flink_master</logId>
<primary>true</primary>
</log>
</logs>
<configFiles>
...
</configFiles>
</component>
对于CLIENT而言,我们需要在脚本中提供install
,configure
和status
方法。
我们在flink_master.py
文件中编写如下方法:
def install(self, env):
import params
env.set_params(params)
install_flink()
self.configure(env)
def configure(self, env):
import params
env.set_params(params)
configure_flink()
def status(self, env):
import params
env.set_params(params)
if not params.standalone_enabled:
raise ClientComponentHasNoStatus()
elif not is_job_manager_running():
raise ComponentIsNotRunning()
其中install_flink
方法使用在目标机器执行wget
命令下载tar包然后解压的方式安装Flink。
def install_flink():
Logger.info('Creating Flink group')
import params
# 创建用户组和用户
try:
grp.getgrnam(params.flink_group)
except KeyError:
Group(params.flink_group)
Logger.info('Creating Flink user')
try:
pwd.getpwnam(params.flink_user)
except KeyError:
User(params.flink_user,
gid=params.flink_group,
groups=[params.flink_group],
ignore_failures=True
)
# 创建安装目录
Logger.info('Creating Flink install directory')
Directory([params.flink_installation_dir],
mode=0755,
cd_access='a',
owner=params.flink_user,
group=params.flink_group,
create_parents=True
)
# 删除已存在的安装包
Logger.info('Check existing files')
if os.path.exists(os.path.join(params.flink_installation_dir, params.FLINK_TAR_NAME)):
Logger.info('Flink tarball exists. Delete it before downloading')
Execute("rm -f {0}".format(os.path.join(params.flink_installation_dir, params.FLINK_TAR_NAME)))
if os.path.exists(params.FLINK_HOME):
Logger.info('Flink binary has been extracted. Delete it before installation')
Execute("rm -rf {0}".format(params.FLINK_HOME))
# 下载安装包
Logger.info('Downloading Flink binaries from: {0}'.format(params.flink_download_url))
Execute("cd {0}; wget {1} -O {2}".format(params.flink_installation_dir, params.flink_download_url, params.FLINK_TAR_NAME),
user=params.flink_user)
# 解压安装包
Logger.info('Extracting Flink binaries')
Execute("cd {0}; tar -zxvf {1}".format(params.flink_installation_dir, params.FLINK_TAR_NAME), user=params.flink_user)
File(os.path.join(params.flink_installation_dir, params.FLINK_TAR_NAME), action='delete')
# 给予log目录最大权限,使用非安装用户stantalone模式启动Flink的时候,如果没有配置log目录权限,日志会写入失败
Logger.info('Modify log folder access permissions')
Execute("cd {0}; chmod 777 log".format(params.FLINK_HOME), user=params.flink_user)
# 删除tar包,只保留解压后的目录
Logger.info('Delete Flink tarball')
Execute("rm -f {0}".format(os.path.join(params.flink_installation_dir, params.FLINK_TAR_NAME)))
# 创建相关命令软链接到`/bin`,之后Flink相关命令可以全局使用。可选步骤
Logger.info('Creating symbolic links')
create_symbolic_link()
Logger.info('Flink installation completed')
configure
方法从Ambari的Flink配置向导中读取Flink各个配置文件项目的值,写入到对应的配置文件中。代码如下:
def configure_flink():
Logger.info('Configuring Flink')
import params
flink_conf_dir = os.path.join(params.FLINK_HOME, 'conf')
File(os.path.join(flink_conf_dir, 'flink-conf.yaml'),
content=Template("flink-conf.yaml.j2"),
owner=params.flink_user,
group=params.flink_group
)
File(os.path.join(flink_conf_dir, "log4j.properties"),
owner=params.flink_user,
group=params.flink_group,
content=params.log4j_props
)
File(os.path.join(flink_conf_dir, "log4j-cli.properties"),
owner=params.flink_user,
group=params.flink_group,
content=params.log4j_cli_props
)
File(os.path.join(flink_conf_dir, "log4j-console.properties"),
owner=params.flink_user,
group=params.flink_group,
content=params.log4j_console_props
)
File(os.path.join(flink_conf_dir, "log4j-session.properties"),
owner=params.flink_user,
group=params.flink_group,
content=params.log4j_session_props
)
File(os.path.join(flink_conf_dir, "masters"),
owner=params.flink_user,
group=params.flink_group,
content=params.masters
)
File(os.path.join(flink_conf_dir, "workers"),
owner=params.flink_user,
group=params.flink_group,
content=params.workers
)
status
方法用来判断Flink组件的运行状态:启动/停止/无状态。在CLIENT模式当然是无状态的。
def status(self, env):
import params
env.set_params(params)
if not params.standalone_enabled:
raise ClientComponentHasNoStatus()
elif not is_job_manager_running():
raise ComponentIsNotRunning()
注意,这里为了兼容有状态(Flink standalone)和无状态两种方式,使用params.standalone_enabled
变量作为标记。如果不使用standalone部署方式,抛出ClientComponentHasNoStatus
异常,代表组件没有状态。
如果是standalone模式,is_job_manager_running()
方法通过ps
命令检查jobmanager进程是否存在的方式,判断standalone集群的运行状态。如果组件没有运行,需要抛出ComponentIsNotRunning
异常。
支持standalone模式的Flink部署
standalone模式要求FLink组件在Ambari中式有状态的,通过Ambari可以控制Flink standalone集群的启动和停止,并且Ambari能够正确的跟踪到standalone集群的运行状态。
standalone模式的metainfo.xml
文件component
部分配置如下。其中MASTER
代表了有状态服务的主服务,SLAVE
代表了从属服务。
<component>
<name>FLINK_MASTER</name>
<displayName>JobManager</displayName>
<category>MASTER</category>
<cardinality>1+</cardinality>
<versionAdvertised>true</versionAdvertised>
<commandScript>
<script>scripts/flink_master.py</script>
<scriptType>PYTHON</scriptType>
<timeout>1200</timeout>
</commandScript>
<logs>
<log>
<logId>flink_master</logId>
<primary>true</primary>
</log>
</logs>
<customCommands>
<customCommand>
<name>START_YARN_SESSION</name>
<script>scripts/flink_master.py</script>
<scriptType>PYTHON</scriptType>
</customCommand>
<customCommand>
<name>STOP_YARN_SESSION</name>
<script>scripts/flink_master.py</script>
<scriptType>PYTHON</scriptType>
</customCommand>
</customCommands>
<configFiles>
...
</configFiles>
</component>
<component>
<name>FLINK_SLAVE</name>
<displayName>TaskManager</displayName>
<category>SLAVE</category>
<cardinality>1+</cardinality>
<versionAdvertised>true</versionAdvertised>
<commandScript>
<script>scripts/flink_slave.py</script>
<scriptType>PYTHON</scriptType>
<timeout>1200</timeout>
</commandScript>
<logs>
<log>
<logId>flink_slave</logId>
<primary>true</primary>
</log>
</logs>
<configFiles>
...
</configFiles>
</component>
对于有状态的服务,脚本中除了提供install
,configure
和status
方法外,还需要提供start
(启动)和stop
(停止)方法。
def stop(self, env):
import params
env.set_params(params)
stop_flink_standalone_cluster()
def start(self, env):
import params
env.set_params(params)
self.configure(env)
start_flink_standalone_cluster()
其中start_flink_standalone_cluster()
和stop_flink_standalone_cluster()
分别调用Flink bin目录中的start-cluster.sh
和stop-cluster.sh
。只需要在JobManager节点调用一次就可以控制整个集群(Flink jobmanager和taskmanager所在节点分别在masters和workers配置文件中)。
Flink standalone集群状态不正常的时候我们期望平台能够给出告警信息。Ambari平台在检测到配置的端口(JobManager的rest端口)不通的时候会给出告警,显示我们自己配置的告警信息。
针对Flink standalone集群的告警配置alert.json
文件内容如下:
{
"FLINK": {
"service": [],
"FLINK_MASTER": [
{
"name": "FLINK_MASTER",
"label": "Flink Job Manager",
"description": "This host-level alert is triggered if the Job Manager port is unreachable.",
"interval": 1,
"scope": "ANY",
"source": {
"type": "PORT",
"uri": "{{flink-conf/rest_port}}",
"default_port": 8081,
"reporting": {
"ok": {
"text": "TCP OK - {0:.3f}s response on port {1}"
},
"warning": {
"text": "TCP OK - {0:.3f}s response on port {1}",
"value": 1.5
},
"critical": {
"text": "Connection failed: {0} to {1}:{2}",
"value": 5
}
}
}
}
]
}
}
注意这里的端口是从配置项中读取的,如果没有配置,使用默认的8081端口。
参考文献
配置项定义方式:https://cwiki.apache.org/confluence/display/AMBARI/Configuration+support+in+Ambari