Ceilometer 源码学习- Polling Agent

简介

Ceilometer是Openstack中用于数据采集的基础设施,包括多个组件:Central Agent,Compute Agent,Notification Agent,Collector等。其中Central Agent和Compute Agent分别运行在控制节点和计算节点上,通过定期调用其他服务的api来完成数据采集。由于二者的区别只是所负责的数据来源,这里我们统称为Polling Agent。

功能作用

Polling Agent的功能很简单:
“周期性地向其他服务主动拉取需要的数据,并将数据发送到消息队列。”
结构图如下:

image.png

站在设计者的角度,要完成上述功能,需要处理的有如下几个基本问题:

  • 怎么执行拉取?
  • 向哪些服务拉取数据,周期怎么定义的?
  • 对于某个服务收集哪些数据以及如何收集?

下面分别针对上述问题依次介绍Ceilometer的实现方式:

  1. 常驻进程:自然的我们需要一个常驻进程来完成上述调度任务,基本操作包括:

    • 记录全局状态;
    • 周期性的触发;
    • 负责消息的发送。
  2. 插件形式:Ceilometer中用定义插件的方式定义多个收集器(Pollster),程序从配置文件中获得需要加载的收集器列表,用插件的形式是一个很好的选择,因为:

    • python对插件的良好支持:stevedore
    • 简化核心逻辑;
    • 方便扩展。
  3. 共同基类:数据来源多种多样,针对不同的数据来源获取数据方式各有不同,但他们需要完成同样的的动作,Ceilometer中设计Pollster的共同基类,定义了如下接口,是每个Pollster都是要实现的:

    • 默认获取数据来源的方式:default_discovery;
    • 拉取数据:get_samples

流程简介

正是由于上面所说的实现方式使得Polling Agent的核心逻辑变得非常简单,不需要关注具体的数据收集过程,而将自己解放成一个调度管理者,下面将简单介绍其实现逻辑。在此之前为了方便说明,先介绍其中涉及到的角色或组件:

AgentManager:Polling Agent的核心类,Central Agent和Compute Agent用不同的参数初始化AgentManager;
Pollster:数据收集器,以插件的形式动态载入;
Discover:以一定的方式发现数据源Resource;
Pipeline:Ceilometer通过pipleline.yml文件的形式定义了所收集数据的一系列转换发送操作,很好的降低了各组件的耦合性和系统的复杂性。该文件中以sources标签定义了不同数据的分组信息,这部分是在Polling Agent中需要关心的;
PollingTask:望文生义,表示一个拉取数据的任务;
Resource:代表一个可提供数据的源,如一个镜像或一个虚拟机实例

基本流程如下:

AgentManger初始化,主要完成如下动作:
从配置文件中动态加载所有收集器插件Pollster;
从配置文件中动态加载所有资源发现器插件Discover。

AgentManger启动:
从pipeline文件读取sources信息;
为每一个从文件中加载的Pollster根据Pipeline信息分配一个PollingTask;
为每个PollingTask建立Timer定时执行。

PollingTask执行:
通过Pollster的default_discovery函数定义,从已加载的资源发现器Discover中选取合适的一个;
调用Discover的discovery函数获取Resource;
调用Pollster的get_samples函数,从Resource中获得采样数据;
发送给消息队列。

1. 入口

  • Ceilometer采用pbr的方式管理配置,
  • setup.cfg中定义了Polling Agent 入口位置,如下:
console_scripts =
    ceilometer-polling = ceilometer.cmd.polling:main

2. ceilometer.cmd.polling.main

相应的,在ceilometer.cmd.polling.main文件中找到该函数,如下:

def main():
    service.prepare_service()
    sm = cotyledon.ServiceManager()
    sm.add(create_polling_service)
    sm.run()
def create_polling_service(worker_id):
    return manager.AgentManager(CONF.polling_namespaces,
                                CONF.pollster_list,
                                worker_id)
  • prepare_service中做了一些初始化工作,如初始化日志,加载配置文件等;
  • create_polling_service为核心,配置并启动了manager.AgentManager,进一步了解到主要工作发生在该类的中,即AgentManager

3. AgentManager初始化

ceilometer.agent.manager.AgentManager初始化部分代码,如下:

class AgentManager(service_base.PipelineBasedService):

    def __init__(self, namespaces=None, pollster_list=None, worker_id=0):
        .......
        # 从配置文件中动态加载收集器Pollster
        extensions = (self._extensions('poll', namespace).extensions
                      for namespace in namespaces)
        .......

        self.extensions = list(itertools.chain(*list(extensions))) + list(
            itertools.chain(*list(extensions_fb)))
        .......
        # 从配置文件中动态加载资源发现器Discoveries
        discoveries = (self._extensions('discover', namespace).extensions
                       for namespace in namespaces)
        self.discoveries = list(itertools.chain(*list(discoveries)))
        .......
  • 可以看出_extensions函数中通过stevedore加载了配置文件中的对应namespace下的插件;
  • 初始化过程init中,主要做了两件事情:
    • 加载ceilometer.poll.central/compute下的插件到self.extensions,即上面所说的收集器Pollster;
    • 加载ceilometer.discover下的插件到self.discovery_manager,即上面所说的资源发现器Discover。
      而在配置文件setup.cfg中可以看到对应的定义,截取部分在这里:
...
ceilometer.poll.central =
      ip.floating = ceilometer.network.floatingip:FloatingIPPollster
      image = ceilometer.image.glance:ImagePollster
      image.size = ceilometer.image.glance:ImageSizePollster
      ...
...   
ceilometer.discover =
      local_instances = ceilometer.compute.discovery:InstanceDiscovery
      endpoint = ceilometer.agent.discovery.endpoint:EndpointDiscovery
      tenant = ceilometer.agent.discovery.tenant:TenantDiscovery
      ...
 ...

4. AgentManager启动运行

在初始化完AgentManager后,就需要启动这个AgentManager,代码实现如下:

    def run(self):
        # 读取pipeline.yaml配置文件
        self.polling_manager = pipeline.setup_polling()
         ......
        # 定时拉取
        self.start_polling_tasks()
         ......

下面分别介绍这两行代码的功能:

  • pipeline.setup_polling中加载解析pipeline.yaml文件,来看一个pipeline.yaml中的示例,更多内容:pipeline
---
  sources:
      - name: meter_source
        interval: 600
        meters:
            - "*"
        sinks:
            - meter_sink
      - name: cpu_source
        ...
      ...
  sinks:
      - name: meter_sink
        transformers:
        publishers:
      ...
  ...

ceilometer中用pipeline配置文件的方式定义meter数据从收集到处理到发送的过程,在Polling Agent中我们只需要关心sources部分,在上述pipeline.setup_polling()中读取pipeline文件并解析封装其中的sources内容,供后面使用。

  • start_polling_tasks代码如下:
    def start_polling_tasks(self):
        ......
        # 创建PollingTask
        data = self.setup_polling_tasks()
        ......
      # PollingTask定时执行
        for interval, polling_task in data.items():
            delay_time = (interval + delay_polling_time if delay_start
                          else delay_polling_time)

            @periodics.periodic(spacing=interval, run_immediately=False)
            def task(running_task):
                self.interval_task(running_task)

            utils.spawn_thread(utils.delayed, delay_time,
                               self.polling_periodics.add, task, polling_task)

其中,setup_polling_tasks中新建PollingTask,并根据上一步中封装的sources内容,将每一个收集器Pollster根据其interval设置分配到不同的PollingTask中,interval相同的收集器会分配到同一个PollingTask中。之后每个PollingTask都根据其运行周期设置Timer定时执行。 注意,其中interval_task函数指定timer需要执行的任务。

5. PollingTask 执行

    def poll_and_notify(self):
        """Polling sample and notify."""
        ......
        # 循环处理PollingTask中的每一个收集器Pollster
        for source_name in self.pollster_matches:
            for pollster in self.pollster_matches[source_name]:
                ......
               # Discover发现可用的数据源
                if not candidate_res and pollster.obj.default_discovery:
                    candidate_res = self.manager.discover(
                        [pollster.obj.default_discovery], discovery_cache)
               ......
              # 做一些处理和过滤
               ......
                    # 从数据源处拉取采样数据
                    samples = pollster.obj.get_samples(
                        manager=self.manager,
                        cache=cache,
                        resources=polling_resources
                    )
               ......
                    # 发送数据到消息队列
                    for sample in samples:
                        .....
                        if self._batch:
                            sample_batch.append(sample_dict)
                        else:
                            self._send_notification([sample_dict])

                    if sample_batch:
                        self._send_notification(sample_batch)

                except plugin_base.PollsterPermanentError as err:
                    .....

可以看出,在这段代码中完成了比较核心的几个步骤:

  • 资源发现器Discover发现可用数据源;
  • 收集器Pollster拉取采样数据;
  • 发送数据到消息队列。

6. Pollster示例

上面介绍了Polling Agent中如何是如何加载Pollster执行数据的收集工作的。下面以获取CPU基本信息的CPUPollster为例,看一下具体的实现:

class CPUPollster(pollsters.BaseComputePollster):

    def get_samples(self, manager, cache, resources):
        for instance in resources:
            LOG.debug('checking instance %s', instance.id)
            try:
                cpu_info = self.inspector.inspect_cpus(instance)
                LOG.debug("CPUTIME USAGE: %(instance)s %(time)d",
                          {'instance': instance,
                           'time': cpu_info.time})
                cpu_num = {'cpu_number': cpu_info.number}
                yield util.make_sample_from_instance(
                    instance,
                    name='cpu',
                    type=sample.TYPE_CUMULATIVE,
                    unit='ns',
                    volume=cpu_info.time,
                    additional_metadata=cpu_num,
                    monotonic_time=monotonic.monotonic()
                )
            except virt_inspector.InstanceNotFoundException as err:
               ......
class BaseComputePollster(plugin_base.PollsterBase):

    @property
    def inspector(self):
        try:
            inspector = self._inspector
        except AttributeError:
            inspector = virt_inspector.get_hypervisor_inspector()
            BaseComputePollster._inspector = inspector
        return inspector

    @property
    def default_discovery(self):
        return 'local_instances'

      ......

代码路径ceilometer.compute.virt.libvirt.inspector.LibvirtInspector.inspect_cpus

 def inspect_cpus(self, instance):
        domain = self._get_domain_not_shut_off_or_raise(instance)
        # 根据不同的inspector,从hypervisor上获取cpu时间
        stats = self.connection.domainListGetStats([domain], 0)[0][1]
        cpu_time = 0
        current_cpus = stats.get('vcpu.current')
      
        for vcpu in six.moves.range(stats.get('vcpu.maximum', 0)):
            try:
                cpu_time += (stats.get('vcpu.%s.time' % vcpu) +
                             stats.get('vcpu.%s.wait' % vcpu))
                current_cpus -= 1
            except TypeError:

                pass
           ......
        return virt_inspector.CPUStats(number=stats['vcpu.current'],
                                       time=cpu_time)

像上边介绍过的,Pollster需要实现两个接口:

  • default_discovery:指定默认的discover
  • get_samples:对每个CPU获取采样数据

7. disocover示例

class InstanceDiscovery(plugin_base.DiscoveryBase):
    
    def discover(self, manager, param=None):
        """Discover resources to monitor."""
        ......
        instances = []
        ......
        if not self.last_run or secs_from_last_update >= self.expiration_time:
            try:
               ......
                instances = self.nova_cli.instance_get_all_by_host(
                    cfg.CONF.host)
                self.last_run = utc_now
            except Exception:
                ......

        for instance in instances:
            if getattr(instance, 'OS-EXT-STS:vm_state', None) in ['deleted',
                                                                  'error']:
                self.instances.pop(instance.id, None)
            else:
                self.instances[instance.id] = instance

        return self.instances.values()

可以看到上面的CPUPollster所指定的Discover中获取所有的健康的instance列表, 这些instance列表最终会作为数据来源传给CPUPollster的get_samples的resources参数

8. 其他

除了上述提到的内容外,还有一些点需要注意:

  • polling agent采用tooz实现了agent的高可用,不同的agent实例之间通过tooz进行通信。在base.AgentManager的初始化和运行过程中都有相关处理,其具体实现可以在ceilometer/coordination.py中看到。;
  • 除了上述动态加载Pollster和Discover的方式外,pipeline还提供的静态的加载方式,可以在pipeline文件中通过sources的resources和discovery参数指定。

以上代码基于N版的Ceilometer

参考:
https://www.dazhuanlan.com/2019/11/20/5dd4c5028e4c4/
https://blog.csdn.net/mengalong/article/details/81239370
https://catkang.github.io/2015/11/03/source-ceilometer-polling.html

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容