# Copyright (c) 2023 Boston Dynamics, Inc. All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).
"""Assists with writing POD data values into a series, within a DataWriter."""
import struct
from .bosdyn import MessageChannel
from .common import POD_TYPE_TO_NUM_BYTES, POD_TYPE_TO_STRUCT, DataFormatError
[docs]class PodSeriesWriter: # pylint: disable=too-many-instance-attributes
"""A class to assist with writing POD data values into a series, within a DataWriter."""
def __init__( # pylint: disable=too-many-arguments
self, data_writer, series_type, series_spec, pod_type, dimensions=None,
annotations=None, data_block_size=2048):
self._data_writer = data_writer
self._series_type = series_type
self._series_spec = series_spec
self._pod_type = pod_type
self._dimensions = dimensions or []
self._series_index = self._data_writer.add_pod_series(self.series_type, self.series_spec,
type_enum=self._pod_type,
dimension=self._dimensions,
annotations=annotations)
self._data_block_size = data_block_size
self._num_values_per_sample = 1
for dim in self._dimensions:
self._num_values_per_sample *= dim
self._bytes_per_sample = POD_TYPE_TO_NUM_BYTES[pod_type] * self._num_values_per_sample
self._block = b''
self._timestamp_nsec = None
self._format_str = '<{}{}'.format(self._num_values_per_sample, POD_TYPE_TO_STRUCT[pod_type])
self._data_writer.run_on_close(self.finish_block)
[docs] def write(self, timestamp_nsec, sample):
"""Add sample to data block, and write block if block is full.
A block is full if there is no room for an additional sample within the data_block_size.
Args:
timestamp_nsec: nsec since unix epoch to timestamp the data
sample: array/vector of POD values to write
Raises DataFormatError if the data is invalid for this series.
"""
serialized_sample = struct.pack(self._format_str, sample)
if len(serialized_sample) != self._bytes_per_sample:
raise DataFormatError('{} expect {} elements but got {})'.format(
self._series_spec, self._bytes_per_sample, len(serialized_sample)))
if self._bytes_per_sample >= self._data_block_size:
# We should always immediately write data.
self._data_writer.write_data(self._series_index, timestamp_nsec, serialized_sample)
return
if not self._block:
# New block, starts with current timestamp.
self._timestamp_nsec = timestamp_nsec
self._block = serialized_sample
else:
# Add data to pre-existing block
self._block += serialized_sample
if len(self._block) + self._bytes_per_sample > self._data_block_size:
# No room for more samples before the data must be written.
self._data_writer.write_data(self._series_index, self._timestamp_nsec, self._block)
self._block = b''
[docs] def finish_block(self):
"""If there are samples which haven't been written to the file, write them now."""
if not self._block:
return
self._data_writer.write_data(self._series_index, self._timestamp_nsec, self._block)
self._block = b''
@property
def series_type(self):
"""Return the series_type (string) with which the series was registered."""
return MessageChannel.SERIES_TYPE
@property
def series_spec(self):
"""Return the series_spec ({key -> value}) with which the series was registered."""
return self._series_spec