# Copyright (c) 2023 Boston Dynamics, Inc.  All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).
"""For clients to delegate saving of tokens.
TokenCache -- Separate token storage from token management.
"""
import errno
import os
import shutil
import tempfile
from bosdyn.client.exceptions import Error
class TokenCacheError(Error):
    """Base class for token cache errors (non-response, non-gRPC failures)."""
class ClearFailedError(TokenCacheError):
    """Failed to delete the token from storage."""
class NotInCacheError(TokenCacheError):
    """Failed to read the token from the cache."""
class WriteFailedError(TokenCacheError):
    """Failed to write the token to storage."""
def atomic_file_write(data, filename, permissions=0o600):
    """Write data to filename so that readers never observe a partial file.

    The data is first written to a private temporary file created in the
    destination directory; that file is then moved over the destination in a
    single step.

    Args:
        data: Bytes to store.
        filename: Destination path. Missing parent directories are created.
        permissions: Mode bits applied to the resulting file.

    Raises:
        OSError: If a directory or file could not be created, written or replaced.
    """
    # A bare filename has an empty dirname; treat it as the current directory.
    directory = os.path.dirname(filename) or os.curdir

    # Make sure path to file exists.
    if not os.path.isdir(directory):
        # os.makedirs only applies its mode to the leaf directory (Python 3.7+), so a
        # restrictive umask is what keeps newly created parent directories private too.
        original_umask = os.umask(0o077)
        try:
            os.makedirs(directory, 0o700, exist_ok=True)
        finally:
            os.umask(original_umask)

    # The temporary file lives next to the destination, so the final move never has
    # to cross filesystems. mkstemp creates it readable/writable by the owner only.
    fd, temp_path = tempfile.mkstemp(dir=directory, prefix='.', suffix='.tmp')
    try:
        with os.fdopen(fd, 'wb') as writer:
            writer.write(data)
        # Set the final mode before the file becomes visible under its real name.
        os.chmod(temp_path, permissions)
        os.replace(temp_path, filename)
    except BaseException:
        # Never leave a stray temporary file behind on failure.
        try:
            os.unlink(temp_path)
        except OSError:
            pass
        raise
class TokenCache:
    """No-op default cache that serves as an interface."""

    def __init__(self):
        pass

    def read(self, name):
        """Always raise NotInCacheError, since this cache never stores anything."""
        raise NotInCacheError

    def clear(self, name):
        """Do nothing; there is no stored token to remove."""
        pass

    def write(self, name, token):
        """Do nothing; the token is discarded."""
        pass

    def match(self, name):
        """Returns a set of valid keys that contains the name."""
        return set()
class TokenCacheFilesystem:
    """Handles transfer from in memory tokens to arbitrary storage e.g. filesystem.

    Each token is stored in its own file named '<name>.jwt' inside the cache directory.
    """

    # Extension appended to a token name to form its filename.
    _SUFFIX = '.jwt'

    def __init__(self, cache_directory='~/.bosdyn/user_tokens'):
        """Remember the cache directory, expanding a leading '~' to the home directory."""
        self.directory = os.path.expanduser(cache_directory)

    def read(self, name):
        """Return the raw bytes stored for name.

        Raises:
            NotInCacheError: If the token file could not be opened or read.
        """
        try:
            filename = self._name_to_filename(name)
            with open(filename, 'rb') as reader:
                return reader.read()
        except IOError as e:
            raise NotInCacheError(e) from e

    def clear(self, name):
        """Delete the token stored for name.

        Raises:
            ClearFailedError: If the token file could not be removed.
        """
        filename = self._name_to_filename(name)
        try:
            os.unlink(filename)
        except OSError as e:
            raise ClearFailedError(e) from e

    def write(self, name, token):
        """Store token under name.

        Raises:
            WriteFailedError: If the token file could not be written.
        """
        filename = self._name_to_filename(name)
        try:
            atomic_file_write(token, filename)
        except OSError as e:
            raise WriteFailedError(e) from e

    def match(self, name):
        """Returns a set of valid keys that contains the name."""
        try:
            filenames = os.listdir(self.directory)
        except FileNotFoundError:
            # Nothing has been written yet, so there is nothing to match.
            return set()
        # Only token files are valid keys, and the name is compared with the key itself
        # so the shared '.jwt' extension can never produce a match.
        keys = (self._filename_to_name(filename)
                for filename in filenames
                if filename.endswith(self._SUFFIX))
        return {key for key in keys if name in key}

    def _name_to_filename(self, name):
        """Return the path of the file that stores the token for name."""
        return os.path.join(self.directory, name) + self._SUFFIX

    def _filename_to_name(self, filename):
        """Return the token name for a filename by dropping its extension."""
        return os.path.splitext(filename)[0]