Source code for bosdyn.client.gps.NMEAParser

# Copyright (c) 2023 Boston Dynamics, Inc.  All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).

import datetime
import math
import time
from typing import List, Tuple

import pynmea2
from google.protobuf.any_pb2 import Any
from google.protobuf.timestamp_pb2 import Timestamp

from bosdyn.api.gps.gps_pb2 import GpsDataPoint
from bosdyn.util import RobotTimeConverter, now_timestamp, seconds_to_timestamp


# Module-level flag, set to True once the "GPS data does not include ZDA" warning has been
# logged so that the warning is emitted only once. Because it lives at module scope it is
# shared by every NMEAParser instance.
has_warned_no_zda = False


class NMEAParser(object):
    """Incrementally parse a stream of NMEA text into GpsDataPoint protos.

    Text is fed in arbitrary chunks through parse(). Complete lines are decoded with
    pynmea2, buffered, and split into groups of messages whose NMEA timestamps lie within
    `grouping_timeout` seconds of each other. Each finished group is converted into one
    GpsDataPoint.
    """

    # The amount of time to wait before logging another decode error.
    LOG_THROTTLE_TIME = 2.0  # seconds.

    def __init__(self, logger):
        """Create a parser.

        Args:
            logger: Logger-like object; its warning(), error() and exception() methods
                are used.
        """
        # Trailing, not yet newline-terminated text carried over between parse() calls.
        self.data = ''
        # Buffered (parsed message, raw stripped line, client receive time) tuples that
        # have not been emitted as part of a group yet.
        self.full_lines = []
        self.logger = logger

        # NMEA strings come in in "groups" we are just trying to
        # figure out which group each message belongs to.  We do this
        # by checking if their times are near to one another.
        #
        # If your GPS outputs data at 20 Hz, this constant must be less than 0.050 seconds.
        self.grouping_timeout = 0.025

        # time.time() of the last logged decode failure, or None if nothing was logged yet.
        self.last_failed_read_log_time = None

    def nmea_message_group_to_gps_data_point(self, nmea_messages: List[Tuple[str, str, int]],
                                             time_converter: RobotTimeConverter):
        """Convert a NMEA message group with the same UTC timestamp to a GpsDataPoint.

        Args:
            nmea_messages: Non-empty list of (parsed message, raw line, client timestamp)
                tuples, in the form parse() stores them in `full_lines`. The parsed
                message is the object returned by pynmea2.parse(); the client timestamp
                is a time.time() value in seconds.
            time_converter: Converter used to express the client timestamp in robot time.
                May be None, in which case the robot timestamp is a copy of the client
                timestamp.

        Returns:
            The populated GpsDataPoint. Only 'GGA', 'GST' and 'ZDA' sentences contribute
            fields; other sentence types are ignored.
        """
        global has_warned_no_zda

        data_point = GpsDataPoint()
        has_timestamp = False

        # The NMEA message group should at least have a ZDA message.
        # Parse each message depending on the NMEA sentence_type.
        for data, _, client_timestamp in nmea_messages:
            if data.sentence_type == 'GGA':
                if (data.latitude is not None and data.longitude is not None and
                        data.altitude is not None):
                    data_point.llh.latitude = data.latitude
                    data_point.llh.longitude = data.longitude
                    data_point.llh.height = data.altitude

                # One (empty) satellite entry is added per satellite reported in use.
                if data.num_sats is not None:
                    for _ in range(int(data.num_sats)):
                        data_point.satellites.add()

                # GPS Quality indicator:
                # 0: Fix not valid
                # 1: GPS fix
                # 2: Differential GPS fix, OmniSTAR VBS
                # 4: Real-Time Kinematic, fixed integers
                # 5: Real-Time Kinematic, float integers, OmniSTAR XP/HP or Location RTK
                if data.gps_qual is not None:
                    data_point.mode.value = data.gps_qual

                if not has_timestamp:
                    # If there is no ZDA message to provide a date, assume today's date.
                    # NMEA times are UTC, so the date must be the current UTC date; the
                    # local date can be off by one day around midnight.
                    utc_today = datetime.datetime.now(datetime.timezone.utc).date()
                    gps_timestamp = datetime.datetime.combine(utc_today, data.timestamp)
                    # Protobuf timestamp does not use timezone aware timestamps (same
                    # treatment as in the ZDA branch below).
                    data_point.timestamp_gps.FromDatetime(gps_timestamp.replace(tzinfo=None))

            elif data.sentence_type == 'GST':
                std_lat = data.std_dev_latitude
                std_lon = data.std_dev_longitude
                std_alt = data.std_dev_altitude
                # Only fill in the accuracy when all three deviations are present; a
                # partially populated sentence would otherwise raise a TypeError.
                if std_lat is not None and std_lon is not None and std_alt is not None:
                    # Horizontal Root Mean Squared. Note we are not using "twice distance
                    # rms" or "2drms".
                    hrms = math.sqrt((math.pow(std_lat, 2) + math.pow(std_lon, 2)) / 2)
                    data_point.accuracy.horizontal = hrms
                    data_point.accuracy.vertical = std_alt

            elif data.sentence_type == 'ZDA':
                gps_timestamp = data.datetime
                # Protobuf timestamp does not use timezone aware timestamps.
                gps_timestamp_no_tz = gps_timestamp.replace(tzinfo=None)
                data_point.timestamp_gps.FromDatetime(gps_timestamp_no_tz)
                has_timestamp = True

        # Populate client and robot timestamps. If we are not using TimeSync, the robot
        # timestamp will be the same as the client timestamp. The client timestamp of the
        # last message in the group is used.
        data_point.timestamp_client.CopyFrom(seconds_to_timestamp(client_timestamp))
        if time_converter is not None:
            data_point.timestamp_robot.CopyFrom(
                time_converter.robot_timestamp_from_local_secs(client_timestamp))
        else:
            data_point.timestamp_robot.CopyFrom(data_point.timestamp_client)

        if not has_timestamp and not has_warned_no_zda:
            self.logger.warning("GPS data does not include ZDA. Timestamp may be inaccurate.")
            has_warned_no_zda = True

        return data_point

    def parse(self, new_data: str, time_converter: RobotTimeConverter,
              check: bool = True) -> List[GpsDataPoint]:
        """Consume a chunk of NMEA text and return the data points completed by it.

        Args:
            new_data: Newly received text. It may contain partial lines; an unterminated
                trailing line is kept in `self.data` until the rest of it arrives.
            time_converter: Forwarded to nmea_message_group_to_gps_data_point(); may be
                None.
            check: Forwarded to pynmea2.parse() as its second positional argument.

        Returns:
            One GpsDataPoint per message group that was closed by this call. A group is
            closed when a later buffered message is more than `grouping_timeout` seconds
            away from the group's first message, so the most recent group always stays
            buffered in `self.full_lines`.
        """
        timestamp = time.time()  # Client timestamp when received.
        self.data = self.data + new_data

        if len(self.data) == 0:
            return []  # Protection because empty_string.splitlines() returns empty array

        lines = self.data.splitlines(True)
        # Assume the last line is incomplete until proven otherwise below.
        self.data = lines[-1]
        last_idx = len(lines) - 1

        # Parse each line.
        for idx, line in enumerate(lines):
            # Only parse the last line if it ends with a \n
            if idx == last_idx:
                if not line.endswith('\n'):
                    break
                # This line is complete, any new data
                # will be on a new line.  Parse it.
                self.data = ""

            stripped = line.strip()
            try:
                nmea_msg = pynmea2.parse(stripped, check)
            except Exception:
                # Parsing error, log and skip. KeyboardInterrupt is not an Exception
                # subclass, so it still propagates.
                # Throttle the logs.
                now = time.time()
                if self.last_failed_read_log_time is None or (
                        now - self.last_failed_read_log_time) > self.LOG_THROTTLE_TIME:
                    self.logger.exception(f"Failed to parse {stripped}. Is it NMEA?")
                    self.last_failed_read_log_time = now
                continue

            # if the message does not contain a timestamp attribute, abandon the rest of
            # the logic and go to the beginning of the loop
            if not hasattr(nmea_msg, 'timestamp'):
                continue

            # Only use NMEA messages that have a timestamp.
            # For example, GSA and GST messages are not supported.
            # NOTE(review): nmea_message_group_to_gps_data_point() does handle 'GST'
            # sentences, so the GST part of the comment above may be outdated - confirm.
            if isinstance(nmea_msg.timestamp, datetime.time):
                self.full_lines.append((nmea_msg, stripped, timestamp))
            elif isinstance(nmea_msg.timestamp, str) and nmea_msg.timestamp == '.':
                # pynmea2 will set the timestamp to the string '.' when GPS
                # spits out: "$GPZDA,.,,,,,00*66". Silently ignore.
                continue
            elif nmea_msg.timestamp is not None:
                self.logger.error("Invalid timestamp for \"" + stripped + "\" \"")
                continue

        found_subsets = []
        # The timestamps are times of day; attach an arbitrary fixed date so that they
        # can be subtracted from each other.
        reference_date = datetime.date(1, 1, 1)

        # Group a subset of NMEA messages based on timestamp. Each pass splits off the
        # leading run of messages that are close to the first buffered message.
        while len(self.full_lines) >= 2:
            first_datetime = datetime.datetime.combine(reference_date,
                                                       self.full_lines[0][0].timestamp)
            for idx in range(1, len(self.full_lines)):
                other_datetime = datetime.datetime.combine(reference_date,
                                                           self.full_lines[idx][0].timestamp)
                # Mod the total seconds by 3600 so that checking 23:59:59.99 and 00:00:00.00
                # evaluate as close to each other. The result is never negative, so only
                # the upper bound needs to be checked.
                time_elapsed = (other_datetime - first_datetime).total_seconds() % 3600
                if time_elapsed > self.grouping_timeout:
                    found_subsets.append(self.full_lines[:idx])
                    self.full_lines = self.full_lines[idx:]
                    break
            else:
                # Every buffered message still belongs to the first message's group.
                break

        return [
            self.nmea_message_group_to_gps_data_point(subset, time_converter)
            for subset in found_subsets
        ]