# Copyright (c) 2023 Boston Dynamics, Inc. All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).
"""Client implementation of the Keepalive service."""
from __future__ import print_function
import collections
import logging
import threading
import time
from typing import Callable, List, Union
import bosdyn.client.lease
import bosdyn.util
from bosdyn.api.keepalive import keepalive_pb2, keepalive_service_pb2_grpc
from bosdyn.client.common import (BaseClient, common_header_errors, error_factory, error_pair,
handle_common_header_errors, handle_unset_status_error)
from bosdyn.client.exceptions import ResponseError, RetryableRpcError
[docs]class KeepaliveResponseError(ResponseError):
"""Error in Keepalive RPC"""
[docs]class InvalidLeaseError(KeepaliveResponseError):
"""A policy's associated lease was not the same, super, or sub lease of the active lease."""
[docs]class InvalidPolicyError(KeepaliveResponseError):
"""The specified policy ID was not valid."""
[docs]class Policy():
"""Helper class for API Policy."""
def __init__(self, proto: Union[None, 'keepalive_pb2.Policy'] = None):
"""Constructor"""
self.policy_proto = proto or keepalive_pb2.Policy()
@property
def name(self) -> str:
"""Get or set the name of the Policy"""
return self.policy_proto.name
@name.setter
def name(self, _name: str):
self.policy_proto.name = _name
[docs] def add_associated_lease(self, lease: Union['bosdyn.client.lease.Lease', 'lease_pb2.Lease']):
if isinstance(lease, bosdyn.client.lease.Lease):
self.policy_proto.associated_leases.append(lease.lease_proto)
else:
self.policy_proto.associated_leases.append(lease)
[docs] def add_controlled_motors_off_action(self, after: float):
"""Add a 'controlled motors off' action that triggers after specified time (seconds)."""
self._configure_action(after, lambda action: action.controlled_motors_off.SetInParent())
[docs] def add_record_event_action(self, events: List['bosdyn.api.Event'], after: float):
"""Add a 'record event' action that triggers after specified time (seconds)."""
def copy_events(action):
for event in events:
action.record_event.events.add().CopyFrom(event)
self._configure_action(after, copy_events)
[docs] def add_auto_return_action(self, leases: List['bosdyn.client.lease.Lease'],
params: 'bosdyn.api.auto_return.Params', after: float):
"""Add an 'auto return' action that triggers after specified time (seconds)."""
def copy_params_and_leases(action):
action.auto_return.leases.extend(lease.lease_proto for lease in leases)
action.auto_return.params.CopyFrom(params)
self._configure_action(after, copy_params_and_leases)
[docs] def add_lease_stale_action(self, leases: List['bosdyn.client.lease.Lease'], after: float):
"""Add a 'mark lease stale' action that triggers after specified time (seconds)."""
def copy_leases(action):
action.lease_stale.leases.extend(lease.lease_proto for lease in leases)
self._configure_action(after, copy_leases)
[docs] def shortest_action_delay(self) -> Union[None, float]:
"""Get the shortest delay on an action, or None if no actions are set.
For example:
pol = Policy()
pol.add_controlled_motors_off_action(2.5)
pol.add_immediate_robot_off_action(1.2)
assert pol.shortest_action_delay() == 1.2
"""
delay = None
for actionafter in self.policy_proto.actions:
tmp = bosdyn.util.duration_to_seconds(actionafter.after)
if delay is None or tmp < delay:
delay = tmp
return delay
def _configure_action(self, after: float, set_action: Callable[['keepalive_pb2.ActionAfter'],
None]):
"""Helper function to reduce boilerplate of adding an action."""
action = self.policy_proto.actions.add()
action.after.CopyFrom(bosdyn.util.seconds_to_duration(after))
set_action(action)
[docs]class KeepaliveClient(BaseClient):
"""A client for the Keepalive service.
This client is in BETA and may undergo changes in future releases.
"""
default_service_name = 'keepalive'
service_type = 'bosdyn.api.keepalive.KeepaliveService'
def __init__(self):
super().__init__(keepalive_service_pb2_grpc.KeepaliveServiceStub)
self._timesync_endpoint = None
[docs] def modify_policy(self, to_add: Union['Policy', 'keepalive_pb2.Policy'] = None,
policy_ids_to_remove: List[int] = None, **kwargs):
"""Add given policy and remove policies with given ids."""
request = self._modify_policy_request(to_add, policy_ids_to_remove)
return self.call(self._stub.ModifyPolicy, request, None, modify_policy_error, **kwargs)
[docs] def modify_policy_async(self, to_add: Union['Policy', 'keepalive_pb2.Policy'] = None,
policy_ids_to_remove: List[int] = None, **kwargs):
"""Async version of the modify_policy() RPC."""
request = self._modify_policy_request(to_add, policy_ids_to_remove)
return self.call_async(self._stub.ModifyPolicy, request, None, modify_policy_error,
**kwargs)
[docs] def check_in(self, policy_id: int, **kwargs):
"""Check in for given policy_id, refreshing that policy's timer."""
request = self._check_in_request(policy_id)
return self.call(self._stub.CheckIn, request, None, check_in_error, **kwargs)
[docs] def check_in_async(self, policy_id: int, **kwargs):
"""Async version of the check_in() RPC."""
request = self._check_in_request(policy_id)
return self.call_async(self._stub.CheckIn, request, None, check_in_error, **kwargs)
[docs] def get_status(self, **kwargs):
"""Get status on all policies."""
request = keepalive_pb2.GetStatusRequest()
return self.call(self._stub.GetStatus, request, None, common_header_errors, **kwargs)
[docs] def get_status_async(self, **kwargs):
"""Async version of the get_status() RPC."""
request = keepalive_pb2.GetStatusRequest()
return self.call_async(self._stub.GetStatus, request, None, common_header_errors, **kwargs)
@staticmethod
def _modify_policy_request(to_add: Union['Policy', 'keepalive_pb2.Policy'],
policy_ids_to_remove):
if isinstance(to_add, Policy):
request = keepalive_pb2.ModifyPolicyRequest(to_add=to_add.policy_proto,
policy_ids_to_remove=policy_ids_to_remove)
else:
request = keepalive_pb2.ModifyPolicyRequest(to_add=to_add,
policy_ids_to_remove=policy_ids_to_remove)
return request
@staticmethod
def _check_in_request(policy_id):
request = keepalive_pb2.CheckInRequest(policy_id=policy_id)
return request
_MODIFY_POLICY_STATUS_TO_ERROR = collections.defaultdict(lambda: (None, None))
_MODIFY_POLICY_STATUS_TO_ERROR.update({
keepalive_pb2.ModifyPolicyResponse.STATUS_INVALID_LEASE: error_pair(InvalidLeaseError),
keepalive_pb2.ModifyPolicyResponse.STATUS_INVALID_POLICY_ID: error_pair(InvalidPolicyError)
})
_CHECK_IN_STATUS_TO_ERROR = collections.defaultdict(lambda: (None, None))
_CHECK_IN_STATUS_TO_ERROR.update(
{keepalive_pb2.CheckInResponse.STATUS_INVALID_POLICY_ID: error_pair(InvalidPolicyError)})
[docs]@handle_common_header_errors
@handle_unset_status_error(unset='STATUS_UNKNOWN')
def modify_policy_error(response):
"""ModifyPolicy response to exception."""
return error_factory(response, response.status,
status_to_string=keepalive_pb2.ModifyPolicyResponse.Status.Name,
status_to_error=_MODIFY_POLICY_STATUS_TO_ERROR)
[docs]@handle_common_header_errors
@handle_unset_status_error(unset='STATUS_UNKNOWN')
def check_in_error(response):
"""CheckIn response to exception."""
return error_factory(response, response.status,
status_to_string=keepalive_pb2.CheckInResponse.Status.Name,
status_to_error=_CHECK_IN_STATUS_TO_ERROR)
#pylint: disable=too-many-instance-attributes
[docs]class PolicyKeepalive():
"""Specify a keepalive Policy that should be held to.
Meant to be used as a context manager. For example:
client = robot.ensure_client(KeepaliveClient.default_service_name)
pol = Policy()
# After 30 seconds of not hearing from this process, turn the robot motors off.
pol.add_controlled_motors_off_action(30)
with PolicyKeepalive(client, pol, rpc_interval_seconds=3) as policy_keepalive:
# A thread will attempt a CheckIn every 3 seconds.
run_my_code()
"""
#pylint: disable=too-many-arguments
def __init__(self, client: KeepaliveClient, policy: Policy, rpc_timeout_seconds: float = None,
rpc_interval_seconds: float = None, logger: 'logging.Logger' = None,
remove_policy_on_exit: bool = False):
self.logger = logger or logging.getLogger()
self.remove_policy_on_exit = remove_policy_on_exit
self._client = client
self._policy = policy
self._policy_id = None
# If the interval isn't specified manually, try to get the interval from the policy,
# assuming the user wants to check in a few times before the earliest action.
# This will raise an exception if there's no action at all.
self._rpc_interval_seconds = rpc_interval_seconds or policy.shortest_action_delay() / 3
self._rpc_timeout_seconds = rpc_timeout_seconds
self._end_check_in_signal = threading.Event()
self._thread = threading.Thread(target=self._periodic_check_in)
self._thread.daemon = True
def __enter__(self):
"""Add this instance's policy and begin checking in."""
self._policy_id = self._client.modify_policy(to_add=self._policy).added_policy.policy_id
self.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Stop checking in, and optionally remove the policy."""
self.shutdown()
if self.remove_policy_on_exit:
self.remove_policy()
[docs] def remove_policy(self):
"""Remove this instance's policy, if it did manage to add one."""
if self._policy_id:
self._client.modify_policy(policy_ids_to_remove=[self._policy_id])
self._policy_id = None
[docs] def start(self):
"""Start the checkin thread."""
self._thread.start()
[docs] def shutdown(self):
"""Stop the checkin thread and block until it ends."""
self._end_periodic_check_in()
self._thread.join()
def _check_in(self):
self._client.check_in(self._policy_id, timeout=self._rpc_timeout_seconds)
def _end_periodic_check_in(self):
self._end_check_in_signal.set()
def _periodic_check_in(self):
while True:
exec_start = time.time()
try:
self._check_in()
except RetryableRpcError as exc:
self.logger.warning('exception during check-in:\n%s\n', exc)
self.logger.info('continuing check-in')
# How long did the RPC and processing of said RPC take?
exec_seconds = time.time() - exec_start
# Block and wait for the stop signal. If we receive it within the check-in period,
# leave the loop. This check must be at the end of the loop!
# Wait up to self._check_in_period seconds, minus the RPC processing time.
# (values < 0 are OK and will return immediately)
if self._end_check_in_signal.wait(self._rpc_interval_seconds - exec_seconds):
break
self.logger.info('Policy check-in stopped')
[docs]def remove_all_policies(keepalive_client, attempts=1):
"""Remove all policies on the robot.
Optionally do this over a few attempts, in case other things are also removing policies.
"""
last_exc = None
for i in range(attempts):
if last_exc:
time.sleep(0.5)
last_exc = None
all_policy_ids = [p.policy_id for p in keepalive_client.get_status().status]
if all_policy_ids:
try:
keepalive_client.modify_policy(policy_ids_to_remove=all_policy_ids)
except InvalidPolicyError as exc:
last_exc = exc
else:
break
else:
break
if last_exc:
raise last_exc