Source code for bosdyn.client.channel

# Copyright (c) 2020 Boston Dynamics, Inc.  All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).

import logging

import grpc

from .exceptions import (RpcError, ClientCancelledOperationError, InvalidAppTokenError,
                         InvalidClientCertificateError, NonexistentAuthorityError, NotFoundError,
                         PermissionDeniedError, ProxyConnectionError, ResponseTooLargeError,
                         ServiceFailedDuringExecutionError, ServiceUnavailableError, TimedOutError,
                         UnableToConnectToRobotError, UnauthenticatedError, UnknownDnsNameError,
                         UnimplementedError, TransientFailureError)

TransportError = grpc.RpcError

_LOGGER = logging.getLogger(__name__)


[docs]class RefreshingAccessTokenAuthMetadataPlugin(grpc.AuthMetadataPlugin): """Plugin to refresh access token. Args: token_cb: Callable that returns a tuple of (app_token, user_token) add_app_token (bool): Whether to include an app token in the metadata. This is necessary for compatibility with old robot software. """ def __init__(self, token_cb, add_app_token): self._token_cb = token_cb self._add_app_token = add_app_token def __call__(self, context, callback): app_token, user_token = self._token_cb() user_token_metadata = ('authorization', 'Bearer {}'.format(user_token)) app_token_metadata = ('x-bosdyn-apptoken', app_token) if self._add_app_token: metadata = (user_token_metadata, app_token_metadata) else: metadata = (user_token_metadata,) error = None callback(metadata, error)
[docs]def create_secure_channel_creds(cert, token_cb, add_app_token): """Returns credentials for establishing a secure channel. Uses previously set values on the linked Sdk and self. """ transport_creds = grpc.ssl_channel_credentials(root_certificates=cert) plugin = RefreshingAccessTokenAuthMetadataPlugin(token_cb, add_app_token=add_app_token) # Encrypted connections carry either just transport credentials sufficient to # establish TLS or both transport and authorization credentials. The auth # credentials provide the token with every GRPC call on this channel. auth_creds = grpc.metadata_call_credentials(plugin) return grpc.composite_channel_credentials(transport_creds, auth_creds)
[docs]def create_secure_channel(address, port, creds, authority): """Create a secure channel to given host:port. Args: address: Connection host address. port: Connection port. creds: A ChannelCredentials instance. authority: Authority option for the channel. Returns: A secure channel. """ socket = '{}:{}'.format(address, port) options = (('grpc.ssl_target_name_override', authority),) return grpc.secure_channel(socket, creds, options)
[docs]def create_insecure_channel(address, port, authority=None): """Create an insecure channel to given host and port. This method is only used for testing purposes. Applications must use secure channels to communicate with services running on Spot. Args: address: Connection host address. port: Connection port. authority: Authority option for the channel. Returns: An insecure channel. """ socket = '{}:{}'.format(address, port) options = None if authority: options = (('grpc.ssl_target_name_override', authority),) return grpc.insecure_channel(socket, options=options)
[docs]def translate_exception(rpc_error): """Translated a GRPC error into an SDK RpcError. Args: rpc_error: RPC error to translate. Returns: Specific sub-type of RpcError. """ code = rpc_error.code() details = rpc_error.details() if code is grpc.StatusCode.CANCELLED: if str(401) in details: return UnauthenticatedError(rpc_error, UnauthenticatedError.__doc__) elif str(403) in details: return InvalidAppTokenError(rpc_error, InvalidAppTokenError.__doc__) elif str(404) in details: return NotFoundError(rpc_error, NotFoundError.__doc__) elif str(502) in details: return ServiceUnavailableError(rpc_error, ServiceUnavailableError.__doc__) elif str(504) in details: return TimedOutError(rpc_error, TimedOutError.__doc__) return ClientCancelledOperationError(rpc_error, ClientCancelledOperationError.__doc__) elif code is grpc.StatusCode.DEADLINE_EXCEEDED: return TimedOutError(rpc_error, TimedOutError.__doc__) elif code is grpc.StatusCode.UNIMPLEMENTED: return UnimplementedError(rpc_error, UnimplementedError.__doc__) elif code is grpc.StatusCode.PERMISSION_DENIED: return PermissionDeniedError(rpc_error, PermissionDeniedError.__doc__) elif code is grpc.StatusCode.RESOURCE_EXHAUSTED: if "Received message larger than max" in details: return ResponseTooLargeError(rpc_error, ResponseTooLargeError.__doc__) debug = rpc_error.debug_error_string() if 'is not in peer certificate' in debug: return NonexistentAuthorityError(rpc_error, NonexistentAuthorityError.__doc__) elif 'Failed to connect to remote host' in debug or 'Failed to create subchannel' in debug: return ProxyConnectionError(rpc_error, ProxyConnectionError.__doc__) elif 'Exception calling application' in debug: return ServiceFailedDuringExecutionError(rpc_error, ServiceFailedDuringExecutionError.__doc__) elif 'Handshake failed' in debug: return InvalidClientCertificateError(rpc_error, InvalidClientCertificateError.__doc__) elif 'Name resolution failure' in debug: return UnknownDnsNameError(rpc_error, UnknownDnsNameError.__doc__) elif 'channel is in state TRANSIENT_FAILURE' in debug: return TransientFailureError(rpc_error, TransientFailureError.__doc__) elif 'Connect Failed' in debug: # This error should be checked last because a lot of grpc errors contain said substring. return UnableToConnectToRobotError(rpc_error, UnableToConnectToRobotError.__doc__) # Handle arbitrary UNAVAILABLE cases if code is grpc.StatusCode.UNAVAILABLE: return UnableToConnectToRobotError(rpc_error, UnableToConnectToRobotError.__doc__) _LOGGER.warning('Unclassified exception: %s', rpc_error) return RpcError(rpc_error, RpcError.__doc__)