# Copyright (c) 2023 Boston Dynamics, Inc. All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).
"""Client for the directory registration service.
A DirectoryRegistrationClient allows a client to modify information about other API services
available on a robot.
"""
import collections
import logging
import threading
import time
from bosdyn.api import (directory_pb2, directory_registration_pb2,
directory_registration_service_pb2_grpc)
from bosdyn.client.common import (BaseClient, error_factory, error_pair,
handle_common_header_errors, handle_unset_status_error)
from .exceptions import ResponseError, RetryableUnavailableError, TimedOutError
_LOGGER = logging.getLogger(__name__)
[docs]class DirectoryRegistrationResponseError(ResponseError):
"""General class of errors for directory registration responses."""
[docs]class ServiceAlreadyExistsError(DirectoryRegistrationResponseError):
"""The service already exists on the robot."""
[docs]class ServiceDoesNotExistError(DirectoryRegistrationResponseError):
"""The specified service does not exist on the robot."""
[docs]class DirectoryRegistrationClient(BaseClient):
"""Write off-robot services and modify their information."""
default_service_name = 'directory-registration'
service_type = 'bosdyn.api.DirectoryRegistrationService'
def __init__(self):
super(DirectoryRegistrationClient, self).__init__(
directory_registration_service_pb2_grpc.DirectoryRegistrationServiceStub)
[docs] def register(
self,
name,
service_type,
authority,
host_ip,
port,
user_token_required=True,
liveness_timeout_secs=0,
**kwargs):
"""Register a service routing with the robot.
If service name already registered, no change will be applied and will raise ServiceAlreadyExistsError.
Every request received by the robot will serve as a heartbeat and update the service last_update field.
Args:
name: The name of the service. Must be unique.
service_type: The GRPC service definition defining the calls to/from this service.
(authority, service_type) must be unique in the directory.
authority: The authority used to direct calls to this service.
(authority, service_type) must be unique in the directory.
host_ip: The ip address of the system that the service is being hosted on.
port: The port number the service can be accessed through on the host system.
user_token_required: If a user token should be verified to access the service.
liveness_timeout_secs: Number of seconds without directory heartbeat before timeout fault.
Raises:
RpcError: Problem communicating with the robot.
ServiceAlreadyExistsError: The service already exists.
DirectoryRegistrationResponseError: Something went wrong during the directory registration.
"""
service_entry = directory_pb2.ServiceEntry(
name=name,
type=service_type,
authority=authority,
user_token_required=user_token_required,
liveness_timeout_secs=liveness_timeout_secs)
return self._call_register_rpc(service_entry, host_ip, port, **kwargs)
def _call_register_rpc(self, service_entry, host_ip, port, **kwargs):
"""Helper function to register a service definition.
Args:
service_entry (directory_pb2.ServiceEntry): Service definition to register.
host_ip: The ip address of the system that the service is being hosted on.
port: The port number the service can be accessed through on the host system.
Raises:
RpcError: Problem communicating with the robot.
ServiceAlreadyExistsError: The service already exists.
DirectoryRegistrationResponseError: Something went wrong during the directory registration.
"""
endpoint = directory_pb2.Endpoint(host_ip=host_ip, port=port)
req = directory_registration_pb2.RegisterServiceRequest(service_entry=service_entry,
endpoint=endpoint)
return self.call(self._stub.RegisterService, req,
error_from_response=_directory_register_error, copy_request=False,
**kwargs)
[docs] def update(
self,
name,
service_type,
authority,
host_ip,
port,
user_token_required=True,
liveness_timeout_secs=0,
**kwargs):
"""Update a service definition of an existing service that matches the service name.
If service name is not registered, will raise ServiceDoesNotExistError.
Every request received by the robot will serve as a heartbeat and update the service last_update field.
Args:
name: The name of the service to be updated.
service_type: The GRPC service definition defining the calls to/from this service.
(authority, service_type) must be unique in the directory.
authority: The authority used to direct calls to this service.
(authority, service_type) must be unique in the directory.
host_ip: The ip address of the system that the service is being hosted on.
port: The port number the service can be accessed through on the host system.
user_token_required: If a user token should be verified to access the service.
liveness_timeout_secs: Number of seconds without directory heartbeat before timeout fault.
Raises:
RpcError: Problem communicating with the robot.
ServiceDoesNotExistError: The service does not exist.
DirectoryRegistrationResponseError: Something went wrong during the directory registration.
"""
service_entry = directory_pb2.ServiceEntry(
name=name,
type=service_type,
authority=authority,
user_token_required=user_token_required,
liveness_timeout_secs=liveness_timeout_secs)
return self._call_update_rpc(service_entry, host_ip, port, **kwargs)
def _call_update_rpc(self, service_entry, host_ip, port, **kwargs):
"""Helper function to update a service definition of an existing service that matches the
service name.
Args:
service_entry (directory_pb2.ServiceEntry): Service definition to update.
host_ip: The ip address of the system that the service is being hosted on.
port: The port number the service can be accessed through on the host system.
Raises:
RpcError: Problem communicating with the robot.
ServiceDoesNotExistError: The service does not exist.
DirectoryRegistrationResponseError: Something went wrong during the directory registration.
"""
endpoint = directory_pb2.Endpoint(host_ip=host_ip, port=port)
req = directory_registration_pb2.UpdateServiceRequest(service_entry=service_entry,
endpoint=endpoint)
return self.call(self._stub.UpdateService, req, error_from_response=_directory_update_error,
copy_request=False, **kwargs)
[docs] def unregister(self, name, **kwargs):
"""Remove a service routing with the robot.
Args:
name: The name of the service to be removed.
Raises:
RpcError: Problem communicating with the robot.
ServiceDoesNotExistError: The service does not exist.
DirectoryRegistrationResponseError: Something went wrong during the directory registration.
"""
req = directory_registration_pb2.UnregisterServiceRequest(service_name=name)
return self.call(self._stub.UnregisterService, req,
error_from_response=_directory_unregister_error, copy_request=False,
**kwargs)
_REGISTER_STATUS_TO_ERROR = collections.defaultdict(lambda:
(DirectoryRegistrationResponseError, None))
_REGISTER_STATUS_TO_ERROR.update({
directory_registration_pb2.RegisterServiceResponse.STATUS_OK: (None, None),
directory_registration_pb2.RegisterServiceResponse.STATUS_ALREADY_EXISTS:
error_pair(ServiceAlreadyExistsError),
})
@handle_common_header_errors
@handle_unset_status_error(unset='STATUS_UNKNOWN')
def _directory_register_error(response):
"""Return an exception based on response from Register RPC, None if no error."""
return error_factory(
response, response.status,
status_to_string=directory_registration_pb2.RegisterServiceResponse.Status.Name,
status_to_error=_REGISTER_STATUS_TO_ERROR)
_UPDATE_STATUS_TO_ERROR = collections.defaultdict(lambda:
(DirectoryRegistrationResponseError, None))
_UPDATE_STATUS_TO_ERROR.update({
directory_registration_pb2.UpdateServiceResponse.STATUS_OK: (None, None),
directory_registration_pb2.UpdateServiceResponse.STATUS_NONEXISTENT_SERVICE:
error_pair(ServiceDoesNotExistError),
})
@handle_common_header_errors
@handle_unset_status_error(unset='STATUS_UNKNOWN')
def _directory_update_error(response):
"""Return an exception based on response from Update RPC, None if no error."""
return error_factory(
response, response.status,
status_to_string=directory_registration_pb2.UpdateServiceResponse.Status.Name,
status_to_error=_UPDATE_STATUS_TO_ERROR)
_UNREGISTER_STATUS_TO_ERROR = collections.defaultdict(lambda:
(DirectoryRegistrationResponseError, None))
_UNREGISTER_STATUS_TO_ERROR.update({
directory_registration_pb2.UnregisterServiceResponse.STATUS_OK: (None, None),
directory_registration_pb2.UnregisterServiceResponse.STATUS_NONEXISTENT_SERVICE:
error_pair(ServiceDoesNotExistError),
})
@handle_common_header_errors
@handle_unset_status_error(unset='STATUS_UNKNOWN')
def _directory_unregister_error(response):
"""Return an exception based on response from Unregister RPC, None if no error."""
return error_factory(
response, response.status,
status_to_string=directory_registration_pb2.UnregisterServiceResponse.Status.Name,
status_to_error=_UNREGISTER_STATUS_TO_ERROR)
[docs]def reset_service_registration(
directory_registration_client,
name,
service_type,
authority,
host_ip,
port,
user_token_required=True,
liveness_timeout_secs=0):
"""Reset a service registration by unregistering the service and then re-registering it.
This is useful when a program wants to register a new service but there may be an old entry
in the robot directory from a previous instance of the program. If the service
does not already exist, the exception will be suppressed and a new registration will
still be performed. Unregistering the service has the advantage of clearing all service
faults, if any existed.
"""
try:
directory_registration_client.unregister(name)
except ServiceDoesNotExistError:
pass
directory_registration_client.register(
name,
service_type,
authority,
host_ip,
port,
user_token_required=user_token_required,
liveness_timeout_secs=liveness_timeout_secs)
[docs]class DirectoryRegistrationKeepAlive(object):
"""Helper class to keep a directory entry updated.
Assuming the directory itself is hosted on the robot, and the service being registered in the
directory is on a payload, use of this class streamlines the following cases:
1) The payload, or the payload-hosted service, is restarted.
2) The robot is restarted.
3) On-robot processes clear out the directory. This can happen in rare cases.
This class will also maintain liveness status with the robot directory, if enabled for this
service, by sending a registration/update request at the specified interval.
Args:
dir_reg_client: Client to the directory registration service.
logger: logging.Logger object to log with. Defaults to None, in which case one with the
class name is acquired.
rpc_timeout_seconds: Number of seconds to wait for a dir_reg_client RPC. Defaults to None,
for no timeout.
rpc_interval_seconds: Interval at which to request service registrations.
"""
def __init__(self, dir_reg_client, logger=None, rpc_timeout_seconds=None,
rpc_interval_seconds=30):
self.authority = None
self.directory_name = None
self.host = None
self.logger = logger or logging.getLogger(self.__class__.__name__)
self.port = None
self.service_type = None
self.dir_reg_client = dir_reg_client
self._end_reregister_signal = threading.Event()
self._lock = threading.Lock()
self._rpc_timeout = rpc_timeout_seconds
self._reregister_period = rpc_interval_seconds
# Configure the thread to do re-registration.
self._thread = threading.Thread(target=self._periodic_reregister)
self._thread.daemon = True
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.shutdown()
self.unregister()
[docs] def start(
self,
directory_name,
service_type,
authority,
host,
port,
liveness_timeout_secs=None,
user_token_required=True,
reset_service=True):
"""Register, optionally update, and then kick off thread.
Can not be restarted with this method after a shutdown.
Args:
directory_name: See directory.proto for details.
service_type: See directory.proto for details.
authority: See directory.proto for details.
host: See directory.proto for details.
port: See directory.proto for details.
liveness_timeout_secs: See directory.proto for details. Defaults to
2.5x rpc_interval_seconds.
user_token_required: See directory.proto for details.
reset_service: Fully reset the service registration before the periodic registrations.
Raises:
RpcError: Problem communicating with the robot.
"""
if liveness_timeout_secs is None:
liveness_timeout_secs = self._reregister_period * 2.5
if reset_service:
reset_service_registration(
self.dir_reg_client,
directory_name,
service_type,
authority,
host,
port,
user_token_required=user_token_required,
liveness_timeout_secs=liveness_timeout_secs)
else:
try:
self.dir_reg_client.register(
directory_name,
service_type,
authority,
host,
port,
user_token_required=user_token_required,
liveness_timeout_secs=liveness_timeout_secs)
except ServiceAlreadyExistsError as exc:
self.dir_reg_client.update(
directory_name,
service_type,
authority,
host,
port,
user_token_required=user_token_required,
liveness_timeout_secs=liveness_timeout_secs)
self.logger.info('{} service registered/updated.'.format(directory_name))
self.authority = authority
self.directory_name = directory_name
self.host = host
self.port = port
self.service_type = service_type
self.user_token_required = user_token_required
self.liveness_timeout_secs = liveness_timeout_secs
# This will raise an exception if the thread has already started.
self._thread.start()
return self
[docs] def is_alive(self):
"""Are we still periodically re-registering?
Returns:
A bool stating if still alive
"""
return self._thread.is_alive()
[docs] def shutdown(self):
"""Stop the background thread."""
self.logger.info('Shutting down {} keep alive'.format(self.directory_name))
self._end_reregister_signal.set()
self._thread.join()
[docs] def unregister(self):
"""Remove service from the directory.
Raises:
RpcError: Problem communicating with the robot.
ServiceDoesNotExistError: The service does not exist.
"""
self.logger.info('Unregistering {} from directory'.format(self.directory_name))
self.dir_reg_client.unregister(self.directory_name, timeout=self._rpc_timeout)
def _periodic_reregister(self):
"""Handles an accidental removal of the service from the directory.
Raises:
RpcError: Problem communicating with the robot.
"""
self.logger.info('Starting directory registration loop for {}'.format(self.directory_name))
while True:
exec_start = time.time()
try:
self.dir_reg_client.register(
self.directory_name,
self.service_type,
self.authority,
self.host,
self.port,
user_token_required=self.user_token_required,
liveness_timeout_secs=self.liveness_timeout_secs,
timeout=self._rpc_timeout)
except ServiceAlreadyExistsError:
# Ignore "already registered" errors -- we expect those.
# We do not allow anyone to change the directory parameters with an "update" call,
# because we assume that the lifespan of this thread matches the lifespan of the
# service being registered.
pass
except RetryableUnavailableError:
# Ignore transient availability errors and retry.
pass
except TimedOutError:
self.logger.warning('Timed out, timeout set to "{}"'.format(self._rpc_timeout))
except Exception:
# Log all other exceptions, but continue looping in hopes that it resolves itself
self.logger.exception('Caught general exception')
exec_sec = time.time() - exec_start
if self._end_reregister_signal.wait(self._reregister_period - exec_sec):
break