Source code for bosdyn.client.area_callback_service_utils

# Copyright (c) 2023 Boston Dynamics, Inc.  All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).

import logging
import time
from typing import List

import bosdyn.client
from bosdyn.api import service_fault_pb2
from bosdyn.api.graph_nav import area_callback_pb2
from bosdyn.api.service_customization_pb2 import DictParam
from bosdyn.client.directory import NonexistentServiceError
from bosdyn.client.fault import ServiceFaultAlreadyExistsError, ServiceFaultDoesNotExistError
from bosdyn.client.service_customization_helpers import dict_params_to_dict

_LOGGER = logging.getLogger(__name__)


[docs]class AreaCallbackServiceConfig: """Config data required to run a area callback service.""" def __init__(self, service_name: str, required_lease_resources: List[str] = (), log_begin_callback_data: bool = False, area_callback_information: area_callback_pb2.AreaCallbackInformation = None): """AreaCallbackServiceConfig constructor. Args: service_name (str): The name of the service, for registering with directory. required_lease_resources (List[str]): List of required lease resources. log_begin_callback_data (bool): Log the data field of the begin callback request. area_callback_information (area_callback_pb2.AreaCallbackInformation): Information describing the area callback. """ self.service_name = service_name self.required_lease_resources = required_lease_resources self.log_begin_callback_data = log_begin_callback_data self.area_callback_information = area_callback_information or \ area_callback_pb2.AreaCallbackInformation( required_lease_resources=self.required_lease_resources)
[docs] def parse_params(self, params: DictParam): """ Parse params and validate they agree with the spec stored in area_callback_information. Args: params (DictParam): The parameters being validated. """ return dict_params_to_dict(params, self.area_callback_information.custom_params)
# Helper to raise service faults when other services are unavailable.
[docs]def handle_service_faults(fault_client, robot_state_client, directory_client, service_name, prereq_services): service_fault = service_fault_pb2.ServiceFault() service_fault.fault_id.fault_name = service_name service_fault.fault_id.service_name = service_name service_fault.severity = service_fault_pb2.ServiceFault.SEVERITY_CRITICAL check_period = 0.5 # seconds. while True: time.sleep(check_period) # Don't fault the service if it doesn't actually exist. try: registered_service = directory_client.get_entry(service_name) except NonexistentServiceError as exc: continue except bosdyn.client.Error as exc: _LOGGER.error("Failed to check if %s exists during fault handling: %s", service_name, exc) continue set_fault = False fault_exists = False unavailable_services = [] # Need robot state to check existing faults. try: state = robot_state_client.get_robot_state() except bosdyn.client.Error as exc: _LOGGER.error("Failed to get robot state during fault handling: %s", exc) continue for prereq_service in prereq_services: # Make sure the prereq service exists. try: registered_service = directory_client.get_entry(prereq_service) except NonexistentServiceError as exc: set_fault = True unavailable_services.append(prereq_service) continue except bosdyn.client.Error as exc: _LOGGER.error("Failed to check if %s exists during fault handling: %s", prereq_service, exc) set_fault = True unavailable_services.append(prereq_service) continue # Make sure the prereq service isn't faulted. for fault in state.service_fault_state.faults: if fault.fault_id.service_name == prereq_service: set_fault = True unavailable_services.append(prereq_service) break # Check if the service is already faulted. for fault in state.service_fault_state.faults: if fault.fault_id.service_name == service_name: fault_exists = True break # Fault the service if there isn't an existing fault. if set_fault and not fault_exists: service_fault.error_message = 'Faulted due to issues with ' + ','.join( unavailable_services) try: fault_client.trigger_service_fault(service_fault) _LOGGER.info("Triggered fault on %s", service_name) except ServiceFaultAlreadyExistsError: pass except bosdyn.client.Error as exc: _LOGGER.error("Failed to set %s fault: %s", service_name, exc) # Otherwise, clear the fault if it exists. elif not set_fault and fault_exists: try: fault_client.clear_service_fault(service_fault.fault_id) _LOGGER.info("Cleared fault on %s", service_name) except ServiceFaultDoesNotExistError: pass except bosdyn.client.Error as exc: _LOGGER.error("Failed to clear %s fault: %s", service_name, exc)