# Copyright (c) 2023 Boston Dynamics, Inc. All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).
"""Client implementation of the AutoReturn service."""
import collections
from bosdyn.api.auto_return import auto_return_pb2, auto_return_service_pb2_grpc
from bosdyn.client.common import (BaseClient, error_factory, error_pair,
handle_common_header_errors, handle_unset_status_error)
from bosdyn.client.exceptions import ResponseError
class AutoReturnResponseError(ResponseError):
    """Base error type for failures reported by an AutoReturn RPC."""
class InvalidParameterError(AutoReturnResponseError):
    """The request contained at least one invalid parameter."""
class AutoReturnClient(BaseClient):
    """A client for configuring automatic AutoReturn behavior.

    Each public method builds the matching request proto and dispatches it to the
    AutoReturnService stub through self.call() / self.call_async().
    """
    default_service_name = 'auto-return'
    service_type = 'bosdyn.api.auto_return.AutoReturnService'

    def __init__(self):
        super(AutoReturnClient, self).__init__(auto_return_service_pb2_grpc.AutoReturnServiceStub)
        # Starts unset; nothing in this class assigns or reads it afterwards.
        self._timesync_endpoint = None

    def get_configuration(self, **kwargs):
        """Get the configuration of the AutoReturn system.

        Args:
            **kwargs: Extra keyword arguments forwarded unchanged to self.call().

        Raises:
            RpcError: Problem communicating with the service.

        Returns:
            The bosdyn.api.auto_return_pb2.GetConfigurationResponse.
        """
        request = auto_return_pb2.GetConfigurationRequest()
        return self.call(self._stub.GetConfiguration, request, None, None, copy_request=False,
                         **kwargs)

    def get_configuration_async(self, **kwargs):
        """Async version of the get_configuration() RPC.

        Args:
            **kwargs: Extra keyword arguments forwarded unchanged to self.call_async().

        Returns:
            Whatever self.call_async() returns for the GetConfiguration RPC.
        """
        request = auto_return_pb2.GetConfigurationRequest()
        return self.call_async(self._stub.GetConfiguration, request, None, None,
                               copy_request=False, **kwargs)

    def start(self, params=None, leases=None, **kwargs):
        """Start AutoReturn now.

        Args:
            params: Value passed as the `params` field of the StartRequest.
            leases: Iterable of lease objects exposing a `lease_proto` attribute; each
                `lease_proto` is copied into the request. None (the default) sends no leases.
            **kwargs: Extra keyword arguments forwarded unchanged to self.call().

        Raises:
            InvalidParameterError: An invalid request was received by the service.
            RpcError: Problem communicating with the service.

        Returns:
            The bosdyn.api.auto_return_pb2.StartResponse.
        """
        request = self._start_request(params, leases)
        return self.call(self._stub.Start, request, None, start_error, copy_request=False, **kwargs)

    def start_async(self, params=None, leases=None, **kwargs):
        """Async version of the start() RPC.

        Takes the same arguments as start(); the request is dispatched through
        self.call_async() instead of self.call().
        """
        request = self._start_request(params, leases)
        return self.call_async(self._stub.Start, request, None, start_error, copy_request=False,
                               **kwargs)

    @staticmethod
    def _configure_request(params, leases, clear_buffer):
        """Build a ConfigureRequest carrying `params`, `clear_buffer` and a copy of each lease.

        Args:
            params: Value passed as the `params` field of the ConfigureRequest.
            leases: Iterable of objects exposing `lease_proto`, or None for no leases.
            clear_buffer: Value passed as the `clear_buffer` field of the ConfigureRequest.

        Returns:
            The populated auto_return_pb2.ConfigureRequest.
        """
        request = auto_return_pb2.ConfigureRequest(params=params, clear_buffer=clear_buffer)
        if leases is None:
            # Treat a missing lease collection as "no leases" rather than failing on iteration.
            leases = ()
        for lease in leases:
            # add() appends an empty message to the repeated field; CopyFrom fills it in.
            request.leases.add().CopyFrom(lease.lease_proto)
        return request

    @staticmethod
    def _start_request(params, leases):
        """Build a StartRequest carrying `params` and a copy of each lease.

        Args:
            params: Value passed as the `params` field of the StartRequest.
            leases: Iterable of objects exposing `lease_proto`, or None for no leases.

        Returns:
            The populated auto_return_pb2.StartRequest.
        """
        request = auto_return_pb2.StartRequest(params=params)
        if leases is None:
            # start()/start_async() default to None instead of a shared mutable list.
            leases = ()
        for lease in leases:
            # add() appends an empty message to the repeated field; CopyFrom fills it in.
            request.leases.add().CopyFrom(lease.lease_proto)
        return request
# Status -> error_pair(...) lookup tables. Any status without an explicit entry
# yields the (None, None) default.
_CONFIGURE_STATUS_TO_ERROR = collections.defaultdict(
    lambda: (None, None),
    {auto_return_pb2.ConfigureResponse.STATUS_INVALID_PARAMS: error_pair(InvalidParameterError)},
)
_START_STATUS_TO_ERROR = collections.defaultdict(
    lambda: (None, None),
    {auto_return_pb2.StartResponse.STATUS_INVALID_PARAMS: error_pair(InvalidParameterError)},
)
@handle_common_header_errors
def start_error(response):
    """Map a Start response to a custom exception, or None when there is no error.

    The response's `status` field is looked up in _START_STATUS_TO_ERROR by error_factory().
    """
    status_name = auto_return_pb2.StartResponse.Status.Name
    return error_factory(response, response.status, status_to_string=status_name,
                         status_to_error=_START_STATUS_TO_ERROR)