简介
Ceilometer是Openstack中用于数据采集的基础设施,包括多个组件:Central Agent,Compute Agent,Notification Agent,Collector等。其中Central Agent和Compute Agent分别运行在控制节点和计算节点上,通过定期调用其他服务的api来完成数据采集。由于二者的区别只是所负责的数据来源,这里我们统称为Polling Agent。
功能作用
Polling Agent的功能很简单:
“周期性地向其他服务主动拉取需要的数据,并将数据发送到消息队列。”
结构图如下:
站在设计者的角度,要完成上述功能,需要处理的有如下几个基本问题:
- 怎么执行拉取?
- 向哪些服务拉取数据,周期怎么定义的?
- 对于某个服务收集哪些数据以及如何收集?
下面分别针对上述问题依次介绍Ceilometer的实现方式:
-
常驻进程:自然的我们需要一个常驻进程来完成上述调度任务,基本操作包括:
- 记录全局状态;
- 周期性的触发;
- 负责消息的发送。
-
插件形式:Ceilometer中用定义插件的方式定义多个收集器(Pollster),程序从配置文件中获得需要加载的收集器列表,用插件的形式是一个很好的选择,因为:
- python对插件的良好支持:stevedore
- 简化核心逻辑;
- 方便扩展。
-
共同基类:数据来源多种多样,针对不同的数据来源获取数据方式各有不同,但他们需要完成同样的的动作,Ceilometer中设计Pollster的共同基类,定义了如下接口,是每个Pollster都是要实现的:
- 默认获取数据来源的方式:default_discovery;
- 拉取数据:get_samples
流程简介
正是由于上面所说的实现方式使得Polling Agent的核心逻辑变得非常简单,不需要关注具体的数据收集过程,而将自己解放成一个调度管理者,下面将简单介绍其实现逻辑。在此之前为了方便说明,先介绍其中涉及到的角色或组件:
AgentManager:Polling Agent的核心类,Central Agent和Compute Agent用不同的参数初始化AgentManager;
Pollster:数据收集器,以插件的形式动态载入;
Discover:以一定的方式发现数据源Resource;
Pipeline:Ceilometer通过pipleline.yml文件的形式定义了所收集数据的一系列转换发送操作,很好的降低了各组件的耦合性和系统的复杂性。该文件中以sources标签定义了不同数据的分组信息,这部分是在Polling Agent中需要关心的;
PollingTask:望文生义,表示一个拉取数据的任务;
Resource:代表一个可提供数据的源,如一个镜像或一个虚拟机实例
基本流程如下:
AgentManger初始化,主要完成如下动作:
从配置文件中动态加载所有收集器插件Pollster;
从配置文件中动态加载所有资源发现器插件Discover。
AgentManger启动:
从pipeline文件读取sources信息;
为每一个从文件中加载的Pollster根据Pipeline信息分配一个PollingTask;
为每个PollingTask建立Timer定时执行。
PollingTask执行:
通过Pollster的default_discovery函数定义,从已加载的资源发现器Discover中选取合适的一个;
调用Discover的discovery函数获取Resource;
调用Pollster的get_samples函数,从Resource中获得采样数据;
发送给消息队列。
1. 入口
- Ceilometer采用pbr的方式管理配置,
- setup.cfg中定义了Polling Agent 入口位置,如下:
console_scripts =
ceilometer-polling = ceilometer.cmd.polling:main
2. ceilometer.cmd.polling.main
相应的,在ceilometer.cmd.polling.main文件中找到该函数,如下:
def main():
service.prepare_service()
sm = cotyledon.ServiceManager()
sm.add(create_polling_service)
sm.run()
def create_polling_service(worker_id):
return manager.AgentManager(CONF.polling_namespaces,
CONF.pollster_list,
worker_id)
- prepare_service中做了一些初始化工作,如初始化日志,加载配置文件等;
- create_polling_service为核心,配置并启动了manager.AgentManager,进一步了解到主要工作发生在该类的中,即AgentManager
3. AgentManager初始化
ceilometer.agent.manager.AgentManager初始化部分代码,如下:
class AgentManager(service_base.PipelineBasedService):
def __init__(self, namespaces=None, pollster_list=None, worker_id=0):
.......
# 从配置文件中动态加载收集器Pollster
extensions = (self._extensions('poll', namespace).extensions
for namespace in namespaces)
.......
self.extensions = list(itertools.chain(*list(extensions))) + list(
itertools.chain(*list(extensions_fb)))
.......
# 从配置文件中动态加载资源发现器Discoveries
discoveries = (self._extensions('discover', namespace).extensions
for namespace in namespaces)
self.discoveries = list(itertools.chain(*list(discoveries)))
.......
- 可以看出_extensions函数中通过stevedore加载了配置文件中的对应namespace下的插件;
- 初始化过程init中,主要做了两件事情:
- 加载ceilometer.poll.central/compute下的插件到self.extensions,即上面所说的收集器Pollster;
- 加载ceilometer.discover下的插件到self.discovery_manager,即上面所说的资源发现器Discover。
而在配置文件setup.cfg中可以看到对应的定义,截取部分在这里:
...
ceilometer.poll.central =
ip.floating = ceilometer.network.floatingip:FloatingIPPollster
image = ceilometer.image.glance:ImagePollster
image.size = ceilometer.image.glance:ImageSizePollster
...
...
ceilometer.discover =
local_instances = ceilometer.compute.discovery:InstanceDiscovery
endpoint = ceilometer.agent.discovery.endpoint:EndpointDiscovery
tenant = ceilometer.agent.discovery.tenant:TenantDiscovery
...
...
4. AgentManager启动运行
在初始化完AgentManager后,就需要启动这个AgentManager,代码实现如下:
def run(self):
# 读取pipeline.yaml配置文件
self.polling_manager = pipeline.setup_polling()
......
# 定时拉取
self.start_polling_tasks()
......
下面分别介绍这两行代码的功能:
- pipeline.setup_polling中加载解析pipeline.yaml文件,来看一个pipeline.yaml中的示例,更多内容:pipeline;
---
sources:
- name: meter_source
interval: 600
meters:
- "*"
sinks:
- meter_sink
- name: cpu_source
...
...
sinks:
- name: meter_sink
transformers:
publishers:
...
...
ceilometer中用pipeline配置文件的方式定义meter数据从收集到处理到发送的过程,在Polling Agent中我们只需要关心sources部分,在上述pipeline.setup_polling()中读取pipeline文件并解析封装其中的sources内容,供后面使用。
- start_polling_tasks代码如下:
def start_polling_tasks(self):
......
# 创建PollingTask
data = self.setup_polling_tasks()
......
# PollingTask定时执行
for interval, polling_task in data.items():
delay_time = (interval + delay_polling_time if delay_start
else delay_polling_time)
@periodics.periodic(spacing=interval, run_immediately=False)
def task(running_task):
self.interval_task(running_task)
utils.spawn_thread(utils.delayed, delay_time,
self.polling_periodics.add, task, polling_task)
其中,setup_polling_tasks中新建PollingTask,并根据上一步中封装的sources内容,将每一个收集器Pollster根据其interval设置分配到不同的PollingTask中,interval相同的收集器会分配到同一个PollingTask中。之后每个PollingTask都根据其运行周期设置Timer定时执行。 注意,其中interval_task函数指定timer需要执行的任务。
5. PollingTask 执行
def poll_and_notify(self):
"""Polling sample and notify."""
......
# 循环处理PollingTask中的每一个收集器Pollster
for source_name in self.pollster_matches:
for pollster in self.pollster_matches[source_name]:
......
# Discover发现可用的数据源
if not candidate_res and pollster.obj.default_discovery:
candidate_res = self.manager.discover(
[pollster.obj.default_discovery], discovery_cache)
......
# 做一些处理和过滤
......
# 从数据源处拉取采样数据
samples = pollster.obj.get_samples(
manager=self.manager,
cache=cache,
resources=polling_resources
)
......
# 发送数据到消息队列
for sample in samples:
.....
if self._batch:
sample_batch.append(sample_dict)
else:
self._send_notification([sample_dict])
if sample_batch:
self._send_notification(sample_batch)
except plugin_base.PollsterPermanentError as err:
.....
可以看出,在这段代码中完成了比较核心的几个步骤:
- 资源发现器Discover发现可用数据源;
- 收集器Pollster拉取采样数据;
- 发送数据到消息队列。
6. Pollster示例
上面介绍了Polling Agent中如何是如何加载Pollster执行数据的收集工作的。下面以获取CPU基本信息的CPUPollster为例,看一下具体的实现:
class CPUPollster(pollsters.BaseComputePollster):
def get_samples(self, manager, cache, resources):
for instance in resources:
LOG.debug('checking instance %s', instance.id)
try:
cpu_info = self.inspector.inspect_cpus(instance)
LOG.debug("CPUTIME USAGE: %(instance)s %(time)d",
{'instance': instance,
'time': cpu_info.time})
cpu_num = {'cpu_number': cpu_info.number}
yield util.make_sample_from_instance(
instance,
name='cpu',
type=sample.TYPE_CUMULATIVE,
unit='ns',
volume=cpu_info.time,
additional_metadata=cpu_num,
monotonic_time=monotonic.monotonic()
)
except virt_inspector.InstanceNotFoundException as err:
......
class BaseComputePollster(plugin_base.PollsterBase):
@property
def inspector(self):
try:
inspector = self._inspector
except AttributeError:
inspector = virt_inspector.get_hypervisor_inspector()
BaseComputePollster._inspector = inspector
return inspector
@property
def default_discovery(self):
return 'local_instances'
......
代码路径ceilometer.compute.virt.libvirt.inspector.LibvirtInspector.inspect_cpus
def inspect_cpus(self, instance):
domain = self._get_domain_not_shut_off_or_raise(instance)
# 根据不同的inspector,从hypervisor上获取cpu时间
stats = self.connection.domainListGetStats([domain], 0)[0][1]
cpu_time = 0
current_cpus = stats.get('vcpu.current')
for vcpu in six.moves.range(stats.get('vcpu.maximum', 0)):
try:
cpu_time += (stats.get('vcpu.%s.time' % vcpu) +
stats.get('vcpu.%s.wait' % vcpu))
current_cpus -= 1
except TypeError:
pass
......
return virt_inspector.CPUStats(number=stats['vcpu.current'],
time=cpu_time)
像上边介绍过的,Pollster需要实现两个接口:
- default_discovery:指定默认的discover
- get_samples:对每个CPU获取采样数据
7. disocover示例
class InstanceDiscovery(plugin_base.DiscoveryBase):
def discover(self, manager, param=None):
"""Discover resources to monitor."""
......
instances = []
......
if not self.last_run or secs_from_last_update >= self.expiration_time:
try:
......
instances = self.nova_cli.instance_get_all_by_host(
cfg.CONF.host)
self.last_run = utc_now
except Exception:
......
for instance in instances:
if getattr(instance, 'OS-EXT-STS:vm_state', None) in ['deleted',
'error']:
self.instances.pop(instance.id, None)
else:
self.instances[instance.id] = instance
return self.instances.values()
可以看到上面的CPUPollster所指定的Discover中获取所有的健康的instance列表, 这些instance列表最终会作为数据来源传给CPUPollster的get_samples的resources参数
8. 其他
除了上述提到的内容外,还有一些点需要注意:
- polling agent采用tooz实现了agent的高可用,不同的agent实例之间通过tooz进行通信。在base.AgentManager的初始化和运行过程中都有相关处理,其具体实现可以在ceilometer/coordination.py中看到。;
- 除了上述动态加载Pollster和Discover的方式外,pipeline还提供的静态的加载方式,可以在pipeline文件中通过sources的resources和discovery参数指定。
以上代码基于N版的Ceilometer
参考:
https://www.dazhuanlan.com/2019/11/20/5dd4c5028e4c4/
https://blog.csdn.net/mengalong/article/details/81239370
https://catkang.github.io/2015/11/03/source-ceilometer-polling.html