Source code for bosdyn.client.audio_visual_helpers

# Copyright (c) 2023 Boston Dynamics, Inc.  All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).

import logging
import threading
import time
from concurrent.futures import Future

import bosdyn.client
from bosdyn.api import audio_visual_pb2
from bosdyn.client.audio_visual import (AudioVisualClient, BehaviorExpiredError, DoesNotExistError,
                                        InvalidClientError)

_LOGGER = logging.getLogger(__name__)


[docs]class AudioVisualHelper: """Context manager that runs an AV behavior for the duration of the context. Use as follows: .. code-block:: python with AudioVisualHelper(robot, behavior_name, refresh_rate): # Lights and sounds will play here # Lights and sounds will stop here. Args: robot: Robot object for creating clients behavior_name: Name of the desired behavior to run refresh_rate: What rate to refresh the behavior (seconds) """ def __init__(self, robot, behavior_name, refresh_rate, logger=None): self.robot = robot self.logger = logger self.behavior_name = behavior_name self.refresh_rate = refresh_rate self.av_client = None self._behavior_running_fut = None try: self.av_client = robot.ensure_client(AudioVisualClient.default_service_name) except: _LOGGER.warning("Could not initialize AV client, skipping AudioVisualHelper.") self.av_thread = None self.stop_event = threading.Event() def __enter__(self): self._behavior_running_fut = Future() self._behavior_running_fut.set_running_or_notify_cancel() if self.av_client: self.av_thread = threading.Thread(target=self._run_behavior_thread, args=()) self.av_thread.start() else: self._behavior_running_fut.set_result(False) return self._behavior_running_fut def __exit__(self, exc_type, exc_value, tb): if self.av_thread: self.stop_event.set() self.av_thread.join() def _run_behavior_thread(self): # Check if the robot has AV hardware if not self.robot.get_cached_hardware_hardware_configuration().has_audio_visual_system: self._behavior_running_fut.set_result(False) return def set_future_result(result): if not self._behavior_running_fut.done(): self._behavior_running_fut.set_result(result) def set_future_exception(exc): if not self._behavior_running_fut.done(): self._behavior_running_fut.set_exception(exc) # Run the AV behavior until the stop_event is triggered while not self.stop_event.wait(self.refresh_rate): try: end_time_secs = time.time() + self.refresh_rate + 0.10 # add 100ms margin result = self.av_client.run_behavior(self.behavior_name, end_time_secs) set_future_result( result.run_result == audio_visual_pb2.RunBehaviorResponse.RESULT_BEHAVIOR_RUN) except DoesNotExistError as exc: set_future_exception(exc) _LOGGER.exception(f'Audio Visual Behavior {self.behavior_name} does not exist.') return # Since the behavior doesn't exist, we can stop trying to run it except BehaviorExpiredError as exc: set_future_exception(exc) _LOGGER.warning('Behavior was expired when received by client.') except bosdyn.client.PersistentRpcError as exc: set_future_exception(exc) _LOGGER.exception('Failed to run behavior. Quitting AudioVisualHelper.') return # A persistent error means we can't talk to the AV service, we can stop. except bosdyn.client.RpcError: _LOGGER.exception('Failed to run behavior. Retrying.') except bosdyn.client.Error as exc: set_future_exception(exc) _LOGGER.exception('Unknown exception caught, quitting AudioVisualHelper.') return try: self.av_client.stop_behavior(self.behavior_name) except InvalidClientError: _LOGGER.warning('Failed to stop behavior, run by a different client.')