Commit c38f80cb authored by Lars Michelsen's avatar Lars Michelsen
Browse files

Metrics: Improve typing #4

Improve types hints in graph data fetching code.

Change-Id: I512cfb220e34c3aa1922b48e3745f74f0dd5cfcb
parent 74e69891
......@@ -18,7 +18,7 @@ from cmk.gui.plugins.metrics.utils import (
......@@ -132,7 +132,7 @@ graph_identification_types.register(GraphIdentificationTemplate)
def create_graph_recipe_from_template(
graph_template: GraphTemplate, translated_metrics: TranslatedMetrics, row: Row
) -> GraphRecipe:
def _metric(metric_definition: MetricDefinition) -> GraphMetrics:
def _metric(metric_definition: MetricDefinition) -> GraphMetric:
unit_color = metric_unit_color(metric_definition[0], translated_metrics)
metric = {
"title": metric_line_title(metric_definition, translated_metrics),
......@@ -7,30 +7,42 @@
import collections
import time
from typing import Any, Callable, Dict, Iterator, List, Optional, Set, Tuple, Union
from typing import Any, Callable, Iterator, List, Optional, Set, Union
import livestatus
from livestatus import SiteId
import cmk.utils.version as cmk_version
from cmk.utils.prediction import livestatus_lql, TimeSeries
from cmk.utils.prediction import livestatus_lql, TimeSeries, TimeSeriesValues
from cmk.utils.type_defs import HostName, ServiceName
import cmk.gui.plugins.metrics.timeseries as ts
import cmk.gui.sites as sites
from cmk.gui.exceptions import MKGeneralException
from cmk.gui.i18n import _
from cmk.gui.plugins.metrics.utils import check_metrics, reverse_translate_metric_name, RRDData
from cmk.gui.plugins.metrics.utils import (
from cmk.gui.type_defs import ColumnName
def fetch_rrd_data_for_graph(graph_recipe, graph_data_range) -> RRDData:
def fetch_rrd_data_for_graph(
graph_recipe: GraphRecipe, graph_data_range: GraphDataRange
) -> RRDData:
needed_rrd_data = get_needed_sources(graph_recipe["metrics"])
by_service = group_needed_rrd_data_by_service(needed_rrd_data)
rrd_data: RRDData = {}
for (site, host_name, service_description), entries in by_service.items():
for (site, host_name, service_description), metrics in by_service.items():
for (perfvar, cf, scale), data in fetch_rrd_data(
site, host_name, service_description, entries, graph_recipe, graph_data_range
site, host_name, service_description, metrics, graph_recipe, graph_data_range
rrd_data[(site, host_name, service_description, perfvar, cf, scale)] = TimeSeries(
......@@ -44,7 +56,7 @@ def fetch_rrd_data_for_graph(graph_recipe, graph_data_range) -> RRDData:
return rrd_data
def align_and_resample_rrds(rrd_data: RRDData, cf):
def align_and_resample_rrds(rrd_data: RRDData, cf: GraphConsoldiationFunction) -> None:
"""RRDTool aligns start/end/step to its internal precision.
This is returned as first 3 values in each RRD data row. Using that
......@@ -78,7 +90,7 @@ def align_and_resample_rrds(rrd_data: RRDData, cf):
# This makes only sense for graphs which are ending "now". So disable this
# for the other graphs.
def chop_last_empty_step(graph_data_range, rrd_data: RRDData):
def chop_last_empty_step(graph_data_range: GraphDataRange, rrd_data: RRDData) -> None:
if rrd_data:
sample_data = next(iter(rrd_data.values()))
step = sample_data.twindow[2]
......@@ -117,7 +129,7 @@ def needed_elements_of_expression(expression):
def get_needed_sources(
metrics: List[Dict[str, Any]], condition: Callable[[Any], bool] = lambda x: True
metrics: list[GraphMetric], condition: Callable[[Any], bool] = lambda x: True
) -> Set:
"""Extract all metric data sources definitions
......@@ -133,14 +145,32 @@ def get_needed_sources(
def group_needed_rrd_data_by_service(needed_rrd_data):
by_service: Dict[Tuple[str, str, str], Set[Tuple[Any, Any, Any]]] = collections.defaultdict(set)
NeededRRDData = set[
tuple[SiteId, HostName, ServiceName, str, Optional[GraphConsoldiationFunction], float]
MetricProperties = tuple[str, Optional[GraphConsoldiationFunction], float]
def group_needed_rrd_data_by_service(
needed_rrd_data: NeededRRDData,
) -> dict[tuple[SiteId, HostName, ServiceName], set[MetricProperties],]:
by_service: dict[
tuple[SiteId, HostName, ServiceName],
] = collections.defaultdict(set)
for site, host_name, service_description, perfvar, cf, scale in needed_rrd_data:
by_service[(site, host_name, service_description)].add((perfvar, cf, scale))
return by_service
def fetch_rrd_data(site, host_name, service_description, entries, graph_recipe, graph_data_range):
def fetch_rrd_data(
site: SiteId,
host_name: HostName,
service_description: ServiceName,
metrics: set[MetricProperties],
graph_recipe: GraphRecipe,
graph_data_range: GraphDataRange,
) -> list[tuple[MetricProperties, TimeSeriesValues]]:
start_time, end_time = graph_data_range["time_range"]
step: Union[int, float, str] = graph_data_range["step"]
......@@ -149,15 +179,17 @@ def fetch_rrd_data(site, host_name, service_description, entries, graph_recipe,
step = max(1, step)
point_range = ":".join(map(str, (start_time, end_time, step)))
lql_columns = list(rrd_columns(entries, graph_recipe["consolidation_function"], point_range))
lql_columns = list(rrd_columns(metrics, graph_recipe["consolidation_function"], point_range))
query = livestatus_lql([host_name], lql_columns, service_description)
with sites.only_sites(site):
return list(zip(entries,
return list(zip(metrics,
def rrd_columns(
metrics: List[Tuple[str, Optional[str], float]], rrd_consolidation: str, data_range: str
metrics: set[MetricProperties],
rrd_consolidation: Optional[GraphConsoldiationFunction],
data_range: str,
) -> Iterator[ColumnName]:
"""RRD data columns for each metric
......@@ -172,18 +204,23 @@ def rrd_columns(
def metric_in_all_rrd_columns(
metric: str, rrd_consolidation: str, from_time: int, until_time: int
metric: str,
rrd_consolidation: GraphConsoldiationFunction,
from_time: int,
until_time: int,
) -> List[ColumnName]:
"""Translate metric name to all perf_data names and construct RRD data columns for each"""
data_range = "%s:%s:%s" % (from_time, until_time, 60)
_metrics: List[Tuple[str, Optional[str], float]] = [
metrics: set[MetricProperties] = {
(name, None, scale) for name, scale in reverse_translate_metric_name(metric)
return list(rrd_columns(_metrics, rrd_consolidation, data_range))
return list(rrd_columns(metrics, rrd_consolidation, data_range))
def merge_multicol(row: Dict, rrdcols: List[ColumnName], params: Dict) -> TimeSeries:
def merge_multicol(
row: dict[str, Any], rrdcols: List[ColumnName], params: dict[str, Any]
) -> TimeSeries:
"""Establish single timeseries for desired metric
If Livestatus query is performed in bulk, over all possible named
......@@ -33,6 +33,8 @@ from typing import (
from six import ensure_str
from livestatus import SiteId
import cmk.utils.regex
import cmk.utils.version as cmk_version
from cmk.utils.memoize import MemoizeCache
......@@ -113,8 +115,9 @@ class GraphTemplate(_GraphTemplateMandatory, total=False):
GraphRecipe = Dict[str, Any]
GraphMetrics = Dict[str, Any]
RRDData = Dict[Tuple[str, str, str, str, str, str], TimeSeries]
GraphMetric = Dict[str, Any]
RRDDataKey = Tuple[SiteId, HostName, ServiceName, str, Optional[GraphConsoldiationFunction], float]
RRDData = Dict[RRDDataKey, TimeSeries]
class MetricUnitColor(TypedDict):
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment