Data Acquisition Plugin Service

A set of helper functions for implementing a Data Acquisition plugin service.

The DataAcquisitionPluginService class in this module is the recommended way to create a plugin service for data acquisition. To use it, you must define a function for performing the data collection with the following signature:

def data_collect_fn(request: AcquirePluginDataRequest, store_helper: DataAcquisitionStoreHelper)

This function is responsible for collecting all of your plugin’s data and storing it through the store_helper. The store_helper (DataAcquisitionStoreHelper) kicks off an asynchronous data store call for each piece of data, metadata, or image. The plugin service class has an internal helper function that calls the DataAcquisitionStoreHelper.wait_for_stores_complete method, which blocks until all saving to the data acquisition store is finished. The helper class monitors the futures for completion and updates the request state’s status and errors appropriately.
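A minimal sketch of such a function follows, under stated assumptions rather than as the SDK's reference implementation. read_sensor and make_data_id are hypothetical helpers (a real plugin would build a bosdyn.api DataIdentifier there), and the request layout (acquisition_requests.data_captures, each with a name) is assumed from the AcquirePluginDataRequest proto and should be checked against your SDK version. Only store_helper.cancel_check and store_helper.store_data come from this module's documented interface.

```python
import json


def read_sensor(capture_name):
    """Hypothetical sensor read; a real plugin would query its hardware here."""
    return {"capture": capture_name, "value": 42}


def make_collect_fn(make_data_id):
    """Build a data_collect_fn.

    make_data_id is a hypothetical factory (request, capture) -> data identifier.
    """

    def data_collect_fn(request, store_helper):
        # Assumed request layout: one entry per requested data capture.
        for capture in request.acquisition_requests.data_captures:
            # Exit early (by exception) if the request was cancelled.
            store_helper.cancel_check()
            payload = json.dumps(read_sensor(capture.name)).encode("utf-8")
            # Starts an asynchronous store call; the service waits on it later.
            store_helper.store_data(payload, make_data_id(request, capture),
                                    file_extension=".json")

    return data_collect_fn
```

The factory indirection is only there so the sketch can run without the SDK installed; in a real plugin the DataIdentifier would typically be built inline.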

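If errors occur during the data collection and saving process, use state.add_errors (the state is available as store_helper.state) to report which intended DataIdentifiers had problems. add_errors takes an iterable of DataError messages, which the make_error helper can build:

state.add_errors([make_error(data_id, 'Failure to collect data 1')])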

Long-running acquisitions should call state.cancel_check() periodically so that they exit early and cleanly if the acquisition has been cancelled by the user or by a timeout. If your plugin needs to perform extra cleanup when a request is cancelled, wrap the check in a try-except block that catches the RequestCancelledError raised by cancel_check, perform the cleanup, and then re-raise the exception. For example:

try:
    # Data collection and storage here
    state.cancel_check()
except RequestCancelledError:
    # Perform cleanup here
    raise

Note that the data acquisition plugin service helper class monitors and responds to the GetStatus RPC. However, the data_collect_fn function should update the status to STATUS_SAVING (with state.set_status) when it transitions to storing the data.

exception bosdyn.client.data_acquisition_plugin_service.RequestCancelledError[source]

Bases: Exception

The request has been cancelled and should no longer be handled.

bosdyn.client.data_acquisition_plugin_service.make_error(data_id, error_msg, error_data=None)[source]

Helper to simplify creating a DataError to send to RequestState.add_errors.

Parameters:
  • data_id (DataIdentifier) – The proto for the data identifier which has an error.

  • error_msg (string) – The error message to associate with the data id.

  • error_data (google.protobuf.Any) – Additional data to be packed with the error.

Returns:

The DataError protobuf message for this errored data id.
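As a hedged illustration of how make_error and RequestState.add_errors fit together, the sketch below turns a list of failed captures into a single add_errors call. report_failures is a hypothetical helper, not part of the SDK, and make_error is passed in as an argument so the sketch runs without the SDK installed; in a plugin you would pass this module's make_error.

```python
def report_failures(state, failures, make_error):
    """Report failed captures on the request state.

    failures is a list of (data_id, error_msg) pairs. A list of pairs is used
    instead of a dict because protobuf messages are not hashable.
    Returns the number of errors reported.
    """
    errors = [make_error(data_id, error_msg) for data_id, error_msg in failures]
    if errors:
        # add_errors expects an iterable of DataError-like objects.
        state.add_errors(errors)
    return len(errors)
```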

class bosdyn.client.data_acquisition_plugin_service.RequestState[source]

Bases: object

Interface for a data collection to update its state as it proceeds.

Each AcquireData RPC made to the plugin service will create an instance of RequestState to manage the incoming acquisition request’s overall state, including if it has been cancelled, any errors that occur, and the current status of the request.

kNonError = {1, 2}

set_status(status)[source]

Update the status of the request.

Parameters:

status (GetStatusResponse.Status) – An updated status enum to be set in the stored GetStatusResponse.

set_complete_if_no_error(logger=None)[source]

Mark that everything is complete.

add_saved(data_ids)[source]

Record that some data was saved successfully.

Parameters:

data_ids (Iterable[DataId]) – Data IDs that have been successfully saved.

add_errors(data_errors)[source]

Report that some errors have occurred during the data capture. Use the make_error function to simplify creating data errors.

Parameters:

data_errors (Iterable[DataError]) – data errors to include as errors in the status.

has_data_errors()[source]

Return True if any data errors have been added to this status.

cancel_check()[source]

Raises RequestCancelledError if the request has already been cancelled.

is_cancelled()[source]

Query if the request is already cancelled.
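The difference between cancel_check (raises) and is_cancelled (returns a value) matters mostly in loops. The sketch below shows one possible use of is_cancelled to stop a sampling loop quietly while keeping the partial results. collect_samples and read_sample are hypothetical names, and state only needs to provide the is_cancelled method documented above.

```python
import time


def collect_samples(state, read_sample, num_samples, period_s=0.0):
    """Collect up to num_samples readings, stopping if the request is cancelled.

    read_sample is a hypothetical callable that returns one reading.
    """
    samples = []
    for _ in range(num_samples):
        # is_cancelled() only reports; cancel_check() would raise instead.
        if state.is_cancelled():
            break
        samples.append(read_sample())
        time.sleep(period_s)
    return samples
```

If cleanup has to run on cancellation and the request should stop being handled, the try-except pattern around cancel_check is probably the better fit.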

class bosdyn.client.data_acquisition_plugin_service.DataAcquisitionStoreHelper(store_client, state, cancel_interval=1)[source]

Bases: object

This class simplifies the management of data acquisition stores for a single request.

Request state will be updated according to store progress.

Parameters:
  • store_client (bosdyn.client.DataAcquisitionStoreClient) – A data acquisition store client.

  • state (RequestState) – state of the request, to be modified with errors or completion.

  • cancel_interval (float) – How often to check for cancellation of the request while waiting for the futures to complete.

store_client

A data acquisition store client.

Type:

bosdyn.client.DataAcquisitionStoreClient

state

state of the request, to be modified with errors or completion.

Type:

RequestState

cancel_interval

How often to check for cancellation of the request while waiting for the futures to complete.

Type:

float

data_id_future_pairs

The data identifier and the associated future which results from the async store data rpc.

Type:

List[Pair(DataIdentifier, Future)]

store_metadata(metadata, data_id)[source]

Store metadata with the data acquisition store service.

Parameters:
  • metadata (bosdyn.api.AssociatedMetadata) – Metadata message to store.

  • data_id (bosdyn.api.DataIdentifier) – Data identifier to use for storing this data.

Raises:

RPCError – Problem communicating with the robot.

store_image(image_capture, data_id)[source]

Store an image with the data acquisition store service.

Parameters:
  • image_capture (bosdyn.api.ImageCapture) – Image to store.

  • data_id (bosdyn.api.DataIdentifier) – Data identifier to use for storing this data.

Raises:

RPCError – Problem communicating with the robot.

store_data(message, data_id, file_extension=None)[source]

Store a data message with the data acquisition store service.

Parameters:
  • message (bytes) – Data to store.

  • data_id (bosdyn.api.DataIdentifier) – Data identifier to use for storing this data.

  • file_extension (string) – File extension to use for writing the data to a file.

Raises:

RPCError – Problem communicating with the robot.

cancel_check()[source]

Raises RequestCancelledError if the request has already been cancelled.

wait_for_stores_complete()[source]

Block and wait for all stores to complete. Update state with store success/failures.

Raises:

RequestCancelledError – The data acquisition request was cancelled.
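To illustrate the role of cancel_interval, here is a rough sketch of waiting on a set of futures while checking for cancellation between waits. It is not the SDK's implementation: it uses standard-library concurrent.futures futures as stand-ins (the futures returned by the store client may not work with concurrent.futures.wait), and Cancelled and wait_with_cancel_checks are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor, wait


class Cancelled(Exception):
    """Stand-in for RequestCancelledError so the sketch runs without the SDK."""


def wait_with_cancel_checks(id_future_pairs, is_cancelled, cancel_interval=1.0):
    """Block until every future finishes, checking for cancellation in between.

    Returns (saved_ids, failed_ids).
    """
    pending = {future: data_id for data_id, future in id_future_pairs}
    saved, failed = [], []
    while pending:
        if is_cancelled():
            raise Cancelled()
        # Wake up at least every cancel_interval seconds to re-check.
        done, _ = wait(list(pending), timeout=cancel_interval)
        for future in done:
            data_id = pending.pop(future)
            if future.exception() is None:
                saved.append(data_id)
            else:
                failed.append(data_id)
    return saved, failed
```

A shorter cancel_interval makes cancellation more responsive at the cost of more frequent wake-ups.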

class bosdyn.client.data_acquisition_plugin_service.DataAcquisitionPluginService(robot, capabilities, data_collect_fn, acquire_response_fn=None, executor=None, logger=None, live_response_fn=None)[source]

Bases: DataAcquisitionPluginServiceServicer

Implementation of a data acquisition plugin. It relies on the provided data_collect_fn to implement the heart of the data collection and storage.

Parameters:
  • robot – Authenticated robot object.

  • capabilities – List of DataAcquisitionCapability that describe what this plugin can do.

  • data_collect_fn – Function that performs the data collection and storage. Ordered input arguments (to data_collect_fn): data_acquisition_pb2.AcquirePluginDataRequest, DataAcquisitionStoreHelper. Output (from data_collect_fn): None

  • acquire_response_fn – Optional function that can validate a request and provide a timeout deadline. The function returns a boolean indicating whether the request is valid; if False, the response is returned immediately without calling the data collection function or saving any data. Ordered input arguments (to acquire_response_fn): data_acquisition_pb2.AcquirePluginDataRequest, data_acquisition_pb2.AcquirePluginDataResponse. Output (from acquire_response_fn): Boolean

  • live_response_fn – Optional function that sends signals data to the robot for purposes of displaying it on the tablet and Orbit during teleoperation. Input argument (to live_response_fn): data_acquisition_pb2.LiveDataRequest.

  • executor – Optional thread pool.
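As an example of the optional acquire_response_fn, the sketch below rejects requests that ask for no data captures or for too many. MAX_CAPTURES_PER_REQUEST is a hypothetical limit, and the request layout (acquisition_requests.data_captures) is assumed from the AcquirePluginDataRequest proto. A real validator would likely also record the reason for rejection on the response, which this sketch leaves untouched.

```python
MAX_CAPTURES_PER_REQUEST = 4  # hypothetical limit for this example plugin


def acquire_response_fn(request, response):
    """Return True if the request is acceptable, False to reject it.

    response is unused here; it is accepted to match the documented
    (request, response) argument order.
    """
    captures = request.acquisition_requests.data_captures
    return 0 < len(captures) <= MAX_CAPTURES_PER_REQUEST
```

Following the constructor signature above, it would be passed as DataAcquisitionPluginService(robot, capabilities, data_collect_fn, acquire_response_fn=acquire_response_fn).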

logger

Logger used by the service.

Type:

logging.Logger

capabilities

List of capabilities that describe what this plugin can do.

Type:

List[DataAcquisitionCapability]

data_collect_fn

Function that performs the data collection and storage.

acquire_response_fn

Function that can validate a request and provide a timeout deadline.

live_response_fn

Function that sends signals data to the robot for purposes of displaying it on the tablet and Orbit during teleoperation.

request_manager

Helper class which manages the RequestStates created with each acquisition RPC.

Type:

RequestManager

executor

Thread pool to run the plugin service on.

Type:

ThreadPoolExecutor

robot

Authenticated robot object.

Type:

Robot

store_client

Client for the data acquisition store service.

Type:

DataAcquisitionStoreClient

service_type = 'bosdyn.api.DataAcquisitionPluginService'

validate_params(request, response)[source]

Validate that any parameters set in the request are valid according to the spec.

AcquirePluginData(request, context)[source]

Trigger a data acquisition and store results in the data acquisition store service.

Parameters:
  • request (data_acquisition_pb2.AcquirePluginDataRequest) – The data acquisition request.

  • context (GRPC ClientContext) – tracks internal grpc statuses and information.

Returns:

An AcquirePluginDataResponse containing a request_id to use with GetStatus.

GetStatus(request, context)[source]

Query the status of a data acquisition by ID.

Parameters:
  • request (data_acquisition_pb2.GetStatusRequest) – The get status request.

  • context (GRPC ClientContext) – tracks internal grpc statuses and information.

Returns:

A GetStatusResponse containing the details of the data acquisition.

GetServiceInfo(request, context)[source]

Get a list of data acquisition capabilities.

Parameters:
  • request (data_acquisition_pb2.GetServiceInfoRequest) – The get service info request.

  • context (GRPC ClientContext) – tracks internal grpc statuses and information.

Returns:

A GetServiceInfoResponse containing the list of data acquisition capabilities for the plugin.

CancelAcquisition(request, context)[source]

Cancel a data acquisition by ID.

Parameters:
  • request (data_acquisition_pb2.CancelAcquisitionRequest) – The cancel acquisition request.

  • context (GRPC ClientContext) – tracks internal grpc statuses and information.

Returns:

A CancelAcquisitionResponse containing the status of the cancel operation.

GetLiveData(request, context)[source]

Get the live data available from this plugin.

Parameters:
  • request (data_acquisition_pb2.LiveDataRequest) – The live data request with params.

  • context (GRPC ClientContext) – tracks internal grpc statuses and information (unused).

Returns:

Result of live_response_fn.

Return type:

response (data_acquisition_pb2.LiveDataResponse)

class bosdyn.client.data_acquisition_plugin_service.RequestManager[source]

Bases: object

Manage request lifecycles and status.

The RequestManager manages some internals of the RequestState class, so it will access its protected variables. We leave those variables protected so that users of the RequestState class are less tempted to fiddle with them incorrectly, but we turn off the linting for the rest of this file.

add_request()[source]

Create a new request to manage.

get_request_state(request_id)[source]

Get the RequestState object for managing a request.

Parameters:

request_id (int) – The request_id for the acquisition request being inspected.

get_status_proto(request_id)[source]

Get a copy of the current status for the specified request.

Parameters:

request_id (int) – The request_id for the acquisition request being inspected.

mark_request_cancelled(request_id)[source]

Mark a request as cancelled, and no longer able to be updated.

Parameters:

request_id (int) – The request_id for the acquisition request being cancelled.

mark_request_finished(request_id)[source]

Mark a request as finished, and able to be removed later.

Parameters:

request_id (int) – The request_id for the acquisition request being completed.

cleanup_requests(older_than_time=None)[source]

Remove all requests that were completed farther in the past than older_than_time.

Defaults to removing anything older than 30 seconds.

Parameters:

older_than_time (float) – Optional time (in seconds) that requests will be removed after.
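The expiry behaviour can be pictured with a toy registry like the one below. This is an illustrative stand-in, not the SDK's RequestManager: RequestRegistry and its now argument are hypothetical, and it assumes older_than_time is an absolute timestamp in seconds, with the default cutoff falling 30 seconds before the current time.

```python
import time


class RequestRegistry:
    """Toy stand-in for a request manager; not the SDK's RequestManager."""

    def __init__(self, expiration_s=30.0):
        self._expiration_s = expiration_s
        self._finished_at = {}  # request_id -> completion timestamp, or None

    def add_request(self, request_id):
        self._finished_at[request_id] = None

    def mark_request_finished(self, request_id, now=None):
        self._finished_at[request_id] = time.time() if now is None else now

    def cleanup_requests(self, older_than_time=None, now=None):
        """Remove finished requests completed before the cutoff; return their ids."""
        now = time.time() if now is None else now
        cutoff = now - self._expiration_s if older_than_time is None else older_than_time
        expired = [rid for rid, finished in self._finished_at.items()
                   if finished is not None and finished < cutoff]
        for rid in expired:
            del self._finished_at[rid]
        return expired
```

Requests that have not finished are never removed in this sketch, so an in-flight acquisition can still be queried.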