# Copyright (c) 2023 Boston Dynamics, Inc. All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).
"""A class for reading message data from a DataFile."""
from deprecated.sphinx import deprecated
from .common import PROTOBUF_CONTENT_TYPE
[docs]class MessageReader:
"""A class for reading message data from a DataFile.
Methods raise ParseError if there is a problem with the format of the file.
"""
def __init__(self, data_reader, require_protobuf=False):
self._data_reader = data_reader
self._channel_name_to_series_descriptor = {}
self._channel_name_to_series_index = {}
for series_index, series_identifier in enumerate(data_reader.file_index.series_identifiers):
try:
channel_name = series_identifier.spec["bosdyn:channel"]
except KeyError:
continue # Not a protobuf series
series_descriptor = self._data_reader.series_descriptor(series_index)
if series_descriptor.WhichOneof("DataType") != "message_type":
continue # Not a protobuf series
message_type = series_descriptor.message_type
if require_protobuf and message_type.content_type != PROTOBUF_CONTENT_TYPE:
continue # Not a protobuf series
self._channel_name_to_series_descriptor[channel_name] = series_descriptor
self._channel_name_to_series_index[channel_name] = series_descriptor.series_index
@property
def data_reader(self):
"""Return underlying DataReader this object is using."""
return self._data_reader
@property
def channel_name_to_series_descriptor(self):
"""Return a mapping of {channel name -> series descriptor} for message series."""
return self._channel_name_to_series_descriptor
@property
@deprecated(reason='Use channel_name_to_series_descriptor instead', version='3.1.0')
def channel_name_to_series_decriptor(self):
"""Return a mapping of {channel name -> series descriptor} for message series."""
return self._channel_name_to_series_descriptor
[docs] def series_index(self, channel_name, message_type=None):
"""Return series index (int) to access SeriesDescriptors and messages.
Args:
channel_name: name of the channel of messages.
message_type: specify message type, if channel may have multiple
kinds of messages (optional)
"""
if message_type is None:
return self._channel_name_to_series_index[channel_name]
for series_index, series_identifier in enumerate(
self._data_reader.file_index.series_identifiers):
try:
series_channel = series_identifier.spec["bosdyn:channel"]
except KeyError:
continue # Not a protobuf series
if series_channel != channel_name:
continue
series_descriptor = self._data_reader.series_descriptor(series_index)
if series_descriptor.WhichOneof("DataType") != "message_type":
continue # Not a protobuf series
if message_type == series_descriptor.message_type.type_name:
return series_index
raise KeyError("No series with channel_name={} and message_type={}".format(
channel_name, message_type))
[docs] def series_index_to_descriptor(self, series_index):
"""Given a series index, return the associated SeriesDescriptor
Args:
series_index: index (int) from the series_index() call
"""
return self._data_reader.file_index.series_descriptor(series_index)
[docs] def get_blob(self, series_index, index_in_series):
"""Return binary data from message stored in the file.
Args:
series_index: index (int) from the series_index() call
index_in_series: the index of the message within the series
Returns: DataTypeDescriptor for channel, timestamp_nsec (int), binary data
"""
return self._data_reader.read(series_index, index_in_series)