# Copyright (c) 2023 Boston Dynamics, Inc. All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).
"""For clients to the Spot CAM StreamQuality service."""
import logging
_LOGGER = logging.getLogger(__name__)
from bosdyn.api.spot_cam import service_pb2_grpc, streamquality_pb2
from bosdyn.client.common import BaseClient, common_header_errors, handle_common_header_errors
[docs]class StreamQualityClient(BaseClient):
"""A client calling Spot CAM StreamQuality service.
"""
default_service_name = 'spot-cam-stream-quality'
service_type = 'bosdyn.api.spot_cam.StreamQualityService'
def __init__(self):
super(StreamQualityClient, self).__init__(service_pb2_grpc.StreamQualityServiceStub)
[docs] def set_stream_params(self, target_bitrate=None, refresh_interval=None, idr_interval=None,
awb_mode=None, auto_exposure=None, sync_auto_exposure=None,
manual_exposure=None, **kwargs):
"""Change image compression and postprocessing.
At most one of auto_exposure, sync_auto_exposure, and manual_exposure can be specified.
The others should be set to None if one is specified. Otherwise, they should all be None.
Args:
target_bitrate (int): The compression level in target BPS
refreshinterval (int): How often the entire feed should be refreshed (in frames)
idrinterval (int): How often an IDR message should get sent (in frames)
awb_mode (AwbModeEnum): Options for automatic white balancing mode
auto_exposure (AutoExposure): Runs exposure independently on each of the ring cameras
sync_auto_exposure (SyncAutoExposure): Runs a single autoexposure algorithm that takes
into account data from all ring cameras
manual_exposure (ManualExposure): Manual exposure sets an exposure for all ring cameras
"""
request = self._build_SetStreamParamsRequest(target_bitrate, refresh_interval, idr_interval,
awb_mode, auto_exposure, sync_auto_exposure,
manual_exposure)
return self.call(self._stub.SetStreamParams, request, self._params_from_response,
self._streamquality_error_from_response, copy_request=False, **kwargs)
[docs] def set_stream_params_async(self, target_bitrate=None, refresh_interval=None, idr_interval=None,
awb_mode=None, auto_exposure=None, sync_auto_exposure=None,
manual_exposure=None, **kwargs):
"""Async version of set_stream_params()."""
request = self._build_SetStreamParamsRequest(target_bitrate, refresh_interval, idr_interval,
awb_mode, auto_exposure, sync_auto_exposure,
manual_exposure)
return self.call_async(self._stub.SetStreamParams, request, self._params_from_response,
self._streamquality_error_from_response, copy_request=False,
**kwargs)
[docs] def get_stream_params(self, **kwargs):
"""Get image quality and processing settings."""
request = streamquality_pb2.GetStreamParamsRequest()
return self.call(self._stub.GetStreamParams, request, self._params_from_response,
self._streamquality_error_from_response, copy_request=False, **kwargs)
[docs] def get_stream_params_async(self, **kwargs):
"""Async version of get_stream_params()."""
request = streamquality_pb2.GetStreamParamsRequest()
return self.call_async(self._stub.GetStreamParams, request, self._params_from_response,
self._streamquality_error_from_response, copy_request=False,
**kwargs)
[docs] def enable_congestion_control(self, enable=True, **kwargs):
"""Enable congestion control."""
request = streamquality_pb2.EnableCongestionControlRequest(enable_congestion_control=enable)
return self.call(self._stub.EnableCongestionControl, request, None,
self._streamquality_error_from_response, copy_request=False, **kwargs)
[docs] def enable_congestion_control_async(self, enable=True, **kwargs):
"""Async version of enable_congestion_control()."""
request = streamquality_pb2.EnableCongestionControlRequest(enable_congestion_control=enable)
return self.call_async(self._stub.EnableCongestionControl, request, None,
self._streamquality_error_from_response, copy_request=False,
**kwargs)
@staticmethod
def _build_SetStreamParamsRequest(target_bitrate, refresh_interval, idr_interval, awb_mode,
auto_exposure, sync_auto_exposure, manual_exposure):
exposure_args = [auto_exposure, sync_auto_exposure, manual_exposure]
if sum([arg is not None for arg in exposure_args]) > 1:
raise ValueError("Only one exposure argument can be specified at a time.")
request = streamquality_pb2.SetStreamParamsRequest()
if target_bitrate:
request.params.targetbitrate.value = target_bitrate
if refresh_interval:
request.params.refreshinterval.value = refresh_interval
if idr_interval:
request.params.idrinterval.value = idr_interval
if awb_mode:
request.params.awb.CopyFrom(streamquality_pb2.StreamParams.AwbMode(awb=awb_mode))
if auto_exposure:
request.params.auto_exposure.CopyFrom(auto_exposure)
if sync_auto_exposure:
request.params.sync_exposure.CopyFrom(sync_auto_exposure)
if manual_exposure:
request.params.manual_exposure.CopyFrom(manual_exposure)
return request
@staticmethod
def _params_from_response(response):
return response.params
@staticmethod
@handle_common_header_errors
def _streamquality_error_from_response(response): # pylint: disable=unused-argument
return None