Source code for bosdyn.client.sdk

# Copyright (c) 2023 Boston Dynamics, Inc.  All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).

"""Sdk is  a repository for settings typically common to a single developer and/or robot fleet."""

import datetime
import glob
import logging
import os
import platform
from enum import Enum

import jwt
import pkg_resources
from deprecated.sphinx import deprecated

from .arm_surface_contact import ArmSurfaceContactClient
from .auth import AuthClient
from .auto_return import AutoReturnClient
from .autowalk import AutowalkClient
from .channel import DEFAULT_MAX_MESSAGE_LENGTH
from .data_acquisition import DataAcquisitionClient
from .data_acquisition_store import DataAcquisitionStoreClient
from .data_buffer import DataBufferClient
from .data_service import DataServiceClient
from .directory import DirectoryClient
from .directory_registration import DirectoryRegistrationClient
from .docking import DockingClient
from .door import DoorClient
from .estop import EstopClient
from .exceptions import Error
from .fault import FaultClient
from .gps.aggregator_client import AggregatorClient
from .gps.registration_client import RegistrationClient
from .graph_nav import GraphNavClient
from .gripper_camera_param import GripperCameraParamClient
from .image import ImageClient
from .inverse_kinematics import InverseKinematicsClient
from .ir_enable_disable import IREnableDisableServiceClient
from .keepalive import KeepaliveClient
from .lease import LeaseClient
from .license import LicenseClient
from .local_grid import LocalGridClient
from .log_status import LogStatusClient
from .manipulation_api_client import ManipulationApiClient
from .map_processing import MapProcessingServiceClient
from .metrics_logging import MetricsLoggingClient
from .network_compute_bridge_client import NetworkComputeBridgeClient
from .payload import PayloadClient
from .payload_registration import PayloadRegistrationClient
from .point_cloud import PointCloudClient
from .power import PowerClient
from .processors import AddRequestHeader
from .ray_cast import RayCastClient
from .recording import GraphNavRecordingServiceClient
from .robot import Robot
from .robot_command import RobotCommandClient
from .robot_id import RobotIdClient
from .robot_state import RobotStateClient
from .spot_check import SpotCheckClient
from .time_sync import TimeSyncClient
from .world_object import WorldObjectClient



[docs]class SdkError(Error): """General class of errors to handle non-response non-rpc errors."""
[docs]class UnsetAppTokenError(SdkError): """Path to app token not set."""
[docs]class UnableToLoadAppTokenError(SdkError): """Cannot load the provided app token path."""
_LOGGER = logging.getLogger(__name__) BOSDYN_RESOURCE_ROOT = os.environ.get('BOSDYN_RESOURCE_ROOT', os.path.join(os.path.expanduser('~'), '.bosdyn'))
[docs]def generate_client_name(prefix=''): """Returns a descriptive client name for API clients with an optional prefix.""" import bosdyn.client.__main__ try: process_info = '{}-{}'.format(os.path.basename(bosdyn.client.__main__.__file__), os.getpid()) except AttributeError: process_info = '{}'.format(os.getpid()) machine_name = platform.node() if not machine_name: import getpass try: user_name = getpass.getuser() # pylint: disable=broad-except except Exception: _LOGGER.warning('Could not get username') user_name = '<unknown host>' # Use the name of the host if available, username otherwise. return '{}{}:{}'.format(prefix, machine_name or user_name, process_info)
_DEFAULT_SERVICE_CLIENTS = [ AggregatorClient, ArmSurfaceContactClient, AuthClient, AutoReturnClient, AutowalkClient, DataAcquisitionClient, DataAcquisitionStoreClient, DataBufferClient, DataServiceClient, DirectoryClient, DirectoryRegistrationClient, DockingClient, DoorClient, EstopClient, FaultClient, GraphNavClient, GraphNavRecordingServiceClient, GripperCameraParamClient, ImageClient, IREnableDisableServiceClient, LeaseClient, KeepaliveClient, LicenseClient, LogStatusClient, LocalGridClient, ManipulationApiClient, MapProcessingServiceClient, NetworkComputeBridgeClient, PayloadClient, PayloadRegistrationClient, PointCloudClient, PowerClient, RayCastClient, RegistrationClient, RobotCommandClient, RobotIdClient, RobotStateClient, SpotCheckClient, InverseKinematicsClient, TimeSyncClient, WorldObjectClient, ]
[docs]def create_standard_sdk(client_name_prefix, service_clients=None, cert_resource_glob=None): """Return an Sdk with the most common configuration. Args: client_name_prefix: prefix to pass to generate_client_name() service_clients: List of service client classes to register in addition to the defaults. cert_resource_glob: Glob expression matching robot certificate(s). Default None to use distributed certificate. Raises: IOError: Robot cert could not be loaded. """ _LOGGER.debug('Creating standard Sdk, cert glob: "%s"', cert_resource_glob) sdk = Sdk(name=client_name_prefix) client_name = generate_client_name(client_name_prefix) sdk.load_robot_cert(cert_resource_glob) sdk.request_processors.append(AddRequestHeader(lambda: client_name)) all_service_clients = _DEFAULT_SERVICE_CLIENTS if service_clients: all_service_clients += service_clients for client in all_service_clients: sdk.register_service_client(client) return sdk
[docs]class Sdk(object): """Repository for settings typically common to a single developer and/or robot fleet. See also Robot for robot-specific settings. Args: name: Name to identify the client when communicating with the robot. """ def __init__(self, name=None): self.cert = None self.client_name = name self.logger = logging.getLogger(name or 'bosdyn.Sdk') self.request_processors = [] self.response_processors = [] self.service_client_factories_by_type = {} self.service_type_by_name = {} # Robots created by this Sdk, keyed by address. self.robots = {} # Set default max message length for sending and receiving. These values are used when # creating channels in the bosdyn.client.Robot class. self.max_send_message_length = DEFAULT_MAX_MESSAGE_LENGTH self.max_receive_message_length = DEFAULT_MAX_MESSAGE_LENGTH # ThreadPoolExecutor instance for asynchronous streaming calls. self.executor = None
[docs] def create_robot( self, address, name=None ): """Get a Robot initialized with this Sdk, creating it if it does not yet exist. Args: address: Network-resolvable address of the robot, e.g. '192.168.80.3' name: A unique identifier for the robot, e.g. 'My First Robot'. Default None to use the address as the name. Returns: A Robot initialized with the current Sdk settings. """ if address in self.robots: return self.robots[address] robot = Robot( name=name or address ) robot.address = address robot.update_from(self) self.robots[address] = robot return robot
[docs] def set_max_message_length(self, max_message_length): """Updates the send and receive max message length values in all the clients/channels created from this point on. Args: max_message_length(int) : Max message length value to use for sending and receiving messages. Returns: None. """ self.max_send_message_length = max_message_length self.max_receive_message_length = max_message_length
[docs] def register_service_client(self, creation_func, service_type=None, service_name=None): """Tell the Sdk how to create a specific type of service client. Args: creation_func: Callable that returns a client. Typically just the class. service_type: Type of the service. If None (default), will try to get the name from creation_func. service_name: Name of the service. If None (default), will try to get the name from creation_func. """ service_name = service_name or creation_func.default_service_name service_type = service_type or creation_func.service_type # Some services won't have a default service name at all. # They will have to get information from the directory. if service_name is not None: self.service_type_by_name[service_name] = service_type self.service_client_factories_by_type[service_type] = creation_func
[docs] def load_robot_cert(self, resource_path_glob=None): """Load the SSL certificate for the robot. Args: resource_path_glob: Optional path to certificate resource(s). If None, will load the certificate in the 'resources' package. Otherwise, should be a glob expression to match certificates. Defaults to None. Raises: IOError: Robot cert could not be loaded. """ self.cert = None if resource_path_glob is None: self.cert = pkg_resources.resource_stream('bosdyn.client.resources', 'robot.pem').read() else: cert_paths = [c for c in glob.glob(resource_path_glob) if os.path.isfile(c)] if not cert_paths: raise IOError('No files matched "{}"'.format(resource_path_glob)) self.cert = bytes() for cert_path in cert_paths: with open(cert_path, 'rb') as cert_file: self.cert += cert_file.read()
[docs] def clear_robots(self): """Remove all cached Robot instances. Subsequent calls to create_robot() will return newly created Robots. Existing robot instances will continue to work, but their time sync and token refresh threads will be stopped. """ for robot in self.robots.values(): robot._shutdown() self.robots = {}
[docs]@deprecated( reason='Decoding tokens is no longer supported in the sdk. Use pyjwt directly instead.', version='3.0.1', action="always") def decode_token(token): """Decodes a JWT token without verification. Args: token: A string representing a token. Returns: Dictionary containing information about the token. Empty dictionary if failed to load token. Raises: UnableToLoadAppTokenError: If the token cannot be read. """ try: values = jwt.decode(token, options={"verify_signature": False}) return values except Exception as exc: raise UnableToLoadAppTokenError('Incorrectly formatted token {} --- {}'.format(token, exc))
[docs]@deprecated( reason='Decoding tokens is no longer supported in the sdk. Use pyjwt directly instead.', version='3.0.1', action="always") def log_token_time_remaining(token): """Log the time remaining until app token expires. Arguments: token: A jwt token Raises: UnableToLoadAppTokenError: If the token expiration information cannot be retrieved. """ token_values = decode_token(token) if 'exp' not in token_values: raise UnableToLoadAppTokenError("Unknown token expiration") # Log time to expiration, with varying levels based on nearness. expire_time = datetime.datetime.fromtimestamp(token_values['exp']) time_to_expiration = expire_time - datetime.datetime.utcnow() if time_to_expiration < datetime.timedelta(seconds=0): _LOGGER.error('Your application token has expired. Please contact ' 'support@bostondynamics.com to request a robot license as application ' 'tokens have been deprecated.') elif time_to_expiration <= datetime.timedelta(days=30): _LOGGER.warning( 'Application token expires in %s days on %s. Please contact ' 'support@bostondynamics.com to request a lease as application tokens ' 'have been deprecated.', time_to_expiration.days, datetime.datetime.strftime(expire_time, '%Y/%m/%d')) else: _LOGGER.debug('Application token expires on %s', datetime.datetime.strftime(expire_time, '%Y/%m/%d'))