# Copyright (c) 2023 Boston Dynamics, Inc. All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).
"""Client for the RemoteMission service."""
import collections
from bosdyn.api.mission import remote_pb2, remote_service_pb2_grpc
from bosdyn.client.common import (BaseClient, common_header_errors, error_factory,
handle_common_header_errors, handle_custom_params_errors,
handle_lease_use_result_errors, handle_unset_status_error)
from bosdyn.client.exceptions import ResponseError
from bosdyn.client.lease import (DEFAULT_RESOURCES, LeaseWalletRequestProcessor,
LeaseWalletResponseProcessor)
from bosdyn.mission import constants
[docs]class Error(Exception):
pass
[docs]class InvalidSessionId(ResponseError):
"""Provided session ID was not valid on the server."""
[docs]class MissingLeases(ResponseError):
"""Missing leases on required resources."""
[docs]def tree_status_from_tick_status(tick_status):
if tick_status == remote_pb2.TickResponse.STATUS_FAILURE:
return constants.Result.FAILURE
elif tick_status == remote_pb2.TickResponse.STATUS_RUNNING:
return constants.Result.RUNNING
elif tick_status == remote_pb2.TickResponse.STATUS_SUCCESS:
return constants.Result.SUCCESS
raise Error('No corresponding tree status for tick status "{}"'.format(tick_status))
[docs]class RemoteClient(BaseClient):
default_service_name = None
service_type = 'bosdyn.api.mission.RemoteMissionService'
def __init__(self):
super(RemoteClient, self).__init__(remote_service_pb2_grpc.RemoteMissionServiceStub)
self.lease_processor = None
[docs] def update_from(self, other):
super(RemoteClient, self).update_from(other)
self.lease_processor = LeaseWalletRequestProcessor(self.lease_wallet)
self.response_processors.append(LeaseWalletResponseProcessor(self.lease_wallet))
[docs] def establish_session(self, leases=(), inputs=None, lease_resources=DEFAULT_RESOURCES,
**kwargs):
"""Establish a session.
Args:
leases (Iterable[Lease]): List of lease protobufs to establish session with.
inputs (Iterable[bosdyn.api.mission.KeyValue]): any inputs needed by the remote node.
lease_resources (Iterable[str]): List of resource names to use from the lease wallet.
Only applied if no leases are provided.
"""
req = self._build_establish_session_request(inputs=inputs, leases=leases,
lease_resources=lease_resources)
return self.call(self._stub.EstablishSession, req,
value_from_response=_session_id_from_response,
error_from_response=_establish_error_from_response, copy_request=False,
**kwargs)
[docs] def establish_session_async(self, leases=(), inputs=None, lease_resources=DEFAULT_RESOURCES,
**kwargs):
"""Async version of establish_session()"""
req = self._build_establish_session_request(inputs=inputs, leases=leases,
lease_resources=lease_resources)
return self.call_async(self._stub.EstablishSession, req,
value_from_response=_session_id_from_response,
error_from_response=_establish_error_from_response,
copy_request=False, **kwargs)
[docs] def tick(self, session_id, leases=(), inputs=None, lease_resources=DEFAULT_RESOURCES,
group_name=None, params=None, **kwargs):
"""Tick the remote node.
Args:
session_id: session
leases (Iterable[Lease]): List of lease protobufs to use during the tick.
inputs (Iterable[bosdyn.api.mission.KeyValue]): any inputs needed by the remote node.
lease_resources (Iterable[str]): List of resource names to use from the lease wallet.
Only applied if no leases are provided.
"""
req = self._build_tick_request(inputs=inputs, leases=leases, session_id=session_id,
lease_resources=lease_resources, group_name=group_name,
params=params)
return self.call(self._stub.Tick, req, value_from_response=None,
error_from_response=_tick_error_from_response, copy_request=False,
**kwargs)
[docs] def tick_async(self, session_id, leases=(), inputs=None, lease_resources=DEFAULT_RESOURCES,
group_name=None, params=None, **kwargs):
"""Async version of tick()"""
req = self._build_tick_request(inputs=inputs, leases=leases, session_id=session_id,
lease_resources=lease_resources, group_name=group_name,
params=params)
return self.call_async(self._stub.Tick, req, value_from_response=None,
error_from_response=_tick_error_from_response, copy_request=False,
**kwargs)
[docs] def stop(self, session_id, **kwargs):
req = remote_pb2.StopRequest(session_id=session_id)
return self.call(self._stub.Stop, req, value_from_response=None,
error_from_response=_stop_error_from_response, copy_request=False,
**kwargs)
[docs] def stop_async(self, session_id, **kwargs):
req = remote_pb2.StopRequest(session_id=session_id)
return self.call_async(self._stub.Stop, req, value_from_response=None,
error_from_response=_stop_error_from_response, copy_request=False,
**kwargs)
[docs] def teardown_session(self, session_id, **kwargs):
req = remote_pb2.TeardownSessionRequest(session_id=session_id)
return self.call(self._stub.TeardownSession, req, value_from_response=None,
error_from_response=_teardown_error_from_response, copy_request=False,
**kwargs)
[docs] def teardown_session_async(self, session_id, **kwargs):
req = remote_pb2.TeardownSessionRequest(session_id=session_id)
return self.call_async(self._stub.TeardownSession, req, value_from_response=None,
error_from_response=_teardown_error_from_response,
copy_request=False, **kwargs)
[docs] def get_service_info(self, **kwargs):
"""Get information about the service itself, such as a custom parameter spec.
Raises:
RpcError: problem communicating with the robot.
Returns:
A GetRemoteMissionServiceInfoResponse message describing this service.
"""
req = remote_pb2.GetRemoteMissionServiceInfoRequest()
return self.call(self._stub.GetRemoteMissionServiceInfo, req,
error_from_response=common_header_errors, copy_request=False, **kwargs)
[docs] def get_service_info_async(self, **kwargs):
"""Async version of get_service_info()"""
req = remote_pb2.GetRemoteMissionServiceInfoRequest()
return self.call_async(self._stub.GetRemoteMissionServiceInfo, req,
error_from_response=common_header_errors, copy_request=False,
**kwargs)
def _build_establish_session_request(self, inputs, leases, lease_resources):
req = remote_pb2.EstablishSessionRequest(inputs=inputs)
_copy_leases(req, leases)
if self.lease_processor:
self.lease_processor.mutate(req, resource_list=lease_resources)
return req
def _build_tick_request(self, inputs, leases, session_id, lease_resources, group_name=None,
params=None):
req = remote_pb2.TickRequest(inputs=inputs, session_id=session_id)
if group_name:
req.group_name = group_name
if params:
req.params.CopyFrom(params)
_copy_leases(req, leases)
if self.lease_processor:
self.lease_processor.mutate(req, resource_list=lease_resources)
return req
def _copy_leases(req, leases):
"""Modify req.leases in place to contain lease_pb2.Lease objects"""
# Assume that the "leases" iterable contains lease_pb2.Lease protobuf objects.
for proto in leases:
try:
# Try to access the lease_proto, in case the "leases" argument isn't a protobuf.
# For example it could be a bosdyn.client.lease.Lease.
proto = proto.lease_proto
except AttributeError:
# If that didn't work, do nothing.
# Either it's a lease_pb2.Lease, or we'll get an error in the CopyFrom step.
pass
req.leases.add().CopyFrom(proto)
def _session_id_from_response(response):
return response.session_id
_ESTABLISH_STATUS_TO_ERROR = collections.defaultdict(lambda: (ResponseError, None))
_ESTABLISH_STATUS_TO_ERROR.update({
remote_pb2.EstablishSessionResponse.STATUS_OK: (None, None),
remote_pb2.EstablishSessionResponse.STATUS_MISSING_LEASES:
(MissingLeases, MissingLeases.__doc__),
remote_pb2.EstablishSessionResponse.STATUS_MISSING_INPUTS:
(MissingInputs, MissingInputs.__doc__),
})
@handle_common_header_errors
@handle_lease_use_result_errors
@handle_unset_status_error(unset='STATUS_UNKNOWN')
def _establish_error_from_response(response):
return error_factory(response, response.status,
status_to_string=remote_pb2.TickResponse.Status.Name,
status_to_error=_ESTABLISH_STATUS_TO_ERROR)
_TICK_STATUS_TO_ERROR = collections.defaultdict(lambda: (ResponseError, None))
_TICK_STATUS_TO_ERROR.update({
remote_pb2.TickResponse.STATUS_FAILURE: (None, None),
remote_pb2.TickResponse.STATUS_RUNNING: (None, None),
remote_pb2.TickResponse.STATUS_SUCCESS: (None, None),
remote_pb2.TickResponse.STATUS_MISSING_LEASES: (MissingLeases, MissingLeases.__doc__),
remote_pb2.TickResponse.STATUS_MISSING_INPUTS: (MissingInputs, MissingInputs.__doc__),
remote_pb2.TickResponse.STATUS_INVALID_SESSION_ID: (InvalidSessionId, InvalidSessionId.__doc__),
})
@handle_common_header_errors
@handle_lease_use_result_errors
@handle_custom_params_errors
@handle_unset_status_error(unset='STATUS_UNKNOWN')
def _tick_error_from_response(response):
return error_factory(response, response.status,
status_to_string=remote_pb2.TickResponse.Status.Name,
status_to_error=_TICK_STATUS_TO_ERROR)
_STOP_STATUS_TO_ERROR = collections.defaultdict(lambda: (ResponseError, None))
_STOP_STATUS_TO_ERROR.update({
remote_pb2.StopResponse.STATUS_OK: (None, None),
remote_pb2.StopResponse.STATUS_INVALID_SESSION_ID: (InvalidSessionId, InvalidSessionId.__doc__),
})
@handle_common_header_errors
@handle_unset_status_error(unset='STATUS_UNKNOWN')
def _stop_error_from_response(response):
return error_factory(response, response.status,
status_to_string=remote_pb2.StopResponse.Status.Name,
status_to_error=_STOP_STATUS_TO_ERROR)
_TEARDOWN_STATUS_TO_ERROR = collections.defaultdict(lambda: (ResponseError, None))
_TEARDOWN_STATUS_TO_ERROR.update({
remote_pb2.TeardownSessionResponse.STATUS_OK: (None, None),
remote_pb2.TeardownSessionResponse.STATUS_INVALID_SESSION_ID:
(InvalidSessionId, InvalidSessionId.__doc__),
})
@handle_common_header_errors
@handle_unset_status_error(unset='STATUS_UNKNOWN')
def _teardown_error_from_response(response):
return error_factory(response, response.status,
status_to_string=remote_pb2.TeardownSessionResponse.Status.Name,
status_to_error=_TEARDOWN_STATUS_TO_ERROR)