Source code for bosdyn.client.network_compute_bridge_client

# Copyright (c) 2023 Boston Dynamics, Inc.  All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).

"""For clients to the network compute bridge service."""

import collections

from bosdyn.api import (network_compute_bridge_pb2, network_compute_bridge_service_pb2,
                        network_compute_bridge_service_pb2_grpc)
from bosdyn.client.common import (BaseClient, error_factory, error_pair,
                                  handle_common_header_errors, handle_custom_params_errors,
                                  handle_lease_use_result_errors, handle_unset_status_error)
from bosdyn.client.exceptions import Error, InternalServerError, ResponseError, UnsetStatusError


# NOTE: the docstring below is runtime text, not just documentation.
# _LIST_AVAILABLE_MODELS_STATUS_TO_ERROR passes this class's __doc__ as the
# error_message of the raised exception, so keep the wording user-facing.
class ExternalServiceNotFoundError(ResponseError):
    """The requested service for external computation was not found in the directory."""
# NOTE(review): this class is handed to error_pair(...) when building
# _NETWORK_COMPUTE_STATUS_TO_ERROR; presumably error_pair reuses the docstring
# as the error message -- confirm before rewording it.
class ExternalServerError(ResponseError):
    """The call to the external server did not complete successfully."""
# Returned by _network_compute_error for NETWORK_COMPUTE_STATUS_ROTATION_ERROR.
# NOTE(review): the class is passed to error_pair(...), which presumably reuses
# the docstring as the error message -- confirm before rewording it.
class NetworkComputeRotationError(ResponseError):
    """The robot failed to rotate the image as requested."""
# Returned by _network_compute_error for NETWORK_COMPUTE_STATUS_ANALYSIS_FAILED.
# NOTE(review): the class is passed to error_pair(...), which presumably reuses
# the docstring as the error message -- confirm before rewording it.
class NetworkComputeAnalysisFailedError(ResponseError):
    """The model failed to analyze the set of input images, but a retry might work."""
class NetworkComputeBridgeClient(BaseClient):
    """Client to either the NetworkComputeBridgeService or the NetworkComputeBridgeWorkerService.

    Every RPC wrapper comes in a blocking and an ``_async`` flavour. The blocking
    variants go through ``self.call`` and the async variants through
    ``self.call_async``; both receive the same request, no value extractor (``None``)
    and the same module-level error-translation function.
    """

    # Service identifiers exposed as class attributes.
    # NOTE(review): presumably consumed by the SDK when registering/creating clients -- confirm.
    default_service_name = 'network-compute-bridge'
    service_type = 'bosdyn.api.NetworkComputeBridge'

    def __init__(self):
        super(NetworkComputeBridgeClient, self).__init__(
            network_compute_bridge_service_pb2_grpc.NetworkComputeBridgeStub)

    @staticmethod
    def _list_models_request(service_name):
        """Build a ListAvailableModelsRequest whose server_config targets `service_name`."""
        request = network_compute_bridge_pb2.ListAvailableModelsRequest()
        request.server_config.service_name = service_name
        return request

    def list_available_models(self, service_name, **kwargs):
        """List all available models that the service knows.

        Convenience wrapper that builds the request from a service name and delegates
        to list_available_models_command().

        Args:
            service_name (str): The service to query for models.
            **kwargs: Forwarded unchanged to list_available_models_command().

        Returns:
            The full ListAvailableModelsResponse, which contains any models the service or
            worker service advertise.

        Raises:
            RpcError: Problem communicating with the robot.
            UnsetStatusError: The response status was LIST_AVAILABLE_MODELS_STATUS_UNKNOWN.
            ExternalServiceNotFoundError: The network compute bridge worker service was not
                found in the robot's directory.
            ExternalServerError: Either the service or worker service threw an error when
                responding with the set of all models.
            ResponseError: The response carried a status with no dedicated exception type.
        """
        return self.list_available_models_command(self._list_models_request(service_name),
                                                  **kwargs)

    def list_available_models_async(self, service_name, **kwargs):
        """Async version of list_available_models().

        Builds the same request and delegates to list_available_models_command_async().
        """
        return self.list_available_models_command_async(self._list_models_request(service_name),
                                                        **kwargs)

    def list_available_models_command(self, list_request, **kwargs):
        """List all available models that the service knows.

        Args:
            list_request (ListAvailableModelsRequest): The request to list all models.
            **kwargs: Forwarded unchanged to self.call().

        Returns:
            The full ListAvailableModelsResponse, which contains any models the service or
            worker service advertise.

        Raises:
            RpcError: Problem communicating with the robot.
            UnsetStatusError: The response status was LIST_AVAILABLE_MODELS_STATUS_UNKNOWN.
            ExternalServiceNotFoundError: The network compute bridge worker service was not
                found in the robot's directory.
            ExternalServerError: Either the service or worker service threw an error when
                responding with the set of all models.
            ResponseError: The response carried a status with no dedicated exception type.
        """
        return self.call(self._stub.ListAvailableModels, list_request, None,
                         _list_available_models_error, **kwargs)

    def list_available_models_command_async(self, list_request, **kwargs):
        """Async version of list_available_models_command().

        Same request and error translation, dispatched through self.call_async().
        """
        return self.call_async(self._stub.ListAvailableModels, list_request, None,
                               _list_available_models_error, **kwargs)

    def network_compute_bridge_command(self, network_compute_request, **kwargs):
        """Issue the main network compute bridge request to run a model on specific, requested data.

        Args:
            network_compute_request (NetworkComputeRequest): The request which contains what
                type of data should be processed, and which model the server should run.
            **kwargs: Forwarded unchanged to self.call().

        Returns:
            The full NetworkComputeResponse, which contains the processed data.

        Raises:
            RpcError: Problem communicating with the robot.
            UnsetStatusError: The response status was NETWORK_COMPUTE_STATUS_UNKNOWN.
            ExternalServiceNotFoundError: The network compute bridge worker service was not
                found in the robot's directory.
            ExternalServerError: The call to the external (worker) server did not complete
                successfully.
            NetworkComputeRotationError: For processed image data, the robot was unable to
                rotate the image as requested.
            NetworkComputeAnalysisFailedError: The model failed to analyze the input images;
                a retry might work.
            ResponseError: The response carried a status with no dedicated exception type.

        Note:
            NETWORK_COMPUTE_STATUS_CUSTOM_PARAMS_ERROR is handled by the
            handle_custom_params_errors decorator applied to _network_compute_error; the
            exception it produces is defined in bosdyn.client.common.
        """
        return self.call(self._stub.NetworkCompute, network_compute_request, None,
                         _network_compute_error, **kwargs)

    def network_compute_bridge_command_async(self, network_compute_request, **kwargs):
        """Async version of network_compute_bridge_command().

        Same request and error translation, dispatched through self.call_async().
        """
        return self.call_async(self._stub.NetworkCompute, network_compute_request, None,
                               _network_compute_error, **kwargs)
# NetworkCompute status -> (exception class, message) lookup.
# A (None, None) entry marks a status that is not an error; any status missing
# from the table falls back to a plain ResponseError with no message.
# NETWORK_COMPUTE_STATUS_CUSTOM_PARAMS_ERROR is intentionally absent here: it is
# the status_value given to the handle_custom_params_errors decorator below.
_NETWORK_COMPUTE_STATUS_TO_ERROR = collections.defaultdict(
    lambda: (ResponseError, None),
    {
        network_compute_bridge_pb2.NETWORK_COMPUTE_STATUS_UNKNOWN:
            error_pair(UnsetStatusError),
        network_compute_bridge_pb2.NETWORK_COMPUTE_STATUS_SUCCESS: (None, None),
        network_compute_bridge_pb2.NETWORK_COMPUTE_STATUS_EXTERNAL_SERVICE_NOT_FOUND:
            error_pair(ExternalServiceNotFoundError),
        network_compute_bridge_pb2.NETWORK_COMPUTE_STATUS_EXTERNAL_SERVER_ERROR:
            error_pair(ExternalServerError),
        network_compute_bridge_pb2.NETWORK_COMPUTE_STATUS_ROTATION_ERROR:
            error_pair(NetworkComputeRotationError),
        network_compute_bridge_pb2.NETWORK_COMPUTE_STATUS_ANALYSIS_FAILED:
            error_pair(NetworkComputeAnalysisFailedError),
    })


@handle_common_header_errors
@handle_custom_params_errors(
    status_value=network_compute_bridge_pb2.NETWORK_COMPUTE_STATUS_CUSTOM_PARAMS_ERROR)
def _network_compute_error(response):
    """Translate a NetworkComputeResponse status into an exception instance.

    Args:
        response: The response whose `status` field is looked up in
            _NETWORK_COMPUTE_STATUS_TO_ERROR.

    Returns:
        An exception instance built with `response` and the table's message, or
        None when the status maps to no error.
    """
    exc_class, detail = _NETWORK_COMPUTE_STATUS_TO_ERROR[response.status]
    if exc_class is not None:
        return exc_class(response=response, error_message=detail)
    # The table marks this status as a non-error.
    return None


# ListAvailableModels status -> (exception class, message) lookup, with the same
# conventions as the NetworkCompute table: (None, None) means "no error" and
# unlisted statuses fall back to a plain ResponseError with no message.
# NOTE(review): EXTERNAL_SERVER_ERROR passes None as the message here, whereas
# the NetworkCompute table uses error_pair(ExternalServerError) -- confirm
# whether the difference is intentional.
_LIST_AVAILABLE_MODELS_STATUS_TO_ERROR = collections.defaultdict(
    lambda: (ResponseError, None),
    {
        network_compute_bridge_pb2.LIST_AVAILABLE_MODELS_STATUS_UNKNOWN:
            error_pair(UnsetStatusError),
        network_compute_bridge_pb2.LIST_AVAILABLE_MODELS_STATUS_SUCCESS: (None, None),
        network_compute_bridge_pb2.LIST_AVAILABLE_MODELS_STATUS_EXTERNAL_SERVICE_NOT_FOUND:
            (ExternalServiceNotFoundError, ExternalServiceNotFoundError.__doc__),
        network_compute_bridge_pb2.LIST_AVAILABLE_MODELS_STATUS_EXTERNAL_SERVER_ERROR:
            (ExternalServerError, None),
    })


@handle_common_header_errors
def _list_available_models_error(response):
    """Translate a ListAvailableModelsResponse status into an exception instance.

    Args:
        response: The response whose `status` field is looked up in
            _LIST_AVAILABLE_MODELS_STATUS_TO_ERROR.

    Returns:
        An exception instance built with `response` and the table's message, or
        None when the status maps to no error.
    """
    exc_class, detail = _LIST_AVAILABLE_MODELS_STATUS_TO_ERROR[response.status]
    if exc_class is not None:
        return exc_class(response=response, error_message=detail)
    # The table marks this status as a non-error.
    return None