Source code for bosdyn.client.point_cloud

# Copyright (c) 2023 Boston Dynamics, Inc.  All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).

"""Client for the point cloud service.

This allows client code to read from a point cloud service.
"""

import collections
import logging

import bosdyn.api.point_cloud_pb2 as point_cloud_protos
import bosdyn.api.point_cloud_service_pb2_grpc as point_cloud_service
from bosdyn.client.common import (common_header_errors, error_factory, error_pair,
                                  handle_common_header_errors)
from bosdyn.client.exceptions import ResponseError, UnsetStatusError

from .common import BaseClient

LOGGER = logging.getLogger('point_cloud_client')


[docs]class PointCloudResponseError(ResponseError): """General class of errors for PointCloud service."""
[docs]class UnknownPointCloudSourceError(PointCloudResponseError): """System cannot find the requested point cloud source name."""
[docs]class SourceDataError(PointCloudResponseError): """System cannot generate the PointCloudSource at this time."""
[docs]class PointCloudDataError(PointCloudResponseError): """System cannot generate point cloud data at this time."""
_STATUS_TO_ERROR = collections.defaultdict(lambda: (PointCloudResponseError, None)) _STATUS_TO_ERROR.update({ point_cloud_protos.PointCloudResponse.STATUS_OK: (None, None), point_cloud_protos.PointCloudResponse.STATUS_UNKNOWN_SOURCE: error_pair(UnknownPointCloudSourceError), point_cloud_protos.PointCloudResponse.STATUS_SOURCE_DATA_ERROR: error_pair(SourceDataError), point_cloud_protos.PointCloudResponse.STATUS_UNKNOWN: error_pair(UnsetStatusError), point_cloud_protos.PointCloudResponse.STATUS_POINT_CLOUD_DATA_ERROR: error_pair(PointCloudDataError), }) @handle_common_header_errors def _error_from_response(response): """Return a custom exception based on the first invalid point_cloud response, None if no error.""" for point_cloud_response in response.point_cloud_responses: result = error_factory(response, point_cloud_response.status, status_to_string=point_cloud_protos.PointCloudResponse.Status.Name, status_to_error=_STATUS_TO_ERROR) if result is not None: return result return None
[docs]class PointCloudClient(BaseClient): """A client handling point clouds.""" default_service_name = 'point-cloud' service_type = 'bosdyn.api.PointCloudService' def __init__(self): super(PointCloudClient, self).__init__(point_cloud_service.PointCloudServiceStub)
[docs] def list_point_cloud_sources(self, **kwargs): """ Obtain the list of PointCloudSources. Returns: A list of the different point cloud sources as strings. Raises: RpcError: Problem communicating with the robot. """ req = self._get_list_point_cloud_source_request() return self.call(self._stub.ListPointCloudSources, req, _list_point_cloud_sources_value, common_header_errors, copy_request=False, **kwargs)
[docs] def list_point_cloud_sources_async(self, **kwargs): """Async version of list_point_cloud_sources()""" req = self._get_list_point_cloud_source_request() return self.call_async(self._stub.ListPointCloudSources, req, _list_point_cloud_sources_value, common_header_errors, copy_request=False, **kwargs)
[docs] def get_point_cloud_from_sources(self, point_cloud_sources, **kwargs): """Obtain point clouds from sources using default parameters. Args: point_cloud_sources (list of strings): The source names to request point clouds from. Returns: A list of point cloud responses for each of the requested sources. Raises: RpcError: Problem communicating with the robot. UnknownPointCloudSourceError: Provided point cloud source was invalid or not found point_cloud.SourceDataError: Failed to fill out PointCloudSource. All other fields are not filled UnsetStatusError: An internal PointCloudService issue has happened PointCloudDataError: Problem with the point cloud data. Only PointCloudSource is filled """ return self.get_point_cloud([build_pc_request(src) for src in point_cloud_sources], **kwargs)
[docs] def get_point_cloud_from_sources_async(self, point_cloud_sources, **kwargs): """Obtain point clouds from sources using default parameters.""" return self.get_point_cloud_async([build_pc_request(src) for src in point_cloud_sources], **kwargs)
[docs] def get_point_cloud(self, point_cloud_requests, **kw_args): """Get the most recent point cloud Args: point_cloud_requests (list of PointCloudRequest): A list of PointCloudRequest protobuf messages which specify which point clouds to collect kw_args: Extra arguments to pass to grpc call invocation. Returns: A list of point cloud responses for each of the requested sources. Raises: RpcError: Problem communicating with the robot. UnknownPointCloudSourceError: Provided point cloud source was invalid or not found point_cloud.SourceDataError: Failed to fill out PointCloudSource. All other fields are not filled UnsetStatusError: An internal PointCloudService issue has happened PointCloudDataError: Problem with the point cloud data. Only PointCloudSource is filled """ request = self._get_point_cloud_request(point_cloud_requests) return self.call(self._stub.GetPointCloud, request, value_from_response=_get_point_cloud_value, error_from_response=_error_from_response, **kw_args)
[docs] def get_point_cloud_async(self, point_cloud_requests, **kw_args): """Get the most recent point cloud Args: point_cloud_requests (list of PointCloudRequest): A list of PointCloudRequest protobuf messages which specify which point clouds to collect kw_args: Extra arguments to pass to grpc call invocation. Raises: RpcError: Problem communicating with the robot. UnknownPointCloudSourceError: Provided point cloud source was invalid or not found point_cloud.SourceDataError: Failed to fill out PointCloudSource. All other fields are not filled UnsetStatusError: An internal PointCloudService issue has happened PointCloudDataError: Problem with the point cloud data. Only PointCloudSource is filled Returns: A list of point cloud responses for each of the requested sources. """ request = self._get_point_cloud_request(point_cloud_requests) return self.call_async(self._stub.GetPointCloud, request, value_from_response=_get_point_cloud_value, error_from_response=_error_from_response, **kw_args)
@staticmethod def _get_point_cloud_request(point_cloud_requests): return point_cloud_protos.GetPointCloudRequest(point_cloud_requests=point_cloud_requests) @staticmethod def _get_list_point_cloud_source_request(): return point_cloud_protos.ListPointCloudSourcesRequest()
[docs]def build_pc_request(point_cloud_source_name): """Helper function which builds an PointCloudRequest from a point cloud source name. Args: point_cloud_source_name (string): The point cloud source to query. Returns: The PointCloudRequest protobuf message for the given parameters. """ return point_cloud_protos.PointCloudRequest(point_cloud_source_name=point_cloud_source_name)
def _list_point_cloud_sources_value(response): return response.point_cloud_sources def _get_point_cloud_value(response): return response.point_cloud_responses