
MetricService

Expose TM1 model-performance statistics uniformly across v11 and v12.

This module holds both the :class:MetricService (version dispatch + REST I/O) and the pure, server-free helpers it delegates to: the v11 measure vocabulary, the v11 MDX builders, the v12 Metrics() $filter builder, and the record shapers. Keeping the helpers at module scope lets them be unit-tested without a live TM1 server while the service stays a thin orchestrator.

One method per Stats Category (by_cube, by_server, ...). Each method returns the same shape regardless of the underlying TM1 version, hiding whether the data came from a v11 }Stats* control cube (MDX/cellset) or the v12 Metrics() OData endpoint. Reads never mutate server state. Two record orientations:

  • gauge-long (by_cube, by_server): one row per metric, with Metric/Value/Unit.
  • entity-wide (by_rule and the v11-only entity categories): one row per entity with heterogeneous attribute columns.
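To make the two orientations concrete, the following sketch pivots gauge-long rows into one wide row per cube. The record keys mirror those used by by_cube; the helper `pivot_gauge_rows` and the metric name `example_metric` are hypothetical and exist only for this illustration.

```python
from collections import defaultdict
from typing import Dict, List


def pivot_gauge_rows(rows: List[Dict]) -> List[Dict]:
    """Pivot gauge-long rows (one per metric) into one wide row per cube.

    Hypothetical helper for illustration; not part of TM1py.
    """
    wide: Dict[str, Dict] = defaultdict(dict)
    for row in rows:
        entity = wide[row["CubeName"]]
        entity["CubeName"] = row["CubeName"]
        # Each metric becomes a column; the value is passed through unchanged.
        entity[row["Metric"]] = row["Value"]
    return list(wide.values())


gauge_long = [
    {"CubeName": "plan_BudgetPlan", "Metric": "cube_memory_used", "Value": 8385536, "Unit": "B"},
    # "example_metric" is a made-up metric name used only for this sketch.
    {"CubeName": "plan_BudgetPlan", "Metric": "example_metric", "Value": 3, "Unit": "#"},
]
entity_wide = pivot_gauge_rows(gauge_long)
```

Note that this pivot drops the Unit column, which is one reason the gauge categories are kept long: each metric row can carry its own unit.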

Values are passed through verbatim — never converted. Read Unit to interpret them (v11 memory is raw bytes; v12 reports its own unit).
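Because values are never converted, a caller who wants comparable numbers has to apply Unit itself. A minimal sketch follows, assuming the unit label "KB" and a factor of 1024; both are assumptions about v12 output rather than documented behaviour, and the sample values are invented.

```python
from typing import Dict, Optional

# Assumed multipliers: "B" matches UNIT_BYTES; "KB" and its factor of 1024 are
# assumptions about how a v12 server labels its values and should be verified.
BYTES_PER_UNIT = {"B": 1, "KB": 1024}


def value_in_bytes(record: Dict) -> Optional[int]:
    """Return the record's Value in bytes, or None if it cannot be interpreted."""
    value, unit = record.get("Value"), record.get("Unit")
    if value is None or unit not in BYTES_PER_UNIT:
        return None
    return value * BYTES_PER_UNIT[unit]


# Invented sample rows: the same quantity as a v11-style and a v12-style record.
v11_row = {"Metric": "cube_memory_used", "Value": 8385536, "Unit": "B"}
v12_row = {"Metric": "cube_memory_used", "Value": 8189, "Unit": "KB"}
```

Returning None for an unknown unit keeps the helper from silently guessing a scale.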

ALL_TIME_INTERVALS = 'ALL' module-attribute

CATEGORY_BY_CHORE = 'by_chore' module-attribute

CATEGORY_BY_CLIENT = 'by_client' module-attribute

CATEGORY_BY_CUBE = 'by_cube' module-attribute

CATEGORY_BY_CUBE_BY_CLIENT = 'by_cube_by_client' module-attribute

CATEGORY_BY_PROCESS = 'by_process' module-attribute

CATEGORY_BY_RULE = 'by_rule' module-attribute

CATEGORY_BY_SERVER = 'by_server' module-attribute

CUBES_TOTAL = 'Cubes Total' module-attribute

DEFAULT_TIME_INTERVAL = 'LATEST' module-attribute

ENTITY_DIM_COLUMN = {'}Cubes': 'CubeName', '}PerfCubes': 'CubeName', '}LineNumber': 'LineNumber', '}Processes': 'ProcessName', '}Chores': 'ChoreName', '}PerfClients': 'ClientName', '}Cube Functions': 'CubeFunction'} module-attribute

ENTITY_MEASURE_COLUMNS = {CATEGORY_BY_RULE: {'Rule Text': 'RuleText', 'Total Run Count': 'TotalRunCount', 'Min Time (ms)': 'MinTimeMs', 'Max Time (ms)': 'MaxTimeMs', 'Avg Time (ms)': 'AvgTimeMs', 'Total Time (ms)': 'TotalTimeMs', 'Last Run Time': 'LastRunTime'}, CATEGORY_BY_PROCESS: {'Current State': 'CurrentState', 'Completion Status': 'CompletionStatus', 'Client Name': 'ClientName', 'Last Start Time': 'LastStartTime', 'Last End Time': 'LastEndTime', 'Last Duration': 'LastDuration', 'Next Activation Time': 'NextActivationTime', 'Current Process': 'CurrentProcess'}, CATEGORY_BY_CLIENT: {'Message Count': 'MessageCount', 'Message Bytes': 'MessageBytes', 'Request Count': 'RequestCount', 'Elapse Time (ms)': 'ElapseTimeMs', 'Bytes/Message': 'BytesPerMessage'}, CATEGORY_BY_CUBE_BY_CLIENT: {'Count': 'Count', 'Elapse Time (ms)': 'ElapseTimeMs'}} module-attribute

UNIT_BYTES = 'B' module-attribute

UNIT_COUNT = '#' module-attribute

V12_VERSION = '12.0.0' module-attribute

MetricService(rest)

Bases: ObjectService

Expose TM1 model-performance statistics uniformly across v11 and v12.

One method per Stats Category (by_cube, by_server, ...). Each method returns the same shape regardless of the underlying TM1 version, hiding whether the data came from a v11 }Stats* control cube (MDX/cellset) or the v12 Metrics() OData endpoint. Reads never mutate server state.

Two return orientations, chosen by the data's nature:

  • gauge-long (by_cube, by_server): one row per metric, with Metric / Value / Unit.
  • entity-wide (by_rule and the v11-only categories): one row per entity with attribute columns.

Every read method has a parallel *_as_dataframe variant.

Per-cube metrics (gauge-long), unified across versions::

>>> rows = tm1.metrics.by_cube(cube="plan_BudgetPlan")
>>> rows[0]
{'Category': 'by_cube', 'CubeName': 'plan_BudgetPlan',
 'Metric': 'cube_memory_used', 'NativeName': 'Total Memory Used',
 'Value': 8385536, 'Unit': 'B', 'ReplicaID': 0,
 'TimeInterval': 'LATEST', 'Timestamp': None}

The canonical Metric name is stable across versions; NativeName carries the source's own name. Values pass through unconverted — read Unit to interpret them (cube_memory_used is bytes on v11, KB on v12). As a filtered DataFrame::

>>> df = tm1.metrics.by_cube_as_dataframe(metrics=["cube_memory_used"])

Server/replica-level metrics. v12 (highly-available) yields one row per replica; v11 is a single replica (ReplicaID=0)::

>>> tm1.metrics.by_server(metrics=["replica_memory_used"])

Per-rule timing (entity-wide), unified across versions — the read is identical; only how }StatsByRule gets populated differs. On v11 the Performance Monitor populates it, so ensure it is running, then read::

>>> tm1.metrics.get_performance_monitor_state()
False
>>> tm1.metrics.start_performance_monitor()
>>> tm1.metrics.by_rule(cube="plan_BudgetPlan")

On v12 collect on demand: start, exercise the cube's rules, let the ~60s sampling interval elapse, flush, then read the same way::

>>> tm1.metrics.start_collecting_rule_stats("plan_BudgetPlan")
>>> # ... exercise the cube's rules, wait ~60s for the sampling interval ...
>>> tm1.metrics.flush_collected_rule_stats("plan_BudgetPlan")
>>> tm1.metrics.stop_collecting_rule_stats("plan_BudgetPlan")
>>> tm1.metrics.by_rule(cube="plan_BudgetPlan")

v11-only categories (the cubes were removed in v12; these raise :class:TM1pyVersionException on a v12 database)::

>>> tm1.metrics.by_process()
>>> tm1.metrics.by_chore()
>>> tm1.metrics.by_client()
>>> tm1.metrics.by_cube_by_client(cube="plan_BudgetPlan")

Two version-error flavours are used deliberately. Version-gated reads — the v11-only categories above and the v11-only time_interval / v12-only since arguments — raise :class:TM1pyVersionException (the underlying source simply does not exist on the other version). The Performance Monitor controls (:meth:start_performance_monitor etc.) raise :class:TM1pyVersionDeprecationException, mirroring the deprecated :class:ServerService methods they delegate to.

Source code in TM1py/Services/MetricService.py
def __init__(self, rest: RestService):
    super().__init__(rest)
    self._cells = None
    self._cubes = None
    self._servers = None

STATS_BY_RULE_CUBE = '}StatsByRule' class-attribute instance-attribute

by_chore(**kwargs)

Per-chore execution statistics (entity-wide). v11 only.

Raises:

TM1pyVersionException: on v12 (the }StatsByChore cube was removed; no v12 source exists).

Source code in TM1py/Services/MetricService.py
def by_chore(self, **kwargs) -> List[Dict]:
    """Per-chore execution statistics (entity-wide). v11 only.

    :raises TM1pyVersionException: on v12 (the ``}StatsByChore`` cube was
        removed; no v12 source exists).
    """
    self._require_v11(CATEGORY_BY_CHORE)
    return self._entity_v11(CATEGORY_BY_CHORE, **kwargs)

by_chore_as_dataframe(*args, **kwargs)

Source code in TM1py/Services/MetricService.py
@require_pandas
def by_chore_as_dataframe(self, *args, **kwargs) -> "pd.DataFrame":
    return pd.DataFrame.from_records(self.by_chore(*args, **kwargs))

by_client(**kwargs)

Per-client request/message statistics (entity-wide). v11 only.

Raises:

TM1pyVersionException: on v12 (the }StatsByClient cube was removed; no v12 source exists).

Source code in TM1py/Services/MetricService.py
def by_client(self, **kwargs) -> List[Dict]:
    """Per-client request/message statistics (entity-wide). v11 only.

    :raises TM1pyVersionException: on v12 (the ``}StatsByClient`` cube was
        removed; no v12 source exists).
    """
    self._require_v11(CATEGORY_BY_CLIENT)
    return self._entity_v11(CATEGORY_BY_CLIENT, **kwargs)

by_client_as_dataframe(*args, **kwargs)

Source code in TM1py/Services/MetricService.py
@require_pandas
def by_client_as_dataframe(self, *args, **kwargs) -> "pd.DataFrame":
    return pd.DataFrame.from_records(self.by_client(*args, **kwargs))

by_cube(cube=None, metrics=None, time_interval=None, since=None, include_control=False, **kwargs)

Per-cube performance metrics (gauge-long), unified across versions.

}-control cubes and the synthetic Cubes Total row are excluded by default; include_control=True adds }-cubes (v11 only).

Parameters:

time_interval (str, default None): v11-only rolling-window selector (see :meth:build_v11_mdx); passing it on v12 raises :class:TM1pyVersionException.
since (datetime, default None): v12-only: return only metrics whose Timestamp is strictly after this datetime (Metrics() has no per-metric timestamp on v11); passing it on v11 raises :class:TM1pyVersionException. A tz-aware datetime is converted to UTC; a naive one is assumed UTC.

Source code in TM1py/Services/MetricService.py
def by_cube(
    self,
    cube: str = None,
    metrics: List[str] = None,
    time_interval: str = None,
    since: datetime = None,
    include_control: bool = False,
    **kwargs,
) -> List[Dict]:
    """Per-cube performance metrics (gauge-long), unified across versions.

    ``}``-control cubes and the synthetic ``Cubes Total`` row are excluded
    by default; ``include_control=True`` adds ``}``-cubes (v11 only).

    :param time_interval: v11-only rolling-window selector (see
        :meth:`build_v11_mdx`); passing it on v12 raises
        :class:`TM1pyVersionException`.
    :param since: v12-only — return only metrics whose ``Timestamp`` is
        strictly after this ``datetime`` (``Metrics()`` has no per-metric
        timestamp on v11); passing it on v11 raises
        :class:`TM1pyVersionException`. A tz-aware datetime is converted to
        UTC; a naive one is assumed UTC.
    """
    if self._is_v12:
        self._reject_time_interval(time_interval, "by_cube")
        if include_control:
            warnings.warn(
                "include_control has no effect on v12: Metrics() never reports '}'-control cubes.",
                stacklevel=2,
            )
        return self._gauge_v12(CATEGORY_BY_CUBE, cube=cube, metrics=metrics, since=since, **kwargs)
    self._reject_since(since, "by_cube")
    return self._gauge_v11(
        CATEGORY_BY_CUBE,
        cube=cube,
        metrics=metrics,
        time_interval=time_interval,
        include_control=include_control,
        **kwargs,
    )
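The version gating in by_cube can be summarised as: reject the argument that has no source on the connected version, then take that version's read path. The sketch below shows the pattern in isolation. `VersionError` stands in for TM1pyVersionException, and the `is not None` checks are an assumption about how the private _reject_* helpers decide.

```python
from datetime import datetime
from typing import Optional


class VersionError(Exception):
    """Stand-in for TM1py's TM1pyVersionException (hypothetical, for this sketch)."""


def check_version_gated_args(
    is_v12: bool,
    time_interval: Optional[str],
    since: Optional[datetime],
    method: str,
) -> str:
    """Reject the argument that has no source on this version; return the read path."""
    if is_v12:
        if time_interval is not None:
            raise VersionError(f"{method}: time_interval is v11-only")
        return "v12"
    if since is not None:
        raise VersionError(f"{method}: since is v12-only")
    return "v11"
```

Failing fast here means an unsupported argument is never silently ignored.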

by_cube_as_dataframe(*args, **kwargs)

Source code in TM1py/Services/MetricService.py
@require_pandas
def by_cube_as_dataframe(self, *args, **kwargs) -> "pd.DataFrame":
    return pd.DataFrame.from_records(self.by_cube(*args, **kwargs))

by_cube_by_client(cube=None, **kwargs)

Per-cube-per-client access statistics (entity-wide). v11 only.

Raises:

TM1pyVersionException: on v12 (the }StatsByCubeByClient cube was removed; no v12 source exists).

Source code in TM1py/Services/MetricService.py
def by_cube_by_client(self, cube: str = None, **kwargs) -> List[Dict]:
    """Per-cube-per-client access statistics (entity-wide). v11 only.

    :raises TM1pyVersionException: on v12 (the ``}StatsByCubeByClient`` cube
        was removed; no v12 source exists).
    """
    self._require_v11(CATEGORY_BY_CUBE_BY_CLIENT)
    return self._entity_v11(CATEGORY_BY_CUBE_BY_CLIENT, cube=cube, **kwargs)

by_cube_by_client_as_dataframe(*args, **kwargs)

Source code in TM1py/Services/MetricService.py
@require_pandas
def by_cube_by_client_as_dataframe(self, *args, **kwargs) -> "pd.DataFrame":
    return pd.DataFrame.from_records(self.by_cube_by_client(*args, **kwargs))

by_process(**kwargs)

Per-process execution statistics (entity-wide). v11 only.

Raises:

TM1pyVersionException: on v12 (the }StatsByProcess cube was removed; no v12 source exists).

Source code in TM1py/Services/MetricService.py
def by_process(self, **kwargs) -> List[Dict]:
    """Per-process execution statistics (entity-wide). v11 only.

    :raises TM1pyVersionException: on v12 (the ``}StatsByProcess`` cube was
        removed; no v12 source exists).
    """
    self._require_v11(CATEGORY_BY_PROCESS)
    return self._entity_v11(CATEGORY_BY_PROCESS, **kwargs)

by_process_as_dataframe(*args, **kwargs)

Source code in TM1py/Services/MetricService.py
@require_pandas
def by_process_as_dataframe(self, *args, **kwargs) -> "pd.DataFrame":
    return pd.DataFrame.from_records(self.by_process(*args, **kwargs))

by_rule(cube=None, **kwargs)

Per-rule-line statistics (entity-wide), unified across versions.

}StatsByRule is structurally identical on v11 and v12 (dimensions }Cubes x }LineNumber x }RuleStats), so the same cellset read/shape path serves both. If the cube does not exist yet, this returns [] with a warning rather than raising.

How the cube gets populated differs by version: on v11 the Performance Monitor populates it. On v12 it is created/updated on demand by the rule-stats collection lifecycle — call :meth:start_collecting_rule_stats, exercise the cube's rules, allow the ~60s sampling interval to elapse, then :meth:flush_collected_rule_stats (verified on 12.5.9).

Source code in TM1py/Services/MetricService.py
def by_rule(self, cube: str = None, **kwargs) -> List[Dict]:
    """Per-rule-line statistics (entity-wide), unified across versions.

    ``}StatsByRule`` is structurally identical on v11 and v12 (dimensions
    ``}Cubes`` x ``}LineNumber`` x ``}RuleStats``), so the same cellset
    read/shape path serves both. If the cube does not exist yet, this returns
    ``[]`` with a warning rather than raising.

    How the cube gets populated differs by version: on v11 the Performance
    Monitor populates it. On v12 it is created/updated on demand by the
    rule-stats collection lifecycle — call :meth:`start_collecting_rule_stats`,
    exercise the cube's rules, allow the ~60s sampling interval to elapse,
    then :meth:`flush_collected_rule_stats` (verified on 12.5.9).
    """
    if not self._cube_service.exists(self.STATS_BY_RULE_CUBE, **kwargs):
        if self._is_v12:
            hint = (
                "rule stats have not been collected yet — call start_collecting_rule_stats(cube), "
                "exercise the cube's rules, wait for the ~60s sampling interval, then "
                "flush_collected_rule_stats(cube)"
            )
        else:
            hint = (
                "rule stats have not been collected — ensure the Performance Monitor is running "
                "(start_performance_monitor)"
            )
        warnings.warn(
            f"'{self.STATS_BY_RULE_CUBE}' does not exist; {hint}. Returning no records.",
            stacklevel=2,
        )
        return []
    return self._entity_v11(CATEGORY_BY_RULE, cube=cube, **kwargs)
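Since a missing }StatsByRule cube yields [] plus a warning, a caller may want to tell "no stats collected yet" apart from "collected, but empty". One possible approach is to record warnings around the read. In this sketch `read_with_warnings` and `fake_by_rule` are hypothetical; the fake only imitates the warn-and-return-empty behaviour described above, with a shortened message.

```python
import warnings
from typing import Callable, Dict, List, Tuple


def read_with_warnings(read: Callable[[], List[Dict]]) -> Tuple[List[Dict], List[str]]:
    """Call read() and return its rows plus any warning messages it emitted."""
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")  # make sure repeated warnings are not suppressed
        rows = read()
    return rows, [str(w.message) for w in caught]


def fake_by_rule() -> List[Dict]:
    # Stand-in for tm1.metrics.by_rule(...) when }StatsByRule does not exist.
    warnings.warn("'}StatsByRule' does not exist. Returning no records.", stacklevel=2)
    return []


rows, messages = read_with_warnings(fake_by_rule)
```

With a live connection, the callable would be something like `lambda: tm1.metrics.by_rule(cube="plan_BudgetPlan")`.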

by_rule_as_dataframe(*args, **kwargs)

Source code in TM1py/Services/MetricService.py
@require_pandas
def by_rule_as_dataframe(self, *args, **kwargs) -> "pd.DataFrame":
    return pd.DataFrame.from_records(self.by_rule(*args, **kwargs))

by_server(metrics=None, time_interval=None, since=None, **kwargs)

Server/replica-level metrics (gauge-long), unified across versions.

Always one row per replica with a ReplicaID column (v11 -> 0).

Parameters:

time_interval (str, default None): v11-only rolling-window selector; passing it on v12 raises :class:TM1pyVersionException.
since (datetime, default None): v12-only Timestamp gt filter (see :meth:by_cube); passing it on v11 raises :class:TM1pyVersionException.

Source code in TM1py/Services/MetricService.py
def by_server(
    self, metrics: List[str] = None, time_interval: str = None, since: datetime = None, **kwargs
) -> List[Dict]:
    """Server/replica-level metrics (gauge-long), unified across versions.

    Always one row per replica with a ``ReplicaID`` column (v11 -> ``0``).

    :param time_interval: v11-only rolling-window selector; passing it on
        v12 raises :class:`TM1pyVersionException`.
    :param since: v12-only ``Timestamp gt`` filter (see :meth:`by_cube`);
        passing it on v11 raises :class:`TM1pyVersionException`.
    """
    if self._is_v12:
        self._reject_time_interval(time_interval, "by_server")
        return self._gauge_v12(CATEGORY_BY_SERVER, cube=None, metrics=metrics, since=since, **kwargs)
    self._reject_since(since, "by_server")
    return self._gauge_v11(CATEGORY_BY_SERVER, cube=None, metrics=metrics, time_interval=time_interval, **kwargs)
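The since argument of by_cube and by_server is documented as "tz-aware is converted to UTC, naive is assumed UTC". A minimal sketch of that rule using only the standard library follows. The function name `to_utc` is hypothetical, and the library's own conversion may differ in detail.

```python
from datetime import datetime, timedelta, timezone


def to_utc(moment: datetime) -> datetime:
    """Return a tz-aware UTC datetime; a naive input is assumed to already be UTC."""
    if moment.tzinfo is None:
        return moment.replace(tzinfo=timezone.utc)
    return moment.astimezone(timezone.utc)


naive = datetime(2024, 5, 1, 12, 0, 0)
aware = datetime(2024, 5, 1, 14, 0, 0, tzinfo=timezone(timedelta(hours=2)))
```

Both sample values describe the same instant, so they select the same metrics. Passing a naive local time without converting it first would shift the cutoff by the local UTC offset.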

by_server_as_dataframe(*args, **kwargs)

Source code in TM1py/Services/MetricService.py
@require_pandas
def by_server_as_dataframe(self, *args, **kwargs) -> "pd.DataFrame":
    return pd.DataFrame.from_records(self.by_server(*args, **kwargs))

flush_collected_rule_stats(cube, **kwargs)

Flush the rule stats collected for cube into }StatsByRule (v12).

Writes the rule statistics collected since :meth:start_collecting_rule_stats into the }StatsByRule cube (created on demand), which :meth:by_rule then reads. Collection is sampled on a ~60s interval, so exercise the cube's rules and let that interval elapse between start and flush for stats to appear (verified on 12.5.9).

Source code in TM1py/Services/MetricService.py
@require_version(version=V12_VERSION)
def flush_collected_rule_stats(self, cube: str, **kwargs):
    """Flush the rule stats collected for ``cube`` into ``}StatsByRule`` (v12).

    Writes the rule statistics collected since :meth:`start_collecting_rule_stats`
    into the ``}StatsByRule`` cube (created on demand), which :meth:`by_rule`
    then reads. Collection is sampled on a ~60s interval, so exercise the
    cube's rules and let that interval elapse between start and flush for
    stats to appear (verified on 12.5.9).
    """
    return self._rule_stats_action(cube, "FlushCollectedRuleStats", **kwargs)
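The v12 lifecycle (start, exercise, wait, flush, stop, read) can be wrapped so that collection is always stopped, even when a step fails. The helper below is a hypothetical sketch, not part of TM1py. It accepts any object that exposes the four MetricService methods it calls, and the `exercise` callback is an assumed way to trigger the cube's rules.

```python
import time
from typing import Any, Callable, Dict, List


def collect_rule_stats(
    metrics: Any,
    cube: str,
    exercise: Callable[[], None],
    wait_seconds: float = 60.0,
) -> List[Dict]:
    """Run start -> exercise -> wait -> flush -> stop, then read by_rule.

    `metrics` is any object exposing the MetricService methods used below.
    """
    metrics.start_collecting_rule_stats(cube)
    try:
        exercise()                # e.g. query views that evaluate the cube's rules
        time.sleep(wait_seconds)  # let the ~60s sampling interval elapse
        metrics.flush_collected_rule_stats(cube)
    finally:
        # Stop collecting even if exercising or flushing failed.
        metrics.stop_collecting_rule_stats(cube)
    return metrics.by_rule(cube=cube)
```

A call might look like `collect_rule_stats(tm1.metrics, "plan_BudgetPlan", run_my_views)`, where `run_my_views` is whatever the caller uses to exercise the rules.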

get_performance_monitor_state()

Return whether the Performance Monitor is currently active (v11 only).

Raises :class:TM1pyVersionDeprecationException on v12. Delegates to :meth:ServerService.get_performance_monitor_state.

Source code in TM1py/Services/MetricService.py
@deprecated_in_version(version="12.0.0")
def get_performance_monitor_state(self) -> bool:
    """Return whether the Performance Monitor is currently active (v11 only).

    Raises :class:`TM1pyVersionDeprecationException` on v12. Delegates to
    :meth:`ServerService.get_performance_monitor_state`.
    """
    return self._server_service.get_performance_monitor_state()

start_collecting_rule_stats(cube, **kwargs)

Start collecting per-rule timing statistics for cube (v12).

Source code in TM1py/Services/MetricService.py
@require_version(version=V12_VERSION)
def start_collecting_rule_stats(self, cube: str, **kwargs):
    """Start collecting per-rule timing statistics for ``cube`` (v12)."""
    return self._rule_stats_action(cube, "StartCollectingRuleStats", **kwargs)

start_performance_monitor()

Turn the Performance Monitor on (v11 only).

While it runs, TM1 populates the }Stats* cubes that :meth:by_cube, :meth:by_server, and :meth:by_rule read on v11. PerformanceMonitorOn is a v11-only setting, so this raises :class:TM1pyVersionDeprecationException on v12. Delegates to :meth:ServerService.start_performance_monitor.

Source code in TM1py/Services/MetricService.py
@deprecated_in_version(version="12.0.0")
def start_performance_monitor(self):
    """Turn the Performance Monitor on (v11 only).

    While it runs, TM1 populates the ``}Stats*`` cubes that :meth:`by_cube`,
    :meth:`by_server`, and :meth:`by_rule` read on v11. ``PerformanceMonitorOn``
    is a v11-only setting, so this raises :class:`TM1pyVersionDeprecationException`
    on v12. Delegates to :meth:`ServerService.start_performance_monitor`.
    """
    return self._server_service.start_performance_monitor()

stop_collecting_rule_stats(cube, **kwargs)

Stop collecting per-rule timing statistics for cube (v12).

Source code in TM1py/Services/MetricService.py
@require_version(version=V12_VERSION)
def stop_collecting_rule_stats(self, cube: str, **kwargs):
    """Stop collecting per-rule timing statistics for ``cube`` (v12)."""
    return self._rule_stats_action(cube, "StopCollectingRuleStats", **kwargs)

stop_performance_monitor()

Turn the Performance Monitor off (v11 only).

Raises :class:TM1pyVersionDeprecationException on v12. Delegates to :meth:ServerService.stop_performance_monitor.

Source code in TM1py/Services/MetricService.py
@deprecated_in_version(version="12.0.0")
def stop_performance_monitor(self):
    """Turn the Performance Monitor off (v11 only).

    Raises :class:`TM1pyVersionDeprecationException` on v12. Delegates to
    :meth:`ServerService.stop_performance_monitor`.
    """
    return self._server_service.stop_performance_monitor()

build_metrics_url(cube_name=None, metrics=None, timestamp=None)

Build the relative /Metrics() URL, optionally with a $filter.

Parameters:

cube_name (str, default None): restrict to a single cube (CubeName eq '<cube>').
metrics (Optional[List[str]], default None): restrict to these canonical metric names (Name eq 'm1' or Name eq 'm2' ...).
timestamp (datetime, default None): only metrics newer than this (Timestamp gt <iso>).

Returns:

str: "/Metrics()" or "/Metrics()?$filter=...".

Source code in TM1py/Services/MetricService.py
def build_metrics_url(
    cube_name: str = None,
    metrics: Optional[List[str]] = None,
    timestamp: datetime = None,
) -> str:
    """Build the relative ``/Metrics()`` URL, optionally with a ``$filter``.

    :param cube_name: restrict to a single cube (``CubeName eq '<cube>'``).
    :param metrics: restrict to these canonical metric names
        (``Name eq 'm1' or Name eq 'm2' ...``).
    :param timestamp: only metrics newer than this (``Timestamp gt <iso>``).
    :return: ``"/Metrics()"`` or ``"/Metrics()?$filter=..."``.
    """
    clauses: List[str] = []

    if cube_name:
        clauses.append(f"CubeName eq '{build_url_friendly_object_name(cube_name)}'")

    if metrics:
        clauses.append(" or ".join(f"Name eq '{build_url_friendly_object_name(m)}'" for m in metrics))

    if timestamp:
        clauses.append(f"Timestamp gt {datetime_to_iso(timestamp)}")

    if not clauses:
        return "/Metrics()"

    filter_string = " and ".join(f"({clause})" for clause in clauses)
    return f"/Metrics()?$filter={filter_string}"
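To see what the clause grouping produces, here is a simplified, standalone re-creation of the cube and metric parts of the filter. It is only a sketch: `quote` is a reduced stand-in for build_url_friendly_object_name (it only doubles single quotes, which is an assumption), the timestamp clause is left out, and `example_metric` is a made-up name.

```python
from typing import List, Optional


def quote(name: str) -> str:
    # Reduced stand-in for build_url_friendly_object_name (assumption):
    # double single quotes so the name stays inside its OData string literal.
    return name.replace("'", "''")


def sketch_metrics_url(
    cube_name: Optional[str] = None,
    metrics: Optional[List[str]] = None,
) -> str:
    clauses = []
    if cube_name:
        clauses.append(f"CubeName eq '{quote(cube_name)}'")
    if metrics:
        clauses.append(" or ".join(f"Name eq '{quote(m)}'" for m in metrics))
    if not clauses:
        return "/Metrics()"
    # Parenthesise each clause so the OR list cannot bind across the ANDs.
    return "/Metrics()?$filter=" + " and ".join(f"({c})" for c in clauses)


url = sketch_metrics_url("plan_BudgetPlan", ["cube_memory_used", "example_metric"])
# url == "/Metrics()?$filter=(CubeName eq 'plan_BudgetPlan') and "
#        "(Name eq 'cube_memory_used' or Name eq 'example_metric')"
```

The parentheses matter because OData gives `and` higher precedence than `or`; without them the cube restriction would apply only to the first metric name.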

build_v11_entity_mdx(category, cube=None)

Build the v11 MDX for an entity (wide) category.

Measures (the category's measure dimension) go on axis 0; the entity dimension(s) are crossjoined NON EMPTY on axis 1; a LATEST slicer is added when the cube has a time dimension. cube restricts the relevant entity dimension to a single member.

Raises:

KeyError: if category is not an entity category.

Source code in TM1py/Services/MetricService.py
def build_v11_entity_mdx(category: str, cube: str = None) -> str:
    """Build the v11 MDX for an entity (wide) category.

    Measures (the category's measure dimension) go on axis 0; the entity
    dimension(s) are crossjoined ``NON EMPTY`` on axis 1; a ``LATEST`` slicer is
    added when the cube has a time dimension. ``cube`` restricts the relevant
    entity dimension to a single member.

    :raises KeyError: if ``category`` is not an entity category.
    """
    try:
        spec = _ENTITY_SPEC[category]
    except KeyError:
        raise KeyError(f"Unknown entity Stats Category: '{category}'")

    measure_dim = spec["measure_dim"]
    measures = ",".join(f"[{measure_dim}].[{_escape_mdx(m)}]" for m in entity_measure_columns(category))
    columns = "{" + measures + "}"

    cube_dim = spec["cube_dim"]
    entity_parts = []
    for dim in spec["entity_dims"]:
        if cube and dim == cube_dim:
            entity_parts.append("{[" + dim + "].[" + _escape_mdx(cube) + "]}")
        else:
            entity_parts.append("{TM1SUBSETALL([" + dim + "])}")
    rows = "NON EMPTY {" + " * ".join(entity_parts) + "}"

    where = ""
    if spec["time_dim"]:
        where = f" WHERE ([{spec['time_dim']}].[{DEFAULT_TIME_INTERVAL}])"

    return f"SELECT {columns} ON 0, {rows} ON 1 FROM [{spec['cube']}]{where}"
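As an illustration of the MDX this produces for by_rule, the sketch below rebuilds the statement from a hand-written spec. The private _ENTITY_SPEC and _escape_mdx are not shown on this page, so `SPEC` and `escape_mdx` are assumptions: the dimension names follow the }Cubes x }LineNumber x }RuleStats layout of }StatsByRule, and `]` doubling is assumed for escaping.

```python
from typing import Optional


def escape_mdx(name: str) -> str:
    # Assumed behaviour of the private _escape_mdx helper: double any "]".
    return name.replace("]", "]]")


# Hypothetical spec for by_rule; the keys mirror those read by the source above.
SPEC = {
    "cube": "}StatsByRule",
    "measure_dim": "}RuleStats",
    "entity_dims": ["}Cubes", "}LineNumber"],
    "cube_dim": "}Cubes",
    "time_dim": None,  # by_rule has no time dimension, so no WHERE slicer
}
MEASURES = ["Total Run Count", "Avg Time (ms)"]  # subset of the by_rule measures


def sketch_entity_mdx(cube: Optional[str] = None) -> str:
    measure_dim = SPEC["measure_dim"]
    columns = "{" + ",".join(f"[{measure_dim}].[{escape_mdx(m)}]" for m in MEASURES) + "}"
    parts = []
    for dim in SPEC["entity_dims"]:
        if cube and dim == SPEC["cube_dim"]:
            # Restrict the cube dimension to the single requested member.
            parts.append("{[" + dim + "].[" + escape_mdx(cube) + "]}")
        else:
            parts.append("{TM1SUBSETALL([" + dim + "])}")
    rows = "NON EMPTY {" + " * ".join(parts) + "}"
    return "SELECT " + columns + " ON 0, " + rows + " ON 1 FROM [" + SPEC["cube"] + "]"


mdx = sketch_entity_mdx("plan_BudgetPlan")
```

With a cube given, only the }Cubes set collapses to one member; }LineNumber still uses TM1SUBSETALL, so every rule line of that cube is returned.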

build_v11_mdx(category, cube=None, time_interval=None, include_control=False)

Build the v11 MDX for a gauge category.

Parameters:

category (str, required): by_cube or by_server.
cube (str, default None): restrict to a single entity (by_cube only); ignored where the category has no entity dimension.
time_interval (str, default None): None -> LATEST snapshot (default); ALL -> the full rolling window (time on an axis); any other string -> that specific }TimeIntervals bucket.
include_control (bool, default False): include }-control cubes (by_cube only).

Raises:

KeyError: if category is not a gauge category.

Source code in TM1py/Services/MetricService.py
def build_v11_mdx(
    category: str,
    cube: str = None,
    time_interval: str = None,
    include_control: bool = False,
) -> str:
    """Build the v11 MDX for a gauge category.

    :param category: ``by_cube`` or ``by_server``.
    :param cube: restrict to a single entity (``by_cube`` only); ignored where
        the category has no entity dimension.
    :param time_interval: ``None`` -> ``LATEST`` snapshot (default);
        ``ALL`` -> the full rolling window (time on an axis);
        any other string -> that specific ``}TimeIntervals`` bucket.
    :param include_control: include ``}``-control cubes (``by_cube`` only).
    :raises KeyError: if ``category`` is not a gauge category.
    """
    try:
        spec = _SPEC[category]
    except KeyError:
        raise KeyError(f"Unknown gauge Stats Category: '{category}'")

    columns = _measure_set(category, spec["measure_dim"])
    time_dim = spec["time_dim"]
    entity_dim = spec["entity_dim"]
    full_window = time_interval == ALL_TIME_INTERVALS

    # Build the rows axis (entity and/or time) and the optional WHERE slicer.
    row_parts = []
    if entity_dim:
        row_parts.append(_entity_set(entity_dim, cube, include_control))
    if full_window:
        row_parts.append("{[" + time_dim + "].Members}")

    where = ""
    if not full_window:
        bucket = time_interval or DEFAULT_TIME_INTERVAL
        where = f" WHERE ([{time_dim}].[{_escape_mdx(bucket)}])"

    rows = " * ".join(row_parts)
    axes = columns + " ON 0"
    if rows:
        axes += ", " + rows + " ON 1"

    return f"SELECT {axes} FROM [{spec['cube']}]{where}"
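The three-way handling of time_interval is the part of this builder that is easiest to misread, so here it is isolated as a small sketch. `time_placement` is a hypothetical function, the `]` doubling is an assumed stand-in for the private _escape_mdx, and `bucket_1` is a placeholder rather than a real }TimeIntervals member.

```python
from typing import Optional, Tuple

ALL_TIME_INTERVALS = "ALL"
DEFAULT_TIME_INTERVAL = "LATEST"
TIME_DIM = "}TimeIntervals"


def time_placement(time_interval: Optional[str]) -> Tuple[Optional[str], str]:
    """Return (row-axis set or None, WHERE clause) for a time_interval value."""
    if time_interval == ALL_TIME_INTERVALS:
        # Full window: every bucket goes on the row axis and there is no slicer.
        return "{[" + TIME_DIM + "].Members}", ""
    bucket = time_interval or DEFAULT_TIME_INTERVAL
    escaped = bucket.replace("]", "]]")  # assumed escaping rule
    return None, " WHERE ([" + TIME_DIM + "].[" + escaped + "])"


latest = time_placement(None)
window = time_placement("ALL")
single = time_placement("bucket_1")  # placeholder bucket name
```

The time dimension therefore lands either on the row axis or in the slicer, never both.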

entity_measure_columns(category)

Return the native-measure -> column-name map for an entity category.

Source code in TM1py/Services/MetricService.py
def entity_measure_columns(category: str) -> Dict[str, str]:
    """Return the native-measure -> column-name map for an entity category."""
    try:
        return dict(ENTITY_MEASURE_COLUMNS[category])
    except KeyError:
        raise KeyError(f"Unknown entity Stats Category: '{category}'")
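The `dict(...)` call returns a shallow copy, so callers can adjust the result without touching the module-level table. A small sketch of that effect, using a local `TABLE` that copies only the by_cube_by_client entry (the names `TABLE` and `measure_columns` are hypothetical):

```python
from typing import Dict

# Local copy of the by_cube_by_client entry from ENTITY_MEASURE_COLUMNS.
TABLE: Dict[str, Dict[str, str]] = {
    "by_cube_by_client": {"Count": "Count", "Elapse Time (ms)": "ElapseTimeMs"},
}


def measure_columns(category: str) -> Dict[str, str]:
    # dict(...) copies the mapping, so callers cannot mutate TABLE through it.
    return dict(TABLE[category])


columns = measure_columns("by_cube_by_client")
columns["Extra"] = "Extra"  # only the copy changes
```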

normalize_v11_measure(category, native_name)

Normalize a raw v11 measure into (Metric, NativeName, Unit).

Parameters:

category (str, required): a gauge Stats Category (by_cube / by_server).
native_name (str, required): the v11 measure name as reported by the }Stats* cube.

Returns:

Tuple[str, str, Optional[str]]: (canonical Metric, native_name as supplied, Unit).

Raises:

KeyError: if the category is not a known gauge category, or the measure is not in that category's mapping table.

Source code in TM1py/Services/MetricService.py
def normalize_v11_measure(category: str, native_name: str) -> Tuple[str, str, Optional[str]]:
    """Normalize a raw v11 measure into ``(Metric, NativeName, Unit)``.

    :param category: a gauge Stats Category (``by_cube`` / ``by_server``).
    :param native_name: the v11 measure name as reported by the ``}Stats*`` cube.
    :raises KeyError: if the category is not a known gauge category, or the
        measure is not in that category's mapping table.
    :return: ``(canonical Metric, native_name as supplied, Unit)``.
    """
    try:
        lookup = _LOOKUPS[category]
    except KeyError:
        raise KeyError(f"Unknown gauge Stats Category: '{category}'")

    try:
        metric, unit = lookup[native_name]
    except KeyError:
        raise KeyError(f"Unknown v11 measure '{native_name}' for gauge Stats Category '{category}'")
    return metric, native_name, unit
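A minimal sketch of the lookup follows, using a one-entry table. The single mapping (Total Memory Used -> cube_memory_used, unit B) comes from the by_cube example record; the private _LOOKUPS table is not shown on this page, so `LOOKUPS` and `normalize` are hypothetical stand-ins.

```python
from typing import Dict, Optional, Tuple

# One-entry stand-in for the private _LOOKUPS table, which holds more measures.
LOOKUPS: Dict[str, Dict[str, Tuple[str, Optional[str]]]] = {
    "by_cube": {"Total Memory Used": ("cube_memory_used", "B")},
}


def normalize(category: str, native_name: str) -> Tuple[str, str, Optional[str]]:
    """Return (canonical Metric, native name as supplied, Unit)."""
    metric, unit = LOOKUPS[category][native_name]
    return metric, native_name, unit


triple = normalize("by_cube", "Total Memory Used")
```

An unmapped measure raises KeyError rather than passing through under its native name, which keeps unknown measures from appearing under unvetted metric names.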

shape_v11_entity_records(cellset, category)

Shape a raw v11 }Stats* cellset into entity-wide records for category.

One row per entity (e.g. per (CubeName, LineNumber) for by_rule); each of the category's measures becomes a column. Members are mapped to their dimension via UniqueName. ReplicaID is always 0 on v11.

Raises:

KeyError: if category is not an entity category.

Source code in TM1py/Services/MetricService.py
def shape_v11_entity_records(cellset: Dict, category: str) -> List[Dict]:
    """Shape a raw v11 ``}Stats*`` cellset into entity-wide records for ``category``.

    One row per entity (e.g. per ``(CubeName, LineNumber)`` for ``by_rule``);
    each of the category's measures becomes a column. Members are mapped to
    their dimension via ``UniqueName``. ``ReplicaID`` is always ``0`` on v11.

    :raises KeyError: if ``category`` is not an entity category.
    """
    spec = v11_entity_spec(category)
    measure_dim = spec["measure_dim"]
    entity_dims = spec["entity_dims"]
    time_dim = spec["time_dim"]
    measure_columns = entity_measure_columns(category)

    axes = sorted(cellset.get("Axes", []), key=lambda a: a["Ordinal"])
    sizes = [len(axis["Tuples"]) for axis in axes]
    if any(size == 0 for size in sizes):
        return []

    axis_tuples = _axis_tuples_by_dimension(axes)
    cells = cellset.get("Cells", [])
    _warn_if_truncated(category, sizes, cells)

    records: Dict[tuple, Dict] = {}
    for coord in itertools.product(*(range(size) for size in sizes)):
        index = _cell_index(coord, sizes)

        dims: Dict[str, str] = {}
        for axis_i, member_i in enumerate(coord):
            dims.update(axis_tuples[axis_i][member_i])

        native = dims.get(measure_dim)
        if native not in measure_columns:
            continue

        entity_key = tuple(dims.get(dim) for dim in entity_dims)
        record = records.get(entity_key)
        if record is None:
            record = {"Category": category}
            for dim in entity_dims:
                record[ENTITY_DIM_COLUMN[dim]] = dims.get(dim)
            record["ReplicaID"] = 0
            # Entity categories with a time dimension are always sliced on the
            # ``LATEST`` bucket (see build_v11_entity_mdx), so the fallback is
            # accurate. ``by_rule`` has no time dimension at all — its cube is
            # cumulative since collection started — so ``TimeInterval`` there is
            # a constant placeholder for schema uniformity, NOT a snapshot
            # selector. Callers must not read a "now" semantic into it.
            record["TimeInterval"] = dims.get(time_dim, DEFAULT_TIME_INTERVAL) if time_dim else DEFAULT_TIME_INTERVAL
            # pre-create measure columns so column order/presence is stable
            for column in measure_columns.values():
                record[column] = None
            records[entity_key] = record

        record[measure_columns[native]] = cells[index].get("Value") if index < len(cells) else None

    return list(records.values())
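The shaper walks every axis coordinate and maps it to a position in the flat Cells list. The private _cell_index helper is not shown, so the sketch below is an assumption about how such a mapping can work: axis 0 varies fastest, which matches the usual ordering of cellset cells.

```python
import itertools
from typing import List, Sequence


def cell_index(coord: Sequence[int], sizes: Sequence[int]) -> int:
    """Flat cell position for an axis coordinate, with axis 0 varying fastest.

    Assumed stand-in for the private _cell_index helper.
    """
    index, stride = 0, 1
    for position, size in zip(coord, sizes):
        index += position * stride
        stride *= size
    return index


sizes = [3, 2]  # e.g. 3 measures on axis 0, 2 entities on axis 1
order: List[int] = [cell_index(c, sizes) for c in itertools.product(*(range(s) for s in sizes))]
```

Because itertools.product varies its last range fastest, the loop does not visit cells in storage order. That is harmless here: each record is looked up by its entity key, so measures can arrive in any order.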

shape_v11_gauge_records(cellset, category)

Shape a raw v11 }Stats* cellset into gauge-long records for category.

Members are mapped to their dimension via UniqueName (the context/time axis ordinal is not fixed across queries), then each cell becomes one record: the measure is normalized to its canonical Metric/Unit via the vocabulary, the entity (cube) and time bucket are read from their dimensions. ReplicaID is always 0 on v11. Values are passed through verbatim (including None).

Note: v11 surfaces the MDX WHERE slicer as an axis in the cellset response (verified on a live 11.8 server), so the }TimeIntervals bucket — LATEST, a specific bucket, or the full window — is always present on an axis and read per-row. DEFAULT_TIME_INTERVAL is only a safety fallback for the (unobserved) case where it is absent.

Raises:

KeyError: if category is not a gauge category.

Source code in TM1py/Services/MetricService.py
def shape_v11_gauge_records(cellset: Dict, category: str) -> List[Dict]:
    """Shape a raw v11 ``}Stats*`` cellset into gauge-long records for ``category``.

    Members are mapped to their dimension via ``UniqueName`` (the context/time
    axis ordinal is not fixed across queries), then each cell becomes one
    record: the measure is normalized to its canonical ``Metric``/``Unit`` via
    the vocabulary, the entity (cube) and time bucket are read from their
    dimensions. ``ReplicaID`` is always ``0`` on v11. Values are passed through
    verbatim (including ``None``).

    Note: v11 surfaces the MDX ``WHERE`` slicer as an axis in the cellset
    response (verified on a live 11.8 server), so the ``}TimeIntervals`` bucket
    — ``LATEST``, a specific bucket, or the full window — is always present on
    an axis and read per-row. ``DEFAULT_TIME_INTERVAL`` is only a safety
    fallback for the (unobserved) case where it is absent.

    :raises KeyError: if ``category`` is not a gauge category, or if the
        measure dimension is missing from a cellset tuple.
    """
    spec = v11_spec(category)
    measure_dim = spec["measure_dim"]
    entity_dim = spec["entity_dim"]
    time_dim = spec["time_dim"]

    axes = sorted(cellset.get("Axes", []), key=lambda a: a["Ordinal"])
    sizes = [len(axis["Tuples"]) for axis in axes]
    if any(size == 0 for size in sizes):
        return []

    axis_tuples = _axis_tuples_by_dimension(axes)
    cells = cellset.get("Cells", [])
    _warn_if_truncated(category, sizes, cells)

    records: List[Dict] = []
    for coord in itertools.product(*(range(size) for size in sizes)):
        index = _cell_index(coord, sizes)

        dims: Dict[str, str] = {}
        for axis_i, member_i in enumerate(coord):
            dims.update(axis_tuples[axis_i][member_i])

        if measure_dim not in dims:
            raise KeyError(
                f"measure dimension '{measure_dim}' missing from a '{category}' cellset tuple; "
                f"got dimensions {sorted(dims)}"
            )
        metric, native_name, unit = normalize_v11_measure(category, dims[measure_dim])
        value = cells[index].get("Value") if index < len(cells) else None

        record: Dict = {"Category": category}
        if entity_dim:
            record["CubeName"] = dims.get(entity_dim)
        record["Metric"] = metric
        record["NativeName"] = native_name
        record["Value"] = value
        record["Unit"] = unit
        record["ReplicaID"] = 0
        record["TimeInterval"] = dims.get(time_dim, DEFAULT_TIME_INTERVAL)
        record["Timestamp"] = None
        records.append(record)

    return records

shape_v12_gauge_records(raw, category)

Shape raw v12 Metrics() rows into gauge-long records for category.

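Selects only the rows whose Name starts with the category's prefix (cube_* for by_cube, replica_* for by_server). On v12 the metric name is already canonical, so it is used verbatim and NativeName == Metric. CubeName is included for by_cube only. ReplicaID falls back to 0 when the row has none, TimeInterval is always DEFAULT_TIME_INTERVAL, and Timestamp, DatabaseName and DatabaseID are copied from the row.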

Raises:

Type        Description
KeyError    if category is not a gauge category.
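
The prefix selection can be sketched in isolation. This is a simplified stand-in rather than the library function: `PREFIX_BY_CATEGORY` and `select_rows` are hypothetical names, the exact prefixes `cube_` and `replica_` are assumed from the description above, and the metric names in the sample rows are invented.

```python
from typing import Dict, List

# ASSUMPTION: prefixes inferred from the "cube_*" / "replica_*" wording above.
PREFIX_BY_CATEGORY = {"by_cube": "cube_", "by_server": "replica_"}


def select_rows(raw: List[Dict], category: str) -> List[Dict]:
    """Keep only the rows of one category and reshape them into gauge-long form."""
    prefix = PREFIX_BY_CATEGORY[category]  # KeyError for any other category
    records = []
    for row in raw:
        name = row.get("Name", "")
        if not name.startswith(prefix):
            continue
        record = {"Category": category}
        if category == "by_cube":
            record["CubeName"] = row.get("CubeName")
        # The v12 name serves as both the canonical and the native name.
        record["Metric"] = name
        record["NativeName"] = name
        record["Value"] = row.get("Value")  # passed through, never converted
        record["Unit"] = row.get("Unit")
        records.append(record)
    return records


# Invented rows; real Metrics() payloads carry more fields.
RAW = [
    {"Name": "cube_example_metric", "CubeName": "Sales", "Value": 1024, "Unit": "bytes"},
    {"Name": "replica_example_metric", "Value": 3, "Unit": "count"},
]

by_cube = select_rows(RAW, "by_cube")
by_server = select_rows(RAW, "by_server")
```

Running both categories over the same raw list shows that each call sees only its own rows and that only the by_cube records carry a CubeName column.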

Source code in TM1py/Services/MetricService.py
def shape_v12_gauge_records(raw: List[Dict], category: str) -> List[Dict]:
    """Shape raw v12 ``Metrics()`` rows into gauge-long records for ``category``.

    Selects only the rows whose ``Name`` belongs to the category
    (``cube_*`` for ``by_cube``, ``replica_*`` for ``by_server``). On v12 the
    metric name is canonical verbatim, so ``NativeName == Metric``. ``CubeName``
    is included for ``by_cube`` only.

    :raises KeyError: if ``category`` is not a gauge category.
    """
    try:
        prefix = _V12_PREFIX[category]
    except KeyError:
        raise KeyError(f"Unknown gauge Stats Category: '{category}'")

    include_cube = category == CATEGORY_BY_CUBE

    records: List[Dict] = []
    for row in raw:
        name = row.get("Name", "")
        if not name.startswith(prefix):
            continue

        record: Dict = {"Category": category}
        if include_cube:
            record["CubeName"] = row.get("CubeName")
        record["Metric"] = name
        record["NativeName"] = name
        record["Value"] = row.get("Value")
        record["Unit"] = row.get("Unit")
        record["ReplicaID"] = row.get("ReplicaID", 0)
        record["TimeInterval"] = DEFAULT_TIME_INTERVAL
        record["Timestamp"] = row.get("Timestamp")
        record["DatabaseName"] = row.get("DatabaseName")
        record["DatabaseID"] = row.get("DatabaseID")
        records.append(record)

    return records

v11_entity_spec(category)

Return a copy of the v11 cube/dimension spec for an entity category.

Raises:

Type        Description
KeyError    if category is not an entity category.

Source code in TM1py/Services/MetricService.py
def v11_entity_spec(category: str) -> dict:
    """Return a copy of the v11 cube/dimension spec for an entity category.

    :raises KeyError: if ``category`` is not an entity category.
    """
    try:
        return dict(_ENTITY_SPEC[category])
    except KeyError:
        raise KeyError(f"Unknown entity Stats Category: '{category}'")

v11_measure_names(category)

Return the v11 native measure names for a gauge category, in canonical order.

Used by the v11 MDX builder to select exactly the measures TM1py maps.

Raises:

Type        Description
KeyError    if category is not a gauge category.
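
As a rough sketch of how an ordered list of native measure names could feed an MDX set: the vocabulary table, the canonical metric names, the dimension name and both helper functions below are hypothetical and chosen only for illustration. The sketch relies on two general facts: Python dicts preserve insertion order, and a `]` inside an MDX member name is escaped by doubling it.

```python
from typing import Dict, List, Tuple

# HYPOTHETICAL vocabulary: native v11 measure name -> (canonical Metric, Unit).
TABLES: Dict[str, Dict[str, Tuple[str, str]]] = {
    "by_cube": {
        "Total Memory Used": ("example_memory_metric", "bytes"),
        "Number of Fed Cells": ("example_fed_cells_metric", "count"),
    }
}


def measure_names(category: str) -> List[str]:
    """Native names in table order (dict insertion order is preserved)."""
    try:
        return list(TABLES[category])
    except KeyError:
        raise KeyError(f"Unknown gauge Stats Category: '{category}'") from None


def mdx_member_set(dimension: str, names: List[str]) -> str:
    """Render names as an MDX set, doubling any ']' inside a member name."""
    members = ",".join(
        "[" + dimension + "].[" + name.replace("]", "]]") + "]" for name in names
    )
    return "{" + members + "}"


# ASSUMPTION: the measure dimension name is illustrative.
mdx_set = mdx_member_set("}StatsStatsByCube", measure_names("by_cube"))
```

Selecting an explicit set keeps the query limited to measures that have a vocabulary entry, so every returned cell can be normalized.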

Source code in TM1py/Services/MetricService.py
def v11_measure_names(category: str) -> List[str]:
    """Return the v11 native measure names for a gauge category, in canonical order.

    Used by the v11 MDX builder to select exactly the measures TM1py maps.

    :raises KeyError: if ``category`` is not a gauge category.
    """
    try:
        return list(_TABLES[category].keys())
    except KeyError:
        raise KeyError(f"Unknown gauge Stats Category: '{category}'")

v11_spec(category)

Return a copy of the v11 cube/dimension spec for a gauge category.

Used by the record shaper to map cellset members to their dimension roles.

Raises:

Type        Description
KeyError    if category is not a gauge category.
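
The point of returning a copy can be shown with a small stand-in. `SPEC` and `spec_for` are hypothetical; the keys `measure_dim`, `entity_dim` and `time_dim` are the ones the shaper reads, while the `cube` key and all values are assumptions for illustration. Note that `dict(...)` makes a shallow copy, which is sufficient here because the values are plain strings.

```python
from typing import Dict, Optional

# HYPOTHETICAL spec table; values are illustrative only.
SPEC: Dict[str, Dict[str, Optional[str]]] = {
    "by_cube": {
        "cube": "}StatsByCube",
        "measure_dim": "}StatsStatsByCube",
        "entity_dim": "}PerfCubes",
        "time_dim": "}TimeIntervals",
    }
}


def spec_for(category: str) -> Dict[str, Optional[str]]:
    """Return a shallow copy so callers cannot alter the shared table."""
    try:
        return dict(SPEC[category])
    except KeyError:
        raise KeyError(f"Unknown gauge Stats Category: '{category}'") from None


spec = spec_for("by_cube")
spec["time_dim"] = None  # a caller-side change...

# ...leaves the module-level table untouched.
unchanged = SPEC["by_cube"]["time_dim"]
```

A fresh call to `spec_for("by_cube")` still returns the original time dimension, so one caller's edits never leak into another's query.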

Source code in TM1py/Services/MetricService.py
def v11_spec(category: str) -> dict:
    """Return a copy of the v11 cube/dimension spec for a gauge category.

    Used by the record shaper to map cellset members to their dimension roles.

    :raises KeyError: if ``category`` is not a gauge category.
    """
    try:
        return dict(_SPEC[category])
    except KeyError:
        raise KeyError(f"Unknown gauge Stats Category: '{category}'")