# Copyright (c) 2023 Boston Dynamics, Inc. All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).
"""Command-line utility code for interacting with robot services."""
from __future__ import division
import abc
import argparse
import datetime
import os
import signal
import socket
import sys
import threading
import time
from deprecated.sphinx import deprecated
from google.protobuf import json_format
import bosdyn.client
from bosdyn.api import data_acquisition_pb2, image_pb2
from bosdyn.api.data_buffer_pb2 import Event, TextMessage
from bosdyn.api.data_index_pb2 import EventsCommentsSpec
from bosdyn.api.keepalive import keepalive_pb2
from bosdyn.api.robot_state_pb2 import BehaviorFault
from bosdyn.util import duration_str, timestamp_to_datetime
from .auth import InvalidLoginError, InvalidTokenError
from .data_acquisition import DataAcquisitionClient
from .data_acquisition_helpers import acquire_and_process_request
from .data_acquisition_plugin import DataAcquisitionPluginClient
from .data_buffer import DataBufferClient
from .data_service import DataServiceClient
from .directory import DirectoryClient, NonexistentServiceError
from .directory_registration import DirectoryRegistrationClient, DirectoryRegistrationResponseError
from .estop import EstopClient, EstopEndpoint, EstopKeepAlive
from .exceptions import Error, InvalidRequestError, ProxyConnectionError
from .image import (ImageClient, ImageResponseError, UnknownImageSourceError, build_image_request,
save_images_as_files)
from .keepalive import KeepaliveClient
from .lease import LeaseClient
from .license import LicenseClient
from .local_grid import LocalGridClient
from .log_status import InactiveLogError, LogStatusClient
from .payload import PayloadClient
from .payload_registration import PayloadAlreadyExistsError, PayloadRegistrationClient
from .power import (PowerClient, power_cycle_robot, power_off_payload_ports, power_off_robot,
power_off_wifi_radio, power_on_payload_ports, power_on_wifi_radio)
from .robot_id import RobotIdClient
from .robot_state import RobotStateClient
from .time_sync import TimeSyncClient, TimeSyncEndpoint, TimeSyncError, timespec_to_robot_timespan
from .util import add_common_arguments, authenticate, setup_logging
# pylint: disable=too-few-public-methods
class Command(object, metaclass=abc.ABCMeta):
    """Command-line command.

    Constructing a command registers the instance in ``command_dict`` under ``NAME`` and creates
    its argument parser, using the class docstring as the help text.

    Args:
        subparsers: List of argument parsers.
        command_dict: Dictionary of command names which take parsed options.
    """

    # The name of the command the user should enter on the command line to select this command.
    NAME = None

    # Whether authentication is needed before the command is run.
    # Most commands need authentication.
    NEED_AUTHENTICATION = True

    def __init__(self, subparsers, command_dict):
        command_dict[self.NAME] = self
        self._parser = subparsers.add_parser(self.NAME, help=self.__doc__)

    def run(self, robot, options):
        """Invoke the command.

        Authenticates first when NEED_AUTHENTICATION is set, then delegates to _run(). Known
        client errors are reported on stderr instead of propagating.

        Args:
            robot: Robot object on which to run the command.
            options: Parsed command-line arguments.

        Returns:
            The value returned by _run() on success, or False when one of the handled errors
            was caught and reported.
        """
        try:
            if self.NEED_AUTHENTICATION:
                # Explicit credentials from the command line win; otherwise fall back to the
                # shared authenticate() helper.
                if hasattr(options, 'username') and hasattr(
                        options, 'password') and (options.username or options.password):
                    robot.authenticate(options.username, options.password)
                else:
                    authenticate(robot)
                # NOTE(review): placed inside the authenticated branch because the original
                # nesting is not recoverable from this view -- confirm against upstream.
                robot.sync_with_directory()  # Make sure that we can use all registered services.

            return self._run(robot, options)
        except ProxyConnectionError:
            print('Could not contact robot with hostname "{}".'.format(options.hostname),
                  file=sys.stderr)
        except InvalidTokenError:
            print('The provided user token is invalid.', file=sys.stderr)
        except InvalidLoginError:
            print('Username and/or password are invalid.', file=sys.stderr)
        except Error as err:
            print('{}: {}'.format(type(err).__name__, err), file=sys.stderr)

        # Reached only after a handled error: report failure explicitly rather than
        # falling through to an implicit None.
        return False

    @abc.abstractmethod
    def _run(self, robot, options):
        """Implementation of the command.

        Args:
            robot: Robot object on which to run the command.
            options: Parsed command-line arguments.
        """
class Subcommands(Command):
    """Run subcommands.

    Args:
        subparsers: List of argument parsers.
        command_dict: Dictionary of command names which take parsed options.
        subcommands: List of subcommands to run.
    """

    def __init__(self, subparsers, command_dict, subcommands):
        super().__init__(subparsers, command_dict)
        # The chosen subcommand name is stored on the parsed options under "<NAME>_command".
        nested_parsers = self._parser.add_subparsers(title=self.__doc__,
                                                     dest='{}_command'.format(self.NAME))
        nested_parsers.required = True
        self._subcommands = {}
        # Each subcommand class registers itself into self._subcommands when constructed.
        for make_subcommand in subcommands:
            make_subcommand(nested_parsers, self._subcommands)

    def _run(self, robot, options):
        """Dispatch to the subcommand named in the parsed options.

        Args:
            robot: Robot object on which to run the command.
            options: Parsed command-line arguments.

        Returns:
            Execution of the specific subcommand from the options.
        """
        chosen_name = vars(options)['{}_command'.format(self.NAME)]
        handler = self._subcommands[chosen_name]
        return handler.run(robot, options)
class DirectoryCommands(Subcommands):
    """Commands related to the directory service."""

    NAME = 'dir'

    def __init__(self, subparsers, command_dict):
        """Register the 'dir' command group and its subcommands.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        directory_subcommands = [
            DirectoryListCommand,
            DirectoryGetCommand,
            DirectoryRegisterCommand,
            DirectoryUnregisterCommand,
        ]
        super().__init__(subparsers, command_dict, directory_subcommands)
def _format_dir_entry(  # pylint: disable=too-many-arguments
        name, service_type, authority, tokens, name_width=23, type_width=31, authority_width=27):
    """Print one directory row as "name service_type authority tokens".

    The first three columns are padded to the given widths; tokens is printed as-is.

    Args:
        name: Name of the service.
        service_type: Type of the service.
        authority: Authority of the service.
        tokens: Tokens required for using the service.
        name_width: Width for printing the name value.
        type_width: Width for printing the service_type value.
        authority_width: Width for printing the authority value.
    """
    # Doubled braces survive the first format() call, yielding e.g. '{:23} {:31} {:27} {}'.
    row_template = '{{:{}}} {{:{}}} {{:{}}} {{}}'.format(name_width, type_width, authority_width)
    print(row_template.format(name, service_type, authority, tokens))
def _token_req_str(entry):
    """Describe which tokens a service entry requires.

    Args:
        entry: Service entry being checked.

    Returns:
        String with a comma-separated list of required tokens; empty when none are required.
    """
    token_flags = (('user', entry.user_token_required),)
    return ', '.join(label for label, is_required in token_flags if is_required)
def _show_directory_list(robot, as_proto=False):
    """Print the service directory listing for a robot.

    Args:
        robot: Robot object used to get the list of services.
        as_proto: Boolean to determine whether the directory entries should be printed as full
            proto definitions or formatted strings.

    Returns:
        True
    """
    directory_client = robot.ensure_client(DirectoryClient.default_service_name)
    entries = directory_client.list()
    if not entries:
        print("No services found")
        return True

    if as_proto:
        for entry in entries:
            print(entry)
        return True

    # Size each column to its longest value.
    longest_name = max(len(entry.name) for entry in entries)
    longest_type = max(len(entry.type) for entry in entries)
    longest_authority = max(len(entry.authority) for entry in entries)
    column_widths = (longest_name + 4, longest_type + 4, longest_authority + 4)

    _format_dir_entry('name', 'type', 'authority', 'tokens', *column_widths)
    print("-" * (20 + longest_name + longest_type + longest_authority))
    for entry in entries:
        _format_dir_entry(entry.name, entry.type, entry.authority, _token_req_str(entry),
                          *column_widths)
    return True
def _show_directory_entry(robot, service, as_proto=False):
    """Print a single service directory entry.

    Args:
        robot: Robot object used to get the list of services.
        service: Name of the service to print.
        as_proto: Boolean to determine whether the directory entries should be printed as full
            proto definitions or formatted strings.

    Returns:
        True
    """
    directory_client = robot.ensure_client(DirectoryClient.default_service_name)
    entry = directory_client.get_entry(service)
    if as_proto:
        print(entry)
        return True

    # Table form: header row, rule, then the entry using the default column widths.
    _format_dir_entry('name', 'type', 'authority', 'tokens')
    print("-" * 90)
    _format_dir_entry(entry.name, entry.type, entry.authority, _token_req_str(entry))
    return True
class DirectoryListCommand(Command):
    """List all services in the directory."""

    NAME = 'list'

    def __init__(self, subparsers, command_dict):
        """Register the 'list' subcommand and its --proto flag.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        super().__init__(subparsers, command_dict)
        parser = self._parser
        parser.add_argument('--proto', action='store_true', help='print listing in proto format')

    def _run(self, robot, options):
        """Print the directory listing.

        Args:
            robot: Robot object on which to run the command.
            options: Parsed command-line arguments.

        Returns:
            True
        """
        print_as_proto = options.proto
        _show_directory_list(robot, as_proto=print_as_proto)
        return True
class DirectoryGetCommand(Command):
    """Get entry for a given service in the directory."""

    NAME = 'get'

    def __init__(self, subparsers, command_dict):
        """Register the 'get' subcommand with its --proto flag and service argument.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        super().__init__(subparsers, command_dict)
        parser = self._parser
        parser.add_argument('--proto', action='store_true', help='print listing in proto format')
        parser.add_argument('service', help='service name to get entry for')

    def _run(self, robot, options):
        """Print the requested entry, or the full listing if the service is unknown.

        Args:
            robot: Robot object on which to run the command.
            options: Parsed command-line arguments.

        Returns:
            False if NonexistentServiceError is caught, True otherwise.
        """
        try:
            _show_directory_entry(robot, options.service, as_proto=options.proto)
        except NonexistentServiceError:
            missing_message = 'The requested service name "{}" does not exist. Available services:'
            print(missing_message.format(options.service))
            # Show what does exist so the user can correct the name.
            _show_directory_list(robot, as_proto=options.proto)
            return False
        else:
            return True
class DirectoryRegisterCommand(Command):
    """Register entry for a service in the directory."""

    NAME = 'register'

    def __init__(self, subparsers, command_dict):
        """Register the 'register' subcommand and its service-description options.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        super().__init__(subparsers, command_dict)
        parser = self._parser
        # Required string options, in the order they appear in the help output.
        required_text_options = (
            ('--service-name', 'unique name of the service'),
            ('--service-type', 'Type of the service, e.g. bosdyn.api.RobotStateService'),
            ('--service-authority', 'authority of the service'),
            ('--service-hostname', 'hostname of the service computer'),
        )
        for flag, help_text in required_text_options:
            parser.add_argument(flag, required=True, help=help_text)
        parser.add_argument('--service-port', required=True, type=int,
                            help='port the service is running on')
        parser.add_argument('--no-user-token', action='store_true', required=False,
                            help='disable requirement for user token')

    def _run(self, robot, options):
        """Register the described service with the robot's directory.

        Args:
            robot: Robot object on which to run the command.
            options: Parsed command-line arguments.

        Returns:
            False if DirectoryRegistrationResponseError is caught, True otherwise.
        """
        registration_client = robot.ensure_client(DirectoryRegistrationClient.default_service_name)
        requires_user_token = not options.no_user_token
        try:
            registration_client.register(options.service_name, options.service_type,
                                         options.service_authority, options.service_hostname,
                                         options.service_port,
                                         user_token_required=requires_user_token)
        except DirectoryRegistrationResponseError as exc:
            # pylint: disable=no-member
            response_type = bosdyn.api.directory_registration_pb2.RegisterServiceResponse
            status_name = response_type.Status.Name(exc.response.status)
            print("Failed to register service {}.\nResponse Status: {}".format(
                options.service_name, status_name))
            return False

        print("Successfully registered service {}".format(options.service_name))
        return True
class DirectoryUnregisterCommand(Command):
    """Unregister entry for a service in the directory."""

    NAME = 'unregister'

    def __init__(self, subparsers, command_dict):
        """Register the 'unregister' subcommand and its --service-name option.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        super().__init__(subparsers, command_dict)
        parser = self._parser
        parser.add_argument('--service-name', required=True, help='unique name of the service')

    def _run(self, robot, options):
        """Remove the named service from the robot's directory.

        Args:
            robot: Robot object on which to run the command.
            options: Parsed command-line arguments.

        Returns:
            False if DirectoryRegistrationResponseError is caught, True otherwise.
        """
        registration_client = robot.ensure_client(DirectoryRegistrationClient.default_service_name)
        try:
            registration_client.unregister(options.service_name)
        except DirectoryRegistrationResponseError as exc:
            # pylint: disable=no-member
            response_type = bosdyn.api.directory_registration_pb2.UnregisterServiceResponse
            status_name = response_type.Status.Name(exc.response.status)
            print("Failed to unregister service {}.\nResponse Status: {}".format(
                options.service_name, status_name))
            return False

        print("Successfully unregistered service {}".format(options.service_name))
        return True
class PayloadCommands(Subcommands):
    """Commands related to the payload and payload registration services."""

    NAME = 'payload'

    def __init__(self, subparsers, command_dict):
        """Register the 'payload' command group and its subcommands.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        payload_subcommands = [PayloadListCommand, PayloadRegisterCommand]
        super().__init__(subparsers, command_dict, payload_subcommands)
def _show_payload_list(robot, as_proto=False):
    """Print the payloads known to the robot.

    Args:
        robot: Robot object used to get the list of services.
        as_proto: Boolean to determine whether the payload entries should be printed as full
            proto definitions or formatted strings.

    Returns:
        True
    """
    payload_client = robot.ensure_client(PayloadClient.default_service_name)
    payload_protos = payload_client.list_payloads()
    if not payload_protos:
        print("No payloads found")
        return True

    if as_proto:
        for payload in payload_protos:
            print(payload)
        return True

    # Print out the payload name, description, and GUID in columns with set width.
    name_width = 30
    description_width = 60
    guid_width = 36
    # Doubled braces survive this first format() call, yielding '{:30} {:60} {:36}'.
    row_template = '{{:{}}} {{:{}}} {{:{}}}'.format(name_width, description_width, guid_width)

    print('\n' + row_template.format('Name', 'Description', 'GUID'))
    print("-" * (5 + name_width + description_width + guid_width))
    for payload in payload_protos:
        print(row_template.format(payload.name, payload.description, payload.GUID))
    return True
class PayloadListCommand(Command):
    """List all payloads registered with the robot."""

    NAME = 'list'

    def __init__(self, subparsers, command_dict):
        """Register the 'list' subcommand and its --proto flag.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        super().__init__(subparsers, command_dict)
        parser = self._parser
        parser.add_argument('--proto', action='store_true', help='print listing in proto format')

    def _run(self, robot, options):
        """Print the payload listing.

        Args:
            robot: Robot object on which to run the command.
            options: Parsed command-line arguments.

        Returns:
            True
        """
        print_as_proto = options.proto
        _show_payload_list(robot, as_proto=print_as_proto)
        return True
def _register_payload(robot, name, guid, secret):
    """Register a payload with the robot and report the outcome.

    Args:
        robot: Robot object used to register the payload.
        name: The name that will be assigned to the registered payload.
        guid: The GUID that will be assigned to the registered payload.
        secret: The secret that the payload will be registered with.

    Returns:
        True, whether the payload was newly registered or already existed.
    """
    registration_client = robot.ensure_client(PayloadRegistrationClient.default_service_name)
    new_payload = bosdyn.api.payload_pb2.Payload(GUID=guid, name=name)

    try:
        registration_client.register_payload(new_payload, secret)
    except PayloadAlreadyExistsError:
        # A duplicate GUID is reported to the user but is not treated as a failure.
        print('\nA payload with this GUID is already registered. Check the robot Admin Console.')
        return True

    print('\nPayload successfully registered with the robot.\n'
          'Before it can be used, the payload must be authorized in the Admin Console.')
    return True
class PayloadRegisterCommand(Command):
    """Register a payload with the robot."""

    NAME = 'register'

    def __init__(self, subparsers, command_dict):
        """Register the 'register' subcommand and its payload name/GUID/secret options.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        super(PayloadRegisterCommand, self).__init__(subparsers, command_dict)
        self._parser.add_argument('--payload-name', required=True, help='name of the payload')
        self._parser.add_argument('--payload-guid', required=True, help='guid of the payload')
        self._parser.add_argument('--payload-secret', required=True, help='secret for the payload')

    def _run(self, robot, options):
        """Register the payload described by the parsed options.

        Args:
            robot: Robot object on which to run the command.
            options: Parsed command-line arguments.

        Returns:
            True
        """
        # Propagate the helper's result; previously this method fell through and returned
        # None, contradicting the documented return value.
        return _register_payload(robot, options.payload_name, options.payload_guid,
                                 options.payload_secret)
class FaultCommands(Subcommands):
    """Commands related to the fault service and robot state service (for fault reading)."""

    NAME = 'fault'

    def __init__(self, subparsers, command_dict):
        """Register the 'fault' command group and its subcommands.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        fault_subcommands = [FaultShowCommand, FaultWatchCommand]
        super().__init__(subparsers, command_dict, fault_subcommands)
def _show_all_faults(robot):
    """Print the system, behavior and service faults from the robot state.

    Args:
        robot: Robot object used to get the list of services.
    """
    state_client = robot.ensure_client(RobotStateClient.default_service_name)
    robot_state = state_client.get_robot_state()
    system_faults = robot_state.system_fault_state.faults
    behavior_faults = robot_state.behavior_fault_state.faults
    service_faults = robot_state.service_fault_state.faults

    system_template = '''
{fault.name}
Error Message: {fault.error_message}
Onset Time: {timestamp}'''
    behavior_template = '''
{cause}
Onset Time: {timestamp}
Clearable: {clearable}'''
    service_template = '''
{fault.fault_id.fault_name}
Service Name: {fault.fault_id.service_name}
Payload GUID: {fault.fault_id.payload_guid}
Error Message: {fault.error_message}
Onset Time: {timestamp}'''

    print('\n' + '-' * 80)
    if not system_faults:
        print("No active system faults.")
    else:
        for fault in system_faults:
            onset = timestamp_to_datetime(fault.onset_timestamp)
            print(system_template.format(fault=fault, timestamp=onset))
    print()

    if not behavior_faults:
        print("No active behavior faults.")
    else:
        for fault in behavior_faults:
            print(
                behavior_template.format(cause=BehaviorFault.Cause.Name(fault.cause),
                                         timestamp=timestamp_to_datetime(fault.onset_timestamp),
                                         clearable=BehaviorFault.Status.Name(fault.status)))
    print()

    if not service_faults:
        print("No active service faults.")
    else:
        for fault in service_faults:
            onset = timestamp_to_datetime(fault.onset_timestamp)
            print(service_template.format(fault=fault, timestamp=onset))
class FaultShowCommand(Command):
    """Show all faults currently active in robot state."""

    NAME = 'show'

    def __init__(self, subparsers, command_dict):
        """Register the 'show' subcommand; it takes no extra arguments.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        super().__init__(subparsers, command_dict)

    def _run(self, robot, options):
        """Print the current faults once.

        Args:
            robot: Robot object on which to run the command.
            options: Parsed command-line arguments.

        Returns:
            True
        """
        _show_all_faults(robot)
        return True
class FaultWatchCommand(Command):
    """Watch all faults in robot state and print them out."""

    NAME = 'watch'

    def __init__(self, subparsers, command_dict):
        """Register the 'watch' subcommand; it takes no extra arguments.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        super().__init__(subparsers, command_dict)

    def _run(self, robot, options):
        """Reprint the faults once per second until interrupted.

        Args:
            robot: Robot object on which to run the command.
            options: Parsed command-line arguments.

        Returns:
            True
        """
        poll_period_secs = 1
        print('Press Ctrl-C or send SIGINT to exit\n\n')
        try:
            while True:
                _show_all_faults(robot)
                time.sleep(poll_period_secs)
        except KeyboardInterrupt:
            # Ctrl-C is the documented way to leave the watch loop.
            return True
        return True
class LogStatusCommands(Subcommands):
    """Start, update and terminate experiment logs, start and terminate retro logs and check status of
    active logs for robot."""

    NAME = 'log-status'
    NEED_AUTHENTICATION = True

    def __init__(self, subparsers, command_dict):
        """Register the 'log-status' command group and its subcommands.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        log_subcommands = [
            GetLogCommand,
            GetActiveLogStatusesCommand,
            ExperimentLogCommand,
            StartRetroLogCommand,
            TerminateLogCommand,
        ]
        super().__init__(subparsers, command_dict, log_subcommands)
class GetLogCommand(Command):
    """Get log status by log id."""

    NAME = 'get'

    def __init__(self, subparsers, command_dict):
        """Register the 'get' subcommand and its log id argument.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        super(GetLogCommand, self).__init__(subparsers, command_dict)
        self._parser.add_argument('id', help='id of log bundle to display')

    def _run(self, robot, options):
        """Fetch and print the status of the log with the given id.

        Args:
            robot: Robot object on which to run the command.
            options: Parsed command-line arguments.

        Returns:
            True
        """
        client = robot.ensure_client(LogStatusClient.default_service_name)
        response = client.get_log_status(options.id)
        print(response.log_status)
        return True
class GetActiveLogStatusesCommand(Command):
    """Get active log bundles for robot."""

    NAME = 'active'

    def __init__(self, subparsers, command_dict):
        """Register the 'active' subcommand; it takes no extra arguments.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        super().__init__(subparsers, command_dict)

    def _run(self, robot, options):
        """Fetch and print the statuses of all active logs.

        Args:
            robot: Robot object on which to run the command.
            options: Parsed command-line arguments.

        Returns:
            True
        """
        log_client = robot.ensure_client(LogStatusClient.default_service_name)
        active_statuses = log_client.get_active_log_statuses().log_statuses
        print(active_statuses)
        return True
class ExperimentLogCommand(Subcommands):
    """Give experiment log commands to robot."""

    NAME = 'experiment'

    def __init__(self, subparsers, command_dict):
        """Register the 'experiment' command group (timed and continuous logs).

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        experiment_subcommands = [
            StartTimedExperimentLogCommand,
            StartContinuousExperimentLogCommand,
        ]
        super().__init__(subparsers, command_dict, experiment_subcommands)
class StartTimedExperimentLogCommand(Command):
    """Start a timed experiment log."""

    NAME = 'timed'

    def __init__(self, subparsers, command_dict):
        """Register the 'timed' subcommand and its duration argument.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        super().__init__(subparsers, command_dict)
        parser = self._parser
        parser.add_argument('seconds', type=float, help='how long should the experiment run?')

    def _run(self, robot, options):
        """Start an experiment log for the requested number of seconds and print its status.

        Args:
            robot: Robot object on which to run the command.
            options: Parsed command-line arguments.

        Returns:
            True
        """
        log_client = robot.ensure_client(LogStatusClient.default_service_name)
        started = log_client.start_experiment_log(options.seconds)
        print(started.log_status)
        return True
class StartContinuousExperimentLogCommand(Command):
    """Start a continuous experiment log."""

    NAME = 'continuous'

    def __init__(self, subparsers, command_dict):
        """Register the 'continuous' subcommand.

        By default the log's keep-alive is extended by 10 seconds every 5 seconds.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        super().__init__(subparsers, command_dict)
        parser = self._parser
        parser.add_argument('-sleep', type=float, default=5,
                            help='how long should thread sleep before extending')

    @staticmethod
    def handle_keyboard_interruption(client, log_id):
        """Terminate the log after Ctrl-C and print its status.

        If a second interrupt arrives while the blocking terminate call is in flight, an
        asynchronous terminate is issued instead and the current status is printed.

        Args:
            client: Log status client used to terminate and query the log.
            log_id: Id of the experiment log to terminate.
        """
        try:
            print(" Received keyboard interruption\n\n")
            terminated = client.terminate_log(log_id)
            print(terminated.log_status)
        except KeyboardInterrupt:
            client.terminate_log_async(log_id)
            print("Log will terminate shortly")
            current = client.get_log_status(log_id)
            print(current.log_status)

    def _run(self, robot, options):
        """Start an experiment log and keep extending it until interrupted or inactive.

        Args:
            robot: Robot object on which to run the command.
            options: Parsed command-line arguments.

        Returns:
            True
        """
        log_client = robot.ensure_client(LogStatusClient.default_service_name)
        # Each extension covers two sleep periods.
        keep_alive_secs = options.sleep * 2
        log_id = log_client.start_experiment_log(keep_alive_secs).log_status.id
        print("Experiment log id: ", log_id)
        print('Use terminate command, press Ctrl-C or send SIGINT to complete log\n\n')

        try:
            while True:
                time.sleep(options.sleep)
                log_client.update_experiment(log_id, keep_alive_secs)
        except InactiveLogError:
            # The log is no longer active; show where it ended up.
            final = log_client.get_log_status(log_id)
            print(final.log_status)
        except KeyboardInterrupt:
            self.handle_keyboard_interruption(log_client, log_id)

        return True
class StartRetroLogCommand(Command):
    """Start a retro log."""

    NAME = 'retro'

    def __init__(self, subparsers, command_dict):
        """Register the 'retro' subcommand and its duration argument.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        super().__init__(subparsers, command_dict)
        parser = self._parser
        parser.add_argument('seconds', type=float, help='how long should the retro log run?')

    def _run(self, robot, options):
        """Start a retro log for the requested number of seconds and print its status.

        Args:
            robot: Robot object on which to run the command.
            options: Parsed command-line arguments.

        Returns:
            True
        """
        log_client = robot.ensure_client(LogStatusClient.default_service_name)
        started = log_client.start_retro_log(options.seconds)
        print(started.log_status)
        return True
class TerminateLogCommand(Command):
    """Terminate log gathering process."""

    NAME = 'terminate'

    def __init__(self, subparsers, command_dict):
        """Register the 'terminate' subcommand and its log id argument.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        super().__init__(subparsers, command_dict)
        parser = self._parser
        parser.add_argument('id', help='id of log to terminate')

    def _run(self, robot, options):
        """Terminate the log with the given id and print its resulting status.

        Args:
            robot: Robot object on which to run the command.
            options: Parsed command-line arguments.

        Returns:
            True
        """
        log_client = robot.ensure_client(LogStatusClient.default_service_name)
        terminated = log_client.terminate_log(options.id)
        print(terminated.log_status)
        return True
class RobotIdCommand(Command):
    """Show robot-id."""

    NAME = 'id'
    NEED_AUTHENTICATION = False

    def __init__(self, subparsers, command_dict):
        """Register the 'id' command and its --proto flag.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        super().__init__(subparsers, command_dict)
        parser = self._parser
        parser.add_argument('--proto', action='store_true', help='print listing in proto format')

    def _run(self, robot, options):
        """Print the robot id, either as a raw proto or as a short summary.

        Args:
            robot: Robot object on which to run the command.
            options: Parsed command-line arguments.

        Returns:
            True.
        """
        robot_id = robot.ensure_client(RobotIdClient.default_service_name).get_id()
        if options.proto:
            print(robot_id)
            return True

        # Only show the nickname when it is set and differs from the serial number.
        has_distinct_nickname = robot_id.nickname and robot_id.nickname != robot_id.serial_number
        nickname = robot_id.nickname if has_distinct_nickname else ''

        release = robot_id.software_release
        version = release.version
        changeset_time = timestamp_to_datetime(release.changeset_date)
        install_time = timestamp_to_datetime(release.install_date)

        print(u"{:20} {:15} {:10} {} ({})".format(robot_id.serial_number,
                                                   robot_id.computer_serial_number, nickname,
                                                   robot_id.species, robot_id.version))
        print(" Software: {}.{}.{} ({} {})".format(version.major_version, version.minor_version,
                                                   version.patch_level, release.changeset,
                                                   changeset_time))
        print(" Installed: {}".format(install_time))
        return True
class DataBufferCommands(Subcommands):
    """Commands related to the data-buffer service."""

    NAME = 'log'

    def __init__(self, subparsers, command_dict):
        """Register the 'log' command group and its subcommands.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        data_buffer_subcommands = [TextMsgCommand, OperatorCommentCommand]
        super().__init__(subparsers, command_dict, data_buffer_subcommands)
class TextMsgCommand(Command):
    """Send a text-message to the data buffer to be logged."""

    NAME = 'textmsg'

    def __init__(self, subparsers, command_dict):
        """Register the 'textmsg' subcommand with its timestamp, tag and level options.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        super().__init__(subparsers, command_dict)
        parser = self._parser
        parser.add_argument('--timestamp', action='store_true',
                            help='achieve time-sync and send timestamp')
        parser.add_argument('--tag', help='Tag for message')

        # At most one log level may be given on the command line.
        level_group = parser.add_mutually_exclusive_group()
        level_flags = (
            ('--debug', '-D', 'Log at debug-level'),
            ('--info', '-I', 'Log at info-level'),
            ('--warn', '-W', 'Log at warn-level'),
            ('--error', '-E', 'Log at error-level'),
        )
        for long_flag, short_flag, help_text in level_flags:
            level_group.add_argument(long_flag, short_flag, action='store_true', help=help_text)

        parser.add_argument('message', help='Message to log')

    def _run(self, robot, options):
        """Build a text message from the options and add it to the data buffer.

        Args:
            robot: Robot object on which to run the command.
            options: Parsed command-line arguments.

        Returns:
            False if TimeSyncError is caught, True otherwise.
        """
        robot_timestamp = None
        if options.timestamp:
            try:
                robot_timestamp = robot.time_sync.robot_timestamp_from_local_secs(
                    time.time(), timesync_timeout_sec=1.0)
            except TimeSyncError as err:
                print("Failed to send message with timestamp: {}.".format(err))
                return False

        msg_proto = TextMessage(message=options.message, timestamp=robot_timestamp)

        # pylint: disable=no-member
        # First selected flag wins; info is the default when none of these is set.
        chosen_level = TextMessage.LEVEL_INFO
        level_choices = (
            (options.debug, TextMessage.LEVEL_DEBUG),
            (options.warn, TextMessage.LEVEL_WARN),
            (options.error, TextMessage.LEVEL_ERROR),
        )
        for is_selected, level in level_choices:
            if is_selected:
                chosen_level = level
                break
        msg_proto.level = chosen_level

        if options.tag:
            msg_proto.tag = options.tag

        buffer_client = robot.ensure_client(DataBufferClient.default_service_name)
        buffer_client.add_text_messages([msg_proto])
        return True
class DataServiceCommands(Subcommands):
    """Commands for querying the data-service."""

    NAME = 'data'

    def __init__(self, subparsers, command_dict):
        """Register the 'data' command group and its subcommands.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        data_service_subcommands = [
            GetDataBufferCommentsCommand,
            GetDataBufferEventsCommand,
            GetDataBufferStatusCommand,
        ]
        super().__init__(subparsers, command_dict, data_service_subcommands)
class GetDataBufferEventsCommand(GetDataBufferEventsCommentsCommand):
    """Get events from the robot."""

    NAME = 'events'

    def __init__(self, subparsers, command_dict):
        """Register the 'events' subcommand with its --type and --level filters.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        super(GetDataBufferEventsCommand, self).__init__(subparsers, command_dict)
        self._parser.add_argument('--type', help='query for only the given event-type')
        # pylint: disable=no-member
        self._parser.add_argument(
            '--level',
            choices=Event.Level.keys()[1:],  # slice skips UNSET
            help='limit level to this and above')

    def _run(self, robot, options):
        """Build an events query from the options and hand it to the shared result handler.

        Args:
            robot: Robot object on which to run the command.
            options: Parsed command-line arguments.

        Returns:
            The result of self._get_result() for the events query.
        """
        request_spec = EventsCommentsSpec()
        event_spec = request_spec.events.add()  # pylint: disable=no-member
        if options.type:
            event_spec.type = options.type
        if options.level:
            event_spec.level.value = Event.Level.Value(options.level)  # pylint: disable=no-member

        def _get_events(response):
            return response.events_comments.events

        return self._get_result(request_spec, robot, options, _get_events)

    @staticmethod
    def _level_name(event):
        """Return the event's level name with any leading 'LEVEL_' removed."""
        prefix = 'LEVEL_'
        # pylint: disable=no-member
        name = event.Level.Name(event.level)
        if name.startswith(prefix):
            return name[len(prefix):]
        return name

    @staticmethod
    def _timestamp_secs(timestamp):
        """Convert a timestamp with seconds and nanos fields into fractional seconds."""
        return timestamp.seconds + timestamp.nanos * 1e-9

    def pretty_print(self, values):  # pylint: disable=no-self-use
        """Print events grouped under a date heading, one line per event.

        Args:
            values: Iterable of events to print.
        """
        last_date_shown = None
        for event in values:
            # The fractional part comes from nanos; the original added seconds * 1e-9 here.
            start_secs = self._timestamp_secs(event.start_time)
            start_dt = datetime.datetime.fromtimestamp(start_secs)
            if start_dt.date() != last_date_shown:
                print("\n[{}]".format(start_dt.date()))
                last_date_shown = start_dt.date()

            # NOTE(review): this relies on the truthiness of a protobuf message field, which
            # may not reflect whether end_time was set -- confirm whether HasField is intended.
            if event.end_time and event.end_time != event.start_time:
                end_secs = self._timestamp_secs(event.end_time)
                end_dt = datetime.datetime.fromtimestamp(end_secs)
                print(" {}-{} (END) ({:16}) {:16} {:16} <{}> ".format(
                    start_dt.time(), end_dt.time(), end_secs - start_secs, event.type,
                    self._level_name(event), event.source))
            else:
                timing = '' if event.end_time else '(START)'
                print(" {} {} {:16} {:16} <{}> ".format(start_dt.time(), timing, event.type,
                                                        self._level_name(event), event.source))
            if event.description:
                print("\t{}".format(event.description))
class GetDataBufferStatusCommand(Command):
    """Report the status of the data-buffer on the robot."""

    NAME = 'status'

    def __init__(self, subparsers, command_dict):
        """Register the command and its `--get-blob-specs` flag.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        super().__init__(subparsers, command_dict)
        self._parser.add_argument('--get-blob-specs', '-B', action='store_true',
                                  help='get list of channel/msgtype/source combinations')

    def _run(self, robot, options):
        """Query the data service for the data-buffer status and print it.

        Args:
            robot: Robot object on which to run the command.
            options: Parsed command-line arguments.

        Returns:
            True
        """
        data_service_client = robot.ensure_client(DataServiceClient.default_service_name)
        status = data_service_client.get_data_buffer_status(
            get_blob_specs=options.get_blob_specs)
        print(status)
        return True
class RobotStateCommands(Subcommands):
    """Group of subcommands that query robot state."""

    NAME = 'state'

    def __init__(self, subparsers, command_dict):
        """Register the robot-state subcommands.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        subcommand_classes = [
            FullStateCommand,
            HardwareConfigurationCommand,
            MetricsCommand,
            RobotModel,
        ]
        super().__init__(subparsers, command_dict, subcommand_classes)
class FullStateCommand(Command):
    """Print the full robot state."""

    NAME = 'full'

    def __init__(self, subparsers, command_dict):
        """Register the command; it takes no extra arguments.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        super().__init__(subparsers, command_dict)

    def _run(self, robot, options):
        """Fetch the RobotState proto from the robot-state service and print it.

        Args:
            robot: Robot object on which to run the command.
            options: Parsed command-line arguments.

        Returns:
            True
        """
        state_client = robot.ensure_client(RobotStateClient.default_service_name)
        robot_state = state_client.get_robot_state()
        print(robot_state)
        return True
class HardwareConfigurationCommand(Command):
    """Print the robot hardware configuration."""

    NAME = 'hardware'

    def __init__(self, subparsers, command_dict):
        """Register the command; it takes no extra arguments.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        super().__init__(subparsers, command_dict)

    def _run(self, robot, options):
        """Fetch the HardwareConfiguration proto from the robot-state service and print it.

        Args:
            robot: Robot object on which to run the command.
            options: Parsed command-line arguments.

        Returns:
            True
        """
        state_client = robot.ensure_client(RobotStateClient.default_service_name)
        hardware_configuration = state_client.get_robot_hardware_configuration()
        print(hardware_configuration)
        return True
class RobotModel(Command):
    """Write robot URDF and mesh to local files."""

    NAME = 'model'
    NEED_AUTHENTICATION = False

    def __init__(self, subparsers, command_dict):
        """Write robot URDF and mesh to local files.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        super(RobotModel, self).__init__(subparsers, command_dict)
        self._parser.add_argument('--outdir', default='Model_Files',
                                  help='directory into which to save the files')

    def _run(self, robot, options):
        """Implementation of the command.

        Requests the model of every link in the robot skeleton and writes each one to a file
        under the output directory, then writes the skeleton URDF as `model.urdf` in that same
        directory.

        Args:
            robot: Robot object on which to run the command.
            options: Parsed command-line arguments.

        Returns:
            True.

        Raises:
            OSError: If an output directory or file cannot be created.
        """
        robot_state_client = robot.ensure_client(RobotStateClient.default_service_name)
        hardware = robot_state_client.get_robot_hardware_configuration()

        # Write files in user-specified directory, or use default name
        model_directory = options.outdir

        # exist_ok tolerates an already-present directory while still surfacing real failures
        # (e.g. permission errors) instead of silently swallowing every OSError.
        os.makedirs(model_directory, exist_ok=True)

        # Write each link model to its own file
        for link in hardware.skeleton.links:
            # Request a Skeleton.Link.ObjModel from the robot for link.name and write it to a file
            try:
                obj_model_proto = robot_state_client.get_robot_link_model(link.name)
            except InvalidRequestError as err:
                print(err, end='')
                print(" Name of link: " + link.name)
                continue

            # If file_name is empty, ignore
            if not obj_model_proto.file_name:
                continue

            # Mirror the robot-defined directory layout underneath the local output directory.
            sub_path, _, base_name = obj_model_proto.file_name.rpartition('/')
            path = os.path.join(model_directory, sub_path)  # local path + robot path
            os.makedirs(path, exist_ok=True)

            path_and_name = os.path.join(path, base_name)
            with open(path_and_name, "w") as obj_file:
                obj_file.write(obj_model_proto.file_contents)
            print('Link file written to ' + path_and_name)

        # Write the corresponding urdf file at the top of the output directory
        urdf_path = os.path.join(model_directory, "model.urdf")
        with open(urdf_path, "w") as urdf_file:
            urdf_file.write(hardware.skeleton.urdf)
        print('URDF file written to ' + urdf_path)
        return True
class MetricsCommand(Command):
    """Print robot metrics (runtime, etc...)."""

    NAME = 'metrics'

    def __init__(self, subparsers, command_dict):
        """Register the command and its `--proto` flag.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        super().__init__(subparsers, command_dict)
        self._parser.add_argument('--proto', action='store_true',
                                  help='print metrics in proto format')

    def _run(self, robot, options):
        """Fetch the robot metrics and print them, raw or one formatted line per metric.

        Args:
            robot: Robot object on which to run the command.
            options: Parsed command-line arguments.

        Returns:
            True.
        """
        state_client = robot.ensure_client(RobotStateClient.default_service_name)
        metrics_proto = state_client.get_robot_metrics()
        if not options.proto:
            for metric in metrics_proto.metrics:
                print(self._format_metric(metric))
        else:
            print(metrics_proto)
        return True

    @staticmethod
    def _secs_to_hms(seconds):
        """Render a number of seconds as an hours:MM:SS string.

        Args:
            seconds: Seconds to convert; truncated to an integer first.

        Returns:
            String in the format hours:MM:SS.
        """
        total_minutes, secs = divmod(int(seconds), 60)
        hours, minutes = divmod(total_minutes, 60)
        return '{:d}:{:02d}:{:02d}'.format(hours, minutes, secs)

    @staticmethod
    def _distance_str(meters):
        """Render a distance as a human-readable string.

        Args:
            meters: Distance in meters to convert.

        Returns:
            Meters with two decimals when below 1000 meters, kilometers with two decimals
            otherwise.
        """
        if meters >= 1000:
            return '{:.2f} km'.format(float(meters) / 1000)
        return '{:.2f} m'.format(meters)

    @staticmethod
    def _timestamp_str(timestamp):
        """Render a protobuf timestamp as a human-readable string.

        Args:
            timestamp: Protobuf timestamp to convert

        Returns:
            Timestamp string in ISO 8601 format
        """
        # The JSON form of a timestamp is a quoted string such as '"2022-01-12T21:56:05Z"';
        # drop the surrounding quotes.
        as_json = json_format.MessageToJson(timestamp)
        return as_json.strip('"')

    @staticmethod
    def _format_metric(metric):  # pylint: disable=too-many-return-statements
        """Render one metric as a human-readable line.

        Args:
            metric: Input metric object to convert.

        Returns:
            String starting with the label padded to 20 columns, followed by the value
            formatted according to which `values` field is set.
        """
        which = metric.WhichOneof('values')
        if which is None:
            return '{:20} missing value'.format(metric.label)

        value = getattr(metric, which)

        # Fields with dedicated formatting
        if which == 'duration':
            return '{:20} {}'.format(metric.label, MetricsCommand._secs_to_hms(value.seconds))
        if which == 'timestamp':
            return '{:20} {}'.format(metric.label, MetricsCommand._timestamp_str(value))
        if which == 'float_value':
            if metric.units == 'm':
                return '{:20} {}'.format(metric.label, MetricsCommand._distance_str(value))
            return '{:20} {:.2f} {}'.format(metric.label, value, metric.units)

        # Everything else: value followed by units
        return '{:20} {} {}'.format(metric.label, value, metric.units)
class TimeSyncCommand(Command):
    """Measure the clock difference between this computer and the robot."""

    NAME = 'time-sync'

    def __init__(self, subparsers, command_dict):
        """Register the command and its `--proto` flag.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        super().__init__(subparsers, command_dict)
        self._parser.add_argument('--proto', action='store_true',
                                  help='print listing in proto format')

    def _run(self, robot, options):
        """Establish time sync and print either the raw response or a two-line summary.

        Args:
            robot: Robot object on which to run the command.
            options: Parsed command-line arguments.

        Returns:
            False if timesync cannot be established, True otherwise.
        """
        time_sync_client = robot.ensure_client(TimeSyncClient.default_service_name)
        endpoint = TimeSyncEndpoint(time_sync_client)
        synced = endpoint.establish_timesync(break_on_success=True)
        if not synced:
            print("Failed to achieve time sync")
            return False

        if not options.proto:
            print("GRPC round-trip time: {}".format(duration_str(endpoint.round_trip_time)))
            print("Local time to robot time: {}".format(duration_str(endpoint.clock_skew)))
        else:
            print(endpoint.response)
        return True
class LicenseCommand(Command):
    """Print the installed license and, optionally, whether given features are enabled."""

    NAME = "license"

    def __init__(self, subparsers, command_dict):
        """Register the command with its `--proto` and `--feature-codes` options.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        super().__init__(subparsers, command_dict)
        self._parser.add_argument('--proto', action='store_true',
                                  help='print listing in proto format')
        self._parser.add_argument('-f', '--feature-codes', nargs='+',
                                  help='Optional feature list for GetFeatureEnabled API.')

    def _run(self, robot, options):
        """Print the license info, then the state of any requested feature codes.

        Args:
            robot: Robot object on which to run the command.
            options: Parsed command-line arguments.

        Returns:
            True.
        """
        client = robot.ensure_client(LicenseClient.default_service_name)
        self._get_license_info(client, options)
        self._get_feature_enabled(client, options)
        return True

    def _get_license_info(self, license_client, options):
        """Fetch the license info and print it.

        Args:
            license_client: Client used to call `get_license_info`.
            options: Parsed command-line arguments; `proto` selects between printing the
                object itself and its `str()` form.
        """
        info = license_client.get_license_info()
        printable = info if options.proto else str(info)
        print(printable)

    def _get_feature_enabled(self, license_client, options):
        """Print one line per requested feature code saying whether it is enabled.

        Does nothing when no feature codes were given.

        Args:
            license_client: Client used to call `get_feature_enabled`.
            options: Parsed command-line arguments; reads `feature_codes`.
        """
        requested_codes = options.feature_codes
        if not requested_codes:
            return

        enabled_by_feature = license_client.get_feature_enabled(requested_codes)
        for feature in enabled_by_feature:
            if not enabled_by_feature[feature]:
                print(f"Feature {feature} is not enabled.")
                continue
            print(f"Feature {feature} is enabled.")
class LeaseCommands(Subcommands):
    """Group of subcommands for the lease service."""

    NAME = 'lease'

    def __init__(self, subparsers, command_dict):
        """Register the lease subcommands.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        subcommand_classes = [LeaseListCommand]
        super().__init__(subparsers, command_dict, subcommand_classes)
class LeaseListCommand(Command):
    """Print every lease resource reported by the robot."""

    NAME = 'list'

    def __init__(self, subparsers, command_dict):
        """Register the command and its `--proto` flag.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        super().__init__(subparsers, command_dict)
        self._parser.add_argument('--proto', action='store_true',
                                  help='print listing in proto format')

    def _run(self, robot, options):
        """List the leases and print them, as one object or one line per resource.

        Args:
            robot: Robot object on which to run the command.
            options: Parsed command-line arguments.

        Returns:
            True.
        """
        client = robot.ensure_client(LeaseClient.default_service_name)
        lease_resources = client.list_leases()
        if not options.proto:
            for lease_resource in lease_resources:
                print(self._format_lease_resource(lease_resource))
        else:
            print(lease_resources)
        return True

    @staticmethod
    def _format_lease_resource(resource):
        """Return the `str()` form of a single lease resource."""
        text = str(resource)
        return text
class EstopCommands(Subcommands):
    """Group of subcommands for the robot estop service."""

    NAME = 'estop'

    def __init__(self, subparsers, command_dict):
        """Register the estop subcommands.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        subcommand_classes = [
            BecomeEstopCommand,
            GetEstopConfigCommand,
            GetEstopStatusCommand,
        ]
        super().__init__(subparsers, command_dict, subcommand_classes)
class GetEstopConfigCommand(Command):
    """Get estop config of estop service."""

    NAME = 'config'

    def __init__(self, subparsers, command_dict):
        """Call EstopService GetEstopConfig RPC.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        super(GetEstopConfigCommand, self).__init__(subparsers, command_dict)

    def _run(self, robot, options):
        """Implementation of the command; prints the estop configuration.

        Args:
            robot: Robot object on which to run the command.
            options: Parsed command-line arguments.

        Returns:
            True.
        """
        client = robot.ensure_client(EstopClient.default_service_name)
        config = client.get_config()
        print(config)
        # Report success explicitly, as documented and as the sibling commands do; falling
        # off the end would return None, which is falsy.
        return True
class GetEstopStatusCommand(Command):
    """Get estop status of estop service."""

    NAME = 'status'

    def __init__(self, subparsers, command_dict):
        """Call EstopService GetEstopSystemStatus RPC.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        super(GetEstopStatusCommand, self).__init__(subparsers, command_dict)

    def _run(self, robot, options):
        """Implementation of the command; prints the estop system status.

        Args:
            robot: Robot object on which to run the command.
            options: Parsed command-line arguments.

        Returns:
            True.
        """
        client = robot.ensure_client(EstopClient.default_service_name)
        status = client.get_status()
        print(status)
        # Report success explicitly, as documented and as the sibling commands do; falling
        # off the end would return None, which is falsy.
        return True
class BecomeEstopCommand(Command):
    """Take over the estop and hold it until Ctl-C."""

    NAME = 'become-estop'
    _RPC_PRINT_CHOICES = ['timestamp', 'full']

    def __init__(self, subparsers, command_dict):
        """Register the command with its `--timeout` and `--rpc-print` options.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        super().__init__(subparsers, command_dict)
        self._parser.add_argument('--timeout', type=float, help='EStop timeout (seconds)',
                                  default=10)
        self._parser.add_argument('--rpc-print', choices=self._RPC_PRINT_CHOICES,
                                  default='timestamp',
                                  help='How much of the request/response messages to print')

    def _run(self, robot, options):
        """Register as the sole estop endpoint and keep checking in until SIGINT arrives.

        Args:
            robot: Robot object on which to run the command.
            options: Parsed command-line arguments.

        Returns:
            True.
        """
        estop_client = robot.ensure_client(EstopClient.default_service_name)

        # In 'timestamp' mode, swap the client's message-to-string callbacks for ones that
        # show only the header timestamps.
        if options.rpc_print == 'timestamp':

            def _request_timestamp_only(request):
                return '(request timestamp: {})'.format(
                    request.header.request_timestamp.ToSeconds())

            def _response_timestamp_only(response):
                return '(response timestamp: {})'.format(
                    response.header.response_timestamp.ToSeconds())

            estop_client.request_trim_for_log = _request_timestamp_only
            estop_client.response_trim_for_log = _response_timestamp_only

        # Build the endpoint for the robot estop system. The timeout should balance safety
        # considerations against expected service latency; see the estop documentation.
        endpoint_name = f"command-line-{socket.gethostname()}"
        endpoint = EstopEndpoint(estop_client, endpoint_name, options.timeout)

        # Configure the robot's estop system so that this endpoint is the sole estop.
        endpoint.force_simple_setup()

        # Set by the SIGINT handler to request shutdown.
        stop_requested = threading.Event()

        def sigint_handler(sig, frame):
            """Request a clean shutdown on interrupt."""
            #pylint: disable=unused-argument
            stop_requested.set()

        print('Press Ctrl-C or send SIGINT to exit')
        signal.signal(signal.SIGINT, sigint_handler)

        # EstopKeepAlive does the periodic check-ins on its own thread; this thread only
        # waits for the shutdown request.
        with EstopKeepAlive(endpoint) as keep_alive:
            while True:
                if stop_requested.wait(10):
                    break
            # EStop the robot.
            keep_alive.stop()

        # Leaving the `with` block shuts down the keep-alive thread. Deregistering lets
        # another endpoint fill this role if it wants to use the current configuration.
        endpoint.deregister()
        return True
class OldBecomeEstopCommand(BecomeEstopCommand):
    """Legacy entry point for BecomeEstopCommand that prints a deprecation warning."""

    def run(self, robot, options):
        """Print the deprecation warning, then run the command through the parent's `run`.

        Args:
            robot: Robot object on which to run the command.
            options: Parsed command-line arguments.

        Returns:
            Whatever `BecomeEstopCommand.run` returns.
        """
        print('DEPRECATION WARNING: This command is now "bosdyn.client estop become-estop"')
        result = super().run(robot, options)
        return result
class ImageCommands(Subcommands):
    """Group of subcommands that query images."""

    NAME = 'image'

    def __init__(self, subparsers, command_dict):
        """Register the image subcommands.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        subcommand_classes = [ListImageSourcesCommand, GetImageCommand]
        super().__init__(subparsers, command_dict, subcommand_classes)
def _show_image_sources_list(robot, as_proto=False, service_name=None):
    """Print the image sources offered by an image service.

    Args:
        robot: Robot object on which to run the command.
        as_proto: Boolean to determine whether to print the full proto message, or one line per
            source with its name, (rows x cols), image formats and pixel formats.
        service_name: Image service to query; ImageClient.default_service_name when falsy.

    Returns:
        True.
    """
    if not service_name:
        service_name = ImageClient.default_service_name
    sources = robot.ensure_client(service_name).list_image_sources()

    if as_proto:
        print(sources)
        return True

    # Number of leading characters cut from each enum name before printing.
    image_format_cut = len('FORMAT_')
    pixel_format_cut = len('PIXEL_FORMAT_')
    for source in sources:
        image_format_names = ','.join(
            image_pb2.Image.Format.Name(fmt)[image_format_cut:] for fmt in source.image_formats)
        pixel_format_names = ','.join(
            image_pb2.Image.PixelFormat.Name(fmt)[pixel_format_cut:]
            for fmt in source.pixel_formats)
        print("{:30s} ({:d}x{:d}) {:15s} {:15s}".format(source.name, source.rows, source.cols,
                                                        image_format_names, pixel_format_names))
    return True
class ListImageSourcesCommand(Command):
    """Print the image sources of an image service."""

    NAME = 'list-sources'

    def __init__(self, subparsers, command_dict):
        """Register the command with its `--proto` and `--service-name` options.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        super().__init__(subparsers, command_dict)
        self._parser.add_argument('--proto', action='store_true',
                                  help='print listing in proto format')
        self._parser.add_argument('--service-name', default=ImageClient.default_service_name,
                                  help='Image service to query')

    def _run(self, robot, options):
        """Print the image source listing via `_show_image_sources_list`.

        Args:
            robot: Robot object on which to run the command.
            options: Parsed command-line arguments.

        Returns:
            True.
        """
        listing_kwargs = {'as_proto': options.proto, 'service_name': options.service_name}
        _show_image_sources_list(robot, **listing_kwargs)
        return True
class GetImageCommand(Command):
    """Fetch images from the robot and save them as image files."""

    NAME = 'get-image'

    def __init__(self, subparsers, command_dict):
        """Register the command and its options.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        super().__init__(subparsers, command_dict)
        add_argument = self._parser.add_argument
        add_argument('--outfile', default=None, help='Filename into which to save the image')
        add_argument('--quality-percent', type=int, default=75,
                     help='Percent image quality (0-100)')
        add_argument('source_name', metavar='SRC', nargs='+', help='Image source name')
        add_argument('--service-name', help='Image service to query',
                     default=ImageClient.default_service_name)

    def _run(self, robot, options):
        """Request one image per named source and save the responses to files.

        Args:
            robot: Robot object on which to run the command.
            options: Parsed command-line arguments.

        Returns:
            True if successful, False if exceptions are caught.
        """
        quality = options.quality_percent
        image_requests = []
        for source_name in options.source_name:
            image_requests.append(build_image_request(source_name, quality))

        try:
            image_client = robot.ensure_client(options.service_name)
            response = image_client.get_image(image_requests)
        except UnknownImageSourceError:
            print('Requested image source "{}" does not exist. Available image sources:'.format(
                options.source_name))
            _show_image_sources_list(robot, service_name=options.service_name)
            return False
        except ImageResponseError:
            print('Robot cannot generate the "{}" at this time. Retry the command.'.format(
                options.source_name))
            return False

        # Save the image files in the correct format (jpeg, pgm for raw/rle).
        save_images_as_files(response)
        return True
class LocalGridCommands(Subcommands):
    """Group of subcommands that query local grid maps."""

    NAME = 'local_grid'

    def __init__(self, subparsers, command_dict):
        """Register the local-grid subcommands.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        subcommand_classes = [ListLocalGridTypesCommand, GetLocalGridsCommand]
        super().__init__(subparsers, command_dict, subcommand_classes)
def _show_local_grid_sources_list(robot, as_proto=False):
    """Print the local grid types reported by the local-grid service.

    Args:
        robot: Robot object on which to run the command.
        as_proto: Boolean to determine whether to print the full proto message, or just the list of
            names.

    Returns:
        True.
    """
    local_grid_client = robot.ensure_client(LocalGridClient.default_service_name)
    grid_types = local_grid_client.get_local_grid_types()
    if as_proto:
        print(grid_types)
        return True
    for grid_type in grid_types:
        print(grid_type.name)
    return True
class ListLocalGridTypesCommand(Command):
    """Print the available local grid types."""

    NAME = 'types'

    def __init__(self, subparsers, command_dict):
        """Register the command and its `--proto` flag.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        super().__init__(subparsers, command_dict)
        self._parser.add_argument('--proto', action='store_true',
                                  help='print listing in proto format')

    def _run(self, robot, options):
        """Print the local grid type listing via `_show_local_grid_sources_list`.

        Args:
            robot: Robot object on which to run the command.
            options: Parsed command-line arguments.

        Returns:
            True.
        """
        print_as_proto = options.proto
        _show_local_grid_sources_list(robot, as_proto=print_as_proto)
        return True
class GetLocalGridsCommand(Command):
    """Fetch local grids from the robot and print them."""

    NAME = 'get'

    def __init__(self, subparsers, command_dict):
        """Register the command with its `--outfile` option and positional grid types.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        super().__init__(subparsers, command_dict)
        self._parser.add_argument('--outfile', default=None,
                                  help='filename into which to save the image')
        self._parser.add_argument('types', metavar='SRC', nargs='+', help='image types')

    def _run(self, robot, options):
        """Request the named local grids and print each response.

        Args:
            robot: Robot object on which to run the command.
            options: Parsed command-line arguments.

        Returns:
            True.
        """
        local_grid_client = robot.ensure_client(LocalGridClient.default_service_name)
        grid_responses = local_grid_client.get_local_grids(options.types)
        for grid_response in grid_responses:
            print(grid_response)
        return True
class DataAcquisitionCommand(Subcommands):
    """Group of subcommands that acquire data from the robot into the data buffer along with
    its metadata, or request status."""

    NAME = 'acquire'

    def __init__(self, subparsers, command_dict):
        """Register the data-acquisition subcommands.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        subcommand_classes = [
            DataAcquisitionServiceCommand,
            DataAcquisitionRequestCommand,
            DataAcquisitionStatusCommand,
            DataAcquisitionGetLiveDataCommand,
        ]
        super().__init__(subparsers, command_dict, subcommand_classes)
class DataAcquisitionRequestCommand(Command):
    """Capture and save the images or data named on the command line."""

    NAME = 'request'

    def __init__(self, subparsers, command_dict):
        """Register the command and its capture/naming options.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        super().__init__(subparsers, command_dict)
        add_argument = self._parser.add_argument
        add_argument('--image-source', metavar='IMG_SRC', default=[], help='Image source name',
                     action='append')
        add_argument('--image-service', metavar='SERVICE_NAME', default=[],
                     help='Image service name for the image source.', action='append')
        add_argument('--data-source', metavar='DATA_SRC', default=[], help='Data source name',
                     action='append')
        add_argument('--action-name', help='The action name to save the data with.',
                     default="quick_captures")
        add_argument('--group-name', help='The group name to save the data with.',
                     default="command_line")
        add_argument(
            '--non-blocking-request',
            help=('Return after making the acquisition request, without monitoring'
                  ' the status for completion.'), default=False, action='store_true')

    def _run(self, robot, options):
        """Validate the options, build the acquisition request list, and submit it.

        Args:
            robot: Robot object on which to run the command.
            options: Parsed command-line arguments.

        Returns:
            The value returned by `acquire_and_process_request`.
        """
        image_sources = options.image_source
        image_services = options.image_service

        has_image_capture = image_sources and image_services
        if not options.data_source and not has_image_capture:
            self._parser.error(
                'A request requires either a data source name or an image source+service name.')

        if len(image_sources) != len(image_services):
            self._parser.error(
                'A request must have a 1:1 correspondence between image source and image service arguments.'
            )

        data_captures = [
            data_acquisition_pb2.DataCapture(name=data_name) for data_name in options.data_source
        ]
        # Sources and services are paired positionally; their lengths were checked above.
        image_captures = [
            data_acquisition_pb2.ImageSourceCapture(image_service=service_name,
                                                    image_source=source_name)
            for source_name, service_name in zip(image_sources, image_services)
        ]

        captures = data_acquisition_pb2.AcquisitionRequestList()
        captures.data_captures.extend(data_captures)
        captures.image_captures.extend(image_captures)

        robot.time_sync.wait_for_sync(timeout_sec=1.0)

        daq_client = robot.ensure_client(DataAcquisitionClient.default_service_name)
        wait_for_completion = not options.non_blocking_request
        return acquire_and_process_request(daq_client, captures, options.group_name,
                                           options.action_name,
                                           block_until_complete=wait_for_completion)
class DataAcquisitionServiceCommand(Command):
    """Print the capabilities of the data acquisition service as a table."""

    NAME = 'info'

    def __init__(self, subparsers, command_dict):
        """Register the command and set the column widths of the printed table.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        super().__init__(subparsers, command_dict)
        # Column widths used when printing the capability table
        self._data_type_width = 15
        self._data_name_width = 35
        self._service_name_width = 35
        self._has_live_data_width = 30

    def _format_and_print_capability(self, data_type, data_name, service_name="", has_live_data=""):
        """Print one table row, padding each column to its configured width.

        Args:
            data_type (string): Value for the "Data Type" column.
            data_name (string): The name of the data acquisition capability.
            service_name (string): Value for the service-name column; empty by default.
            has_live_data (string): Value for the has_live_data column; empty by default.
        """
        column_widths = (self._data_type_width, self._data_name_width, self._service_name_width,
                         self._has_live_data_width)
        row_format = ' '.join('{:' + str(width) + '}' for width in column_widths)
        print(row_format.format(data_type, data_name, service_name, has_live_data))

    def _run(self, robot, options):
        """Fetch the service info and print a row for each data, image and model capability.

        Args:
            robot: Robot object on which to run the command.
            options: Parsed command-line arguments.

        Returns:
            True
        """
        daq_client = robot.ensure_client(DataAcquisitionClient.default_service_name)
        capabilities = daq_client.get_service_info()

        print("Data Acquisition Service's Available Capabilities\n")
        self._format_and_print_capability("Data Type", "Data Name", "(optional) Service Name",
                                          "(optional) has_live_data")
        table_width = (self._data_type_width + self._data_name_width + self._service_name_width +
                       self._has_live_data_width)
        print("-" * table_width)

        for data_source in capabilities.data_sources:
            self._format_and_print_capability("data", data_source.name, data_source.service_name,
                                              str(data_source.has_live_data))

        for image_service in capabilities.image_sources:
            for image_source_name in image_service.image_source_names:
                self._format_and_print_capability("image", image_source_name,
                                                  image_service.service_name)

        for worker in capabilities.network_compute_sources:
            for model in worker.models.data:
                self._format_and_print_capability("models", model.model_name,
                                                  worker.server_config.service_name)
        return True
class DataAcquisitionStatusCommand(Command):
    """Print the status of an acquisition request identified by its request id."""

    NAME = 'status'

    def __init__(self, subparsers, command_dict):
        """Register the command and its positional `id` argument.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        super().__init__(subparsers, command_dict)
        self._parser.add_argument('id', type=int, help='Response id to get the status for')

    def _run(self, robot, options):
        """Fetch the status for the given id and print it.

        Args:
            robot: Robot object on which to run the command.
            options: Parsed command-line arguments.

        Returns:
            True.
        """
        daq_client = robot.ensure_client(DataAcquisitionClient.default_service_name)
        status = daq_client.get_status(options.id)
        print(status)
        return True
class DataAcquisitionGetLiveDataCommand(Command):
    """Call GetLiveData for the data sources named on the command line."""

    NAME = 'live'

    def __init__(self, subparsers, command_dict):
        """Register the command and its required, repeatable `--data-source` option.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        super().__init__(subparsers, command_dict)
        self._parser.add_argument('--data-source', metavar='DATA_SRC', default=[],
                                  help='Data source name', action='append', required=True)

    def _run(self, robot, options):
        """Build a LiveDataRequest for the named data sources, send it, and print the response.

        Args:
            robot: Robot object on which to run the command.
            options: Parsed command-line arguments.

        Returns:
            True once complete.
        """
        data_captures = [
            data_acquisition_pb2.DataCapture(name=source_name)
            for source_name in options.data_source
        ]
        live_data_request = data_acquisition_pb2.LiveDataRequest()
        live_data_request.data_captures.extend(data_captures)

        client = robot.ensure_client(DataAcquisitionClient.default_service_name)
        live_data = client.get_live_data(live_data_request)
        print(live_data)
        return True
class HostComputerIPCommand(Command):
    """Determine a computer's IP address."""

    NAME = 'self-ip'
    NEED_AUTHENTICATION = False

    def __init__(self, subparsers, command_dict):
        """Determine the IP address of the current computer used to talk to the robot.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        super(HostComputerIPCommand, self).__init__(subparsers, command_dict)

    def _run(self, robot, options):
        """Implementation of the command; prints the IP address found by `get_self_ip`.

        Args:
            robot: Robot object on which to run the command.
            options: Parsed command-line arguments.

        Returns:
            True
        """
        print("The IP address of the computer used to talk to the robot is: %s" %
              (bosdyn.client.common.get_self_ip(robot._name)))
        # Report success explicitly, as documented and as the sibling commands do; falling
        # off the end would return None, which is falsy.
        return True
class PowerCommand(Subcommands):
    """Send power commands to the robot."""

    NAME = 'power'

    def __init__(self, subparsers, command_dict):
        """Register the 'power' command group and its subcommands.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        # Subcommand classes, in the order they are handed to Subcommands.
        power_subcommands = [PowerRobotCommand, PowerPayloadsCommand, PowerWifiRadioCommand]
        super().__init__(subparsers, command_dict, power_subcommands)
class KeepaliveCommand(Subcommands):
    """Send keepalive commands to the robot."""

    NAME = 'keepalive'

    def __init__(self, subparsers, command_dict):
        """Register the 'keepalive' command group and its subcommands.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        # Subcommand classes, in the order they are handed to Subcommands.
        keepalive_subcommands = [KeepaliveGetStatusCommand, KeepaliveRemovePoliciesCommand]
        super().__init__(subparsers, command_dict, keepalive_subcommands)
def lease_details(leases):
    """Format leases as a comma-separated '<resource>:[<seq>, <seq>, ...]' string.

    Args:
        leases: Iterable of objects exposing ``resource`` and an iterable ``sequence``.

    Returns:
        One string with an entry per lease, joined by ", "; empty if there are no leases.
    """

    def _describe(lease):
        # Render a single lease as "<resource>:[<comma-separated sequence>]".
        sequence_text = ", ".join(map(str, lease.sequence))
        return f"{lease.resource}:[{sequence_text}]"

    return ", ".join(_describe(lease) for lease in leases)
class KeepaliveGetStatusCommand(Command):
    """Get status of keepalive service."""

    NAME = 'status'

    def __init__(self, subparsers, command_dict):
        """Call KeepaliveService GetStatus RPC.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        super(KeepaliveGetStatusCommand, self).__init__(subparsers, command_dict)
        self._parser.add_argument('-f', '--full', action='store_true', default=False,
                                  help='Show full GetStatus proto as json.')

    def _run(self, robot, options):
        """Implementation of the command.

        Prints either the full GetStatus response as JSON (``--full``) or a concise
        summary of every live policy, its actions and any active control actions.

        Args:
            robot: Robot object on which to run the command.
            options: Parsed command-line arguments.

        Returns:
            True.
        """
        client = robot.ensure_client(KeepaliveClient.default_service_name)
        status = client.get_status()

        # Give the option to print out the full message.
        if options.full:
            print(json_format.MessageToJson(status))
            return True

        # If there are no policies, there probably isn't anything interesting here.
        live_policies = status.status
        if not live_policies:
            print("No active policies")
            return True

        # List all live policies in a concise message.
        if len(live_policies) == 1:
            print("Robot returned 1 live policy:")
        else:
            print(f"Robot returned {len(live_policies)} live policies:")

        # The response timestamp is the same for every policy, so read it once.
        rough_robot_timestamp = status.header.response_timestamp.ToSeconds()

        for live_policy in live_policies:
            last_checkin = live_policy.last_checkin.ToSeconds()
            time_elapsed = rough_robot_timestamp - last_checkin

            # Go through each action, and create a helpful message describing the status.
            action_list = []
            for action in live_policy.policy.actions:
                # If this is a lease_stale or auto_return action, include some lease info.
                name_maybe_with_details = action.WhichOneof("action")
                if action.HasField("lease_stale"):
                    name_maybe_with_details = f"{name_maybe_with_details} ({lease_details(action.lease_stale.leases)})"
                if action.HasField("auto_return"):
                    name_maybe_with_details = f"{name_maybe_with_details} ({lease_details(action.auto_return.leases)})"

                # Add an indicator if the policy action is active or not: an action counts
                # as active once more time than its 'after' delay has passed since checkin.
                action_after_time = action.after.ToSeconds()
                active_message = "ACTIVE" if time_elapsed > action_after_time else "NOT active"
                action_list.append(
                    f"{name_maybe_with_details} after {action_after_time} seconds, {active_message}"
                )

            formatted_string = f"id: {live_policy.policy_id}\n client_name: {live_policy.client_name}\n last checkin {time_elapsed}s ago"
            # Only the supervisor policy seems to have a name.
            if live_policy.policy.name:
                formatted_string += f"\n policy name: '{live_policy.policy.name}'"
            if live_policy.policy.user_id:
                formatted_string += f"\n user_id: '{live_policy.policy.user_id}'"
            if live_policy.policy.associated_leases:
                formatted_string += f"\n associated_leases: {lease_details(live_policy.policy.associated_leases)}"
            print(formatted_string)

            print(" actions:")
            for action_description in action_list:
                print(f" {action_description}")
            print("")

        # If there is an active control action, we should indicate that.
        for control_action in status.active_control_actions:
            enum_name = keepalive_pb2.GetStatusResponse.PolicyControlAction.Name(control_action)
            print(f"Active control action: {enum_name}")
        return True
class KeepaliveRemovePoliciesCommand(Command):
    """Remove keepalive policies."""

    NAME = 'remove'

    def __init__(self, subparsers, command_dict):
        """Register the 'remove' command and its optional --policy-id list.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        super().__init__(subparsers, command_dict)
        self._parser.add_argument('--policy-id', nargs='+', type=int,
                                  help='Specify specific policy ids to remove.')

    def _run(self, robot, options):
        """Implementation of the command.

        Removes the policies named by --policy-id that the robot currently reports,
        or every reported policy when no ids were given.

        Args:
            robot: Robot object on which to run the command.
            options: Parsed command-line arguments.

        Returns:
            True.
        """
        client = robot.ensure_client(KeepaliveClient.default_service_name)
        live_policies = client.get_status().status
        known_ids = [entry.policy_id for entry in live_policies]

        # Nothing to remove when the robot reports no policies.
        if not live_policies:
            print("No active policies")
            return True

        if not options.policy_id:
            # No explicit ids given: remove all current policies.
            removal_ids = known_ids
        else:
            # Keep only the requested ids the robot actually knows about.
            removal_ids = []
            for requested_id in options.policy_id:
                if requested_id not in known_ids:
                    print(f"Policy '{requested_id}' not found.")
                    continue
                removal_ids.append(requested_id)

        client.modify_policy(policy_ids_to_remove=removal_ids)
        print(f"Removed {len(removal_ids)} policies")
        return True
class PowerRobotCommand(Command):
    """Control the power of the entire robot."""

    NAME = 'robot'

    def __init__(self, subparsers, command_dict):
        """Power cycle or power off robot computers.

        Note that this is only compatible with certain robots. Check HardwareConfiguration for details.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        super(PowerRobotCommand, self).__init__(subparsers, command_dict)
        self._parser.add_argument('cmd', choices=['cycle', 'off'])

    def _run(self, robot, options):
        """Implementation of the 'power robot' command.

        Waits for time sync, then issues the requested power cycle / power off
        inside a LeaseKeepAlive context created with must_acquire=True.

        Args:
            robot: Robot object on which to run the command.
            options: Parsed command-line arguments.

        Returns:
            True once the selected power helper has returned.
        """
        robot.time_sync.wait_for_sync(timeout_sec=1.0)
        lease_client = robot.ensure_client(bosdyn.client.lease.LeaseClient.default_service_name)
        with bosdyn.client.lease.LeaseKeepAlive(lease_client, must_acquire=True,
                                                return_at_exit=True):
            power_client = robot.ensure_client(PowerClient.default_service_name)
            if options.cmd == 'cycle':
                power_cycle_robot(power_client)
            elif options.cmd == 'off':
                power_off_robot(power_client)
        # Report success like the other commands do; main() treats a falsy result
        # from run() as a failure (assuming Command.run() forwards this value).
        return True
class PowerPayloadsCommand(Command):
    """Control the power of robot payloads."""

    NAME = 'payload'

    def __init__(self, subparsers, command_dict):
        """Power on or off robot payloads.

        Note that this is only compatible with certain robots. Check HardwareConfiguration for details.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        super(PowerPayloadsCommand, self).__init__(subparsers, command_dict)
        self._parser.add_argument('on_off', choices=['on', 'off'])

    def _run(self, robot, options):
        """Implementation of the 'power payload' command.

        Waits for time sync, then powers the payload ports on or off inside a
        LeaseKeepAlive context created with must_acquire=True.

        Args:
            robot: Robot object on which to run the command.
            options: Parsed command-line arguments.

        Returns:
            True once the selected power helper has returned.
        """
        robot.time_sync.wait_for_sync(timeout_sec=1.0)
        lease_client = robot.ensure_client(bosdyn.client.lease.LeaseClient.default_service_name)
        with bosdyn.client.lease.LeaseKeepAlive(lease_client, must_acquire=True,
                                                return_at_exit=True):
            power_client = robot.ensure_client(PowerClient.default_service_name)
            if options.on_off == 'on':
                power_on_payload_ports(power_client)
            elif options.on_off == 'off':
                power_off_payload_ports(power_client)
        # Report success like the other commands do; main() treats a falsy result
        # from run() as a failure (assuming Command.run() forwards this value).
        return True
class PowerWifiRadioCommand(Command):
    """Control the power of robot wifi radio."""

    NAME = 'wifi'

    def __init__(self, subparsers, command_dict):
        """Power on or off robot wifi radio.

        Note that this is only compatible with certain robots. Check HardwareConfiguration for details.

        Args:
            subparsers: List of argument parsers.
            command_dict: Dictionary of command names which take parsed options.
        """
        super(PowerWifiRadioCommand, self).__init__(subparsers, command_dict)
        self._parser.add_argument('on_off', choices=['on', 'off'])

    def _run(self, robot, options):
        """Implementation of the 'power wifi' command.

        Waits for time sync, then powers the wifi radio on or off inside a
        LeaseKeepAlive context created with must_acquire=True.

        Args:
            robot: Robot object on which to run the command.
            options: Parsed command-line arguments.

        Returns:
            True once the selected power helper has returned.
        """
        robot.time_sync.wait_for_sync(timeout_sec=1.0)
        lease_client = robot.ensure_client(bosdyn.client.lease.LeaseClient.default_service_name)
        with bosdyn.client.lease.LeaseKeepAlive(lease_client, must_acquire=True,
                                                return_at_exit=True):
            power_client = robot.ensure_client(PowerClient.default_service_name)
            if options.on_off == 'on':
                power_on_wifi_radio(power_client)
            elif options.on_off == 'off':
                power_off_wifi_radio(power_client)
        # Report success like the other commands do; main() treats a falsy result
        # from run() as a failure (assuming Command.run() forwards this value).
        return True
def main(args=None):
    """Command-line interface for interacting with robot services."""
    # NOTE: the docstring above doubles as the argparse description below.
    parser = argparse.ArgumentParser(prog='bosdyn.client', description=main.__doc__)
    add_common_arguments(parser, credentials_no_warn=True)

    # Maps each command name to the object whose run() handles the parsed options.
    command_dict = {}
    subparsers = parser.add_subparsers(title='commands', dest='command')

    # Commands that can be run, registered in this order.
    command_classes = (
        DirectoryCommands,
        PayloadCommands,
        FaultCommands,
        RobotIdCommand,
        LicenseCommand,
        LogStatusCommands,
        RobotStateCommands,
        DataBufferCommands,
        DataServiceCommands,
        TimeSyncCommand,
        LeaseCommands,
        OldBecomeEstopCommand,
        EstopCommands,
        ImageCommands,
        LocalGridCommands,
        DataAcquisitionCommand,
        HostComputerIPCommand,
        PowerCommand,
        KeepaliveCommand,
    )
    for command_class in command_classes:
        command_class(subparsers, command_dict)

    options = parser.parse_args(args=args)
    setup_logging(verbose=options.verbose)

    # Create the SDK and the robot object for the requested hostname.
    sdk = bosdyn.client.create_standard_sdk('BosdynClient')
    sdk.register_service_client(DataAcquisitionPluginClient)
    robot = sdk.create_robot(options.hostname)

    # Without a command there is nothing to dispatch: show usage and fail.
    if not options.command:
        print("Need to specify a command")
        parser.print_help()
        return False

    # Collapse whatever the command's run() returns into a plain True/False.
    selected_command = command_dict[options.command]
    return bool(selected_command.run(robot, options))