gpsd2csv.py

This python(3) tool processes json from gpsd into a csv file for use in GIS or whatever.

gpsd2csv.py
import json
import csv
import argparse
from datetime import datetime, timedelta
from typing import Dict, List, Any
 
class GPSConverter:
    def __init__(self):
        self.current_sky_data = {}
        self.rows = []
        self.all_fieldnames = set()  # Track all possible fields
 
    def determine_fix_type(self, tpv_data: Dict[str, Any]) -> str:
        """Determine the type of GPS fix based on mode and status"""
        mode = tpv_data.get('mode', 0)
        status = tpv_data.get('status', 0)
 
        # First determine basic fix type from mode
        if mode == 0 or mode == 1:
            base_type = 'NO_FIX'
        elif mode == 2:
            base_type = '2D'
        elif mode == 3:
            base_type = '3D'
        else:
            return 'UNKNOWN'
 
        # Then add status information
        status_types = {
            0: 'UNKNOWN',
            1: 'NORMAL',
            2: 'DGPS',
            3: 'RTK_FIXED',
            4: 'RTK_FLOAT',
            5: 'DR',
            6: 'GNSSDR',
            7: 'TIME',
            8: 'SIM',
            9: 'P(Y)'
        }
 
        if status == 0 or status == 1:  # Unknown or Normal
            return base_type
        else:
            return f"{base_type}/{status_types[status]}"
 
    def process_sky(self, data: Dict[str, Any]) -> None:
        """Process SKY message and store relevant data"""
        sky_data = {
            'hdop': data.get('hdop'),
            'pdop': data.get('pdop'),
            'vdop': data.get('vdop'),
            'gdop': data.get('gdop'),
            'uSat': data.get('uSat'),
            'nSat': data.get('nSat')
        }
 
        # Initialize all possible qual fields to 0
        for i in range(8):  # Assuming qual values 0-7
            sky_data[f'qual_{i}_count'] = 0
 
        # Add satellite quality information if available
        satellites = data.get('satellites', [])
        if satellites:
            # Count satellites by quality level
            for sat in satellites:
                qual = sat.get('qual', 0)
                sky_data[f'qual_{qual}_count'] = sky_data.get(f'qual_{qual}_count', 0) + 1
 
        self.current_sky_data = {k: v for k, v in sky_data.items() if v is not None}
        # Update all_fieldnames with any new fields
        self.all_fieldnames.update(sky_data.keys())
 
    def process_timestamp(self, utc_timestamp: str) -> Dict[str, str]:
        """Process UTC timestamp into various time fields"""
        if not utc_timestamp:
            return {}
 
        # Parse UTC time
        utc_time = datetime.strptime(utc_timestamp, "%Y-%m-%dT%H:%M:%S.%fZ")
 
        # Convert to EST (UTC-5)
        local_time = utc_time - timedelta(hours=5)
 
        time_fields = {
            'timestamp_utc': utc_timestamp,
            'date': local_time.strftime("%Y-%m-%d"),
            'time_local': local_time.strftime("%H:%M:%S.%f")[:-3],  # Trim to milliseconds
            'datetime_local': local_time.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
        }
 
        # Update all_fieldnames with time fields
        self.all_fieldnames.update(time_fields.keys())
        return time_fields
 
    def process_tpv(self, data: Dict[str, Any]) -> None:
        """Process TPV message and create a row combining it with current SKY data"""
        if data.get('mode', 0) < 2:  # Skip if no fix
            return
 
        # Process timestamp first
        time_fields = self.process_timestamp(data.get('time'))
        if not time_fields:
            return
 
        row = {
            **time_fields,  # Add all time-related fields
            'fix_type': self.determine_fix_type(data),
            'lat': data.get('lat'),
            'lon': data.get('lon'),
            'alt_hae': data.get('altHAE'),  # Height above ellipsoid
            'alt_msl': data.get('altMSL'),  # Height above mean sea level
            'speed': data.get('speed'),
            'track': data.get('track'),  # Course over ground
            'climb': data.get('climb'),
            'epx': data.get('epx'),  # Longitude error estimate
            'epy': data.get('epy'),  # Latitude error estimate
            'epv': data.get('epv'),  # Vertical error estimate
            'eph': data.get('eph'),  # Horizontal error (CEP)
            'sep': data.get('sep'),  # Spherical error probability
        }
 
        # Update all_fieldnames with TPV fields
        self.all_fieldnames.update(row.keys())
 
        # Add the current SKY data
        row.update(self.current_sky_data)
 
        # Fill in any missing fields with None
        for field in self.all_fieldnames:
            if field not in row:
                row[field] = None
 
        # Only add row if we have essential data
        if row['timestamp_utc'] and row['lat'] and row['lon']:
            self.rows.append(row)
 
    def convert_file(self, input_file: str, output_file: str) -> None:
        """Convert GPS JSON log file to CSV"""
        print(f"Processing {input_file}...")
        print("Using EST (UTC-5)")
 
        with open(input_file, 'r') as f:
            for line_num, line in enumerate(f, 1):
                try:
                    data = json.loads(line.strip())
                    if data['class'] == 'SKY':
                        self.process_sky(data)
                    elif data['class'] == 'TPV':
                        self.process_tpv(data)
                except json.JSONDecodeError:
                    print(f"Warning: Skipping malformed JSON on line {line_num}: {line[:50]}...")
                except KeyError:
                    print(f"Warning: Skipping line {line_num} missing required fields: {line[:50]}...")
 
        # Write to CSV
        if not self.rows:
            print("No valid GPS fixes found in input file")
            return
 
        # Define column order (keeping qual_count fields for last)
        base_fields = [
            'timestamp_utc', 'date', 'time_local', 'datetime_local',
            'fix_type', 'lat', 'lon', 'alt_hae', 'alt_msl',
            'speed', 'track', 'climb',
            'epx', 'epy', 'epv', 'eph', 'sep',
            'hdop', 'pdop', 'vdop', 'gdop',
            'uSat', 'nSat'
        ]
 
        # Get all qual_count fields and sort them
        qual_fields = sorted([f for f in self.all_fieldnames if f.startswith('qual_')])
 
        # Combine base fields with qual fields
        fieldnames = base_fields + qual_fields
 
        with open(output_file, 'w', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(self.rows)
 
        print(f"Processed {len(self.rows)} GPS fixes")
        print(f"Output written to {output_file}")
 
def main():
    try:
        parser = argparse.ArgumentParser(description='Convert GPS JSON logs to CSV format')
        parser.add_argument('input_file', help='Input JSON file path')
        parser.add_argument('output_file', nargs='?', help='Output CSV file path (optional, defaults to input filename with .csv extension)')
 
        args = parser.parse_args()
 
        output_file = args.output_file
        if output_file is None:
            # Remove .json extension if present and add .csv
            if args.input_file.lower().endswith('.json'):
                output_file = args.input_file[:-5] + '.csv'
            else:
                output_file = args.input_file + '.csv'
 
        converter = GPSConverter()
        converter.convert_file(args.input_file, output_file)
 
    except Exception as e:
        print(f"Error: {str(e)}")
        raise
 
if __name__ == '__main__':
    main()