0x00: Ultimate query
(
sum(max_over_time(kafka_consumer_fetch_manager_records_consumed_total{namespace="engineering", topic="topic", service="service"}[24h]
)
)
-
sum(min_over_time(kafka_consumer_fetch_manager_records_consumed_total{namespace="engineering", topic="topic", service="service"}[24h]
) unless (min_over_time(kafka_consumer_fetch_manager_records_consumed_total{namespace="engineering", topic="topic", service="service"}[24h]
) unless kafka_consumer_fetch_manager_records_consumed_total{namespace="engineering", topic="topic", service="eservice"} offset 24h
)
)
)
0x01 Why the increase is inaccurate?
the max value for last 24 hours:
sum(max_over_time(kafka_consumer_fetch_manager_records_consumed_total{namespace="engineering", topic="topic", service="event-emitter-master"}[24h]))
the min value for last 24 hours:
we can see there are some data are inaccurate with gaps.
kafka_consumer_fetch_manager_records_consumed_total{namespace="engineering", topic="topic", service="event-emitter-master"}
An inaccurate instance:
kafka_consumer_fetch_manager_records_consumed_total{client_id="client_id", container="app", endpoint="http", instance="instance", job="job", kafka_version="3.7.0", kubernetes_namespace="engineering", kubernetes_pod_name="kubernetes_pod_name", namespace="engineering", pod="kubernetes_pod_name", service="service", topic="topic"}
increase(kafka_consumer_fetch_manager_records_consumed_total{client_id="client_id", container="app", endpoint="http", instance="instance", job="job", kafka_version="3.7.0", kubernetes_namespace="engineering", kubernetes_pod_name="kubernetes_pod_name", namespace="engineering", pod="kubernetes_pod_name", service="service", topic="topic"}[24h])
max_over_time(kafka_consumer_fetch_manager_records_consumed_total{client_id="client_id", container="app", endpoint="http", instance="instance", job="job", kafka_version="3.7.0", kubernetes_namespace="engineering", kubernetes_pod_name="kubernetes_pod_name", namespace="engineering", pod="kubernetes_pod_name", service="service", topic="topic"}[24h]) - min_over_time(kafka_consumer_fetch_manager_records_consumed_total{client_id="client_id", container="app", endpoint="http", instance="instance", job="job", kafka_version="3.7.0", kubernetes_namespace="engineering", kubernetes_pod_name="kubernetes_pod_name", namespace="engineering", pod="kubernetes_pod_name", service="service", topic="topic"}[24h])
min_over_time(kafka_consumer_fetch_manager_records_consumed_total{client_id="client_id", container="app", endpoint="http", instance="instance", job="job", kafka_version="3.7.0", kubernetes_namespace="engineering", kubernetes_pod_name="kubernetes_pod_name", namespace="engineering", pod="kubernetes_pod_name", service="service", topic="topic"}[24h])
max_over_time(kafka_consumer_fetch_manager_records_consumed_total{client_id="client_id", container="app", endpoint="http", instance="instance", job="job", kafka_version="3.7.0", kubernetes_namespace="engineering", kubernetes_pod_name="kubernetes_pod_name", namespace="engineering", pod="kubernetes_pod_name", service="service", topic="topic"}[24h])
An accurate instance:
kafka_consumer_fetch_manager_records_consumed_total{client_id="client_id", container="app", endpoint="http", instance="instance", job="job", kafka_version="3.7.0", kubernetes_namespace="engineering", kubernetes_pod_name="kubernetes_pod_name", namespace="engineering", pod="kubernetes_pod_name", service="service", topic="topic"}
max_over_time(kafka_consumer_fetch_manager_records_consumed_total{client_id="client_id", container="app", endpoint="http", instance="instance", job="job", kafka_version="3.7.0", kubernetes_namespace="engineering", kubernetes_pod_name="kubernetes_pod_name", namespace="engineering", pod="kubernetes_pod_name", service="service", topic="topic"}[24h]) - min_over_time(kafka_consumer_fetch_manager_records_consumed_total{client_id="client_id", container="app", endpoint="http", instance="instance", job="job", kafka_version="3.7.0", kubernetes_namespace="engineering", kubernetes_pod_name="kubernetes_pod_name", namespace="engineering", pod="kubernetes_pod_name", service="service", topic="topic"}[24h])
A consistent example
kafka_consumer_fetch_manager_records_consumed_total{client_id="client_id", container="app", endpoint="http", instance="instance", job="job", kafka_version="3.7.0", kubernetes_namespace="engineering", kubernetes_pod_name="kubernetes_pod_name", namespace="engineering", pod="kubernetes_pod_name", service="service", topic="topic"}
increase(kafka_consumer_fetch_manager_records_consumed_total{client_id="client_id", container="app", endpoint="http", instance="instance", job="job", kafka_version="3.7.0", kubernetes_namespace="engineering", kubernetes_pod_name="kubernetes_pod_name", namespace="engineering", pod="kubernetes_pod_name", service="service", topic="topic"}[24h])
max_over_time(kafka_consumer_fetch_manager_records_consumed_total{client_id="client_id", container="app", endpoint="http", instance="instance", job="job", kafka_version="3.7.0", kubernetes_namespace="engineering", kubernetes_pod_name="kubernetes_pod_name", namespace="engineering", pod="kubernetes_pod_name", service="service", topic="topic"}[24h]) - min_over_time(kafka_consumer_fetch_manager_records_consumed_total{client_id="client_id", container="app", endpoint="http", instance="instance", job="job", kafka_version="3.7.0", kubernetes_namespace="engineering", kubernetes_pod_name="kubernetes_pod_name", namespace="engineering", pod="kubernetes_pod_name", service="service", topic="topic"}[24h])
0x03 How to revise this?
We have 3 kinds of data:
- consistent instance whose min value starts from a normal positive value
- inconsistent instance but with 0 as min value
- inconsistent instance whose min value starts from a positive value - this one is actual we need to handle with.
Basically, we don’t have to do anything for 1 and 2.
But for the 3rd, we have to amend the min value as 0 instead, and must be careful, we also need to keep the normal one as normal.
I tried several approaches for this, including:
clamp_min()
OR vector(0)
unless
Eventually, unless
helps me out of there accurately.
vector1 unless vector2
operator means exclude vector2
from vector1
.
For the 3 kinds of data, we intent to:
- consistent instance whose min value starts from a normal positive value
- exclude nothing
- inconsistent instance but with 0 as min value
- exclude nothing
-
inconsistent instance whose min value starts from a positive value - this one is actual we need to handle with.
- exclude itself
we do this to get nothing or itself:
min_over_time(kafka_consumer_fetch_manager_records_consumed_total{client_id="client_id" ...}[24h])
unless
kafka_consumer_fetch_manager_records_consumed_total{client_id="client_id" ...} offset 24h
results for this:
- consistent instance
min_over_time(kafka_consumer_fetch_manager_records_consumed_total{client_id="client_id", container="app", endpoint="http", instance="instance", job="job", kafka_version="3.7.0", kubernetes_namespace="engineering", kubernetes_pod_name="kubernetes_pod_name", namespace="engineering", pod="kubernetes_pod_name", service="service", topic="topic"}[24h])
unless
kafka_consumer_fetch_manager_records_consumed_total{kafka_consumer_fetch_manager_records_consumed_total{client_id="client_id", container="app", endpoint="http", instance="instance", job="job", kafka_version="3.7.0", kubernetes_namespace="engineering", kubernetes_pod_name="kubernetes_pod_name", namespace="engineering", pod="kubernetes_pod_name", service="service", topic="topic"} offset 24h
- inconsistent instance but with 0 as min value
min_over_time(kafka_consumer_fetch_manager_records_consumed_total{client_id="client_id", container="app", endpoint="http", instance="instance", job="job", kafka_version="3.7.0", kubernetes_namespace="engineering", kubernetes_pod_name="kubernetes_pod_name", namespace="engineering", pod="kubernetes_pod_name", service="service", topic="topic"}[24h])
unless
kafka_consumer_fetch_manager_records_consumed_total{client_id="client_id", container="app", endpoint="http", instance="instance", job="job", kafka_version="3.7.0", kubernetes_namespace="engineering", kubernetes_pod_name="kubernetes_pod_name", namespace="engineering", pod="kubernetes_pod_name", service="service", topic="topic"} offset 24h
- inconsistent instance whose min value starts from a positive value
min_over_time(kafka_consumer_fetch_manager_records_consumed_total{client_id="client_id", container="app", endpoint="http", instance="instance", job="job", kafka_version="3.7.0", kubernetes_namespace="engineering", kubernetes_pod_name="kubernetes_pod_name", namespace="engineering", pod="kubernetes_pod_name", service="service", topic="topic"}[24h])
unless
kafka_consumer_fetch_manager_records_consumed_total{client_id="client_id", container="app", endpoint="http", instance="instance", job="job", kafka_version="3.7.0", kubernetes_namespace="engineering", kubernetes_pod_name="kubernetes_pod_name", namespace="engineering", pod="kubernetes_pod_name", service="service", topic="topic"} offset 24h
then do unless
again with above results to exclude nothing or itself to get the accurate min value.
results for this:
- consistent instance
min_over_time(kafka_consumer_fetch_manager_records_consumed_total{client_id="client_id", container="app", endpoint="http", instance="instance", job="job", kafka_version="3.7.0", kubernetes_namespace="engineering", kubernetes_pod_name="kubernetes_pod_name", namespace="engineering", pod="kubernetes_pod_name", service="service", topic="topic"}[24h])
unless
(min_over_time(kafka_consumer_fetch_manager_records_consumed_total{client_id="client_id", container="app", endpoint="http", instance="instance", job="job", kafka_version="3.7.0", kubernetes_namespace="engineering", kubernetes_pod_name="kubernetes_pod_name", namespace="engineering", pod="kubernetes_pod_name", service="service", topic="topic"}[24h])
unless
kafka_consumer_fetch_manager_records_consumed_total{client_id="client_id", container="app", endpoint="http", instance="instance", job="job", kafka_version="3.7.0", kubernetes_namespace="engineering", kubernetes_pod_name="kubernetes_pod_name", namespace="engineering", pod="kubernetes_pod_name", service="service", topic="topic"} offset 24h)
- inconsistent instance but with 0 as min value
min_over_time(kafka_consumer_fetch_manager_records_consumed_total{client_id="client_id", container="app", endpoint="http", instance="instance", job="job", kafka_version="3.7.0", kubernetes_namespace="engineering", kubernetes_pod_name="kubernetes_pod_name", namespace="engineering", pod="kubernetes_pod_name", service="service", topic="topic"}[24h])
unless
(min_over_time(kafka_consumer_fetch_manager_records_consumed_total{client_id="client_id", container="app", endpoint="http", instance="instance", job="job", kafka_version="3.7.0", kubernetes_namespace="engineering", kubernetes_pod_name="kubernetes_pod_name", namespace="engineering", pod="kubernetes_pod_name", service="service", topic="topic"}[24h])
unless
kafka_consumer_fetch_manager_records_consumed_total{client_id="client_id", container="app", endpoint="http", instance="instance", job="job", kafka_version="3.7.0", kubernetes_namespace="engineering", kubernetes_pod_name="kubernetes_pod_name", namespace="engineering", pod="kubernetes_pod_name", service="service", topic="topic"} offset 24h)
- inconsistent instance whose min value starts from a positive value
min_over_time(kafka_consumer_fetch_manager_records_consumed_total{client_id="client_id", container="app", endpoint="http", instance="instance", job="job", kafka_version="3.7.0", kubernetes_namespace="engineering", kubernetes_pod_name="kubernetes_pod_name", namespace="engineering", pod="kubernetes_pod_name", service="service", topic="topic"}[24h])
unless
(min_over_time(kafka_consumer_fetch_manager_records_consumed_total{client_id="client_id", container="app", endpoint="http", instance="instance", job="job", kafka_version="3.7.0", kubernetes_namespace="engineering", kubernetes_pod_name="kubernetes_pod_name", namespace="engineering", pod="kubernetes_pod_name", service="service", topic="topic"}[24h])
unless
kafka_consumer_fetch_manager_records_consumed_total{client_id="client_id", container="app", endpoint="http", instance="instance", job="job", kafka_version="3.7.0", kubernetes_namespace="engineering", kubernetes_pod_name="kubernetes_pod_name", namespace="engineering", pod="kubernetes_pod_name", service="service", topic="topic"} offset 24h)
last, we just need to get the difference with max and min sum
them for all instance values.
Thus, we get the ultimate query as the very above shows:
(
sum(max_over_time(kafka_consumer_fetch_manager_records_consumed_total{namespace="engineering", topic="topic", service="service"}[24h]
)
)
-
sum(min_over_time(kafka_consumer_fetch_manager_records_consumed_total{namespace="engineering", topic="topic", service="service"}[24h]
) unless (min_over_time(kafka_consumer_fetch_manager_records_consumed_total{namespace="engineering", topic="topic", service="service"}[24h]
) unless kafka_consumer_fetch_manager_records_consumed_total{namespace="engineering", topic="topic", service="eservice"} offset 24h
)
)
)
0x04 Discussions online
https://github.com/prometheus/prometheus/issues/6779
https://github.com/skaes/logjam-tools/pull/31
increase() in Prometheus sometimes doubles values: how to avoid?
Alternative tool: