# Source code for bosdyn.client.payload_software_update

# Copyright (c) 2023 Boston Dynamics, Inc.  All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).

"""Payload software update service gRPC client.

This is used by Spot payloads to coordinate updates of their own software with Spot.
"""

import datetime

from google.protobuf import timestamp_pb2

from bosdyn.api.payload_software_update_pb2 import (GetAvailableSoftwareUpdatesRequest,
                                                    SendCurrentVersionInfoRequest,
                                                    SendSoftwareUpdateStatusRequest)
from bosdyn.api.payload_software_update_service_pb2_grpc import PayloadSoftwareUpdateServiceStub
from bosdyn.api.robot_id_pb2 import SoftwareVersion
from bosdyn.api.software_package_pb2 import SoftwarePackageVersion, SoftwareUpdateStatus
from bosdyn.client.common import BaseClient


class PayloadSoftwareUpdateClient(BaseClient):
    """A client for payloads to coordinate software updates with a robot.

    Every RPC is offered in a blocking form (dispatched through ``self.call``) and an
    ``*_async`` form (dispatched through ``self.call_async``). Extra keyword arguments
    given to those methods are forwarded unchanged to ``call`` / ``call_async``.
    """

    default_service_name = 'payload-software-update'
    service_type = 'bosdyn.api.PayloadSoftwareUpdateService'

    def __init__(self):
        super().__init__(PayloadSoftwareUpdateServiceStub)

    def send_current_software_info(
            self, package_name: str, version: SoftwareVersion | list[int],
            release_date: float | timestamp_pb2.Timestamp | datetime.datetime, build_id: str,
            **kwargs):
        """Send version information about the currently installed payload software to Spot.

        Args:
            package_name: Name of the package, e.g., "coreio"
            version: Current semantic version of the installed software, either as a
                SoftwareVersion message or as [major, minor, patch].
            release_date: Release date of the currently installed software. Numbers are
                treated as seconds since the Unix epoch.
            build_id: Unique identifier of the build.
            **kwargs: Forwarded to ``self.call``.

        Returns:
            SendCurrentVersionInfoResponse: The response object from Spot. Currently this
            message contains no information other than a standard response header.

        Raises:
            RpcError: Problem communicating with the robot.
        """
        request = self.make_info_request(package_name, version, release_date, build_id)
        return self.call(self._stub.SendCurrentVersionInfo, request, **kwargs)

    def send_current_software_info_async(
            self, package_name: str, version: SoftwareVersion | list[int],
            release_date: float | timestamp_pb2.Timestamp | datetime.datetime, build_id: str,
            **kwargs):
        """Async version of send_current_software_info().

        Args:
            package_name: Name of the package, e.g., "coreio"
            version: Current semantic version of the installed software, either as a
                SoftwareVersion message or as [major, minor, patch].
            release_date: Release date of the currently installed software. Numbers are
                treated as seconds since the Unix epoch.
            build_id: Unique identifier of the build.
            **kwargs: Forwarded to ``self.call_async``.

        Returns:
            The result of ``self.call_async`` for the SendCurrentVersionInfo RPC
            (expected to resolve to a SendCurrentVersionInfoResponse -- confirm against
            BaseClient.call_async).

        Raises:
            RpcError: Problem communicating with the robot.
        """
        request = self.make_info_request(package_name, version, release_date, build_id)
        return self.call_async(self._stub.SendCurrentVersionInfo, request, **kwargs)

    def get_available_updates(self, package_names: str | list[str], **kwargs):
        """Get a list of package information for the named package(s).

        Args:
            package_names: The package name or list of package names to query. Any other
                iterable of names (tuple, set, ...) is accepted as well.
            **kwargs: Forwarded to ``self.call``.

        Returns:
            GetAvailableSoftwareUpdatesResponse: The response object from Spot containing the
            version info for packages cached by Spot.

        Raises:
            RpcError: Problem communicating with the robot.
        """
        request = GetAvailableSoftwareUpdatesRequest(
            package_names=self._as_package_name_list(package_names))
        return self.call(self._stub.GetAvailableSoftwareUpdates, request, **kwargs)

    def get_available_updates_async(self, package_names: str | list[str], **kwargs):
        """Async version of get_available_updates().

        Args:
            package_names: The package name or list of package names to query. Any other
                iterable of names (tuple, set, ...) is accepted as well.
            **kwargs: Forwarded to ``self.call_async``.

        Returns:
            The result of ``self.call_async`` for the GetAvailableSoftwareUpdates RPC
            (expected to resolve to a GetAvailableSoftwareUpdatesResponse -- confirm
            against BaseClient.call_async).

        Raises:
            RpcError: Problem communicating with the robot.
        """
        request = GetAvailableSoftwareUpdatesRequest(
            package_names=self._as_package_name_list(package_names))
        return self.call_async(self._stub.GetAvailableSoftwareUpdates, request, **kwargs)

    def send_installation_status(self, package_name: str, status: SoftwareUpdateStatus.Status,
                                 error_code: SoftwareUpdateStatus.ErrorCode, **kwargs):
        """Send a status update of a payload software installation operation to Spot.

        Args:
            package_name: Name of the package being updated
            status: Status code of installation operation
            error_code: Error code of the installation operation, or ERROR_NONE if no error
                has been encountered.
            **kwargs: Forwarded to ``self.call``.

        Returns:
            SendSoftwareUpdateStatusResponse: The response object from Spot. Currently this
            message contains nothing beyond a standard response header.

        Raises:
            RpcError: Problem communicating with the robot.
        """
        update_status = SoftwareUpdateStatus(package_name=package_name, status=status,
                                             error_code=error_code)
        request = SendSoftwareUpdateStatusRequest(update_status=update_status)
        return self.call(self._stub.SendSoftwareUpdateStatus, request, **kwargs)

    def send_installation_status_async(self, package_name: str,
                                       status: SoftwareUpdateStatus.Status,
                                       error_code: SoftwareUpdateStatus.ErrorCode, **kwargs):
        """Async version of send_installation_status().

        Args:
            package_name: Name of the package being updated
            status: Status code of installation operation
            error_code: Error code of the installation operation, or ERROR_NONE if no error
                has been encountered.
            **kwargs: Forwarded to ``self.call_async``.

        Returns:
            The result of ``self.call_async`` for the SendSoftwareUpdateStatus RPC
            (expected to resolve to a SendSoftwareUpdateStatusResponse -- confirm against
            BaseClient.call_async).

        Raises:
            RpcError: Problem communicating with the robot.
        """
        update_status = SoftwareUpdateStatus(package_name=package_name, status=status,
                                             error_code=error_code)
        request = SendSoftwareUpdateStatusRequest(update_status=update_status)
        return self.call_async(self._stub.SendSoftwareUpdateStatus, request, **kwargs)

    @staticmethod
    def make_info_request(package_name: str, version: SoftwareVersion | list[int],
                          release_date: float | timestamp_pb2.Timestamp | datetime.datetime,
                          build_id: str):
        """Make a SendCurrentVersionInfoRequest message using the supplied information.

        Args:
            package_name: Name of the package, e.g., "coreio"
            version: Current semantic version of the installed software. Anything that is
                not a SoftwareVersion is indexed as [major, minor, patch].
            release_date: Release date of the currently installed software. An int or float
                is interpreted as seconds since the Unix epoch; a datetime is converted with
                ``Timestamp.FromDatetime``; any other value (e.g. a Timestamp) is used as-is.
            build_id: Unique identifier of the build.

        Returns:
            SendCurrentVersionInfoRequest: Message communicating to Spot the version
            information of the currently installed payload software.
        """
        if not isinstance(version, SoftwareVersion):
            version = SoftwareVersion(major_version=version[0], minor_version=version[1],
                                      patch_level=version[2])

        # bool is a subclass of int; never treat True/False as an epoch time.
        if isinstance(release_date, (int, float)) and not isinstance(release_date, bool):
            # fromtimestamp() without a tz returns naive *local* time, which FromDatetime
            # would then read as UTC and skew the date by the host's UTC offset. Convert in
            # UTC and drop the tzinfo so the naive value really is UTC.
            release_date = datetime.datetime.fromtimestamp(
                release_date, tz=datetime.timezone.utc).replace(tzinfo=None)

        if isinstance(release_date, datetime.datetime):
            pb_release_date = timestamp_pb2.Timestamp()
            pb_release_date.FromDatetime(release_date)
            release_date = pb_release_date

        package_version = SoftwarePackageVersion(package_name=package_name, version=version,
                                                 release_date=release_date, build_id=build_id)
        return SendCurrentVersionInfoRequest(package_version=package_version)

    @staticmethod
    def _as_package_name_list(package_names):
        """Return package_names as a list, wrapping a single bare name in a one-element list."""
        # A lone str/bytes is itself iterable, so it must be wrapped rather than passed to
        # list(), which would split it into characters.
        if isinstance(package_names, (str, bytes)):
            return [package_names]
        return list(package_names)