Source code for bosdyn.client.area_callback_service_utils

# Copyright (c) 2023 Boston Dynamics, Inc.  All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).

import logging
import time
from typing import List

from bosdyn.api import service_fault_pb2
from bosdyn.api.graph_nav import area_callback_pb2
from bosdyn.api.service_customization_pb2 import DictParam
from bosdyn.client.directory import NonexistentServiceError
from bosdyn.client.exceptions import RpcError
from bosdyn.client.fault import (FaultResponseError, ServiceFaultAlreadyExistsError,
                                 ServiceFaultDoesNotExistError)
from bosdyn.client.service_customization_helpers import dict_params_to_dict

_LOGGER = logging.getLogger(__name__)


[docs]class AreaCallbackServiceConfig: """Config data required to run a area callback service.""" def __init__(self, service_name: str, required_lease_resources: List[str] = (), log_begin_callback_data: bool = False, area_callback_information: area_callback_pb2.AreaCallbackInformation = None): """AreaCallbackServiceConfig constructor. Args: service_name (str): The name of the service, for registering with directory. required_lease_resources (List[str]): List of required lease resources. log_begin_callback_data (bool): Log the data field of the begin callback request. area_callback_information (area_callback_pb2.AreaCallbackInformation): Information describing the area callback. """ self.service_name = service_name self.required_lease_resources = required_lease_resources self.log_begin_callback_data = log_begin_callback_data self.area_callback_information = area_callback_information or \ area_callback_pb2.AreaCallbackInformation( required_lease_resources=self.required_lease_resources)
[docs] def parse_params(self, params: DictParam): """ Parse params and validate they agree with the spec stored in area_callback_information. Args: params (DictParam): The parameters being validated. """ return dict_params_to_dict(params, self.area_callback_information.custom_params)
# Helper to raise service faults when other services are unavailable.
[docs]def handle_service_faults(fault_client, robot_state_client, directory_client, service_name, prereq_services): service_fault = service_fault_pb2.ServiceFault() service_fault.fault_id.fault_name = f'{service_name}' service_fault.fault_id.service_name = service_name service_fault.severity = service_fault_pb2.ServiceFault.SEVERITY_CRITICAL check_period = 0.5 # seconds. while True: time.sleep(check_period) # Don't fault the service if it doesn't actually exist. try: registered_service = directory_client.get_entry(service_name) except NonexistentServiceError as exc: continue set_fault = False unavailable_services = [] for service in prereq_services: # Make sure the prereq service exists. try: registered_service = directory_client.get_entry(service) except NonexistentServiceError as exc: set_fault = True unavailable_services.append(service) continue # Make sure the prereq service isn't faulted. state = robot_state_client.get_robot_state() for fault in state.service_fault_state.faults: if fault.fault_id.service_name == service: set_fault = True unavailable_services.append(service) break # Fault the service. if set_fault: service_fault.error_message = 'Faulted due to issues with ' + ','.join( unavailable_services) try: fault_client.trigger_service_fault(service_fault) except ServiceFaultAlreadyExistsError: pass except (RpcError, FaultResponseError) as exc: _LOGGER.error(f"Failed to set {service_name} fault. {exc}") # Otherwise, clear the fault if it exists. else: try: fault_client.clear_service_fault(service_fault.fault_id) set_fault = False except ServiceFaultDoesNotExistError: pass except (RpcError, FaultResponseError) as exc: _LOGGER.error(f"Failed to clear {service_name} fault. {exc}")