Gnocchi processing code walkthrough, with debug prints of key fields

Introduction

processing is Gnocchi's core data-processing workflow.

Entry point:

class MetricProcessor(MetricProcessBase):
    name = "processing"

    def __init__(self, worker_id, conf, queue):
        super(MetricProcessor, self).__init__(worker_id, conf, 0)
        self.queue = queue

    def _run_job(self):
        try:
            try:
                metrics = self.queue.get(block=True, timeout=10)
            except six.moves.queue.Empty:
                # NOTE(sileht): Allow the process to exit gracefully every
                # 10 seconds
                return
            self.store.process_background_tasks(self.index, metrics)
        except Exception:
            LOG.error("Unexpected error during measures processing",
                      exc_info=True)

Analysis:
1. metrics is fetched from the multiprocess queue (self.queue); it is a list of metric IDs.

(Pdb) metrics
[u'a08ca3ac-d834-4bf6-b89d-1f8576976287', u'd4bd1d46-8df2-4436-8b39-fba6e732b2b5', u'11065ce7-1baf-425d-916a-4ff8e37423d7', u'9c8310fe-3110-49fe-85c5-49d91c283dcc']
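
The worker blocks on the queue for at most 10 seconds so that it can return and let the process exit cleanly. Below is a minimal sketch of that pattern using the Python 3 standard library queue module (the original code uses six.moves.queue with a multiprocess queue). fetch_metric_ids is a hypothetical helper for illustration, not part of Gnocchi.

```python
import queue


def fetch_metric_ids(q, timeout=10):
    """Return the next batch of metric IDs, or None if the queue stays empty."""
    try:
        return q.get(block=True, timeout=timeout)
    except queue.Empty:
        # Nothing arrived within the timeout; the caller can simply retry.
        return None


q = queue.Queue()
q.put(["a08ca3ac-d834-4bf6-b89d-1f8576976287"])
batch = fetch_metric_ids(q, timeout=0.1)   # the queued list of IDs
empty = fetch_metric_ids(q, timeout=0.1)   # queue is now empty
```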

gnocchi.storage.StorageDriver.process_background_tasks

    def process_background_tasks(self, index, metrics, sync=False):
        """Process background tasks for this storage.

        This calls :func:`process_new_measures` to process new measures

        :param index: An indexer to be used for querying metrics
        :param block_size: number of metrics to process
        :param sync: If True, then process everything synchronously and raise
                     on error
        :type sync: bool
        """
        LOG.debug("Processing new measures")
        try:
            self.process_new_measures(index, metrics, sync)
        except Exception:
            if sync:
                raise
            LOG.error("Unexpected error during measures processing",
                      exc_info=True)

gnocchi.storage._carbonara.CarbonaraBasedStorage.process_new_measures

    def process_new_measures(self, indexer, metrics_to_process,
                             sync=False):
        metrics = indexer.list_metrics(ids=metrics_to_process)
        # This build the list of deleted metrics, i.e. the metrics we have
        # measures to process for but that are not in the indexer anymore.
        deleted_metrics_id = (set(map(uuid.UUID, metrics_to_process))
                              - set(m.id for m in metrics))
        for metric_id in deleted_metrics_id:
            # NOTE(jd): We need to lock the metric otherwise we might delete
            # measures that another worker might be processing. Deleting
            # measurement files under its feet is not nice!
            try:
                with self._lock(metric_id)(blocking=sync):
                    self.incoming.delete_unprocessed_measures_for_metric_id(
                        metric_id)
            except coordination.LockAcquireFailed:
                LOG.debug("Cannot acquire lock for metric %s, postponing "
                          "unprocessed measures deletion", metric_id)

        for metric in metrics:
            lock = self._lock(metric.id)
            # Do not block if we cannot acquire the lock, that means some other
            # worker is doing the job. We'll just ignore this metric and may
            # get back later to it if needed.
            if not lock.acquire(blocking=sync):
                continue
            try:
                locksw = timeutils.StopWatch().start()
                LOG.debug("Processing measures for %s", metric)
                with self.incoming.process_measure_for_metric(metric) \
                        as measures:
                    self._compute_and_store_timeseries(metric, measures)
                LOG.debug("Metric %s locked during %.2f seconds",
                          metric.id, locksw.elapsed())
            except Exception:
                LOG.debug("Metric %s locked during %.2f seconds",
                          metric.id, locksw.elapsed())
                if sync:
                    raise
                LOG.error("Error processing new measures", exc_info=True)
            finally:
                lock.release()

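Analysis:
1. Look up the metric objects by metric ID. For metrics that are no longer in the indexer (deleted metrics), delete their unprocessed measures from redis.
2. self.incoming.process_measure_for_metric(metric) fetches the measures from redis. measures is a list of (timestamp, value) tuples, e.g. [(Timestamp('2020-12-25 08:03:34.551648'), 0.0)].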

(Pdb) metrics
[<Metric 11065ce7-1baf-425d-916a-4ff8e37423d7>, <Metric 9c8310fe-3110-49fe-85c5-49d91c283dcc>, <Metric a08ca3ac-d834-4bf6-b89d-1f8576976287>, <Metric d4bd1d46-8df2-4436-8b39-fba6e732b2b5>]
(Pdb) metrics[0].__dict__
{'status': u'active', '_sa_instance_state': <sqlalchemy.orm.state.InstanceState object at 0x7fc3cc482a50>, 'name': u'disk.read.requests.rate', 'creator': u'4c90cbb7dfaa47798790c3cd411e4575:7e88c2ceced8412dab3d80ed131cf53a', 'resource_id': UUID('be25a992-e971-488c-a4c9-64bd6f6bb753'), 'archive_policy': <gnocchi.indexer.sqlalchemy_base.ArchivePolicy object at 0x7fc3cc482bd0>, 'archive_policy_name': u'frequency_300s', 'id': UUID('11065ce7-1baf-425d-916a-4ff8e37423d7'), 'unit': u'request/s'}
(Pdb) measures
[(Timestamp('2020-12-25 08:03:34.551648'), 0.0), (Timestamp('2020-12-25 08:08:34.615949'), 0.0), (Timestamp('2020-12-25 08:13:34.586165'), 0.0)]
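
The deleted metrics are found with a plain set difference: the IDs taken from the queue minus the IDs the indexer still knows. A small sketch of that step follows. find_deleted_metric_ids and the sample IDs passed as indexed_ids are illustrative assumptions, not Gnocchi code.

```python
import uuid


def find_deleted_metric_ids(metrics_to_process, indexed_ids):
    """Return the requested metric IDs that the indexer no longer knows."""
    requested = set(map(uuid.UUID, metrics_to_process))
    return requested - set(indexed_ids)


queued = [
    "a08ca3ac-d834-4bf6-b89d-1f8576976287",
    "d4bd1d46-8df2-4436-8b39-fba6e732b2b5",
]
# Pretend the indexer only returned the first metric.
still_indexed = [uuid.UUID("a08ca3ac-d834-4bf6-b89d-1f8576976287")]
deleted = find_deleted_metric_ids(queued, still_indexed)
```

Converting the strings to uuid.UUID first matters: the indexer returns UUID objects, and a string never compares equal to a UUID.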

gnocchi.storage._carbonara.CarbonaraBasedStorage._compute_and_store_timeseries

    def _compute_and_store_timeseries(self, metric, measures):
        # NOTE(mnaser): The metric could have been handled by
        #               another worker, ignore if no measures.
        if len(measures) == 0:
            LOG.debug("Skipping %s (already processed)", metric)
            return

        measures = sorted(measures, key=operator.itemgetter(0))

        agg_methods = list(metric.archive_policy.aggregation_methods)
        block_size = metric.archive_policy.max_block_size
        back_window = metric.archive_policy.back_window
        definition = metric.archive_policy.definition

        try:
            ts = self._get_unaggregated_timeserie_and_unserialize(
                metric, block_size=block_size, back_window=back_window)
        except storage.MetricDoesNotExist:
            try:
                self._create_metric(metric)
            except storage.MetricAlreadyExists:
                # Created in the mean time, do not worry
                pass
            ts = None
        except CorruptionError as e:
            LOG.error(e)
            ts = None

        if ts is None:
            # This is the first time we treat measures for this
            # metric, or data are corrupted, create a new one
            ts = carbonara.BoundTimeSerie(block_size=block_size,
                                          back_window=back_window)
            current_first_block_timestamp = None
        else:
            current_first_block_timestamp = ts.first_block_timestamp()

        # NOTE(jd) This is Python where you need such
        # hack to pass a variable around a closure,
        # sorry.
        computed_points = {"number": 0}

        def _map_add_measures(bound_timeserie):
            # NOTE (gordc): bound_timeserie is entire set of
            # unaggregated measures matching largest
            # granularity. the following takes only the points
            # affected by new measures for specific granularity
            tstamp = max(bound_timeserie.first, measures[0][0])
            new_first_block_timestamp = bound_timeserie.first_block_timestamp()
            computed_points['number'] = len(bound_timeserie)
            for d in definition:
                ts = bound_timeserie.group_serie(
                    d.granularity, carbonara.round_timestamp(
                        tstamp, d.granularity * 10e8))

                self._map_in_thread(
                    self._add_measures,
                    ((aggregation, d, metric, ts,
                        current_first_block_timestamp,
                        new_first_block_timestamp)
                        for aggregation in agg_methods))

        with timeutils.StopWatch() as sw:
            ts.set_values(measures,
                          before_truncate_callback=_map_add_measures,
                          ignore_too_old_timestamps=True)

            elapsed = sw.elapsed()
            number_of_operations = (len(agg_methods) * len(definition))
            perf = ""
            if elapsed > 0:
                perf = " (%d points/s, %d measures/s)" % (
                    ((number_of_operations * computed_points['number']) /
                        elapsed),
                    ((number_of_operations * len(measures)) / elapsed)
                )
            LOG.debug("Computed new metric %s with %d new measures "
                      "in %.2f seconds%s",
                      metric.id, len(measures), elapsed, perf)

        self._store_unaggregated_timeserie(metric, ts.serialize())

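Analysis:
1. Gnocchi has three layers of data: resource -> metric -> measure.
A resource has many metrics under it, and each metric holds the actual sampled data points (measures).
2. ts holds the (timestamp, value) points previously stored in ceph, together with block_size and back_window. measures holds the new data that still has to be archived.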

(Pdb) metric.__dict__
{'status': u'active', '_sa_instance_state': <sqlalchemy.orm.state.InstanceState object at 0x7fc3cc482a50>, 'name': u'disk.read.requests.rate', 'creator': u'4c90cbb7dfaa47798790c3cd411e4575:7e88c2ceced8412dab3d80ed131cf53a', 'resource_id': UUID('be25a992-e971-488c-a4c9-64bd6f6bb753'), 'archive_policy': <gnocchi.indexer.sqlalchemy_base.ArchivePolicy object at 0x7fc3cc482bd0>, 'archive_policy_name': u'frequency_300s', 'id': UUID('11065ce7-1baf-425d-916a-4ff8e37423d7'), 'unit': u'request/s'}
(Pdb) measures
[(Timestamp('2020-12-25 08:03:34.551648'), 0.0), (Timestamp('2020-12-25 08:08:34.615949'), 0.0), (Timestamp('2020-12-25 08:13:34.586165'), 0.0)]
(Pdb) agg_methods
[u'max', u'mean', u'min']
(Pdb) block_size
86400.0
(Pdb) back_window
0
(Pdb) definition
[{'points': 300, 'granularity': 300.0, 'timespan': 90000.0}, {'points': 100, 'granularity': 900.0, 'timespan': 90000.0}, {'points': 100, 'granularity': 7200.0, 'timespan': 720000.0}, {'points': 200, 'granularity': 86400.0, 'timespan': 17280000.0}]
(Pdb) ts.__dict__
{'back_window': 0, 'block_size': <86400000000000 * Nanos>, 'ts': 2020-12-25 03:59:50.191696     9.739510
2020-12-25 04:04:50.157824     0.120014
2020-12-25 04:09:50.118074     0.000000
2020-12-25 04:14:50.238515     0.000000
2020-12-25 04:19:50.091682     0.000000
2020-12-25 04:24:50.074567     0.000000
2020-12-25 04:29:50.144667     0.046656
2020-12-25 04:34:50.078793     0.000000
2020-12-25 04:39:50.125287     0.000000
2020-12-25 04:44:50.285588     0.000000
2020-12-25 04:49:50.180779     0.003334
2020-12-25 04:54:50.130977     0.000000
2020-12-25 04:59:50.134970     0.000000
2020-12-25 05:04:50.273585     0.000000
2020-12-25 05:09:50.102544     0.003335
2020-12-25 05:14:50.192206     0.000000
2020-12-25 05:19:50.158147     0.000000
2020-12-25 05:24:50.157283     0.000000
2020-12-25 05:29:50.099026     0.000000
2020-12-25 05:34:50.141446     0.000000
2020-12-25 05:39:50.280643     0.000000
2020-12-25 05:43:34.498113     0.000000
2020-12-25 05:48:34.481620     0.000000
2020-12-25 05:53:34.586508     0.000000
2020-12-25 05:58:34.522015     0.000000
2020-12-25 06:03:34.555708     0.000000
2020-12-25 06:08:34.530322     0.000000
2020-12-25 06:13:34.681981     0.000000
2020-12-25 06:18:34.551089     0.000000
2020-12-25 06:23:34.500688     0.000000
2020-12-25 06:28:34.542119     0.000000
2020-12-25 06:33:34.508253     0.000000
2020-12-25 06:39:50.190674    18.369239
2020-12-25 06:44:50.328215     0.663029
2020-12-25 06:48:34.701798     0.000000
2020-12-25 06:53:34.569040     0.000000
2020-12-25 06:58:34.538735     0.000000
2020-12-25 07:03:34.554803     0.106661
2020-12-25 07:08:34.622109     0.000000
2020-12-25 07:13:34.498547     0.000000
2020-12-25 07:18:34.508618     0.000000
2020-12-25 07:23:34.799742     0.000000
2020-12-25 07:28:34.562884     0.000000
2020-12-25 07:33:34.649724     0.000000
2020-12-25 07:38:34.593196     0.000000
2020-12-25 07:43:34.584885     0.000000
2020-12-25 07:48:34.524464     0.000000
2020-12-25 07:53:34.682062     0.000000
2020-12-25 07:58:34.659174     0.000000
dtype: float64}
(Pdb) current_first_block_timestamp
Timestamp('2020-12-25 00:00:00')
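
current_first_block_timestamp comes out as midnight because block_size is one day (86400 s) and back_window is 0. The sketch below reproduces that value under the assumption that the first block timestamp is the latest timestamp rounded down to a multiple of block_size, minus back_window blocks. round_down and first_block_timestamp are hypothetical helpers written for this note, not the carbonara implementation.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def round_down(ts, seconds):
    """Round a timezone-aware datetime down to a multiple of `seconds`."""
    elapsed = int((ts - EPOCH).total_seconds())
    return EPOCH + timedelta(seconds=elapsed - elapsed % seconds)


def first_block_timestamp(last, block_size, back_window):
    """Assumed rule: floor(last, block_size) - block_size * back_window."""
    step_back = timedelta(seconds=block_size * back_window)
    return round_down(last, block_size) - step_back


# Latest point in the stored series shown above.
last = datetime(2020, 12, 25, 7, 58, 34, 659174, tzinfo=timezone.utc)
first_block = first_block_timestamp(last, block_size=86400, back_window=0)
previous_block = first_block_timestamp(last, block_size=86400, back_window=1)
```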
Variables inside _map_add_measures:
(Pdb) tstamp
Timestamp('2020-12-25 08:03:34.551648')
(Pdb) new_first_block_timestamp
Timestamp('2020-12-25 00:00:00')
(Pdb) definition
[{'points': 300, 'granularity': 300.0, 'timespan': 90000.0}, {'points': 100, 'granularity': 900.0, 'timespan': 90000.0}, {'points': 100, 'granularity': 7200.0, 'timespan': 720000.0}, {'points': 200, 'granularity': 86400.0, 'timespan': 17280000.0}]
(Pdb) ts
<gnocchi.carbonara.GroupedTimeSeries object at 0x7fc3cc482150>
(Pdb) ts.__dict__
{'tstamps': array([  1.60888320e+18,   1.60888350e+18,   1.60888380e+18]), 'counts': array([1, 1, 1]), '_ts': 2020-12-25 08:03:34.551648    0.0
2020-12-25 08:08:34.615949    0.0
2020-12-25 08:13:34.586165    0.0
dtype: float64, 'indexes': array([  1.60888320e+18,   1.60888350e+18,   1.60888380e+18])
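
The tstamps and counts arrays are the result of flooring each timestamp to the granularity and counting the points per bucket. The sketch below redoes that arithmetic with the standard library only. group_by_granularity is a hypothetical stand-in for group_serie, not the real carbonara code. With a 300 s granularity the three measures fall into three buckets, and with 900 s they share one bucket.

```python
from collections import Counter
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_ns(ts):
    """Convert an aware datetime to integer nanoseconds since the epoch."""
    return ((ts - EPOCH) // timedelta(microseconds=1)) * 1000


def group_by_granularity(timestamps, granularity):
    """Map each bucket start (ns) to the number of points falling into it."""
    step = int(granularity * 1e9)
    return Counter(to_ns(ts) - to_ns(ts) % step for ts in timestamps)


measures = [
    datetime(2020, 12, 25, 8, 3, 34, 551648, tzinfo=timezone.utc),
    datetime(2020, 12, 25, 8, 8, 34, 615949, tzinfo=timezone.utc),
    datetime(2020, 12, 25, 8, 13, 34, 586165, tzinfo=timezone.utc),
]
buckets_300 = group_by_granularity(measures, 300.0)
buckets_900 = group_by_granularity(measures, 900.0)
```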

gnocchi.storage._carbonara.CarbonaraBasedStorage._get_unaggregated_timeserie_and_unserialize
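Analysis:

  1. raw_measures is the value read from ceph. It is LZ4-compressed data: one part stores the timestamps, the other part stores the values.
  2. carbonara.BoundTimeSerie.unserialize decompresses and deserializes this data.
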
    def _get_unaggregated_timeserie_and_unserialize(
            self, metric, block_size, back_window):
        """Retrieve unaggregated timeserie for a metric and unserialize it.

        Returns a gnocchi.carbonara.BoundTimeSerie object. If the data cannot
        be retrieved, returns None.

        """
        with timeutils.StopWatch() as sw:
            raw_measures = (
                self._get_unaggregated_timeserie(
                    metric)
            )
            LOG.debug(
                "Retrieve unaggregated measures "
                "for %s in %.2fs",
                metric.id, sw.elapsed())
        try:
            return carbonara.BoundTimeSerie.unserialize(
                raw_measures, block_size, back_window)
        except ValueError:
            raise CorruptionError(
                "Data corruption detected for %s "
                "unaggregated timeserie" % metric.id)
(Pdb) raw_measures
'\x10\x03\x00\x00\xf1\x04\x80\xa8\x96\xf9\xa3\xd9S\x16\x80\xdf_\xd7E\x00\x00\x00\x90.\x06\x08\x00\xf1=\xa8\x80\x92\xe0E\x00\x00\x00\x989\xa4\xd0E\x00\x00\x00\x88\x90_\xd8E\x00\x00\x00 \\\x92\xddE\x00\x00\x00\xb0\x8fw\xd5E\x00\x00\x000)*\xdcE\x00\x00\x00\xc8\xb7\xf2\xe2E\x00\x00\x00\xd8u%\xd3E\x00\x00\x00\xf0\xccl\xd6E\x00\x00\x00\xa8\xa5\xa1\xd9\x18\x00\xb1\xd0\xa7\xe1E\x00\x00\x00\x18\xd72\xcf8\x00\xa1\xda\xbc\xdeE\x00\x00\x00\x08\x05]p\x00A\x00\x89W\xd9p\x00!\xc9\xebX\x00\xb1 \xff\xeb\xdbE\x00\x00\x00H\xb2\xb08\x00\xb10\x14g44\x00\x00\x008\x0ei\x88\x00\xb1\xc0.\xa5\xdfE\x00\x00\x008\xa2\x8c0\x001H\xd5f0\x001\xf0[\xe1P\x001\xf8\xd9n\x90\x00A w\x97\xd1x\x001\xa9c\xd6\x88\x00"\xe7\xdc(\x00\x12\xf6\xf0\x00\xb1\x08\xd1jxW\x00\x00\x00\x88m\x97h\x00\xb2\x98,\xb5=4\x00\x00\x00\x10\xfez8\x00!M\x96P\x00\xb1\xa0\xe5Y\xdaE\x00\x00\x00\x10\xbag\x00\x01A\xf0O\x07\xd2P\x00!c\xfe\xb8\x00\xf1\x04 \xeb\xbe\xeaE\x00\x00\x00p\x8dF\xcbE\x00\x00\x00\xc0\xca\x91\xe0\x001\x80+\x06x\x001(\xe7\xe5\xb8\x001x\xc4\xca\xb0\x0010y\xc9\xa0\x001\xc0y\x07\x18\x00\xff\x02\xcdfX\x1c\xa1z#@\xf0m\xbeA5\xb9\xbe?\x00\x01\x00\x0c\x7f\x07\xfc\xb5\xafC\xe3\xa7(\x00\x06\x7fu\xe2\x9c"\xf3Pk \x00\x06o\xf51\x88\x96~R \x00\x07\x0f\x02\x00]\xff\x00#\xe2qj\x86^2@\xbbo\x1bV\x897\xe5\xb8\x00\x06\x7f\xabL\xc9\xdc!N\xbb \x00\x06\x0f\x02\x00(P\x00\x00\x00\x00\x00'

gnocchi.storage.ceph.CephStorage._get_unaggregated_timeserie
Analysis:

  1. _build_unaggregated_timeserie_path builds the object name, in the format:
    gnocchi_<metric.id>_none_v3
  2. That name is then used to read the compressed data from ceph via ioctx.read.

    def _get_unaggregated_timeserie(self, metric, version=3):
        try:
            return self._get_object_content(
                self._build_unaggregated_timeserie_path(metric, version))
        except rados.ObjectNotFound:
            raise storage.MetricDoesNotExist(metric)
    @staticmethod
    def _build_unaggregated_timeserie_path(metric, version):
        return (('gnocchi_%s_none' % metric.id)
                + ("_v%s" % version if version else ""))
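
As a quick check of the naming rule, the same string formatting can be run standalone. build_unaggregated_path is a free-function copy of the static method above, made for illustration only.

```python
def build_unaggregated_path(metric_id, version=3):
    """Build the ceph object name of a metric's unaggregated time series."""
    name = "gnocchi_%s_none" % metric_id
    # The version suffix is only appended when a version is given.
    return name + ("_v%s" % version if version else "")


metric_id = "11065ce7-1baf-425d-916a-4ff8e37423d7"
path_v3 = build_unaggregated_path(metric_id)
path_unversioned = build_unaggregated_path(metric_id, version=None)
```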

gnocchi.storage.ceph.CephStorage._get_object_content

    def _get_object_content(self, name):
        offset = 0
        content = b''
        while True:
            data = self.ioctx.read(name, offset=offset)
            if not data:
                break
            content += data
            offset += len(data)
        return content
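
The loop is needed because a single read may return only part of the object, so the code keeps reading from the current offset until an empty result comes back. The sketch below shows the same loop against an in-memory fake. FakeIoctx and its chunk_size are assumptions made up for illustration and do not model the real rados client.

```python
class FakeIoctx:
    """Hypothetical in-memory stand-in for a rados ioctx (illustration only)."""

    def __init__(self, objects, chunk_size=4):
        self.objects = objects
        self.chunk_size = chunk_size

    def read(self, name, offset=0):
        # Return at most chunk_size bytes, like a size-limited read.
        data = self.objects[name]
        return data[offset:offset + self.chunk_size]


def get_object_content(ioctx, name):
    """Read an object chunk by chunk until read() returns nothing."""
    offset = 0
    chunks = []
    while True:
        data = ioctx.read(name, offset=offset)
        if not data:
            break
        chunks.append(data)
        offset += len(data)
    return b"".join(chunks)


ioctx = FakeIoctx({"gnocchi_x_none_v3": b"0123456789"})
content = get_object_content(ioctx, "gnocchi_x_none_v3")
```

Collecting chunks in a list and joining once avoids repeated bytes concatenation; the behavior is otherwise the same as the original loop.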

gnocchi.carbonara.BoundTimeSerie.unserialize
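Analysis:
1. Decompress with LZ4, then split the payload into timestamps and values.
2. As the output below shows, the timestamps are all of the previously stored timestamps.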

    @classmethod
    def unserialize(cls, data, block_size, back_window):
        uncompressed = lz4.loads(data)
        nb_points = (
            len(uncompressed) // cls._SERIALIZATION_TIMESTAMP_VALUE_LEN
        )
        timestamps_raw = uncompressed[
            :nb_points*cls._SERIALIZATION_TIMESTAMP_LEN]
        timestamps = numpy.frombuffer(timestamps_raw, dtype='<Q')
        timestamps = numpy.cumsum(timestamps)
        timestamps = numpy.array(timestamps, dtype='datetime64[ns]')

        values_raw = uncompressed[nb_points*cls._SERIALIZATION_TIMESTAMP_LEN:]
        values = numpy.frombuffer(values_raw, dtype='<d')

        return cls.from_data(
            pandas.to_datetime(timestamps),
            values,
            block_size=block_size,
            back_window=back_window)
(Pdb) uncompressed
'\x80\xa8\x96\xf9\xa3\xd9S\x16\x80\xdf_\xd7E\x00\x00\x00\x90.\x06\xd7E\x00\x00\x00\xa8\x80\x92\xe0E\x00\x00\x00\x989\xa4\xd0E\x00\x00\x00\x88\x90_\xd8E\x00\x00\x00 \\\x92\xddE\x00\x00\x00\xb0\x8fw\xd5E\x00\x00\x000)*\xdcE\x00\x00\x00\xc8\xb7\xf2\xe2E\x00\x00\x00\xd8u%\xd3E\x00\x00\x00\xf0\xccl\xd6E\x00\x00\x00\xa8\xa5\xa1\xd9E\x00\x00\x00\xd8\xd0\xa7\xe1E\x00\x00\x00\x18\xd72\xcfE\x00\x00\x000\xda\xbc\xdeE\x00\x00\x00\x08\x05]\xd7E\x00\x00\x00\x00\x89W\xd9E\x00\x00\x00\x98\xc9\xeb\xd5E\x00\x00\x00 \xff\xeb\xdbE\x00\x00\x00H\xb2\xb0\xe1E\x00\x00\x000\x14g44\x00\x00\x008\x0ei\xd8E\x00\x00\x00\xc0.\xa5\xdfE\x00\x00\x008\xa2\x8c\xd5E\x00\x00\x00H\xd5f\xdbE\x00\x00\x00\xf0[\xe1\xd7E\x00\x00\x00\xf8\xd9n\xe2E\x00\x00\x00 w\x97\xd1E\x00\x00\x00\x18\xa9c\xd6E\x00\x00\x00\xd8\xe7\xdc\xdbE\x00\x00\x00\xf0\xf6_\xd7E\x00\x00\x00\x08\xd1jxW\x00\x00\x00\x88m\x97\xe1E\x00\x00\x00\x98,\xb5=4\x00\x00\x00\x10\xfez\xd1E\x00\x00\x00\x18M\x96\xd7E\x00\x00\x00\xa0\xe5Y\xdaE\x00\x00\x00\x10\xbag\xddE\x00\x00\x00\xf0O\x07\xd2E\x00\x00\x00\xd8c\xfe\xd9E\x00\x00\x00 
\xeb\xbe\xeaE\x00\x00\x00p\x8dF\xcbE\x00\x00\x00\xc0\xca\x91\xdeE\x00\x00\x00\x80+\x06\xd6E\x00\x00\x00(\xe7\xe5\xd8E\x00\x00\x00x\xc4\xca\xd5E\x00\x00\x000y\xc9\xe2E\x00\x00\x00\xc0y\x07\xd8E\x00\x00\x00\xcdfX\x1c\xa1z#@\xf0m\xbeA5\xb9\xbe?\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x07\xfc\xb5\xafC\xe3\xa7?\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00u\xe2\x9c"\xf3Pk?\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xf51\x88\x96~Rk?\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00#\xe2qj\x86^2@\xbbo\x1bV\x897\xe5?\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xabL\xc9\xdc!N\xbb?\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
(Pdb) cls._SERIALIZATION_TIMESTAMP_VALUE_LEN
16
(Pdb) timestamps
array(['2020-12-25T03:59:50.191696000', '2020-12-25T04:04:50.157824000',
       '2020-12-25T04:09:50.118074000', '2020-12-25T04:14:50.238515000',
       '2020-12-25T04:19:50.091682000', '2020-12-25T04:24:50.074567000',
       '2020-12-25T04:29:50.144667000', '2020-12-25T04:34:50.078793000',
       '2020-12-25T04:39:50.125287000', '2020-12-25T04:44:50.285588000',
       '2020-12-25T04:49:50.180779000', '2020-12-25T04:54:50.130977000',
       '2020-12-25T04:59:50.134970000', '2020-12-25T05:04:50.273585000',
       '2020-12-25T05:09:50.102544000', '2020-12-25T05:14:50.192206000',
       '2020-12-25T05:19:50.158147000', '2020-12-25T05:24:50.157283000',
       '2020-12-25T05:29:50.099026000', '2020-12-25T05:34:50.141446000',
       '2020-12-25T05:39:50.280643000', '2020-12-25T05:43:34.498113000',
       '2020-12-25T05:48:34.481620000', '2020-12-25T05:53:34.586508000',
       '2020-12-25T05:58:34.522015000', '2020-12-25T06:03:34.555708000',
       '2020-12-25T06:08:34.530322000', '2020-12-25T06:13:34.681981000',
       '2020-12-25T06:18:34.551089000', '2020-12-25T06:23:34.500688000',
       '2020-12-25T06:28:34.542119000', '2020-12-25T06:33:34.508253000',
       '2020-12-25T06:39:50.190674000', '2020-12-25T06:44:50.328215000',
       '2020-12-25T06:48:34.701798000', '2020-12-25T06:53:34.569040000',
       '2020-12-25T06:58:34.538735000', '2020-12-25T07:03:34.554803000',
       '2020-12-25T07:08:34.622109000', '2020-12-25T07:13:34.498547000',
       '2020-12-25T07:18:34.508618000', '2020-12-25T07:23:34.799742000',
       '2020-12-25T07:28:34.562884000', '2020-12-25T07:33:34.649724000',
       '2020-12-25T07:38:34.593196000', '2020-12-25T07:43:34.584885000',
       '2020-12-25T07:48:34.524464000', '2020-12-25T07:53:34.682062000',
       '2020-12-25T07:58:34.659174000'], dtype='datetime64[ns]')
(Pdb) values_raw
'\xcdfX\x1c\xa1z#@\xf0m\xbeA5\xb9\xbe?\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x07\xfc\xb5\xafC\xe3\xa7?\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00u\xe2\x9c"\xf3Pk?\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xf51\x88\x96~Rk?\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00#\xe2qj\x86^2@\xbbo\x1bV\x897\xe5?\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xabL\xc9\xdc!N\xbb?\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
(Pdb) values
array([  9.73951043e+00,   1.20013550e-01,   0.00000000e+00,
         0.00000000e+00,   0.00000000e+00,   0.00000000e+00,
         4.66557648e-02,   0.00000000e+00,   0.00000000e+00,
         0.00000000e+00,   3.33449828e-03,   0.00000000e+00,
         0.00000000e+00,   0.00000000e+00,   3.33523487e-03,
         0.00000000e+00,   0.00000000e+00,   0.00000000e+00,
         0.00000000e+00,   0.00000000e+00,   0.00000000e+00,
         0.00000000e+00,   0.00000000e+00,   0.00000000e+00,
         0.00000000e+00,   0.00000000e+00,   0.00000000e+00,
         0.00000000e+00,   0.00000000e+00,   0.00000000e+00,
         0.00000000e+00,   0.00000000e+00,   1.83692385e+01,
         6.63029354e-01,   0.00000000e+00,   0.00000000e+00,
         0.00000000e+00,   1.06660954e-01,   0.00000000e+00,
         0.00000000e+00,   0.00000000e+00,   0.00000000e+00,
         0.00000000e+00,   0.00000000e+00,   0.00000000e+00,
         0.00000000e+00,   0.00000000e+00,   0.00000000e+00,
         0.00000000e+00])
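
The decompressed payload has a simple layout: the first half holds one little-endian uint64 per point (the first one absolute, the rest deltas, which is why cumsum is applied), and the second half holds one little-endian float64 per point. The sketch below round-trips that layout with the standard library struct module instead of numpy and leaves out the LZ4 step. pack_points and unpack_points are hypothetical helpers, not carbonara functions.

```python
import struct
from itertools import accumulate

TIMESTAMP_LEN = 8  # one little-endian uint64 per point
VALUE_LEN = 8      # one little-endian float64 per point


def pack_points(points):
    """Encode (timestamp_ns, value) pairs: all deltas first, then all values."""
    if not points:
        return b""
    stamps = [t for t, _ in points]
    # First entry is absolute, the rest are differences to the previous one.
    deltas = [stamps[0]] + [b - a for a, b in zip(stamps, stamps[1:])]
    values = [v for _, v in points]
    n = len(points)
    return struct.pack("<%dQ" % n, *deltas) + struct.pack("<%dd" % n, *values)


def unpack_points(payload):
    """Decode the layout above; a running sum restores absolute timestamps."""
    n = len(payload) // (TIMESTAMP_LEN + VALUE_LEN)
    deltas = struct.unpack("<%dQ" % n, payload[:n * TIMESTAMP_LEN])
    values = struct.unpack("<%dd" % n, payload[n * TIMESTAMP_LEN:])
    return list(zip(accumulate(deltas), values))


points = [(1608883414551648000, 0.0), (1608883714615949000, 0.5)]
payload = pack_points(points)
decoded = unpack_points(payload)
```

Storing deltas keeps most timestamp words small and repetitive, which helps the compressor.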

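Analysis:
1. set_values merges the new measures from redis into the time series and, through before_truncate_callback, calls _add_measures to store the aggregated results in ceph.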

    def set_values(self, values, before_truncate_callback=None,
                   ignore_too_old_timestamps=False):
        # NOTE: values must be sorted when passed in.
        if self.block_size is not None and not self.ts.empty:
            first_block_timestamp = self.first_block_timestamp()
            if ignore_too_old_timestamps:
                for index, (timestamp, value) in enumerate(values):
                    if timestamp >= first_block_timestamp:
                        values = values[index:]
                        break
                else:
                    values = []
            else:
                # Check that the smallest timestamp does not go too much back
                # in time.
                smallest_timestamp = values[0][0]
                if smallest_timestamp < first_block_timestamp:
                    raise NoDeloreanAvailable(first_block_timestamp,
                                              smallest_timestamp)
        super(BoundTimeSerie, self).set_values(values)
        if before_truncate_callback:
            before_truncate_callback(self)
        self._truncate()
(Pdb) values
[(Timestamp('2020-12-25 08:03:34.551648'), 0.0), (Timestamp('2020-12-25 08:08:34.615949'), 0.0), (Timestamp('2020-12-25 08:13:34.586165'), 0.0)]
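
With ignore_too_old_timestamps=True, set_values silently drops every measure older than the first block timestamp instead of raising NoDeloreanAvailable. The sketch below isolates that filter. drop_too_old is a hypothetical helper, and the sample measure at 23:58 on the previous day is invented to show the effect.

```python
from datetime import datetime


def drop_too_old(values, first_block_timestamp):
    """Keep the sorted (timestamp, value) pairs from the first recent one on."""
    for index, (timestamp, _value) in enumerate(values):
        if timestamp >= first_block_timestamp:
            return values[index:]
    # Every measure predates the first block, so nothing is kept.
    return []


first_block = datetime(2020, 12, 25)
values = [
    (datetime(2020, 12, 24, 23, 58), 1.0),    # invented, too old
    (datetime(2020, 12, 25, 8, 3, 34), 0.0),
    (datetime(2020, 12, 25, 8, 8, 34), 0.0),
]
kept = drop_too_old(values, first_block)
```

This only works because the input is sorted, as the NOTE at the top of set_values requires.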
    def _add_measures(self, aggregation, archive_policy_def,
                      metric, grouped_serie,
                      previous_oldest_mutable_timestamp,
                      oldest_mutable_timestamp):
        ts = carbonara.AggregatedTimeSerie.from_grouped_serie(
            grouped_serie, archive_policy_def.granularity,
            aggregation, max_size=archive_policy_def.points)

        # Don't do anything if the timeserie is empty
        if not ts:
            return

        # We only need to check for rewrite if driver is not in WRITE_FULL mode
        # and if we already stored splits once
        need_rewrite = (
            not self.WRITE_FULL
            and previous_oldest_mutable_timestamp is not None
        )

        if archive_policy_def.timespan or need_rewrite:
            existing_keys = self._list_split_keys_for_metric(
                metric, aggregation, archive_policy_def.granularity)

        # First delete old splits
        if archive_policy_def.timespan:
            oldest_point_to_keep = ts.last - datetime.timedelta(
                seconds=archive_policy_def.timespan)
            oldest_key_to_keep = ts.get_split_key(oldest_point_to_keep)
            oldest_key_to_keep_s = str(oldest_key_to_keep)
            for key in list(existing_keys):
                # NOTE(jd) Only delete if the key is strictly inferior to
                # the timestamp; we don't delete any timeserie split that
                # contains our timestamp, so we prefer to keep a bit more
                # than deleting too much
                if key < oldest_key_to_keep_s:
                    self._delete_metric_measures(
                        metric, key, aggregation,
                        archive_policy_def.granularity)
                    existing_keys.remove(key)
        else:
            oldest_key_to_keep = carbonara.SplitKey(0, 0)

        # Rewrite all read-only splits just for fun (and compression). This
        # only happens if `previous_oldest_mutable_timestamp' exists, which
        # means we already wrote some splits at some point – so this is not the
        # first time we treat this timeserie.
        if need_rewrite:
            previous_oldest_mutable_key = str(ts.get_split_key(
                previous_oldest_mutable_timestamp))
            oldest_mutable_key = str(ts.get_split_key(
                oldest_mutable_timestamp))

            if previous_oldest_mutable_key != oldest_mutable_key:
                for key in existing_keys:
                    if previous_oldest_mutable_key <= key < oldest_mutable_key:
                        LOG.debug(
                            "Compressing previous split %s (%s) for metric %s",
                            key, aggregation, metric)
                        # NOTE(jd) Rewrite it entirely for fun (and later for
                        # compression). For that, we just pass None as split.
                        self._store_timeserie_split(
                            metric, carbonara.SplitKey(
                                float(key), archive_policy_def.granularity),
                            None, aggregation, archive_policy_def,
                            oldest_mutable_timestamp)

        for key, split in ts.split():
            if key >= oldest_key_to_keep:
                LOG.debug(
                    "Storing split %s (%s) for metric %s",
                    key, aggregation, metric)
                self._store_timeserie_split(
                    metric, key, split, aggregation, archive_policy_def,
                    oldest_mutable_timestamp)
(Pdb) aggregation
u'max'
(Pdb) archive_policy_def
{'points': 100, 'granularity': 900.0, 'timespan': 90000.0}
(Pdb) metric
<Metric 11065ce7-1baf-425d-916a-4ff8e37423d7>
(Pdb) grouped_serie
<gnocchi.carbonara.GroupedTimeSeries object at 0x7fc3cc492490>
(Pdb) grouped_serie.__dict__
{'tstamps': array([  1.60888320e+18]), 'counts': array([3]), '_ts': 2020-12-25 08:03:34.551648    0.0
2020-12-25 08:08:34.615949    0.0
2020-12-25 08:13:34.586165    0.0
dtype: float64, 'indexes': array([  1.60888320e+18,   1.60888320e+18,   1.60888320e+18])}
2020-12-25 16:53:57.033 72 WARNING gnocchi.cli [-] Metric processing lagging scheduling rate. It is recommended to increase the number of workers or to lengthen processing interval.
(Pdb) previous_oldest_mutable_timestamp
Timestamp('2020-12-25 00:00:00')
(Pdb) oldest_mutable_timestamp
Timestamp('2020-12-25 00:00:00')
(Pdb) ts.__dict__
{'aggregation_method': u'max', 'ts': 2020-12-25 08:00:00    0.0
dtype: float64, 'max_size': 100, 'sampling': 900.0}
(Pdb) existing_keys
set([u'1607040000.0'])
(Pdb) oldest_point_to_keep
Timestamp('2020-12-24 07:00:00')
(Pdb) oldest_key_to_keep
<SplitKey: 1607040000.0 / 900.000000s>
(Pdb) oldest_key_to_keep_s
'1607040000.0'
(Pdb) previous_oldest_mutable_key
'1607040000.0'
(Pdb) oldest_mutable_key
'1607040000.0'
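
The retention values above can be reproduced by hand. oldest_point_to_keep is ts.last minus the timespan (90000 s, i.e. 25 hours), and the split key matches the timestamp rounded down to a multiple of 3600 points times the granularity. The 3600 points per split is an assumption that agrees with the debug output here, and split_key is a hypothetical helper, not the carbonara SplitKey class.

```python
from datetime import datetime, timedelta, timezone

POINTS_PER_SPLIT = 3600  # assumed split size, consistent with the output above


def split_key(ts, granularity):
    """Floor an aware datetime to the start of its split, in epoch seconds."""
    span = int(POINTS_PER_SPLIT * granularity)
    epoch_seconds = int(ts.timestamp())
    return float(epoch_seconds - epoch_seconds % span)


last = datetime(2020, 12, 25, 8, 0, 0, tzinfo=timezone.utc)   # ts.last
oldest_point_to_keep = last - timedelta(seconds=90000.0)      # timespan
oldest_key_to_keep = split_key(oldest_point_to_keep, 900.0)
current_key = split_key(last, 900.0)
```

Since the only existing key, 1607040000.0, is not strictly smaller than oldest_key_to_keep, no split is deleted in this run.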