Related to issue #5
Code for the list of stations (and metadata, #1):
```python
import re

import numpy as np
import pandas as pd
import requests
from pyproj import Transformer

# Matches an area such as "45,2 km2", "45.2 km²", or "45 km^2".
AREA_RE = re.compile(r'(\d+(?:[.,]\d+)?)\s*(?:km2|km²|km\^2)', re.IGNORECASE)


def _parse_area(description: str):
    """Parse a catchment area in km² out of a free-text description."""
    if not description or not isinstance(description, str):
        return None
    m = AREA_RE.search(description)
    if not m:
        return None
    val = m.group(1).replace(',', '.')  # Danish decimal comma -> dot
    try:
        return float(val)
    except ValueError:
        return None


def _extract_lonlat(station):
    """Extract coordinates (x, y, srid) from a station or its measurementPoints."""
    loc = station.get("location") if isinstance(station.get("location"), dict) else {}
    if not loc or loc.get("x") is None or loc.get("y") is None:
        # Fall back to the first measurement point's location.
        mps = station.get("measurementPoints") or []
        if mps and isinstance(mps, list):
            mploc = mps[0].get("location") if isinstance(mps[0].get("location"), dict) else {}
            loc = mploc if mploc.get("x") is not None and mploc.get("y") is not None else {}
    if not loc:
        return None, None, None
    return loc.get("x"), loc.get("y"), str(loc.get("srid") or "").lower()


def get_vandah_metadata() -> pd.DataFrame:
    """
    Fetch station metadata from VandA/H Miljøportal.

    - Keeps ALL original fields from the API.
    - Renames standardized columns:
        stationId → gauge_id
        name → station_name
    - Extracts the river from station_name (text before the first comma).
    - Adds or fills standardized fields with NaN if missing:
        ['gauge_id', 'station_name', 'river', 'latitude', 'longitude',
         'altitude', 'area', 'country', 'source']
    - Converts coordinates from EPSG:25832 (ETRS89 / UTM 32N) → EPSG:4326 (WGS84).
    - Prints the URL being fetched for transparency.
    """
    url = "https://vandah.miljoeportal.dk/api/stations"
    print(f"Fetching VandA/H Miljøportal metadata from: {url}")
    headers = {"Accept": "application/json", "User-Agent": "Mozilla/5.0 (compatible)"}
    try:
        resp = requests.get(url, headers=headers, timeout=30)
        resp.raise_for_status()
        stations = resp.json()
        if not isinstance(stations, list) or not stations:
            print("No stations found in response.")
            return pd.DataFrame()

        # Flatten the full JSON to keep all fields.
        df = pd.json_normalize(stations)

        # Extract and convert coordinates.
        tf_25832_to_4326 = Transformer.from_crs("EPSG:25832", "EPSG:4326", always_xy=True)
        lats, lons, areas, rivers = [], [], [], []
        for s in stations:
            x, y, srid = _extract_lonlat(s)
            lon = lat = None
            if x is not None and y is not None:
                try:
                    if srid and ("25832" in srid or "utm32" in srid or "etrs89" in srid):
                        lon, lat = tf_25832_to_4326.transform(x, y)
                    else:
                        # "4326"/"wgs84" or unknown SRID: assume lon/lat as given.
                        lon, lat = float(x), float(y)
                except Exception:
                    lon = lat = None
            lons.append(lon)
            lats.append(lat)
            # Parse the catchment area (km²) from the description.
            areas.append(_parse_area(s.get("description") or ""))
            # Extract the river from the station name.
            name = (s.get("name") or "").strip()
            rivers.append(name.split(",")[0].strip() if name else None)

        # Add derived fields to the flattened DataFrame.
        df["latitude"] = lats
        df["longitude"] = lons
        df["area"] = areas
        df["river"] = rivers

        # Rename standardized columns.
        df = df.rename(columns={"stationId": "gauge_id", "name": "station_name"})

        # Ensure the standardized fields exist.
        std_cols = [
            "gauge_id", "station_name", "river", "latitude", "longitude",
            "altitude", "area", "country", "source",
        ]
        for col in std_cols:
            if col not in df.columns:
                df[col] = np.nan

        # Add constants.
        df["country"] = "Denmark"
        df["source"] = "VandA/H Miljøportal"

        # Type conversions.
        df["latitude"] = pd.to_numeric(df["latitude"], errors="coerce")
        df["longitude"] = pd.to_numeric(df["longitude"], errors="coerce")
        df["area"] = pd.to_numeric(df["area"], errors="coerce")
        df["gauge_id"] = df["gauge_id"].astype(str).str.strip()

        return df.reset_index(drop=True)
    except Exception as e:
        print(f"Failed to fetch VandA/H metadata: {e}")
        return pd.DataFrame()
```
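As a quick sanity check, the area parser handles the Danish decimal comma and the common km² spellings; the description strings below are made up for illustration:

```python
# Made-up description strings, just to illustrate what AREA_RE accepts.
print(_parse_area("Opland: 45,2 km2"))  # 45.2 (decimal comma converted to a dot)
print(_parse_area("no area stated"))    # None
```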
Example usage:

```python
df_vandah = get_vandah_metadata()
print(df_vandah)
```
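The coordinate handling above assumes EPSG:25832 (ETRS89 / UTM zone 32N) inputs. A standalone check of that conversion, with made-up easting/northing values placed somewhere in Denmark, might look like:

```python
from pyproj import Transformer

# Made-up UTM 32N coordinates; with always_xy=True the output order is (lon, lat).
tf = Transformer.from_crs("EPSG:25832", "EPSG:4326", always_xy=True)
lon, lat = tf.transform(530000, 6170000)
print(f"lon={lon:.4f}, lat={lat:.4f}")  # roughly 9.5°E, 55.5°N
```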
Code for downloading the data:
```python
import numpy as np
import pandas as pd
import requests


def get_vandah_data(
    station_id: str,
    variable: str,
    start_date: str = None,
    end_date: str = None,
) -> pd.DataFrame:
    """
    Download hydrological time series data from VandA/H Miljøportal.

    Parameters
    ----------
    station_id : str
        Station ID (e.g., '30000681').
    variable : str
        One of:
        'discharge' or 'discharge-instantaneous' → water flow (originally L/s, converted to m³/s)
        'stage' or 'stage-instantaneous' → water level elevation (cm, above sea level)
    start_date : str, optional
        Start date in 'YYYY-MM-DD' format (RFC 3339 UTC is used internally).
        If not provided, defaults to one year before today.
    end_date : str, optional
        End date in 'YYYY-MM-DD' format (RFC 3339 UTC is used internally).
        If not provided, defaults to today.

    Notes
    -----
    - If no dates are specified, the function automatically retrieves data
      for the **last 12 months**.
    - Instantaneous data are measured every 10 minutes (~52,560 points per year).
      To avoid memory issues, do **not request more than 3 years** of instantaneous data.
    - Daily (non-instantaneous) data are aggregated by mean over each day.
    - For 'stage', only the elevation-corrected water level (cm above sea level) is returned.

    Returns
    -------
    pd.DataFrame
        Columns: ['time', '<variable>']
        - Discharge is in m³/s
        - Stage is in cm (absolute elevation)
    """
    # Detect whether the user requested instantaneous data.
    instantaneous = variable.endswith("-instantaneous")
    var_base = variable.replace("-instantaneous", "")

    # Mapping of variables → API endpoints.
    var_map = {
        "discharge": "water-flows",
        "stage": "water-levels",
    }
    if var_base not in var_map:
        raise ValueError("Variable must be one of: 'discharge' or 'stage', optionally with '-instantaneous'.")

    base_url = f"https://vandah.miljoeportal.dk/api/{var_map[var_base]}"
    params = {"stationId": station_id, "format": "json"}

    # --- Handle date filters (default = last year) ---
    # Parse user dates as UTC so they can be compared with the tz-aware defaults.
    if not start_date:
        start_dt = pd.Timestamp.now(tz="UTC") - pd.DateOffset(years=1)
        start_date = start_dt.strftime("%Y-%m-%d")
    else:
        start_dt = pd.to_datetime(start_date, utc=True)
    if not end_date:
        end_dt = pd.Timestamp.now(tz="UTC")
        end_date = end_dt.strftime("%Y-%m-%d")
    else:
        end_dt = pd.to_datetime(end_date, utc=True)
    start_iso = start_dt.strftime("%Y-%m-%dT00:00Z")
    end_iso = end_dt.strftime("%Y-%m-%dT23:59Z")
    params["from"] = start_iso
    params["to"] = end_iso

    # Safety check on the duration of instantaneous requests.
    if instantaneous:
        duration_years = (end_dt - start_dt).days / 365.25
        if duration_years > 3:
            raise MemoryError(
                f"Requested period is {duration_years:.1f} years. "
                "Please request less than 3 years of instantaneous (10-min) data "
                "to avoid excessive memory usage."
            )

    # Perform the request.
    r = requests.get(base_url, params=params, timeout=60)
    if r.status_code != 200:
        print(f"Request failed ({r.status_code}) for {station_id}")
        return pd.DataFrame(columns=["time", var_base])
    try:
        data = r.json()
    except Exception:
        print("Response not in JSON format")
        return pd.DataFrame(columns=["time", var_base])
    if not data or not isinstance(data, list):
        print(f"No data found for {station_id}")
        return pd.DataFrame(columns=["time", var_base])

    # Extract the time series.
    records = []
    for item in data:
        for res in item.get("results", []):
            t = res.get("measurementDateTime")
            val = res.get("result")
            val_corr = res.get("resultElevationCorrected")
            if t is None:
                continue
            try:
                ts = pd.to_datetime(t, utc=True)
                rec = {"time": ts}
                if var_base == "stage":
                    # Only the elevation-corrected value is kept for stage.
                    rec[var_base] = pd.to_numeric(val_corr, errors="coerce")
                else:
                    rec[var_base] = pd.to_numeric(val, errors="coerce")
                records.append(rec)
            except Exception:
                continue
    if not records:
        return pd.DataFrame(columns=["time", var_base])

    df = pd.DataFrame(records).sort_values("time").reset_index(drop=True)

    # Replace sentinel values for invalid measurements (e.g. -777) with NaN.
    for col in df.columns:
        if col != "time":
            df.loc[df[col] <= -777, col] = np.nan

    # Guard against very large instantaneous datasets.
    if len(df) > 2_000_000 and instantaneous:
        raise MemoryError(f"Too much data returned ({len(df)} rows) for {station_id}. Try a shorter period.")

    # If not instantaneous, aggregate to daily means.
    if not instantaneous:
        agg_cols = [c for c in df.columns if c != "time"]
        df = (
            df.set_index("time")[agg_cols]
            .resample("1D")
            .mean()
            .dropna(how="all")
            .reset_index()
        )

    # Convert discharge from L/s → m³/s.
    if var_base == "discharge":
        df[var_base] = df[var_base] * 0.001

    # Clip to the requested range again (resampling can widen it slightly).
    df = df[(df["time"] >= pd.Timestamp(start_iso)) & (df["time"] <= pd.Timestamp(end_iso))]
    return df
```
Example usage:

```python
# Daily discharge
df_daily = get_vandah_data("03000501", "discharge", "2023-01-01", "2023-12-31")

# 10-min instantaneous discharge (less than 3 years)
df_inst = get_vandah_data("30000681", "discharge-instantaneous", "2023-01-01", "2023-12-31")
```
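Putting the two functions together, a hypothetical end-to-end sketch (using the standardized column names produced by get_vandah_metadata) could look like:

```python
# Hypothetical sketch: list stations, then download daily discharge for a few of them.
meta = get_vandah_metadata()
for gauge_id in meta["gauge_id"].head(3):
    ts = get_vandah_data(gauge_id, "discharge", "2023-01-01", "2023-12-31")
    print(gauge_id, "rows:", len(ts))
```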