# Copyright (c) 2023 Boston Dynamics, Inc. All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).
"""For clients to the Spot CAM Compositor service."""
from google.protobuf.wrappers_pb2 import BoolValue
from bosdyn.api.spot_cam import compositor_pb2, service_pb2_grpc
from bosdyn.client.common import BaseClient, handle_common_header_errors
[docs]class CompositorClient(BaseClient):
"""A client calling Spot CAM Compositor services.
"""
default_service_name = 'spot-cam-compositor'
service_type = 'bosdyn.api.spot_cam.CompositorService'
def __init__(self):
super(CompositorClient, self).__init__(service_pb2_grpc.CompositorServiceStub)
[docs] def set_screen(self, name, **kwargs):
"""Change the current view that is being streamed over the network"""
request = compositor_pb2.SetScreenRequest(name=name)
return self.call(self._stub.SetScreen, request, self._name_from_response,
self._compositor_error_from_response, copy_request=False, **kwargs)
[docs] def set_screen_async(self, name, **kwargs):
"""Async version of set_screen()"""
request = compositor_pb2.SetScreenRequest(name=name)
return self.call_async(self._stub.SetScreen, request, self._name_from_response,
self._compositor_error_from_response, copy_request=False, **kwargs)
[docs] def get_screen(self, **kwargs):
"""Get the currently selected screen"""
request = compositor_pb2.GetScreenRequest()
return self.call(self._stub.GetScreen, request, self._name_from_response,
self._compositor_error_from_response, copy_request=False, **kwargs)
[docs] def get_screen_async(self, **kwargs):
"""Async version of get_screen()"""
request = compositor_pb2.GetScreenRequest()
return self.call_async(self._stub.GetScreen, request, self._name_from_response,
self._compositor_error_from_response, copy_request=False, **kwargs)
[docs] def list_screens(self, **kwargs):
"""List available screens"""
request = compositor_pb2.ListScreensRequest()
return self.call(self._stub.ListScreens, request, self._screens_from_response,
self._compositor_error_from_response, copy_request=False, **kwargs)
[docs] def list_screens_async(self, **kwargs):
"""Async version of list_screens()"""
request = compositor_pb2.ListScreensRequest()
return self.call_async(self._stub.ListScreens, request, self._screens_from_response,
self._compositor_error_from_response, copy_request=False, **kwargs)
[docs] def get_visible_cameras(self, **kwargs):
"""List cameras on Spot CAM"""
request = compositor_pb2.GetVisibleCamerasRequest()
return self.call(self._stub.GetVisibleCameras, request, self._streams_from_response,
self._compositor_error_from_response, copy_request=False, **kwargs)
[docs] def get_visible_cameras_async(self, **kwargs):
"""Async version of get_visible_cameras()"""
request = compositor_pb2.GetVisibleCamerasRequest()
return self.call_async(self._stub.GetVisibleCameras, request, self._streams_from_response,
self._compositor_error_from_response, copy_request=False, **kwargs)
[docs] def set_ir_colormap(self, colormap, min_temp, max_temp, auto_scale, **kwargs):
"""Set IR colormap to use on Spot CAM
Args:
colormap (bosdyn.api.spot_cam.compositor_pb2.IrColorMap.ColorMap): IR display colormap
min_temp (Float): minimum temperature on the temperature scale
max_temp (Float): maximum temperature on the temperature scale
auto_scale (Boolean): Auto-scale the color map. This is the most human-understandable
option. min_temp and max_temp are ignored if this is set to True
kwargs: extra arguments for controlling RPC details.
"""
scale = compositor_pb2.IrColorMap.ScalingPair(min=min_temp, max=max_temp)
auto = BoolValue(value=auto_scale)
ir_colormap = compositor_pb2.IrColorMap(colormap=colormap, scale=scale, auto_scale=auto)
request = compositor_pb2.SetIrColormapRequest(map=ir_colormap)
return self.call(self._stub.SetIrColormap, request, self._return_response,
self._compositor_error_from_response, copy_request=False, **kwargs)
[docs] def set_ir_colormap_async(self, colormap, min_temp, max_temp, auto_scale, **kwargs):
"""Async version of set_ir_colormap()"""
scale = compositor_pb2.IrColorMap.ScalingPair(min=min_temp, max=max_temp)
auto = BoolValue(value=auto_scale)
ir_colormap = compositor_pb2.IrColorMap(colormap=colormap, scale=scale, auto_scale=auto)
request = compositor_pb2.SetIrColormapRequest(map=ir_colormap)
return self.call_async(self._stub.SetIrColormap, request, self._return_response,
self._compositor_error_from_response, copy_request=False, **kwargs)
[docs] def get_ir_colormap(self, **kwargs):
"""Get currently selected IR colormap on Spot CAM"""
request = compositor_pb2.GetIrColormapRequest()
return self.call(self._stub.GetIrColormap, request, self._colormap_from_response,
self._compositor_error_from_response, copy_request=False, **kwargs)
[docs] def get_ir_colormap_async(self, **kwargs):
"""Async version of get_ir_colormap()"""
request = compositor_pb2.GetIrColormapRequest()
return self.call_async(self._stub.GetIrColormap, request, self._colormap_from_response,
self._compositor_error_from_response, copy_request=False, **kwargs)
[docs] def set_ir_meter_overlay(self, x, y, enable, unit, **kwargs):
"""Set IR reticle position to use on Spot CAM IR
Args:
x (Float): (0,1) horizontal coordinate of reticle
y (Float): (0,1) vertical coordinate of reticle
enable (Boolean): Enable the reticle on the display
unit (TempUnit): Temperature unit to display
kwargs: extra arguments for controlling RPC details.
"""
coords = compositor_pb2.IrMeterOverlay.NormalizedCoordinates(x=x, y=y)
# setting both coords and meter fields for backwards compatibility
overlay = compositor_pb2.IrMeterOverlay(enable=enable, coords=coords, meter=[coords],
unit=unit)
request = compositor_pb2.SetIrMeterOverlayRequest(overlay=overlay)
return self.call(self._stub.SetIrMeterOverlay, request, self._return_response,
self._compositor_error_from_response, copy_request=False, **kwargs)
[docs] def set_ir_meter_overlay_async(self, x, y, enable, unit, **kwargs):
"""Async version of set_ir_meter_overlay()"""
coords = compositor_pb2.IrMeterOverlay.NormalizedCoordinates(x=x, y=y)
# setting both coords and meter fields for backwards compatibility
overlay = compositor_pb2.IrMeterOverlay(enable=enable, coords=coords, meter=[coords],
unit=unit)
request = compositor_pb2.SetIrMeterOverlayRequest(overlay=overlay)
return self.call_async(self._stub.SetIrMeterOverlay, request, self._return_response,
self._compositor_error_from_response, copy_request=False, **kwargs)
[docs] def set_multi_ir_meter_overlay(self, coords, enable, unit, **kwargs):
"""Set multiple IR reticle positions to use on Spot CAM IR
Args:
coords (List[Tuple(Float, Float)]): List of (x, y) reticle coordinates in range (0,1)
e.g. [(0.1, 0.2), (0.2, 0.4), (0.7, 0.7)]
enable (Boolean): Enable the reticles on the display
unit (TempUnit): Temperature unit to display
kwargs: extra arguments for controlling RPC details.
"""
coords_proto = [
compositor_pb2.IrMeterOverlay.NormalizedCoordinates(x=x, y=y) for x, y in coords
]
overlay = compositor_pb2.IrMeterOverlay(enable=enable, meter=coords_proto, unit=unit)
request = compositor_pb2.SetIrMeterOverlayRequest(overlay=overlay)
return self.call(self._stub.SetIrMeterOverlay, request, self._return_response,
self._compositor_error_from_response, copy_request=False, **kwargs)
[docs] def set_multi_ir_meter_overlay_async(self, coords, enable, unit, **kwargs):
"""Async version of set_multi_ir_meter_overlay()"""
coords_proto = [
compositor_pb2.IrMeterOverlay.NormalizedCoordinates(x=x, y=y) for x, y in coords
]
overlay = compositor_pb2.IrMeterOverlay(enable=enable, meter=coords_proto, unit=unit)
request = compositor_pb2.SetIrMeterOverlayRequest(overlay=overlay)
return self.call_async(self._stub.SetIrMeterOverlay, request, self._return_response,
self._compositor_error_from_response, copy_request=False, **kwargs)
[docs] def get_ir_meter_overlay(self, **kwargs):
"""Get current IR reticle positions"""
request = compositor_pb2.GetIrMeterOverlayRequest()
return self.call(self._stub.GetIrMeterOverlay, request, self._return_response,
self._compositor_error_from_response, copy_request=False, **kwargs)
[docs] def get_ir_meter_overlay_async(self, **kwargs):
"""Async version of get_ir_meter_overlay()"""
request = compositor_pb2.GetIrMeterOverlayRequest()
return self.call_async(self._stub.GetIrMeterOverlay, request, self._return_response,
self._compositor_error_from_response, copy_request=False, **kwargs)
@staticmethod
def _return_response(response):
return response
@staticmethod
def _name_from_response(response):
return response.name
@staticmethod
def _screens_from_response(response):
return response.screens
@staticmethod
def _streams_from_response(response):
return response.streams
@staticmethod
def _colormap_from_response(response):
return response.map
@staticmethod
@handle_common_header_errors
def _compositor_error_from_response(response): # pylint: disable=unused-argument
return None