# Copyright (c) 2023 Boston Dynamics, Inc. All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).
"""Lease validator tracks lease usage in intermediate services."""
import copy
import logging
import threading
from bosdyn.api import lease_pb2
from bosdyn.client.exceptions import Error
from bosdyn.client.lease import Lease, LeaseClient
from bosdyn.client.lease_resource_hierarchy import ResourceHierarchy
_LOGGER = logging.getLogger(__name__)
class LeaseValidator:
    """Lease validator tracks lease usage in intermediate services.

    Track the most recent leases seen for each lease resource and test incoming leases against this
    state.

    Args:
        robot (Robot): The robot object for which leases are associated to. If None, no resource
            hierarchy is requested and validation relies on the active lease map alone.
    """

    def __init__(self, robot):
        # Map tracking the lease resource and the current active lease.
        self.active_lease_map = {}
        # Lock for the active lease map.
        self.active_lease_map_lock = threading.Lock()
        # Resource hierarchy reported by the robot; stays None when it cannot be determined.
        self.hierarchy = None
        if robot is None:
            return
        # Blocking call to determine the lease resource tree from the robot.
        try:
            lease_client = robot.ensure_client(LeaseClient.default_service_name)
            list_leases_response = lease_client.list_leases_full()
        except Exception:  # pylint: disable=broad-except
            # TODO should we be catching exceptions here? Is this the right exception to catch?
            # Some unit testing depends on this behavior.
            _LOGGER.exception(
                "Unable to set the resource hierarchy for robot %s's LeaseValidator.",
                robot.host)
        else:
            # Resource hierarchy found from the robot! Use it for better validation of incoming
            # leases.
            self.hierarchy = ResourceHierarchy(list_leases_response.resource_tree)

    def get_active_lease(self, resource):
        """Get the latest active lease.

        Args:
            resource(string): the resource for the specific lease to be returned.

        Returns:
            A Lease class object representing the most recent lease for the
            requested resource, or None if no lease has been seen.
        """
        with self.active_lease_map_lock:
            return self._get_active_lease_locked(resource)

    def test_active_lease(self, incoming_lease, allow_super_leases, allow_different_epoch=False):
        """Compare an incoming lease to the latest active lease.

        Args:
            incoming_lease(Lease object or lease_pb2.Lease): The incoming lease to test.
            allow_super_leases(boolean): Should the comparison function consider a super lease as
                                         ok.
            allow_different_epoch(boolean): Should the comparison function consider a lease with
                                            a different epoch as ok.

        Returns:
            The LeaseUseResult proto message representing if the incoming lease is ok.
        """
        lease_use_results = lease_pb2.LeaseUseResult()
        with self.active_lease_map_lock:
            status, previous_lease = self._test_active_lease_helper(
                incoming_lease, allow_super_leases, allow_different_epoch)
            lease_use_results.status = status
            self._populate_base_lease_use_results_locked(incoming_lease, previous_lease,
                                                         lease_use_results)
        return lease_use_results

    def test_and_set_active_lease(self, incoming_lease, allow_super_leases,
                                  allow_different_epoch=False):
        """Compare an incoming lease to the latest active lease, and if it is ok then set it as
        the latest lease.

        Args:
            incoming_lease(Lease object or lease_pb2.Lease): The incoming lease to test.
            allow_super_leases(boolean): Should the comparison function consider a super lease as
                                         ok.
            allow_different_epoch(boolean): Should the comparison function consider a lease with
                                            a different epoch as ok.

        Returns:
            The LeaseUseResult proto message representing if the incoming lease is ok. Additionally,
            if the result is STATUS_OK, then update the latest active lease to this incoming lease.
        """
        lease_use_results = lease_pb2.LeaseUseResult()
        with self.active_lease_map_lock:
            status, previous_lease = self._test_active_lease_helper(
                incoming_lease, allow_super_leases, allow_different_epoch)
            if status == lease_pb2.LeaseUseResult.STATUS_OK:
                self._set_active_lease_locked(incoming_lease)
            lease_use_results.status = status
            # Populated after the (possible) update so latest_known_lease reflects the new state.
            self._populate_base_lease_use_results_locked(incoming_lease, previous_lease,
                                                         lease_use_results)
        return lease_use_results

    def _get_active_lease_locked(self, resource):
        """Locked method to get the active lease for a resource.

        Note: The active_lease_map_lock must be locked!

        Returns:
            The tracked Lease for the resource, or None when the resource is not part of the known
            hierarchy or no lease has been recorded for it.
        """
        assert self.active_lease_map_lock.locked(), "Must have active_lease_map_lock locked."
        if self.hierarchy is not None and not self.hierarchy.has_resource(resource):
            return None
        return self.active_lease_map.get(resource)

    def _test_active_lease_helper(self, incoming_lease, allow_super_lease,
                                  allow_different_epoch=False):  # pylint: disable=too-many-return-statements,too-many-branches
        """Helper function to validate the lease and compare it to the active lease.

        The callers in this class invoke this helper while holding active_lease_map_lock.

        Args:
            incoming_lease(Lease object or lease_pb2.Lease): The incoming lease to test.
            allow_super_lease(boolean): Treat a super lease as ok instead of older.
            allow_different_epoch(boolean): Treat a lease from a different epoch as ok.

        Returns:
            A tuple with the LeaseUseResult.Status and the Lease object representing the
            previous/last active lease.

        Raises:
            AssertionError: If the comparison reports mismatched resources or yields a result this
                validator does not handle.
        """
        # Convert the lease into a Lease class object.
        if isinstance(incoming_lease, lease_pb2.Lease):
            if not Lease.is_valid_proto(incoming_lease):
                return lease_pb2.LeaseUseResult.STATUS_INVALID_LEASE, None
            incoming_lease = Lease(incoming_lease)
        elif not incoming_lease.is_valid_lease():
            return lease_pb2.LeaseUseResult.STATUS_INVALID_LEASE, None

        resc_of_interest = incoming_lease.lease_proto.resource
        if self.hierarchy is not None:
            # Check if the lease resource is in the hierarchy.
            if not self.hierarchy.has_resource(resc_of_interest):
                return lease_pb2.LeaseUseResult.STATUS_UNMANAGED, None
            # Determine the latest maximum lease.
            current_lease_proto = self._maximum_lease(
                self.hierarchy.get_hierarchy(resc_of_interest))
            if not Lease.is_valid_proto(current_lease_proto):
                # If the current lease proto is invalid/empty, then we will accept the incoming
                # lease so mark it as ok!
                return lease_pb2.LeaseUseResult.STATUS_OK, None
            current_lease = Lease(current_lease_proto)
        else:
            # If for some reason we don't have the hierarchy, then fall back on just the active
            # lease map.
            if resc_of_interest not in self.active_lease_map:
                # Nothing has been seen for this resource yet, so accept the incoming lease.
                return lease_pb2.LeaseUseResult.STATUS_OK, None
            current_lease = self.active_lease_map[resc_of_interest]

        compare_result = incoming_lease.compare(current_lease)
        # Raised explicitly (rather than via `assert`) so the check survives `python -O`.
        if compare_result == Lease.CompareResult.DIFFERENT_RESOURCES:
            raise AssertionError(
                "Mismatched resources (%s vs %s) when comparing leases in the LeaseValidator." %
                (incoming_lease.lease_proto.resource, current_lease.lease_proto.resource))

        if compare_result is Lease.CompareResult.DIFFERENT_EPOCHS:
            if allow_different_epoch:
                return lease_pb2.LeaseUseResult.STATUS_OK, current_lease
            return lease_pb2.LeaseUseResult.STATUS_WRONG_EPOCH, current_lease
        if compare_result is Lease.CompareResult.SUPER_LEASE:
            if allow_super_lease:
                return lease_pb2.LeaseUseResult.STATUS_OK, current_lease
            # If super leases are not allowed, then mark this as older.
            return lease_pb2.LeaseUseResult.STATUS_OLDER, current_lease
        if compare_result is Lease.CompareResult.OLDER:
            return lease_pb2.LeaseUseResult.STATUS_OLDER, current_lease
        if compare_result in (Lease.CompareResult.SUB_LEASE, Lease.CompareResult.SAME,
                              Lease.CompareResult.NEWER):
            return lease_pb2.LeaseUseResult.STATUS_OK, current_lease

        # We should not end up here since all compare results should be enumerated.
        raise AssertionError(
            "The compare_result case [%s] is unhandled by the LeaseValidator." % compare_result)

    def _set_active_lease(self, incoming_lease):
        """Helper set the active lease tracked for the specific lease resource."""
        with self.active_lease_map_lock:
            self._set_active_lease_locked(incoming_lease)

    def _set_active_lease_locked(self, incoming_lease):
        """Locked helper which updates the active lease tracked for the specific lease resource.

        The lease is stored under its own resource and, when a hierarchy is available and contains
        that resource, a copy retargeted at each leaf resource *beneath that resource* is stored
        too. Leaves that belong to other branches of the hierarchy are left untouched.

        Note: The active_lease_map_lock must be locked!
        """
        assert self.active_lease_map_lock.locked(), "Must have active_lease_map_lock locked."
        # Convert the lease into a Lease class object.
        if isinstance(incoming_lease, lease_pb2.Lease):
            incoming_lease = Lease(incoming_lease)
        resource = incoming_lease.lease_proto.resource
        self.active_lease_map[resource] = incoming_lease
        if self.hierarchy is None or not self.hierarchy.has_resource(resource):
            return
        # Only propagate to the leaves governed by this lease's resource; _maximum_lease reads the
        # same sub-hierarchy when testing, so the two stay consistent.
        for leaf in self.hierarchy.get_hierarchy(resource).leaf_resources():
            leaf_lease_proto = copy.deepcopy(incoming_lease.lease_proto)
            leaf_lease_proto.resource = leaf
            self.active_lease_map[leaf] = Lease(leaf_lease_proto)

    def _populate_base_lease_use_results_locked(self, attempted_lease, previous_lease,
                                                mutable_lease_use_results):
        """Updates a mutable copy of the LeaseUseResult to fill out the debug fields.

        Note: The active_lease_map_lock must be locked!

        Args:
            attempted_lease (Lease class object or lease_pb2.Lease): The incoming/requested lease.
            previous_lease (Lease class object): Optional previous lease that was last considered
                                                 the latest active lease.
            mutable_lease_use_results(lease_pb2.LeaseUseResult): The result message to fill in;
                                                                 it is modified in place.
        """
        assert self.active_lease_map_lock.locked(), "Must have active_lease_map_lock locked."
        if isinstance(attempted_lease, Lease):
            attempted_lease = attempted_lease.lease_proto
        mutable_lease_use_results.attempted_lease.CopyFrom(attempted_lease)
        if previous_lease is not None:
            mutable_lease_use_results.previous_lease.CopyFrom(previous_lease.lease_proto)
        latest_known_lease = self._get_active_lease_locked(attempted_lease.resource)
        if latest_known_lease is not None:
            mutable_lease_use_results.latest_known_lease.CopyFrom(latest_known_lease.lease_proto)
        if self.hierarchy is not None:
            # Report the tracked lease of every leaf resource in the full hierarchy.
            tracked_leaf_protos = [
                self.active_lease_map[leaf].lease_proto
                for leaf in self.hierarchy.leaf_resources()
                if leaf in self.active_lease_map
            ]
            mutable_lease_use_results.latest_resources.extend(tracked_leaf_protos)

    def _maximum_lease(self, hierarchy):
        """Build a lease proto for the hierarchy's resource from its tracked leaf leases.

        The epoch comes from the first valid tracked leaf lease encountered. The client names and
        sequence are replaced by a leaf's whenever the lease built so far compares as OLDER than,
        or a SUPER_LEASE of, that leaf (resources are ignored for the comparison).

        Args:
            hierarchy: The (sub-)hierarchy whose leaf resources are examined.

        Returns:
            A lease_pb2.Lease for hierarchy.get_resource(). It may be empty/invalid if no valid
            leaf lease is tracked.
        """
        lease_proto = lease_pb2.Lease()
        lease_proto.resource = hierarchy.get_resource()
        for leaf in hierarchy.leaf_resources():
            leaf_lease = self.active_lease_map.get(leaf)
            if leaf_lease is None or not leaf_lease.is_valid_lease():
                continue
            # Set the epoch if it is not yet set.
            if lease_proto.epoch == "":
                lease_proto.epoch = leaf_lease.lease_proto.epoch
            result = Lease(lease_proto, ignore_is_valid_check=True).compare(
                leaf_lease, ignore_resources=True)
            if result in (Lease.CompareResult.OLDER, Lease.CompareResult.SUPER_LEASE):
                lease_proto.client_names[:] = leaf_lease.lease_proto.client_names
                lease_proto.sequence[:] = leaf_lease.lease_proto.sequence
        return lease_proto
class LeaseValidatorResponseProcessor:  # pylint: disable=too-few-public-methods
    """LeaseValidatorResponseProcessor updates the lease validator using the
    latest_known_lease from the response's LeaseUseResult.

    Args:
        lease_validator (LeaseValidator): validator for a specific robot to be updated.
    """

    def __init__(self, lease_validator):
        self.lease_validator = lease_validator

    def mutate(self, response):
        """Update the lease validator if a response has a lease_use_result.

        Looks for a singular 'lease_use_result' field first and a repeated 'lease_use_results'
        field second. Every result whose status is not STATUS_UNKNOWN has its latest_known_lease
        offered to the validator via test_and_set_active_lease (super leases not allowed).

        Args:
            response: The response message to inspect. Responses with neither field are ignored.
        """
        try:
            # AttributeError will occur if the response does not have a field named
            # 'lease_use_result'
            usage_results = [response.lease_use_result]
        except AttributeError:
            try:
                # AttributeError will occur if the response does not have a field named
                # 'lease_use_results'
                usage_results = response.lease_use_results
            except AttributeError:
                # There are responses that carry neither field, so there is nothing to update.
                return

        for usage in usage_results:
            if usage.status == lease_pb2.LeaseUseResult.STATUS_UNKNOWN:
                # An unknown status carries no lease information worth recording.
                continue
            self.lease_validator.test_and_set_active_lease(usage.latest_known_lease,
                                                           allow_super_leases=False)