Source code for bosdyn.client.autowalk

# Copyright (c) 2023 Boston Dynamics, Inc.  All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).

"""For clients to the Autowalk service."""

import collections

from bosdyn.api.autowalk import autowalk_pb2, autowalk_service_pb2_grpc
from bosdyn.client import data_chunk
from bosdyn.client.common import BaseClient, error_factory, error_pair
from bosdyn.client.exceptions import ResponseError
from bosdyn.client.lease import add_lease_wallet_processors

from .data_chunk import chunk_message


[docs]class AutowalkResponseError(ResponseError): """General class of errors for autowalk service."""
[docs]class CompilationError(AutowalkResponseError): """Provided Walk could not be compiled because the Walk was malformed. """
[docs]class ValidationError(AutowalkResponseError): """Provided Walk could not be validated because some part of the Walk was unable to initialize. """
[docs]class AutowalkClient(BaseClient): """Client for the Autowalk service.""" default_service_name = 'autowalk-service' service_type = 'bosdyn.api.autowalk.AutowalkService' def __init__(self): super(AutowalkClient, self).__init__(autowalk_service_pb2_grpc.AutowalkServiceStub)
[docs] def update_from(self, other): """Update instance from another object. Args: other: The object where to copy from. """ super(AutowalkClient, self).update_from(other) if self.lease_wallet: add_lease_wallet_processors(self, self.lease_wallet)
[docs] def compile_autowalk(self, walk, data_chunk_byte_size=1000 * 1000, **kwargs): """Send the input walk file to the autowalk service for compilation. Args: walk: a walks_pb2.Walk input to be compiled by the autowalk service data_chunk_byte_size: max size of each streamed message Raises: RpcError: Problem communicating with the robot. bosdyn.client.autowalk.CompilationError: The walk failed to compile because it was malformed. bosdyn.client.autowalk.ValidationError: The walk failed to validate because some part of it was unable to initialize. """ request = self._compile_autowalk_request(walk) self._apply_request_processors(request, copy_request=False) return self.call(self._stub.CompileAutowalk, data_chunk.chunk_message(request, data_chunk_byte_size), error_from_response=_compile_autowalk_error_from_response, assemble_type=autowalk_pb2.CompileAutowalkResponse, copy_request=False, **kwargs)
[docs] def load_autowalk(self, walk, leases=[], data_chunk_byte_size=1000 * 1000, **kwargs): """Send the input walk file to the autowalk service for compilation and load resulting mission to the Mission Service on the robot. Args: walk: a walks_pb2.Walk input to be loaded onto the robot by the autowalk service leases: Leases the autowalk service will need to use. Unlike other clients, these MUST be specified. data_chunk_byte_size: max size of each streamed message Raises: RpcError: Problem communicating with the robot. bosdyn.client.autowalk.CompilationError: The walk failed to compile because it was malformed. bosdyn.client.autowalk.ValidationError: The walk failed to validate because some part of it was unable to initialize. """ request = self._load_autowalk_request(walk, leases) self._apply_request_processors(request, copy_request=False) return self.call(self._stub.LoadAutowalk, data_chunk.chunk_message(request, data_chunk_byte_size), error_from_response=_load_autowalk_error_from_response, assemble_type=autowalk_pb2.LoadAutowalkResponse, copy_request=False, **kwargs)
@staticmethod def _compile_autowalk_request(walk): request = autowalk_pb2.CompileAutowalkRequest(walk=walk) return request @staticmethod def _load_autowalk_request(walk, leases): request = autowalk_pb2.LoadAutowalkRequest(walk=walk) for lease in leases: request.leases.add().CopyFrom(lease.lease_proto) return request
_COMPILE_AUTOWALK_STATUS_TO_ERROR = collections.defaultdict(lambda: (AutowalkResponseError, None)) _COMPILE_AUTOWALK_STATUS_TO_ERROR.update({ autowalk_pb2.CompileAutowalkResponse.STATUS_OK: (None, None), autowalk_pb2.CompileAutowalkResponse.STATUS_COMPILE_ERROR: error_pair(CompilationError), }) def _compile_autowalk_error_from_response(response): return error_factory(response, response.status, status_to_string=autowalk_pb2.CompileAutowalkResponse.Status.Name, status_to_error=_COMPILE_AUTOWALK_STATUS_TO_ERROR) _LOAD_AUTOWALK_STATUS_TO_ERROR = collections.defaultdict(lambda: (AutowalkResponseError, None)) _LOAD_AUTOWALK_STATUS_TO_ERROR.update({ autowalk_pb2.LoadAutowalkResponse.STATUS_OK: (None, None), autowalk_pb2.LoadAutowalkResponse.STATUS_COMPILE_ERROR: error_pair(CompilationError), autowalk_pb2.LoadAutowalkResponse.STATUS_VALIDATE_ERROR: error_pair(ValidationError), }) def _load_autowalk_error_from_response(response): return error_factory(response, response.status, status_to_string=autowalk_pb2.LoadAutowalkResponse.Status.Name, status_to_error=_LOAD_AUTOWALK_STATUS_TO_ERROR)