# Copyright (c) 2023 Boston Dynamics, Inc. All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).
"""BlockWriter writes basic data structures in the bddf file."""
import struct
from hashlib import sha1
import bosdyn.api.bddf_pb2 as bddf
from .common import (BLOCK_HEADER_SIZE_MASK, DATA_BLOCK_TYPE, DESCRIPTOR_BLOCK_TYPE, END_BLOCK_TYPE,
END_MAGIC, MAGIC, SHA1_DIGEST_NBYTES, DataFormatError)
class BlockWriter:
    """Writes data structures in the data file.

    Every byte written through the block-writing methods is also fed to a
    running SHA1 hasher, whose digest is emitted by `write_file_end`.
    """

    def __init__(self, outfile):
        """Wrap `outfile` for block-level writing.

        Args:
            outfile: Object providing `write(bytes)`, `tell()` and `close()`.
                     NOTE(review): presumably a file opened in binary mode -- confirm with callers.
        """
        self._outfile = outfile
        # Running checksum over everything written via _write().
        self._hasher = sha1()

    def tell(self) -> int:
        """Return location from start of file."""
        return self._outfile.tell()

    def write_descriptor_block(self, block) -> None:
        """Write a DescriptorBlock to the file.

        Args:
            block: Message object exposing `SerializeToString()`.

        Raises:
            DataFormatError: If the serialized block is too large for the block header.
        """
        serialized = block.SerializeToString()
        self._write_block_header(DESCRIPTOR_BLOCK_TYPE, len(serialized))
        self._write(serialized)

    def write_data_block(self, desc_block, data) -> None:
        """Write a block of data to the file.

        The block header is followed by a little-endian 32-bit length of the
        serialized descriptor, the serialized descriptor itself, and then `data`.

        Args:
            desc_block: Message object exposing `SerializeToString()`, describing `data`.
            data: Bytes-like payload, written after the descriptor.

        Raises:
            DataFormatError: If the combined size is too large for the block header.
        """
        serialized_desc = desc_block.SerializeToString()
        # The length declared in the header covers the descriptor and the payload;
        # the 4-byte descriptor-length prefix below is written in addition to that.
        self._write_block_header(DATA_BLOCK_TYPE, len(data) + len(serialized_desc))
        self._write(struct.pack('<I', len(serialized_desc)))
        self._write(serialized_desc)
        self._write(data)

    def _write(self, data) -> None:
        """Write `data` to the file and fold it into the running checksum."""
        self._hasher.update(data)
        self._outfile.write(data)

    def close(self) -> None:
        """Close the file, if not already closed."""
        if self.closed:
            return
        self._outfile.close()
        # Dropping the reference is what marks this writer as closed.
        self._outfile = None

    @property
    def closed(self) -> bool:
        """Returns True if the writer has been closed."""
        return self._outfile is None

    def write_file_end(self, index_offset: int) -> None:
        """Write the end of the data file.

        Emits the end-block header, the 64-bit little-endian `index_offset`,
        the SHA1 digest of everything written so far, and the end magic.
        This does not close the underlying file; call `close()` for that.

        Args:
            index_offset: Value stored in the end block as an unsigned 64-bit integer.
        """
        # NOTE(review): 24 is the fixed length declared for the end block -- confirm
        # against the file-format specification before changing it.
        self._write_block_header(END_BLOCK_TYPE, 24)
        self._write(struct.pack('<Q', index_offset))
        # The digest and the end magic bypass _write(), so they are not part of the checksum.
        self._outfile.write(self._hasher.digest())
        self._outfile.write(END_MAGIC)

    def _write_block_header(self, block_type: int, block_len: int) -> None:
        """Write the 8-byte header that precedes every block.

        Args:
            block_type: Block type code, stored in the most significant byte of the header.
            block_len: Block length, stored in the remaining low-order bits.

        Raises:
            DataFormatError: If `block_len` exceeds BLOCK_HEADER_SIZE_MASK.
        """
        if block_len > BLOCK_HEADER_SIZE_MASK:
            raise DataFormatError('block size ({}) is too big (> {})'.format(
                block_len, BLOCK_HEADER_SIZE_MASK))
        # Pack the type into the top 8 bits and the length into the low 56 bits.
        block_descriptor = (block_type << 56) | block_len
        self._write(struct.pack('<Q', block_descriptor))