# Copyright (c) 2023 Boston Dynamics, Inc. All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).
"""For clients to use the graph nav recording service"""
import collections
from enum import Enum
from bosdyn.api.graph_nav import map_pb2, nav_pb2, recording_pb2, recording_service_pb2
from bosdyn.api.graph_nav import recording_service_pb2_grpc as recording_service
from bosdyn.client.common import (BaseClient, common_header_errors, error_factory,
handle_common_header_errors, handle_lease_use_result_errors,
handle_license_errors_if_present, handle_unset_status_error)
from bosdyn.client.exceptions import ResponseError
class WaypointRegion(Enum):
    """Kinds of localization (scan-match) region that can be requested for a waypoint.

    Passed as the `region` argument of
    GraphNavRecordingServiceClient.make_waypoint_environment().
    """
    # Use the service's default localization region.
    DEFAULT_REGION = 1
    # Empty region: do not localize to this waypoint.
    EMPTY_REGION = 2
    # Circular region; its radius is supplied separately as `dist_2d`.
    CIRCLE_REGION = 3
class GraphNavRecordingServiceClient(BaseClient):
    """Client for the GraphNav recording service.

    Every RPC is exposed as a blocking method plus an `_async` twin that goes through
    `call_async` instead of `call`. The `make_*` static helpers build the protobuf
    messages (recording environments, waypoint/edge annotations, edges) that the RPC
    methods accept.
    """
    default_service_name = 'recording-service'
    service_type = 'bosdyn.api.graph_nav.GraphNavRecordingService'

    def __init__(self):
        super(GraphNavRecordingServiceClient,
              self).__init__(recording_service.GraphNavRecordingServiceStub)

    def start_recording(self, lease=None, recording_environment=None, require_fiducials=None,
                        **kwargs):
        """Start the recording service to create/update a map.

        Args:
            lease: Leases to show ownership of necessary resources. Will use the client's leases by default.
            recording_environment: RecordingEnvironment protobuf to be used for the initial waypoint created at start.
            require_fiducials: Boolean to show whether a fiducial is needed to start the recording.
            **kwargs: Forwarded unchanged to the underlying RPC call.

        Returns:
            The status of the start recording request.

        Raises:
            The error produced by _start_recording_error() for a non-OK response.
        """
        request = self._build_start_recording_request(lease, recording_environment,
                                                      require_fiducials)
        return self.call(self._stub.StartRecording, request, value_from_response=_get_status,
                         error_from_response=_start_recording_error, copy_request=False, **kwargs)

    def start_recording_full(self, lease=None, recording_environment=None, require_fiducials=None,
                             **kwargs):
        """Same as start_recording() but returns a full response"""
        request = self._build_start_recording_request(lease, recording_environment,
                                                      require_fiducials)
        return self.call(self._stub.StartRecording, request, value_from_response=_get_response,
                         error_from_response=_start_recording_error, copy_request=False, **kwargs)

    def start_recording_async(self, lease=None, recording_environment=None, require_fiducials=None,
                              **kwargs):
        """Async version of start_recording()."""
        request = self._build_start_recording_request(lease, recording_environment,
                                                      require_fiducials)
        return self.call_async(self._stub.StartRecording, request, value_from_response=_get_status,
                               error_from_response=_start_recording_error, copy_request=False,
                               **kwargs)

    def start_recording_full_async(self, lease=None, recording_environment=None,
                                   require_fiducials=None, **kwargs):
        """Async version of start_recording_full()."""
        request = self._build_start_recording_request(lease, recording_environment,
                                                      require_fiducials)
        return self.call_async(self._stub.StartRecording, request,
                               value_from_response=_get_response,
                               error_from_response=_start_recording_error, copy_request=False,
                               **kwargs)

    def stop_recording(self, lease=None, **kwargs):
        """Stop the recording service.

        Args:
            lease: Leases to show ownership of necessary resources. Will use the client's leases by default.
            **kwargs: Forwarded unchanged to the underlying RPC call.

        Returns:
            The status of the stop recording request.

        Raises:
            The error produced by _stop_recording_error() for a non-OK response.
        """
        request = self._build_stop_recording_request(lease)
        return self.call(self._stub.StopRecording, request, value_from_response=_get_status,
                         error_from_response=_stop_recording_error, copy_request=False, **kwargs)

    def stop_recording_async(self, lease=None, **kwargs):
        """Async version of stop_recording()."""
        request = self._build_stop_recording_request(lease)
        return self.call_async(self._stub.StopRecording, request, value_from_response=_get_status,
                               error_from_response=_stop_recording_error, copy_request=False,
                               **kwargs)

    def get_record_status(self, **kwargs):
        """Get the status of the recording service.

        Args:
            **kwargs: Forwarded unchanged to the underlying RPC call.

        Returns:
            The record service status, which indicates the current persistent environment and if it's recording a map.
        """
        request = self._build_get_record_status_request()
        return self.call(self._stub.GetRecordStatus, request, value_from_response=_get_response,
                         error_from_response=common_header_errors, copy_request=False, **kwargs)

    def get_record_status_async(self, **kwargs):
        """Async version of get_record_status()."""
        request = self._build_get_record_status_request()
        return self.call_async(self._stub.GetRecordStatus, request,
                               value_from_response=_get_response,
                               error_from_response=common_header_errors, copy_request=False,
                               **kwargs)

    def set_recording_environment(self, lease=None, recording_environment=None, **kwargs):
        """Set the persistent recording environment.

        Args:
            lease: Leases to show ownership of necessary resources. Will use the client's leases by default.
            recording_environment: RecordingEnvironment protobuf to be set as the persistent environment.
            **kwargs: Forwarded unchanged to the underlying RPC call.

        Returns:
            Nothing unless an error occurs.
        """
        request = self._build_set_recording_environment_request(lease, recording_environment)
        return self.call(self._stub.SetRecordingEnvironment, request, value_from_response=None,
                         error_from_response=common_header_errors, copy_request=False, **kwargs)

    def set_recording_environment_async(self, lease=None, recording_environment=None, **kwargs):
        """Async version of set_recording_environment()."""
        request = self._build_set_recording_environment_request(lease, recording_environment)
        return self.call_async(self._stub.SetRecordingEnvironment, request,
                               value_from_response=None, error_from_response=common_header_errors,
                               copy_request=False, **kwargs)

    def create_waypoint(self, lease=None, waypoint_name=None, recording_environment=None, **kwargs):
        """Create a waypoint in the map at the current robot state.

        Args:
            lease: Leases to show ownership of necessary resources. Will use the client's leases by default.
            waypoint_name: Human readable string for the waypoint name.
            recording_environment: RecordingEnvironment protobuf to be used for the waypoint (will overwrite
                                   and merge with any persistent env).
            **kwargs: Forwarded unchanged to the underlying RPC call.

        Returns:
            The response, which includes the created waypoint and any associated edges created.

        Raises:
            The error produced by _create_waypoint_error() for a non-OK response.
        """
        request = self._build_create_waypoint_request(waypoint_name, recording_environment, lease)
        return self.call(self._stub.CreateWaypoint, request, value_from_response=_get_response,
                         error_from_response=_create_waypoint_error, copy_request=False, **kwargs)

    def create_waypoint_async(self, lease=None, waypoint_name=None, recording_environment=None,
                              **kwargs):
        """Async version of create_waypoint()."""
        request = self._build_create_waypoint_request(waypoint_name, recording_environment, lease)
        return self.call_async(self._stub.CreateWaypoint, request,
                               value_from_response=_get_response,
                               error_from_response=_create_waypoint_error, copy_request=False,
                               **kwargs)

    def create_edge(self, lease=None, edge=None, **kwargs):
        """Create an edge in the map between two existing waypoints.

        Args:
            lease: Leases to show ownership of necessary resources. Will use the client's leases by default.
            edge: An edge protobuf, which must include valid from/to waypoint id's and a from_T_to transform.
            **kwargs: Forwarded unchanged to the underlying RPC call.

        Returns:
            The response status.

        Raises:
            The error produced by _create_edge_error() for a non-OK response.
        """
        request = self._build_create_edge_request(edge, lease)
        return self.call(self._stub.CreateEdge, request, value_from_response=_get_status,
                         error_from_response=_create_edge_error, copy_request=False, **kwargs)

    def create_edge_async(self, lease=None, edge=None, **kwargs):
        """Async version of create_edge()."""
        request = self._build_create_edge_request(edge, lease)
        return self.call_async(self._stub.CreateEdge, request, value_from_response=_get_status,
                               error_from_response=_create_edge_error, copy_request=False, **kwargs)

    @staticmethod
    def _build_start_recording_request(lease, recording_env, require_fiducials):
        return recording_pb2.StartRecordingRequest(lease=lease, recording_environment=recording_env,
                                                   require_fiducials=require_fiducials)

    @staticmethod
    def _build_stop_recording_request(lease):
        return recording_pb2.StopRecordingRequest(lease=lease)

    @staticmethod
    def _build_get_record_status_request():
        return recording_pb2.GetRecordStatusRequest()

    @staticmethod
    def _build_set_recording_environment_request(lease, recording_env):
        return recording_pb2.SetRecordingEnvironmentRequest(lease=lease, environment=recording_env)

    @staticmethod
    def _build_create_waypoint_request(waypoint_name, recording_env, lease):
        return recording_pb2.CreateWaypointRequest(waypoint_name=waypoint_name,
                                                   recording_environment=recording_env, lease=lease)

    @staticmethod
    def _build_create_edge_request(edge, lease):
        return recording_pb2.CreateEdgeRequest(edge=edge, lease=lease)

    @staticmethod
    def make_recording_environment(name=None, waypoint_env=None, edge_env=None):
        """Construct a complete recording environment from the waypoint and edge environments.

        Args:
            name: A string name prefix which will prefix waypoint names (human-readable).
            waypoint_env: Waypoint.Annotations protobuf which includes information for the waypoint environment.
            edge_env: Edge.Annotations protobuf which includes information for the edge environment.

        Returns:
            The API RecordingEnvironment protobuf message.
        """
        return recording_pb2.RecordingEnvironment(name_prefix=name,
                                                  waypoint_environment=waypoint_env,
                                                  edge_environment=edge_env)

    @staticmethod
    def make_waypoint_environment(name=None, region=WaypointRegion.DEFAULT_REGION, dist_2d=None,
                                  client_metadata=None, **kwargs):
        """Create a waypoint environment.

        Args:
            name: A string name for the waypoint (human-readable).
            region: A WaypointRegion enum representing the region in which we are localizing in. This can be
                    either a default region, an empty region (don't localize to this waypoint), or a circular region.
            dist_2d: If the region is circular, then this is set as a distance (meters) representing the number
                     of meters away we can be from the waypoint before scan matching.
            client_metadata: Info about the client which will be stored in the waypoints.
            **kwargs: Accepted but not used by this helper.

        Returns:
            The API Waypoint.Annotations protobuf message.
        """
        waypoint_env = map_pb2.Waypoint.Annotations(name=name, client_metadata=client_metadata)
        if region == WaypointRegion.DEFAULT_REGION:
            waypoint_env.scan_match_region.default_region.CopyFrom(
                map_pb2.Waypoint.Annotations.LocalizeRegion.Default())
            waypoint_env.scan_match_region.state = map_pb2.ANNOTATION_STATE_SET
        elif region == WaypointRegion.EMPTY_REGION:
            waypoint_env.scan_match_region.empty.CopyFrom(
                map_pb2.Waypoint.Annotations.LocalizeRegion.Empty())
            waypoint_env.scan_match_region.state = map_pb2.ANNOTATION_STATE_SET
        elif region == WaypointRegion.CIRCLE_REGION:
            if dist_2d is not None:
                waypoint_env.scan_match_region.circle.CopyFrom(
                    map_pb2.Waypoint.Annotations.LocalizeRegion.Circle2D(dist_2d=dist_2d))
                waypoint_env.scan_match_region.state = map_pb2.ANNOTATION_STATE_SET
            else:
                # A circle without a radius cannot be expressed; leave the region unset.
                waypoint_env.scan_match_region.state = map_pb2.ANNOTATION_STATE_NONE
        else:
            # Unrecognized region value: leave the region unset.
            waypoint_env.scan_match_region.state = map_pb2.ANNOTATION_STATE_NONE
        return waypoint_env

    @staticmethod
    def make_edge_environment(
            vel_limit=None, direction_constraint=map_pb2.Edge.Annotations.DIRECTION_CONSTRAINT_NONE,
            require_alignment=False, ground_mu_hint=.8, grated_floor=False):
        """Create an edge environment.

        Args:
            vel_limit: A SE2VelocityLimit to use while traversing the edge. Note this is not a target speed, just a max/min.
            direction_constraint: A direction constraints on the robot's orientation when traversing the edge.
            require_alignment: Boolean where if true, the robot must be aligned with the edge in yaw before traversing it.
            ground_mu_hint: Terrain coefficient of friction user hint. Suggested values lie between [.4, .8].
                            None or a non-positive value leaves the hint unset.
            grated_floor: Boolean where if true, the edge crosses over grated metal.

        Returns:
            The API Edge.Annotations protobuf message.
        """
        edge_env = map_pb2.Edge.Annotations()
        # Scalar and enum protobuf fields are set by plain assignment; CopyFrom() exists only
        # on message objects, so it is used below for vel_limit alone.
        edge_env.require_alignment.value = require_alignment
        edge_env.grated_floor.value = grated_floor
        if ground_mu_hint is not None and ground_mu_hint > 0:
            edge_env.ground_mu_hint.value = ground_mu_hint
        if vel_limit is not None:
            edge_env.vel_limit.CopyFrom(vel_limit)
        edge_env.direction_constraint = direction_constraint
        edge_env.stairs.state = map_pb2.ANNOTATION_STATE_NONE
        return edge_env

    @staticmethod
    def make_edge(from_waypoint_id, to_waypoint_id, from_tform_to, edge_environment=None):
        """Create an edge between two waypoint ids.

        Args:
            from_waypoint_id: A waypoint string id for the from waypoint.
            to_waypoint_id: A waypoint string id for the to waypoint.
            from_tform_to: An SE3Pose representing the transform of from_waypoint to to_waypoint.
            edge_environment: Any edge environment to be associated with the created edge.

        Returns:
            The API Edge protobuf message.
        """
        edge_id = map_pb2.Edge.Id(from_waypoint=from_waypoint_id, to_waypoint=to_waypoint_id)
        edge = map_pb2.Edge(id=edge_id, from_tform_to=from_tform_to)
        if edge_environment is not None:
            edge.annotations.CopyFrom(edge_environment)
        return edge
'''
Static helper methods for handling responses and errors.
'''
# Exception hierarchy for the GraphNav recording service.
#
# NOTE: the status-to-error tables in this module read each class's `__doc__` and pair it
# with the class, so these docstrings are runtime text and should not be reworded casually.


class RecordingServiceResponseError(ResponseError):
    """General class of errors for the GraphNav Recording Service."""


class CouldNotCreateWaypointError(RecordingServiceResponseError):
    """Service could not create a waypoint."""


class NotRecordingError(RecordingServiceResponseError):
    """The recording service has not been started."""


class UnknownWaypointError(RecordingServiceResponseError):
    """The edge requested has a waypoint id that is unknown."""


class EdgeExistsError(RecordingServiceResponseError):
    """The edge requested with the given ID already exists in the map."""


# Referenced by the create-edge status table (STATUS_MISSING_TRANSFORM); without this
# definition the module fails with NameError while the table is being built.
class EdgeMissingTransformError(RecordingServiceResponseError):
    """The edge requested is missing its from_tform_to transform."""


# Referenced by the start-recording status table (STATUS_MAP_TOO_LARGE_LICENSE); without
# this definition the module fails with NameError while the table is being built.
class MapTooLargeLicenseError(RecordingServiceResponseError):
    """The map is too large for the license installed on the robot."""


class NotLocalizedToEndError(RecordingServiceResponseError):
    """Stop recording failed to localize to the last created waypoint."""


class FollowingRouteError(RecordingServiceResponseError):
    """Cannot start recording while the robot is already following a route."""


class NotLocalizedToExistingMapError(RecordingServiceResponseError):
    """The robot is not localized to the existing map and cannot start recording."""


class TooFarFromExistingMapError(RecordingServiceResponseError):
    """The robot is too far from the existing map and cannot start recording."""


class RemoteCloudFailureNotInDirectoryError(RecordingServiceResponseError):
    """Failed to start recording because a remote point cloud (e.g. a LIDAR) is not registered to the service directory."""


class RemoteCloudFailureNoDataError(RecordingServiceResponseError):
    """Failed to start recording because a remote point cloud (e.g. a LIDAR) is not delivering data."""


class NotReadyYetError(RecordingServiceResponseError):
    """The service is processing the map at its current position. Try again in 1-2 seconds."""


class MissingFiducialsError(RecordingServiceResponseError):
    """One or more required fiducials were not detected."""


class FiducialPoseError(RecordingServiceResponseError):
    """The pose of one or more required fiducials could not be determined accurately."""


class RobotImpairedError(RecordingServiceResponseError):
    """Failed to start recording because the robot is impaired."""

    def __init__(self, response, error_message):
        super(RobotImpairedError, self).__init__(response, error_message)
        # Keep the impaired state from the response so __str__ can report it.
        self.impaired_state = response.impaired_state

    def __str__(self):
        base = super(RobotImpairedError, self).__str__()
        base += "\nImpaired state: {}".format(self.impaired_state)
        return base
def _get_status(response):
return response.status
def _get_response(response):
# Return full response for RecordStatus to get environment and is_recording information.
return response
# Lookup from StartRecordingResponse status to an (exception class, class docstring) pair.
# STATUS_OK maps to (None, None); any status not listed falls back to (ResponseError, None).
_START_RECORDING_STATUS_TO_ERROR = collections.defaultdict(
    lambda: (ResponseError, None),
    {
        recording_pb2.StartRecordingResponse.STATUS_OK: (None, None),
        recording_pb2.StartRecordingResponse.STATUS_COULD_NOT_CREATE_WAYPOINT:
            (CouldNotCreateWaypointError, CouldNotCreateWaypointError.__doc__),
        recording_pb2.StartRecordingResponse.STATUS_FOLLOWING_ROUTE:
            (FollowingRouteError, FollowingRouteError.__doc__),
        recording_pb2.StartRecordingResponse.STATUS_NOT_LOCALIZED_TO_EXISTING_MAP:
            (NotLocalizedToExistingMapError, NotLocalizedToExistingMapError.__doc__),
        recording_pb2.StartRecordingResponse.STATUS_MISSING_FIDUCIALS:
            (MissingFiducialsError, MissingFiducialsError.__doc__),
        recording_pb2.StartRecordingResponse.STATUS_MAP_TOO_LARGE_LICENSE:
            (MapTooLargeLicenseError, MapTooLargeLicenseError.__doc__),
        recording_pb2.StartRecordingResponse.STATUS_REMOTE_CLOUD_FAILURE_NOT_IN_DIRECTORY:
            (RemoteCloudFailureNotInDirectoryError,
             RemoteCloudFailureNotInDirectoryError.__doc__),
        recording_pb2.StartRecordingResponse.STATUS_REMOTE_CLOUD_FAILURE_NO_DATA:
            (RemoteCloudFailureNoDataError, RemoteCloudFailureNoDataError.__doc__),
        recording_pb2.StartRecordingResponse.STATUS_FIDUCIAL_POSE_NOT_OK:
            (FiducialPoseError, FiducialPoseError.__doc__),
        recording_pb2.StartRecordingResponse.STATUS_TOO_FAR_FROM_EXISTING_MAP:
            (TooFarFromExistingMapError, TooFarFromExistingMapError.__doc__),
        recording_pb2.StartRecordingResponse.STATUS_ROBOT_IMPAIRED:
            (RobotImpairedError, RobotImpairedError.__doc__),
    })
@handle_common_header_errors
# @handle_lease_use_result_errors
@handle_license_errors_if_present
@handle_unset_status_error(unset='STATUS_UNKNOWN')
def _start_recording_error(response):
    """Translate a StartRecording response into an exception, or None when there is no error.

    The lookup itself is delegated to error_factory() using the start-recording status table.
    """
    status_name = recording_pb2.StartRecordingResponse.Status.Name
    return error_factory(response, response.status, status_to_string=status_name,
                         status_to_error=_START_RECORDING_STATUS_TO_ERROR)
# Lookup from StopRecordingResponse status to an (exception class, class docstring) pair.
# STATUS_OK maps to (None, None); any status not listed falls back to (ResponseError, None).
_STOP_RECORDING_STATUS_TO_ERROR = collections.defaultdict(
    lambda: (ResponseError, None),
    {
        recording_pb2.StopRecordingResponse.STATUS_OK: (None, None),
        recording_pb2.StopRecordingResponse.STATUS_NOT_LOCALIZED_TO_END:
            (NotLocalizedToEndError, NotLocalizedToEndError.__doc__),
        recording_pb2.StopRecordingResponse.STATUS_NOT_READY_YET:
            (NotReadyYetError, NotReadyYetError.__doc__),
    })
@handle_common_header_errors
# @handle_lease_use_result_errors
@handle_unset_status_error(unset='STATUS_UNKNOWN')
def _stop_recording_error(response):
    """Translate a StopRecording response into an exception, or None when there is no error.

    The lookup itself is delegated to error_factory() using the stop-recording status table.
    """
    status_name = recording_pb2.StopRecordingResponse.Status.Name
    return error_factory(response, response.status, status_to_string=status_name,
                         status_to_error=_STOP_RECORDING_STATUS_TO_ERROR)
# Lookup from CreateWaypointResponse status to an (exception class, class docstring) pair.
# STATUS_OK maps to (None, None); any status not listed falls back to (ResponseError, None).
_CREATE_WAYPOINT_STATUS_TO_ERROR = collections.defaultdict(
    lambda: (ResponseError, None),
    {
        recording_pb2.CreateWaypointResponse.STATUS_OK: (None, None),
        recording_pb2.CreateWaypointResponse.STATUS_NOT_RECORDING:
            (NotRecordingError, NotRecordingError.__doc__),
        recording_pb2.CreateWaypointResponse.STATUS_COULD_NOT_CREATE_WAYPOINT:
            (CouldNotCreateWaypointError, CouldNotCreateWaypointError.__doc__),
        recording_pb2.CreateWaypointResponse.STATUS_REMOTE_CLOUD_FAILURE_NOT_IN_DIRECTORY:
            (RemoteCloudFailureNotInDirectoryError,
             RemoteCloudFailureNotInDirectoryError.__doc__),
        recording_pb2.CreateWaypointResponse.STATUS_REMOTE_CLOUD_FAILURE_NO_DATA:
            (RemoteCloudFailureNoDataError, RemoteCloudFailureNoDataError.__doc__),
    })
@handle_common_header_errors
# @handle_lease_use_result_errors
@handle_license_errors_if_present
@handle_unset_status_error(unset='STATUS_UNKNOWN')
def _create_waypoint_error(response):
    """Translate a CreateWaypoint response into an exception, or None when there is no error.

    The lookup itself is delegated to error_factory() using the create-waypoint status table.
    """
    status_name = recording_pb2.CreateWaypointResponse.Status.Name
    return error_factory(response, response.status, status_to_string=status_name,
                         status_to_error=_CREATE_WAYPOINT_STATUS_TO_ERROR)
# Lookup from CreateEdgeResponse status to an (exception class, class docstring) pair.
# STATUS_OK maps to (None, None); any status not listed falls back to (ResponseError, None).
_CREATE_EDGE_STATUS_TO_ERROR = collections.defaultdict(
    lambda: (ResponseError, None),
    {
        recording_pb2.CreateEdgeResponse.STATUS_OK: (None, None),
        recording_pb2.CreateEdgeResponse.STATUS_NOT_RECORDING:
            (NotRecordingError, NotRecordingError.__doc__),
        recording_pb2.CreateEdgeResponse.STATUS_EXISTS:
            (EdgeExistsError, EdgeExistsError.__doc__),
        recording_pb2.CreateEdgeResponse.STATUS_UNKNOWN_WAYPOINT:
            (UnknownWaypointError, UnknownWaypointError.__doc__),
        recording_pb2.CreateEdgeResponse.STATUS_MISSING_TRANSFORM:
            (EdgeMissingTransformError, EdgeMissingTransformError.__doc__),
    })
@handle_common_header_errors
# @handle_lease_use_result_errors
@handle_unset_status_error(unset='STATUS_UNKNOWN')
def _create_edge_error(response):
    """Translate a CreateEdge response into an exception, or None when there is no error.

    The lookup itself is delegated to error_factory() using the create-edge status table.
    """
    status_name = recording_pb2.CreateEdgeResponse.Status.Name
    return error_factory(response, response.status, status_to_string=status_name,
                         status_to_error=_CREATE_EDGE_STATUS_TO_ERROR)