Source code for ewdm.sources

#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

import numpy as np
import pandas as pd
import xarray as xr
import os

from pathlib import Path
from typing import Union, Tuple

from .helpers import get_sampling_frequency
from .parameters import VARIABLE_NAMES


[docs] class SpotterBuoysDataSource(object): """Class to handle Spotter buoy data parsed from SD Card""" def __init__( self, fname: str ="displacement.csv", ): if os.path.isfile(fname): self.fname = Path(fname) else: raise FileNotFoundError( "File not found. The name of the file parsed from the " "SD Card is likely `displacement.csv`. Please provide " "the full path to this file." )
[docs] def read_dataset(self): """Read raw data file and return xarray object.""" # define data columns variables = { k: VARIABLE_NAMES[k] for k in [ "eastward_displacement", "northward_displacement", "surface_elevation" ] } data_cols = [col for col in variables] columns = ['year','month','day','hour','min','sec','msec'] + data_cols # load data (deprecated) # fmt = "%Y %m %d %H %M %S %f" # parser = lambda t: pd.Timestamp( # *map(int,t.split()[:-1]), int(t.split()[-1])*1000 # ) # data = pd.read_csv( # self.fname, names=columns, header=None, skiprows=1, # parse_dates={"time": columns[:7]}, date_parser=parser # ) # read the CSV file without parsing date data = pd.read_csv( self.fname, names=columns, header=None, skiprows=1 ) # format the 'msec' column to be three digits data['msec'] = data['msec'].apply(lambda x: f'{int(x):03}') # print(data['msec']) # combine the desired date-time columns and convert them to datetime # here, we concatenate the time components and ensure the # millisecond part is correctly formatted data["time"] = pd.to_datetime( data['year'].astype(str) + '-' + data['month'].astype(str) + '-' + data['day'].astype(str) + ' ' + data['hour'].astype(str) + ':' + data['min'].astype(str) + ':' + data['sec'].astype(str) + '.' + data['msec'], # concatenate using three-digit milliseconds format="%Y-%m-%d %H:%M:%S.%f" # use .%f to parse the millisec part ) data = data.drop(columns=columns[:7]) # convert to xarray and add metadata ds = data.set_index("time").to_xarray() for name, attrs in variables.items(): ds[name].attrs = attrs ds["time"].encoding = { 'units': 'days since 1970-01-01 00:00:00', 'calendar': 'proleptic_gregorian' } ds.attrs["sampling_rate"] = get_sampling_frequency( ds.time, precision=3 ) return ds
[docs] class CDIPDataSourceRealTime(object): """Class to handle CDIP real-time buoy data""" def __init__(self, station_id: Union[str, int]): self.station_id = station_id self.baseurl = ( "https://thredds.cdip.ucsd.edu/thredds/dodsC/cdip/realtime" )
[docs] def read_dataset( self, time_start: Union[str, np.datetime64] = None, time_end: Union[str, np.datetime64] = None, ) -> xr.Dataset: """Read dataset from a specified time range and location. Parameters: - time_start (str or np.datetime64): Start time of the range to extract from the dataset. Format: 'YYYY-MM-DD HH:MM:SS'. - time_end (str or np.datetime64): End time of the range to extract from the dataset. Format 'YYYY-MM-DD HH:MM:SS'. Returns: - ds (xr.Dataset): Dataset containing the extracted data. """ url = f"{self.baseurl}/{self.station_id}p1_xy.nc" ds = xr.open_dataset(url) # sampling frequency fs = np.round(ds.xyzSampleRate.item(), 4) # create time array # filter_delay = np.timedelta64(int(1E9 * ds.xyzFilterDelay.item()), 'ns') time_delta = np.timedelta64(int(1E9 / fs), 'ns') miliseconds = np.arange(len(ds.xyzCount)) * time_delta time = ds.xyzStartTime.values + miliseconds self.time = time self.time_coverage_start = ds.time_coverage_start self.time_coverage_end = ds.time_coverage_end # check if time_start is passed otherwise use first time if time_start is None: time_start = time[0] else: try: time_start = np.datetime64(time_start) except TypeError: raise Exception( "`time_end` should be either string yyy-mm-dd HH:MM:SS " "or datetime64 instance." ) # if time_end is not defined, add 30 minutes to start time if time_end is None: time_end = time_start + np.timedelta64(30, "m") else: try: time_end = np.datetime64(time_end) except TypeError: raise Exception( "`time_end` should be either string yyy-mm-dd HH:MM:SS " "or datetime64 instance." ) # compute indices try: ij = slice( np.where(time >= time_start)[0][0], np.where(time >= time_end)[0][0] ) except IndexError: raise Exception( f"`time_start` and `time_end` couldn't be found in the dataset. " f"This dataset covers from {self.time_coverage_start} to " f"{self.time_coverage_end}." ) # originally X is northward and Y westward, in my convection # X should be eastward and Y northward _x = -ds.xyzYDisplacement.isel(xyzCount=ij).data _y = ds.xyzXDisplacement.isel(xyzCount=ij).data _z = ds.xyzZDisplacement.isel(xyzCount=ij).data ds = xr.Dataset( coords={"time": time[ij]}, data_vars={ "eastward_displacement": ("time", _x), "northward_displacement": ("time", _y), "surface_elevation": ("time", _z) }, attrs={ "sampling_rate": fs, **ds.attrs } ) variables = { k: VARIABLE_NAMES[k] for k in [ "eastward_displacement", "northward_displacement", "surface_elevation" ] } for name, attrs in variables.items(): ds[name].attrs = attrs ds["time"].encoding = { 'units': 'days since 1970-01-01 00:00:00', 'calendar': 'proleptic_gregorian' } return ds