# Copyright (c) 2023 Boston Dynamics, Inc. All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).
"""Utility functions for bosdyn-orbit"""
import datetime
import hashlib
import hmac
import json
import os
import secrets
import shutil
import sys
import time
from typing import Dict, List, Optional

from bosdyn.orbit.exceptions import WebhookSignatureVerificationError

# Name of the environment variable that get_api_token() reads the API token from.
API_TOKEN_ENV_VAR = "BOSDYN_ORBIT_CLIENT_API_TOKEN"
# Default maximum webhook message age in milliseconds (5 minutes); used as the
# default for validate_webhook_payload()'s max_age_ms parameter.
DEFAULT_MAX_MESSAGE_AGE_MS = 5 * 60 * 1000
def get_api_token() -> str:
    """ Obtains an API token from either an environment variable or terminal input

        The environment variable named by API_TOKEN_ENV_VAR is consulted first. When it is
        unset or empty, the token is read from standard input instead.

        Returns:
            The API token as a string
    """
    token_from_env = os.environ.get(API_TOKEN_ENV_VAR)
    if token_from_env:
        return token_from_env

    # Only show a prompt when a person is at the terminal; the prompt goes to stderr so
    # that it does not end up in redirected stdout.
    if sys.stdin.isatty():
        print('API Token: ', end='', file=sys.stderr)
    return input()
def get_latest_created_at_for_run_events(client: 'bosdyn.orbit.client.Client',
                                         params: Optional[Dict] = None) -> datetime.datetime:
    """ Given a dictionary of query params, returns the max created at time for run events

        Args:
            client: the client for the web API
            params: the query params associated with the get request. They are merged over
                the defaults {'limit': 1, 'orderBy': '-created_at'}, so they may override them.
        Raises:
            RequestExceptions: exceptions thrown by the Requests library
            UnauthenticatedClientError: indicates that the client is not authenticated properly
        Returns:
            The createdAt time of the first run event in the response as a naive datetime.
            When the response has no run events, the system time reported by the client
            (msSinceEpoch) is returned instead, as a naive UTC datetime.
    """
    # Caller-supplied params are applied last so they win over the defaults.
    query_params = {'limit': 1, 'orderBy': '-created_at'}
    query_params.update(params or {})

    run_events = client.get_run_events(params=query_params).json()
    if run_events["resources"]:
        return datetime_from_isostring(run_events["resources"][0]["createdAt"])

    # No run events matched the query: fall back to the time reported by get_system_time().
    ms_since_epoch = int(client.get_system_time().json()["msSinceEpoch"])
    # datetime.utcfromtimestamp() is deprecated since Python 3.12. Build an aware UTC
    # datetime and drop tzinfo so the return value stays naive, as before.
    return datetime.datetime.fromtimestamp(ms_since_epoch / 1000,
                                           tz=datetime.timezone.utc).replace(tzinfo=None)
def get_latest_run_capture_resources(client: 'bosdyn.orbit.client.Client',
                                     params: Optional[Dict] = None) -> List:
    """ Given a dictionary of query params, returns the latest run capture resources in json format

        Args:
            client: the client for Orbit web API
            params: the query params associated with the get request. They are merged over
                the default {'orderBy': '-created_at'}, so they may override it.
        Raises:
            RequestExceptions: exceptions thrown by the Requests library
            UnauthenticatedClientError: indicates that the client is not authenticated properly
        Returns:
            The "resources" list from the run captures response
    """
    # Caller-supplied params are applied last so they win over the default ordering.
    query_params = {'orderBy': '-created_at'}
    query_params.update(params or {})
    return client.get_run_captures(params=query_params).json()["resources"]
def get_latest_created_at_for_run_captures(client: 'bosdyn.orbit.client.Client',
                                           params: Optional[Dict] = None) -> datetime.datetime:
    """ Given a dictionary of query params, returns the max created at time for run captures

        Args:
            client: the client for Orbit web API
            params: the query params associated with the get request. They are merged over
                the defaults {'limit': 1, 'orderBy': '-created_at'}, so they may override them.
        Raises:
            RequestExceptions: exceptions thrown by the Requests library
            UnauthenticatedClientError: indicates that the client is not authenticated properly
        Returns:
            The createdAt time of the first run capture in the response as a naive datetime.
            When the response has no run captures, the system time reported by the client
            (msSinceEpoch) is returned instead, as a naive UTC datetime.
    """
    # Caller-supplied params are applied last so they win over the defaults.
    query_params = {'limit': 1, 'orderBy': '-created_at'}
    query_params.update(params or {})

    run_captures = client.get_run_captures(params=query_params).json()
    if run_captures["resources"]:
        return datetime_from_isostring(run_captures["resources"][0]["createdAt"])

    # No run captures matched the query: fall back to the time reported by get_system_time().
    ms_since_epoch = int(client.get_system_time().json()["msSinceEpoch"])
    # datetime.utcfromtimestamp() is deprecated since Python 3.12. Build an aware UTC
    # datetime and drop tzinfo so the return value stays naive, as before.
    return datetime.datetime.fromtimestamp(ms_since_epoch / 1000,
                                           tz=datetime.timezone.utc).replace(tzinfo=None)
def get_latest_run_resource(client: 'bosdyn.orbit.client.Client',
                            params: Optional[Dict] = None) -> Optional[Dict]:
    """ Given a dictionary of query params, returns the latest run resource in json format

        Args:
            client: the client for Orbit web API
            params: the query params associated with the get request. They are merged over
                the defaults {'limit': 1, 'orderBy': 'newest'}, so they may override them.
        Raises:
            RequestExceptions: exceptions thrown by the Requests library
            UnauthenticatedClientError: indicates that the client is not authenticated properly
        Returns:
            The first entry of the "resources" list in the runs response, or None when that
            list is empty
    """
    # Caller-supplied params are applied last so they win over the defaults.
    query_params = {'limit': 1, 'orderBy': 'newest'}
    query_params.update(params or {})

    runs = client.get_runs(params=query_params).json()['resources']
    return runs[0] if runs else None
def get_latest_run_in_progress(client: 'bosdyn.orbit.client.Client',
                               params: Optional[Dict] = None) -> Optional[Dict]:
    """ Given a dictionary of query params, returns the latest running resource in json format

        A run is treated as in progress when its "missionStatus" is not one of SUCCESS,
        FAILURE, ERROR, STOPPED, NONE or UNKNOWN.

        Args:
            client: the client for Orbit web API
            params: the query params associated with the get request. They are merged over
                the default {'orderBy': 'newest'}, so they may override it.
        Raises:
            RequestExceptions: exceptions thrown by the Requests library
            UnauthenticatedClientError: indicates that the client is not authenticated properly
        Returns:
            The first run in the response that is still in progress, or None when there is none
    """
    finished_statuses = ("SUCCESS", "FAILURE", "ERROR", "STOPPED", "NONE", "UNKNOWN")

    # Caller-supplied params are applied last so they win over the default ordering.
    query_params = {'orderBy': 'newest'}
    query_params.update(params or {})

    runs = client.get_runs(params=query_params).json()["resources"]
    # Runs are scanned in response order; the first non-finished one wins.
    return next((run for run in runs if run["missionStatus"] not in finished_statuses), None)
def get_latest_end_time_for_runs(client: 'bosdyn.orbit.client.Client',
                                 params: Optional[Dict] = None) -> datetime.datetime:
    """ Given a dictionary of query params, returns the max end time for runs

        Args:
            client: the client for Orbit web API
            params: the query params associated with the get request. They are merged over
                the defaults {'limit': 1, 'orderBy': 'newest'}, so they may override them.
        Raises:
            RequestExceptions: exceptions thrown by the Requests library
            UnauthenticatedClientError: indicates that the client is not authenticated properly
        Returns:
            The endTime of the first run in the response as a naive datetime. When the
            response has no runs, or that run has no endTime yet, the system time reported
            by the client (msSinceEpoch) is returned instead, as a naive UTC datetime.
    """
    # Caller-supplied params are applied last so they win over the defaults.
    query_params = {'limit': 1, 'orderBy': 'newest'}
    query_params.update(params or {})

    runs = client.get_runs(params=query_params).json().get("resources")
    if runs:
        latest_end_time = runs[0]["endTime"]
        if latest_end_time:
            return datetime_from_isostring(latest_end_time)

    # No runs, or the first run has an empty endTime: fall back to get_system_time().
    ms_since_epoch = int(client.get_system_time().json()["msSinceEpoch"])
    # datetime.utcfromtimestamp() is deprecated since Python 3.12. Build an aware UTC
    # datetime and drop tzinfo so the return value stays naive, as before.
    return datetime.datetime.fromtimestamp(ms_since_epoch / 1000,
                                           tz=datetime.timezone.utc).replace(tzinfo=None)
def write_image(img_raw, image_fp: str) -> None:
    """ Given a raw image and a desired output filepath, writes the image to a file

        Missing parent directories of image_fp are created first.

        Args:
            img_raw(Raw image object): the input raw image; it is passed to
                shutil.copyfileobj as the source, so it must be a readable binary
                file-like object
            image_fp: the output filepath for the image
    """
    parent_dir = os.path.dirname(image_fp)
    # dirname is '' for a bare filename, and os.makedirs('') raises FileNotFoundError,
    # so only create directories when the path actually names one.
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    with open(image_fp, 'wb') as out_file:
        shutil.copyfileobj(img_raw, out_file)
def data_capture_urls_from_run_events(client: 'bosdyn.orbit.client.Client', run_events: Dict,
                                      list_of_channel_names: Optional[List] = None) -> List:
    """ Given run events and list of desired channel names, returns the list of data capture urls

        Args:
            client: the client for Orbit web API; its hostname is used to build absolute urls
            run_events: a json representation of run events obtained from a RESTful endpoint.
                Each entry of its "resources" list is read for its "dataCaptures" list.
            list_of_channel_names: a list of channel names associated with the desired data captures.
                                  Defaults to None which returns all the available channels.
        Returns:
            data_urls: a list of unique urls, in the order they were first encountered
    """
    url_prefix = f'https://{client._hostname}'
    data_urls = []
    # The previous duplicate check compared the channel-name filter against the url list,
    # which never matched, so duplicates were kept. Track the urls themselves instead.
    seen_urls = set()
    for resource in run_events["resources"]:
        for data_capture in resource["dataCaptures"]:
            if (list_of_channel_names is not None and
                    data_capture["channelName"] not in list_of_channel_names):
                continue
            data_url = url_prefix + data_capture["dataUrl"]
            if data_url not in seen_urls:
                seen_urls.add(data_url)
                data_urls.append(data_url)
    return data_urls
def data_capture_url_from_run_capture_resources(client: 'bosdyn.orbit.client.Client',
                                                run_capture_resources: List,
                                                list_of_channel_names: Optional[List] = None
                                               ) -> List:
    """ Given run capture resources and list of desired channel names, returns the list of data capture urls

        Args:
            client: the client for Orbit web API; its hostname is used to build absolute urls
            run_capture_resources: a list of resources obtained from a RESTful endpoint.
                Each entry is read for its "dataUrl" and, when filtering, its "channelName".
            list_of_channel_names: a list of channel names associated with the desired data captures.
                                  Defaults to None which returns all the available channels.
        Returns:
            data_urls: a list of unique urls, in the order they were first encountered
    """
    url_prefix = f'https://{client._hostname}'
    data_urls = []
    # The previous duplicate check compared the channel-name filter against the url list,
    # which never matched, so duplicates were kept. Track the urls themselves instead.
    seen_urls = set()
    for data_capture in run_capture_resources:
        if (list_of_channel_names is not None and
                data_capture["channelName"] not in list_of_channel_names):
            continue
        data_url = url_prefix + data_capture["dataUrl"]
        if data_url not in seen_urls:
            seen_urls.add(data_url)
            data_urls.append(data_url)
    return data_urls
def get_action_names_from_run_events(run_events: Dict) -> List:
    """ Given run events, returns a list of action names

        Args:
            run_events: a representation of run events obtained from a RESTful endpoint;
                each entry of its "resources" list must carry an "actionName"
        Returns:
            action_names: a list of action names, one per run event, in response order
    """
    return [resource["actionName"] for resource in run_events["resources"]]
def datetime_from_isostring(datetime_isostring: str) -> datetime.datetime:
    """ Returns the datetime representation of the iso string representation of time

        Accepts timestamps of the form YYYY-MM-DDTHH:MM:SS with optional fractional seconds,
        followed by "Z", a "+HH:MM" / "-HH:MM" offset, or no designator at all.

        NOTE(review): a numeric offset is discarded, not applied, which matches how "+"
        offsets were always handled here. Confirm that callers only pass UTC timestamps.

        Args:
            datetime_isostring: the iso string representation of time
        Raises:
            ValueError: the string does not match the supported timestamp layout
        Returns:
            The naive datetime representation of the iso string representation of time
    """
    timestamp = datetime_isostring
    if timestamp.endswith("Z"):
        timestamp = timestamp[:-1]
    else:
        # Only look for an offset sign after the "T": the date part contains "-" itself.
        date_part, separator, time_part = timestamp.partition("T")
        for sign in ("+", "-"):
            if sign in time_part:
                time_part = time_part[:time_part.index(sign)]
                break
        timestamp = date_part + separator + time_part

    # "%f" requires a fraction to be present, so pick the layout that matches the input.
    layout = "%Y-%m-%dT%H:%M:%S.%f" if "." in timestamp else "%Y-%m-%dT%H:%M:%S"
    return datetime.datetime.strptime(timestamp, layout)
def validate_webhook_payload(payload: Dict, signature_header: str, secret: str,
                             max_age_ms: int = DEFAULT_MAX_MESSAGE_AGE_MS) -> None:
    """ Verifies that the webhook payload was signed with the configured secret and is recent

        The signature header is a comma-separated list of key=value entries. Entry "t" holds
        the send time in milliseconds since epoch and entry "v1" holds the hex HMAC-SHA256 of
        "<t>.<compact JSON of payload>" keyed with the secret.

        Only messages that are too old are rejected; a send time ahead of the local clock
        is accepted.

        Args:
            payload: the JSON body of the webhooks req
            signature_header: the value of the signature header
            secret: the configured secret value for this webhook, as a hex string
            max_age_ms: the maximum age of the message before it's considered invalid (default is 5 minutes)
        Raises:
            bosdyn.orbit.exceptions.WebhookSignatureVerificationError: thrown if the webhook signature is invalid
            ValueError: the secret is not a valid hex string
    """
    if not signature_header:
        raise WebhookSignatureVerificationError("Signature header cannot be empty")

    # The header is attacker-controlled, so a malformed entry must surface as a
    # verification error rather than as a ValueError from dict().
    header_components = {}
    for entry in signature_header.split(','):
        key, separator, value = entry.partition('=')
        if not separator:
            raise WebhookSignatureVerificationError("Malformed entry in signature header")
        header_components[key] = value

    send_time = header_components.get('t')
    received_hmac = header_components.get('v1')
    try:
        send_time_ms = int(send_time) if send_time is not None and send_time.isdigit() else None
    except ValueError:
        # str.isdigit() accepts some Unicode digits (e.g. superscripts) that int() rejects.
        send_time_ms = None
    if not send_time_ms or not received_hmac:
        raise WebhookSignatureVerificationError(
            "Missing either send time or HMAC in signature header")

    # time.time() is in seconds; scale before rounding to keep millisecond precision.
    current_time_ms = round(time.time() * 1000)
    time_diff_ms = current_time_ms - send_time_ms
    if time_diff_ms > max_age_ms:
        raise WebhookSignatureVerificationError(
            f"The payload is {time_diff_ms}ms old, which is greater than the maximum age {max_age_ms}ms"
        )

    # The signed string uses the send time exactly as received plus the compact JSON body.
    full_payload_string = f'{send_time}.{json.dumps(payload, separators=(",",":"))}'
    calculated_hmac = hmac.new(bytes.fromhex(secret), full_payload_string.encode('utf-8'),
                               hashlib.sha256).hexdigest()
    # Compare as bytes: compare_digest() raises TypeError for str arguments that contain
    # non-ASCII characters, which a crafted header could otherwise trigger.
    time_safe_equal = secrets.compare_digest(received_hmac.encode('utf-8'),
                                             calculated_hmac.encode('utf-8'))
    if not time_safe_equal:
        raise WebhookSignatureVerificationError(
            "The received HMAC did not match the expected value")
def print_json_response(response: 'requests.Response') -> bool:
    """ A helper function to print the json response.

        Args:
            response: the response to print; its ok flag, json() method and text are used
        Returns:
            True when the response is ok and its body was decoded and printed as JSON.
            False when the response is not ok, or is ok but its body is not JSON.
    """
    if not response.ok:
        print(f"Request failed with status : {response.text}")
        return False

    # Keep the try body to the one call that signals a non-JSON body via ValueError.
    try:
        json_data = response.json()
    except ValueError:
        print("Response is ok but not in JSON format.")
        return False

    print(f"JSON Response: {json_data}")
    return True