1.简介
1.1 服务的功能
获取openstack集群中cinder,nova,neutron, swift,swift-account-usage中的关于虚机,云盘,网络,存储相关的数据整理发送给prometheus server
1.2 代码地址
github链接: https://github.com/CanonicalLtd/prometheus-openstack-exporter.git
分支: master(和tag 0.1.4的版本一致)
1.3 exporter是怎么工作的
- prometheus server端周期性来exporter端抓取数据 (这里的周期是在prometheus server中用scrape_interval定义的)
- 我们的exporter服务启动后,会开启port的监听,一旦监听到有请求,会调用exporter里的一系列函数,将整理好的metrics返回给prometheus server,数据采集成功
2.源码
在看源码之前,我们需要知道的prometheus_client的一些概念
2.1 关于prometheus_client需要知道的这些
registry:就是一个监控项仓库,可以有很多个collector
collector:就是一个自定义的监控项收集器,一个collector中可以有很多个metrics
metrics: 监控项
metrics类型:count,gauge,summary,histogram
- count类型:计数器 特点:只增不减,可以是浮点型,不一定非要是整数
- gauge类型:瞬时值 特点:随意增减,浮点整数
- summary类型:存储了 quantile数据,针对于长尾效应设计的,可画出准确的正态分布图
- histogram类型: 收集正态分布数据,通过划分bucket,但是只是计数,比较粗略,客户端性能开销跟count和gauge差不多,图的准确性不如summary高,可画出粗略一点的正态分布图
其中 gauge、histogram、summary这三种metrics都没在这个exporter中出现,为了metrics类型完整(一家人就是要整整齐齐),所以把这三个也贴出来了;
写一个简单的collector举例:
from prometheus_client import Counter, Gauge, Histogram, Summary, \
CollectorRegistry, generate_latest, start_http_server
import time
import random
class TestCollector():
def __init__(self):
self.registry = CollectorRegistry()
self.count_metric = Counter('counter_metric_name',
'A counter metirc decription test')
self.gauge_metric = Gauge('gauge_metric_name',
'A gauge metric description test')
self.histogram_metric = Histogram('histogram_metric_name',
'A histogram description test',
buckets=(-5, 0, 5))
self.summary_metric = Summary('summary_metric_name',
'A summary description test',
['label1', 'label2'])
def test(self):
self.count_metric.inc(1)
self.gauge_metric.set(random.random())
self.histogram_metric.observe(random.randint(-10, 10))
self.summary_metric.labels('a', 'b').observe(17)
return generate_latest(registry=self.registry)
if __name__ == '__main__':
collector = TestCollector()
start_http_server(8000)
while True:
collector.test()
time.sleep(2)
获取到的metric如下,我省略掉了一些prometheus_client自带的变量
# HELP summary_metric_name A summary description test
# TYPE summary_metric_name summary
summary_metric_name_count{label1="a",label2="b"} 1678.0
summary_metric_name_sum{label1="a",label2="b"} 28526.0
# TYPE summary_metric_name_created gauge
summary_metric_name_created{label1="a",label2="b"} 1.584496763390804e+09
# HELP gauge_metric_name A gauge metric description test
# TYPE gauge_metric_name gauge
gauge_metric_name 0.8280131429442902
# HELP counter_metric_name_total A counter metirc decription test
# TYPE counter_metric_name_total counter
counter_metric_name_total 1678.0
# TYPE counter_metric_name_created gauge
counter_metric_name_created 1.584496762311975e+09
# HELP histogram_metric_name A histogram description test
# TYPE histogram_metric_name histogram
histogram_metric_name_bucket{le="-5.0"} 457.0
histogram_metric_name_bucket{le="0.0"} 848.0
histogram_metric_name_bucket{le="5.0"} 1262.0
histogram_metric_name_bucket{le="+Inf"} 1678.0
histogram_metric_name_count 1678.0
histogram_metric_name_sum 327.0
# TYPE histogram_metric_name_created gauge
histogram_metric_name_created 1.584496762312121e+09
2.2 代码层逻辑实现:
好了,现在开始干正事,代码的逻辑是下边这样的:
- 开启一个线程,周期性(这里的周期是配置文件中的cache_refresh_interval定义的)的在线程内
- 通过环境变量获取到openstack的client()
- 使用openstack的组件client()获取我们想要的数据,并将数据缓存至本地的二进制文件中
- 开启一个事件监听循环:
- 当有请求 /metrics的时候,在/metrics路由下 将各个组件的collector监控数据组合到一起,返回给客户端
- 各个组件(nova,cinder,neutron,swift)的collector类从上边已经缓存的缓存文件中纷纷读取自己的数据,做处理,放到自己的registry里,再返回到metrics路由
2.3源码
1. 从程序入口开始:
if __name__ == '__main__':
parser = argparse.ArgumentParser(usage=__doc__,
description='Prometheus OpenStack exporter',
formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument('config_file', nargs='?',
help='Configuration file path',
default='/etc/prometheus/prometheus-openstack-exporter.yaml',
type=argparse.FileType('r'))
args = parser.parse_args()
# 设置log的level,然后把log写到本地,或将日志输出到流 sys.stderr,sys.stdout
log.setLevel(logging.DEBUG)
for logsock in ('/dev/log', '/var/run/syslog'):
if path.exists(logsock):
log.addHandler(logging.handlers.SysLogHandler(address=logsock))
else:
log.addHandler(logging.StreamHandler())
# 加载下配置文件
config = yaml.safe_load(args.config_file.read())
numeric_log_level = getattr(logging, config.get('log_level', 'INFO').upper(), None)
if not isinstance(numeric_log_level, int):
raise ValueError('Invalid log level: %s' % config.get('log_level'))
log.setLevel(numeric_log_level)
# 开启获取集群数据的线程
data_gatherer = None
if data_gatherer_needed(config):
data_gatherer = DataGatherer()
data_gatherer.start()
# 开启HTTPServer监听
server = ForkingHTTPServer(('', config.get('listen_port')), handler)
server.serve_forever()
1.先加载到配置文件,2.初始化记录log,3.开启数据收集(data_gatherer.start()),4.启动事件监听(server.serve_forever())
接下来先从数据收集开始
2. 开启线程,收集集群数据 | DataGatherer(Thread)
单独开启一个线程,周期性通过client端获取我们所想要的数据,重写Thread的run()方法来完成我们想要在线程中进行的操作
class DataGatherer(Thread):
def run():
prodstack = {}
while True:
# 假如 prodstack = {} 写下这,每次刷新数据的时候都会把他先置空,再复制,倒不如直接update一步到位
try:
# 在这里获取client, 并将各个client的处理好的数据,更新到prodstack里
keystone, nova, neutron, cinder = get_clients()
prodstack.update(self._get_keystone_info(keystone))
prodstack.update(self._get_neutron_info(neutron))
prodstack.update(self._get_nova_info(nova, cinder, prodstack))
except Exception:
# Ignore failures, we will try again after refresh_interval.
# Most of them are termporary ie. connectivity problmes
# To alert on stale cache use openstack_exporter_cache_age_seconds metric
log.critical("Error getting stats: {}".format(traceback.format_exc()))
else:
# 在本地的二进制文件中写入加密的获取到的client的数据,作为缓存
with open(self.cache_file + '.new', "wb+") as f:
pickle.dump((prodstack, ), f, pickle.HIGHEST_PROTOCOL)
rename(self.cache_file + '.new', self.cache_file)
log.debug("Done dumping stats to {}".format(self.cache_file))
self.duration = time() - start_time
sleep(self.refresh_interval) #间隔多久再重新获取一次数据
在这里获取到client后,self_get_keystone_info()这样的函数是将从client端的有用的数据整理成字典格式,其他的get_xxx_info()也是这样的效果,将各个client端的数据累积到prodstack{}中,再已二进制的格式加密写进cache文件.
Q: 为什么要加缓存呢,实时获取数据不好吗?
A:实时获取数据对于监控来说当然更精确,但是因为我们的查询比较繁琐,在集群规模比较大的情况下,查询一次可能需要几分钟,频繁的调API去抓取或者多个exporter抓取集群数据,也可能会导致集群响应延迟更高,还会给集群造成负面影响。
3. HTTPServer的框架 | ForkingHTTPServer(..)
单独把起HTTPServer的框架摘出来, 起一个异步进程HTTPserver并监听, 相关的代码是这样的
from BaseHTTPServer import BaseHTTPRequestHandler
from BaseHTTPServer import HTTPServer
from SocketServer import ForkingMixIn
...
class OpenstackExporterHandler(BaseHTTPRequestHandler):
def __init__(self, *args, **kwargs):
BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
def do_GET(self):
......
class ForkingHTTPServer(ForkingMixIn, HTTPServer):
pass
def handler(*args, **kwargs):
OpenstackExporterHandler(*args, **kwargs)
if __name__ == '__main__':
.......
server = ForkingHTTPServer(('', config.get('listen_port')), handler)
server.serve_forever()
自己创建的ForkingHTTPServer:
ForkingHTTPServer:这里的ForkingHTTPServer继承自HTTPServer和ForkingMixIn,主要作用,异步起个进程,监听socket;
引用的模块中的两个类,这俩类的功能:
HTTPServer:继承自SocketServer.TCPServer, HTTPServer主要实现启动事件监听和address port的绑定,并将请求交于handler处理
ForkingMIxIn是继承于SocketServer用来做异步处理的类,它重载了process_request()方法,当出现新请求时创建一个新的进程,把具体工作放到新的进程中执行.
再来看一下我们的处理请求的Handler类
OpenstackExporterHandler:继承自BaseHTTPRequestHandler
OpenstackExporterHandler中定义do_GET来处理请求
BaseHTTPRequestHandler 是http handler的基类,无法直接使用,需要定义请求处理函数,所以我们重写do_GET()函数处理请求
关于server和handler的关系
python http框架主要有server和handler组成,server主要是用于建立网络模型,例如利用epoll监听socket;handler用于处理各个就绪的socket,就是server是用来启动事件监听,handler是来处理事件的。
总结一下就是当执行到下面这段代码时:
server = ForkingHTTPServer(('', config.get('listen_port')), handler)
server.serve_forever()
开启一个事件循环监听,(如果address是本地的话,可以在本地看到绑定的address和port),可以在本地看到并将开启一个新进程来处理接受到的请求;
4.获取client | get_clients()
接着看在线程中是怎么获取client()的
从当前环境变量中获取需要的认证信息,然后获取各个openstack组件的client()
def get_clients():
ks_version = int(env.get('OS_IDENTITY_API_VERSION', 2))
if ks_version == 2:
from keystoneclient.v2_0 import client as keystone_client
# Legacy v2 env vars 在v2版本里用到了这些环境变量
# OS_USERNAME OS_PASSWORD OS_TENANT_NAME OS_AUTH_URL OS_REGION_NAME
ks_creds = get_creds_dict("username", "password", "tenant_name",
"auth_url", "region_name")
# ks_creds打印:{'username': 'admin', 'tenant_name': 'admin', 'password': 'admin', 'auth_url': 'http://192.168.1.106/identity/v3', 'region_name': 'RegionOne'}
cacert = maybe_get_cacert()
if cacert:
ks_creds["cacert"] = cacert
nova_creds = [2] + get_creds_list("username", "password", "tenant_name",
"auth_url")
cinder_creds = get_creds_list("username", "password", "tenant_name",
"auth_url")
# nova_creds打印:[2, 'admin', 'admin', 'admin', 'http://192.168.1.106/identity/v3']
# cinder_creds打印:['admin', 'admin', 'admin', 'http://192.168.1.106/identity/v3']
keystone = keystone_client.Client(**ks_creds)
nova = nova_client.Client(*nova_creds, cacert=cacert)
neutron = neutron_client.Client(**ks_creds)
cinder = cinder_client.Client(*cinder_creds, cacert=cacert)
elif ks_version == 3:
from keystoneauth1.identity import v3
from keystoneauth1 import session
from keystoneclient.v3 import client
# 在v3版本里用到了这些环境变量
# OS_USERNAME OS_PASSWORD OS_USER_DOMAIN_NAME OS_AUTH_URL
# OS_PROJECT_DOMAIN_NAME OS_PROJECT_DOMAIN_ID OS_PROJECT_ID OS_DOMAIN_NAME
ks_creds_domain = get_creds_dict(
"username", "password", "user_domain_name", "auth_url",
"project_domain_name", "project_name", "project_domain_id", "project_id")
#ks_creds_domain输出:{'username': 'admin', 'user_domain_name': 'default', 'password': 'admin', 'auth_url': 'http://192.168.1.106/identity/v3'}
# Need non-domain creds to get full catalog
ks_creds_admin = get_creds_dict(
"username", "password", "user_domain_name", "auth_url",
"project_domain_name", "project_name", "project_domain_id", "project_id")
# ks_creds_admin输出: {'username': 'admin', 'user_domain_name': 'default', 'password': 'admin', 'auth_url': 'http://192.168.1.106/identity/v3'}
auth_domain = v3.Password(**ks_creds_domain)
auth_admin = v3.Password(**ks_creds_admin)
# Need to pass in cacert separately
verify = maybe_get_cacert()
if verify is None:
verify = True
sess_domain = session.Session(auth=auth_domain, verify=verify)
sess_admin = session.Session(auth=auth_admin, verify=verify)
keystone = client.Client(session=sess_domain)
nova = nova_client.Client(2, session=sess_admin)
neutron = neutron_client.Client(session=sess_admin)
cinder = cinder_client.Client(session=sess_admin)
一个openrc示例:
export OS_USERNAME=admin
export OS_TENANT_NAME=admin
export OS_PASSWORD=admin
export OS_REGION_NAME=RegionOne
export OS_AUTH_URL=http://192.168.1.106/identity/v3
export OS_IDENTITY_API_VERSION=3
export OS_USER_DOMAIN_ID=default
export OS_USER_DOMAIN_NAME=default
export OS_PROJECT_DOMAIN_ID=default
export OS_PROJECT_NAME=admin
5. 来看看handler怎么处理给客户端返回数据的 |OpenstackExporterHandler(..)
在OpenstackExporterHandler中
# 在这把collector的和具体实现collector的类的映射关系放到了字典里
COLLECTORS = {
'cinder': Cinder,
'neutron': Neutron,
'nova': Nova,
'swift': Swift,
'swift-account-usage': SwiftAccountUsage,
}
DATA_GATHERER_USERS = [
'cinder',
'neutron',
'nova',
]
class OpenstackExporterHandler(BaseHTTPRequestHandler):
def __init__(self, *args, **kwargs):
BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
def do_GET(self):
url = urlparse.urlparse(self.path)
if url.path == '/metrics':
try:
# collectors取配置文件enable_collectors和本地可获取的所有collector的交集,
collectors = [COLLECTORS[collector]() for collector in get_collectors(config.get('enabled_collectors'))]
log.debug("Collecting stats..")
output = ''
# 通过collector的get_stats()获取单个collector整理好的自己registry
for collector in collectors:
output += collector.get_stats()
#
if data_gatherer:
output += data_gatherer.get_stats()
self.send_response(200)
self.send_header('Content-Type', CONTENT_TYPE_LATEST)
self.end_headers()
self.wfile.write(output)
except Exception:
self.send_response(500)
self.end_headers()
self.wfile.write(traceback.format_exc())
OpenstackExporterHandler集成自BaseHTTPRequestHandler,通过重写do_GET()函数,响应请求
self.wfile也是BaseHTTPRequestHandler里方法,用来写入应答信息
self.send_header(..), self.send_response(..) 这些都是BaseHTTPRequestHandler中的方法
6. 来看一个Collector是怎么收集数据的 | Nova()
class Nova():
def __init__(self):
# 创建一个Collector仓库
self.registry = CollectorRegistry()
self.prodstack = {}
# 从我们之前创建的cache_file中读取数据
with open(config['cache_file'], 'rb') as f:
self.prodstack = pickle.load(f)[0]
self.hypervisors = self.prodstack['hypervisors']
.......
def gen_hypervisor_stats(self):
# 注册监控项
vms = Gauge('hypervisor_running_vms', 'Number of running VMs', labels, registry=self.registry)
vcpus_total = Gauge('hypervisor_vcpus_total', 'Total number of vCPUs', labels, registry=self.registry)
......
for h in self.hypervisors:
# 给监控项赋值
vms.labels(*label_values).set(squashnone(h['running_vms']))
vcpus_total.labels(*label_values).set(squashnone(h['vcpus']))
.....
def get_stats(self):
log.debug("get_stats")
# 下面这些函数都是从缓存文件中获取数据,监控项注册并赋值
self.gen_hypervisor_stats()
self.gen_instance_stats()
self.gen_overcommit_stats()
self.gen_quota_stats()
# 将收集到的监控项都注册到自己仓库中,并返回
return generate_latest(self.registry)
通过get_stats()返回registry
再来看一下DataGatherer的get_stats()
class DataGatherer(Thread):
def get_stats(self):
registry = CollectorRegistry()
labels = ['cloud']
# age 是最新的缓存文件存在的时间
age = Gauge('openstack_exporter_cache_age_seconds',
'Cache age in seconds. It can reset more frequently '
'than scraping interval so we use Gauge',
labels, registry=registry)
label_values = [config['cloud']]
age.labels(*label_values).set(time() - path.getmtime(self.cache_file))
# duration 是重新获取一次数据client开始到返回结果的时间
duration = Gauge('openstack_exporter_cache_refresh_duration_seconds',
'Cache refresh duration in seconds.',
labels, registry=registry)
duration.labels(*label_values).set(self.duration)
return generate_latest(registry)
7.所有的metrics列表
待补充
3. grafana-dashboard
对应的dashboard都写好了:
https://grafana.com/grafana/dashboards/7924/reviews
参考链接:
关于HTTPServer的:http://luodw.cc/2016/11/05/python-http/
metrics参考: https://segmentfault.com/a/1190000018372390
metrics参考2: https://www.infoq.cn/article/Prometheus-theory-source-code
源码解析:https://blog.csdn.net/qingyuanluofeng/article/details/84779737