Source code for bosdyn.bddf.data_reader

# Copyright (c) 2023 Boston Dynamics, Inc.  All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).

"""Class for reading data from a file-like object which is seekable."""
import os
import struct

from .base_data_reader import BaseDataReader
from .common import END_MAGIC, INDEX_OFFSET_OFFSET, MAGIC, ParseError


class DataReader(BaseDataReader):  # pylint: disable=too-many-instance-attributes
    """Class for reading data from a file-like object which is seekable.

    The file index is located by seeking relative to the end of the file, and
    per-series descriptors and block indexes are loaded on first use and then
    cached on the instance.

    Methods raise ParseError if there is a problem with the format of the file.
    """

    def __init__(self, infile=None, filename=None):
        """
        At least one of the following arguments must be specified.

        Args:
         infile:      binary file-like object for reading (e.g., from open(fname, "rb")).
         filename:    path of input file, if applicable.
        """
        super(DataReader, self).__init__(infile, filename)
        self._series_index_to_descriptor = {}  # {series_index -> series descriptor}
        self._series_index_to_block_index = {}  # {series_index -> SeriesBlockIndex}
        self._read_index()

    def series_descriptor(self, series_index):
        """Return SeriesDescriptor for given series index, loading it if necessary.

        Args:
         series_index: key selecting the series; also used as the cache key.

        Returns:
         The descriptor read from the offset recorded in the series' block index.
        """
        try:
            return self._series_index_to_descriptor[series_index]
        except KeyError:
            pass  # Not cached yet; fall through and load it from the file.
        series_block_index = self.series_block_index(series_index)
        desc = self._read_desc_block_at("series_descriptor",
                                        series_block_index.descriptor_file_offset)
        self._series_index_to_descriptor[series_index] = desc
        return desc

    def num_data_blocks(self, series_index):
        """Returns the number of data blocks for a given series in the file."""
        return len(self.series_block_index(series_index).block_entries)

    def total_bytes(self, series_index):
        """Returns the total number of bytes for data in a given series in the file."""
        return self.series_block_index(series_index).total_bytes

    def read(self, series_index, index_in_series):
        """Retrieves a message and related information from the file.

        Args:
         series_index:     int selecting from which series to read the message.
         index_in_series:  The index number of the message within the channel.

        Returns:
         DataTypeDescriptor for channel, timestamp_nsec (int), message-data (bytes)

        Raises ParseError if there is a problem with the format of the file.
        """
        series_block_index = self.series_block_index(series_index)
        msg_idx = series_block_index.block_entries[index_in_series]
        desc, data = self._read_data_block_at(msg_idx.file_offset)
        return desc, msg_idx.timestamp.ToNanoseconds(), data

    def series_block_index(self, series_index):
        """Returns the SeriesBlockIndexes for the given series_index, loading it as needed.

        Args:
         series_index: position of the series in the file index's
                       series_block_index_offsets; also used as the cache key.
        """
        try:
            return self._series_index_to_block_index[series_index]
        except KeyError:
            pass  # Not cached yet; fall through and load it from the file.

        # Need to load the block index for this series.
        offset = self.file_index.series_block_index_offsets[series_index]
        block_index = self._read_desc_block_at('series_block_index', offset)
        self._series_index_to_block_index[series_index] = block_index
        return block_index

    def _read_index(self):
        """Locate and load the file index by reading the end of the file.

        Checks the trailing END_MAGIC bytes, reads the index offset and checksum
        stored INDEX_OFFSET_OFFSET bytes before the end of the file, loads the
        "file_index" descriptor block at that offset, and builds one plain dict
        per entry of the file index's series_identifiers from its spec.

        Raises:
         ParseError: if the trailing magic bytes do not match END_MAGIC, or the
                     stored index offset is smaller than len(MAGIC).
        """
        self._file.seek(-len(END_MAGIC), os.SEEK_END)
        end_magic = self._read(len(END_MAGIC))
        if end_magic != END_MAGIC:
            raise ParseError("Bad magic bytes at the end of the file.")

        # Two little-endian unsigned 64-bit values: the index offset, then the checksum.
        footer_format = '<QQ'
        self._file.seek(-INDEX_OFFSET_OFFSET, os.SEEK_END)
        self._index_offset, self._checksum = struct.unpack(
            footer_format, self._read(struct.calcsize(footer_format)))
        if self._index_offset < len(MAGIC):
            raise ParseError('Invalid offset to index: {}'.format(self._index_offset))

        self._file_index = self._read_desc_block_at("file_index", self._index_offset)
        # Copy each series identifier's spec mapping into an ordinary dict.
        self._spec_index = [
            dict(desc.spec.items()) for desc in self._file_index.series_identifiers
        ]

    def _seek_to(self, location):
        """Move the file position to `location`, an absolute offset.

        Raises:
         ParseError: if `location` is smaller than len(MAGIC).
        """
        if location < len(MAGIC):
            raise ParseError('Invalid offset for block: {}'.format(location))
        self._file.seek(location)

    def _read_data_block_at(self, location):
        """Seek to `location` and return the result of self._read_data_block()."""
        self._seek_to(location)
        return self._read_data_block()

    def _read_desc_block_at(self, descriptor_type_name, location):
        """Seek to `location` and return self._read_desc_block(descriptor_type_name)."""
        self._seek_to(location)
        return self._read_desc_block(descriptor_type_name)