Source code for bosdyn.client.command_line

# Copyright (c) 2023 Boston Dynamics, Inc.  All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).

"""Command-line utility code for interacting with robot services."""

from __future__ import division

import abc
import argparse
import datetime
import os
import signal
import socket
import sys
import threading
import time

from deprecated.sphinx import deprecated
from google.protobuf import json_format

import bosdyn.client
from bosdyn.api import data_acquisition_pb2, image_pb2
from bosdyn.api.data_buffer_pb2 import Event, TextMessage
from bosdyn.api.data_index_pb2 import EventsCommentsSpec
from bosdyn.api.keepalive import keepalive_pb2
from bosdyn.api.robot_state_pb2 import BehaviorFault
from bosdyn.util import duration_str, timestamp_to_datetime

from .auth import InvalidLoginError, InvalidTokenError
from .data_acquisition import DataAcquisitionClient
from .data_acquisition_helpers import acquire_and_process_request
from .data_acquisition_plugin import DataAcquisitionPluginClient
from .data_buffer import DataBufferClient
from .data_service import DataServiceClient
from .directory import DirectoryClient, NonexistentServiceError
from .directory_registration import DirectoryRegistrationClient, DirectoryRegistrationResponseError
from .estop import EstopClient, EstopEndpoint, EstopKeepAlive
from .exceptions import Error, InvalidRequestError, ProxyConnectionError
from .image import (ImageClient, ImageResponseError, UnknownImageSourceError, build_image_request,
                    save_images_as_files)
from .keepalive import KeepaliveClient
from .lease import LeaseClient
from .license import LicenseClient
from .local_grid import LocalGridClient
from .log_status import InactiveLogError, LogStatusClient
from .payload import PayloadClient
from .payload_registration import PayloadAlreadyExistsError, PayloadRegistrationClient
from .power import (PowerClient, power_cycle_robot, power_off_payload_ports, power_off_robot,
                    power_off_wifi_radio, power_on_payload_ports, power_on_wifi_radio)
from .robot_id import RobotIdClient
from .robot_state import RobotStateClient
from .time_sync import TimeSyncClient, TimeSyncEndpoint, TimeSyncError, timespec_to_robot_timespan
from .util import add_common_arguments, authenticate, setup_logging



# pylint: disable=too-few-public-methods
[docs]class Command(object, metaclass=abc.ABCMeta): """Command-line command. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ # The name of the command the user should enter on the command line to select this command. NAME = None # Whether authentication is needed before the command is run. # Most commands need authentication. NEED_AUTHENTICATION = True def __init__(self, subparsers, command_dict): command_dict[self.NAME] = self self._parser = subparsers.add_parser(self.NAME, help=self.__doc__)
[docs] def run(self, robot, options): """Invoke the command. Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. """ try: if self.NEED_AUTHENTICATION: if hasattr(options, 'username') and hasattr( options, 'password') and (options.username or options.password): robot.authenticate(options.username, options.password) else: authenticate(robot) robot.sync_with_directory() # Make sure that we can use all registered services. return self._run(robot, options) except ProxyConnectionError: print('Could not contact robot with hostname "{}".'.format(options.hostname), file=sys.stderr) except InvalidTokenError: print('The provided user token is invalid.', file=sys.stderr) except InvalidLoginError: print('Username and/or password are invalid.', file=sys.stderr) except Error as err: print('{}: {}'.format(type(err).__name__, err), file=sys.stderr)
@abc.abstractmethod def _run(self, robot, options): """Implementation of the command. Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. """
[docs]class Subcommands(Command): """Run subcommands. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. subcommands: List of subcommands to run. """ def __init__(self, subparsers, command_dict, subcommands): super(Subcommands, self).__init__(subparsers, command_dict) command_dest = '{}_command'.format(self.NAME) cmd_subparsers = self._parser.add_subparsers(title=self.__doc__, dest=command_dest) cmd_subparsers.required = True self._subcommands = {} for subcommand in subcommands: subcommand(cmd_subparsers, self._subcommands) def _run(self, robot, options): """Implementation of the command. Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. Returns: Execution of the specific subcommand from the options. """ command_dest = '{}_command'.format(self.NAME) subcommand = vars(options)[command_dest] return self._subcommands[subcommand].run(robot, options)
[docs]class DirectoryCommands(Subcommands): """Commands related to the directory service.""" NAME = 'dir' def __init__(self, subparsers, command_dict): """Commands related to the directory service. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(DirectoryCommands, self).__init__(subparsers, command_dict, [ DirectoryListCommand, DirectoryGetCommand, DirectoryRegisterCommand, DirectoryUnregisterCommand ])
def _format_dir_entry( # pylint: disable=too-many-arguments name, service_type, authority, tokens, name_width=23, type_width=31, authority_width=27): """Prints the passed values as "name service_type authority tokens", with the first three using the specified width. Args: name: Name of the service. service_type: Type of the service. authority: Authority of the service. tokens: Tokens required for using the service. name_width: Width for printing the name value. type_width: Width for printing the service_type value. authority_width: Width for printing the authority value. """ print(('{:' + str(name_width) + '} {:' + str(type_width) + '} {:' + str(authority_width) + '} {}').format(name, service_type, authority, tokens)) def _token_req_str(entry): """Returns a string representing tokens required for using the service. Args: entry: Service entry being checked. Returns: String with a comma-separated list of required tokens. """ required = [] if entry.user_token_required: required.append('user') if not required: return '' return ', '.join(required) def _show_directory_list(robot, as_proto=False): """Print service directory list for robot. Args: robot: Robot object used to get the list of services. as_proto: Boolean to determine whether the directory entries should be printed as full proto definitions or formatted strings. Returns: True """ entries = robot.ensure_client(DirectoryClient.default_service_name).list() if not entries: print("No services found") return True if as_proto: for entry in entries: print(entry) return True max_name_length = max(len(entry.name) for entry in entries) max_type_length = max(len(entry.type) for entry in entries) max_authority_length = max(len(entry.authority) for entry in entries) _format_dir_entry('name', 'type', 'authority', 'tokens', max_name_length + 4, max_type_length + 4, max_authority_length + 4) print("-" * (20 + max_name_length + max_type_length + max_authority_length)) for entry in entries: _format_dir_entry(entry.name, entry.type, entry.authority, _token_req_str(entry), max_name_length + 4, max_type_length + 4, max_authority_length + 4) return True def _show_directory_entry(robot, service, as_proto=False): """Print a service directory entry. Args: robot: Robot object used to get the list of services. service: Name of the service to print. as_proto: Boolean to determine whether the directory entries should be printed as full proto definitions or formatted strings. Returns: True """ entry = robot.ensure_client(DirectoryClient.default_service_name).get_entry(service) if as_proto: print(entry) else: _format_dir_entry('name', 'type', 'authority', 'tokens') print("-" * 90) _format_dir_entry(entry.name, entry.type, entry.authority, _token_req_str(entry)) return True
[docs]class DirectoryListCommand(Command): """List all services in the directory.""" NAME = 'list' def __init__(self, subparsers, command_dict): """List all services in the directory. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(DirectoryListCommand, self).__init__(subparsers, command_dict) self._parser.add_argument('--proto', action='store_true', help='print listing in proto format') def _run(self, robot, options): """Implementation of the command. Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. Returns: True """ _show_directory_list(robot, as_proto=options.proto) return True
[docs]class DirectoryGetCommand(Command): """Get entry for a given service in the directory.""" NAME = 'get' def __init__(self, subparsers, command_dict): """Get entry for a given service in the directory. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(DirectoryGetCommand, self).__init__(subparsers, command_dict) self._parser.add_argument('--proto', action='store_true', help='print listing in proto format') self._parser.add_argument('service', help='service name to get entry for') def _run(self, robot, options): """Implementation of the command. Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. Returns: False if NonexistentServiceError is caught, True otherwise. """ try: _show_directory_entry(robot, options.service, as_proto=options.proto) except NonexistentServiceError: print('The requested service name "{}" does not exist. Available services:'.format( options.service)) _show_directory_list(robot, as_proto=options.proto) return False return True
[docs]class DirectoryRegisterCommand(Command): """Register entry for a service in the directory.""" NAME = 'register' def __init__(self, subparsers, command_dict): """Register entry for a service in the directory. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(DirectoryRegisterCommand, self).__init__(subparsers, command_dict) self._parser.add_argument('--service-name', required=True, help='unique name of the service') self._parser.add_argument('--service-type', required=True, help='Type of the service, e.g. bosdyn.api.RobotStateService') self._parser.add_argument('--service-authority', required=True, help='authority of the service') self._parser.add_argument('--service-hostname', required=True, help='hostname of the service computer') self._parser.add_argument('--service-port', required=True, type=int, help='port the service is running on') self._parser.add_argument('--no-user-token', action='store_true', required=False, help='disable requirement for user token') def _run(self, robot, options): """Implementation of the command. Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. Returns: False if DirectoryRegistrationResponseError is caught, True otherwise. """ directory_registration_client = robot.ensure_client( DirectoryRegistrationClient.default_service_name) try: directory_registration_client.register(options.service_name, options.service_type, options.service_authority, options.service_hostname, options.service_port, user_token_required=not options.no_user_token) except DirectoryRegistrationResponseError as exc: # pylint: disable=no-member print("Failed to register service {}.\nResponse Status: {}".format( options.service_name, bosdyn.api.directory_registration_pb2.RegisterServiceResponse.Status.Name( exc.response.status))) return False else: print("Successfully registered service {}".format(options.service_name)) return True
[docs]class DirectoryUnregisterCommand(Command): """Unregister entry for a service in the directory.""" NAME = 'unregister' def __init__(self, subparsers, command_dict): """Unregister entry for a service in the directory. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(DirectoryUnregisterCommand, self).__init__(subparsers, command_dict) self._parser.add_argument('--service-name', required=True, help='unique name of the service') def _run(self, robot, options): """Implementation of the command. Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. Returns: False if DirectoryRegistrationResponseError is caught, True otherwise. """ directory_registration_client = robot.ensure_client( DirectoryRegistrationClient.default_service_name) try: directory_registration_client.unregister(options.service_name) except DirectoryRegistrationResponseError as exc: # pylint: disable=no-member print("Failed to unregister service {}.\nResponse Status: {}".format( options.service_name, bosdyn.api.directory_registration_pb2.UnregisterServiceResponse.Status.Name( exc.response.status))) return False else: print("Successfully unregistered service {}".format(options.service_name)) return True
[docs]class PayloadCommands(Subcommands): """Commands related to the payload and payload registration services.""" NAME = 'payload' def __init__(self, subparsers, command_dict): """Commands related to the payload and payload registration services. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(PayloadCommands, self).__init__(subparsers, command_dict, [PayloadListCommand, PayloadRegisterCommand])
def _show_payload_list(robot, as_proto=False): """Print payload list for robot. Args: robot: Robot object used to get the list of services. as_proto: Boolean to determine whether the payload entries should be printed as full proto definitions or formatted strings. Returns: True """ payload_protos = robot.ensure_client(PayloadClient.default_service_name).list_payloads() if not payload_protos: print("No payloads found") return True if as_proto: for payload in payload_protos: print(payload) return True # Print out the payload name, description, and GUID in columns with set width. name_width = 30 description_width = 60 guid_width = 36 print(('\n{:' + str(name_width) + '} {:' + str(description_width) + '} {:' + str(guid_width) + '}').format('Name', 'Description', 'GUID')) print("-" * (5 + name_width + description_width + guid_width)) for payload in payload_protos: print(('{:' + str(name_width) + '} {:' + str(description_width) + '} {:' + str(guid_width) + '}').format(payload.name, payload.description, payload.GUID)) return True
[docs]class PayloadListCommand(Command): """List all payloads registered with the robot.""" NAME = 'list' def __init__(self, subparsers, command_dict): """List all payloads registered with the robot. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(PayloadListCommand, self).__init__(subparsers, command_dict) self._parser.add_argument('--proto', action='store_true', help='print listing in proto format') def _run(self, robot, options): """Implementation of the command. Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. Returns: True """ _show_payload_list(robot, as_proto=options.proto) return True
def _register_payload(robot, name, guid, secret): """Register a payload with the robot. Args: robot: Robot object used to register the payload. name: The name that will be assigned to the registered payload. guid: The GUID that will be assigned to the registered payload. secret: The secret that the payload will be registered with. Returns: True """ payload_registration_client = robot.ensure_client( PayloadRegistrationClient.default_service_name) payload = bosdyn.api.payload_pb2.Payload(GUID=guid, name=name) try: payload_registration_client.register_payload(payload, secret) except PayloadAlreadyExistsError: print('\nA payload with this GUID is already registered. Check the robot Admin Console.') else: print('\nPayload successfully registered with the robot.\n' 'Before it can be used, the payload must be authorized in the Admin Console.') return True
[docs]class PayloadRegisterCommand(Command): """Register a payload with the robot.""" NAME = 'register' def __init__(self, subparsers, command_dict): """Register a payload with the robot. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(PayloadRegisterCommand, self).__init__(subparsers, command_dict) self._parser.add_argument('--payload-name', required=True, help='name of the payload') self._parser.add_argument('--payload-guid', required=True, help='guid of the payload') self._parser.add_argument('--payload-secret', required=True, help='secret for the payload') def _run(self, robot, options): """Implementation of the command. Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. Returns: True """ _register_payload(robot, options.payload_name, options.payload_guid, options.payload_secret)
[docs]class FaultCommands(Subcommands): """Commands related to the fault service and robot state service (for fault reading).""" NAME = 'fault' def __init__(self, subparsers, command_dict): """Commands related to the fault service and robot state service (for fault reading). Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(FaultCommands, self).__init__(subparsers, command_dict, [FaultShowCommand, FaultWatchCommand])
def _show_all_faults(robot): """Print faults for the robot. Args: robot: Robot object used to get the list of services. """ robot_state_client = robot.ensure_client(RobotStateClient.default_service_name) robot_state = robot_state_client.get_robot_state() system_fault_state = robot_state.system_fault_state behavior_fault_state = robot_state.behavior_fault_state service_fault_state = robot_state.service_fault_state print('\n' + '-' * 80) if len(system_fault_state.faults) == 0: print("No active system faults.") else: for fault in system_fault_state.faults: print(''' {fault.name} Error Message: {fault.error_message} Onset Time: {timestamp}''' \ .format(fault=fault, timestamp=timestamp_to_datetime(fault.onset_timestamp))) print() if len(behavior_fault_state.faults) == 0: print("No active behavior faults.") else: for fault in behavior_fault_state.faults: print(''' {cause} Onset Time: {timestamp} Clearable: {clearable}''' \ .format(cause=BehaviorFault.Cause.Name(fault.cause), timestamp=timestamp_to_datetime(fault.onset_timestamp), clearable=BehaviorFault.Status.Name(fault.status))) print() if len(service_fault_state.faults) == 0: print("No active service faults.") else: for fault in service_fault_state.faults: print(''' {fault.fault_id.fault_name} Service Name: {fault.fault_id.service_name} Payload GUID: {fault.fault_id.payload_guid} Error Message: {fault.error_message} Onset Time: {timestamp}'''\ .format(fault=fault, timestamp=timestamp_to_datetime(fault.onset_timestamp)))
[docs]class FaultShowCommand(Command): """Show all faults currently active in robot state.""" NAME = 'show' def __init__(self, subparsers, command_dict): """Show all faults currently active in robot state. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(FaultShowCommand, self).__init__(subparsers, command_dict) def _run(self, robot, options): """Implementation of the command. Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. Returns: True """ _show_all_faults(robot) return True
[docs]class FaultWatchCommand(Command): """Watch all faults in robot state and print them out.""" NAME = 'watch' def __init__(self, subparsers, command_dict): """Watch all service faults in robot state and print them out. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(FaultWatchCommand, self).__init__(subparsers, command_dict) def _run(self, robot, options): """Implementation of the command. Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. Returns: True """ print('Press Ctrl-C or send SIGINT to exit\n\n') try: while True: _show_all_faults(robot) time.sleep(1) except KeyboardInterrupt: pass return True
[docs]class LogStatusCommands(Subcommands): """Start, update and terminate experiment logs, start and terminate retro logs and check status of active logs for robot.""" NAME = 'log-status' NEED_AUTHENTICATION = True def __init__(self, subparsers, command_dict): """Interact with logs for robot Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(LogStatusCommands, self).__init__(subparsers, command_dict, [ GetLogCommand, GetActiveLogStatusesCommand, ExperimentLogCommand, StartRetroLogCommand, TerminateLogCommand, ])
[docs]class GetLogCommand(Command): """Get log status but log id.""" NAME = 'get' def __init__(self, subparsers, command_dict): """Get log status from robot Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(GetLogCommand, self).__init__(subparsers, command_dict) self._parser.add_argument('id', help='id of log bundle to display') def _run(self, robot, options): """Implementation of the command. Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. Returns: True """ client = robot.ensure_client(LogStatusClient.default_service_name) response = client.get_log_status(options.id) print(response.log_status) return True
[docs]class GetActiveLogStatusesCommand(Command): """Get active log bundles for robot.""" NAME = 'active' def __init__(self, subparsers, command_dict): """Retrieve active log statuses for robot. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(GetActiveLogStatusesCommand, self).__init__(subparsers, command_dict) def _run(self, robot, options): """Implementation of the command. Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. Returns: True """ client = robot.ensure_client(LogStatusClient.default_service_name) response = client.get_active_log_statuses() print(response.log_statuses) return True
[docs]class ExperimentLogCommand(Subcommands): """Give experiment log commands to robot.""" NAME = 'experiment' def __init__(self, subparsers, command_dict): """Start a timed or continuous experiment log. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(ExperimentLogCommand, self).__init__(subparsers, command_dict, [ StartTimedExperimentLogCommand, StartContinuousExperimentLogCommand, ])
[docs]class StartTimedExperimentLogCommand(Command): """Start a timed experiment log.""" NAME = 'timed' def __init__(self, subparsers, command_dict): """Start timed experiment log Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(StartTimedExperimentLogCommand, self).__init__(subparsers, command_dict) self._parser.add_argument('seconds', type=float, help='how long should the experiment run?') def _run(self, robot, options): """Implementation of the command. Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. Returns: True """ client = robot.ensure_client(LogStatusClient.default_service_name) response = client.start_experiment_log(options.seconds) print(response.log_status) return True
[docs]class StartContinuousExperimentLogCommand(Command): """Start a continuous experiment log.""" NAME = 'continuous' def __init__(self, subparsers, command_dict): """Start continuous experiment log, defaulted to update keep alive time by 10 seconds every 5 seconds. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(StartContinuousExperimentLogCommand, self).__init__(subparsers, command_dict) self._parser.add_argument('-sleep', type=float, default=5, help='how long should thread sleep before extending')
[docs] @staticmethod def handle_keyboard_interruption(client, log_id): try: print(" Received keyboard interruption\n\n") response = client.terminate_log(log_id) print(response.log_status) except KeyboardInterrupt: client.terminate_log_async(log_id) print("Log will terminate shortly") response = client.get_log_status(log_id) print(response.log_status)
def _run(self, robot, options): """Implementation of the command. Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. Returns: True """ client = robot.ensure_client(LogStatusClient.default_service_name) response = client.start_experiment_log(options.sleep * 2) log_id = response.log_status.id print("Experiment log id: ", log_id) print('Use terminate command, press Ctrl-C or send SIGINT to complete log\n\n') try: while True: time.sleep(options.sleep) client.update_experiment(log_id, options.sleep * 2) except InactiveLogError: response = client.get_log_status(log_id) print(response.log_status) except KeyboardInterrupt: self.handle_keyboard_interruption(client, log_id) return True
[docs]class StartRetroLogCommand(Command): """Start a retro log.""" NAME = 'retro' def __init__(self, subparsers, command_dict): """Start a retro log Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(StartRetroLogCommand, self).__init__(subparsers, command_dict) self._parser.add_argument('seconds', type=float, help='how long should the retro log run?') def _run(self, robot, options): """Implementation of the command. Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. Returns: True """ client = robot.ensure_client(LogStatusClient.default_service_name) response = client.start_retro_log(options.seconds) print(response.log_status) return True
[docs]class TerminateLogCommand(Command): """Terminate log gathering process.""" NAME = 'terminate' def __init__(self, subparsers, command_dict): """Terminate log on robot Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(TerminateLogCommand, self).__init__(subparsers, command_dict) self._parser.add_argument('id', help='id of log to terminate') def _run(self, robot, options): """Implementation of the command. Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. Returns: True """ client = robot.ensure_client(LogStatusClient.default_service_name) response = client.terminate_log(options.id) print(response.log_status) return True
[docs]class RobotIdCommand(Command): """Show robot-id.""" NAME = 'id' NEED_AUTHENTICATION = False def __init__(self, subparsers, command_dict): """Show robot-id. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(RobotIdCommand, self).__init__(subparsers, command_dict) self._parser.add_argument('--proto', action='store_true', help='print listing in proto format') def _run(self, robot, options): """Implementation of the command. Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. Returns: True. """ proto = robot.ensure_client(RobotIdClient.default_service_name).get_id() if options.proto: print(proto) return True nickname = '' if proto.nickname and proto.nickname != proto.serial_number: nickname = proto.nickname release = proto.software_release version = release.version print(u"{:20} {:15} {:10} {} ({})".format(proto.serial_number, proto.computer_serial_number, nickname, proto.species, proto.version)) print(" Software: {}.{}.{} ({} {})".format(version.major_version, version.minor_version, version.patch_level, release.changeset, timestamp_to_datetime(release.changeset_date))) print(" Installed: {}".format(timestamp_to_datetime(release.install_date))) return True
[docs]class DataBufferCommands(Subcommands): """Commands related to the data-buffer service.""" NAME = 'log' def __init__(self, subparsers, command_dict): """Commands related to the data-buffer service. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(DataBufferCommands, self).__init__(subparsers, command_dict, [TextMsgCommand, OperatorCommentCommand])
[docs]class TextMsgCommand(Command): """Send a text-message to the data buffer to be logged.""" NAME = 'textmsg' def __init__(self, subparsers, command_dict): """Send a text-message to the data buffer to be logged. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(TextMsgCommand, self).__init__(subparsers, command_dict) self._parser.add_argument('--timestamp', action='store_true', help='achieve time-sync and send timestamp') self._parser.add_argument('--tag', help='Tag for message') parser_log_level = self._parser.add_mutually_exclusive_group() parser_log_level.add_argument('--debug', '-D', action='store_true', help='Log at debug-level') parser_log_level.add_argument('--info', '-I', action='store_true', help='Log at info-level') parser_log_level.add_argument('--warn', '-W', action='store_true', help='Log at warn-level') parser_log_level.add_argument('--error', '-E', action='store_true', help='Log at error-level') self._parser.add_argument('message', help='Message to log') def _run(self, robot, options): """Implementation of the command. Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. Returns: False if TimeSyncError is caught, True otherwise. """ robot_timestamp = None if options.timestamp: try: robot_timestamp = robot.time_sync.robot_timestamp_from_local_secs( time.time(), timesync_timeout_sec=1.0) except TimeSyncError as err: print("Failed to send message with timestamp: {}.".format(err)) return False msg_proto = TextMessage(message=options.message, timestamp=robot_timestamp) # pylint: disable=no-member if options.debug: msg_proto.level = TextMessage.LEVEL_DEBUG elif options.warn: msg_proto.level = TextMessage.LEVEL_WARN elif options.error: msg_proto.level = TextMessage.LEVEL_ERROR else: msg_proto.level = TextMessage.LEVEL_INFO if options.tag: msg_proto.tag = options.tag data_buffer_client = robot.ensure_client(DataBufferClient.default_service_name) data_buffer_client.add_text_messages([msg_proto]) return True
[docs]class OperatorCommentCommand(Command): """Send an operator comment to the robot to be logged.""" NAME = 'comment' def __init__(self, subparsers, command_dict): """Send an operator comment to the robot to be logged. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(OperatorCommentCommand, self).__init__(subparsers, command_dict) self._parser.add_argument('--timestamp', action='store_true', help='achieve time-sync and send timestamp') self._parser.add_argument('message', help='operator comment text') def _run(self, robot, options): """Implementation of the command. Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. Returns: False if TimeSyncError is caught, True otherwise. """ client_timestamp = None if options.timestamp: client_timestamp = time.time() try: robot.time_sync.wait_for_sync(timeout_sec=1.0) except TimeSyncError as err: print("Failed to get timesync for setting comment timestamp: {}.".format(err)) return False robot.operator_comment(options.message, timestamp_secs=client_timestamp) return True
[docs]class DataServiceCommands(Subcommands): """Commands for querying the data-service.""" NAME = 'data' def __init__(self, subparsers, command_dict): """Commands for querying the data-service Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(DataServiceCommands, self).__init__( subparsers, command_dict, [GetDataBufferCommentsCommand, GetDataBufferEventsCommand, GetDataBufferStatusCommand])
[docs]class GetDataBufferEventsCommentsCommand(Command): """Parent class for commands grabbing operator comment and events.""" def __init__(self, subparsers, command_dict): """Get operator comments from the robot. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(GetDataBufferEventsCommentsCommand, self).__init__(subparsers, command_dict) self._parser.add_argument('--proto', action='store_true', help='print in proto format') self._parser.add_argument( '-T', '--timespan', help='Time span (default all). "1h" (last hour), "10m-5m" (from 10 to 5 minutes ago).') self._parser.add_argument('-R', '--robot-time', action='store_true', help='Specified timespan is in robot time') @abc.abstractmethod def _run(self, robot, options): pass
[docs] @abc.abstractmethod def pretty_print(self, values): """Print output of request in a human-friendly way."""
def _get_result(self, request_spec, robot, options, get_values_fn): client = robot.ensure_client(DataServiceClient.default_service_name) if options.timespan: if options.robot_time: time_sync_endpoint = None else: time_sync_client = robot.ensure_client(TimeSyncClient.default_service_name) time_sync_endpoint = TimeSyncEndpoint(time_sync_client) if not time_sync_endpoint.establish_timesync(): print("Failed to get timesync for requesting comments.") return False time_range = timespec_to_robot_timespan(options.timespan, time_sync_endpoint) # pylint: disable=no-member request_spec.time_range.CopyFrom(time_range) values = get_values_fn(client.get_events_comments(request_spec)) if options.proto: print(values) else: self.pretty_print(values) return True
[docs]class GetDataBufferCommentsCommand(GetDataBufferEventsCommentsCommand): """Get operator comments from the robot.""" NAME = 'comments' def _run(self, robot, options): """Implementation of the command. Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. Returns: True """ request_spec = EventsCommentsSpec(comments=True) def _get_comments(response): return response.events_comments.operator_comments return self._get_result(request_spec, robot, options, _get_comments)
[docs] def pretty_print(self, values): # pylint: disable=no-self-use last_date_shown = None for comment in values: dtm = datetime.datetime.fromtimestamp(comment.timestamp.seconds + comment.timestamp.seconds * 1e-9) if dtm.date() != last_date_shown: print("\n[{}]".format(dtm.date())) last_date_shown = dtm.date() print(" {} {}".format(dtm.time(), comment.message.strip()))
[docs]class GetDataBufferEventsCommand(GetDataBufferEventsCommentsCommand): """Get events from the robot.""" NAME = 'events' def __init__(self, subparsers, command_dict): """Get operator comments from the robot. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(GetDataBufferEventsCommand, self).__init__(subparsers, command_dict) self._parser.add_argument('--type', help='query for only the given event-type') # pylint: disable=no-member self._parser.add_argument( '--level', choices=Event.Level.keys()[1:], # slice skips UNSET help='limit level to this and above') def _run(self, robot, options): """Implementation of the command. Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. Returns: True """ request_spec = EventsCommentsSpec() event_spec = request_spec.events.add() # pylint: disable=no-member if options.type: event_spec.type = options.type if options.level: event_spec.level.value = Event.Level.Value(options.level) # pylint: disable=no-member def _get_events(response): return response.events_comments.events return self._get_result(request_spec, robot, options, _get_events) @staticmethod def _level_name(event): prefix = 'LEVEL_' # pylint: disable=no-member name = event.Level.Name(event.level) if name.startswith(prefix): return name[len(prefix):] return name
[docs] def pretty_print(self, values): # pylint: disable=no-self-use last_date_shown = None for event in values: start_secs = event.start_time.seconds + event.start_time.seconds * 1e-9 start_dt = datetime.datetime.fromtimestamp(start_secs) if start_dt.date() != last_date_shown: print("\n[{}]".format(start_dt.date())) last_date_shown = start_dt.date() if event.end_time and event.end_time != event.start_time: end_secs = event.end_time.seconds + event.end_time.seconds * 1e-9 end_dt = datetime.datetime.fromtimestamp(end_secs) print(" {}-{} (END) ({:16}) {:16} {:16} <{}> ".format( start_dt.time(), end_dt.time(), end_secs - start_secs, event.type, self._level_name(event), event.source)) else: timing = '' if event.end_time else '(START)' print(" {} {} {:16} {:16} <{}> ".format(start_dt.time(), timing, event.type, self._level_name(event), event.source)) if event.description: print("\t{}".format(event.description))
[docs]class GetDataBufferStatusCommand(Command): """Get status of data-buffer on robot.""" NAME = 'status' def __init__(self, subparsers, command_dict): """Get status of data-buffer on robot Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(GetDataBufferStatusCommand, self).__init__(subparsers, command_dict) self._parser.add_argument('--get-blob-specs', '-B', action='store_true', help='get list of channel/msgtype/source combinations') def _run(self, robot, options): """Implementation of the command. Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. Returns: True """ client = robot.ensure_client(DataServiceClient.default_service_name) print(client.get_data_buffer_status(get_blob_specs=options.get_blob_specs)) return True
[docs]class RobotStateCommands(Subcommands): """Commands for querying robot state.""" NAME = 'state' def __init__(self, subparsers, command_dict): """Commands for querying robot state. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(RobotStateCommands, self).__init__( subparsers, command_dict, [FullStateCommand, HardwareConfigurationCommand, MetricsCommand, RobotModel])
[docs]class FullStateCommand(Command): """Show robot state.""" NAME = 'full' def __init__(self, subparsers, command_dict): """Show robot state. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(FullStateCommand, self).__init__(subparsers, command_dict) def _run(self, robot, options): """Implementation of the command; prints RobotState proto. Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. """ proto = robot.ensure_client(RobotStateClient.default_service_name).get_robot_state() print(proto) return True
[docs]class HardwareConfigurationCommand(Command): """Show robot hardware configuration.""" NAME = 'hardware' def __init__(self, subparsers, command_dict): """Show robot hardware configuration. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(HardwareConfigurationCommand, self).__init__(subparsers, command_dict) def _run(self, robot, options): """Implementation of the command; prints HardwareConfiguration proto. Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. """ proto = robot.ensure_client( RobotStateClient.default_service_name).get_robot_hardware_configuration() print(proto) return True
[docs]class RobotModel(Command): """Write robot URDF and mesh to local files.""" NAME = 'model' NEED_AUTHENTICATION = False def __init__(self, subparsers, command_dict): """Write robot URDF and mesh to local files. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(RobotModel, self).__init__(subparsers, command_dict) self._parser.add_argument('--outdir', default='Model_Files', help='directory into which to save the files') def _run(self, robot, options): """Implementation of the command. Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. Returns: True. """ robot_state_client = robot.ensure_client(RobotStateClient.default_service_name) hardware = robot_state_client.get_robot_hardware_configuration() # Write files in user-specified directory, or use default name model_directory = options.outdir # Make the directory, if it does not already exist try: os.makedirs(model_directory) except OSError: pass # Write each link model to its own file for link in hardware.skeleton.links: # Request a Skeleton.Link.ObjModel from the robot for link.name and write it to a file try: obj_model_proto = robot_state_client.get_robot_link_model(link.name) except InvalidRequestError as err: print(err, end='') print(" Name of link: " + link.name) continue # If file_name is empty, ignore if not obj_model_proto.file_name: continue # Write to a file, ignoring the robot path sub_path = '/'.join(obj_model_proto.file_name.split('/')[:-1]) # robot defined path path = os.path.join(model_directory, sub_path) # local path + robot path try: os.makedirs(path) except OSError: pass path_and_name = os.path.join(path, obj_model_proto.file_name.split('/')[-1]) with open(path_and_name, "w") as obj_file: obj_file.write(obj_model_proto.file_contents) print('Link file written to ' + path_and_name) # Write the corresponding urdf file inside the link directory with open(os.path.join(model_directory, "model.urdf"), "w") as urdf_file: urdf_file.write(hardware.skeleton.urdf) print('URDF file written to ' + os.path.join(model_directory, "model.urdf")) return True
[docs]class MetricsCommand(Command): """Show metrics (runtime, etc...).""" NAME = 'metrics' def __init__(self, subparsers, command_dict): """Show metrics (runtime, etc...). Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(MetricsCommand, self).__init__(subparsers, command_dict) self._parser.add_argument('--proto', action='store_true', help='print metrics in proto format') def _run(self, robot, options): """Implementation of the command. Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. Returns: True. """ proto = robot.ensure_client(RobotStateClient.default_service_name).get_robot_metrics() if options.proto: print(proto) return True for metric in proto.metrics: print(self._format_metric(metric)) return True @staticmethod def _secs_to_hms(seconds): """Converts seconds to hours:minutes:seconds string. Args: seconds: Seconds to convert. Returns: String in the format hours:MM:SS. """ isecs = int(seconds) seconds = isecs % 60 minutes = (isecs // 60) % 60 hours = isecs // 3600 return '{:d}:{:02d}:{:02d}'.format(hours, minutes, seconds) @staticmethod def _distance_str(meters): """Converts distance to human-readable string. Args: meters: Distance in meters to convert. Returns: String with meters with two digit precision if distance is less than 1000 meters, or kilometers with two digit precision otherwise. """ if meters < 1000: return '{:.2f} m'.format(meters) return '{:.2f} km'.format(float(meters) / 1000) @staticmethod def _timestamp_str(timestamp): """Converts a timestamp to a human-readable string. Args: timestamp: Protobuf timestamp to convert Returns: Timestamp string in ISO 8601 format """ # The json format of a timestamp is a string that looks like '"2022-01-12T21:56:05Z"', # so we strip off the outer quotes and return that. return json_format.MessageToJson(timestamp).strip('"') @staticmethod def _format_metric(metric): # pylint: disable=too-many-return-statements """Convert metric input to human-readable string. Args: metric: Input metric object to convert. Returns: String in the format: Label float_value units. """ field = metric.WhichOneof('values') if field is None: return '{:20} missing value'.format(metric.label) value = getattr(metric, field) # Special case formatting if field == 'duration': return '{:20} {}'.format(metric.label, MetricsCommand._secs_to_hms(value.seconds)) elif field == 'timestamp': return '{:20} {}'.format(metric.label, MetricsCommand._timestamp_str(value)) elif field == 'float_value': if metric.units == 'm': return '{:20} {}'.format(metric.label, MetricsCommand._distance_str(value)) return '{:20} {:.2f} {}'.format(metric.label, value, metric.units) else: # Default formatting return '{:20} {} {}'.format(metric.label, value, metric.units)
[docs]class TimeSyncCommand(Command): """Find clock difference between this and the robot clock.""" NAME = 'time-sync' def __init__(self, subparsers, command_dict): """Find clock difference between this and the robot clock. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(TimeSyncCommand, self).__init__(subparsers, command_dict) self._parser.add_argument('--proto', action='store_true', help='print listing in proto format') def _run(self, robot, options): """Implementation of the command. Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. Returns: False if timesync cannot be established, True otherwise. """ endpoint = TimeSyncEndpoint(robot.ensure_client(TimeSyncClient.default_service_name)) if not endpoint.establish_timesync(break_on_success=True): print("Failed to achieve time sync") return False if options.proto: print(endpoint.response) return True print("GRPC round-trip time: {}".format(duration_str(endpoint.round_trip_time))) print("Local time to robot time: {}".format(duration_str(endpoint.clock_skew))) return True
[docs]class LicenseCommand(Command): """Show installed license.""" NAME = "license" def __init__(self, subparsers, command_dict): super(LicenseCommand, self).__init__(subparsers, command_dict) self._parser.add_argument('--proto', action='store_true', help='print listing in proto format') self._parser.add_argument('-f', '--feature-codes', nargs='+', help='Optional feature list for GetFeatureEnabled API.') def _run(self, robot, options): """Implementation of the command. Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. Returns: True. """ license_client = robot.ensure_client(LicenseClient.default_service_name) self._get_license_info(license_client, options) self._get_feature_enabled(license_client, options) return True def _get_license_info(self, license_client, options): license_info = license_client.get_license_info() if options.proto: print(license_info) else: print(str(license_info)) def _get_feature_enabled(self, license_client, options): if not options.feature_codes or len(options.feature_codes) == 0: return feature_enabled = license_client.get_feature_enabled(options.feature_codes) for feature in feature_enabled: if feature_enabled[feature]: print(f"Feature {feature} is enabled.") else: print(f"Feature {feature} is not enabled.")
[docs]class LeaseCommands(Subcommands): """Commands related to the lease service.""" NAME = 'lease' def __init__(self, subparsers, command_dict): """Commands related to the lease service. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(LeaseCommands, self).__init__(subparsers, command_dict, [LeaseListCommand])
[docs]class LeaseListCommand(Command): """List all leases.""" NAME = 'list' def __init__(self, subparsers, command_dict): """List all leases. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(LeaseListCommand, self).__init__(subparsers, command_dict) self._parser.add_argument('--proto', action='store_true', help='print listing in proto format') def _run(self, robot, options): """Implementation of the command. Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. Returns: True. """ lease_client = robot.ensure_client(LeaseClient.default_service_name) resources = lease_client.list_leases() if options.proto: print(resources) return True for resource in resources: print(self._format_lease_resource(resource)) return True @staticmethod def _format_lease_resource(resource): return str(resource)
[docs]class EstopCommands(Subcommands): """Commands for interacting with robot estop service.""" NAME = 'estop' def __init__(self, subparsers, command_dict): """Commands for interacting with robot estop service. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(EstopCommands, self).__init__(subparsers, command_dict, [BecomeEstopCommand, GetEstopConfigCommand, GetEstopStatusCommand])
[docs]class GetEstopConfigCommand(Command): """Get estop config of estop service.""" NAME = 'config' def __init__(self, subparsers, command_dict): """Call EstopService GetEstopConfig RPC. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(GetEstopConfigCommand, self).__init__(subparsers, command_dict) def _run(self, robot, options): """Implementation of the command. Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. Returns: True. """ client = robot.ensure_client(EstopClient.default_service_name) config = client.get_config() print(config)
[docs]class GetEstopStatusCommand(Command): """Get estop status of estop service.""" NAME = 'status' def __init__(self, subparsers, command_dict): """Call EstopService GetEstopSystemStatus RPC. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(GetEstopStatusCommand, self).__init__(subparsers, command_dict) def _run(self, robot, options): """Implementation of the command. Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. Returns: True. """ client = robot.ensure_client(EstopClient.default_service_name) status = client.get_status() print(status)
[docs]class BecomeEstopCommand(Command): """Grab and hold estop until Ctl-C.""" NAME = 'become-estop' _RPC_PRINT_CHOICES = ['timestamp', 'full'] def __init__(self, subparsers, command_dict): """Grab and hold estop until Ctl-C. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(BecomeEstopCommand, self).__init__(subparsers, command_dict) self._parser.add_argument('--timeout', type=float, help='EStop timeout (seconds)', default=10) self._parser.add_argument('--rpc-print', choices=self._RPC_PRINT_CHOICES, default='timestamp', help='How much of the request/response messages to print') def _run(self, robot, options): """Implementation of the command. Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. Returns: True. """ # Get a client to the estop service. client = robot.ensure_client(EstopClient.default_service_name) # If we'd only like to print timestamps of messages, change the client's callbacks that # convert messages to string. def _timestamp_fmt_request(request): return '(request timestamp: {})'.format(request.header.request_timestamp.ToSeconds()) def _timestamp_fmt_response(response): return '(response timestamp: {})'.format(response.header.response_timestamp.ToSeconds()) if options.rpc_print == 'timestamp': client.request_trim_for_log = _timestamp_fmt_request client.response_trim_for_log = _timestamp_fmt_response # Create the endpoint to the robot estop system. # Timeout should be chosen to balance safety considerations with expected service latency. # See the estop documentation for details. endpoint = EstopEndpoint(client, f"command-line-{socket.gethostname()}", options.timeout) # Have this endpoint to set up the robot's estop system such that it is the sole estop. # See the function's docstring and the estop documentation for details. endpoint.force_simple_setup() # Create the helper class that does periodic check-ins. This also starts the checking-in. exit_signal = threading.Event() # Define a function to call on SIGINT that asserts an estop and cleanly shuts down. def sigint_handler(sig, frame): """Cleanly shut down the application on interrupt.""" #pylint: disable=unused-argument # Signal that we want to exit the program. exit_signal.set() print('Press Ctrl-C or send SIGINT to exit') # Install our signal handler function. signal.signal(signal.SIGINT, sigint_handler) with EstopKeepAlive(endpoint) as keep_alive: # Now we wait. The thread in EstopKeepAlive will continue sending messages to the estop # service on its own. while not exit_signal.wait(10): pass # EStop the robot. keep_alive.stop() # The keep_alive object's exit() function will shut down the thread. # This will let another endpoint fill our role, if they want to use the current # configuration. For details, see the estop documentation. endpoint.deregister() return True
[docs]class OldBecomeEstopCommand(BecomeEstopCommand): """Old version of BecomeEstopCommand."""
[docs] def run(self, robot, options): print('DEPRECATION WARNING: This command is now "bosdyn.client estop become-estop"') return BecomeEstopCommand.run(self, robot, options)
[docs]class ImageCommands(Subcommands): """Commands for querying images.""" NAME = 'image' def __init__(self, subparsers, command_dict): """Commands for querying images. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(ImageCommands, self).__init__(subparsers, command_dict, [ListImageSourcesCommand, GetImageCommand])
def _show_image_sources_list(robot, as_proto=False, service_name=None): """Print available image sources. Args: robot: Robot object on which to run the command. as_proto: Boolean to determine whether to print the full proto message, or a human-readable string in the format: source_name (rows x cols) Returns: True. """ service_name = service_name or ImageClient.default_service_name proto = robot.ensure_client(service_name).list_image_sources() if as_proto: print(proto) else: for image_source in proto: image_formats = [image_pb2.Image.Format.Name(i)[7:] for i in image_source.image_formats] pixel_formats = [ image_pb2.Image.PixelFormat.Name(i)[13:] for i in image_source.pixel_formats ] print("{:30s} ({:d}x{:d}) {:15s} {:15s}".format(image_source.name, image_source.rows, image_source.cols, ','.join(image_formats), ','.join(pixel_formats))) return True
[docs]class ListImageSourcesCommand(Command): """List image sources.""" NAME = 'list-sources' def __init__(self, subparsers, command_dict): """List image sources. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(ListImageSourcesCommand, self).__init__(subparsers, command_dict) self._parser.add_argument('--proto', action='store_true', help='print listing in proto format') self._parser.add_argument('--service-name', default=ImageClient.default_service_name, help='Image service to query') def _run(self, robot, options): """Implementation of the command. Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. Returns: Output of _show_image_sources_list method. """ _show_image_sources_list(robot, as_proto=options.proto, service_name=options.service_name) return True
[docs]class GetImageCommand(Command): """Get an image from the robot and write it to an image file.""" NAME = 'get-image' def __init__(self, subparsers, command_dict): """Get an image from the robot and write it to an image file. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(GetImageCommand, self).__init__(subparsers, command_dict) self._parser.add_argument('--outfile', default=None, help='Filename into which to save the image') self._parser.add_argument('--quality-percent', type=int, default=75, help='Percent image quality (0-100)') self._parser.add_argument('source_name', metavar='SRC', nargs='+', help='Image source name') self._parser.add_argument('--service-name', help='Image service to query', default=ImageClient.default_service_name) def _run(self, robot, options): """Implementation of the command. Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. Returns: True if successful, False if exceptions are caught. """ image_requests = [ build_image_request(source_name, options.quality_percent) for source_name in options.source_name ] try: response = robot.ensure_client(options.service_name).get_image(image_requests) except UnknownImageSourceError: print('Requested image source "{}" does not exist. Available image sources:'.format( options.source_name)) _show_image_sources_list(robot, service_name=options.service_name) return False except ImageResponseError: print('Robot cannot generate the "{}" at this time. Retry the command.'.format( options.source_name)) return False # Save the image files in the correct format (jpeg, pgm for raw/rle). save_images_as_files(response) return True
[docs]class LocalGridCommands(Subcommands): """Commands for querying local grid maps.""" NAME = 'local_grid' def __init__(self, subparsers, command_dict): """Commands for querying local grid maps. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(LocalGridCommands, self).__init__(subparsers, command_dict, [ListLocalGridTypesCommand, GetLocalGridsCommand])
def _show_local_grid_sources_list(robot, as_proto=False): """Print available local grid sources. Args: robot: Robot object on which to run the command. as_proto: Boolean to determine whether to print the full proto message, or just the list of names. Returns: True. """ proto = robot.ensure_client(LocalGridClient.default_service_name).get_local_grid_types() if as_proto: print(proto) else: for local_grid_type in proto: print(local_grid_type.name) return True
[docs]class ListLocalGridTypesCommand(Command): """List local grid sources.""" NAME = 'types' def __init__(self, subparsers, command_dict): """List local grid sources. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(ListLocalGridTypesCommand, self).__init__(subparsers, command_dict) self._parser.add_argument('--proto', action='store_true', help='print listing in proto format') def _run(self, robot, options): """Implementation of the command. Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. Returns: Output of _show_local_grid_sources_list method. """ _show_local_grid_sources_list(robot, as_proto=options.proto) return True
[docs]class GetLocalGridsCommand(Command): """Get local grids from the robot.""" NAME = 'get' def __init__(self, subparsers, command_dict): """Get local grids from the robot. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(GetLocalGridsCommand, self).__init__(subparsers, command_dict) self._parser.add_argument('--outfile', default=None, help='filename into which to save the image') self._parser.add_argument('types', metavar='SRC', nargs='+', help='image types') def _run(self, robot, options): """Implementation of the command. Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. Returns: True. """ response = robot.ensure_client(LocalGridClient.default_service_name).get_local_grids( options.types) for local_grid_response in response: print(local_grid_response) return True
[docs]class DataAcquisitionCommand(Subcommands): """Acquire data from the robot and add it in the data buffer with the metadata, or request status.""" NAME = 'acquire' def __init__(self, subparsers, command_dict): """Acquire data from the robot and add it in the data buffer with the metadata, or request status. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(DataAcquisitionCommand, self).__init__(subparsers, command_dict, [ DataAcquisitionServiceCommand, DataAcquisitionRequestCommand, DataAcquisitionStatusCommand, DataAcquisitionGetLiveDataCommand ])
[docs]class DataAcquisitionRequestCommand(Command): """Capture and save images or metadata specified in the command line arguments.""" NAME = 'request' def __init__(self, subparsers, command_dict): """Capture and save images or metadata specified in the command line arguments. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(DataAcquisitionRequestCommand, self).__init__(subparsers, command_dict) self._parser.add_argument('--image-source', metavar='IMG_SRC', default=[], help='Image source name', action='append') self._parser.add_argument('--image-service', metavar='SERVICE_NAME', default=[], help='Image service name for the image source.', action='append') self._parser.add_argument('--data-source', metavar='DATA_SRC', default=[], help='Data source name', action='append') self._parser.add_argument('--action-name', help='The action name to save the data with.', default="quick_captures") self._parser.add_argument('--group-name', help='The group name to save the data with.', default="command_line") self._parser.add_argument('--non-blocking-request', help='Return after making the acquisition request, without monitoring' +\ ' the status for completion.', default=False, action='store_true') def _run(self, robot, options): """Implementation of the 'request' command. Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. """ if not options.data_source and not (options.image_source and options.image_service): self._parser.error( 'A request requires either a data source name or an image source+service name.') if len(options.image_source) != len(options.image_service): self._parser.error( 'A request must have a 1:1 correspondence between image source and image service arguments.' ) captures = data_acquisition_pb2.AcquisitionRequestList() captures.data_captures.extend( [data_acquisition_pb2.DataCapture(name=data_name) for data_name in options.data_source]) img_captures = [] for i, src_name in enumerate(options.image_source): img_service = options.image_service[i] img_captures.append( data_acquisition_pb2.ImageSourceCapture(image_service=img_service, image_source=src_name)) captures.image_captures.extend(img_captures) robot.time_sync.wait_for_sync(timeout_sec=1.0) data_acquisition_client = robot.ensure_client(DataAcquisitionClient.default_service_name) success = acquire_and_process_request(data_acquisition_client, captures, options.group_name, options.action_name, block_until_complete=not options.non_blocking_request) return success
[docs]class DataAcquisitionServiceCommand(Command): """Get list of different data acquisition capabilities.""" NAME = 'info' def __init__(self, subparsers, command_dict): """Get list of different data acquisition capabilities. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(DataAcquisitionServiceCommand, self).__init__(subparsers, command_dict) # Constants to describe width of columns for printing the data names and types self._data_type_width = 15 self._data_name_width = 35 self._service_name_width = 35 self._has_live_data_width = 30 def _format_and_print_capability(self, data_type, data_name, service_name="", has_live_data=""): """Print the data acquisition capability. Args: data_type (string): Either image or data capabilities. data_name (string): The name of the data acquisition capability service_name(string): For image capabilities, a service name is required. """ print( ('{:' + str(self._data_type_width) + '} {:' + str(self._data_name_width) + '} {:' + str(self._service_name_width) + '} {:' + str(self._has_live_data_width) + '}').format( data_type, data_name, service_name, has_live_data)) def _run(self, robot, options): """Implementation of the 'info' command. Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. """ capabilities = robot.ensure_client( DataAcquisitionClient.default_service_name).get_service_info() print("Data Acquisition Service's Available Capabilities\n") self._format_and_print_capability("Data Type", "Data Name", "(optional) Service Name", "(optional) has_live_data") print("-" * (self._data_type_width + self._data_name_width + self._service_name_width + self._has_live_data_width)) for data_name in capabilities.data_sources: self._format_and_print_capability("data", data_name.name, data_name.service_name, str(data_name.has_live_data)) for img_service in capabilities.image_sources: for img in img_service.image_source_names: self._format_and_print_capability("image", img, img_service.service_name) for ncb_worker in capabilities.network_compute_sources: for model in ncb_worker.models.data: self._format_and_print_capability("models", model.model_name, ncb_worker.server_config.service_name) return True
[docs]class DataAcquisitionStatusCommand(Command): """Get status of an acquisition request based on the request id.""" NAME = 'status' def __init__(self, subparsers, command_dict): """Get status of an acquisition request based on the request id. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(DataAcquisitionStatusCommand, self).__init__(subparsers, command_dict) self._parser.add_argument('id', type=int, help='Response id to get the status for') def _run(self, robot, options): """Implementation of the 'status' command. Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. Returns: False if NonexistentServiceError is caught, True otherwise. """ response = robot.ensure_client(DataAcquisitionClient.default_service_name).get_status( options.id) print(response) return True
[docs]class DataAcquisitionGetLiveDataCommand(Command): """Call GetLiveData based on service name.""" NAME = 'live' def __init__(self, subparsers, command_dict): """Call GetLiveData based on service name. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(DataAcquisitionGetLiveDataCommand, self).__init__(subparsers, command_dict) self._parser.add_argument('--data-source', metavar='DATA_SRC', default=[], help='Data source name', action='append', required=True) def _run(self, robot, options): """Implementation of the 'live' command. Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. Returns: True once complete. """ daq_client = robot.ensure_client(DataAcquisitionClient.default_service_name) request = data_acquisition_pb2.LiveDataRequest() request.data_captures.extend( [data_acquisition_pb2.DataCapture(name=data_name) for data_name in options.data_source]) response = daq_client.get_live_data(request) print(response) return True
[docs]class HostComputerIPCommand(Command): """Determine a computer's IP address.""" NAME = 'self-ip' NEED_AUTHENTICATION = False def __init__(self, subparsers, command_dict): """Determine the IP address of the current computer used to talk to the robot. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(HostComputerIPCommand, self).__init__(subparsers, command_dict) def _run(self, robot, options): """Implementation of the command. Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. Returns: True """ print("The IP address of the computer used to talk to the robot is: %s" % (bosdyn.client.common.get_self_ip(robot._name)))
[docs]class PowerCommand(Subcommands): """Send power commands to the robot.""" NAME = 'power' def __init__(self, subparsers, command_dict): """Send power commands to the robot. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(PowerCommand, self).__init__( subparsers, command_dict, [ PowerRobotCommand, PowerPayloadsCommand, PowerWifiRadioCommand, ])
[docs]class KeepaliveCommand(Subcommands): """Send keepalive commands to the robot.""" NAME = 'keepalive' def __init__(self, subparsers, command_dict): """Send keepalive commands to the robot. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(KeepaliveCommand, self).__init__(subparsers, command_dict, [ KeepaliveGetStatusCommand, KeepaliveRemovePoliciesCommand, ])
[docs]def lease_details(leases): """Returns list of <resource_name>:<sequence>, ...N.""" lease_strings = [] for lease in leases: sequence_string = ", ".join([str(lease_seq) for lease_seq in lease.sequence]) lease_strings.append(f"{lease.resource}:[{sequence_string}]") return ", ".join(lease_strings)
[docs]class KeepaliveGetStatusCommand(Command): """Get status of keepalive service.""" NAME = 'status' def __init__(self, subparsers, command_dict): """Call KeepaliveService GetStatus RPC. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(KeepaliveGetStatusCommand, self).__init__(subparsers, command_dict) self._parser.add_argument('-f', '--full', action='store_true', default=False, help='Show full GetStatus proto as json.') def _run(self, robot, options): """Implementation of the command. Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. Returns: True. """ client = robot.ensure_client(KeepaliveClient.default_service_name) status = client.get_status() # Give the option to print out the full message. if options.full: print(json_format.MessageToJson(status)) return True # If there are no policies, there probably isn't anything interesting here. if len(status.status) == 0: print("No active policies") return True # List all live policies in a concise message. if len(status.status) == 1: print("Robot returned 1 live policy:") else: print(f"Robot returned {len(status.status)} live policies:") for live_policy in status.status: rough_robot_timestamp = status.header.response_timestamp.ToSeconds() last_checkin = live_policy.last_checkin.ToSeconds() time_elapsed = rough_robot_timestamp - last_checkin # Go through each action, and create a helpful message describing the status. action_list = [] for action in live_policy.policy.actions: name = action.WhichOneof("action") # If this is a lease_stale or auto_return action, include some lease info. name_maybe_with_details = name if action.HasField("lease_stale"): name_maybe_with_details = f"{name_maybe_with_details} ({lease_details(action.lease_stale.leases)})" if action.HasField("auto_return"): name_maybe_with_details = f"{name_maybe_with_details} ({lease_details(action.auto_return.leases)})" # Add an indicator if the policy action is active or not. action_after_time = action.after.ToSeconds() active_message = "NOT active" if time_elapsed > action.after.ToSeconds(): active_message = "ACTIVE" action_list.append( f"{name_maybe_with_details} after {action_after_time} seconds, {active_message}" ) formatted_string = f"id: {live_policy.policy_id}\n client_name: {live_policy.client_name}\n last checkin {time_elapsed}s ago" # Only the supervisor policy seems to have a name. if live_policy.policy.name: formatted_string = formatted_string + f"\n policy name: '{live_policy.policy.name}'" if live_policy.policy.user_id: formatted_string = formatted_string + f"\n user_id: '{live_policy.policy.user_id}'" if len(live_policy.policy.associated_leases) > 0: formatted_string = formatted_string + f"\n associated_leases: {lease_details(live_policy.policy.associated_leases)}" print(formatted_string) print(" actions:") for index, action in enumerate(action_list): print(f" {action}") print("") # If there is an action control action, we should indicate that. if len(status.active_control_actions) == 0: return True for action in status.active_control_actions: enum_name = keepalive_pb2.GetStatusResponse.PolicyControlAction.Name(action) print(f"Active control action: {enum_name}") return True
[docs]class KeepaliveRemovePoliciesCommand(Command): """Remove keepalive policies.""" NAME = 'remove' def __init__(self, subparsers, command_dict): """Remove keepalive policies. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(KeepaliveRemovePoliciesCommand, self).__init__(subparsers, command_dict) self._parser.add_argument('--policy-id', type=int, nargs='+', help='Specify specific policy ids to remove.') def _run(self, robot, options): """Implementation of the command. Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. Returns: True. """ client = robot.ensure_client(KeepaliveClient.default_service_name) current_policies = client.get_status().status current_policy_ids = [s.policy_id for s in current_policies] # If there are no policies, there probably isn't anything interesting here. if len(current_policies) == 0: print("No active policies") return True to_rm = [] if options.policy_id: # Remove specific policies. for policy_id in options.policy_id: if policy_id in current_policy_ids: to_rm.append(policy_id) else: print(f"Policy '{policy_id}' not found.") else: # Remove all current policies. to_rm = current_policy_ids client.modify_policy(policy_ids_to_remove=to_rm) print(f"Removed {len(to_rm)} policies") return True
[docs]class PowerRobotCommand(Command): """Control the power of the entire robot.""" NAME = 'robot' def __init__(self, subparsers, command_dict): """Power cycle or power off robot computers. Note that this is only compatible with certain robots. Check HardwareConfiguration for details. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(PowerRobotCommand, self).__init__(subparsers, command_dict) self._parser.add_argument('cmd', choices=['cycle', 'off']) def _run(self, robot, options): """ Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. """ robot.time_sync.wait_for_sync(timeout_sec=1.0) lease_client = robot.ensure_client(bosdyn.client.lease.LeaseClient.default_service_name) with bosdyn.client.lease.LeaseKeepAlive(lease_client, must_acquire=True, return_at_exit=True): power_client = robot.ensure_client(PowerClient.default_service_name) if options.cmd == 'cycle': power_cycle_robot(power_client) elif options.cmd == 'off': power_off_robot(power_client)
[docs]class PowerPayloadsCommand(Command): """Control the power of robot payloads.""" NAME = 'payload' def __init__(self, subparsers, command_dict): """Power on or off robot payloads. Note that this is only compatible with certain robots. Check HardwareConfiguration for details. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(PowerPayloadsCommand, self).__init__(subparsers, command_dict) self._parser.add_argument('on_off', choices=['on', 'off']) def _run(self, robot, options): """ Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. """ robot.time_sync.wait_for_sync(timeout_sec=1.0) lease_client = robot.ensure_client(bosdyn.client.lease.LeaseClient.default_service_name) with bosdyn.client.lease.LeaseKeepAlive(lease_client, must_acquire=True, return_at_exit=True): power_client = robot.ensure_client(PowerClient.default_service_name) if options.on_off == 'on': power_on_payload_ports(power_client) elif options.on_off == 'off': power_off_payload_ports(power_client)
[docs]class PowerWifiRadioCommand(Command): """Control the power of robot wifi radio.""" NAME = 'wifi' def __init__(self, subparsers, command_dict): """Power on or off robot LTE radio. Note that this is only compatible with certain robots. Check HardwareConfiguration for details. Args: subparsers: List of argument parsers. command_dict: Dictionary of command names which take parsed options. """ super(PowerWifiRadioCommand, self).__init__(subparsers, command_dict) self._parser.add_argument('on_off', choices=['on', 'off']) def _run(self, robot, options): """ Args: robot: Robot object on which to run the command. options: Parsed command-line arguments. """ robot.time_sync.wait_for_sync(timeout_sec=1.0) lease_client = robot.ensure_client(bosdyn.client.lease.LeaseClient.default_service_name) with bosdyn.client.lease.LeaseKeepAlive(lease_client, must_acquire=True, return_at_exit=True): power_client = robot.ensure_client(PowerClient.default_service_name) if options.on_off == 'on': power_on_wifi_radio(power_client) elif options.on_off == 'off': power_off_wifi_radio(power_client)
[docs]def main(args=None): """Command-line interface for interacting with robot services.""" parser = argparse.ArgumentParser(prog='bosdyn.client', description=main.__doc__) add_common_arguments(parser, credentials_no_warn=True) command_dict = {} # command name to fn which takes parsed options subparsers = parser.add_subparsers(title='commands', dest='command') # Register commands that can be run. DirectoryCommands(subparsers, command_dict) PayloadCommands(subparsers, command_dict) FaultCommands(subparsers, command_dict) RobotIdCommand(subparsers, command_dict) LicenseCommand(subparsers, command_dict) LogStatusCommands(subparsers, command_dict) RobotStateCommands(subparsers, command_dict) DataBufferCommands(subparsers, command_dict) DataServiceCommands(subparsers, command_dict) TimeSyncCommand(subparsers, command_dict) LeaseCommands(subparsers, command_dict) OldBecomeEstopCommand(subparsers, command_dict) EstopCommands(subparsers, command_dict) ImageCommands(subparsers, command_dict) LocalGridCommands(subparsers, command_dict) DataAcquisitionCommand(subparsers, command_dict) HostComputerIPCommand(subparsers, command_dict) PowerCommand(subparsers, command_dict) KeepaliveCommand(subparsers, command_dict) options = parser.parse_args(args=args) setup_logging(verbose=options.verbose) # Create robot object and authenticate. sdk = bosdyn.client.create_standard_sdk('BosdynClient') sdk.register_service_client(DataAcquisitionPluginClient) robot = sdk.create_robot(options.hostname) if not options.command: print("Need to specify a command") parser.print_help() return False if not command_dict[options.command].run(robot, options): return False return True