Ceilometer 源码学习- Polling Agent

简介

Ceilometer是Openstack中用于数据采集的基础设施,包括多个组件:Central Agent,Compute Agent,Notification Agent,Collector等。其中Central Agent和Compute Agent分别运行在控制节点和计算节点上,通过定期调用其他服务的api来完成数据采集。由于二者的区别只是所负责的数据来源,这里我们统称为Polling Agent。

功能作用

Polling Agent的功能很简单:
“周期性地向其他服务主动拉取需要的数据,并将数据发送到消息队列。”
结构图如下:

image.png

站在设计者的角度,要完成上述功能,需要处理的有如下几个基本问题:

  • 怎么执行拉取?
  • 向哪些服务拉取数据,周期怎么定义的?
  • 对于某个服务收集哪些数据以及如何收集?

下面分别针对上述问题依次介绍Ceilometer的实现方式:

  1. 常驻进程:自然的我们需要一个常驻进程来完成上述调度任务,基本操作包括:

    • 记录全局状态;
    • 周期性的触发;
    • 负责消息的发送。
  2. 插件形式:Ceilometer中用定义插件的方式定义多个收集器(Pollster),程序从配置文件中获得需要加载的收集器列表,用插件的形式是一个很好的选择,因为:

    • python对插件的良好支持:stevedore
    • 简化核心逻辑;
    • 方便扩展。
  3. 共同基类:数据来源多种多样,针对不同的数据来源获取数据方式各有不同,但他们需要完成同样的的动作,Ceilometer中设计Pollster的共同基类,定义了如下接口,是每个Pollster都是要实现的:

    • 默认获取数据来源的方式:default_discovery;
    • 拉取数据:get_samples

流程简介

正是由于上面所说的实现方式使得Polling Agent的核心逻辑变得非常简单,不需要关注具体的数据收集过程,而将自己解放成一个调度管理者,下面将简单介绍其实现逻辑。在此之前为了方便说明,先介绍其中涉及到的角色或组件:

AgentManager:Polling Agent的核心类,Central Agent和Compute Agent用不同的参数初始化AgentManager;
Pollster:数据收集器,以插件的形式动态载入;
Discover:以一定的方式发现数据源Resource;
Pipeline:Ceilometer通过pipleline.yml文件的形式定义了所收集数据的一系列转换发送操作,很好的降低了各组件的耦合性和系统的复杂性。该文件中以sources标签定义了不同数据的分组信息,这部分是在Polling Agent中需要关心的;
PollingTask:望文生义,表示一个拉取数据的任务;
Resource:代表一个可提供数据的源,如一个镜像或一个虚拟机实例

基本流程如下:

AgentManger初始化,主要完成如下动作:
从配置文件中动态加载所有收集器插件Pollster;
从配置文件中动态加载所有资源发现器插件Discover。

AgentManger启动:
从pipeline文件读取sources信息;
为每一个从文件中加载的Pollster根据Pipeline信息分配一个PollingTask;
为每个PollingTask建立Timer定时执行。

PollingTask执行:
通过Pollster的default_discovery函数定义,从已加载的资源发现器Discover中选取合适的一个;
调用Discover的discovery函数获取Resource;
调用Pollster的get_samples函数,从Resource中获得采样数据;
发送给消息队列。

1. 入口

  • Ceilometer采用pbr的方式管理配置,
  • setup.cfg中定义了Polling Agent 入口位置,如下:
console_scripts =
    ceilometer-polling = ceilometer.cmd.polling:main

2. ceilometer.cmd.polling.main

相应的,在ceilometer.cmd.polling.main文件中找到该函数,如下:

def main():
    service.prepare_service()
    sm = cotyledon.ServiceManager()
    sm.add(create_polling_service)
    sm.run()
def create_polling_service(worker_id):
    return manager.AgentManager(CONF.polling_namespaces,
                                CONF.pollster_list,
                                worker_id)
  • prepare_service中做了一些初始化工作,如初始化日志,加载配置文件等;
  • create_polling_service为核心,配置并启动了manager.AgentManager,进一步了解到主要工作发生在该类的中,即AgentManager

3. AgentManager初始化

ceilometer.agent.manager.AgentManager初始化部分代码,如下:

class AgentManager(service_base.PipelineBasedService):

    def __init__(self, namespaces=None, pollster_list=None, worker_id=0):
        .......
        # 从配置文件中动态加载收集器Pollster
        extensions = (self._extensions('poll', namespace).extensions
                      for namespace in namespaces)
        .......

        self.extensions = list(itertools.chain(*list(extensions))) + list(
            itertools.chain(*list(extensions_fb)))
        .......
        # 从配置文件中动态加载资源发现器Discoveries
        discoveries = (self._extensions('discover', namespace).extensions
                       for namespace in namespaces)
        self.discoveries = list(itertools.chain(*list(discoveries)))
        .......
  • 可以看出_extensions函数中通过stevedore加载了配置文件中的对应namespace下的插件;
  • 初始化过程init中,主要做了两件事情:
    • 加载ceilometer.poll.central/compute下的插件到self.extensions,即上面所说的收集器Pollster;
    • 加载ceilometer.discover下的插件到self.discovery_manager,即上面所说的资源发现器Discover。
      而在配置文件setup.cfg中可以看到对应的定义,截取部分在这里:
...
ceilometer.poll.central =
      ip.floating = ceilometer.network.floatingip:FloatingIPPollster
      image = ceilometer.image.glance:ImagePollster
      image.size = ceilometer.image.glance:ImageSizePollster
      ...
...   
ceilometer.discover =
      local_instances = ceilometer.compute.discovery:InstanceDiscovery
      endpoint = ceilometer.agent.discovery.endpoint:EndpointDiscovery
      tenant = ceilometer.agent.discovery.tenant:TenantDiscovery
      ...
 ...

4. AgentManager启动运行

在初始化完AgentManager后,就需要启动这个AgentManager,代码实现如下:

    def run(self):
        # 读取pipeline.yaml配置文件
        self.polling_manager = pipeline.setup_polling()
         ......
        # 定时拉取
        self.start_polling_tasks()
         ......

下面分别介绍这两行代码的功能:

  • pipeline.setup_polling中加载解析pipeline.yaml文件,来看一个pipeline.yaml中的示例,更多内容:pipeline
---
  sources:
      - name: meter_source
        interval: 600
        meters:
            - "*"
        sinks:
            - meter_sink
      - name: cpu_source
        ...
      ...
  sinks:
      - name: meter_sink
        transformers:
        publishers:
      ...
  ...

ceilometer中用pipeline配置文件的方式定义meter数据从收集到处理到发送的过程,在Polling Agent中我们只需要关心sources部分,在上述pipeline.setup_polling()中读取pipeline文件并解析封装其中的sources内容,供后面使用。

  • start_polling_tasks代码如下:
    def start_polling_tasks(self):
        ......
        # 创建PollingTask
        data = self.setup_polling_tasks()
        ......
      # PollingTask定时执行
        for interval, polling_task in data.items():
            delay_time = (interval + delay_polling_time if delay_start
                          else delay_polling_time)

            @periodics.periodic(spacing=interval, run_immediately=False)
            def task(running_task):
                self.interval_task(running_task)

            utils.spawn_thread(utils.delayed, delay_time,
                               self.polling_periodics.add, task, polling_task)

其中,setup_polling_tasks中新建PollingTask,并根据上一步中封装的sources内容,将每一个收集器Pollster根据其interval设置分配到不同的PollingTask中,interval相同的收集器会分配到同一个PollingTask中。之后每个PollingTask都根据其运行周期设置Timer定时执行。 注意,其中interval_task函数指定timer需要执行的任务。

5. PollingTask 执行

    def poll_and_notify(self):
        """Polling sample and notify."""
        ......
        # 循环处理PollingTask中的每一个收集器Pollster
        for source_name in self.pollster_matches:
            for pollster in self.pollster_matches[source_name]:
                ......
               # Discover发现可用的数据源
                if not candidate_res and pollster.obj.default_discovery:
                    candidate_res = self.manager.discover(
                        [pollster.obj.default_discovery], discovery_cache)
               ......
              # 做一些处理和过滤
               ......
                    # 从数据源处拉取采样数据
                    samples = pollster.obj.get_samples(
                        manager=self.manager,
                        cache=cache,
                        resources=polling_resources
                    )
               ......
                    # 发送数据到消息队列
                    for sample in samples:
                        .....
                        if self._batch:
                            sample_batch.append(sample_dict)
                        else:
                            self._send_notification([sample_dict])

                    if sample_batch:
                        self._send_notification(sample_batch)

                except plugin_base.PollsterPermanentError as err:
                    .....

可以看出,在这段代码中完成了比较核心的几个步骤:

  • 资源发现器Discover发现可用数据源;
  • 收集器Pollster拉取采样数据;
  • 发送数据到消息队列。

6. Pollster示例

上面介绍了Polling Agent中如何是如何加载Pollster执行数据的收集工作的。下面以获取CPU基本信息的CPUPollster为例,看一下具体的实现:

class CPUPollster(pollsters.BaseComputePollster):

    def get_samples(self, manager, cache, resources):
        for instance in resources:
            LOG.debug('checking instance %s', instance.id)
            try:
                cpu_info = self.inspector.inspect_cpus(instance)
                LOG.debug("CPUTIME USAGE: %(instance)s %(time)d",
                          {'instance': instance,
                           'time': cpu_info.time})
                cpu_num = {'cpu_number': cpu_info.number}
                yield util.make_sample_from_instance(
                    instance,
                    name='cpu',
                    type=sample.TYPE_CUMULATIVE,
                    unit='ns',
                    volume=cpu_info.time,
                    additional_metadata=cpu_num,
                    monotonic_time=monotonic.monotonic()
                )
            except virt_inspector.InstanceNotFoundException as err:
               ......
class BaseComputePollster(plugin_base.PollsterBase):

    @property
    def inspector(self):
        try:
            inspector = self._inspector
        except AttributeError:
            inspector = virt_inspector.get_hypervisor_inspector()
            BaseComputePollster._inspector = inspector
        return inspector

    @property
    def default_discovery(self):
        return 'local_instances'

      ......

代码路径ceilometer.compute.virt.libvirt.inspector.LibvirtInspector.inspect_cpus

 def inspect_cpus(self, instance):
        domain = self._get_domain_not_shut_off_or_raise(instance)
        # 根据不同的inspector,从hypervisor上获取cpu时间
        stats = self.connection.domainListGetStats([domain], 0)[0][1]
        cpu_time = 0
        current_cpus = stats.get('vcpu.current')
      
        for vcpu in six.moves.range(stats.get('vcpu.maximum', 0)):
            try:
                cpu_time += (stats.get('vcpu.%s.time' % vcpu) +
                             stats.get('vcpu.%s.wait' % vcpu))
                current_cpus -= 1
            except TypeError:

                pass
           ......
        return virt_inspector.CPUStats(number=stats['vcpu.current'],
                                       time=cpu_time)

像上边介绍过的,Pollster需要实现两个接口:

  • default_discovery:指定默认的discover
  • get_samples:对每个CPU获取采样数据

7. disocover示例

class InstanceDiscovery(plugin_base.DiscoveryBase):
    
    def discover(self, manager, param=None):
        """Discover resources to monitor."""
        ......
        instances = []
        ......
        if not self.last_run or secs_from_last_update >= self.expiration_time:
            try:
               ......
                instances = self.nova_cli.instance_get_all_by_host(
                    cfg.CONF.host)
                self.last_run = utc_now
            except Exception:
                ......

        for instance in instances:
            if getattr(instance, 'OS-EXT-STS:vm_state', None) in ['deleted',
                                                                  'error']:
                self.instances.pop(instance.id, None)
            else:
                self.instances[instance.id] = instance

        return self.instances.values()

可以看到上面的CPUPollster所指定的Discover中获取所有的健康的instance列表, 这些instance列表最终会作为数据来源传给CPUPollster的get_samples的resources参数

8. 其他

除了上述提到的内容外,还有一些点需要注意:

  • polling agent采用tooz实现了agent的高可用,不同的agent实例之间通过tooz进行通信。在base.AgentManager的初始化和运行过程中都有相关处理,其具体实现可以在ceilometer/coordination.py中看到。;
  • 除了上述动态加载Pollster和Discover的方式外,pipeline还提供的静态的加载方式,可以在pipeline文件中通过sources的resources和discovery参数指定。

以上代码基于N版的Ceilometer

参考:
https://www.dazhuanlan.com/2019/11/20/5dd4c5028e4c4/
https://blog.csdn.net/mengalong/article/details/81239370
https://catkang.github.io/2015/11/03/source-ceilometer-polling.html

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,084评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,623评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,450评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,322评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,370评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,274评论 1 300
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,126评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,980评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,414评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,599评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,773评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,470评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,080评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,713评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,852评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,865评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,689评论 2 354

推荐阅读更多精彩内容