Source code for bosdyn.client.data_buffer

# Copyright (c) 2023 Boston Dynamics, Inc.  All rights reserved.
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).

"""Client for the data-buffer service.

This allows client code to log the following to the robot's data buffer: text-messages,
operator comments, blobs, signal ticks, and protobuf messages.

import functools
import logging
import sys
import threading
import time
import traceback
import uuid

from google.protobuf.duration_pb2 import Duration
from google.protobuf.timestamp_pb2 import Timestamp

import bosdyn.api.data_buffer_pb2 as data_buffer_protos
import bosdyn.api.data_buffer_service_pb2_grpc as data_buffer_service
from bosdyn import util as core_util
from bosdyn.api import parameter_pb2
from bosdyn.client import time_sync
from bosdyn.client.common import BaseClient, common_header_errors
from bosdyn.client.exceptions import Error, ResponseError, RpcError

[docs]class InvalidArgument(Error): """A given argument could not be used."""
[docs]def log_event( # pylint: disable=too-many-arguments,no-member robot, event_type, level, description, start_timestamp_secs, end_timestamp_secs=None, id_str=None, parameters=None, log_preserve_hint=data_buffer_protos.Event.LOG_PRESERVE_HINT_NORMAL): """Add an Event to the Data Buffer. Args: robot: A Robot object. event_type (string): The type of event. level (bosdyn.api.Event.Level): The relative importance of the event. description (string): A human-readable description of the event. start_timestamp_secs (float): Start of the event, in local time. end_timestamp_secs (float): End of the event. start_timestamp_secs is used if None. id_str (string): Unique id for event. A uuid is generated if None. parameters ([bosdyn.api.Parameter]): Parameters to attach to the event. log_preserve_hint (bosdyn.api.LogPreserveHint): Whether event should try to preserve log data. """ data_buffer_client = robot.ensure_client(DataBufferClient.default_service_name) if not id_str: id_str = str(uuid.uuid1()) robot.time_sync.wait_for_sync() robot_start_timestamp = robot.time_sync.robot_timestamp_from_local_secs(start_timestamp_secs) if end_timestamp_secs: robot_end_timestamp = robot.time_sync.robot_timestamp_from_local_secs(end_timestamp_secs) else: robot_end_timestamp = robot_start_timestamp # pylint: disable=no-member if isinstance(log_preserve_hint, bool): if log_preserve_hint: log_preserve_hint = data_buffer_protos.Event.LOG_PRESERVE_HINT_PRESERVE else: log_preserve_hint = data_buffer_protos.Event.LOG_PRESERVE_HINT_NORMAL event = data_buffer_protos.Event(type=event_type, description=description, source=robot.client_name, id=id_str, start_time=robot_start_timestamp, end_time=robot_end_timestamp, level=level, log_preserve_hint=log_preserve_hint) if parameters: for parameter in parameters: proto = event.parameters.add() proto.CopyFrom(parameter) data_buffer_client.add_events([event])
[docs]def make_parameter(label, value, units="", notes=""): """Create a parameter proto from a label and the parameter value.""" parameter = parameter_pb2.Parameter(label=label, units=units, notes=notes) if isinstance(value, bool): parameter.bool_value = value elif isinstance(value, int): parameter.int_value = value elif isinstance(value, float): parameter.float_value = value elif isinstance(value, Timestamp): parameter.timestamp.CopyFrom(value) elif isinstance(value, Duration): parameter.duration.CopyFrom(value) elif isinstance(value, str): parameter.string_value = value else: return None return parameter
[docs]class DataBufferClient(BaseClient): """A client for adding to robot data buffer.""" default_service_name = 'data-buffer' service_type = 'bosdyn.api.DataBufferService' def __init__(self): super(DataBufferClient, self).__init__(data_buffer_service.DataBufferServiceStub) self.log_tick_schemas = {} self._timesync_endpoint = None
[docs] def update_from(self, other): super(DataBufferClient, self).update_from(other) # Grab a timesync endpoint if it is available. try: self._timesync_endpoint = other.time_sync.endpoint except AttributeError: pass # other doesn't have a time_sync accessor
[docs] def add_text_messages(self, text_messages, **kwargs): """Log text messages to the robot. Args: text_messages (List[TextMessage]): Sequence of TextMessage protos. Raises: RpcError: Problem communicating with the robot. """ return self._do_add_text_messages(, text_messages, **kwargs)
[docs] def add_text_messages_async(self, text_messages, **kwargs): """Async version of add_text_messages.""" return self._do_add_text_messages(self.call_async, text_messages, **kwargs)
def _do_add_text_messages(self, func, text_messages, **kwargs): """Internal text message RPC stub call.""" request = data_buffer_protos.RecordTextMessagesRequest() request.text_messages.extend(text_messages) return func(self._stub.RecordTextMessages, request, value_from_response=None, error_from_response=common_header_errors, **kwargs)
[docs] def add_operator_comment(self, msg, robot_timestamp=None, **kwargs): """Add an operator comment to the robot log. Args: msg (string): Text of user comment to log. robot_timestamp(google.protobuf.Timestamp): Time of messages, in *robot time*. If not set, timestamp will be when the robot receives the message. Raises: RpcError: Problem communicating with the robot. """ return self._do_add_operator_comment(, msg, robot_timestamp, **kwargs)
[docs] def add_operator_comment_async(self, msg, robot_timestamp=None, **kwargs): """Async version of add_operator_comment.""" return self._do_add_operator_comment(self.call_async, msg, robot_timestamp, **kwargs)
def _do_add_operator_comment(self, func, msg, robot_timestamp=None, **kwargs): """Internal operator comment RPC stub call.""" request = data_buffer_protos.RecordOperatorCommentsRequest() robot_timestamp = robot_timestamp or self.now_in_robot_basis(msg_type="Operator Comment") # pylint: disable=no-member request.operator_comments.add(message=msg, timestamp=robot_timestamp) return func(self._stub.RecordOperatorComments, request, value_from_response=None, error_from_response=common_header_errors, **kwargs)
[docs] def add_blob(self, data, type_id, channel=None, robot_timestamp=None, write_sync=False, **kwargs): """Log blob messages to the data buffer. Args: data (bytes): Binary data of one blob. type_id (string): Type of binary data of blob. For example, this could be the full name of a protobuf message type. channel (string): The name by which messages are typically queried: often the same as type_id, or of the form '{prefix}/{type_id}'. robot_timestamp (google.protobuf.Timestamp): Time of messages, in *robot time*. Raises: RpcError: Problem communicating with the robot. """ return self._do_add_blob(, data, type_id, channel, robot_timestamp, write_sync, **kwargs)
[docs] def add_blob_async(self, data, type_id, channel=None, robot_timestamp=None, write_sync=False, **kwargs): """Async version of add_blob.""" return self._do_add_blob(self.call_async, data, type_id, channel, robot_timestamp, write_sync, **kwargs)
def _do_add_blob( # pylint: disable=too-many-arguments self, func, data, type_id, channel, robot_timestamp, write_sync, **kwargs): """Internal blob RPC stub call.""" request = data_buffer_protos.RecordDataBlobsRequest() if not channel: channel = type_id robot_timestamp = robot_timestamp or self.now_in_robot_basis(msg_type=type_id) request.blob_data.add( # pylint: disable=no-member timestamp=robot_timestamp, channel=channel, type_id=type_id, data=data) request.sync = write_sync return func(self._stub.RecordDataBlobs, request, value_from_response=None, error_from_response=common_header_errors, **kwargs)
[docs] def add_protobuf(self, proto, channel=None, robot_timestamp=None, write_sync=False): """Log protobuf messages to the data buffer. Args: proto (Protobuf message): Serializable protobuf to log. channel (string): Name of channel for data. If not set defaults to proto type name. robot_timestamp (google.protobuf.Timestamp): Time of proto, in *robot time*. Raises: RpcError: Problem communicating with the robot. """ return self._do_add_protobuf(self.add_blob, proto, channel, robot_timestamp, write_sync)
[docs] def add_protobuf_async(self, proto, channel=None, robot_timestamp=None, write_sync=False): """Async version of add_protobuf.""" return self._do_add_protobuf(self.add_blob_async, proto, channel, robot_timestamp, write_sync)
def _do_add_protobuf(self, func, proto, channel, robot_timestamp, write_sync): """Internal blob stub call, serializes proto and logs as blob.""" binary_data = proto.SerializeToString() robot_timestamp = robot_timestamp or self.now_in_robot_basis(proto=proto) type_id = proto.DESCRIPTOR.full_name channel = channel or type_id return func(data=binary_data, type_id=type_id, channel=channel, robot_timestamp=robot_timestamp, write_sync=write_sync)
[docs] def add_events(self, events, **kwargs): """Log event messages to the robot. Args: events (List[Event]): Sequence of Event protos. Raises: RpcError: Problem communicating with the robot. """ return self._do_add_events(, events, **kwargs)
[docs] def add_events_async(self, events, **kwargs): """Async version of add_events.""" return self._do_add_events(self.call_async, events, **kwargs)
def _do_add_events(self, func, events, **kwargs): """Internal event stub call.""" request = data_buffer_protos.RecordEventsRequest() for event in events: # pylint: disable=no-member return func(self._stub.RecordEvents, request, value_from_response=None, error_from_response=common_header_errors, **kwargs)
[docs] def register_signal_schema(self, variables, schema_name, **kwargs): """Log signal schema to the robot. Args: variables (List[SignalSchema.Variable]): List of SignalSchema variables defining what is in tick. schema_name (string): Name of schema (defined previously by client). Raises: RpcError: Problem communicating with the robot. """ return self._do_register_signal_schema(, variables, schema_name, **kwargs)
[docs] def register_signal_schema_async(self, variables, schema_name, **kwargs): """Async version of register_signal_schema""" return self._do_register_signal_schema(self.call_async, variables, schema_name, **kwargs)
def _do_register_signal_schema(self, func, variables, schema_name, **kwargs): """Internal register stub call.""" tick_schema = data_buffer_protos.SignalSchema(vars=variables, schema_name=schema_name) request = data_buffer_protos.RegisterSignalSchemaRequest() request.schema.CopyFrom(tick_schema) # pylint: disable=no-member # Schemas are saved internally, according to their schema ID. We need to wait for the # response from the server to get the schema id. The response does not include the schema # itself so use a partial to process the response appropriately. value_from_response = functools.partial(self._save_schema_id, tick_schema) return func(self._stub.RegisterSignalSchema, request, value_from_response=value_from_response, error_from_response=common_header_errors, **kwargs)
[docs] def add_signal_tick( # pylint: disable=too-many-arguments,no-member self, data, schema_id, encoding=data_buffer_protos.SignalTick.ENCODING_RAW, sequence_id=0, source="client", **kwargs): """Log signal data to the robot data buffer. Schema should be sent before any ticks. Args: data (bytes): Single hunk of binary data. schema_id (int): ID name of schema (obtained from a previous schema registration) encoding (SignalTick.Encoding): Encoding of the data sequence_id (int): Index of which sequence tick this is source (string): String name representing client Raises: RpcError: Problem communicating with the robot. LookupError: The schema_id is unknown (not previously registered by this client) """ return self._do_add_signal_tick(, data, schema_id, encoding, sequence_id, source, **kwargs)
[docs] def add_signal_tick_async( # pylint: disable=too-many-arguments,no-member self, data, schema_id, encoding=data_buffer_protos.SignalTick.ENCODING_RAW, sequence_id=0, source="client", **kwargs): """Async version of add_signal_tick.""" return self._do_add_signal_tick(self.call_async, data, schema_id, encoding, sequence_id, source, **kwargs)
def _do_add_signal_tick( # pylint: disable=too-many-arguments self, func, data, schema_id, encoding, sequence_id, source, **kwargs): """Internal add signal tick stub call.""" if schema_id not in self.log_tick_schemas: raise LookupError('The log tick schema id "{}" is unknown'.format(schema_id)) request = data_buffer_protos.RecordSignalTicksRequest() request.tick_data.add( # pylint: disable=no-member sequence_id=sequence_id, source=source, schema_id=schema_id, encoding=encoding, data=data) return func(self._stub.RecordSignalTicks, request, value_from_response=None, error_from_response=common_header_errors, **kwargs) def _save_schema_id(self, schema, response): """Return schema id from response, after saving the schema in a dict indexed by id.""" self.log_tick_schemas[response.schema_id] = schema return response.schema_id
[docs] def now_in_robot_basis(self, msg_type=None, proto=None): """Get current time in robot clock basis if possible, None otherwise.""" if self._timesync_endpoint: try: converter = self._timesync_endpoint.get_robot_time_converter() except time_sync.NotEstablishedError: # No timesync. That's OK -- the receiving host will provide the timestamp. self.logger.debug( 'Could not timestamp message of type %s', (msg_type if msg_type is not None else (proto.DESCRIPTOR.full_name if proto is not None else 'Unknown'))) else: return converter.robot_timestamp_from_local_secs(time.time()) return None
[docs]class LoggingHandler(logging.Handler): # pylint: disable=too-many-instance-attributes """A logging system Handler that will publish text to the data-buffer service. Args: service: Name of the service. See LogAnnotationTextMessage. data_buffer_client: API client that will send log messages. level: Python logging level. Defaults to NOTSET. time_sync_endpoint: A TimeSyncEndpoint, already synchronized to the remote clock. rpc_timeout: Timeout on RPCs made by data_buffer_client. msg_num_limit: If number of messages reaches this number, send data with data_buffer_client. msg_age_limit: If messages have been sitting locally for this many seconds, send data with data_buffer_client. skip_rpcs: Do not log any messages for RPC sending. Raises: log_annotation.InvalidArgument: The TimeSyncEndpoint is not valid. """ def __init__( # pylint: disable=too-many-arguments self, service, data_buffer_client, level=logging.NOTSET, time_sync_endpoint=None, rpc_timeout=1, msg_num_limit=10, msg_age_limit=1, skip_rpcs=False): logging.Handler.__init__(self, level=level) self.addFilter(is_not_text_log) if skip_rpcs: self.addFilter(is_not_rpc) self.msg_age_limit = msg_age_limit self.msg_num_limit = msg_num_limit self.rpc_timeout = rpc_timeout self.service = service self.time_sync_endpoint = time_sync_endpoint if self.time_sync_endpoint and not self.time_sync_endpoint.has_established_time_sync: raise InvalidArgument('time_sync_endpoint must have already established timesync!') # If we have this many unsent messages in the queue after a failure to send, # "dump" the messages to stdout. self._dump_msg_count = 20 # Internal tracking of errors. self._num_failed_sends = 0 self._num_failed_sends_sequential = 0 # If we have this many failed sends in a row, stop the send thread. self._limit_failed_sends_sequential = 5 # Event to trigger immediate flush of messages to the log client. self._flush_event = threading.Event() # How long to wait for flush events. Dictates non-flush update rate. self._flush_event_wait_time = 0.1 # Last time "emit" was called. self._last_emit_time = 0 self._data_buffer_client = data_buffer_client self._lock = threading.Lock() self._msg_queue = [] self._send_thread = threading.Thread(target=self._run_send_thread) # Set to stop the message send thread. self._shutdown_event = threading.Event() # This apparently needs to be a daemon thread to play nicely with python's Handler shutdown # procedure. self._send_thread.daemon = True self._send_thread.start() def __enter__(self): """Optionally use this as a ContextManager to be more cautious about sending messages.""" return self def __exit__(self, exc_type, exc_val, exc_tb): """To ensure all messages have been sent to the best of our ability, call close().""" self.close()
[docs] def emit(self, record): msg = self.record_to_msg(record) with self._lock: self._msg_queue.append(msg) self._last_emit_time = time.time()
[docs] def flush(self): self._flush_event.set()
[docs] def close(self): self._shutdown_event.set() self._send_thread.join() # One last attempt to send any messages. if self._msg_queue: try: self._data_buffer_client.add_text_messages(self._msg_queue, timeout=self.rpc_timeout) # Catch all client library errors. except Error: self._num_failed_sends += 1 with self._lock: self._dump_msg_queue() logging.Handler.close(self)
[docs] def is_thread_alive(self): """Return true if send-thread is running.""" return self._send_thread.is_alive()
[docs] def restart(self, data_buffer_client): """Restart the send thread. Raises: AssertionError if send thread is still alive. """ assert not self.is_thread_alive() self._num_failed_sends_sequential = 0 self._data_buffer_client = data_buffer_client self._send_thread = threading.Thread(target=self._run_send_thread) self._send_thread.daemon = True self._send_thread.start()
def _dump_msg_queue(self): """Pop all of the message queue, using fallback_log to try and capture them. Should be called with the lock held. """ self.fallback_log('Dumping {} messages!'.format(len(self._msg_queue))) for msg in self._msg_queue: self.fallback_log(msg) del self._msg_queue[:]
[docs] @staticmethod def fallback_log(msg): """Handle log messages that were failed to be sent by printing to the console.""" print(msg, file=sys.stderr)
def _run_send_thread(self): while (self._num_failed_sends_sequential < self._limit_failed_sends_sequential and not self._shutdown_event.is_set()): flush = self._flush_event.wait(self._flush_event_wait_time) msg_age = time.time() - self._last_emit_time with self._lock: num_msgs = len(self._msg_queue) to_send = self._msg_queue[:num_msgs] send_now = num_msgs >= 1 and (flush or msg_age >= self.msg_age_limit or num_msgs >= self.msg_num_limit) if send_now: self._flush_event.clear() send_errors = 0 error_limit = 2 sent = False while send_errors < error_limit and not self._shutdown_event.is_set(): try: self._data_buffer_client.add_text_messages(to_send, timeout=self.rpc_timeout) except (ResponseError, RpcError): self.fallback_log('Error:\n{}'.format(traceback.format_exc())) send_errors += 1 except: # pylint: disable=bare-except # Catch all other exceptions and log them. self.fallback_log('Unexpected exception!\n{}'.format( traceback.format_exc())) break else: sent = True break # Default to possibly dumping messages. maybe_dump = True if sent: # We successfully sent logs to the log service! Delete relevant local cache. with self._lock: del self._msg_queue[:num_msgs] maybe_dump = False self._num_failed_sends_sequential = 0 elif send_errors >= error_limit: self._num_failed_sends += 1 self._num_failed_sends_sequential += 1 elif self._shutdown_event.is_set(): # Don't dump if we're shutting down; we'll clear the messages in close(). maybe_dump = False else: # We can hit this state if # 1) We break out of the above loop without setting sent = True # 2) There is a logic bug in the above handling code / while loop. function_name = traceback.extract_stack()[-1][2] self.fallback_log('Unexpected condition in {}.{}!'.format( self.__class__.__name__, function_name)) # If we decided we may need to dump the message queue... if maybe_dump: with self._lock: if len(self._msg_queue) >= self._dump_msg_count: self._dump_msg_queue()
[docs] def record_to_msg(self, record: logging.LogRecord): """Convert logging record to TextMessage proto.""" level = self.record_level_to_proto_level(record.levelno) msg = data_buffer_protos.TextMessage(source=self.service, level=level, message=self.format(record), filename=record.filename, line_number=record.lineno) # pylint: disable=no-member if self.time_sync_endpoint is not None: try: msg.timestamp.CopyFrom( self.time_sync_endpoint.robot_timestamp_from_local_secs(time.time())) except time_sync.NotEstablishedError: # If timestamp is not set in the proto, data-buffer will timestamp it on receipt. msg.message = '(No time sync!): ' + msg.message else: msg.timestamp.CopyFrom(core_util.now_timestamp()) return msg
[docs] @staticmethod def record_level_to_proto_level(record_level): """Convert logging record level to TextMessage proto level.""" # pylint: disable=no-member if record_level >= logging.ERROR: return data_buffer_protos.TextMessage.LEVEL_ERROR if record_level >= logging.WARNING: return data_buffer_protos.TextMessage.LEVEL_WARN if record_level >= logging.INFO: return data_buffer_protos.TextMessage.LEVEL_INFO return data_buffer_protos.TextMessage.LEVEL_DEBUG
[docs]def is_not_text_log(record: logging.LogRecord) -> bool: """Filter out the RecordMessages calls that the handler sends so that we do not go into an infinite loop.""" return not'.DataBufferService.RecordTextMessages')
[docs]def is_not_rpc(record: logging.LogRecord) -> bool: """Because our loggers use the form sdk_name.robot_name.service_name.rpc_method, we can't easily just turn off all logging for rpcs. This function identifies the rpc logging calls to be able to strip them out.""" return not (record.module == 'common' and (record.funcName == 'call' or record.funcName == 'call_async'))