Skip to content

Ireland (OPW) (add new countries) #69

@thiagovmdon

Description

@thiagovmdon

Related to issue #5

Code for list of stations (and metadata #1):

import requests
import pandas as pd
import numpy as np

def get_opw_metadata() -> pd.DataFrame:
    """
    Fetch OPW (Ireland) hydrometric station metadata.

    - Keeps ALL original properties from the GeoJSON feed.
    - Renames standardized fields:
        ref → gauge_id
        name → station_name
        river → river
        geometry.coordinates → [longitude, latitude]
    - Ensures standardized columns exist:
        ['gauge_id','station_name','river','latitude','longitude',
         'altitude','area','country','source']
    - Adds constants:
        country='Ireland', source='OPW Waterlevel.ie'
    - Coordinates are WGS84 (EPSG:4326), per the GeoJSON feed.

    Returns
    -------
    pd.DataFrame
        One row per station with all original GeoJSON properties plus the
        standardized columns above. Empty DataFrame on fetch/parse failure.

    Source: https://waterlevel.ie/geojson/  (© OPW, open data)
    """
    url = "https://waterlevel.ie/geojson/"

    # Keep the try narrow: only the network fetch and JSON decode are
    # expected to fail; a bug in the normalization below should surface,
    # not be silently turned into an empty DataFrame.
    try:
        r = requests.get(url, timeout=30)
        r.raise_for_status()
        data = r.json()
    except (requests.RequestException, ValueError) as e:
        # r.json() raises ValueError (json.JSONDecodeError) on bad payloads.
        print(f"Failed to fetch OPW metadata: {e}")
        return pd.DataFrame()

    features = data.get("features", [])
    if not features:
        print("No features found in GeoJSON.")
        return pd.DataFrame()

    # Normalize full GeoJSON feature set to keep all nested fields.
    df = pd.json_normalize(features)

    # Extract coordinates if available (GeoJSON order is [lon, lat]).
    if "geometry.coordinates" in df.columns:
        coords = df["geometry.coordinates"]
        df["longitude"] = coords.apply(
            lambda c: c[0] if isinstance(c, list) and len(c) > 0 else np.nan
        )
        df["latitude"] = coords.apply(
            lambda c: c[1] if isinstance(c, list) and len(c) > 1 else np.nan
        )

    # Rename standardized property columns (keep all others).
    df = df.rename(
        columns={
            "properties.ref": "gauge_id",
            "properties.name": "station_name",
            "properties.river": "river",
        }
    )

    # Derive short ID (last 5 characters). Vectorized .str[-5:] returns the
    # whole string when it is shorter than 5 chars, so no length check needed.
    if "gauge_id" in df.columns:
        df["gauge_id"] = df["gauge_id"].astype(str).str.strip().str[-5:]

    # Ensure standardized fields exist even when absent from the feed.
    std_cols = [
        "gauge_id",
        "station_name",
        "river",
        "latitude",
        "longitude",
        "altitude",
        "area",
        "country",
        "source",
    ]
    for col in std_cols:
        if col not in df.columns:
            df[col] = np.nan

    # Add constants identifying the provider.
    df["country"] = "Ireland"
    df["source"] = "OPW Waterlevel.ie"

    # Convert coordinate types (non-numeric values become NaN).
    df["latitude"] = pd.to_numeric(df["latitude"], errors="coerce")
    df["longitude"] = pd.to_numeric(df["longitude"], errors="coerce")

    # Keep ALL columns — no subsetting.
    return df.reset_index(drop=True)

Code for downloading the time-series data:

import io
import zipfile
import requests
import pandas as pd
import numpy as np
from typing import Optional

# Maps the standardized variable name to the OPW parameter code and the
# file name of its complete-record ZIP archive on waterlevel.ie.
_OPW_VAR_MAP = {
    "stage": ("S", "Waterlevel_complete.zip"),
    "discharge": ("Q", "Discharge_complete.zip"),
    "temperature": ("TWater", "WaterTemperature_complete.zip"),
}


def _opw_clean_series(df: pd.DataFrame, var_base: str) -> pd.DataFrame:
    """Parse 'time' as UTC, coerce values to numeric, NaN out sentinels (≤ -777)."""
    df["time"] = pd.to_datetime(df["time"], utc=True, errors="coerce")
    df[var_base] = pd.to_numeric(df[var_base], errors="coerce")
    df.loc[df[var_base] <= -777, var_base] = np.nan
    return df


def _opw_convert_units(df: pd.DataFrame, var_base: str, unit: str) -> pd.DataFrame:
    """Normalize units: stage m → cm, discharge L/s → m³/s. `unit` is lowercase."""
    if var_base == "stage" and unit in ("m", "metres", "meter"):
        df[var_base] = df[var_base] * 100.0  # m → cm
    elif var_base == "discharge" and unit in ("l/s", "litres/second", "liters/second"):
        df[var_base] = df[var_base] * 0.001  # L/s → m³/s
    return df


def _opw_filter_dates(df: pd.DataFrame, start_date: Optional[str], end_date: Optional[str]) -> pd.DataFrame:
    """Apply inclusive [start_date, end_date] bounds (interpreted as UTC) if given."""
    if start_date:
        df = df[df["time"] >= pd.to_datetime(start_date, utc=True)]
    if end_date:
        df = df[df["time"] <= pd.to_datetime(end_date, utc=True)]
    return df


def _opw_try_json_daily(
    station_id: str,
    param_code: str,
    var_base: str,
    daily_min: bool,
    daily_max: bool,
    start_date: Optional[str],
    end_date: Optional[str],
) -> Optional[pd.DataFrame]:
    """
    Fetch daily Min/Mean/Max from the JSON endpoint.

    Returns None on ANY failure (network error, series not found, empty or
    malformed data) so the caller can fall back to the ZIP archive.
    """
    json_url = (
        f"https://waterlevel.ie/hydro-data/data/internet/stations/0/"
        f"{station_id}/{param_code}/year.json"
    )
    ts_key = "Min" if daily_min else "Max" if daily_max else "Mean"

    try:
        resp = requests.get(json_url, timeout=60)
        resp.raise_for_status()
        data = resp.json()

        # Find the entry whose ts_shortname contains the keyword (case-insensitive).
        matched = [
            d for d in data
            if "ts_shortname" in d and ts_key.lower() in d["ts_shortname"].lower()
        ]
        if not matched:
            return None

        entry = matched[0]
        series = entry.get("data", [])
        if not series:
            return None

        # The feed ships either 3 or 4 columns per row.
        ncols = len(series[0])
        if ncols == 3:
            df = pd.DataFrame(series, columns=["time", var_base, "quality"])
        elif ncols == 4:
            df = pd.DataFrame(series, columns=["time", var_base, "quality", "aggregation_accuracy"])
        else:
            return None

        df = _opw_clean_series(df, var_base)
        df = df[["time", var_base]].dropna()
        df = _opw_convert_units(df, var_base, entry.get("ts_unitsymbol", "").lower())
        df = _opw_filter_dates(df, start_date, end_date)
        return df.sort_values("time").reset_index(drop=True)
    except Exception:
        # Best-effort: the ZIP fallback handles stations without JSON data.
        return None


def _opw_fetch_zip(
    station_id: str,
    param_code: str,
    zip_name: str,
    var_base: str,
    instantaneous: bool,
    daily_min: bool,
    daily_max: bool,
    start_date: Optional[str],
    end_date: Optional[str],
) -> pd.DataFrame:
    """
    Download the complete-record ZIP and return the time series.

    When `instantaneous` is False the 15-min records are resampled to daily
    min/mean/max. Returns an empty ['time', <var>] frame on any failure.
    """
    zip_url = (
        f"https://waterlevel.ie/hydro-data/data/internet/stations/0/"
        f"{station_id}/{param_code}/{zip_name}"
    )
    empty = pd.DataFrame(columns=["time", var_base])

    try:
        resp = requests.get(zip_url, timeout=60)
        resp.raise_for_status()

        with zipfile.ZipFile(io.BytesIO(resp.content)) as z:
            csv_files = [f for f in z.namelist() if f.lower().endswith(".csv")]
            if not csv_files:
                return empty
            raw_lines = z.read(csv_files[0]).decode("utf-8").splitlines()

        # Lines starting with '#' are metadata ("key;value"); the rest is data.
        data_lines = [ln for ln in raw_lines if not ln.startswith("#") and ln.strip()]
        meta_lines = [ln for ln in raw_lines if ln.startswith("#")]

        meta = {}
        for m in meta_lines:
            parts = m.strip("#").split(";", 1)
            if len(parts) == 2:
                meta[parts[0].strip().lower()] = parts[1].strip()

        df = pd.read_csv(
            io.StringIO("\n".join(data_lines)),
            sep=";",
            header=None,
            names=["time", var_base, "quality"],
            usecols=[0, 1],
        )

        df = _opw_clean_series(df, var_base)
        df = _opw_convert_units(df, var_base, meta.get("ts_unitsymbol", "").lower())
        df = _opw_filter_dates(df, start_date, end_date)

        # Aggregate 15-min records to daily when a daily variant was requested.
        if not instantaneous:
            agg_func = "min" if daily_min else "max" if daily_max else "mean"
            df = (
                df.set_index("time")[var_base]
                .resample("1D")
                .agg(agg_func)
                .dropna()
                .reset_index()
            )

        return df.sort_values("time").reset_index(drop=True)
    except Exception:
        # Best-effort download helper: callers get an empty frame on failure.
        return empty


def get_opw_data(
    station_id: str,
    variable: str,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
) -> pd.DataFrame:
    """
    Download hydrological time series data from OPW (Ireland) WaterLevel.ie.

    Parameters
    ----------
    station_id : str
        Station ID (e.g., '01041')
    variable : str
        One of:
            - 'discharge', 'stage', 'temperature' → daily mean (from JSON if available)
            - add '-instantaneous' → 15-min data (from ZIP)
            - add '-max' or '-min' → daily extremes (from JSON)
    start_date, end_date : str, optional
        ISO dates ('YYYY-MM-DD'). Filter applied after loading.

    Returns
    -------
    pd.DataFrame
        Columns: ['time', '<variable>']

    Raises
    ------
    ValueError
        If `variable` is not a recognized name/suffix combination.

    Notes
    -----
    - Units:
        * Stage (S): metres → converted to centimetres
        * Discharge (Q): cumec (m³/s)
        * Temperature (TWater): °C
    - Time zone: UTC
    - Invalid values (≤ -777) are replaced with NaN.
    - JSON endpoint used for daily min/mean/max if available.
      If unavailable, data are retrieved from ZIP and resampled.
    """
    variable = variable.lower()
    instantaneous = variable.endswith("-instantaneous")
    daily_min = variable.endswith("-min")
    daily_max = variable.endswith("-max")
    var_base = variable.replace("-instantaneous", "").replace("-min", "").replace("-max", "")

    if var_base not in _OPW_VAR_MAP:
        raise ValueError(
            "Variable must be 'stage', 'discharge', or 'temperature', optionally with "
            "'-instantaneous', '-min', or '-max'."
        )
    param_code, zip_name = _OPW_VAR_MAP[var_base]

    # Case 1: JSON full-record endpoint for daily data (preferred when present).
    if not instantaneous:
        df = _opw_try_json_daily(
            station_id, param_code, var_base, daily_min, daily_max, start_date, end_date
        )
        if df is not None:
            return df

    # Case 2: ZIP download (instantaneous, or fallback for daily variants).
    return _opw_fetch_zip(
        station_id, param_code, zip_name, var_base,
        instantaneous, daily_min, daily_max, start_date, end_date,
    )

Example usage

# Daily mean discharge data for station 01041, filtered to 2020.
df_q_daily = get_opw_data("01041", "discharge", start_date="2020-01-01", end_date="2020-12-31")
print(df_q_daily)

# Full 15-minute water-temperature record for station 25308.
# (The original comment said "discharge" but the call fetches temperature;
# variable renamed to match the requested variable.)
df_temp_inst = get_opw_data("25308", "temperature-instantaneous")
print(df_temp_inst)

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions