Source code for bosdyn.bddf.stream_data_reader

# Copyright (c) 2021 Boston Dynamics, Inc.  All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).

"""Data reader which reads the file format from a stream, without seeking."""
from hashlib import sha1

from .base_data_reader import BaseDataReader
from .common import ParseError
from .file_indexer import FileIndexer


[docs]class StreamDataReader(BaseDataReader): """Data reader which reads the file format from a stream, without seeking.""" def __init__(self, outfile): """ Args: outfile: binary file-like object for reading (e.g., from open(fname, "rb")). """ self._hasher = sha1() # This computes a checksum super(StreamDataReader, self).__init__(outfile) self._indexer = FileIndexer() self._series_index_to_block_index = {} # {series_index -> SeriesBlockIndex} def _read(self, nbytes): block = BaseDataReader._read(self, nbytes) self._hasher.update(block) return block @property def read_checksum(self): """64-bit checksum read from the end of the file, or None if not yet read.""" return self._read_checksum @property def stream_file_index(self): """Return the file index as parsed from the stream.""" return self._indexer.file_index def _computed_checksum(self): return self._hasher.digest()
[docs] def series_descriptor(self, series_index): """Return SeriesDescriptor for given series index. Returns KeyError if no such series exists. """ return self._indexer.series_descriptor(series_index)
[docs] def read_data_block(self): """Read and return next data block. Returns: DataDescriptor, SeriesDescriptor, data (bytes) Raises ParseError if there is a problem with the format of the file, EOFError if the end of the file is reached. """ while True: is_data, desc, data = self.read_next_block() if not is_data: continue return desc, self.series_descriptor(desc.series_index), data
[docs] def read_next_block(self): """Read and return next block. Returns: True, DataDescriptor, data (bytes) for data block Returns: False, DescriptorBlock, None for descriptor block Raises ParseError if there is a problem with the format of the file, EOFError if the end of the file is reached. """ file_offset = self._file.tell() try: is_data, desc, data = self._read_block() except EOFError as err: self._eof = True raise err if is_data: self._indexer.index_data_block(desc.series_index, desc.timestamp.ToNanoseconds(), len(data), file_offset, desc.additional_indexes) else: desc_type = desc.WhichOneof("DescriptorType") if desc_type == 'file_index': self._file_index = desc.file_index elif desc_type == 'file_descriptor': pass # Don't expect this after the first block, though. elif desc_type == 'series_descriptor': series_descriptor = desc.series_descriptor self._indexer.add_series_descriptor(series_descriptor, file_offset) elif desc_type == 'series_block_index': series_block_index = desc.series_block_index self._series_index_to_block_index[ series_block_index.series_index] = series_block_index else: raise ParseError("Unknown DescriptorType %s" % desc_type) return is_data, desc, data
@property def series_block_indexes(self): """Returns the current list of SeriesBlockIndexes: series_index -> SeriesBlockIndex.""" return self._indexer.series_block_indexes
[docs] def series_block_index(self, series_index): """Returns the SeriesBlockIndexes for the given series_index.""" return self._indexer.series_block_indexes[series_index]
@property def eof(self): """Returns True if all blocks in the file have been read.""" return self._eof