Source code for bosdyn.client.log_status

# Copyright (c) 2023 Boston Dynamics, Inc.  All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).

"""Client for the log-status service.

This allows client code to start, extend or terminate experiment logs and start retro logs.
"""

import collections

import bosdyn.util
from bosdyn.api.log_status import log_status_pb2 as log_status
from bosdyn.api.log_status import log_status_service_pb2_grpc as log_status_service
from bosdyn.client.common import (BaseClient, common_header_errors, error_factory, error_pair,
                                  handle_common_header_errors, handle_unset_status_error)
from bosdyn.client.exceptions import ResponseError


[docs]class LogStatusResponseError(ResponseError): """Error in Log Status RPC"""
[docs]class ExperimentAlreadyRunningError(LogStatusResponseError): """The log status request could not be started, an experiment is already running."""
[docs]class RequestIdDoesNotExistError(LogStatusResponseError): """The provided request id does not exist or is invalid."""
[docs]class InactiveLogError(LogStatusResponseError): """The log has already terminated and cannot be updated."""
[docs]class ConcurrencyLimitReachedError(LogStatusResponseError): """The limit of concurrent retro logs has be reached, a new log cannot be started."""
[docs]class LogStatusClient(BaseClient): """A client for interacting with robot logs.""" # Typical name of the service in the robot's directory listing. default_service_name = 'log-status' # gRPC service proto definition implemented by this service service_type = 'bosdyn.api.log_status.LogStatusService' def __init__(self): super(LogStatusClient, self) \ .__init__(log_status_service.LogStatusServiceStub)
[docs] def get_log_status(self, id, **kwargs): """Synchronously get status of a log. Args: id (string): Id of log to retrieve Raises: RequestIdDoesNotExistError: Id was not found on robot """ req = log_status.GetLogStatusRequest() req.id = id return self.call(self._stub.GetLogStatus, req, error_from_response=get_log_status_error, copy_request=False, **kwargs)
[docs] def get_log_status_async(self, id, **kwargs): """Asynchronously get status of a log.""" req = log_status.GetLogStatusRequest() req.id = id return self.call_async(self._stub.GetLogStatus, req, error_from_response=get_log_status_error, copy_request=False, **kwargs)
[docs] def get_active_log_statuses(self, **kwargs): """Synchronously retrieve status of active logs.""" req = log_status.GetActiveLogStatusesRequest() return self.call(self._stub.GetActiveLogStatuses, req, error_from_response=get_active_log_statuses_error, copy_request=False, **kwargs)
[docs] def get_active_log_statuses_async(self, **kwargs): """Asynchronously retrieve status of active logs.""" req = log_status.GetActiveLogStatusesRequest() return self.call_async(self._stub.GetActiveLogStatuses, req, error_from_response=get_active_log_statuses_error, copy_request=False, **kwargs)
[docs] def start_experiment_log(self, seconds, **kwargs): """Start an experiment log, to run for a specified duration. Args: seconds: Number of seconds to gather data for the experiment log Raises: ExperimentAlreadyRunningError: Only 1 experiment log can be run at a time """ req = log_status.StartExperimentLogRequest() req.keep_alive.CopyFrom(bosdyn.util.seconds_to_duration(seconds)) return self.call(self._stub.StartExperimentLog, req, error_from_response=start_experiment_log_error, copy_request=False, **kwargs)
[docs] def start_experiment_log_async(self, seconds, **kwargs): """Start an experiment log, to run for a specified duration.""" req = log_status.StartExperimentLogRequest() req.keep_alive.CopyFrom(bosdyn.util.seconds_to_duration(seconds)) return self.call_async(self._stub.StartExperimentLog, req, error_from_response=start_experiment_log_error, copy_request=False, **kwargs)
[docs] def start_retro_log(self, seconds, **kwargs): """Start a retro log, to run for a specified duration. Args: seconds: Number of seconds to gather data for the retro log Raises: ExperimentAlreadyRunningError: Retro logs cannot be run while an experiment log is running ConcurrencyLimitReachedError: Maximum number of retro logs are already running, another cannot be started """ req = log_status.StartRetroLogRequest() req.past_duration.CopyFrom(bosdyn.util.seconds_to_duration(-seconds)) return self.call(self._stub.StartRetroLog, req, error_from_response=start_retro_log_error, copy_request=False, **kwargs)
[docs] def start_retro_log_async(self, seconds, **kwargs): """Start a retro log, to run for a specified duration.""" req = log_status.StartRetroLogRequest() req.past_duration.CopyFrom(bosdyn.util.seconds_to_duration(-seconds)) return self.call_async(self._stub.StartRetroLog, req, error_from_response=start_retro_log_error, copy_request=False, **kwargs)
[docs] def update_experiment(self, id, seconds, **kwargs): """Update an experiment log to run for a specified duration. Args: id (string): Id of log to retrieve seconds (float): Number of seconds to gather data for the experiment log Raises: RequestIdDoesNotExistError: Id was not found on robot InactiveLogError: Cannot update log, it is already terminated """ req = log_status.UpdateExperimentLogRequest() req.id = id req.keep_alive.CopyFrom(bosdyn.util.seconds_to_duration(seconds)) return self.call(self._stub.UpdateExperimentLog, req, error_from_response=update_experiment_log_error, copy_request=False, **kwargs)
[docs] def update_experiment_async(self, id, seconds, **kwargs): """Update an experiment log to run for a specified duration.""" req = log_status.UpdateExperimentLogRequest() req.id = id req.keep_alive.CopyFrom(bosdyn.util.seconds_to_duration(seconds)) return self.call_async(self._stub.UpdateExperimentLog, req, error_from_response=update_experiment_log_error, copy_request=False, **kwargs)
[docs] def terminate_log(self, id, **kwargs): """Terminate an experiment log. Args: id (string): Id of log to terminate Raises: RequestIdDoesNotExistError: Id was not found on robot """ req = log_status.TerminateLogRequest() req.id = id return self.call(self._stub.TerminateLog, req, error_from_response=terminate_log_error, copy_request=False, **kwargs)
[docs] def terminate_log_async(self, id, **kwargs): """Terminate an experiment log.""" req = log_status.TerminateLogRequest() req.id = id return self.call_async(self._stub.TerminateLog, req, error_from_response=terminate_log_error, copy_request=False, **kwargs)
_GET_LOG_STATUS_STATUS_TO_ERROR = \ collections.defaultdict(lambda: (LogStatusResponseError, None)) _GET_LOG_STATUS_STATUS_TO_ERROR.update({ log_status.GetLogStatusResponse.STATUS_OK: (None, None), log_status.GetLogStatusResponse.STATUS_ID_NOT_FOUND: error_pair(RequestIdDoesNotExistError), }) _GET_ACTIVE_LOG_STATUSES_STATUS_TO_ERROR = \ collections.defaultdict(lambda: (LogStatusResponseError, None)) _GET_ACTIVE_LOG_STATUSES_STATUS_TO_ERROR.update({ log_status.GetActiveLogStatusesResponse.STATUS_OK: (None, None), }) _START_EXPERIMENT_LOG_STATUS_TO_ERROR = \ collections.defaultdict(lambda: (LogStatusResponseError, None)) _START_EXPERIMENT_LOG_STATUS_TO_ERROR.update({ log_status.StartExperimentLogResponse.STATUS_OK: (None, None), log_status.StartExperimentLogResponse.STATUS_EXPERIMENT_LOG_RUNNING: error_pair(ExperimentAlreadyRunningError), }) _START_RETRO_LOG_STATUS_TO_ERROR = \ collections.defaultdict(lambda: (LogStatusResponseError, None)) _START_RETRO_LOG_STATUS_TO_ERROR.update({ log_status.StartRetroLogResponse.STATUS_OK: (None, None), log_status.StartRetroLogResponse.STATUS_EXPERIMENT_LOG_RUNNING: error_pair(ExperimentAlreadyRunningError), log_status.StartRetroLogResponse.STATUS_CONCURRENCY_LIMIT_REACHED: error_pair(ConcurrencyLimitReachedError), }) _UPDATE_EXPERIMENT_LOG_STATUS_TO_ERROR = \ collections.defaultdict(lambda: (LogStatusResponseError, None)) _UPDATE_EXPERIMENT_LOG_STATUS_TO_ERROR.update({ log_status.UpdateExperimentLogResponse.STATUS_OK: (None, None), log_status.UpdateExperimentLogResponse.STATUS_ID_NOT_FOUND: error_pair(RequestIdDoesNotExistError), log_status.UpdateExperimentLogResponse.STATUS_LOG_TERMINATED: error_pair(InactiveLogError), }) _TERMINATE_LOG_STATUS_TO_ERROR = \ collections.defaultdict(lambda: (LogStatusResponseError, None)) _TERMINATE_LOG_STATUS_TO_ERROR.update({ log_status.TerminateLogResponse.STATUS_OK: (None, None), log_status.TerminateLogResponse.STATUS_ID_NOT_FOUND: error_pair(RequestIdDoesNotExistError), })
[docs]@handle_common_header_errors @handle_unset_status_error(unset='STATUS_UNKNOWN') def get_log_status_error(response): """Return a custom exception based on the GetLogStatus response, None if no error.""" return error_factory(response, response.status, status_to_string=log_status.GetLogStatusResponse.Status.Name, status_to_error=_GET_LOG_STATUS_STATUS_TO_ERROR)
[docs]@handle_common_header_errors @handle_unset_status_error(unset='STATUS_UNKNOWN') def get_active_log_statuses_error(response): """Return a custom exception based on the GetActiveLogStatuses response, None if no error.""" return error_factory(response, response.status, status_to_string=log_status.GetActiveLogStatusesResponse.Status.Name, status_to_error=_GET_ACTIVE_LOG_STATUSES_STATUS_TO_ERROR)
[docs]@handle_common_header_errors @handle_unset_status_error(unset='STATUS_UNKNOWN') def start_experiment_log_error(response): """Return a custom exception based on the StartExperimentLog response, None if no error.""" return error_factory(response, response.status, status_to_string=log_status.StartExperimentLogResponse.Status.Name, status_to_error=_START_EXPERIMENT_LOG_STATUS_TO_ERROR)
[docs]@handle_common_header_errors @handle_unset_status_error(unset='STATUS_UNKNOWN') def start_retro_log_error(response): """Return a custom exception based on the StartRetroLog response, None if no error.""" return error_factory(response, response.status, status_to_string=log_status.StartRetroLogResponse.Status.Name, status_to_error=_START_RETRO_LOG_STATUS_TO_ERROR)
[docs]@handle_common_header_errors @handle_unset_status_error(unset='STATUS_UNKNOWN') def update_experiment_log_error(response): """Return a custom exception based on the UpdateExperimentLog response, None if no error.""" return error_factory(response, response.status, status_to_string=log_status.UpdateExperimentLogResponse.Status.Name, status_to_error=_UPDATE_EXPERIMENT_LOG_STATUS_TO_ERROR)
[docs]@handle_common_header_errors @handle_unset_status_error(unset='STATUS_UNKNOWN') def terminate_log_error(response): """Return a custom exception based on the TerminateLog response, None if no error.""" return error_factory(response, response.status, status_to_string=log_status.TerminateLogResponse.Status.Name, status_to_error=_TERMINATE_LOG_STATUS_TO_ERROR)