Source code for bosdyn.bddf.pod_series_reader

# Copyright (c) 2021 Boston Dynamics, Inc.  All rights reserved.
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).

"""A class for reading a series of POD data from a DataFile."""
from itertools import product
import struct

from .common import ParseError, POD_TYPE_TO_NUM_BYTES, POD_TYPE_TO_STRUCT

[docs]class PodSeriesReader: """A class for reading a series of POD data from a DataFile. Methods raise ParseError if there is a problem with the format of the file. """ def __init__(self, data_reader, series_spec): self._data_reader = data_reader self._series_index = self._data_reader.series_spec_to_index(series_spec) self._series_descriptor = self._data_reader.series_descriptor(self._series_index) if self._series_descriptor.WhichOneof("DataType") != "pod_type": raise ParseError("Expected DataType 'pod_type' but got {}.".format( self._series_descriptor.WhichOneof("DataType"))) self._pod_type = self._series_descriptor.pod_type self._num_values_per_sample = 1 for dim in self._pod_type.dimension: self._num_values_per_sample *= dim pod_type = self._pod_type.pod_type self._bytes_per_sample = POD_TYPE_TO_NUM_BYTES[pod_type] * self._num_values_per_sample self._num_data_blocks = None @property def pod_type(self): """Return the PodTypeDescriptor for the series.""" return self._pod_type @property def series_descriptor(self): """Return the SeriesDescriptor for the series.""" return self._series_descriptor @property def num_data_blocks(self): """Number of data blocks in this series.""" if self._num_data_blocks is None: self._num_data_blocks = self._data_reader.num_data_blocks(self._series_index) return self._num_data_blocks
[docs] def read_samples(self, index_in_series): """Return the POD data values from the data block of the given index. Returns: timestamp_nsec (int), POD data values (array of (array ... (of POD values))) """ _desc, timestamp_nsec, data =, index_in_series) num_samples = len(data) // self._bytes_per_sample expected_size = num_samples * self._bytes_per_sample if len(data) != expected_size: raise ParseError('{} idx={} expect {} elements but got {})'.format( self._series_descriptor.series_identifier, index_in_series, expected_size, len(data))) num_values = num_samples * self._num_values_per_sample format_str = '<{}{}'.format(num_values, POD_TYPE_TO_STRUCT[self._pod_type.pod_type]) pod_data = list(struct.unpack(format_str, data)) def _split(vals, dims): if not dims: return vals els_per_sample = product(dims) assert els_per_sample next_dims = dims[1:] return [ _split(vals[i:i + els_per_sample], next_dims) for i in range(0, len(vals), els_per_sample) ] split_data = _split(pod_data, self._pod_type.dimension) return timestamp_nsec, split_data