# Copyright (c) 2023 Boston Dynamics, Inc. All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).

import logging
import ssl
import time
from pathlib import Path
from urllib.error import URLError
from urllib.parse import urlencode
from urllib.request import Request, urlopen

from google.protobuf import json_format
from google.protobuf.struct_pb2 import Struct

import bosdyn.client
import bosdyn.client.util
from bosdyn.api import data_acquisition_pb2, data_acquisition_store_pb2
from bosdyn.client.exceptions import ResponseError
# Logger for all the debug information from these helpers.
_LOGGER = logging.getLogger()


def issue_acquire_data_request(data_acq_client, acquisition_requests, group_name, action_name,
                               metadata=None, data_timestamp=None):
    """Sends the data acquisition request without blocking until the acquisition completes.

    Args:
        data_acq_client: DataAcquisitionClient used to send the acquisition requests.
        acquisition_requests: Acquisition requests to include in the request message.
        group_name: Group name for the acquisitions.
        action_name: Action name for the acquisitions.
        metadata: Metadata to include in the request message.
        data_timestamp: Timestamp to use for the acquisitions. If None, the timestamp will be
            generated by the data acquisition client.

    Returns:
        The request id (int) and the action id (CaptureActionId). A request id of None
        indicates that the AcquireData RPC failed.
    """
    # Create the action id used to identify this request in later queries.
    action_id = data_acquisition_pb2.CaptureActionId(action_name=action_name,
                                                     group_name=group_name,
                                                     timestamp=data_timestamp)

    # Send an AcquireData request.
    request_id = None
    try:
        request_id = data_acq_client.acquire_data(acquisition_requests=acquisition_requests,
                                                  action_name=action_name,
                                                  group_name=action_id.group_name,
                                                  metadata=metadata, data_timestamp=data_timestamp)
    except ResponseError as err:
        print("Exception raised by issue_acquire_data_request: " + str(err))

    return request_id, action_id


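# A minimal usage sketch for issue_acquire_data_request (a hypothetical example, not part
# of the original module). The 'robot-state' capture name and the group/action names are
# placeholder assumptions; available capture names depend on the robot's configuration.
def _example_issue_request(robot):
    """Sketch: fire off an acquisition without blocking, returning ids for later polling."""
    from bosdyn.client.data_acquisition import DataAcquisitionClient
    data_acq_client = robot.ensure_client(DataAcquisitionClient.default_service_name)
    acquisition_requests = data_acquisition_pb2.AcquisitionRequestList()
    acquisition_requests.data_captures.extend(
        [data_acquisition_pb2.DataCapture(name='robot-state')])
    return issue_acquire_data_request(data_acq_client, acquisition_requests,
                                      group_name='ExampleGroup', action_name='ExampleAction')

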
def acquire_and_process_request(data_acquisition_client, acquisition_requests, group_name,
                                action_name, metadata=None, block_until_complete=True,
                                data_timestamp=None):
    """Send the acquisition request and optionally block until the acquisition completes.

    If blocking, the GetStatus RPC is used to monitor the status of the acquisition request.

    Args:
        data_acquisition_client (DataAcquisitionClient): The client used to send the
            acquisition requests.
        acquisition_requests (data_acquisition_pb2.AcquisitionRequestList): Acquisition requests
            to include in the request message.
        group_name (string): Group name for the acquisitions.
        action_name (string): Action name for the acquisitions.
        metadata (data_acquisition_pb2.Metadata): Metadata to include in the request message.
        block_until_complete (bool): If True, do not return until the GetStatus RPC reports a
            terminal status.
        data_timestamp: Timestamp to use for the acquisitions. If None, the timestamp will be
            generated by the data acquisition client.

    Returns:
        Boolean indicating whether the acquisition completed successfully.
    """
    # Make the acquire data request. This will return the current request id.
    request_id, action_id = issue_acquire_data_request(data_acquisition_client,
                                                       acquisition_requests, group_name,
                                                       action_name, metadata, data_timestamp)

    if not request_id:
        # The AcquireData request failed for some reason. No need to attempt to
        # monitor the status.
        return False

    if not block_until_complete:
        return True

    # Monitor the status of the data acquisition until it reaches a terminal state.
    print("Waiting for acquisition (id: %s) to complete." % str(request_id))
    while True:
        get_status_response = None
        try:
            get_status_response = data_acquisition_client.get_status(request_id)
        except ResponseError as err:
            print("Exception: %s" % str(err))
            return False
        print("Current status is: %s" %
              data_acquisition_pb2.GetStatusResponse.Status.Name(get_status_response.status))
        if get_status_response.status == data_acquisition_pb2.GetStatusResponse.STATUS_COMPLETE:
            return True
        if get_status_response.status == data_acquisition_pb2.GetStatusResponse.STATUS_TIMEDOUT:
            print("Unrecoverable request timeout: %s" % get_status_response)
            return False
        if get_status_response.status == data_acquisition_pb2.GetStatusResponse.STATUS_DATA_ERROR:
            print("Data error was received: %s" % get_status_response)
            return False
        if get_status_response.status == data_acquisition_pb2.GetStatusResponse.STATUS_REQUEST_ID_DOES_NOT_EXIST:
            print("The acquisition request id %s is unknown: %s" % (request_id, get_status_response))
            return False
        time.sleep(0.2)


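# A usage sketch for acquire_and_process_request (a hypothetical example, not part of the
# original module). The metadata contents are arbitrary placeholders; the client also
# accepts metadata as a dict or a data_acquisition_pb2.Metadata message.
def _example_acquire_blocking(robot):
    """Sketch: request a capture with metadata and block until a terminal status."""
    from bosdyn.client.data_acquisition import DataAcquisitionClient
    client = robot.ensure_client(DataAcquisitionClient.default_service_name)
    requests = data_acquisition_pb2.AcquisitionRequestList()
    requests.data_captures.extend([data_acquisition_pb2.DataCapture(name='robot-state')])
    # Arbitrary JSON-style metadata, carried in a protobuf Struct.
    metadata = Struct()
    metadata.update({'comment': 'example capture'})
    return acquire_and_process_request(client, requests, 'ExampleGroup', 'ExampleAction',
                                       metadata=metadata)

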
def cancel_acquisition_request(data_acq_client, request_id):
    """Cancels an acquisition request based on the request id.

    Args:
        data_acq_client: DataAcquisitionClient used to send the cancel request.
        request_id: The id number of the AcquireData request to cancel.

    Returns:
        None.
    """
    if not request_id:
        # The incoming request id is invalid. No need to attempt to cancel the request or
        # monitor the status.
        return

    try:
        is_cancelled_response = data_acq_client.cancel_acquisition(request_id)
        print("Status of the request to cancel the data acquisition in progress: " +
              data_acquisition_pb2.CancelAcquisitionResponse.Status.Name(
                  is_cancelled_response.status))
    except ResponseError as err:
        print("ResponseError raised when cancelling: " + str(err))
        # Don't attempt to wait for the cancellation success status.
        return

    # Monitor the status of the cancellation to confirm it completed.
    while True:
        get_status_response = None
        try:
            get_status_response = data_acq_client.get_status(request_id)
        except ResponseError as err:
            print("Exception: " + str(err))
            break
        print("Request " + str(request_id) + " status: " +
              data_acquisition_pb2.GetStatusResponse.Status.Name(get_status_response.status))
        if get_status_response.status == data_acquisition_pb2.GetStatusResponse.STATUS_ACQUISITION_CANCELLED:
            print("The request is fully cancelled.")
            break
        # Brief pause between polls to avoid hammering the service.
        time.sleep(0.2)


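# A usage sketch for cancel_acquisition_request (a hypothetical example, not part of the
# original module): start a non-blocking acquisition, then cancel it with the returned id.
def _example_cancel(data_acq_client, acquisition_requests):
    """Sketch: cancel an in-flight acquisition using the request id from the acquire call."""
    request_id, _ = issue_acquire_data_request(data_acq_client, acquisition_requests,
                                               group_name='ExampleGroup',
                                               action_name='ExampleCancelAction')
    cancel_acquisition_request(data_acq_client, request_id)

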
def clean_filename(filename):
    """Removes bad characters from a filename.

    Args:
        filename (string): Original filename to clean.

    Returns:
        Valid filename with the characters :*?<>| removed.
    """
    return "".join(i for i in filename if i not in ":*?<>|")


def make_time_query_params(start_time_secs, end_time_secs, robot):
    """Create time-based query params for the download request.

    Args:
        start_time_secs (float): The start time of the download data range, in the local clock.
        end_time_secs (float): The end time of the download data range, in the local clock.
        robot (Robot): The robot object, used to acquire timesync and convert the local
            times to robot time.

    Returns:
        The query params (data_acquisition_store_pb2.DataQueryParams) for the time-range download.
    """
    from_timestamp = robot.time_sync.robot_timestamp_from_local_secs(start_time_secs)
    to_timestamp = robot.time_sync.robot_timestamp_from_local_secs(end_time_secs)
    print(from_timestamp.ToJsonString(), to_timestamp.ToJsonString())
    query_params = data_acquisition_store_pb2.DataQueryParams(
        time_range=data_acquisition_store_pb2.TimeRangeQuery(from_timestamp=from_timestamp,
                                                             to_timestamp=to_timestamp))
    return query_params


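# A usage sketch for make_time_query_params (a hypothetical example, not part of the
# original module): bracket the last five minutes of local time. The conversion requires
# an established time sync, e.g. robot.time_sync.wait_for_sync().
def _example_recent_time_query(robot):
    """Sketch: build query params covering the last five minutes."""
    end_time_secs = time.time()
    start_time_secs = end_time_secs - 5 * 60
    return make_time_query_params(start_time_secs, end_time_secs, robot)

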
def make_time_query_params_from_group_name(group_name, data_store_client):
    """Create time-based query params for the download request using the group name.

    Args:
        group_name (string): The group name for the data to be downloaded.
        data_store_client (DataAcquisitionStoreClient): The data store client, used to get the
            action ids for the group name.

    Returns:
        The query params (data_acquisition_store_pb2.DataQueryParams) for the time-range
        download, or None if no usable timestamps were found.
    """
    action_id = data_acquisition_pb2.CaptureActionId(group_name=group_name)
    query_params = data_acquisition_store_pb2.DataQueryParams(
        action_ids=data_acquisition_store_pb2.ActionIdQuery(action_ids=[action_id]))
    saved_capture_actions = []
    try:
        saved_capture_actions = data_store_client.list_capture_actions(query_params)
    except Exception as err:
        _LOGGER.error("Failed to list the capture action ids for group_name %s: %s", group_name,
                      err)
        return None

    # Find the earliest and latest timestamps among the CaptureActionIds. These times are
    # already in the robot's clock and do not need to be converted using timesync.
    start_time = (None, None)
    end_time = (None, None)
    for action_id in saved_capture_actions:
        timestamp = action_id.timestamp
        time_secs = timestamp.seconds + timestamp.nanos / 1e9
        if time_secs == 0:
            # The plugin captures don't seem to set a timestamp, so ignore them when determining
            # the start/end times for what to download.
            continue
        if start_time[0] is None or time_secs < start_time[0]:
            start_time = (time_secs, timestamp)
        if end_time[0] is None or time_secs > end_time[0]:
            end_time = (time_secs, timestamp)

    if start_time[0] is None or end_time[0] is None:
        _LOGGER.error("Could not find a start/end time from the list of capture action ids: %s",
                      saved_capture_actions)
        return None

    # Ensure the timestamps are ordered correctly.
    assert start_time[0] <= end_time[0]

    # Pad the start/end time by a few seconds each to give buffer room.
    start_time[1].seconds -= 3
    end_time[1].seconds += 3

    _LOGGER.info("Downloading data with a start time of %s seconds and end time of %s seconds.",
                 start_time[0], end_time[0])

    # Make the download data request with a time query parameter.
    query_params = data_acquisition_store_pb2.DataQueryParams(
        time_range=data_acquisition_store_pb2.TimeRangeQuery(from_timestamp=start_time[1],
                                                             to_timestamp=end_time[1]))
    return query_params


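# A usage sketch for make_time_query_params_from_group_name (a hypothetical example, not
# part of the original module): derive a time range from a previously used group name.
def _example_group_time_query(robot, group_name):
    """Sketch: build a time-range query spanning all captures stored under group_name."""
    from bosdyn.client.data_acquisition_store import DataAcquisitionStoreClient
    store_client = robot.ensure_client(DataAcquisitionStoreClient.default_service_name)
    return make_time_query_params_from_group_name(group_name, store_client)

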
def download_data_REST(query_params, hostname, token, destination_folder='.',
                       additional_params=None):
    """Retrieve all data for a query from the DataBuffer REST API and write it to files.

    Args:
        query_params (bosdyn.api.DataQueryParams): Query parameters used to retrieve metadata
            from the DataStore service. Must be time-based query parameters only.
        hostname (string): Hostname to specify in the URL where the DataBuffer service is running.
        token (string): User token to specify in the https GET request for authentication.
        destination_folder (string): Folder into which the data is downloaded.
        additional_params (dict): Additional GET parameters to append to the URL.

    Returns:
        Boolean indicating whether the data was downloaded successfully.
    """
    try:
        url = 'https://{}/v1/data-buffer/daq-data/'.format(hostname)
        absolute_path = Path(destination_folder).absolute()
        folder = Path(absolute_path.parent, clean_filename(absolute_path.name), 'REST')
        folder.mkdir(parents=True, exist_ok=True)

        headers = {"Authorization": "Bearer {}".format(token)}
        get_params = additional_params or {}
        if query_params.HasField('time_range'):
            get_params.update({
                'from_nsec': query_params.time_range.from_timestamp.ToNanoseconds(),
                'to_nsec': query_params.time_range.to_timestamp.ToNanoseconds()
            })
        chunk_size = 10 * (1024**2)  # This value is not guaranteed.

        url = url + '?{}'.format(urlencode(get_params))
        request = Request(url, headers=headers)
        # Skip server certificate verification; the robot serves a self-signed certificate.
        context = ssl._create_unverified_context()
        with urlopen(request, context=context) as resp:
            print("Download request HTTPS status code: %s" % resp.status)
            if resp.status == 204:
                print("No content available for the specified download time range (in seconds): "
                      "[%d, %d]" % (query_params.time_range.from_timestamp.ToNanoseconds() / 1.0e9,
                                    query_params.time_range.to_timestamp.ToNanoseconds() / 1.0e9))
                return False
            # Default file name for the downloaded data, updated below from the
            # Content-Disposition header of the response.
            download_file = Path(folder, "download.zip")
            content = resp.headers['Content-Disposition']
            if not content or len(content) < 2:
                print("ERROR: Content-Disposition is not set correctly")
                return False
            else:
                start_ind = content.find('"')
                if start_ind == -1:
                    print("ERROR: Content-Disposition does not have a \"")
                    return False
                else:
                    start_ind += 1
                    download_file = Path(folder, clean_filename(content[start_ind:-1]))
            with open(str(download_file), 'wb') as fid:
                while True:
                    chunk = resp.read(chunk_size)
                    if len(chunk) == 0:
                        break
                    print('.', end='', flush=True)
                    fid.write(chunk)
    except URLError as rest_error:
        print("REST Exception:\n")
        print(rest_error)
        return False
    except IOError as io_error:
        print("IO Exception:\n")
        print(io_error)
        return False

    # Data downloaded and saved to local disk successfully.
    return True


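# An end-to-end usage sketch for download_data_REST (a hypothetical example, not part of
# the original module). 'ROBOT_IP' is a placeholder; the user token and address come off
# an authenticated Robot object.
def _example_download(group_name, destination_folder='.'):
    """Sketch: authenticate, build a time query from a group name, and download the data."""
    from bosdyn.client.data_acquisition_store import DataAcquisitionStoreClient
    sdk = bosdyn.client.create_standard_sdk('DataAcquisitionDownloadExample')
    robot = sdk.create_robot('ROBOT_IP')
    bosdyn.client.util.authenticate(robot)
    robot.time_sync.wait_for_sync()
    store_client = robot.ensure_client(DataAcquisitionStoreClient.default_service_name)
    query_params = make_time_query_params_from_group_name(group_name, store_client)
    if query_params is None:
        return False
    return download_data_REST(query_params, robot.address, robot.user_token,
                              destination_folder)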