Skip to content

hfpytrace.density.sami

Package

SAMI3 model adapter for 2D and 3D electron density grid workflows.

Key Classes

Class SAMI3

Key Methods

Method SAMI3.fetch_dataset() Method SAMI3.fetch_dataset_3d() Method SAMI3._bilinear_ne_profile()

Implementation Notes

Spatial interpolation

fetch_interpolated_data and _fetch_profile_at_index use bilinear horizontal interpolation (_bilinear_ne_profile) over the four surrounding 1° grid cells instead of nearest-neighbour selection. Longitude wraparound at 0°/360° is handled automatically.

Temporal interpolation

fetch_dataset and fetch_dataset_3d perform linear interpolation between the two bracketing SAMI3 time frames using

α  = (t − t_i) / (t_j − t_i)
Ne = (1 − α) · Ne(t_i) + α · Ne(t_j)

The denominator uses the actual bracket width t_j − t_i, so non-uniform output cadences are handled correctly.

API

hfpytrace.density.sami

SAMI3 electron density reader.

Reads SAMI3 (Sami is Another Model of the Ionosphere, 3D) netCDF output files and interpolates the electron density onto arbitrary lat/lon/altitude grids for use in HF ray-tracing.

Requires

xarray, scipy : for netCDF I/O and interpolation.

Classes

SAMI3 Loads a SAMI3 netCDF file and provides fetch_dataset / set_dataset methods that resample Ne onto user-specified grids.

SAMI3

Bases: object

Electron density from SAMI3 simulation output.

Parameters
SimpleNamespace

Config with density_file_location, density_file_name, and density_simulated_datetime (ISO-8601 string).

datetime.datetime

Requested time snapshot; the nearest SAMI3 time step is used.

int, optional

Round lat/lon values to this many decimal places during lookup. -1 disables rounding. Default -1.

Source code in hfpytrace/density/sami.py
class SAMI3(object):
    """Electron density from SAMI3 simulation output.

    Parameters
    ----------
    cfg : SimpleNamespace
        Config with ``density_file_location``, ``density_file_name``, and
        ``density_simulated_datetime`` (ISO-8601 string).
    event : datetime.datetime
        Requested time snapshot; the nearest SAMI3 time step is used.
    round_to : int, optional
        Round lat/lon values to this many decimal places during lookup.
        ``-1`` disables rounding.  Default ``-1``.
    """

    def __init__(
        self,
        cfg,
        event,
        round_to=-1,
    ):
        self.cfg = cfg
        self.event = event
        self.file_name = os.path.join(
            self.cfg.density_file_location, self.cfg.density_file_name
        )
        self.cfg.density_simulated_datetime = dparser.isoparse(
            self.cfg.density_simulated_datetime
        )
        self.round_to = round_to
        self.load_nc_dataset()
        return

    def load_nc_dataset(self):
        """Load the SAMI3 netCDF file into memory.

        Reads time, altitude, geographic lat/lon, and electron density arrays
        from the netCDF file and stores them in ``self.store``.  The raw
        ``time`` variable (in hours from ``density_simulated_datetime``) is
        converted to absolute ``datetime`` objects.  Longitude is kept in
        the 0–360° convention as produced by SAMI3.
        """
        self.store = {}
        logger.info(f"Load files -> {self.file_name}")
        ds = xr.open_dataset(self.file_name)
        self.store["time"] = [
            self.cfg.density_simulated_datetime + dt.timedelta(hours=float(d))
            for d in ds.variables["time"][:]
        ]
        if self.round_to > 0:
            logger.info("Round to nearest minutes!!!!!!!!!!!!")
            self.store["time"] = [self.round_time(e) for e in self.store["time"]]
        self.dates = self.store["time"]
        self.store["alt"] = ds.variables["alt0"].values  # in km
        self.store["glat"] = ds.variables["lat0"].values  # in deg
        self.store["glon"] = ds.variables["lon0"].values  # in 0-360 deg
        # self.store["glon"] = (
        #     np.mod(self.store["glon"] - 180.0, 360.0) - 180.0
        # )  # to +/- 180
        self.store["eden"] = ds.variables["dene0"][:]  # in cc
        ds.close()
        del ds
        return

    def round_time(self, t):
        """Round a datetime object to any time lapse in seconds
        t : datetime.datetime object, default now.
        """
        seconds = (t.replace(tzinfo=None) - t.min).seconds
        rounding = (seconds + self.round_to / 2) // self.round_to * self.round_to
        return t + dt.timedelta(0, rounding - seconds, -t.microsecond)

    def find_time_index(self, t):
        """Finds the index of the interval in the array where the number falls.

        Returns:
            The index of the interval where the number falls, or -1 if the number is
            outside the range of the array.
        """
        for i in range(len(self.store["time"]) - 1):
            if self.store["time"][i] <= t < self.store["time"][i + 1]:
                return i, i + 1
        return -1, 0

    def fetch_dataset(
        self,
        time,
        lats,
        lons,
        alts,
        to_file=None,
    ):
        """Fetch 2D electron density along a route at a given time.

        If the requested time matches a model step exactly it is used directly;
        otherwise the two bracketing steps are linearly interpolated in time.

        Parameters
        ----------
        time : datetime.datetime
            Requested time snapshot.
        lats, lons : array-like, shape (npts,)
            Geographic coordinates of route points [°].
        alts : array-like, shape (nalt,)
            Target altitude levels [km].
        to_file : str, optional
            If given, save the result to a ``.mat`` file at this path.

        Returns
        -------
        param : np.ndarray, shape (nalt, npts)
            Electron density in cm⁻³.
        alts : np.ndarray
            Model altitude grid [km].
        """
        # Selecting based on time index
        if time in self.store["time"]:
            logger.info(f"Into exact timestamp {time}")
            # Select the exact index if timestamp in the simulation
            i = self.store["time"].index(time)
            self.param, self.alts = self.fetch_interpolated_data(lats, lons, alts, i)
        else:
            # Select the two index if timestamp in the simulation
            i, j = self.find_time_index(time)
            logger.info(
                f"{time}/ into between timestamp {self.store['time'][i]} & {self.store['time'][j]}"
            )
            dt_bracket = (self.store["time"][j] - self.store["time"][i]).total_seconds()
            alpha = (time - self.store["time"][i]).total_seconds() / dt_bracket
            px, _ = self.fetch_interpolated_data(lats, lons, alts, i)
            py, self.alts = self.fetch_interpolated_data(lats, lons, alts, j)
            self.param = (1.0 - alpha) * px + alpha * py

        if to_file:
            savemat(to_file, dict(ne=self.param))
        return self.param, self.alts

    def _bilinear_ne_profile(self, D, lat, lon):
        """Return a raw Ne altitude profile (shape: n_alt) bilinearly interpolated
        at (lat, lon) from the four surrounding 1° grid cells.

        D has shape (n_lon, n_alt, n_lat).
        lon must already be normalised to [0, 360).
        """
        glat = self.store["glat"]
        glon = self.store["glon"]
        n_lon, _, n_lat = D.shape

        # ── latitude bracket ─────────────────────────────────────────────
        i0 = int(np.clip(np.searchsorted(glat, lat, side="right") - 1, 0, n_lat - 2))
        i1 = i0 + 1
        lat_f = float(np.clip((lat - glat[i0]) / (glat[i1] - glat[i0]), 0.0, 1.0))

        # ── longitude bracket with 0–360 wraparound ──────────────────────
        j0 = int((np.searchsorted(glon, lon, side="right") - 1) % n_lon)
        j1 = int((j0 + 1) % n_lon)
        dlon = (
            float(glon[j1] - glon[j0])
            if j1 > j0
            else float(glon[j1] + 360.0 - glon[j0])
        )
        lon_f = float(
            np.clip((lon - float(glon[j0])) / dlon if dlon > 0.0 else 0.0, 0.0, 1.0)
        )

        # ── bilinear blend over four corners ─────────────────────────────
        p00 = D[j0, :, i0].astype(float)  # lon0, lat0
        p10 = D[j1, :, i0].astype(float)  # lon1, lat0
        p01 = D[j0, :, i1].astype(float)  # lon0, lat1
        p11 = D[j1, :, i1].astype(float)  # lon1, lat1
        return (
            (1.0 - lon_f) * (1.0 - lat_f) * p00
            + lon_f * (1.0 - lat_f) * p10
            + (1.0 - lon_f) * lat_f * p01
            + lon_f * lat_f * p11
        )

    def fetch_interpolated_data(
        self,
        lats,
        lons,
        alts,
        index,
    ):
        """Fetch bilinearly interpolated 2D Ne profiles for a given time index.

        Parameters
        ----------
        lats, lons : array-like, shape (npts,)
            Geographic coordinates of route points [°].
        alts : array-like, shape (nalt,)
            Target altitude levels [km].
        index : int
            Time index into ``self.store["eden"]``.

        Returns
        -------
        out : np.ndarray, shape (nalt, npts)
            Electron density in cm⁻³.
        galt : np.ndarray
            Model altitude grid [km].
        """
        n = len(lats)
        D = self.store["eden"][index]
        galt = self.store["alt"]
        out, ix = np.zeros((len(alts), n)) * np.nan, 0
        for lat, lon in zip(lats, lons):
            lon = float(np.mod(360 + lon, 360))
            o = self._bilinear_ne_profile(D, lat, lon)
            out[:, ix] = utils.interpolate_by_altitude(
                galt, alts, o, self.cfg.scale, self.cfg.kind, method="extp"
            )  # in cc
            ix += 1
        return out, galt

    def _fetch_profile_at_index(self, index, lat, lon, alts):
        """Return a single altitude-interpolated Ne profile at (lat, lon).

        Parameters
        ----------
        index : int
            Time index into ``self.store["eden"]``.
        lat, lon : float
            Target geographic coordinates [°].
        alts : array-like
            Target altitude levels [km].

        Returns
        -------
        p : np.ndarray, shape (nalt,)
            Electron density in cm⁻³.
        galt : np.ndarray
            Model altitude grid [km].
        """
        D = self.store["eden"][index]
        galt = self.store["alt"]
        lon = float(np.mod(360 + lon, 360))
        o = self._bilinear_ne_profile(D, lat, lon)
        p = utils.interpolate_by_altitude(
            galt, alts, o, self.cfg.scale, self.cfg.kind, method="extp"
        )
        return np.asarray(p, dtype=float), np.asarray(galt, dtype=float)

    def _fetch_cube_at_index(self, index, lats, lons, alts, workers: int = 1):
        lats = np.asarray(lats, dtype=float)
        lons = np.asarray(lons, dtype=float)
        alts = np.asarray(alts, dtype=float)
        out = np.zeros((lats.size, lons.size, alts.size), dtype=float) * np.nan
        ij = [(ii, jj) for ii in range(lats.size) for jj in range(lons.size)]

        def _job(ii, jj):
            p, _ = self._fetch_profile_at_index(index, lats[ii], lons[jj], alts)
            return ii, jj, p

        n_workers = max(1, int(workers))
        if n_workers == 1:
            for ii, jj in ij:
                _, _, p = _job(ii, jj)
                out[ii, jj, :] = p
        else:
            with ThreadPoolExecutor(max_workers=n_workers) as ex:
                for ii, jj, p in ex.map(lambda t: _job(*t), ij):
                    out[ii, jj, :] = p
        return out

    def fetch_dataset_3d(
        self,
        time,
        lats,
        lons,
        alts,
        to_file=None,
        workers: int = 1,
    ):
        """
        Fetch 3D electron density cube on (lat, lon, alt) with optional
        point-wise parallelism and time interpolation.
        """
        if time in self.store["time"]:
            i = self.store["time"].index(time)
            self.param3d = self._fetch_cube_at_index(i, lats, lons, alts, workers)
        else:
            i, j = self.find_time_index(time)
            dt_bracket = (self.store["time"][j] - self.store["time"][i]).total_seconds()
            alpha = (time - self.store["time"][i]).total_seconds() / dt_bracket
            px = self._fetch_cube_at_index(i, lats, lons, alts, workers)
            py = self._fetch_cube_at_index(j, lats, lons, alts, workers)
            self.param3d = (1.0 - alpha) * px + alpha * py

        self.alts = np.asarray(alts, dtype=float)
        if to_file:
            savemat(to_file, dict(ne=self.param3d))
        return self.param3d, self.alts

    def load_from_file(self, to_file: str):
        """Load a previously saved electron density array from a ``.mat`` file.

        Parameters
        ----------
        to_file : str
            Path to the ``.mat`` file containing key ``ne``.

        Returns
        -------
        np.ndarray
            Electron density array as stored.
        """
        logger.info(f"Load from file {to_file.split('/')[-1]}")
        self.param = loadmat(to_file)["ne"]
        return self.param

load_nc_dataset()

Load the SAMI3 netCDF file into memory.

Reads time, altitude, geographic lat/lon, and electron density arrays from the netCDF file and stores them in self.store. The raw time variable (in hours from density_simulated_datetime) is converted to absolute datetime objects. Longitude is kept in the 0–360° convention as produced by SAMI3.

Source code in hfpytrace/density/sami.py
def load_nc_dataset(self):
    """Load the SAMI3 netCDF file into memory.

    Reads time, altitude, geographic lat/lon, and electron density arrays
    from the netCDF file and stores them in ``self.store``.  The raw
    ``time`` variable (in hours from ``density_simulated_datetime``) is
    converted to absolute ``datetime`` objects.  Longitude is kept in
    the 0–360° convention as produced by SAMI3.
    """
    self.store = {}
    logger.info(f"Load files -> {self.file_name}")
    ds = xr.open_dataset(self.file_name)
    self.store["time"] = [
        self.cfg.density_simulated_datetime + dt.timedelta(hours=float(d))
        for d in ds.variables["time"][:]
    ]
    if self.round_to > 0:
        logger.info("Round to nearest minutes!!!!!!!!!!!!")
        self.store["time"] = [self.round_time(e) for e in self.store["time"]]
    self.dates = self.store["time"]
    self.store["alt"] = ds.variables["alt0"].values  # in km
    self.store["glat"] = ds.variables["lat0"].values  # in deg
    self.store["glon"] = ds.variables["lon0"].values  # in 0-360 deg
    # self.store["glon"] = (
    #     np.mod(self.store["glon"] - 180.0, 360.0) - 180.0
    # )  # to +/- 180
    self.store["eden"] = ds.variables["dene0"][:]  # in cc
    ds.close()
    del ds
    return

round_time(t)

Round a datetime object to any time lapse in seconds t : datetime.datetime object, default now.

Source code in hfpytrace/density/sami.py
def round_time(self, t):
    """Round a datetime object to any time lapse in seconds
    t : datetime.datetime object, default now.
    """
    seconds = (t.replace(tzinfo=None) - t.min).seconds
    rounding = (seconds + self.round_to / 2) // self.round_to * self.round_to
    return t + dt.timedelta(0, rounding - seconds, -t.microsecond)

find_time_index(t)

Finds the index of the interval in the array where the number falls.

Returns:

Type Description

The index of the interval where the number falls, or -1 if the number is

outside the range of the array.

Source code in hfpytrace/density/sami.py
def find_time_index(self, t):
    """Finds the index of the interval in the array where the number falls.

    Returns:
        The index of the interval where the number falls, or -1 if the number is
        outside the range of the array.
    """
    for i in range(len(self.store["time"]) - 1):
        if self.store["time"][i] <= t < self.store["time"][i + 1]:
            return i, i + 1
    return -1, 0

fetch_dataset(time, lats, lons, alts, to_file=None)

Fetch 2D electron density along a route at a given time.

If the requested time matches a model step exactly it is used directly; otherwise the two bracketing steps are linearly interpolated in time.

Parameters
datetime.datetime

Requested time snapshot.

lats, lons : array-like, shape (npts,) Geographic coordinates of route points [°].

array-like, shape (nalt,)

Target altitude levels [km].

str, optional

If given, save the result to a .mat file at this path.

Returns
np.ndarray, shape (nalt, npts)

Electron density in cm⁻³.

np.ndarray

Model altitude grid [km].

Source code in hfpytrace/density/sami.py
def fetch_dataset(
    self,
    time,
    lats,
    lons,
    alts,
    to_file=None,
):
    """Fetch 2D electron density along a route at a given time.

    If the requested time matches a model step exactly it is used directly;
    otherwise the two bracketing steps are linearly interpolated in time.

    Parameters
    ----------
    time : datetime.datetime
        Requested time snapshot.
    lats, lons : array-like, shape (npts,)
        Geographic coordinates of route points [°].
    alts : array-like, shape (nalt,)
        Target altitude levels [km].
    to_file : str, optional
        If given, save the result to a ``.mat`` file at this path.

    Returns
    -------
    param : np.ndarray, shape (nalt, npts)
        Electron density in cm⁻³.
    alts : np.ndarray
        Model altitude grid [km].
    """
    # Selecting based on time index
    if time in self.store["time"]:
        logger.info(f"Into exact timestamp {time}")
        # Select the exact index if timestamp in the simulation
        i = self.store["time"].index(time)
        self.param, self.alts = self.fetch_interpolated_data(lats, lons, alts, i)
    else:
        # Select the two index if timestamp in the simulation
        i, j = self.find_time_index(time)
        logger.info(
            f"{time}/ into between timestamp {self.store['time'][i]} & {self.store['time'][j]}"
        )
        dt_bracket = (self.store["time"][j] - self.store["time"][i]).total_seconds()
        alpha = (time - self.store["time"][i]).total_seconds() / dt_bracket
        px, _ = self.fetch_interpolated_data(lats, lons, alts, i)
        py, self.alts = self.fetch_interpolated_data(lats, lons, alts, j)
        self.param = (1.0 - alpha) * px + alpha * py

    if to_file:
        savemat(to_file, dict(ne=self.param))
    return self.param, self.alts

fetch_interpolated_data(lats, lons, alts, index)

Fetch bilinearly interpolated 2D Ne profiles for a given time index.

Parameters

lats, lons : array-like, shape (npts,) Geographic coordinates of route points [°].

array-like, shape (nalt,)

Target altitude levels [km].

int

Time index into self.store["eden"].

Returns
np.ndarray, shape (nalt, npts)

Electron density in cm⁻³.

np.ndarray

Model altitude grid [km].

Source code in hfpytrace/density/sami.py
def fetch_interpolated_data(
    self,
    lats,
    lons,
    alts,
    index,
):
    """Fetch bilinearly interpolated 2D Ne profiles for a given time index.

    Parameters
    ----------
    lats, lons : array-like, shape (npts,)
        Geographic coordinates of route points [°].
    alts : array-like, shape (nalt,)
        Target altitude levels [km].
    index : int
        Time index into ``self.store["eden"]``.

    Returns
    -------
    out : np.ndarray, shape (nalt, npts)
        Electron density in cm⁻³.
    galt : np.ndarray
        Model altitude grid [km].
    """
    n = len(lats)
    D = self.store["eden"][index]
    galt = self.store["alt"]
    out, ix = np.zeros((len(alts), n)) * np.nan, 0
    for lat, lon in zip(lats, lons):
        lon = float(np.mod(360 + lon, 360))
        o = self._bilinear_ne_profile(D, lat, lon)
        out[:, ix] = utils.interpolate_by_altitude(
            galt, alts, o, self.cfg.scale, self.cfg.kind, method="extp"
        )  # in cc
        ix += 1
    return out, galt

fetch_dataset_3d(time, lats, lons, alts, to_file=None, workers=1)

Fetch 3D electron density cube on (lat, lon, alt) with optional point-wise parallelism and time interpolation.

Source code in hfpytrace/density/sami.py
def fetch_dataset_3d(
    self,
    time,
    lats,
    lons,
    alts,
    to_file=None,
    workers: int = 1,
):
    """
    Fetch 3D electron density cube on (lat, lon, alt) with optional
    point-wise parallelism and time interpolation.
    """
    if time in self.store["time"]:
        i = self.store["time"].index(time)
        self.param3d = self._fetch_cube_at_index(i, lats, lons, alts, workers)
    else:
        i, j = self.find_time_index(time)
        dt_bracket = (self.store["time"][j] - self.store["time"][i]).total_seconds()
        alpha = (time - self.store["time"][i]).total_seconds() / dt_bracket
        px = self._fetch_cube_at_index(i, lats, lons, alts, workers)
        py = self._fetch_cube_at_index(j, lats, lons, alts, workers)
        self.param3d = (1.0 - alpha) * px + alpha * py

    self.alts = np.asarray(alts, dtype=float)
    if to_file:
        savemat(to_file, dict(ne=self.param3d))
    return self.param3d, self.alts

load_from_file(to_file)

Load a previously saved electron density array from a .mat file.

Parameters
str

Path to the .mat file containing key ne.

Returns

np.ndarray Electron density array as stored.

Source code in hfpytrace/density/sami.py
def load_from_file(self, to_file: str):
    """Load a previously saved electron density array from a ``.mat`` file.

    Parameters
    ----------
    to_file : str
        Path to the ``.mat`` file containing key ``ne``.

    Returns
    -------
    np.ndarray
        Electron density array as stored.
    """
    logger.info(f"Load from file {to_file.split('/')[-1]}")
    self.param = loadmat(to_file)["ne"]
    return self.param

Source Code

hfpytrace/density/sami.py
"""SAMI3 electron density reader.

Reads SAMI3 (Sami is Another Model of the Ionosphere, 3D) netCDF output
files and interpolates the electron density onto arbitrary lat/lon/altitude
grids for use in HF ray-tracing.

Requires
--------
xarray, scipy : for netCDF I/O and interpolation.

Classes
-------
SAMI3
    Loads a SAMI3 netCDF file and provides ``fetch_dataset`` / ``set_dataset``
    methods that resample Ne onto user-specified grids.
"""

import datetime as dt
import os
from concurrent.futures import ThreadPoolExecutor

import numpy as np
import pandas as pd
import xarray as xr
from dateutil import parser as dparser
from loguru import logger
from scipy.io import loadmat, savemat

from hfpytrace import utils


class SAMI3(object):
    """Electron density from SAMI3 simulation output.

    Parameters
    ----------
    cfg : SimpleNamespace
        Config with ``density_file_location``, ``density_file_name``, and
        ``density_simulated_datetime`` (ISO-8601 string).
    event : datetime.datetime
        Requested time snapshot; the nearest SAMI3 time step is used.
    round_to : int, optional
        Round lat/lon values to this many decimal places during lookup.
        ``-1`` disables rounding.  Default ``-1``.
    """

    def __init__(
        self,
        cfg,
        event,
        round_to=-1,
    ):
        self.cfg = cfg
        self.event = event
        self.file_name = os.path.join(
            self.cfg.density_file_location, self.cfg.density_file_name
        )
        self.cfg.density_simulated_datetime = dparser.isoparse(
            self.cfg.density_simulated_datetime
        )
        self.round_to = round_to
        self.load_nc_dataset()
        return

    def load_nc_dataset(self):
        """Load the SAMI3 netCDF file into memory.

        Reads time, altitude, geographic lat/lon, and electron density arrays
        from the netCDF file and stores them in ``self.store``.  The raw
        ``time`` variable (in hours from ``density_simulated_datetime``) is
        converted to absolute ``datetime`` objects.  Longitude is kept in
        the 0–360° convention as produced by SAMI3.
        """
        self.store = {}
        logger.info(f"Load files -> {self.file_name}")
        ds = xr.open_dataset(self.file_name)
        self.store["time"] = [
            self.cfg.density_simulated_datetime + dt.timedelta(hours=float(d))
            for d in ds.variables["time"][:]
        ]
        if self.round_to > 0:
            logger.info("Round to nearest minutes!!!!!!!!!!!!")
            self.store["time"] = [self.round_time(e) for e in self.store["time"]]
        self.dates = self.store["time"]
        self.store["alt"] = ds.variables["alt0"].values  # in km
        self.store["glat"] = ds.variables["lat0"].values  # in deg
        self.store["glon"] = ds.variables["lon0"].values  # in 0-360 deg
        # self.store["glon"] = (
        #     np.mod(self.store["glon"] - 180.0, 360.0) - 180.0
        # )  # to +/- 180
        self.store["eden"] = ds.variables["dene0"][:]  # in cc
        ds.close()
        del ds
        return

    def round_time(self, t):
        """Round a datetime object to any time lapse in seconds
        t : datetime.datetime object, default now.
        """
        seconds = (t.replace(tzinfo=None) - t.min).seconds
        rounding = (seconds + self.round_to / 2) // self.round_to * self.round_to
        return t + dt.timedelta(0, rounding - seconds, -t.microsecond)

    def find_time_index(self, t):
        """Finds the index of the interval in the array where the number falls.

        Returns:
            The index of the interval where the number falls, or -1 if the number is
            outside the range of the array.
        """
        for i in range(len(self.store["time"]) - 1):
            if self.store["time"][i] <= t < self.store["time"][i + 1]:
                return i, i + 1
        return -1, 0

    def fetch_dataset(
        self,
        time,
        lats,
        lons,
        alts,
        to_file=None,
    ):
        """Fetch 2D electron density along a route at a given time.

        If the requested time matches a model step exactly it is used directly;
        otherwise the two bracketing steps are linearly interpolated in time.

        Parameters
        ----------
        time : datetime.datetime
            Requested time snapshot.
        lats, lons : array-like, shape (npts,)
            Geographic coordinates of route points [°].
        alts : array-like, shape (nalt,)
            Target altitude levels [km].
        to_file : str, optional
            If given, save the result to a ``.mat`` file at this path.

        Returns
        -------
        param : np.ndarray, shape (nalt, npts)
            Electron density in cm⁻³.
        alts : np.ndarray
            Model altitude grid [km].
        """
        # Selecting based on time index
        if time in self.store["time"]:
            logger.info(f"Into exact timestamp {time}")
            # Select the exact index if timestamp in the simulation
            i = self.store["time"].index(time)
            self.param, self.alts = self.fetch_interpolated_data(lats, lons, alts, i)
        else:
            # Select the two index if timestamp in the simulation
            i, j = self.find_time_index(time)
            logger.info(
                f"{time}/ into between timestamp {self.store['time'][i]} & {self.store['time'][j]}"
            )
            dt_bracket = (self.store["time"][j] - self.store["time"][i]).total_seconds()
            alpha = (time - self.store["time"][i]).total_seconds() / dt_bracket
            px, _ = self.fetch_interpolated_data(lats, lons, alts, i)
            py, self.alts = self.fetch_interpolated_data(lats, lons, alts, j)
            self.param = (1.0 - alpha) * px + alpha * py

        if to_file:
            savemat(to_file, dict(ne=self.param))
        return self.param, self.alts

    def _bilinear_ne_profile(self, D, lat, lon):
        """Return a raw Ne altitude profile (shape: n_alt) bilinearly interpolated
        at (lat, lon) from the four surrounding 1° grid cells.

        D has shape (n_lon, n_alt, n_lat).
        lon must already be normalised to [0, 360).
        """
        glat = self.store["glat"]
        glon = self.store["glon"]
        n_lon, _, n_lat = D.shape

        # ── latitude bracket ─────────────────────────────────────────────
        i0 = int(np.clip(np.searchsorted(glat, lat, side="right") - 1, 0, n_lat - 2))
        i1 = i0 + 1
        lat_f = float(np.clip((lat - glat[i0]) / (glat[i1] - glat[i0]), 0.0, 1.0))

        # ── longitude bracket with 0–360 wraparound ──────────────────────
        j0 = int((np.searchsorted(glon, lon, side="right") - 1) % n_lon)
        j1 = int((j0 + 1) % n_lon)
        dlon = (
            float(glon[j1] - glon[j0])
            if j1 > j0
            else float(glon[j1] + 360.0 - glon[j0])
        )
        lon_f = float(
            np.clip((lon - float(glon[j0])) / dlon if dlon > 0.0 else 0.0, 0.0, 1.0)
        )

        # ── bilinear blend over four corners ─────────────────────────────
        p00 = D[j0, :, i0].astype(float)  # lon0, lat0
        p10 = D[j1, :, i0].astype(float)  # lon1, lat0
        p01 = D[j0, :, i1].astype(float)  # lon0, lat1
        p11 = D[j1, :, i1].astype(float)  # lon1, lat1
        return (
            (1.0 - lon_f) * (1.0 - lat_f) * p00
            + lon_f * (1.0 - lat_f) * p10
            + (1.0 - lon_f) * lat_f * p01
            + lon_f * lat_f * p11
        )

    def fetch_interpolated_data(
        self,
        lats,
        lons,
        alts,
        index,
    ):
        """Fetch bilinearly interpolated 2D Ne profiles for a given time index.

        Parameters
        ----------
        lats, lons : array-like, shape (npts,)
            Geographic coordinates of route points [°].
        alts : array-like, shape (nalt,)
            Target altitude levels [km].
        index : int
            Time index into ``self.store["eden"]``.

        Returns
        -------
        out : np.ndarray, shape (nalt, npts)
            Electron density in cm⁻³.
        galt : np.ndarray
            Model altitude grid [km].
        """
        n = len(lats)
        D = self.store["eden"][index]
        galt = self.store["alt"]
        out, ix = np.zeros((len(alts), n)) * np.nan, 0
        for lat, lon in zip(lats, lons):
            lon = float(np.mod(360 + lon, 360))
            o = self._bilinear_ne_profile(D, lat, lon)
            out[:, ix] = utils.interpolate_by_altitude(
                galt, alts, o, self.cfg.scale, self.cfg.kind, method="extp"
            )  # in cc
            ix += 1
        return out, galt

    def _fetch_profile_at_index(self, index, lat, lon, alts):
        """Return a single altitude-interpolated Ne profile at (lat, lon).

        Parameters
        ----------
        index : int
            Time index into ``self.store["eden"]``.
        lat, lon : float
            Target geographic coordinates [°].
        alts : array-like
            Target altitude levels [km].

        Returns
        -------
        p : np.ndarray, shape (nalt,)
            Electron density in cm⁻³.
        galt : np.ndarray
            Model altitude grid [km].
        """
        D = self.store["eden"][index]
        galt = self.store["alt"]
        lon = float(np.mod(360 + lon, 360))
        o = self._bilinear_ne_profile(D, lat, lon)
        p = utils.interpolate_by_altitude(
            galt, alts, o, self.cfg.scale, self.cfg.kind, method="extp"
        )
        return np.asarray(p, dtype=float), np.asarray(galt, dtype=float)

    def _fetch_cube_at_index(self, index, lats, lons, alts, workers: int = 1):
        lats = np.asarray(lats, dtype=float)
        lons = np.asarray(lons, dtype=float)
        alts = np.asarray(alts, dtype=float)
        out = np.zeros((lats.size, lons.size, alts.size), dtype=float) * np.nan
        ij = [(ii, jj) for ii in range(lats.size) for jj in range(lons.size)]

        def _job(ii, jj):
            p, _ = self._fetch_profile_at_index(index, lats[ii], lons[jj], alts)
            return ii, jj, p

        n_workers = max(1, int(workers))
        if n_workers == 1:
            for ii, jj in ij:
                _, _, p = _job(ii, jj)
                out[ii, jj, :] = p
        else:
            with ThreadPoolExecutor(max_workers=n_workers) as ex:
                for ii, jj, p in ex.map(lambda t: _job(*t), ij):
                    out[ii, jj, :] = p
        return out

    def fetch_dataset_3d(
        self,
        time,
        lats,
        lons,
        alts,
        to_file=None,
        workers: int = 1,
    ):
        """
        Fetch 3D electron density cube on (lat, lon, alt) with optional
        point-wise parallelism and time interpolation.
        """
        if time in self.store["time"]:
            i = self.store["time"].index(time)
            self.param3d = self._fetch_cube_at_index(i, lats, lons, alts, workers)
        else:
            i, j = self.find_time_index(time)
            dt_bracket = (self.store["time"][j] - self.store["time"][i]).total_seconds()
            alpha = (time - self.store["time"][i]).total_seconds() / dt_bracket
            px = self._fetch_cube_at_index(i, lats, lons, alts, workers)
            py = self._fetch_cube_at_index(j, lats, lons, alts, workers)
            self.param3d = (1.0 - alpha) * px + alpha * py

        self.alts = np.asarray(alts, dtype=float)
        if to_file:
            savemat(to_file, dict(ne=self.param3d))
        return self.param3d, self.alts

    def load_from_file(self, to_file: str):
        """Load a previously saved electron density array from a ``.mat`` file.

        Parameters
        ----------
        to_file : str
            Path to the ``.mat`` file containing key ``ne``.

        Returns
        -------
        np.ndarray
            Electron density array as stored.
        """
        logger.info(f"Load from file {to_file.split('/')[-1]}")
        self.param = loadmat(to_file)["ne"]
        return self.param