Source code for bosdyn.client.access_controlled_door_util

# Copyright (c) 2023 Boston Dynamics, Inc.  All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).

import json
from pathlib import Path
from string import Template

import requests

from bosdyn.client.url_validation_util import safe_api_call

# Default `timeout` value handed to safe_api_call by make_access_control_system_api_call.
# NOTE(review): presumably seconds, as with `requests` timeouts -- confirm against safe_api_call.
API_TIMEOUT_DEFAULT = 30


def file_to_json(file_path):
    """Read a JSON file and return its parsed content.

    Args:
        file_path (Path | str): Location of the JSON file. Plain string paths are accepted
            in addition to Path objects.

    Returns:
        The parsed JSON data (a dict for a JSON object at the top level).

    Raises:
        OSError: If the file cannot be read.
        json.JSONDecodeError: If the file content is not valid JSON.
    """
    # JSON interchange text is UTF-8; decode explicitly rather than relying on the
    # platform's locale encoding, which differs between hosts.
    return json.loads(Path(file_path).read_text(encoding="utf-8"))
# Functions to handle API calls to access control systems
def _substitute_call_values(url, request_data, substitutions):
    """Return (url, request_data) with $name / ${name} placeholders filled from substitutions."""

    def fill(value):
        # Only strings can carry placeholders; every other value is passed through untouched.
        if isinstance(value, str):
            return Template(value).safe_substitute(substitutions)
        return value

    templated = {}
    for key, value in request_data.items():
        if isinstance(value, dict):
            # One level of nesting is handled (e.g. headers); deeper levels are left as-is.
            templated[key] = {inner_key: fill(inner) for inner_key, inner in value.items()}
        else:
            templated[key] = fill(value)
    return Template(url).safe_substitute(substitutions), templated


def door_action(api_calls, door_id, action, path_to_cert=None, is_robot=True):
    """Execute the configured API calls needed to perform an action on a door.

    Calls are executed in order. Values extracted from one call's response are made
    available as substitution variables for the calls that follow, next to the
    always-present "door_id" variable. Processing stops at the first failure.

    Args:
        api_calls (list of dict): API call specifications. Each dict may contain 'method',
            'url', 'action', 'sni_hostname', 'route', 'request_data' and 'responses'.
        door_id (str): Identifier of the door to perform the action on.
        action (list of str | str): The action(s) to perform (e.g., "open", "close"). Only
            calls whose 'action' equals one of these are executed. A single string is
            treated as a one-element list.
        path_to_cert (str, optional): Path to a certificate file. When given, it is passed
            as the 'verify' entry of every call's request data.
        is_robot (bool, optional): Indicates if the API calls are being made on behalf of a
            robot; passed through to make_access_control_system_api_call. Defaults to True.

    Returns:
        dict: Empty if every matching call succeeded. Otherwise it describes the first
        failure: either {"extra_message": str} for a malformed call specification or
        unreadable response data, or {"action": str, "api_error": ...} when the API call
        itself reported an error.
    """
    # A bare string would be substring-matched by `in` (so "op" or "" would match "open")
    # and would raise TypeError for calls without an "action"; compare whole names instead.
    wanted_actions = (action,) if isinstance(action, str) else action

    error_msg = {}
    cross_call_substitutions = {"door_id": door_id}

    for call_data in api_calls:
        if call_data.get("action") not in wanted_actions:
            continue

        method = call_data.get("method")
        url = call_data.get("url")
        if not method or not url:
            error_msg["extra_message"] = "API call error: missing method or url"
            break

        # Format URL and data. Only values written as $variable or ${variable} are
        # replaced; names are looked up in the substitutions as-is (case-sensitive) and
        # unknown names are left in place.
        try:
            url, templated_request_data = _substitute_call_values(
                url, call_data.get("request_data") or {}, cross_call_substitutions)
        except (AttributeError, TypeError, ValueError) as e:
            # Never continue with missing or stale request data from a previous call.
            print(f"API call error: couldn't make substitution, got exception {e}")
            error_msg["extra_message"] = "API call error: couldn't make substitution"
            break

        # Check if cert file was included at the configuration, if yes use it
        if path_to_cert:
            templated_request_data["verify"] = path_to_cert

        result_of_call, api_error_msg = make_access_control_system_api_call(
            method, url, templated_request_data, call_data.get("responses") or {},
            sni_hostname=call_data.get("sni_hostname"), is_robot=is_robot,
            route=call_data.get("route"))

        if api_error_msg:
            error_msg["action"] = call_data.get("action", "")
            error_msg["api_error"] = api_error_msg
            break

        # If the call returned data, store it for subsequent calls
        if result_of_call:
            try:
                for result in result_of_call:
                    cross_call_substitutions[result[0]] = result[1]
            except (TypeError, IndexError, KeyError) as e:
                print(f"API call error: couldn't read data, got exception {e}")
                error_msg["extra_message"] = "API call error: couldn't read data, got exception"
                break

    return error_msg
def make_access_control_system_api_call(method, url, request_data, store_responses=None,
                                        sni_hostname=None, is_robot=True, route=None):
    """Makes an HTTP request and optionally extracts specific fields from the JSON response.

    Only HTTP status 200 is treated as success.

    Args:
        method (str): HTTP method to use (e.g., 'GET', 'POST')
        url (str): The endpoint URL for the API call
        request_data (dict): Request configuration including headers, body, etc. It is
            passed to safe_api_call as keyword arguments. A "timeout" entry, if present,
            replaces API_TIMEOUT_DEFAULT. None is treated as an empty dict.
        store_responses (Optional[Dict[str, str]]): Dictionary mapping response field names
            to JSON paths. For example:
                {
                    "token": "auth.token",        # Store response's auth.token as "token"
                    "session_id": "data.session"  # Store response's data.session as "session_id"
                }
            If None, no data will be extracted from the response.
        sni_hostname (str|None): If specified, this parameter provides the hostname declared
            by and expected by the access control server during TLS negotiation. This should
            only be required if the server's hostname is not resolvable via DNS.
        is_robot (bool): Passed through to safe_api_call.
        route (str|None): Route type to use ("WIFI", "LTE"). If None, default interface
            (WIFI) will be used.

    Returns:
        Tuple[Optional[List[Tuple[str, Any]]], Optional[Dict]]:
            - First element: If store_responses was provided and the call succeeded, a list
              of tuples [(field_name, value), ...] holding only the fields that were found
              (possibly empty). None otherwise.
            - Second element: If an error occurred, the status message from safe_api_call or
              a dict with error details:
                {
                    'status_code': int or None,
                    'reason': str,
                    'elapsed': float seconds, or None when unknown
                }
              None if successful.

    Example:
        >>> store_responses = {"auth_token": "data.token"}
        >>> data, error = make_access_control_system_api_call("POST", "https://api.door/auth",
        ...                                                   {"json": {"key": "value"}},
        ...                                                   store_responses)
        >>> if data:
        ...     # data might be [("auth_token", "abc123")]
        ...     token = dict(data)["auth_token"]
    """
    # Merging (instead of passing timeout= next to **request_data) avoids a TypeError when
    # the call configuration carries its own "timeout" and lets that value take precedence.
    request_kwargs = {"timeout": API_TIMEOUT_DEFAULT, **(request_data or {})}

    try:
        response, status_message = safe_api_call(method, url, sni_hostname, is_robot=is_robot,
                                                 interface=route, **request_kwargs)
        if response is None:
            # There was an error during the API call
            return None, status_message

        if response.status_code != 200:
            # If the status code isn't 200, something went wrong
            return None, {
                'status_code': response.status_code,
                'reason': response.reason,
                'elapsed': response.elapsed.total_seconds(),
            }

        if not store_responses:
            # If we don't need to store responses, just return success
            return None, None

        # Only try to parse JSON and extract data if we need to store responses
        try:
            data_json = response.json()
        except ValueError as e:
            # The caller asked for response fields but the body is not JSON.
            print(f"API call failed: {str(e)}")
            return None, {
                'status_code': response.status_code,
                'reason': f"Response body is not valid JSON: {e}",
                'elapsed': response.elapsed.total_seconds(),
            }

        stored = []
        for tag, path in store_responses.items():
            try:
                value = get_value_by_path(data_json, path)
            except (AttributeError, TypeError):
                # The path runs through something that is not a JSON object.
                value = None
            # get_value_by_path reports missing data as False. Storing that would later be
            # substituted into follow-up requests as the literal text "False".
            if value is None or value is False:
                continue
            stored.append((tag, value))
        return stored, None

    except requests.exceptions.RequestException as e:
        print(f"API call failed: {str(e)}")
        elapsed = getattr(e.response, 'elapsed', None)
        return None, {
            'status_code': getattr(e.response, 'status_code', None),
            'reason': getattr(e.response, 'reason', str(e)),
            # Report seconds as a float, matching the non-200 error path above.
            'elapsed': elapsed.total_seconds() if elapsed is not None else None,
        }
def get_value_by_path(data_json, path_to_info):
    """Look up a value in parsed JSON data by a dot-separated path.

    Args:
        data_json (dict): Parsed JSON data of a call response.
        path_to_info (str): Keys leading to the desired value, joined by '.', e.g.
            "auth.token" for data_json["auth"]["token"].

    Returns:
        The value found at the path. False if any key along the path is missing, if an
        intermediate level is not a JSON object, or if the value itself is null.
    """
    current = data_json
    for key in path_to_info.split('.'):
        try:
            current = current.get(key)
        except AttributeError:
            # Reached a list, scalar or null before the path was exhausted.
            return False
        if current is None:
            return False
    return current