Part 2: Capturing Images
In this part of the tutorial, you will:
Write an image service.
Run the image service, connecting to the robot.
Test that the image service functions properly.
Understanding Image Services
An image service is a grpc service that implements the [bosdyn.api.ImageService] interface. A single image service can handle multiple cameras, exposing them as image sources. It then responds to GetImage requests by providing the images for the requested sources. The on-robot “image” service is exactly this interface, and provides access to the images from the robot’s on-board cameras. Note that image services are expected to return images “quickly”. If you have an image sensor that takes significant time to capture its images (more than several seconds), then you should instead implement a data acquisition plugin which can report on its capture status, and is not used for live views.
We have several helper classes to simplify the creation of new image services. These classes are found in
CameraInterface: This is the abstract interface you will need to implement to read from particular camera hardware. This is the interface that we will implement in this tutorial. This implementation will require two methods:
blocking_capture(): Takes the data from the camera and returns image data and a timestamp.
image_decode(): Converts the image data into the correct format for returning a response to clients.
VisualImageSource: Handles “running” a
CameraInterface. It will manage details such as reporting faults, and optionally creating a thread to continually read from the camera.
CameraBaseImageServicer: Implements the details of the grpc service that will respond to requests.
Preparing the environment
For initial development, it is usually easiest to first connect the webcam (or other sensor) to your development machine and test and debug everything in that environment before deploying to the payload computer. Connect your webcam to your machine and verify that it works.
For this example, we will use OpenCV to read from the camera hardware.
Enter your Spot API virtualenv
my_spot_env with the name of the virtualenv that you created using the Spot Quickstart Guide:
Install OpenCV's python bindings in your virtualenv:
python3 -m pip install opencv-python==4.5.*
Make a folder called
~/data_capture that we’ll put everything into:
mkdir ~/data_capture cd ~/data_capture
Copy (or download)) the script below into a file called
web_cam_image_service.py in the
~/data_capture folder. This is a simplified version of the web cam example (which includes some extra options to control how images are captured).
Web cam image service
Set up the imports for images and the Spot API
import logging import os import time import cv2 import numpy as np import bosdyn.util from bosdyn.client.directory_registration import (DirectoryRegistrationClient, DirectoryRegistrationKeepAlive) from bosdyn.client.util import setup_logging from bosdyn.client.server_util import GrpcServiceRunner from bosdyn.api import image_pb2 from bosdyn.api import image_service_pb2_grpc from bosdyn.client.image_service_helpers import (VisualImageSource, CameraBaseImageServicer, CameraInterface, convert_RGB_to_grayscale)
Define some constants that we will use in the file.
DIRECTORY_NAME = 'web-cam-service' AUTHORITY = 'robot-web-cam' SERVICE_TYPE = 'bosdyn.api.ImageService' _LOGGER = logging.getLogger(__name__)
Now we will create the CameraInterface that does the work. The OpenCV VideoCapture class will handle the bulk of interfacing with standard webcams.
class WebCam(CameraInterface): """Provide access to the latest web cam data using openCV's VideoCapture.""" def __init__(self, device_name):
OpenCV device names can be either an integer index or a device name string (such as /dev/video0 on linux or even the name of a video file), For usage simplicity, we will allow either and attempt to open the camera, raising an exception if that fails. We also generate a name for the image source from the device name, the details of which will be defined later.
try: device_name = int(device_name) except ValueError as err: pass self.image_source_name = device_name_to_source_name(device_name) self.capture = cv2.VideoCapture(device_name) if not self.capture.isOpened(): err = "Unable to open a cv2.VideoCapture connection to %s" % device_name _LOGGER.warning(err) raise Exception(err)
We will also try to query the camera’s gain and exposure and reported resolution, so that we can report that data for the image source.
self.camera_exposure, self.camera_gain = None, None try: self.camera_gain = self.capture.get(cv2.CAP_PROP_GAIN) except cv2.error as e: _LOGGER.warning("Unable to determine camera gain: %s", e) self.camera_gain = None try: self.camera_exposure = self.capture.get(cv2.CAP_PROP_EXPOSURE) except cv2.error as e: _LOGGER.warning("Unable to determine camera exposure time: %s", e) self.camera_exposure = None # Determine the dimensions of the image. self.rows = int(self.capture.get(cv2.CAP_PROP_FRAME_HEIGHT)) self.cols = int(self.capture.get(cv2.CAP_PROP_FRAME_WIDTH))
Finally we’ll set a default jpeg quality to use if none is specified.
self.default_jpeg_quality = 75
Next we define the actual capture function. For simplicity we only use
time.time() as an approximation of the true capture time.
def blocking_capture(self): capture_time = time.time() success, image = self.capture.read() if success: return image, capture_time else: raise Exception("Unsuccessful call to cv2.VideoCapture().read()")
Lastly, we need to decode that data into the output proto. For this method,
image_data is the same data we returned from a
blocking_capture() call. Image_proto is the output Image protobuf that we need to fill out, and image_req is the ImageRequest that contains image_format, quality_percent, pixel_format and resize_ratio options requested by the client.
def image_decode(self, image_data, image_proto, image_req):
First, we convert the pixel format correctly and set the format in the output object.
pixel_format = image_req.pixel_format converted_image_data = image_data if pixel_format == image_pb2.Image.PIXEL_FORMAT_GREYSCALE_U8: converted_image_data = convert_RGB_to_grayscale( cv2.cvtColor(image_data, cv2.COLOR_BGR2RGB)) if pixel_format == image_pb2.Image.PIXEL_FORMAT_UNKNOWN: image_proto.pixel_format = image_pb2.Image.PIXEL_FORMAT_RGB_U8 else: image_proto.pixel_format = pixel_format
If the resize_ratio is set, then we resize the image appropriately.
resize_ratio = image_req.resize_ratio quality_percent = image_req.quality_percent if resize_ratio < 0 or resize_ratio > 1: raise ValueError("Resize ratio %s is out of bounds." % resize_ratio) if resize_ratio != 1.0 and resize_ratio != 0: image_proto.rows = int(image_proto.rows * resize_ratio) image_proto.cols = int(image_proto.cols * resize_ratio) converted_image_data = cv2.resize(converted_image_data, (image_proto.cols, image_proto.rows), interpolation = cv2.INTER_AREA)
Next, we set the actual image data. If raw data was requested, we set the same bytes that we get from the camera.
# Set the image data. image_format = image_req.image_format if image_format == image_pb2.Image.FORMAT_RAW: image_proto.data = np.ndarray.tobytes(converted_image_data) image_proto.format = image_pb2.Image.FORMAT_RAW
If JPEG encoding was requested, we encode it with OpenCV. If the quality_percent was not filled out in the request, we’ll use our default quality.
elif image_format == image_pb2.Image.FORMAT_JPEG or image_format == image_pb2.Image.FORMAT_UNKNOWN or image_format is None: quality = self.default_jpeg_quality if 0 < quality_percent <= 100: quality = quality_percent encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), int(quality)] image_proto.data = cv2.imencode('.jpg', converted_image_data, encode_param).tobytes() image_proto.format = image_pb2.Image.FORMAT_JPEG
If we don’t recognize the requested format, we’ll raise an error.
else: raise Exception( "Image format %s is unsupported." % image_pb2.Image.Format.Name(image_format))
Now the image proto is filled out to be returned with the response. Note that we are not filling out any position transforms for the image, as in this case we don’t actually know where the camera is mounted.
Next we’ll make the helper that turns the OpenCV device name into a nice source name to report to clients.
def device_name_to_source_name(device_name): if type(device_name) == int: return "video" + str(device_name) else: return os.path.basename(device_name)
Now it’s time to create the image service. We take a list of device names and create our new WebCam class for each one, along with a VisualImageSource instance to run each one. We hard-code supported pixel formats in this tutorial when we create the VisualImageSource object. Please refer to SDK example on how to determine correct list of supported pixel formats.
def make_webcam_image_service(bosdyn_sdk_robot, service_name, device_names, logger=None): image_sources =  for device in device_names: web_cam = WebCam(device) img_src = VisualImageSource(web_cam.image_source_name, web_cam, rows=web_cam.rows, cols=web_cam.cols, gain=web_cam.camera_gain, exposure=web_cam.camera_exposure, pixel_formats=[image_pb2.Image.PIXEL_FORMAT_GREYSCALE_U8, image_pb2.Image.PIXEL_FORMAT_RGB_U8]) image_sources.append(img_src)
Finally we create and return our image service from the image sources.
return CameraBaseImageServicer(bosdyn_sdk_robot, service_name, image_sources, logger)
Now that we have a way to create our image service (what gRPC calls a servicer), let’s make a function to run it. For this case we use the GrpcServiceRunner helper, which handles all the details of creating a gRPC server, adding the servicer to it, and running the service.
def run_service(bosdyn_sdk_robot, port, service_name, device_names, logger=None): add_servicer_to_server_fn = image_service_pb2_grpc.add_ImageServiceServicer_to_server service_servicer = make_webcam_image_service(bosdyn_sdk_robot, service_name, device_names, logger=logger) return GrpcServiceRunner(service_servicer, add_servicer_to_server_fn, port, logger=logger)
All that is left is creating the command line interface for this script to run it. Here are the options for configuring the devices. As a convenience, if no devices are specified we will default to index 0.
def add_web_cam_arguments(parser): parser.add_argument( '--device-name', help=('Image source to query. If none are passed, it will default to the first available ' 'source.'), nargs='*', default=['0'])
Now let’s create the main entry point. First we will set up and parse all command line arguments. These include our common arguments for connecting to a robot and creating a service.
if __name__ == '__main__': import argparse parser = argparse.ArgumentParser(allow_abbrev=False) bosdyn.client.util.add_base_arguments(parser) bosdyn.client.util.add_payload_credentials_arguments(parser) bosdyn.client.util.add_service_endpoint_arguments(parser) add_web_cam_arguments(parser) options = parser.parse_args()
We default the device name to “0” so the script captures images from
devices = options.device_name if not devices: # No sources were provided. Set the default source as index 0 to point to the first # available device found by the operating system. devices = ["0"]
We set up logging options and then create the sdk and robot objects to communicate with Spot.
setup_logging(options.verbose, include_dedup_filter=True) sdk = bosdyn.client.create_standard_sdk("ImageServiceSDK") robot = sdk.create_robot(options.hostname)
We need to authenticate with the robot. Instead of using a username and password, we want to identify as a payload. In order for this to work, the payload must be registered and have been authorized through Spot’s web service.
Now we create our service runner using the helper we defined previously.
service_runner = run_service(robot, options.port, DIRECTORY_NAME, devices, logger=_LOGGER)
Lastly, we need to register our new service with the robot and run it. We will use the
DirectoryRegistrationKeepAlive helper to handle the registration.
dir_reg_client = robot.ensure_client(DirectoryRegistrationClient.default_service_name) keep_alive = DirectoryRegistrationKeepAlive(dir_reg_client, logger=_LOGGER) keep_alive.start(DIRECTORY_NAME, SERVICE_TYPE, AUTHORITY, options.host_ip, service_runner.port) with keep_alive: service_runner.run_until_interrupt()
We now have a complete, runnable service that we can test and use.
Testing the service
Authentication of an image service or a data acquisition plugin with a robot is done by using payload authentication instead of username/password login credentials. This allows us to authenticate these applications with payload credentials already created to register the payloads they pull data from. They can also use the SpotCORE payload credentials for authentication.
Registering a payload
In order to test these services, we need payload authentication credentials. Those credentials are created by registering a payload with the robot. The payload can represent an actual physical payload with mass and dimensions specified in the payload registration request, or a massless payload. To register a massless payload, please run the payload SDK example https://github.com/boston-dynamics/spot-sdk/tree/master/python/examples/payloads.
For more information on registering payloads, please take a look at this SDK documentation article.
The SpotCORE payload credentials can also be used to authenticate these image and data acquisition plugin services. They are located in
/opt/payload_credentials/payload_guid_and_secret in SpotCORE.
For testing on our development machine, we will use the credentials created and registered in Part 1.
Running the service
The simplest invocation of the service is just to run
export WEBCAM_PORT=5000 python3 web_cam_image_service.py --payload-credentials-file $CRED_FILE $ROBOT_IP --host-ip $SELF_IP --port $WEBCAM_PORT
This will use the first webcam found by OpenCV. Note that you will need to either disable any firewall or open the specified port to allow the robot to contact the service. If you are unsure of the correct ip address to use for
--host-ip, you can discover it via the bosdyn.client command line program:
python3 -m bosdyn.client $ROBOT_IP self-ip
Be sure that the
$WEBCAM_PORT is not blocked on your computer.
Once this is running, use the
image_service_tester.py program) to test its basic functionality.
python3 image_service_tester.py $ROBOT_IP --service-name web-cam-service --check-data-acquisition
This will attempt to capture images from all reported sources, as well as capture images through the data acquisition service.
Common errors at this point:
Firewall blocking requests
Incorrect host-ip specified
An additional test to perform is to use the robot’s tablet, and view the web-cam-service image source live on the tablet.