Source code for bosdyn.client.audio_visual

# Copyright (c) 2023 Boston Dynamics, Inc.  All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).

import collections
import math

from bosdyn.api import audio_visual_pb2, audio_visual_service_pb2_grpc
from bosdyn.client.common import (BaseClient, common_header_errors, error_factory, error_pair,
                                  handle_common_header_errors, handle_unset_status_error)

from .exceptions import Error as BaseError
from .exceptions import ResponseError


[docs]class AudioVisualResponseError(ResponseError): """General class of errors for AudioVisual service."""
[docs]class Error(BaseError): """Base class for non-response errors in this module."""
[docs]class NoTimeSyncError(BaseError): """Client has not done timesync with robot."""
[docs]class DoesNotExistError(AudioVisualResponseError): """The specified behavior does not exist."""
[docs]class PermanentBehaviorError(AudioVisualResponseError): """Permanent behaviors cannot be modified or deleted."""
[docs]class BehaviorExpiredError(AudioVisualResponseError): """The specified end_time has already expired."""
[docs]class InvalidBehaviorError(AudioVisualResponseError): """The request contained a behavior with invalid fields."""
[docs]class InvalidClientError(AudioVisualResponseError): """The behavior cannot be stopped because a different client is running it."""
[docs]class AudioVisualClient(BaseClient): """Client for calling the Audio Visual Service.""" default_service_name = 'audio-visual' service_type = 'bosdyn.api.AudioVisualService' def __init__(self): super(AudioVisualClient, self).__init__(audio_visual_service_pb2_grpc.AudioVisualServiceStub) self.timesync_endpoint = None
[docs] def update_from(self, other): """Update instance from another object. Args: other: The object where to copy from. """ super(AudioVisualClient, self).update_from(other) # Grab a timesync endpoint if it is available. try: self.timesync_endpoint = other.time_sync.endpoint except AttributeError: pass # other doesn't have a time_sync accessor
[docs] def run_behavior(self, name, end_time_secs, restart=False, timesync_endpoint=None, **kwargs): """Run a behavior on the robot. Args: name: The name of the behavior to run. end_time_secs: The time that this behavior should stop. restart: If this behavior is already running, should we restart it from the beginning. timesync_endpoint: Timesync endpoint. Raises: RpcError: Problem communicating with the robot. DoesNotExistError: The behavior name specified has not been added to the system. BehaviorExpiredError: The specified end_time has already expired. NoTimeSyncError: Time sync has not been established with the robot yet. """ end_time = self._timestamp_to_robot_time(end_time_secs, timesync_endpoint) req = audio_visual_pb2.RunBehaviorRequest(name=name, end_time=end_time, restart=restart) return self.call(self._stub.RunBehavior, req, error_from_response=_run_behavior_error, copy_request=False, **kwargs)
[docs] def run_behavior_async(self, name, end_time_secs, restart=False, timesync_endpoint=None, **kwargs): """Async version of run_behavior(). Args: name: The name of the behavior to run. end_time_secs: The time that this behavior should stop. restart: If this behavior is already running, should we restart it from the beginning. timesync_endpoint: Timesync endpoint. Raises: RpcError: Problem communicating with the robot. DoesNotExistError: The behavior name specified has not been added to the system. BehaviorExpiredError: The specified end_time has already expired. NoTimeSyncError: Time sync has not been established with the robot yet. """ end_time = self._timestamp_to_robot_time(end_time_secs, timesync_endpoint) req = audio_visual_pb2.RunBehaviorRequest(name=name, end_time=end_time, restart=restart) return self.call_async(self._stub.RunBehavior, req, error_from_response=_run_behavior_error, copy_request=False, **kwargs)
[docs] def stop_behavior(self, name, **kwargs): """Stop a behavior that is currently running. Args: name: The name of the behavior to stop. Raises: RpcError: Problem communicating with the robot. InvalidClientError: A different client is running this behavior.""" req = audio_visual_pb2.StopBehaviorRequest(behavior_name=name) return self.call(self._stub.StopBehavior, req, error_from_response=_stop_behavior_error, copy_request=False, **kwargs)
[docs] def stop_behavior_async(self, name, **kwargs): """Async version of stop_behavior(). Args: name: The name of the behavior to stop. Raises: RpcError: Problem communicating with the robot. InvalidClientError: A different client is running this behavior.""" req = audio_visual_pb2.StopBehaviorRequest(behavior_name=name) return self.call_async(self._stub.StopBehavior, req, error_from_response=_stop_behavior_error, copy_request=False, **kwargs)
[docs] def list_behaviors(self, **kwargs): """List all currently added AudioVisualBehaviors. Returns: A list of all LiveAudioVisualBehavior protos. Raises: RpcError: Problem communicating with the robot. """ req = audio_visual_pb2.ListBehaviorsRequest() return self.call(self._stub.ListBehaviors, req, value_from_response=_get_behavior_list, error_from_response=common_header_errors, copy_request=False, **kwargs)
[docs] def list_behaviors_async(self, **kwargs): """Async version of list_behaviors(). Returns: A list of all LiveAudioVisualBehavior protos. Raises: RpcError: Problem communicating with the robot. """ req = audio_visual_pb2.ListBehaviorsRequest() return self.call_async(self._stub.ListBehaviors, req, value_from_response=_get_behavior_list, error_from_response=common_header_errors, copy_request=False, **kwargs)
[docs] def get_system_params(self, **kwargs): """Get the current system params. Returns: An AudioVisualSystemParams proto containing the current system param values. Raises: RpcError: Problem communicating with the robot. """ req = audio_visual_pb2.GetSystemParamsRequest() return self.call(self._stub.GetSystemParams, req, error_from_response=common_header_errors, copy_request=False, **kwargs)
[docs] def get_system_params_async(self, **kwargs): """Async version of get_system_params(). Returns: An AudioVisualSystemParams proto containing the current system param values. Raises: RpcError: Problem communicating with the robot. """ req = audio_visual_pb2.GetSystemParamsRequest() return self.call_async(self._stub.GetSystemParams, req, error_from_response=common_header_errors, copy_request=False, **kwargs)
[docs] def set_system_params(self, enabled=None, max_brightness=None, buzzer_max_volume=None, speaker_max_volume=None, normal_color_association=None, warning_color_association=None, danger_color_association=None, **kwargs): """Set the system params. Args: enabled: [optional] System is enabled or disabled (boolean). max_brightness: [optional] New max_brightness value [0, 1]. buzzer_max_volume: [optional] New buzzer_max_volume value [0, 1]. speaker_max_volume: [optional] New speaker_max_volume value [0, 1]. normal_color_association: [optional] The color to associate with the normal color preset. warning_color_association: [optional] The color to associate with the warning color preset. danger_color_association: [optional] The color to associate with the danger color preset. Raises: RpcError: Problem communicating with the robot. """ req = audio_visual_pb2.SetSystemParamsRequest() if (enabled is not None): req.enabled.value = enabled if (max_brightness is not None): req.max_brightness.value = max_brightness if (buzzer_max_volume is not None): req.buzzer_max_volume.value = buzzer_max_volume if (speaker_max_volume is not None): req.speaker_max_volume.value = speaker_max_volume if (normal_color_association is not None): req.normal_color_association.CopyFrom(normal_color_association) if (warning_color_association is not None): req.warning_color_association.CopyFrom(warning_color_association) if (danger_color_association is not None): req.danger_color_association.CopyFrom(danger_color_association) return self.call(self._stub.SetSystemParams, req, error_from_response=common_header_errors, copy_request=False, **kwargs)
[docs] def set_system_params_async(self, enabled=None, max_brightness=None, buzzer_max_volume=None, speaker_max_volume=None, normal_color_association=None, warning_color_association=None, danger_color_association=None, **kwargs): """Async version of set_system_params(). Args: enabled: [optional] System is enabled or disabled (boolean). max_brightness: [optional] New max_brightness value [0, 1]. buzzer_max_volume: [optional] New buzzer_max_volume value [0, 1]. speaker_max_volume: [optional] New speaker_max_volume value [0, 1]. normal_color_association: [optional] The color to associate with the normal color preset. warning_color_association: [optional] The color to associate with the warning color preset. danger_color_association: [optional] The color to associate with the danger color preset. Raises: RpcError: Problem communicating with the robot. """ req = audio_visual_pb2.SetSystemParamsRequest() if (enabled is not None): req.enabled.value = enabled if (max_brightness is not None): req.max_brightness.value = max_brightness if (buzzer_max_volume is not None): req.buzzer_max_volume.value = buzzer_max_volume if (speaker_max_volume is not None): req.speaker_max_volume.value = speaker_max_volume if (normal_color_association is not None): req.normal_color_association.CopyFrom(normal_color_association) if (warning_color_association is not None): req.warning_color_association.CopyFrom(warning_color_association) if (danger_color_association is not None): req.danger_color_association.CopyFrom(danger_color_association) return self.call_async(self._stub.SetSystemParams, req, error_from_response=common_header_errors, copy_request=False, **kwargs)
def _timestamp_to_robot_time(self, timestamp, timesync_endpoint=None): # Create a time converter to convert timestamp to robot time time_converter = None if (timesync_endpoint): time_converter = timesync_endpoint.get_robot_time_converter() elif (self.timesync_endpoint): time_converter = self.timesync_endpoint.get_robot_time_converter() else: raise NoTimeSyncError("No timesync endpoint was passed to audio visual client.") return time_converter.robot_timestamp_from_local_secs(timestamp)
def _get_behavior_list(response): return response.behaviors def _get_live_behavior(response): return response.live_behavior _AUDIO_VISUAL_RUN_BEHAVIOR_STATUS_TO_ERROR = collections.defaultdict( lambda: (AudioVisualResponseError, None)) _AUDIO_VISUAL_RUN_BEHAVIOR_STATUS_TO_ERROR.update({ audio_visual_pb2.RunBehaviorResponse.STATUS_SUCCESS: (None, None), audio_visual_pb2.RunBehaviorResponse.STATUS_DOES_NOT_EXIST: error_pair(DoesNotExistError), audio_visual_pb2.RunBehaviorResponse.STATUS_EXPIRED: error_pair(BehaviorExpiredError), }) _AUDIO_VISUAL_STOP_BEHAVIOR_STATUS_TO_ERROR = collections.defaultdict( lambda: (AudioVisualResponseError, None)) _AUDIO_VISUAL_STOP_BEHAVIOR_STATUS_TO_ERROR.update({ audio_visual_pb2.StopBehaviorResponse.STATUS_SUCCESS: (None, None), audio_visual_pb2.StopBehaviorResponse.STATUS_INVALID_CLIENT: error_pair(InvalidClientError) }) @handle_common_header_errors @handle_unset_status_error(unset='STATUS_UNKNOWN') def _run_behavior_error(response): """RunBehaviorResponse response to exception.""" return error_factory(response, response.status, status_to_string=audio_visual_pb2.RunBehaviorResponse.Status.Name, status_to_error=_AUDIO_VISUAL_RUN_BEHAVIOR_STATUS_TO_ERROR) @handle_common_header_errors @handle_unset_status_error(unset='STATUS_UNKNOWN') def _stop_behavior_error(response): """StopBehaviorResponse response to exception.""" return error_factory(response, response.status, status_to_string=audio_visual_pb2.StopBehaviorResponse.Status.Name, status_to_error=_AUDIO_VISUAL_STOP_BEHAVIOR_STATUS_TO_ERROR)