
Estonia (add more countries) #74

@thiagovmdon

Description

Related to issue #5

Code for the list of stations (and metadata #1):

import requests
import pandas as pd
import numpy as np
import re
import unicodedata
from typing import Optional
from pyproj import Transformer

def get_estonian_station_metadata() -> pd.DataFrame:
    """
    Fetch and merge Estonian hydrological station metadata from:
      1. EstModel API (https://estmodel.envir.ee/stations)
      2. Estonian Geoportal WFS (WISKI hydrology database)
         https://inspire.geoportaal.ee/geoserver/EF_hydrojaamad/wfs

    - Keeps ALL original columns from both sources.
    - Renames standardized columns only:
        code → gauge_id
        name → station_name
        Extracts river and location from station_name.
    - Adds or fills standardized fields with NaN if missing:
        ['gauge_id','station_name','river','area','latitude',
         'longitude','altitude','country','source']
    - Adds constants:
        country='Estonia', source='EstModel + WISKI'
    - Converts coordinates from EPSG:3301 → WGS84 (EPSG:4326).
    - Prints source URLs being fetched for transparency.
    """

    def normalize_name(name: str) -> str:
        """Normalize station and location names for fuzzy matching."""
        if not isinstance(name, str):
            return ""
        name = name.lower()
        # Strip diacritics via Unicode decomposition (ä -> a, õ -> o, ...)
        name = ''.join(c for c in unicodedata.normalize('NFKD', name)
                       if not unicodedata.combining(c))
        # Fallback replacements (normally a no-op after the NFKD step above)
        name = (name
                .replace("ä", "a")
                .replace("ö", "o")
                .replace("ü", "u")
                .replace("õ", "o"))
        # Collapse repeated characters and strip punctuation so doubled letters still match
        name = re.sub(r"(.)\1+", r"\1", name)
        name = re.sub(r"[-/:.,]", " ", name)
        # Drop generic station words such as "hüdromeetriajaam" / "jaam"
        name = re.sub(r"h[üu]dro\w*|jaam", "", name)
        name = re.sub(r"\s+", " ", name).strip()
        return name

    def extract_river_from_station(station_name: str) -> Optional[str]:
        """Extract the river name (the part before the colon)."""
        if not isinstance(station_name, str):
            return None
        parts = station_name.split(":")
        return parts[0].strip() if len(parts) > 1 else None

    def extract_location_from_station(station_name: str) -> str:
        """Extract location name (after colon)."""
        if not isinstance(station_name, str):
            return ""
        parts = station_name.split(":")
        return parts[1].strip() if len(parts) > 1 else station_name.strip()

    # --- Fetch EstModel stations ---
    url_estmodel = "https://estmodel.envir.ee/stations"
    print(f"Fetching EstModel stations from: {url_estmodel}")

    r1 = requests.get(url_estmodel, timeout=60)
    r1.raise_for_status()
    r1.encoding = "utf-8"
    data_est = r1.json()

    df_est = pd.DataFrame(data_est)
    # Rename standardized fields if present
    rename_map = {"code": "gauge_id", "name": "station_name"}
    df_est = df_est.rename(columns=rename_map)

    # Add extracted fields
    df_est["river"] = df_est["station_name"].apply(extract_river_from_station)
    df_est["location"] = df_est["station_name"].apply(extract_location_from_station)

    # Keep only hydrological stations (guard against the "type" column being absent,
    # since df.get("type", "") would return a plain string without an .eq method)
    if "type" in df_est.columns:
        df_est = df_est[df_est["type"].eq("HYDROLOGICAL")].copy()

    # Add missing standardized fields
    std_cols = [
        "gauge_id", "station_name", "river", "area",
        "latitude", "longitude", "altitude", "country", "source"
    ]
    for col in std_cols:
        if col not in df_est.columns:
            df_est[col] = np.nan

    # --- Fetch WISKI (Geoportal) stations ---
    url_wfs = (
        "https://inspire.geoportaal.ee/geoserver/EF_hydrojaamad/wfs"
        "?request=GetFeature&service=WFS&version=2.0.0"
        "&outputFormat=application/json"
        "&typeNames=EF_hydrojaamad:EF.EnvironmentalMonitoringFacilities"
    )

    print(f"Fetching WISKI stations from: {url_wfs}")
    r2 = requests.get(url_wfs, timeout=60)
    r2.raise_for_status()
    data_wiski = r2.json()

    # EPSG:3301 is the Estonian national grid (L-EST97); convert to WGS84 lon/lat
    transformer = Transformer.from_crs("EPSG:3301", "EPSG:4326", always_xy=True)

    wiski_rows = []
    for f in data_wiski.get("features", []):
        props = f.get("properties", {})
        geom = f.get("geometry", {})
        coords = geom.get("coordinates", [None, None])
        if None in coords:
            continue
        lon, lat = transformer.transform(coords[0], coords[1])
        props["latitude"] = lat
        props["longitude"] = lon
        wiski_rows.append(props)

    df_wiski = pd.DataFrame(wiski_rows)

    # --- Merge by fuzzy-normalized location ---
    for i, row in df_est.iterrows():
        loc_est = normalize_name(row.get("location", ""))
        for _, wrow in df_wiski.iterrows():
            name_wiski = normalize_name(wrow.get("name", ""))
            if loc_est and (loc_est in name_wiski or name_wiski in loc_est):
                df_est.at[i, "latitude"] = wrow.get("latitude")
                df_est.at[i, "longitude"] = wrow.get("longitude")
                break

    # --- Add manual fallback coordinates ---
    manual_coords = {
        "Leisi jõgi: Elu": (58.51293, 22.69738),
        "Pärnu jõgi: Särevere": (58.78900, 25.42111),
    }
    for name, (lat, lon) in manual_coords.items():
        mask = df_est["station_name"].str.contains(name, case=False, na=False)
        df_est.loc[mask & df_est["latitude"].isna(), ["latitude", "longitude"]] = lat, lon

    # --- Add constants and cleanup ---
    df_est["country"] = "Estonia"
    df_est["altitude"] = np.nan

    df_est["source"] = "EstModel + WISKI"
    df_est["latitude"] = pd.to_numeric(df_est["latitude"], errors="coerce")
    df_est["longitude"] = pd.to_numeric(df_est["longitude"], errors="coerce")
    df_est["gauge_id"] = df_est["gauge_id"].astype(str).str.strip()

    df_est = df_est.dropna(subset=["gauge_id"]).reset_index(drop=True)
    return df_est
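
Example usage of the metadata function (a minimal sketch; the selected columns are the standardized fields from the docstring above, and the station count depends on what the two services currently return):

stations = get_estonian_station_metadata()
print(f"{len(stations)} hydrological stations found")
print(stations[["gauge_id", "station_name", "river", "latitude", "longitude"]].head())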


Code for downloading the data:

import requests
import pandas as pd
import numpy as np
from typing import Optional

def get_estmodel_data(
    station_id: str,
    variable: str,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
) -> pd.DataFrame:
    """
    Download daily hydrological time series from the EstModel API.
    Source: https://estmodel.envir.ee

    Returns a DataFrame with columns: ['time', '<variable>'].

    Parameters
    ----------
    station_id : str
        Station code in the EstModel API.
    variable : str
        One of:
            'discharge-daily-mean', 'discharge-daily-max', 'discharge-daily-min',
            'stage-daily-mean', 'stage-daily-max', 'stage-daily-min',
            'temperature-daily-mean', 'temperature-daily-max', 'temperature-daily-min'
    start_date, end_date : str, optional
        ISO format 'YYYY-MM-DD'. If omitted:
            start_date → '1900-01-01'
            end_date   → today
    """
    ESTMODEL_VARIABLE_MAP = {
        "discharge-daily-mean": ("Q", "MEAN"),
        "discharge-daily-max": ("Q", "MAXIMUM"),
        "discharge-daily-min": ("Q", "MINIMUM"),

        "stage-daily-mean": ("H", "MEAN"),
        "stage-daily-max": ("H", "MAXIMUM"),
        "stage-daily-min": ("H", "MINIMUM"),

        "temperature-daily-mean": ("T", "MEAN"),
        "temperature-daily-max": ("T", "MAXIMUM"),
        "temperature-daily-min": ("T", "MINIMUM"),
    }
    variable = variable.lower()
    if variable not in ESTMODEL_VARIABLE_MAP:
        raise ValueError(f"Invalid variable '{variable}'. Allowed: {list(ESTMODEL_VARIABLE_MAP.keys())}")

    # Default date handling
    if not start_date:
        start_date = "1900-01-01"
    if not end_date:
        end_date = pd.Timestamp.now().date().strftime("%Y-%m-%d")

    # Convert to year integers for API
    start_year = pd.to_datetime(start_date).year
    end_year = pd.to_datetime(end_date).year

    param, dtype = ESTMODEL_VARIABLE_MAP[variable]

    base_url = f"https://estmodel.envir.ee/stations/{station_id}/measurements"
    params = {
        "parameter": param,
        "type": dtype,
        "start-year": start_year,
        "end-year": end_year,
    }

    r = requests.get(base_url, params=params, timeout=30)
    r.raise_for_status()
    data = r.json()

    if not isinstance(data, list) or not data:
        return pd.DataFrame(columns=["time", variable])

    df = pd.DataFrame(data)

    # Required fields: startDate + value
    if "startDate" not in df or "value" not in df:
        return pd.DataFrame(columns=["time", variable])

    df = df.rename(columns={"startDate": "time", "value": variable})
    df["time"] = pd.to_datetime(df["time"], errors="coerce")
    df[variable] = pd.to_numeric(df[variable], errors="coerce")

    df = df.dropna(subset=["time"])
    df = df.drop_duplicates(subset="time", keep="first")
    df = df.sort_values("time").reset_index(drop=True)
    df = df[["time", variable]]
    
    # Trim to the exact requested dates (the API only filters by whole years)
    if start_date:
        df = df[df["time"] >= start_date]
    if end_date:
        df = df[df["time"] <= end_date]

    return df

Example usage:

df = get_estmodel_data(
    station_id="SJA8821000",
    variable="discharge-daily-max",
    start_date="1959-01-01",
    end_date="2024-12-31",
)

print(df)
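
Putting the two functions together, a rough sketch of how a full Estonian discharge dataset could be assembled (the loop, the variable choice, and the wide discharge_df table are illustrative assumptions, not part of the code above):

stations = get_estonian_station_metadata()

discharge = {}
for gauge_id in stations["gauge_id"]:
    ts = get_estmodel_data(
        station_id=gauge_id,
        variable="discharge-daily-mean",
        start_date="1900-01-01",
        end_date="2024-12-31",
    )
    if not ts.empty:
        discharge[gauge_id] = ts.set_index("time")["discharge-daily-mean"]

# One wide table: rows are dates, columns are gauge IDs
discharge_df = pd.concat(discharge, axis=1)
print(discharge_df.describe())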
