Source code for bosdyn.client.image_service_helpers

# Copyright (c) 2023 Boston Dynamics, Inc.  All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).

import inspect
import logging
import sys
import threading
import time
from abc import ABC, abstractmethod

import numpy as np

from bosdyn.api import (header_pb2, image_pb2, image_service_pb2, image_service_pb2_grpc,
                        service_customization_pb2, service_fault_pb2)
from bosdyn.client.exceptions import RpcError
from bosdyn.client.fault import (FaultClient, ServiceFaultAlreadyExistsError,
                                 ServiceFaultDoesNotExistError)
from bosdyn.client.image import UnsupportedPixelFormatRequestedError
from bosdyn.client.server_util import populate_response_header
from bosdyn.client.service_customization_helpers import create_value_validator, validate_dict_spec
from bosdyn.client.util import setup_logging
from bosdyn.util import sec_to_nsec, seconds_to_duration

_LOGGER = logging.getLogger(__name__)

CLEAR_FAULT_RPC_TIMEOUT_SECS = 0.1


[docs]def convert_RGB_to_grayscale(image_data_RGB_np): """Convert numpy image from RGB to grayscale using Pillow's formula. Args: image_data_RGB_np (numpy array): The RGB image data output from a capture as a 3D numpy array with R, G, B channels in that order in the array. Returns: Converted image_data_np as another 2D numpy array. """ # gray = 0.299 * red + 0.587 * green + 0.114 * blue # This code performs the above math using integer math and bit shifts for speed. # The constants below are the same as the ones in the above equation multiplied by 2^16 # The final addition makes it round the correct direction when it gets truncated by the bit shift. rgb = image_data_RGB_np.astype(np.uint32) return ( (rgb[:, :, 0] * 19595 + rgb[:, :, 1] * 38470 + rgb[:, :, 2] * 7471 + 0x8000) >> 16).astype( np.uint8)
[docs]class CameraInterface(ABC): """Abstract class interface for capturing and decoding images from a camera. This interface is used by the VisualImageSource to ensure that each image source has the expected capture and decoding methods with the right specification. """ def __init__(self): self.capture_lock = threading.Lock()
[docs] @abstractmethod def blocking_capture(self, *, custom_params=None, **kwargs): """Communicates with the camera payload to collect the image data. Ensure this is threadsafe as multiple threads/requests may try to call blocking_capture at the same time. A Lock (self.capture_lock) is provided as a convenience to help with this. Args: custom_params (service_customization_pb2.DictParam): Custom parameters defined by the image source affecting the resulting capture **kwargs: Other keyword arguments including future additions to the request that affect the capture. If an implementer is looking to use kwargs, it is expected that they update their blocking_capture function signature to accept named keyword arguments such as custom_params Returns: A tuple with image data (in any format), and the capture timestamp in seconds (float) in the service computer's clock. """
[docs] @abstractmethod def image_decode(self, image_data, image_proto, image_req): """Decode the image data into an Image proto based on the requested format and quality. Args: image_data (Same format as the output of blocking_capture): The image data output from a capture. image_proto (image_pb2.Image): The proto message to populate with decoded image data. image_req (image_pb2.ImageRequest): The image request associated with the image_data. Returns: None. Mutates the image_proto message with the decoded data. This function should set the image data, pixel format, image format, and potentially the transform snapshot fields within the image_proto protobuf message. """
[docs]class VisualImageSource(): """Helper class to represent a single image source. This helper class can be used to track state associated with an image source. It can capture images and decode image data for an image service. It can be configured to: - throw faults for capture or decode failures - request images continuously on a background thread to allow image services to respond more rapidly to GetImage requests. Args: image_name (string): The name of the image source. camera_interface (subclass of CameraInterface): A camera class which inherits from and implements the abstract methods of the CameraInterface class. rows (int): The number of rows of pixels in the image. cols (int): The number of cols of pixels in the image. gain (float | function): The sensor's gain in dB. This can be a fixed value or a function which returns the gain as a float. exposure (float | function): The exposure time for an image in seconds. This can be a fixed value or a function which returns the exposure time as a float. pixel_formats (image_pb2.Image.PixelFormat[]): Supported pixel formats. param_spec (service_customization_pb2.DictParam.Spec): A set of custom parameters passed into this image source logger (logging.Logger): Logger for debug and warning messages. """ def __init__(self, image_name, camera_interface, rows=None, cols=None, gain=None, exposure=None, pixel_formats=[], logger=None, param_spec=None): self.image_source_name = image_name self.supported_pixel_formats = pixel_formats self.image_source_proto = self.make_image_source(image_name, rows, cols, self.supported_pixel_formats, param_spec=param_spec) self.get_image_capture_params = lambda request_custom_params=None: self.make_capture_parameters( gain=gain, exposure=exposure, request_custom_params=request_custom_params) self.param_spec = param_spec if param_spec: #Fail fast if the spec being used is invalid, and otherwise get a function to easily validate values self.value_validator = create_value_validator(param_spec) else: #Validate against empty parameter set self.value_validator = create_value_validator( service_customization_pb2.DictParam.Spec()) # Ensure the camera_interface is a subclass of CameraInterface and has the defined capture and # decode methods. assert isinstance(camera_interface, CameraInterface) self.camera_interface = camera_interface self.capture_function = lambda custom_params=None, **kwargs: self._do_capture_with_error_checking( self.camera_interface.blocking_capture, custom_params=custom_params, **kwargs) # Optional background thread to continuously capture image data. This will help an image # service to respond quickly to a GetImage request, since it can use the last captured image. self.capture_thread = None # Fault client to report errors. Requires the image service name to # properly create the fault id. self.fault_client = None # Create fault to throw if a failure occurs when calling the blocking_capture function. camera_capture_fault_id = service_fault_pb2.ServiceFaultId( fault_name="Image Capture Failure for %s" % self.image_source_name) self.camera_capture_fault = service_fault_pb2.ServiceFault( fault_id=camera_capture_fault_id, severity=service_fault_pb2.ServiceFault.SEVERITY_WARN) # Create a fault to throw if a failure occurs when calling the image_decode function. decode_data_fault_id = service_fault_pb2.ServiceFaultId( fault_name="Decoding Image %s Failure" % self.image_source_name) self.decode_data_fault = service_fault_pb2.ServiceFault( fault_id=decode_data_fault_id, severity=service_fault_pb2.ServiceFault.SEVERITY_WARN) # This is the set of active fault id names that can potentially get cleared. self.active_fault_id_names = set() # Logger for warning messages and the last error message (used only when the background thread # is capturing such that error messages from the last capture can be shown still). self.logger = logger or _LOGGER self.last_error_message = None
[docs] def set_logger(self, logger): """Override the existing logger for the VisualImageSource class.""" if logger is not None: self.logger = logger
[docs] def create_capture_thread(self, custom_params=None): """Initialize a background thread to continuously capture images. """ self.capture_thread = ImageCaptureThread(self.image_source_name, self.capture_function, custom_params=custom_params) self.capture_thread.start_capturing()
[docs] def initialize_faults(self, fault_client, image_service): """Initialize a fault client and faults for the image source (linked to the image service). The fault client can be used to throw faults for capture and decode errors. All faults associated with the image service name provided will be cleared. Args: fault_client (FaultClient): The fault client to communicate with the robot. image_service (string): The image service name to associate faults with. """ self.fault_client = fault_client # Update the fault ids to include the correct image service name. self.camera_capture_fault.fault_id.service_name = image_service self.decode_data_fault.fault_id.service_name = image_service # Attempt to clear any previous faults for the image service. if self.fault_client is not None: try: self.fault_client.clear_service_fault( service_fault_pb2.ServiceFaultId(service_name=image_service), clear_all_service_faults=True) except ServiceFaultDoesNotExistError: self.active_fault_id_names.clear() except RpcError as exc: self.logger.error("RPC error in initialize_faults: %s.", exc)
[docs] def trigger_fault(self, error_message, fault): """Trigger a service fault for a failure to the fault service. Args: error_message (string): The error message to provide in the camera capture fault. fault (service_fault_pb2.ServiceFault): The complete service fault to be issued. """ if self.fault_client is not None and fault is not None: fault.error_message = error_message try: self.fault_client.trigger_service_fault(fault) self.active_fault_id_names.add(fault.fault_id.fault_name) except ServiceFaultAlreadyExistsError: pass except RpcError as exc: self.logger.error("RPC error in trigger_fault: %s.", exc)
[docs] def clear_fault(self, fault): """Attempts to clear the camera capture fault from the fault service. Args: fault (service_fault_pb2.ServiceFault): The fault (which contains an ID) to be cleared. """ if self.fault_client is not None and fault is not None: try: if fault.fault_id.fault_name in self.active_fault_id_names: self.fault_client.clear_service_fault(fault.fault_id, timeout=CLEAR_FAULT_RPC_TIMEOUT_SECS) self.active_fault_id_names.remove(fault.fault_id.fault_name) except ServiceFaultDoesNotExistError: self.logger.warning("No service fault found to clear.") except RpcError as exc: self.logger.error("RPC error in clear_fault: %s.", exc)
def _maybe_log_error(self, error_message=None, show_last_error=False): """Logs the error message if it is new (to prevent spam). Args: error_message (string): The new error message. show_last_error (Boolean): If true, the error message will be logged even if it is not new. Returns: None. Logs messages to the logger. """ if error_message is not None: self.last_error_message = error_message if show_last_error: # force the printout of the last error message by sending the log message at a logger level # high enough to not be filtered out. self.logger.error(self.last_error_message) else: self.logger.warning(error_message) def _do_capture_with_error_checking(self, capture_func, custom_params=None, **capture_func_kwargs): """Calls the blocking capture function and checks for any exceptions. This function will print warning messages and trigger a camera capture fault if an exception is thrown by the camera interface's blocking capture function. Args: capture_func (CameraInterface.blocking_capture): The function capturing the image data and timestamp. custom_params (service_customization_pb2.DictParam): Custom parameters passed to the blocking capture affecting the resulting capture **capture_func_kwargs: Other keyword arguments for the capture_func """ try: if custom_params or capture_func_kwargs: # If supplying custom_params, try a blocking capture function that can handle them try: img, timestamp = capture_func(custom_params=custom_params, **capture_func_kwargs) except TypeError: img, timestamp = capture_func() else: # Supports pre-3.3 blocking_capture if no custom_params or kwargs (both introduced in 3.3) are passed img, timestamp = capture_func() # Clear out any old error messages if the capture succeeds. self.last_error_message = None # Clear any previous camera capture faults after a successful image capture for # this image source. self.clear_fault(self.camera_capture_fault) return img, timestamp except Exception as err: error_message = "Failed to capture an image from %s: %s %s" % (self.image_source_name, type(err), err) self._maybe_log_error(error_message) self.trigger_fault(error_message, self.camera_capture_fault) return None, None
[docs] def get_image_and_timestamp(self, custom_params=None, **capture_func_kwargs): """Retrieve the latest captured image and timestamp. Args: custom_params (service_customization_pb2.DictParam): Custom parameters passed to the object's capture_function affecting the resulting capture **capture_func_kwargs: Other keyword arguments for the capture_function Returns: The latest captured image and the time (in seconds) associated with that image capture. Throws a camera capture fault and returns None if the image cannot be retrieved. """ if self.capture_thread is not None: thread_capture_output = self.capture_thread.get_latest_captured_image( custom_params=custom_params, **capture_func_kwargs) valid_thread_capture, image, timestamp = thread_capture_output.is_valid, thread_capture_output.image, thread_capture_output.timestamp if valid_thread_capture: if image is None or timestamp is None: # Force the printout of the last error message since the capture failed. self._maybe_log_error(show_last_error=True) return image, timestamp else: return self.capture_function(custom_params=custom_params, **capture_func_kwargs) else: # Call the capture function (which is wrapped with an error checker) to block and get the data. # capture_function already handles pre-3.3 blocking capture compatibility return self.capture_function(custom_params=custom_params, **capture_func_kwargs)
[docs] def image_decode_with_error_checking(self, image_data, image_proto, image_req): """Decode the image data into an Image proto based on the requested format and quality. Args: image_data(any format): The image data returned by the camera interface's blocking_capture function. image_proto (image_pb2.Image): The image proto to be mutated with the decoded data. image_req (image_pb2.ImageRequest): The image request associated with the image_data. Returns: image_pb2.ImageResponse.Status indicating if the decoding succeeds, or image format conversion or pixel conversion failed. Throws a decode data fault if the image or pixels cannot be decoded to the desired format. Mutates the image_proto Image proto with the decoded data if successful. """ decode_format = None quality_percent = None pixel_format = None if image_req: decode_format = image_req.image_format quality_percent = image_req.quality_percent pixel_format = image_req.pixel_format if pixel_format and (pixel_format not in self.supported_pixel_formats): return image_pb2.ImageResponse.STATUS_UNSUPPORTED_PIXEL_FORMAT_REQUESTED try: # Older function definition did not have image_req # Try/except for backwards compatibility try: self.camera_interface.image_decode(image_data, image_proto, image_req) except TypeError: self.camera_interface.image_decode(image_data, image_proto, decode_format, quality_percent) # Clear any previous decode data faults after a successful decoding by this image source. self.clear_fault(self.decode_data_fault) return image_pb2.ImageResponse.STATUS_OK except Exception as err: decode_format_str = None if decode_format is None: decode_format_str = "unknown format" else: decode_format_str = image_pb2.Image.Format.Name(decode_format) error_message = "Failed to decode image %s to format %s: %s %s" % ( self.image_source_name, decode_format_str, type(err), err) self._maybe_log_error(error_message) self.trigger_fault(error_message, self.decode_data_fault) return image_pb2.ImageResponse.STATUS_UNSUPPORTED_IMAGE_FORMAT_REQUESTED
[docs] def stop_capturing(self, timeout_secs=10): """Stop the image capture thread if one exists. Args: timeout_secs(int): The timeout [seconds] for attempting to join and destroy the capture thread. """ if self.capture_thread is not None: self.capture_thread.stop_capturing(timeout_secs)
[docs] @staticmethod def make_image_source(source_name, rows=None, cols=None, pixel_formats=[], image_type=image_pb2.ImageSource.IMAGE_TYPE_VISUAL, image_formats=[image_pb2.Image.FORMAT_JPEG], param_spec=None): """Create an instance of the image_pb2.ImageSource for a given source name. Args: source_name (string): The name for the camera source. rows (int): The number of rows of pixels in the image. cols (int): The number of cols of pixels in the image. image_type (image_pb2.ImageType): The type of image (e.g. visual, depth). image_formats (image_pb2.Image.Format): The image formats supported (jpeg, raw) pixel_formats (image_pb2.Image.PixelFormat): The pixel formats supported param_spec (service_customization_pb2.DictParam.Spec): A set of custom parameters passed into this image source Returns: An ImageSource with the cols, rows, and image type populated. """ source = image_pb2.ImageSource() source.name = source_name if rows is not None and cols is not None: source.rows = rows source.cols = cols # Image from the ricoh theta is a JPEG, which is considered a visual image source (no depth data). source.image_type = image_type source.image_formats.extend(image_formats) source.pixel_formats.extend(pixel_formats) if param_spec: source.custom_params.CopyFrom(param_spec) return source
[docs] @staticmethod def make_capture_parameters(gain=None, exposure=None, request_custom_params=None): """Creates an instance of the image_pb2.CaptureParameters protobuf message. Args: gain (float | function): The sensor's gain in dB. This can be a fixed value or a function which returns the gain as a float. exposure (float | function): The exposure time for an image in seconds. This can be a fixed value or a function which returns the exposure time as a float. request_custom_params (service_customization_pb2.DictParam): Custom Params associated with the image request. Should not be 'None', but left as an option to accomodate old callers Returns: An instance of the protobuf CaptureParameters message. """ params = image_pb2.CaptureParameters() if gain: if callable(gain): params.gain = gain() else: params.gain = gain if exposure: if callable(exposure): params.exposure_duration.CopyFrom(seconds_to_duration(exposure())) elif exposure == float('inf'): params.exposure_duration.CopyFrom(seconds_to_duration(sys.maxsize)) else: params.exposure_duration.CopyFrom(seconds_to_duration(exposure)) if request_custom_params: params.custom_params.CopyFrom(request_custom_params) return params
[docs]class ThreadCaptureOutput: """Small struct to represent the output of an ImageCaptureThread's get_latest_captured_image in a future-compatible way Args: is_valid (Boolean): Whether the latest capture uses the custom parameters and other arguments supplied in the latest request, and is therefore returned image (Any | None): If the latest capture is valid, the image data in any format (e.g. numpy, bytes, array) timestamp (float): The timestamp that the latest valid capture was taken """ def __init__(self, is_valid, image, timestamp): self.is_valid = is_valid self.image = image self.timestamp = timestamp
[docs]class ImageCaptureThread(): """Continuously query and store the last successfully captured image and its associated timestamp for a single camera device. Args: image_source_name(string): The image source name. capture_func (CameraInterface.blocking_capture): The function capturing the image data and timestamp. capture_period_secs (int): Amount of time (in seconds) between captures to wait before triggering the next capture. Defaults to 0.05s between captures. custom_params (service_customization_pb2.DictParam): Custom parameters passed to capture_func affecting the resulting capture **capture_func_kwargs: Other keyword arguments for the capture_func """ def __init__(self, image_source_name, capture_func, capture_period_secs=0.05, custom_params=None, **capture_func_kwargs): # Name of the image source that is being requested from. self.image_source_name = image_source_name # Indicate if the image capture thread is alive. self.stop_capturing_event = threading.Event() # Track the last image and timestamp for this image source. self.last_captured_image = None self.last_captured_time = None # Has a capture with the latest parameters completed (not necessarily successfully) self.has_updated_capture = False # Lock for the thread. self._thread_lock = threading.Lock() self._thread = None # The wait time between captures. self.capture_period_secs = capture_period_secs # Custom parameters the thread is currently running with self.custom_params = custom_params #Other keyword arguments the thread is currently running with self.capture_func_kwargs = capture_func_kwargs # Original user-passed capture_func self.init_capture_function = capture_func # Function that completes the capture # Expected function signature: blocking_capture_function(custom_params=None): returns (image data[numpy bytes array], time[float]) self.capture_function = self._make_capture_func(capture_func, custom_params=None, **capture_func_kwargs)
[docs] def start_capturing(self): """Start the background thread for the image captures.""" print("Starting the thread for %s" % self.image_source_name) self._thread = threading.Thread(target=self._do_image_capture) self._thread.daemon = True self._thread.start()
[docs] def maybe_update_thread(self, custom_params=None, **capture_func_kwargs): if custom_params != self.custom_params or capture_func_kwargs != self.capture_func_kwargs: self.capture_function = self._make_capture_func(self.init_capture_function, custom_params, **capture_func_kwargs) self.custom_params = custom_params self.capture_func_kwargs = capture_func_kwargs self.has_updated_capture = False
[docs] def set_last_captured_image(self, image_frame, capture_time): """Update the last image capture and timestamp.""" with self._thread_lock: self.last_captured_image = image_frame self.last_captured_time = capture_time self.has_updated_capture = True
[docs] def get_latest_captured_image(self, custom_params=None, **capture_func_kwargs): """Returns the last found image and timestamp in a ThreadCaptureOutput object if that image uses the latest params. Otherwise returns a ThreadCaptureOutput object with is_valid = False and capture/timestamp as None. """ with self._thread_lock: if (custom_params == self.custom_params and capture_func_kwargs == self.capture_func_kwargs and self.has_updated_capture): return ThreadCaptureOutput(True, self.last_captured_image, self.last_captured_time) else: self.maybe_update_thread(custom_params=custom_params, **capture_func_kwargs) return ThreadCaptureOutput(False, None, None)
def _make_capture_func(self, capture_func, custom_params=None, **capture_func_kwargs): """Update the capture function to use custom_params and capture_func_kwargs if it can""" # capture_func is likely provided through create_capture_thread, which already does handling for pre-3.3 blocking_capture # Still, check for the custom_params argument to ensure we accomodate pre-3.3 direct implementations of ImageCaptureThreads if "custom_params" in inspect.signature(capture_func).parameters.keys(): #If using a blocking capture function that takes in custom params or kwargs, one should supply those def output_capture_function(): return capture_func(custom_params=custom_params, **capture_func_kwargs) else: output_capture_function = capture_func return output_capture_function def _do_image_capture(self): """Main loop for the image capture thread, which requests and saves images.""" while not self.stop_capturing_event.is_set(): # Get the image by calling the blocking capture function. start_time = time.time() capture, capture_time = self.capture_function() self.set_last_captured_image(capture, capture_time) # Wait for the total capture period (where the wait time is adjusted based on how # long the capture took). wait_time = self.capture_period_secs - (time.time() - start_time) if self.stop_capturing_event.wait(wait_time): # If stop_capturing_event is set, then break from the capture loop now. break
[docs] def stop_capturing(self, timeout_secs=10): """Stop the image capture thread.""" self.stop_capturing_event.set() self._thread.join(timeout=timeout_secs)
[docs]class CameraBaseImageServicer(image_service_pb2_grpc.ImageServiceServicer): """GRPC service to provide access to multiple different image sources. The service can list the available image (device) sources and query each source for image data. Args: bosdyn_sdk_robot (Robot): The robot instance for the service to connect to. service_name (string): The name of the image service. image_sources(List[VisualImageSource]): The list of image sources (provided as a VisualImageSource). logger (logging.Logger): Logger for debug and warning messages. use_background_capture_thread (bool): If true, the image service will create a thread that continuously captures images so the image service can respond rapidly to the GetImage request. If false, the image service will call an image sources' blocking_capture_function during the GetImage request. background_capture_params (service_customization_pb2.DictParam): If use_background_capture_thread is true, custom image source parameters used for all of the background captures. Otherwise ignored """ def __init__(self, bosdyn_sdk_robot, service_name, image_sources, logger=None, use_background_capture_thread=True, background_capture_params=None): super(CameraBaseImageServicer, self).__init__() if logger is None: # Set up the logger to remove duplicated messages and use a specific logging format. setup_logging(include_dedup_filter=True) self.logger = _LOGGER else: self.logger = logger self.bosdyn_sdk_robot = bosdyn_sdk_robot # Service name this servicer is associated with in the robot directory. self.service_name = service_name # Fault client to report service faults self.fault_client = self.bosdyn_sdk_robot.ensure_client(FaultClient.default_service_name) # Get a timesync endpoint from the robot instance such that the image timestamps can be # reported in the robot's time. self.bosdyn_sdk_robot.time_sync.wait_for_sync() # A list of all the image sources available by this service. List[VisualImageSource] self.image_sources_mapped = dict() # Key: source name (string), Value: VisualImageSource for source in image_sources: # Set the logger for each visual image source to be the logger of the camera service class. source.set_logger(self.logger) # Set up the fault client so service faults can be created. source.initialize_faults(self.fault_client, self.service_name) # Potentially start the capture threads in the background. if use_background_capture_thread: source.create_capture_thread(background_capture_params) # Save the visual image source class associated with the image source name. self.image_sources_mapped[source.image_source_name] = source
[docs] def ListImageSources(self, request, context): """Obtain the list of ImageSources for this given service. Args: request (image_pb2.ListImageSourcesRequest): The list images request. context (GRPC ClientContext): tracks internal grpc statuses and information. Returns: A list of all image sources known to this image service. Note, there could be multiple image services registered with the robot's directory service that can have different image sources. """ response = image_pb2.ListImageSourcesResponse() for source in self.image_sources_mapped.values(): response.image_sources.add().CopyFrom(source.image_source_proto) populate_response_header(response, request) return response
def _set_format_and_decode(self, image_data, img_proto, img_req): """Calls the image_decode_with_error_checking function, which returns a (Boolean, Boolean) if the decoding succeeds.""" # This function should set the image data, pixel format, image format, and transform snapshot fields. return self.image_sources_mapped[ img_req.image_source_name].image_decode_with_error_checking( image_data, img_proto, img_req)
[docs] def GetImage(self, request, context): """Gets the latest image capture from all the image sources specified in the request. Args: request (image_pb2.GetImageRequest): The image request, which specifies the image sources to query, and other format parameters. context (GRPC ClientContext): tracks internal grpc statuses and information. Returns: The ImageSource and Image data for the last captured image from each image source name specified in the request. """ response = image_pb2.GetImageResponse() for img_req in request.image_requests: img_resp = response.image_responses.add() src_name = img_req.image_source_name if src_name not in self.image_sources_mapped: # The requested camera source does not match the name of the Ricoh Theta camera, so it cannot # be completed and will have a failure status in the response message. img_resp.status = image_pb2.ImageResponse.STATUS_UNKNOWN_CAMERA self.logger.warning("Camera source '%s' is unknown.", src_name) continue if img_req.resize_ratio < 0 or img_req.resize_ratio > 1: img_resp.status = image_pb2.ImageResponse.STATUS_UNSUPPORTED_RESIZE_RATIO_REQUESTED self.logger.warning("Resize ratio %f is unsupported.", img_req.resize_ratio) continue if img_req.HasField("custom_params"): value_validation_error = self.image_sources_mapped[src_name].value_validator( img_req.custom_params) if value_validation_error: img_resp.status = image_pb2.ImageResponse.STATUS_CUSTOM_PARAMS_ERROR img_resp.custom_param_error.CopyFrom(value_validation_error) continue # Set the image source information in the response. img_resp.source.CopyFrom(self.image_sources_mapped[src_name].image_source_proto) # Set the image capture parameters in the response. img_resp.shot.capture_params.CopyFrom( self.image_sources_mapped[src_name].get_image_capture_params(img_req.custom_params)) if img_req.HasField("custom_params"): #If future keyword arguments are added here, they'll need to be added to this call #get_image_and_timestamp already calls a 'sanitized' capture function that handles pre-3.3 blocking_captures captured_image, img_time_seconds = self.image_sources_mapped[ src_name].get_image_and_timestamp(custom_params=img_req.custom_params) else: #get_image_and_timestamp() can accomodate pre-3.3 blocking_capture calls if no custom params are supplied captured_image, img_time_seconds = self.image_sources_mapped[ src_name].get_image_and_timestamp() if captured_image is None or img_time_seconds is None: img_resp.status = image_pb2.ImageResponse.STATUS_IMAGE_DATA_ERROR error_message = "Failed to capture an image from %s on the server." % src_name response.header.error.message = error_message self.logger.warning(error_message) continue # Convert the image capture time from the local clock time into the robot's time. Then set it as # the acquisition timestamp for the image data. img_resp.shot.acquisition_time.CopyFrom( self.bosdyn_sdk_robot.time_sync.robot_timestamp_from_local_secs(img_time_seconds)) img_resp.shot.image.rows = img_resp.source.rows img_resp.shot.image.cols = img_resp.source.cols # Set the image data. img_resp.shot.image.format = img_req.image_format decode_status = self._set_format_and_decode(captured_image, img_resp.shot.image, img_req) if decode_status != image_pb2.ImageResponse.STATUS_OK: img_resp.status = decode_status # Set that we successfully got the image. if img_resp.status == image_pb2.ImageResponse.STATUS_UNKNOWN: img_resp.status = image_pb2.ImageResponse.STATUS_OK # No header error codes, so set the response header as CODE_OK. populate_response_header(response, request) return response
def __del__(self): for source in self.image_sources_mapped.values(): source.stop_capturing()