Source code for bosdyn.client.estop

# Copyright (c) 2023 Boston Dynamics, Inc.  All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).

"""For clients to the emergency stop (estop) service."""

import collections
import ctypes
import enum
import logging
import os
import queue
import threading
import time

from google.protobuf.duration_pb2 import Duration

from bosdyn.api import estop_pb2, estop_service_pb2_grpc

from .common import (BaseClient, common_header_errors, error_factory, handle_common_header_errors,
                     handle_unset_status_error)
from .exceptions import Error, ResponseError, RpcError, TimedOutError


[docs]class EstopResponseError(ResponseError): """General class of errors for Estop service."""
[docs]class EndpointUnknownError(EstopResponseError): """The endpoint specified in the request is not registered."""
[docs]class IncorrectChallengeResponseError(EstopResponseError): """The challenge and/or response was incorrect."""
[docs]class EndpointMismatchError(EstopResponseError): """Target endpoint did not match."""
[docs]class ConfigMismatchError(EstopResponseError): """Registered to the wrong configuration."""
[docs]class InvalidEndpointError(EstopResponseError): """New endpoint was invalid."""
[docs]class InvalidIdError(EstopResponseError): """Tried to replace a EstopConfig, but provided bad ID."""
[docs]class MotorsOnError(EstopResponseError): """The operation is not allowed while motors are on."""
StopLevel = enum.IntEnum('StopLevel', estop_pb2.EstopStopLevel.items())
[docs]class EstopClient(BaseClient): """Client to the estop service.""" default_service_name = 'estop' service_type = 'bosdyn.api.EstopService' def __init__(self, name="EstopClient - PID: " + str(os.getpid())): super(EstopClient, self).__init__(estop_service_pb2_grpc.EstopServiceStub, name=name)
[docs] def register(self, target_config_id, endpoint, **kwargs): """Register the endpoint in the target configuration. Args: target_config_id: The identification of the current configuration on the robot. endpoint: Estop endpoint. kwargs: Passed to underlying RPC. Example: timeout=5 to cancel the RPC after 5 seconds. Returns: estop_pb2.EstopEndpoint that has been registered. """ req = self._build_register_request(target_config_id, endpoint) return self.call(self._stub.RegisterEstopEndpoint, req, _new_endpoint_from_register_response, _register_endpoint_error_from_response, copy_request=False, **kwargs)
[docs] def register_async(self, target_config_id, endpoint, **kwargs): """Async version of register()""" req = self._build_register_request(target_config_id, endpoint) return self.call_async(self._stub.RegisterEstopEndpoint, req, _new_endpoint_from_register_response, _register_endpoint_error_from_response, copy_request=False, **kwargs)
[docs] def deregister(self, target_config_id, endpoint, **kwargs): """Deregister the endpoint in the target configuration. Args: target_config_id: The identification of the current configuration on the robot. endpoint: Estop endpoint. kwargs: Passed to underlying RPC. Example: timeout=5 to cancel the RPC after 5 seconds. """ req = self._build_deregister_request(target_config_id, endpoint) self.call(self._stub.DeregisterEstopEndpoint, req, None, _deregister_endpoint_error_from_response, copy_request=False, **kwargs)
[docs] def deregister_async(self, target_config_id, endpoint, **kwargs): """Async version of deregister()""" req = self._build_deregister_request(target_config_id, endpoint) return self.call_async(self._stub.DeregisterEstopEndpoint, req, None, _deregister_endpoint_error_from_response, copy_request=False, **kwargs)
[docs] def get_config(self, **kwargs): """Return the estop configuration of the robot. Args: kwargs: Passed to underlying RPC. Example: timeout=5 to cancel the RPC after 5 seconds. Returns: estop_pb2.EstopConfig the robot is currently using. """ return self.call(self._stub.GetEstopConfig, estop_pb2.GetEstopConfigRequest(), _active_config_from_config_response, common_header_errors, copy_request=False, **kwargs)
[docs] def get_config_async(self, **kwargs): """Async version of get_config()""" return self.call_async(self._stub.GetEstopConfig, estop_pb2.GetEstopConfigRequest(), _active_config_from_config_response, common_header_errors, copy_request=False, **kwargs)
[docs] def set_config(self, config, target_config_id, **kwargs): """Change the estop configuration of the robot. Args: config: New configuration to set. target_config_id: The identification of the current configuration on the robot. kwargs: Passed to underlying RPC. Example: timeout=5 to cancel the RPC after 5 seconds. Returns: estop_pb2.EstopConfig the robot is currently using. """ req = estop_pb2.SetEstopConfigRequest(config=config, target_config_id=target_config_id) return self.call(self._stub.SetEstopConfig, req, _active_config_from_config_response, _set_config_error_from_response, copy_request=False, **kwargs)
[docs] def set_config_async(self, config, target_config_id, **kwargs): """Async version of set_config()""" req = estop_pb2.SetEstopConfigRequest(config=config, target_config_id=target_config_id) return self.call_async(self._stub.SetEstopConfig, req, _active_config_from_config_response, _set_config_error_from_response, copy_request=False, **kwargs)
[docs] def get_status(self, **kwargs): """Return the estop status of the robot. Args: kwargs: Passed to underlying RPC. Example: timeout=5 to cancel the RPC after 5 seconds. Returns: estop_pb2.EstopSystemStatus from the server. """ return self.call(self._stub.GetEstopSystemStatus, estop_pb2.GetEstopSystemStatusRequest(), _estop_sys_status_from_response, common_header_errors, **kwargs)
[docs] def get_status_async(self, **kwargs): """Async version of get_status()""" return self.call_async(self._stub.GetEstopSystemStatus, estop_pb2.GetEstopSystemStatusRequest(), _estop_sys_status_from_response, common_header_errors, **kwargs)
[docs] def check_in(self, stop_level, endpoint, challenge, response, suppress_incorrect=False, **kwargs): """Check in with the estop system. Args: stop_level: Integer / enum representing desired stop level. See StopLevel enum. endpoint: The endpoint asserting the stop level. challenge: A previously received challenge from the server. response: A response to the 'challenge' argument. suppress_incorrect: Set True to prevent an IncorrectChallengeResponseError from being raised when STATUS_INVALID is returned. Useful for the first check-in, before a challenge has been sent by the server. kwargs: Passed to underlying RPC. Example: timeout=5 to cancel the RPC after 5 seconds. Returns: A new challenge from the server. """ req = self._build_check_in_request(stop_level, endpoint, challenge, response) err_from_resp = self._choose_check_in_err_func(suppress_incorrect=suppress_incorrect) return self.call(self._stub.EstopCheckIn, req, _challenge_from_check_in_response, err_from_resp, copy_request=False, **kwargs)
[docs] def check_in_async(self, stop_level, endpoint, challenge, response, suppress_incorrect=False, **kwargs): """Async version of check_in()""" req = self._build_check_in_request(stop_level, endpoint, challenge, response) err_from_resp = self._choose_check_in_err_func(suppress_incorrect=suppress_incorrect) return self.call_async(self._stub.EstopCheckIn, req, _challenge_from_check_in_response, err_from_resp, copy_request=False, **kwargs)
@staticmethod def _build_check_in_request(stop_level, endpoint, challenge, response): if isinstance(endpoint, EstopEndpoint): endpoint = endpoint.to_proto() return estop_pb2.EstopCheckInRequest(stop_level=stop_level, endpoint=endpoint, challenge=challenge, response=response) @staticmethod def _build_register_request(target_config_id, endpoint): if isinstance(endpoint, EstopEndpoint): endpoint = endpoint.to_proto() req = estop_pb2.RegisterEstopEndpointRequest(target_config_id=target_config_id, new_endpoint=endpoint) req.target_endpoint.role = endpoint.role return req @staticmethod def _build_deregister_request(target_config_id, endpoint): if isinstance(endpoint, EstopEndpoint): endpoint = endpoint.to_proto() req = estop_pb2.DeregisterEstopEndpointRequest(target_config_id=target_config_id, target_endpoint=endpoint) return req @staticmethod def _choose_check_in_err_func(suppress_incorrect): if suppress_incorrect: return _check_in_error_from_response_no_incorrect else: return _check_in_error_from_response
[docs]class EstopEndpoint(object): """Endpoint in the software estop system.""" # This is an estop role required in every configuration. REQUIRED_ROLE = 'PDB_rooted' def __init__(self, client, name, estop_timeout, role=REQUIRED_ROLE, first_checkin=True, estop_cut_power_timeout=None): self.client = client self.role = role self.estop_timeout = estop_timeout self.estop_cut_power_timeout = estop_cut_power_timeout self._last_set_level = None self._challenge = None self._name = name self._unique_id = None self._config_id = None self._lock = threading.Lock() self._locked_first_checkin = first_checkin self.logger = logging.getLogger(self._name) def __str__(self): if self.estop_cut_power_timeout is None: return '{} (timeout {:.3}s)'.format(self._name, self.estop_timeout) else: return '{} (timeout {:.3}s, cut_power_timeout {:.3}s)'.format( self._name, self.estop_timeout, self.estop_cut_power_timeout) @property def last_set_level(self): return self._last_set_level def _first_checkin(self): with self._lock: return self._locked_first_checkin def _set_first_checkin(self, val): with self._lock: self._locked_first_checkin = val def _set_challenge_without_exception_from_future(self, fut): """Set challenge from the response in the future, if at all possible.""" new_challenge = None try: # Try to get the FutureWrapper.result() new_challenge = fut.result() except EstopResponseError as exc: # If this failed with an EstopResponseError, we can likely still extract a challenge. new_challenge = _challenge_from_check_in_response(exc.response) except (Error) as exc: # Otherwise, if it was a common ResponseError or an issue with the RPC itself, # we likely cannot. self.logger.warning('Could not set challenge (error: %s)', exc.__class__.__name__) # If we got a new challenge, set it. if new_challenge is not None: self.set_challenge(new_challenge)
[docs] def set_challenge(self, challenge): """Thread-safe write to the challenge.""" with self._lock: self._challenge = challenge
[docs] def get_challenge(self): """Thread-safe read of the challenge.""" with self._lock: return self._challenge
[docs] def force_simple_setup(self): """Replaces the existing estop configuration with a single-endpoint configuration.""" new_config = estop_pb2.EstopConfig() new_config_endpoint = new_config.endpoints.add() new_config_endpoint.CopyFrom(self.to_proto()) active_config = self.client.get_config() active_config = self.client.set_config(new_config, active_config.unique_id) self._unique_id = active_config.endpoints[0].unique_id self.register(active_config.unique_id)
[docs] def stop(self, **kwargs): """Issue a CUT stop level command to the robot, cutting motor power immediately. Args: kwargs: Passed to underlying RPC. Example: timeout=5 to cancel the RPC after 5 seconds. Returns: None. Will raise an error if RPC failed. """ self.logger.debug('Stopping') self.check_in_at_level(StopLevel.ESTOP_LEVEL_CUT, **kwargs)
[docs] def settle_then_cut(self, **kwargs): """Issue a SETTLE_THEN_CUT stop level. The robot will attempt to sit before cutting motor power. Args: kwargs: Passed to underlying RPC. Example: timeout=5 to cancel the RPC after 5 seconds. Returns: None. Will raise an error if RPC failed. """ self.logger.debug('Stopping with SETTLE_THEN_CUT') self.check_in_at_level(StopLevel.ESTOP_LEVEL_SETTLE_THEN_CUT, **kwargs)
[docs] def allow(self, **kwargs): """Issue a NONE stop level command to the robot, allowing motor power. Args: kwargs: Passed to underlying RPC. Example: timeout=5 to cancel the RPC after 5 seconds. Returns: None. Will raise an error if RPC failed. """ self.logger.debug('Releasing') self.check_in_at_level(StopLevel.ESTOP_LEVEL_NONE, **kwargs)
[docs] def check_in_at_level(self, level, **kwargs): """Check in at a specified level. Meant for internal use, but may be helpful for higher-level wrappers. """ try: self.set_challenge( self.client.check_in(level, self, self.get_challenge(), self._response(), suppress_incorrect=self._first_checkin(), **kwargs)) except EstopResponseError as exc: self.set_challenge(_challenge_from_check_in_response(exc.response)) raise else: self._last_set_level = level self._set_first_checkin(False)
[docs] def deregister(self, **kwargs): """Deregister this endpoint from the configuration.""" self.logger.debug('Deregistering') self.client.deregister(self._config_id, self)
[docs] def register(self, target_config_id, **kwargs): """Register this endpoint to the given configuration.""" self.logger.debug('Registering to %s', target_config_id) new_endpoint = self.client.register(target_config_id, self) # If we were successful, update our config id and the rest of our data from the returned # endpoint protobuf. self._config_id = target_config_id self.from_proto(new_endpoint) self.logger.debug('Doing check-in to seed challenge...') self.stop()
[docs] def stop_async(self, **kwargs): """Async version of stop()""" self.logger.debug('Stopping (async)') return self.check_in_at_level_async(StopLevel.ESTOP_LEVEL_CUT, **kwargs)
[docs] def settle_then_cut_async(self, **kwargs): """Async version of settle_then_cut()""" self.logger.debug('Stopping with SETTLE_THEN_CUT (async)') return self.check_in_at_level_async(StopLevel.ESTOP_LEVEL_SETTLE_THEN_CUT, **kwargs)
[docs] def allow_async(self, **kwargs): """Async version of allow()""" self.logger.debug('Releasing (async)') return self.check_in_at_level_async(StopLevel.ESTOP_LEVEL_NONE, **kwargs)
[docs] def check_in_at_level_async(self, level, **kwargs): fut = self.client.check_in_async(level, self, self.get_challenge(), self._response(), suppress_incorrect=self._first_checkin(), **kwargs) fut.add_done_callback(lambda fut: self._set_challenge_without_exception_from_future(fut)) fut.add_done_callback(lambda fut: self._set_first_checkin(False)) return fut
[docs] def deregister_async(self, **kwargs): """Async version of deregister()""" self.logger.debug('Deregistering (async)') return self.client.deregister_async(self._config_id, self)
[docs] def from_proto(self, proto): """Set member variables based on given estop_pb2.EstopEndpoint.""" if self._name != proto.name: self.logger.info('Changing name to %s', proto.name) self._name = proto.name self.logger = logging.getLogger(self._name) self.role = proto.role self.estop_timeout = proto.timeout.seconds + proto.timeout.nanos * 1e-9 self._unique_id = proto.unique_id if proto.cut_power_timeout is None: self.estop_cut_power_timeout = None else: self.estop_cut_power_timeout = proto.cut_power_timeout.seconds + proto.cut_power_timeout.nanos * 1e-9
[docs] def to_proto(self): """Return estop_pb2.EstopEndpoint based on current member variables.""" t_seconds = int(self.estop_timeout) t_nanos = int((self.estop_timeout - t_seconds) * 1e9) if self.estop_cut_power_timeout is None: return estop_pb2.EstopEndpoint(role=self.role, name=self._name, unique_id=self._unique_id, timeout=Duration(seconds=t_seconds, nanos=t_nanos)) else: cpt_seconds = int(self.estop_cut_power_timeout) cpt_nanos = int((self.estop_cut_power_timeout - cpt_seconds) * 1e9) return estop_pb2.EstopEndpoint( role=self.role, name=self._name, unique_id=self._unique_id, timeout=Duration(seconds=t_seconds, nanos=t_nanos), cut_power_timeout=Duration(seconds=cpt_seconds, nanos=cpt_nanos))
def _response(self): """Generate a response for self._challenge.""" challenge = self.get_challenge() return None if challenge is None else response_from_challenge(challenge) @property def unique_id(self): """Return the _unique_id. Should be used as read-only.""" return self._unique_id
[docs]class EstopKeepAlive(object): """Wraps an EstopEndpoint to do periodic check-ins, keeping software estop from timing out. This is intended to be the common implementation of both periodic checking-in and one-time check-ins. See the command line utility and the "Big Red Button" application for examples. You should not access any of the "private" members, or the wrapped endpoint. """ def __init__(self, endpoint, rpc_timeout_seconds=None, rpc_interval_seconds=None, keep_running_cb=None, max_status_queue_size=20): """Kicks off periodic check-in on a thread.""" self._endpoint = endpoint self._lock = threading.Lock() self._end_check_in_signal = threading.Event() self._desired_stop_level = StopLevel.ESTOP_LEVEL_NONE # By default, only let our RPCs last as long as the estop timeout. self._rpc_timeout = rpc_timeout_seconds or self._endpoint.estop_timeout self._check_in_period = rpc_interval_seconds or self._endpoint.estop_timeout / 3.0 if self._rpc_timeout <= 0: raise ValueError('Invalid rpc_timeout_seconds "{}"'.format(self._rpc_timeout)) if self._check_in_period < 0: raise ValueError('Invalid rpc_interval_seconds "{}"'.format(self._check_in_period)) self._keep_running = keep_running_cb or (lambda: True) self.logger.debug('New %s for endpoint "%s"', self.__class__, self._endpoint) self.status_queue = queue.Queue(maxsize=max_status_queue_size) self._update_status(self.KeepAliveStatus.OK) # Do an initial check-in, and just log any errors that occur. # This lets us get a challenge from the estop system, so we can begin using # valid challenge/response pairs. try: self._check_in() #pylint: disable=broad-except except Exception as exc: self.logger.warning('Estop initial check-in exception:\n{}\n'.format(exc)) # Configure the thread to do check-ins, and begin checking in. self._thread = threading.Thread(target=self._periodic_check_in) self._thread.daemon = True self._thread.start() def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.shutdown()
[docs] def shutdown(self): self.logger.debug('Shutting down') self._end_periodic_check_in() self._thread.join()
@property def last_set_level(self): return self._endpoint.last_set_level @property def logger(self): return self._endpoint.logger
[docs] def allow(self): with self._lock: self._desired_stop_level = StopLevel.ESTOP_LEVEL_NONE self._check_in()
[docs] def settle_then_cut(self): with self._lock: self._desired_stop_level = StopLevel.ESTOP_LEVEL_SETTLE_THEN_CUT # In the case of stopping, we want the RPC to timeout only after the estop itself has # timed out. This handles the case where a user calls settle_then_cut() with : # - a very low self._rpc_timeout and # - a very large check-in period and # - doesn't handle RPC timeout errors themselves. self._check_in(rpc_timeout=self._endpoint.estop_timeout)
[docs] def stop(self): with self._lock: self._desired_stop_level = StopLevel.ESTOP_LEVEL_CUT # In the case of stopping, we want the RPC to timeout only after the estop itself has # timed out. This handles the case where a user calls stop() with : # - a very low self._rpc_timeout and # - a very large check-in period and # - doesn't handle RPC timeout errors themselves. self._check_in(rpc_timeout=self._endpoint.estop_timeout)
def _end_periodic_check_in(self): """Stop checking into the robot estop system.""" self.logger.debug('Stopping check-in') self._end_check_in_signal.set() def _error(self, msg, exception=None, disable=False): """Handle an error message; optionally disable the application. GUI applications should override this function, to make sure the error_msg gets to the GUI. """ self._update_status(self.KeepAliveStatus.ERROR, msg) self.logger.error(msg) if disable: self._end_periodic_check_in() self._update_status(self.KeepAliveStatus.DISABLED, msg) def _ok(self): """Handle an ok message. GUI applications should override this function, to make sure the error_msg gets to the GUI. """ self._update_status(self.KeepAliveStatus.OK) self.logger.debug('Check-in successful') def _update_status(self, status, msg=''): """Update the estop_keep_alive status by populating the queue, clearing old entries if the queue is full. Note: this method is not thread safe because if called by multiple different threads it could create a race condition which will raise a FullQueue exception. The EstopKeepAlive only uses this in a single background thread for the _periodic_check_in method. """ if self.status_queue.full(): # Remove an element to clear out the status queue for the new element. self.status_queue.get_nowait() self.status_queue.put((status, msg), block=False) def _check_in(self, rpc_timeout=None): """Check in, optionally specifying a non-standard RPC timeout.""" rpc_timeout = rpc_timeout or self._rpc_timeout with self._lock: self._endpoint.check_in_at_level(self._desired_stop_level, timeout=rpc_timeout) def _periodic_check_in(self): """Send estop API CheckIn messages to robot estop system in loop.""" # Sleep for portion of the timeout (and convert from nanoseconds to seconds) self.logger.info('Starting estop check-in') while True: # Include the time it takes to execute keep_running, in case it takes a significant # portion of our check in period. exec_start = time.time() if not self._keep_running(): break try: self._check_in() except TimedOutError as exc: self._error('RPC took longer than {:.2f} seconds'.format(self._rpc_timeout), exception=exc) except RpcError as exc: self._error( 'Transport exception during check-in:\n{}\n' ' (resuming check-in)'.format(exc), exception=exc) except EndpointUnknownError as exc: # Disable ourself to show we cannot estop any longer. self._error(str(exc), exception=exc, disable=True) # We really do want to catch anything. #pylint: disable=broad-except except Exception as exc: self.logger.warning(('Generic exception during check-in:\n{}\n' ' (resuming check-in)').format(exc)) else: # No errors! self._ok() # How long did the RPC and processing of said RPC take? exec_sec = time.time() - exec_start # Block and wait for the stop signal. If we receive it within the check-in period, # leave the loop. This check must be at the end of the loop! # Wait up to self._check_in_period seconds, minus the RPC processing time. # (values < 0 are OK and will return immediately) if self._end_check_in_signal.wait(self._check_in_period - exec_sec): break self.logger.info('Estop check-in stopped') @property def endpoint(self): """Return the _endpoint. Should be used as read-only.""" return self._endpoint @property def client(self): """Return the _endpoint.client. Should be used as read-only.""" return self._endpoint.client
[docs] class KeepAliveStatus(enum.Enum): OK = 0 ERROR = 1 DISABLED = 2
[docs]def is_estopped(estop_client, **kwargs): """Returns true if robot is estopped, false otherwise. Raises: RpcError: Problem communicating with the robot """ response = estop_client.get_status(**kwargs) return response.stop_level != estop_pb2.ESTOP_LEVEL_NONE
[docs]def response_from_challenge(challenge): return ctypes.c_ulonglong(~challenge).value
_CHECK_IN_STATUS_TO_ERROR = collections.defaultdict(lambda: (ResponseError, None)) _CHECK_IN_STATUS_TO_ERROR.update({ estop_pb2.EstopCheckInResponse.STATUS_OK: (None, None), estop_pb2.EstopCheckInResponse.STATUS_ENDPOINT_UNKNOWN: (EndpointUnknownError, EndpointUnknownError.__doc__), estop_pb2.EstopCheckInResponse.STATUS_INCORRECT_CHALLENGE_RESPONSE: (IncorrectChallengeResponseError, IncorrectChallengeResponseError.__doc__), }) @handle_common_header_errors @handle_unset_status_error(unset='STATUS_UNKNOWN') def _check_in_error_from_response(response): """Return an exception based on response from EstopCheckIn RPC, None if no error.""" return error_factory(response, response.status, status_to_string=estop_pb2.EstopCheckInResponse.Status.Name, status_to_error=_CHECK_IN_STATUS_TO_ERROR) def _check_in_error_from_response_no_incorrect(resp): """Return an exception based on response from EstopCheckIn RPC, ignoring incorrect chal/resp""" if resp.status == estop_pb2.EstopCheckInResponse.STATUS_INCORRECT_CHALLENGE_RESPONSE: return None return _check_in_error_from_response(resp) _SET_CONFIG_STATUS_TO_ERROR = collections.defaultdict(lambda: (ResponseError, None)) _SET_CONFIG_STATUS_TO_ERROR.update({ estop_pb2.SetEstopConfigResponse.STATUS_SUCCESS: (None, None), estop_pb2.SetEstopConfigResponse.STATUS_INVALID_ID: (InvalidIdError, InvalidIdError.__doc__), estop_pb2.SetEstopConfigResponse.STATUS_MOTORS_ON: (MotorsOnError, MotorsOnError.__doc__), }) @handle_common_header_errors @handle_unset_status_error(unset='STATUS_UNKNOWN') def _set_config_error_from_response(response): """Return an exception based on response from SetEstopConfig RPC, None if no error.""" return error_factory(response, response.status, status_to_string=estop_pb2.SetEstopConfigResponse.Status.Name, status_to_error=_SET_CONFIG_STATUS_TO_ERROR) _DEREGISTER_ENDPOINT_STATUS_TO_ERROR = collections.defaultdict(lambda: (ResponseError, None)) _DEREGISTER_ENDPOINT_STATUS_TO_ERROR.update({ estop_pb2.DeregisterEstopEndpointResponse.STATUS_SUCCESS: (None, None), estop_pb2.DeregisterEstopEndpointResponse.STATUS_ENDPOINT_MISMATCH: (EndpointMismatchError, EndpointMismatchError.__doc__), estop_pb2.DeregisterEstopEndpointResponse.STATUS_CONFIG_MISMATCH: (ConfigMismatchError, ConfigMismatchError.__doc__), estop_pb2.DeregisterEstopEndpointResponse.STATUS_MOTORS_ON: (MotorsOnError, MotorsOnError.__doc__), }) @handle_common_header_errors @handle_unset_status_error(unset='STATUS_UNKNOWN') def _deregister_endpoint_error_from_response(response): """Return an exception based on response from DeregisterEstopEndpoint RPC, None if no error.""" return error_factory(response, response.status, status_to_string=estop_pb2.DeregisterEstopEndpointResponse.Status.Name, status_to_error=_DEREGISTER_ENDPOINT_STATUS_TO_ERROR) _REGISTER_ENDPOINT_STATUS_TO_ERROR = collections.defaultdict(lambda: (ResponseError, None)) _REGISTER_ENDPOINT_STATUS_TO_ERROR.update({ estop_pb2.RegisterEstopEndpointResponse.STATUS_SUCCESS: (None, None), estop_pb2.RegisterEstopEndpointResponse.STATUS_ENDPOINT_MISMATCH: (EndpointMismatchError, EndpointMismatchError.__doc__), estop_pb2.RegisterEstopEndpointResponse.STATUS_CONFIG_MISMATCH: (ConfigMismatchError, ConfigMismatchError.__doc__), estop_pb2.RegisterEstopEndpointResponse.STATUS_INVALID_ENDPOINT: (InvalidEndpointError, InvalidEndpointError.__doc__), }) @handle_common_header_errors @handle_unset_status_error(unset='STATUS_UNKNOWN') def _register_endpoint_error_from_response(response): """Return an exception based on response from RegisterEstopEndpoint RPC, None if no error.""" return error_factory(response, response.status, status_to_string=estop_pb2.RegisterEstopEndpointResponse.Status.Name, status_to_error=_REGISTER_ENDPOINT_STATUS_TO_ERROR) def _new_endpoint_from_register_response(response): return response.new_endpoint def _active_config_from_config_response(response): return response.active_config def _challenge_from_check_in_response(response): return response.challenge def _estop_sys_status_from_response(response): return response.status