# Source code for bosdyn.bddf.file_indexer

# Copyright (c) 2023 Boston Dynamics, Inc.  All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).

"""A FileIndexer is an object which keeps an index of series and blocks within series"""

import struct
from hashlib import sha1

import bosdyn.api.bddf_pb2 as bddf

from .common import AddSeriesError, DataFormatError, SeriesNotUniqueError


def _hasher_to_uint64(hasher):
    """Interpret the leading 8 bytes of ``hasher.digest()`` as a big-endian unsigned 64-bit int.

    Raises struct.error if the digest is shorter than 8 bytes.
    """
    digest = hasher.digest()
    (value,) = struct.unpack_from('>Q', digest)
    return value


class FileIndexer:
    """An object which keeps an index of series and blocks within series.

    It can write a block index at the end of a data file.
    """

    def __init__(self):
        # DescriptorBlock proto for the FileIndex
        self._descriptor_index = bddf.DescriptorBlock()
        self._series_descriptors = []  # series_idx -> SeriesDescriptor
        self._series_block_indexes = []  # series_index -> SeriesBlockIndex

    @property
    def file_index(self):
        """Get the FileIndex proto, which describes how to access data in the file."""
        return self._descriptor_index.file_index  # pylint: disable=no-member

    @property
    def descriptor_index(self):
        """Get the Descriptor proto containing the FileIndex."""
        return self._descriptor_index

    @property
    def series_block_indexes(self):
        """Returns the current list of SeriesBlockIndexes: series_index -> SeriesBlockIndex."""
        return self._series_block_indexes

    def series_descriptor(self, series_index):
        """Return SeriesDescriptor for given series index."""
        return self._series_descriptors[series_index]

    @staticmethod
    def series_identifier_to_hash(series_identifier):
        """Given a SeriesIdentifier, return a 64-bit hash.

        The SHA-1 hasher is fed the series_type, then each spec key and value in sorted-key
        order (UTF-8 encoded, no separators). The first 8 digest bytes are then read as a
        big-endian unsigned integer.

        NOTE(review): this value is stored in file_index.series_identifier_hashes, so any
        change to the scheme presumably breaks compatibility with existing files — confirm
        before altering.
        """
        hasher = sha1()
        hasher.update(series_identifier.series_type.encode('utf-8'))
        for key in sorted(series_identifier.spec.keys()):
            hasher.update(key.encode('utf-8'))
            hasher.update((series_identifier.spec[key]).encode('utf-8'))
        return _hasher_to_uint64(hasher)

    def add_series_descriptor(self, series_descriptor, series_block_file_offset):
        """Add the given series_descriptor to the index, with the given file offset.

        Args:
          series_descriptor:         SeriesDescriptor to add to the index. Its series_index
                                     must equal the number of series already indexed.
          series_block_file_offset:  Location in file where SeriesDescriptor will be written,
                                     or was read from.
        """
        assert series_descriptor.series_index == len(self._series_descriptors), (
            'SeriesDescriptor has series_index {} but {} series are already indexed'.format(
                series_descriptor.series_index, len(self._series_descriptors)))
        # Update the file index.
        self.file_index.series_identifiers.add().CopyFrom(series_descriptor.series_identifier)
        self.file_index.series_identifier_hashes.append(series_descriptor.identifier_hash)
        self._series_descriptors.append(series_descriptor)
        self._series_block_indexes.append(
            bddf.SeriesBlockIndex(series_index=series_descriptor.series_index,
                                  descriptor_file_offset=series_block_file_offset))

    def add_series(  # pylint: disable=too-many-arguments
            self, series_type, series_spec, message_type, pod_type, annotations,
            additional_index_names, writer):
        """Register a new series for messages for a DataWriter.

        Args:
          series_type:             the kind of spec, corresponding to the set of keys expected
                                   in series_spec.
          series_spec:             dict of {key (string) -> value (string)} describing the
                                   series.
          message_type:            MessageTypeDescriptor (need EITHER this OR pod_type)
          pod_type:                PodTypeDescriptor (need EITHER this OR message_type)
          annotations:             optional dict of key (string) -> value (string) pairs to
                                   associate with the message channel
          additional_index_names:  names of additional timestamps to store with each message
                                   (list of string).
          writer:                  BlockWriter owned by the DataWriter.

        Returns series id (int).

        Raises:
          SeriesNotUniqueError if a series with the same spec is already added (only the spec
            is compared; series_type is not).
          AddSeriesError if both, or neither, of message_type and pod_type are given.
        """
        # pylint: disable=no-member
        series_index = len(self._series_descriptors)

        # Build the descriptor block for this series.
        descriptor = bddf.DescriptorBlock()
        series_descriptor = descriptor.series_descriptor
        series_descriptor.series_index = series_index
        series_identifier = series_descriptor.series_identifier
        series_identifier.series_type = series_type
        series_identifier.spec.update(series_spec)
        series_descriptor.identifier_hash = self.series_identifier_to_hash(series_identifier)

        # Ensure the series_spec is unique in the file.
        for prev_series_identifier in self.file_index.series_identifiers:
            if prev_series_identifier.spec == series_identifier.spec:
                raise SeriesNotUniqueError(
                    "Spec %s is not unique within the data file" % series_identifier.spec)

        # Exactly one of message_type / pod_type describes the series payload.
        if message_type:
            if pod_type:
                raise AddSeriesError("Specified both message_type ({}) and pod_type ({})".format(
                    message_type, pod_type))
            series_descriptor.message_type.CopyFrom(message_type)
        else:
            if not pod_type:
                raise AddSeriesError("Specified neither message_type nor pod_type")
            series_descriptor.pod_type.CopyFrom(pod_type)

        if annotations:
            series_descriptor.annotations.update(annotations)
        if additional_index_names:
            series_descriptor.additional_index_names.extend(additional_index_names)

        # Capture the offset before writing so the index points at the start of the block.
        series_block_file_offset = writer.tell()
        writer.write_descriptor_block(descriptor)
        self.add_series_descriptor(series_descriptor, series_block_file_offset)
        return series_index

    def index_data_block(  # pylint: disable=too-many-arguments
            self, series_index, timestamp_nsec, file_offset, nbytes, additional_indexes):
        """Add an entry to the data block index of the series identified by series_index.

        Args:
          series_index:        index of a series previously added to this indexer.
          timestamp_nsec:      block timestamp, in nanoseconds.
          file_offset:         file offset recorded in the new block entry.
          nbytes:              number of bytes added to the series' total_bytes.
          additional_indexes:  optional values stored in the entry's additional_indexes.
        """
        series_block_index = self._series_block_indexes[series_index]
        block_entry = series_block_index.block_entries.add(file_offset=file_offset)
        block_entry.timestamp.FromNanoseconds(timestamp_nsec)
        series_block_index.total_bytes += nbytes
        if additional_indexes:
            block_entry.additional_indexes.extend(additional_indexes)  # pylint: disable=no-member

    def make_data_descriptor(self, series_index, timestamp_nsec, additional_indexes):
        """Return a DataDescriptor for writing a data block in the given series.

        This only builds the descriptor; it does not touch the series block index
        (index_data_block adds block entries).

        Args:
          series_index:        index of a series previously added to this indexer.
          timestamp_nsec:      data timestamp, in nanoseconds.
          additional_indexes:  values for the series' additional indexes; None is treated as
                               an empty list.

        Raises:
          DataFormatError if the number of additional_indexes differs from the number of
            additional_index_names declared for the series.
        """
        series_descriptor = self._series_descriptors[series_index]
        data_descriptor = bddf.DataDescriptor(series_index=series_index)
        data_descriptor.timestamp.FromNanoseconds(timestamp_nsec)  # pylint: disable=no-member
        additional_indexes = additional_indexes or []
        if len(additional_indexes) != len(series_descriptor.additional_index_names):
            raise DataFormatError('Series {} needs {} additional indexes, but {} provided.'.format(
                series_descriptor, len(series_descriptor.additional_index_names),
                len(additional_indexes)))
        data_descriptor.additional_indexes.extend(additional_indexes)  # pylint: disable=no-member
        return data_descriptor

    def write_index(self, block_writer):
        """Write all the indexes of the data file, and the file end.

        Each series' SeriesBlockIndex is written as its own descriptor block. Its offset is
        recorded in file_index.series_block_index_offsets, in series order. Then the
        descriptor index (containing the FileIndex) and the file end are written.
        """
        offsets = self.file_index.series_block_index_offsets
        # Recompute offsets from scratch. Otherwise a repeated call, or an index that already
        # held offsets, would leave stale entries misaligned with series_block_indexes.
        # Only clear when non-empty, so the empty case behaves exactly as before.
        if offsets:
            del offsets[:]
        for block_index in self.series_block_indexes:
            # Record the location of the block index.
            offsets.append(block_writer.tell())
            # Write the block index.
            block = bddf.DescriptorBlock()
            block.series_block_index.CopyFrom(block_index)  # pylint: disable=no-member
            block_writer.write_descriptor_block(block)
        index_offset = block_writer.tell()
        block_writer.write_descriptor_block(self.descriptor_index)
        block_writer.write_file_end(index_offset)