import json
import csv
import argparse
from datetime import datetime, timedelta
from typing import Dict, Any


class GPSConverter:
    def __init__(self):
        self.current_sky_data = {}
        self.rows = []
        self.all_fieldnames = set()  # Track all possible fields

    def determine_fix_type(self, tpv_data: Dict[str, Any]) -> str:
        """Determine the type of GPS fix based on mode and status"""
        mode = tpv_data.get('mode', 0)
        status = tpv_data.get('status', 0)

        # First determine basic fix type from mode
        if mode == 0 or mode == 1:
            base_type = 'NO_FIX'
        elif mode == 2:
            base_type = '2D'
        elif mode == 3:
            base_type = '3D'
        else:
            return 'UNKNOWN'

        # Then add status information
        status_types = {
            0: 'UNKNOWN',
            1: 'NORMAL',
            2: 'DGPS',
            3: 'RTK_FIXED',
            4: 'RTK_FLOAT',
            5: 'DR',
            6: 'GNSSDR',
            7: 'TIME',
            8: 'SIM',
            9: 'P(Y)'
        }

        if status == 0 or status == 1:  # Unknown or Normal
            return base_type
        else:
            # Fall back to the raw status value if it is not in the table
            return f"{base_type}/{status_types.get(status, status)}"

    def process_sky(self, data: Dict[str, Any]) -> None:
        """Process SKY message and store relevant data"""
        sky_data = {
            'hdop': data.get('hdop'),
            'pdop': data.get('pdop'),
            'vdop': data.get('vdop'),
            'gdop': data.get('gdop'),
            'uSat': data.get('uSat'),
            'nSat': data.get('nSat')
        }

        # Initialize all possible qual fields to 0
        for i in range(8):  # Assuming qual values 0-7
            sky_data[f'qual_{i}_count'] = 0

        # Add satellite quality information if available
        satellites = data.get('satellites', [])
        if satellites:
            # Count satellites by quality level
            for sat in satellites:
                qual = sat.get('qual', 0)
                sky_data[f'qual_{qual}_count'] = sky_data.get(f'qual_{qual}_count', 0) + 1

        self.current_sky_data = {k: v for k, v in sky_data.items() if v is not None}
        # Update all_fieldnames with any new fields
        self.all_fieldnames.update(sky_data.keys())

    def process_timestamp(self, utc_timestamp: str) -> Dict[str, str]:
        """Process UTC timestamp into various time fields"""
        if not utc_timestamp:
            return {}

        # Parse UTC time (fall back to whole-second timestamps in case the
        # receiver omits fractional seconds)
        try:
            utc_time = datetime.strptime(utc_timestamp, "%Y-%m-%dT%H:%M:%S.%fZ")
        except ValueError:
            utc_time = datetime.strptime(utc_timestamp, "%Y-%m-%dT%H:%M:%SZ")

        # Convert to EST (UTC-5)
        local_time = utc_time - timedelta(hours=5)

        time_fields = {
            'timestamp_utc': utc_timestamp,
            'date': local_time.strftime("%Y-%m-%d"),
            'time_local': local_time.strftime("%H:%M:%S.%f")[:-3],  # Trim to milliseconds
            'datetime_local': local_time.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
        }

        # Update all_fieldnames with time fields
        self.all_fieldnames.update(time_fields.keys())
        return time_fields

    def process_tpv(self, data: Dict[str, Any]) -> None:
        """Process TPV message and create a row combining it with current SKY data"""
        if data.get('mode', 0) < 2:  # Skip if no fix
            return

        # Process timestamp first
        time_fields = self.process_timestamp(data.get('time'))
        if not time_fields:
            return

        row = {
            **time_fields,  # Add all time-related fields
            'fix_type': self.determine_fix_type(data),
            'lat': data.get('lat'),
            'lon': data.get('lon'),
            'alt_hae': data.get('altHAE'),  # Height above ellipsoid
            'alt_msl': data.get('altMSL'),  # Height above mean sea level
            'speed': data.get('speed'),
            'track': data.get('track'),  # Course over ground
            'climb': data.get('climb'),
            'epx': data.get('epx'),  # Longitude error estimate
            'epy': data.get('epy'),  # Latitude error estimate
            'epv': data.get('epv'),  # Vertical error estimate
            'eph': data.get('eph'),  # Horizontal error (CEP)
            'sep': data.get('sep'),  # Spherical error probability
        }

        # Update all_fieldnames with TPV fields
        self.all_fieldnames.update(row.keys())

        # Add the current SKY data
        row.update(self.current_sky_data)

        # Fill in any missing fields with None
        for field in self.all_fieldnames:
            if field not in row:
                row[field] = None

        # Only add row if we have essential data (compare lat/lon against None so 0.0 is kept)
        if row['timestamp_utc'] and row['lat'] is not None and row['lon'] is not None:
            self.rows.append(row)

    def convert_file(self, input_file: str, output_file: str) -> None:
        """Convert GPS JSON log file to CSV"""
        print(f"Processing {input_file}...")
        print("Using EST (UTC-5)")

        with open(input_file, 'r') as f:
            for line_num, line in enumerate(f, 1):
                try:
                    data = json.loads(line.strip())
                    if data['class'] == 'SKY':
                        self.process_sky(data)
                    elif data['class'] == 'TPV':
                        self.process_tpv(data)
                except json.JSONDecodeError:
                    print(f"Warning: Skipping malformed JSON on line {line_num}: {line[:50]}...")
                except KeyError:
                    print(f"Warning: Skipping line {line_num} missing required fields: {line[:50]}...")

        # Write to CSV
        if not self.rows:
            print("No valid GPS fixes found in input file")
            return

        # Define column order (keeping qual_count fields for last)
        base_fields = [
            'timestamp_utc', 'date', 'time_local', 'datetime_local',
            'fix_type', 'lat', 'lon', 'alt_hae', 'alt_msl',
            'speed', 'track', 'climb',
            'epx', 'epy', 'epv', 'eph', 'sep',
            'hdop', 'pdop', 'vdop', 'gdop', 'uSat', 'nSat'
        ]

        # Get all qual_count fields and sort them
        qual_fields = sorted([f for f in self.all_fieldnames if f.startswith('qual_')])

        # Combine base fields with qual fields
        fieldnames = base_fields + qual_fields

        with open(output_file, 'w', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(self.rows)

        print(f"Processed {len(self.rows)} GPS fixes")
        print(f"Output written to {output_file}")


def main():
    try:
        parser = argparse.ArgumentParser(description='Convert GPS JSON logs to CSV format')
        parser.add_argument('input_file', help='Input JSON file path')
        parser.add_argument('output_file', nargs='?',
                            help='Output CSV file path (optional, defaults to input filename with .csv extension)')
        args = parser.parse_args()

        output_file = args.output_file
        if output_file is None:
            # Remove .json extension if present and add .csv
            if args.input_file.lower().endswith('.json'):
                output_file = args.input_file[:-5] + '.csv'
            else:
                output_file = args.input_file + '.csv'

        converter = GPSConverter()
        converter.convert_file(args.input_file, output_file)
    except Exception as e:
        print(f"Error: {str(e)}")
        raise


if __name__ == '__main__':
    main()
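
# Example invocations (hypothetical file names; the script name used here is assumed):
#   python gps_to_csv.py gps_log.json track.csv
#   python gps_to_csv.py gps_log.json            # output defaults to gps_log.csv
# The input is expected to contain one gpsd-style JSON object (TPV or SKY) per line.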