Source code for bosdyn.bddf.base_data_reader

# Copyright (c) 2021 Boston Dynamics, Inc.  All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).

"""BaseDataReader is a shared parent class for DataReader and StreamedDataReader."""
import struct

import bosdyn.api.bddf_pb2 as bddf

from .common import (BLOCK_HEADER_SIZE_MASK, BLOCK_HEADER_TYPE_MASK, ChecksumError, DATA_BLOCK_TYPE,
                     DESCRIPTOR_BLOCK_TYPE, DataFormatError, END_BLOCK_TYPE, LOGGER, MAGIC,
                     ParseError, SHA1_DIGEST_NBYTES)


[docs]class BaseDataReader: # pylint: disable=too-many-instance-attributes """Shared parent class for DataReader and StreamedDataReader.""" def __init__(self, infile=None, filename=None): """ At least one of the following arguments must be specified. Args: infile: binary file-like object for reading (e.g., from open(fname, "rb")). filename: path of input file, if applicable. """ self._file = infile self._filename = filename if not self._file: if not self._filename: raise ValueError("One of infile or filename must be specified") self._file = open(self._filename, 'rb') self._file_descriptor = None self._spec_index = None self._index_offset = None self._checksum = None self._read_checksum = None self._eof = False self._file_index = None self._read_header() @property def filename(self): """Return input file name, if specified, or None if not.""" return self._filename @property def file_descriptor(self): """Return the file descriptor from the start of the file/stream.""" return self._file_descriptor @property def version(self): """Return file version as a bosdyn.api.FileFormatVersion proto.""" return self._file_descriptor.version @property def annotations(self): """Return {key -> value} dict for file annotations.""" return self._file_descriptor.annotations @property def file_index(self): """Get the FileIndex proto used which describes how to access data in the file.""" return self._file_index @property def checksum(self): """160-bit checksum read from the end of the file, or None if not yet read.""" return self._checksum @property def read_checksum(self): """Override to compute checksum on reading, in stream-readers.""" return self._read_checksum def _computed_checksum(self): # pylint: disable=no-self-use """Override to compute checksum on reading, in stream-readers.""" return None
[docs] def series_spec_to_index(self, series_spec): """Given a series spec (map {key -> value}), return the series index for that series. Raises ValueError if no such series exists. """ return self._spec_index.index(series_spec)
def __del__(self): self._close() def __enter__(self): return self def _close(self): if not self._file: return self._file.close() self._file = None def __exit__(self, type_, value_, tb_): self._close() def _read(self, nbytes): assert nbytes block = self._file.read(nbytes) if not block: raise EOFError("Unexpected end of bddf file") return block def _read_header(self): magic = self._read(len(MAGIC)) if magic != MAGIC: raise ParseError("Bad magic bytes at the start of the file.") self._file_descriptor = self._read_desc_block("file_descriptor") if (self._file_descriptor.version.major_version != 1 or self._file_descriptor.version.minor_version != 0 or self._file_descriptor.version.patch_level != 0): raise DataFormatError("Unsupported file version: {}.{}.{}".format( self._file_descriptor.version.major_version, self._file_descriptor.version.minor_version, self._file_descriptor.version.patch_level)) checksum_type = self._file_descriptor.checksum_type # pylint: disable=no-member if checksum_type == bddf.FileFormatDescriptor.CHECKSUM_TYPE_UNKNOWN: raise DataFormatError("Unset checksum type in file descriptor") if checksum_type == bddf.FileFormatDescriptor.CHECKSUM_TYPE_NONE: LOGGER.debug("No checksum in bddf stream") elif checksum_type != bddf.FileFormatDescriptor.CHECKSUM_TYPE_SHA1: raise DataFormatError("Unknown checksum type {}".format(checksum_type)) if self._file_descriptor.checksum_num_bytes != SHA1_DIGEST_NBYTES: raise DataFormatError("Unexpected checksm num_bytes ({} != {}).".format( self._file_descriptor.checksum_num_bytes, SHA1_DIGEST_NBYTES)) def _read_proto(self, proto_type, nbytes): block = self._read(nbytes) descriptor = proto_type() descriptor.ParseFromString(block) return descriptor def _read_data_block(self): is_data, desc, data = self._read_block() assert is_data return desc, data def _read_desc_block(self, descriptor_type_name): is_data, desc, _data = self._read_block() assert not is_data assert not _data if desc.WhichOneof("DescriptorType") != descriptor_type_name: raise ParseError("Expected DescriptorType {} but got {}.".format( descriptor_type_name, desc.WhichOneof("DescriptorType"))) return getattr(desc, descriptor_type_name) def _read_block(self): (block_header,) = struct.unpack('<Q', self._read(8)) block_size = block_header & BLOCK_HEADER_SIZE_MASK block_type = (block_header & BLOCK_HEADER_TYPE_MASK) >> 56 if block_type == END_BLOCK_TYPE: self._index_offset = struct.unpack('<Q', self._read(8))[0] self._read_checksum = self._computed_checksum() # pylint: disable=assignment-from-none self._checksum = self._read(self._file_descriptor.checksum_num_bytes) self._eof = True # pylint: disable=no-member if (self._file_descriptor.checksum_type == bddf.FileFormatDescriptor.CHECKSUM_TYPE_SHA1 and self._read_checksum is not None and self._checksum != self._read_checksum): raise ChecksumError("File checksum 0x{} does not match computed value 0x{}".format( ''.join('{:02X}'.format(x) for x in self._checksum), ''.join('{:02X}'.format(x) for x in self._read_checksum))) raise EOFError("Normal end of bddf file") is_data_block = (block_type == DATA_BLOCK_TYPE) if not is_data_block: if block_type != DESCRIPTOR_BLOCK_TYPE: raise ParseError("Expected block_type {} but got {}.".format( DESCRIPTOR_BLOCK_TYPE, block_type)) return is_data_block, self._read_proto(bddf.DescriptorBlock, block_size), None (desc_size,) = struct.unpack('<I', self._read(4)) if desc_size > block_size: raise ParseError("Data block descriptor size {} > block size {}.".format( desc_size, block_size)) data_desc = self._read_proto(bddf.DataDescriptor, desc_size) data_size = block_size - desc_size data = self._read(data_size) return is_data_block, data_desc, data