Ambari 集成 Flink

Flink 使用介绍相关文档目录

Flink 使用介绍相关文档目录

前言

本篇接Ambari集成组件的方法。只有概述是不够的,本篇是Ambari集成Flink组件的实战内容。

本项目GitHub地址:paul8263/flink-ambari (github.com)

Ambari组件集成项目目录结构

FLINK
├── alerts.json -- 告警配置文件,用于standalone模式检测Flink JobManager web ui端口是否可访问
├── configuration -- Flink自身配置文件配置项模板。Ambari展示配置页面使用
│   ├── flink-conf.xml
│   ├── flink-log4j-cli.xml
│   ├── flink-log4j-console.xml
│   ├── flink-log4j-session.xml
│   ├── flink-log4j.xml
│   ├── flink-standalone.xml -- 自己定制的配置项,是否启用standalone模式,决定了是否启用运行状态检查
│   ├── flink-yarn-session.xml -- 自己定制的配置项,通过Ambari界面启动yarn-session时候的参数值
│   ├── masters.xml -- 对应standalone模式的masters文件
│   └── workers.xml -- 同上
├── metainfo.xml -- 组件元数据信息
├── package
│   ├── scripts -- Ambari 运维脚本
│   │   ├── flink_master.py -- Flink JobManager节点/客户端维护脚本
│   │   ├── flink_slave.py -- Flink TaskManager节点维护脚本
│   │   ├── flink_utils.py -- Flink 运维脚本封装
│   │   ├── __init__.py
│   │   ├── params.py -- Flink配置参数变量,从上面configuration中获取
│   │   ├── repoin.py -- 读取HDP软件源的URL
│   │   ├── service_check.py -- 运行服务检查的逻辑所在
│   │   └── yarn_utils.py -- 封装操作Yarn的相关命令
│   └── templates
│       └── flink-conf.yaml.j2 -- flink-conf.yaml配置文件模板
└── quicklinks
    └── quicklinks.json -- Flink组件Ambari展示页面右侧的快速链接。Standalone模式支持点击后跳转到Flink管理页面

配置项编写

在编写前先介绍配置文件组织形式和Ambari组件配置页面展示形式的对应关系。Ambari组件配置页面包含多个配置组(卷展栏),每个配置组包含多个配置项。这里一个配置组对应着一个configuration中的xml配置项模板。如果组件的配置项有很多,最好是通过多个配置项模板的方式,将这些配置项归类,便于管理。

对于Flink组件而言,它包含多个配置文件。我们按照配置项所在的配置文件,将它们分组归类。

我们选择Flink源代码包中flink-conf.yaml默认包含的配置项,添加到Flink组件的配置页面中。这些配置项是较为常用的。

下面讲解配置项的模板的编写方式。由于配置项非常多,不可能一一列举,这里只选择几个有代表性的分析。完整的配置项代码请参考GitHub。

对于文本类型的配置项,可以使用如下方式定义:

<property>
    <name>jobmanager_rpc_address</name>
    <value>localhost</value>
    <display-name>jobmanager.rpc.address</display-name>
    <description>The config parameter defining the network address to connect to for communication with the job manager. This value is only interpreted in setups where a single JobManager with static name or address exists (simple standalone setups, or container setups with dynamic service name resolution). It is not used in many high-availability setups, when a leader-election service (like ZooKeeper) is used to elect and discover the JobManager leader from potentially multiple standby JobManagers.</description>
</property>

这里定义了一个配置项jobmanager_rpc_address,在页面上显示为jobmanager.rpc.address,默认值为localhostdescription这一大段内容为配置项的描述。鼠标悬停在配置页面该配置项的时候会展示description的内容。

对于数字类型的配置项,尤其是带单位提示的,可以按照如下配置。例如:

<property>
    <name>jobmanager_memory_process_size</name>
    <value>1600</value>
    <display-name>jobmanager.memory.process.size</display-name>
    <description>The total process memory size for the JobManager.</description>
    <value-attributes>
        <type>int</type>
        <minimum>0</minimum>
        <maximum>32768</maximum>
        <unit>MB</unit>
        <increment-step>256</increment-step>
        <overridable>false</overridable>
    </value-attributes>
</property>

我们可以定义额外的value-attributes项目,其中:

  • type: 配置项的参数类型,int表示整数型
  • minimum: 允许的最小值
  • maximum: 允许的最大值
  • unit: 单位
  • increment-step: 步进值
  • overridable: 官网无明确解释,字面意思是是否可覆盖

接下来还有一个问题,遇到这些固定配置项之外的我们怎么办。上面的配置项在页面展示为单行文本框input。为了便于用于填写多行的自定义配置,我们可以定义一个在页面展示为textarea的配置项。例如:

<property>
    <name>custom_properties</name>
    <value>
# This is the place for your custom properties.
# e.g.
# your_key: your_value
    </value>
    <display-name>Custom properties</display-name>
    <description>The place for your custom properties</description>
    <value-attributes>
        <type>content</type>
    </value-attributes>
</property>

这里的关键是指定typecontent,从而实现将默认的单行文本框替换为多行的textarea。

只支持on Yarn的Flink部署

这种情况下Flink是无服务无状态的客户端模式。即无需再Ambari中作为服务启动,仅仅需要安装在需要提交Flink作业的节点上。这种无服务无状态的组件在Ambari中称之为CLIENT(客户端)。metainfo.xml文件中component配置如下所示:

<component>
    <name>FLINK_CLIENT</name>
    <displayName>Flink Client</displayName>
    <category>CLIENT</category>
    <cardinality>1+</cardinality>
    <versionAdvertised>true</versionAdvertised>
    <commandScript>
        <script>scripts/flink_master.py</script>
        <scriptType>PYTHON</scriptType>
        <timeout>1200</timeout>
    </commandScript>
    <logs>
        <log>
            <logId>flink_master</logId>
            <primary>true</primary>
        </log>
    </logs>
    <configFiles>
        ...
    </configFiles>
</component>

对于CLIENT而言,我们需要在脚本中提供installconfigurestatus方法。

我们在flink_master.py文件中编写如下方法:

def install(self, env):
    import params
    env.set_params(params)
    install_flink()
    self.configure(env)

def configure(self, env):
    import params
    env.set_params(params)
    configure_flink()
    
def status(self, env):
    import params
    env.set_params(params)
    if not params.standalone_enabled:
        raise ClientComponentHasNoStatus()
    elif not is_job_manager_running():
        raise ComponentIsNotRunning()

其中install_flink方法使用在目标机器执行wget命令下载tar包然后解压的方式安装Flink。

def install_flink():
    Logger.info('Creating Flink group')
    import params

# 创建用户组和用户
    try:
        grp.getgrnam(params.flink_group)
    except KeyError:
        Group(params.flink_group)

    Logger.info('Creating Flink user')
    try:
        pwd.getpwnam(params.flink_user)
    except KeyError:
        User(params.flink_user,
             gid=params.flink_group,
             groups=[params.flink_group],
             ignore_failures=True
             )
# 创建安装目录
    Logger.info('Creating Flink install directory')
    Directory([params.flink_installation_dir],
              mode=0755,
              cd_access='a',
              owner=params.flink_user,
              group=params.flink_group,
              create_parents=True
              )
# 删除已存在的安装包
    Logger.info('Check existing files')
    if os.path.exists(os.path.join(params.flink_installation_dir, params.FLINK_TAR_NAME)):
        Logger.info('Flink tarball exists. Delete it before downloading')
        Execute("rm -f {0}".format(os.path.join(params.flink_installation_dir, params.FLINK_TAR_NAME)))

    if os.path.exists(params.FLINK_HOME):
        Logger.info('Flink binary has been extracted. Delete it before installation')
        Execute("rm -rf {0}".format(params.FLINK_HOME))

# 下载安装包
    Logger.info('Downloading Flink binaries from: {0}'.format(params.flink_download_url))
    Execute("cd {0}; wget {1} -O {2}".format(params.flink_installation_dir, params.flink_download_url, params.FLINK_TAR_NAME),
            user=params.flink_user)

# 解压安装包
    Logger.info('Extracting Flink binaries')
    Execute("cd {0}; tar -zxvf {1}".format(params.flink_installation_dir, params.FLINK_TAR_NAME), user=params.flink_user)
    File(os.path.join(params.flink_installation_dir, params.FLINK_TAR_NAME), action='delete')

# 给予log目录最大权限,使用非安装用户stantalone模式启动Flink的时候,如果没有配置log目录权限,日志会写入失败
    Logger.info('Modify log folder access permissions')
    Execute("cd {0}; chmod 777 log".format(params.FLINK_HOME), user=params.flink_user)
# 删除tar包,只保留解压后的目录
    Logger.info('Delete Flink tarball')
    Execute("rm -f {0}".format(os.path.join(params.flink_installation_dir, params.FLINK_TAR_NAME)))
# 创建相关命令软链接到`/bin`,之后Flink相关命令可以全局使用。可选步骤
    Logger.info('Creating symbolic links')
    create_symbolic_link()

    Logger.info('Flink installation completed')

configure方法从Ambari的Flink配置向导中读取Flink各个配置文件项目的值,写入到对应的配置文件中。代码如下:

def configure_flink():
    Logger.info('Configuring Flink')
    import params
    flink_conf_dir = os.path.join(params.FLINK_HOME, 'conf')

    File(os.path.join(flink_conf_dir, 'flink-conf.yaml'),
         content=Template("flink-conf.yaml.j2"),
         owner=params.flink_user,
         group=params.flink_group
         )

    File(os.path.join(flink_conf_dir, "log4j.properties"),
         owner=params.flink_user,
         group=params.flink_group,
         content=params.log4j_props
         )

    File(os.path.join(flink_conf_dir, "log4j-cli.properties"),
         owner=params.flink_user,
         group=params.flink_group,
         content=params.log4j_cli_props
         )

    File(os.path.join(flink_conf_dir, "log4j-console.properties"),
         owner=params.flink_user,
         group=params.flink_group,
         content=params.log4j_console_props
         )

    File(os.path.join(flink_conf_dir, "log4j-session.properties"),
         owner=params.flink_user,
         group=params.flink_group,
         content=params.log4j_session_props
         )

    File(os.path.join(flink_conf_dir, "masters"),
         owner=params.flink_user,
         group=params.flink_group,
         content=params.masters
         )

    File(os.path.join(flink_conf_dir, "workers"),
         owner=params.flink_user,
         group=params.flink_group,
         content=params.workers
         )

status方法用来判断Flink组件的运行状态:启动/停止/无状态。在CLIENT模式当然是无状态的。

def status(self, env):
    import params
    env.set_params(params)
    if not params.standalone_enabled:
        raise ClientComponentHasNoStatus()
    elif not is_job_manager_running():
        raise ComponentIsNotRunning()

注意,这里为了兼容有状态(Flink standalone)和无状态两种方式,使用params.standalone_enabled变量作为标记。如果不使用standalone部署方式,抛出ClientComponentHasNoStatus异常,代表组件没有状态。

如果是standalone模式,is_job_manager_running()方法通过ps命令检查jobmanager进程是否存在的方式,判断standalone集群的运行状态。如果组件没有运行,需要抛出ComponentIsNotRunning异常。

支持standalone模式的Flink部署

standalone模式要求FLink组件在Ambari中式有状态的,通过Ambari可以控制Flink standalone集群的启动和停止,并且Ambari能够正确的跟踪到standalone集群的运行状态。

standalone模式的metainfo.xml文件component部分配置如下。其中MASTER代表了有状态服务的主服务,SLAVE代表了从属服务。

<component>
    <name>FLINK_MASTER</name>
    <displayName>JobManager</displayName>
    <category>MASTER</category>
    <cardinality>1+</cardinality>
    <versionAdvertised>true</versionAdvertised>
    <commandScript>
        <script>scripts/flink_master.py</script>
        <scriptType>PYTHON</scriptType>
        <timeout>1200</timeout>
    </commandScript>
    <logs>
        <log>
            <logId>flink_master</logId>
            <primary>true</primary>
        </log>
    </logs>
    <customCommands>
        <customCommand>
            <name>START_YARN_SESSION</name>
            <script>scripts/flink_master.py</script>
            <scriptType>PYTHON</scriptType>
        </customCommand>
        <customCommand>
            <name>STOP_YARN_SESSION</name>
            <script>scripts/flink_master.py</script>
            <scriptType>PYTHON</scriptType>
        </customCommand>
    </customCommands>
    <configFiles>
        ...
    </configFiles>
</component>
<component>
    <name>FLINK_SLAVE</name>
    <displayName>TaskManager</displayName>
    <category>SLAVE</category>
    <cardinality>1+</cardinality>
    <versionAdvertised>true</versionAdvertised>
    <commandScript>
        <script>scripts/flink_slave.py</script>
        <scriptType>PYTHON</scriptType>
        <timeout>1200</timeout>
    </commandScript>
    <logs>
        <log>
            <logId>flink_slave</logId>
            <primary>true</primary>
        </log>
    </logs>
    <configFiles>
        ...
    </configFiles>
</component>

对于有状态的服务,脚本中除了提供installconfigurestatus方法外,还需要提供start(启动)和stop(停止)方法。

def stop(self, env):
    import params
    env.set_params(params)
    stop_flink_standalone_cluster()

def start(self, env):
    import params
    env.set_params(params)
    self.configure(env)
    start_flink_standalone_cluster()

其中start_flink_standalone_cluster()stop_flink_standalone_cluster()分别调用Flink bin目录中的start-cluster.shstop-cluster.sh。只需要在JobManager节点调用一次就可以控制整个集群(Flink jobmanager和taskmanager所在节点分别在masters和workers配置文件中)。

Flink standalone集群状态不正常的时候我们期望平台能够给出告警信息。Ambari平台在检测到配置的端口(JobManager的rest端口)不通的时候会给出告警,显示我们自己配置的告警信息。

针对Flink standalone集群的告警配置alert.json文件内容如下:

{
  "FLINK": {
    "service": [],
    "FLINK_MASTER": [
      {
        "name": "FLINK_MASTER",
        "label": "Flink Job Manager",
        "description": "This host-level alert is triggered if the Job Manager port is unreachable.",
        "interval": 1,
        "scope": "ANY",
        "source": {
          "type": "PORT",
          "uri": "{{flink-conf/rest_port}}",
          "default_port": 8081,
          "reporting": {
            "ok": {
              "text": "TCP OK - {0:.3f}s response on port {1}"
            },
            "warning": {
              "text": "TCP OK - {0:.3f}s response on port {1}",
              "value": 1.5
            },
            "critical": {
              "text": "Connection failed: {0} to {1}:{2}",
              "value": 5
            }
          }
        }
      }
    ]
  }
}

注意这里的端口是从配置项中读取的,如果没有配置,使用默认的8081端口。

参考文献

配置项定义方式:https://cwiki.apache.org/confluence/display/AMBARI/Configuration+support+in+Ambari

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容