Ambari 集成 Flink

Flink 使用介绍相关文档目录

Flink 使用介绍相关文档目录

前言

本篇接Ambari集成组件的方法。只有概述是不够的,本篇是Ambari集成Flink组件的实战内容。

本项目GitHub地址:paul8263/flink-ambari (github.com)

Ambari组件集成项目目录结构

FLINK
├── alerts.json -- 告警配置文件,用于standalone模式检测Flink JobManager web ui端口是否可访问
├── configuration -- Flink自身配置文件配置项模板。Ambari展示配置页面使用
│   ├── flink-conf.xml
│   ├── flink-log4j-cli.xml
│   ├── flink-log4j-console.xml
│   ├── flink-log4j-session.xml
│   ├── flink-log4j.xml
│   ├── flink-standalone.xml -- 自己定制的配置项,是否启用standalone模式,决定了是否启用运行状态检查
│   ├── flink-yarn-session.xml -- 自己定制的配置项,通过Ambari界面启动yarn-session时候的参数值
│   ├── masters.xml -- 对应standalone模式的masters文件
│   └── workers.xml -- 同上
├── metainfo.xml -- 组件元数据信息
├── package
│   ├── scripts -- Ambari 运维脚本
│   │   ├── flink_master.py -- Flink JobManager节点/客户端维护脚本
│   │   ├── flink_slave.py -- Flink TaskManager节点维护脚本
│   │   ├── flink_utils.py -- Flink 运维脚本封装
│   │   ├── __init__.py
│   │   ├── params.py -- Flink配置参数变量,从上面configuration中获取
│   │   ├── repoin.py -- 读取HDP软件源的URL
│   │   ├── service_check.py -- 运行服务检查的逻辑所在
│   │   └── yarn_utils.py -- 封装操作Yarn的相关命令
│   └── templates
│       └── flink-conf.yaml.j2 -- flink-conf.yaml配置文件模板
└── quicklinks
    └── quicklinks.json -- Flink组件Ambari展示页面右侧的快速链接。Standalone模式支持点击后跳转到Flink管理页面

配置项编写

在编写前先介绍配置文件组织形式和Ambari组件配置页面展示形式的对应关系。Ambari组件配置页面包含多个配置组(卷展栏),每个配置组包含多个配置项。这里一个配置组对应着一个configuration中的xml配置项模板。如果组件的配置项有很多,最好是通过多个配置项模板的方式,将这些配置项归类,便于管理。

对于Flink组件而言,它包含多个配置文件。我们按照配置项所在的配置文件,将它们分组归类。

我们选择Flink源代码包中flink-conf.yaml默认包含的配置项,添加到Flink组件的配置页面中。这些配置项是较为常用的。

下面讲解配置项的模板的编写方式。由于配置项非常多,不可能一一列举,这里只选择几个有代表性的分析。完整的配置项代码请参考GitHub。

对于文本类型的配置项,可以使用如下方式定义:

<property>
    <name>jobmanager_rpc_address</name>
    <value>localhost</value>
    <display-name>jobmanager.rpc.address</display-name>
    <description>The config parameter defining the network address to connect to for communication with the job manager. This value is only interpreted in setups where a single JobManager with static name or address exists (simple standalone setups, or container setups with dynamic service name resolution). It is not used in many high-availability setups, when a leader-election service (like ZooKeeper) is used to elect and discover the JobManager leader from potentially multiple standby JobManagers.</description>
</property>

这里定义了一个配置项jobmanager_rpc_address,在页面上显示为jobmanager.rpc.address,默认值为localhostdescription这一大段内容为配置项的描述。鼠标悬停在配置页面该配置项的时候会展示description的内容。

对于数字类型的配置项,尤其是带单位提示的,可以按照如下配置。例如:

<property>
    <name>jobmanager_memory_process_size</name>
    <value>1600</value>
    <display-name>jobmanager.memory.process.size</display-name>
    <description>The total process memory size for the JobManager.</description>
    <value-attributes>
        <type>int</type>
        <minimum>0</minimum>
        <maximum>32768</maximum>
        <unit>MB</unit>
        <increment-step>256</increment-step>
        <overridable>false</overridable>
    </value-attributes>
</property>

我们可以定义额外的value-attributes项目,其中:

  • type: 配置项的参数类型,int表示整数型
  • minimum: 允许的最小值
  • maximum: 允许的最大值
  • unit: 单位
  • increment-step: 步进值
  • overridable: 官网无明确解释,字面意思是是否可覆盖

接下来还有一个问题,遇到这些固定配置项之外的我们怎么办。上面的配置项在页面展示为单行文本框input。为了便于用于填写多行的自定义配置,我们可以定义一个在页面展示为textarea的配置项。例如:

<property>
    <name>custom_properties</name>
    <value>
# This is the place for your custom properties.
# e.g.
# your_key: your_value
    </value>
    <display-name>Custom properties</display-name>
    <description>The place for your custom properties</description>
    <value-attributes>
        <type>content</type>
    </value-attributes>
</property>

这里的关键是指定typecontent,从而实现将默认的单行文本框替换为多行的textarea。

只支持on Yarn的Flink部署

这种情况下Flink是无服务无状态的客户端模式。即无需再Ambari中作为服务启动,仅仅需要安装在需要提交Flink作业的节点上。这种无服务无状态的组件在Ambari中称之为CLIENT(客户端)。metainfo.xml文件中component配置如下所示:

<component>
    <name>FLINK_CLIENT</name>
    <displayName>Flink Client</displayName>
    <category>CLIENT</category>
    <cardinality>1+</cardinality>
    <versionAdvertised>true</versionAdvertised>
    <commandScript>
        <script>scripts/flink_master.py</script>
        <scriptType>PYTHON</scriptType>
        <timeout>1200</timeout>
    </commandScript>
    <logs>
        <log>
            <logId>flink_master</logId>
            <primary>true</primary>
        </log>
    </logs>
    <configFiles>
        ...
    </configFiles>
</component>

对于CLIENT而言,我们需要在脚本中提供installconfigurestatus方法。

我们在flink_master.py文件中编写如下方法:

def install(self, env):
    import params
    env.set_params(params)
    install_flink()
    self.configure(env)

def configure(self, env):
    import params
    env.set_params(params)
    configure_flink()
    
def status(self, env):
    import params
    env.set_params(params)
    if not params.standalone_enabled:
        raise ClientComponentHasNoStatus()
    elif not is_job_manager_running():
        raise ComponentIsNotRunning()

其中install_flink方法使用在目标机器执行wget命令下载tar包然后解压的方式安装Flink。

def install_flink():
    Logger.info('Creating Flink group')
    import params

# 创建用户组和用户
    try:
        grp.getgrnam(params.flink_group)
    except KeyError:
        Group(params.flink_group)

    Logger.info('Creating Flink user')
    try:
        pwd.getpwnam(params.flink_user)
    except KeyError:
        User(params.flink_user,
             gid=params.flink_group,
             groups=[params.flink_group],
             ignore_failures=True
             )
# 创建安装目录
    Logger.info('Creating Flink install directory')
    Directory([params.flink_installation_dir],
              mode=0755,
              cd_access='a',
              owner=params.flink_user,
              group=params.flink_group,
              create_parents=True
              )
# 删除已存在的安装包
    Logger.info('Check existing files')
    if os.path.exists(os.path.join(params.flink_installation_dir, params.FLINK_TAR_NAME)):
        Logger.info('Flink tarball exists. Delete it before downloading')
        Execute("rm -f {0}".format(os.path.join(params.flink_installation_dir, params.FLINK_TAR_NAME)))

    if os.path.exists(params.FLINK_HOME):
        Logger.info('Flink binary has been extracted. Delete it before installation')
        Execute("rm -rf {0}".format(params.FLINK_HOME))

# 下载安装包
    Logger.info('Downloading Flink binaries from: {0}'.format(params.flink_download_url))
    Execute("cd {0}; wget {1} -O {2}".format(params.flink_installation_dir, params.flink_download_url, params.FLINK_TAR_NAME),
            user=params.flink_user)

# 解压安装包
    Logger.info('Extracting Flink binaries')
    Execute("cd {0}; tar -zxvf {1}".format(params.flink_installation_dir, params.FLINK_TAR_NAME), user=params.flink_user)
    File(os.path.join(params.flink_installation_dir, params.FLINK_TAR_NAME), action='delete')

# 给予log目录最大权限,使用非安装用户stantalone模式启动Flink的时候,如果没有配置log目录权限,日志会写入失败
    Logger.info('Modify log folder access permissions')
    Execute("cd {0}; chmod 777 log".format(params.FLINK_HOME), user=params.flink_user)
# 删除tar包,只保留解压后的目录
    Logger.info('Delete Flink tarball')
    Execute("rm -f {0}".format(os.path.join(params.flink_installation_dir, params.FLINK_TAR_NAME)))
# 创建相关命令软链接到`/bin`,之后Flink相关命令可以全局使用。可选步骤
    Logger.info('Creating symbolic links')
    create_symbolic_link()

    Logger.info('Flink installation completed')

configure方法从Ambari的Flink配置向导中读取Flink各个配置文件项目的值,写入到对应的配置文件中。代码如下:

def configure_flink():
    Logger.info('Configuring Flink')
    import params
    flink_conf_dir = os.path.join(params.FLINK_HOME, 'conf')

    File(os.path.join(flink_conf_dir, 'flink-conf.yaml'),
         content=Template("flink-conf.yaml.j2"),
         owner=params.flink_user,
         group=params.flink_group
         )

    File(os.path.join(flink_conf_dir, "log4j.properties"),
         owner=params.flink_user,
         group=params.flink_group,
         content=params.log4j_props
         )

    File(os.path.join(flink_conf_dir, "log4j-cli.properties"),
         owner=params.flink_user,
         group=params.flink_group,
         content=params.log4j_cli_props
         )

    File(os.path.join(flink_conf_dir, "log4j-console.properties"),
         owner=params.flink_user,
         group=params.flink_group,
         content=params.log4j_console_props
         )

    File(os.path.join(flink_conf_dir, "log4j-session.properties"),
         owner=params.flink_user,
         group=params.flink_group,
         content=params.log4j_session_props
         )

    File(os.path.join(flink_conf_dir, "masters"),
         owner=params.flink_user,
         group=params.flink_group,
         content=params.masters
         )

    File(os.path.join(flink_conf_dir, "workers"),
         owner=params.flink_user,
         group=params.flink_group,
         content=params.workers
         )

status方法用来判断Flink组件的运行状态:启动/停止/无状态。在CLIENT模式当然是无状态的。

def status(self, env):
    import params
    env.set_params(params)
    if not params.standalone_enabled:
        raise ClientComponentHasNoStatus()
    elif not is_job_manager_running():
        raise ComponentIsNotRunning()

注意,这里为了兼容有状态(Flink standalone)和无状态两种方式,使用params.standalone_enabled变量作为标记。如果不使用standalone部署方式,抛出ClientComponentHasNoStatus异常,代表组件没有状态。

如果是standalone模式,is_job_manager_running()方法通过ps命令检查jobmanager进程是否存在的方式,判断standalone集群的运行状态。如果组件没有运行,需要抛出ComponentIsNotRunning异常。

支持standalone模式的Flink部署

standalone模式要求FLink组件在Ambari中式有状态的,通过Ambari可以控制Flink standalone集群的启动和停止,并且Ambari能够正确的跟踪到standalone集群的运行状态。

standalone模式的metainfo.xml文件component部分配置如下。其中MASTER代表了有状态服务的主服务,SLAVE代表了从属服务。

<component>
    <name>FLINK_MASTER</name>
    <displayName>JobManager</displayName>
    <category>MASTER</category>
    <cardinality>1+</cardinality>
    <versionAdvertised>true</versionAdvertised>
    <commandScript>
        <script>scripts/flink_master.py</script>
        <scriptType>PYTHON</scriptType>
        <timeout>1200</timeout>
    </commandScript>
    <logs>
        <log>
            <logId>flink_master</logId>
            <primary>true</primary>
        </log>
    </logs>
    <customCommands>
        <customCommand>
            <name>START_YARN_SESSION</name>
            <script>scripts/flink_master.py</script>
            <scriptType>PYTHON</scriptType>
        </customCommand>
        <customCommand>
            <name>STOP_YARN_SESSION</name>
            <script>scripts/flink_master.py</script>
            <scriptType>PYTHON</scriptType>
        </customCommand>
    </customCommands>
    <configFiles>
        ...
    </configFiles>
</component>
<component>
    <name>FLINK_SLAVE</name>
    <displayName>TaskManager</displayName>
    <category>SLAVE</category>
    <cardinality>1+</cardinality>
    <versionAdvertised>true</versionAdvertised>
    <commandScript>
        <script>scripts/flink_slave.py</script>
        <scriptType>PYTHON</scriptType>
        <timeout>1200</timeout>
    </commandScript>
    <logs>
        <log>
            <logId>flink_slave</logId>
            <primary>true</primary>
        </log>
    </logs>
    <configFiles>
        ...
    </configFiles>
</component>

对于有状态的服务,脚本中除了提供installconfigurestatus方法外,还需要提供start(启动)和stop(停止)方法。

def stop(self, env):
    import params
    env.set_params(params)
    stop_flink_standalone_cluster()

def start(self, env):
    import params
    env.set_params(params)
    self.configure(env)
    start_flink_standalone_cluster()

其中start_flink_standalone_cluster()stop_flink_standalone_cluster()分别调用Flink bin目录中的start-cluster.shstop-cluster.sh。只需要在JobManager节点调用一次就可以控制整个集群(Flink jobmanager和taskmanager所在节点分别在masters和workers配置文件中)。

Flink standalone集群状态不正常的时候我们期望平台能够给出告警信息。Ambari平台在检测到配置的端口(JobManager的rest端口)不通的时候会给出告警,显示我们自己配置的告警信息。

针对Flink standalone集群的告警配置alert.json文件内容如下:

{
  "FLINK": {
    "service": [],
    "FLINK_MASTER": [
      {
        "name": "FLINK_MASTER",
        "label": "Flink Job Manager",
        "description": "This host-level alert is triggered if the Job Manager port is unreachable.",
        "interval": 1,
        "scope": "ANY",
        "source": {
          "type": "PORT",
          "uri": "{{flink-conf/rest_port}}",
          "default_port": 8081,
          "reporting": {
            "ok": {
              "text": "TCP OK - {0:.3f}s response on port {1}"
            },
            "warning": {
              "text": "TCP OK - {0:.3f}s response on port {1}",
              "value": 1.5
            },
            "critical": {
              "text": "Connection failed: {0} to {1}:{2}",
              "value": 5
            }
          }
        }
      }
    ]
  }
}

注意这里的端口是从配置项中读取的,如果没有配置,使用默认的8081端口。

参考文献

配置项定义方式:https://cwiki.apache.org/confluence/display/AMBARI/Configuration+support+in+Ambari

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,189评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,577评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,857评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,703评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,705评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,620评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,995评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,656评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,898评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,639评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,720评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,395评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,982评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,953评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,195评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,907评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,472评论 2 342

推荐阅读更多精彩内容