# Copyright (c) 2023 Boston Dynamics, Inc. All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).
"""Clients for the metrics logging service."""
import collections
import threading
from google.protobuf.duration_pb2 import Duration
from google.protobuf.timestamp_pb2 import Timestamp
from bosdyn.api.metrics_logging import (absolute_metrics_pb2, metrics_logging_robot_pb2,
metrics_logging_robot_service_pb2_grpc)
from bosdyn.client.common import (BaseClient, common_header_errors, error_factory,
handle_common_header_errors, handle_unset_status_error)
from bosdyn.client.data_buffer import make_parameter
from bosdyn.client.exceptions import ResponseError
class MissingKeysError(ResponseError):
    """Raised when metrics requested from the metrics service do not exist."""
class UnableToOptOutError(ResponseError):
    """Raised when opting out of metrics logging fails due to invalid license permissions."""
class MetricsLoggingClient(BaseClient):
    """Client for talking to the metrics logging service running on the robot."""

    default_service_name = 'metrics-logging'
    service_type = 'bosdyn.api.metrics_logging.MetricsLoggingRobotService'

    def __init__(self):
        stub_class = metrics_logging_robot_service_pb2_grpc.MetricsLoggingRobotServiceStub
        super().__init__(stub_class)

    def get_metrics(self, keys=None, include_events=False, **kwargs):
        """Request metrics from the robot.

        Args:
            keys(List[strings]): Keys of the metrics that should be returned.
            include_events(bool): Whether events should be included in the response.

        Returns:
            The GetMetricsResponse response.

        Raises:
            RpcError: Problem communicating with the robot.
        """
        request = self._get_metrics_request(keys, include_events)
        # None is passed where the other RPC wrappers pass a *_from_response
        # post-processing function, so no such function is applied here.
        return self.call(self._stub.GetMetrics, request, None,
                         _get_metrics_error_from_response, copy_request=False, **kwargs)

    def get_metrics_async(self, keys=None, include_events=False, **kwargs):
        """Async version of get_metrics(); takes the same arguments."""
        request = self._get_metrics_request(keys, include_events)
        return self.call_async(self._stub.GetMetrics, request, None,
                               _get_metrics_error_from_response, copy_request=False, **kwargs)

    def get_store_sequence_range(self, **kwargs):
        """Look up the range of sequence numbers currently used by the metrics system's store.

        Returns:
            A two-element list: the starting index (inclusive) followed by the final index
            (inclusive) of the entries in the metrics store.

        Raises:
            RpcError: Problem communicating with the robot.
        """
        request = metrics_logging_robot_pb2.GetStoreSequenceRangeRequest()
        return self.call(self._stub.GetStoreSequenceRange, request,
                         self._store_sequence_range_from_response, common_header_errors,
                         copy_request=False, **kwargs)

    def get_store_sequence_range_async(self, **kwargs):
        """Async version of get_store_sequence_range()."""
        request = metrics_logging_robot_pb2.GetStoreSequenceRangeRequest()
        return self.call_async(self._stub.GetStoreSequenceRange, request,
                               self._store_sequence_range_from_response, common_header_errors,
                               copy_request=False, **kwargs)

    def get_absolute_metric_snapshot(self, sequence_numbers, **kwargs):
        """Fetch absolute metric snapshots for the entries at specific sequence numbers.

        Args:
            sequence_numbers(List(int)): Sequence numbers whose entries should be returned as
                                         absolute metric snapshots.

        Returns:
            A list of signed_proto_pb2.SignedProto(). Each signed proto will contain the
            serialized absolute_metrics_pb2.AbsoluteMetricsSnapshot() for the requested sequence
            number's timestamp. The robot will exclude snapshots if the sequence number does not
            exist, or the response will exceed the grpc limit.

        Raises:
            RpcError: Problem communicating with the robot.
        """
        request = metrics_logging_robot_pb2.GetAbsoluteMetricSnapshotRequest(
            sequence_numbers=sequence_numbers)
        return self.call(self._stub.GetAbsoluteMetricSnapshot, request,
                         self._get_absolute_metric_snapshots_from_response,
                         common_header_errors, copy_request=False, **kwargs)

    def get_absolute_metric_snapshot_async(self, sequence_numbers, **kwargs):
        """Async version of get_absolute_metric_snapshot()."""
        request = metrics_logging_robot_pb2.GetAbsoluteMetricSnapshotRequest(
            sequence_numbers=sequence_numbers)
        return self.call_async(self._stub.GetAbsoluteMetricSnapshot, request,
                               self._get_absolute_metric_snapshots_from_response,
                               common_header_errors, copy_request=False, **kwargs)

    @staticmethod
    def _get_metrics_request(keys, include_events):
        """Build the GetMetricsRequest proto for the given keys and event flag."""
        request = metrics_logging_robot_pb2.GetMetricsRequest(keys=keys,
                                                              include_events=include_events)
        return request

    @staticmethod
    def _store_sequence_range_from_response(response):
        """Extract [first_sequence_number, last_sequence_number] from the response."""
        first = response.first_sequence_number
        last = response.last_sequence_number
        return [first, last]

    @staticmethod
    def _get_absolute_metric_snapshots_from_response(response):
        """Extract the snapshots field from the response."""
        snapshots = response.snapshots
        return snapshots
def make_parameter_update(label, value, is_incremental, units="", notes=""):
    """Build a ParameterUpdate proto from a label, a value, and an incremental flag.

    Args:
        label: Passed through to make_parameter() as the parameter label.
        value: Passed through to make_parameter() as the parameter value.
        is_incremental: Stored in the update's `incremental` field.
        units: Passed through to make_parameter(); defaults to an empty string.
        notes: Passed through to make_parameter(); defaults to an empty string.

    Returns:
        An absolute_metrics_pb2.ParameterUpdate. Its `parameter` field is only populated
        when make_parameter() returns something other than None.
    """
    update = absolute_metrics_pb2.ParameterUpdate(incremental=is_incremental)
    parameter = make_parameter(label, value, units, notes)
    if parameter is None:
        # No parameter could be built; return the update with only `incremental` set.
        return update
    update.parameter.CopyFrom(parameter)
    return update
@handle_common_header_errors
def _get_metrics_error_from_response(response):
    """Translate a GetMetrics RPC response into an exception, or None if there is no error.

    The function body itself never reports an error; any error handling beyond that is
    whatever the handle_common_header_errors decorator provides.
    """
    # A MissingKeysError could be returned here for invalid keys. At the time of writing,
    # though, that would also raise for metrics that simply have not been reported yet
    # (not only invalid ones) -- e.g. it would consistently fail on a new robot that has
    # no metrics reported so far. So no error is produced for missing keys.
    return None