-
Notifications
You must be signed in to change notification settings - Fork 0
Open
Description
Related to issue #5
Code for list of stations (and metadata #1):
import requests
import pandas as pd
import numpy as np
from tqdm import tqdm
def dms_to_decimal(dms: str) -> float:
    """Convert a dash-separated DMS string to decimal degrees.

    Parameters
    ----------
    dms : str
        Angle written as ``'D-M-S'`` (e.g. ``'126-59-43'``). A single leading
        ``'+'`` or ``'-'`` is treated as the sign of the whole angle, so
        ``'-33-30-0'`` yields ``-33.5``.

    Returns
    -------
    float
        Decimal degrees, or ``np.nan`` when ``dms`` is not a string, is
        blank, or does not consist of exactly three numeric fields.
    """
    if not isinstance(dms, str):
        return np.nan
    text = dms.strip()
    if not text:
        return np.nan

    # The field separator is also '-', so a leading sign must be peeled off
    # before splitting; otherwise it produces an empty first field.
    sign = 1.0
    if text[0] in "+-":
        if text[0] == "-":
            sign = -1.0
        text = text[1:]

    try:
        degrees, minutes, seconds = map(float, text.split("-"))
    except ValueError:
        # Raised both for non-numeric fields and for a wrong field count.
        return np.nan

    # Apply the sign to the whole magnitude so that '-0-30-0' becomes -0.5.
    return sign * (degrees + minutes / 60 + seconds / 3600)
def get_wamis_metadata() -> pd.DataFrame:
    """
    Fetch metadata for all WAMIS (Korea) water-level gauging stations.

    Returns
    -------
    pd.DataFrame
        Columns:
        ['gauge_id', 'station_name', 'river', 'latitude', 'longitude',
         'altitude', 'area', 'country', 'source']
        One row per unique ``gauge_id``. If the station list cannot be
        fetched, or no station detail record is retrieved, an empty frame
        with these same columns is returned.

    Notes
    -----
    - Data source: WAMIS Open API (http://www.wamis.go.kr)
    - Combines station listings (wl_dubwlobs) and detailed info (wl_obsinfo)
    - Coordinates are converted from DMS to decimal degrees
    - 'area' = catchment area (㎢), read from the 'bsnara' field
    - 'altitude' = zero-level elevation (m), read from the 'gdt' field
    - Country = 'Korea'
    - Source = 'WAMIS Open API'
    - Failures are reported with ``print`` rather than raised; stations whose
      detail request fails are skipped and counted in a summary message.
    """
    keep_cols = [
        "gauge_id",
        "station_name",
        "river",
        "latitude",
        "longitude",
        "altitude",
        "area",
        "country",
        "source",
    ]

    # --- Step 1: Get list of all station codes ---
    list_url = "http://www.wamis.go.kr:8080/wamis/openapi/wkw/wl_dubwlobs"
    try:
        resp = requests.get(list_url, params={"output": "json"}, timeout=30)
        resp.raise_for_status()
        payload = resp.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers an undecodable JSON body.
        print(f"Failed to fetch station list: {e}")
        return pd.DataFrame(columns=keep_cols)

    stations = payload.get("list") if isinstance(payload, dict) else None
    if not stations:
        print("No stations found in wl_dubwlobs.")
        return pd.DataFrame(columns=keep_cols)
    station_ids = [s["obscd"] for s in stations if isinstance(s, dict) and "obscd" in s]

    # --- Step 2: Retrieve detailed info for each station ---
    info_url = "http://www.wamis.go.kr:8080/wamis/openapi/wkw/wl_obsinfo"
    frames = []  # collected and concatenated once, instead of growing a frame per station
    failed = []
    for sid in tqdm(station_ids, desc="Fetching WAMIS metadata"):
        try:
            r = requests.get(info_url, params={"obscd": sid, "output": "json"}, timeout=10)
            r.raise_for_status()
            data = r.json()
        except (requests.RequestException, ValueError):
            # Best effort: one unreachable station must not abort the crawl.
            failed.append(sid)
            continue

        if not isinstance(data, dict):
            continue
        result = data.get("result")
        code = result.get("code") if isinstance(result, dict) else None
        records = data.get("list")
        if code == "success" and records:
            frames.append(pd.json_normalize(records))

    if failed:
        print(f"Skipped {len(failed)} station(s) whose detail request failed.")
    if not frames:
        print("No metadata records retrieved.")
        return pd.DataFrame(columns=keep_cols)

    raw = pd.concat(frames, ignore_index=True)

    # --- Step 3: Standardize and clean ---
    # Drop records without a station code BEFORE the string cast; casting
    # first would turn missing codes into the literal string "nan".
    raw = raw.dropna(subset=["wlobscd"])

    meta = pd.DataFrame(index=raw.index)
    meta["gauge_id"] = raw["wlobscd"].astype(str)
    meta["station_name"] = raw["obsnmeng"]
    meta["river"] = raw["rivnm"] if "rivnm" in raw.columns else None

    # Convert DMS → decimal degrees
    meta["latitude"] = raw["lat"].apply(dms_to_decimal)
    meta["longitude"] = raw["lon"].apply(dms_to_decimal)

    # Numeric conversions; unparseable values become NaN
    meta["altitude"] = pd.to_numeric(raw["gdt"], errors="coerce")  # zero-level elevation (EL.m)
    meta["area"] = pd.to_numeric(raw["bsnara"], errors="coerce")  # catchment area (㎢)

    meta["country"] = "Korea"
    meta["source"] = "WAMIS Open API"

    df_final = meta[keep_cols].drop_duplicates(subset=["gauge_id"])
    return df_final.reset_index(drop=True)
Code for downloading the data:
import requests
import pandas as pd
import numpy as np
from typing import Optional
from datetime import datetime
def get_wamis_data(
    gauge_id: str,
    variable: str,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
) -> pd.DataFrame:
    """
    Download hydrological time series data (flow or water level)
    from the Korean WAMIS Open API.

    API references
    --------------
    - Flow (discharge, daily): http://www.wamis.go.kr:8080/wamis/openapi/wkw/flw_dtdata
    - Water level (stage, daily): http://www.wamis.go.kr:8080/wamis/openapi/wkw/wl_dtdata
    - Water level (stage, hourly): http://www.wamis.go.kr:8080/wamis/openapi/wkw/wl_hrdata

    Parameters
    ----------
    gauge_id : str
        Observatory code (Obscd).
    variable : str
        One of (case-insensitive):
        - 'discharge' → daily flow (m³/s)
        - 'stage' → daily water level (m)
        - 'stage_instantaneous' → hourly water level (m)
    start_date, end_date : str, optional
        ISO-format dates ('YYYY-MM-DD'). Defaults: 1 January of the current
        year and the current time. An ``end_date`` without a time of day
        includes that whole day, so its hourly values are kept.

    Returns
    -------
    pd.DataFrame
        Columns: ['time', '<variable>'], sorted by time, one row per
        timestamp. Empty (with the same columns) when nothing was retrieved.

    Raises
    ------
    ValueError
        If ``variable`` is not one of the supported names.

    Notes
    -----
    - Flow ('fw') is in m³/s.
    - Stage ('wl') is in meters (m).
    - Invalid or missing values (≤ -777) are replaced with NaN and dropped.
    - Requests are issued one calendar year at a time; a failed year is
      reported with ``print`` and skipped.
    """
    variable = variable.lower()

    # variable -> (endpoint URL, value field, date field, date format)
    endpoints = {
        "discharge": (
            "http://www.wamis.go.kr:8080/wamis/openapi/wkw/flw_dtdata",
            "fw",
            "ymd",
            "%Y%m%d",
        ),
        "stage": (
            "http://www.wamis.go.kr:8080/wamis/openapi/wkw/wl_dtdata",
            "wl",
            "ymd",
            "%Y%m%d",
        ),
        "stage_instantaneous": (
            "http://www.wamis.go.kr:8080/wamis/openapi/wkw/wl_hrdata",
            "wl",
            "ymdh",
            "%Y%m%d%H",
        ),
    }
    if variable not in endpoints:
        raise ValueError("Variable must be 'discharge', 'stage', or 'stage_instantaneous'.")
    url, value_field, date_field, time_format = endpoints[variable]

    # Parse date range
    start_dt = pd.to_datetime(start_date) if start_date else pd.Timestamp(datetime.now().year, 1, 1)
    end_dt = pd.to_datetime(end_date) if end_date else pd.Timestamp.now()

    frames = []

    # --- Loop per year for reliability ---
    for year in range(start_dt.year, end_dt.year + 1):
        if variable == "discharge":
            # The flow endpoint is queried by whole year; trimming to the
            # requested range happens in the final filter.
            params = {"obscd": gauge_id, "year": str(year), "output": "json"}
        else:
            # For stage, restrict to year range to avoid truncation
            start_chunk = max(start_dt, pd.Timestamp(year=year, month=1, day=1))
            end_chunk = min(end_dt, pd.Timestamp(year=year, month=12, day=31))
            params = {
                "obscd": gauge_id,
                "startdt": start_chunk.strftime("%Y%m%d"),
                "enddt": end_chunk.strftime("%Y%m%d"),
                "output": "json",
            }

        try:
            resp = requests.get(url, params=params, timeout=30)
            print(f"Fetching from: {resp.url}")
            resp.raise_for_status()
            data = resp.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers an undecodable JSON body.
            print(f"Failed to fetch WAMIS {variable} data for {gauge_id} ({year}): {e}")
            continue

        records = data.get("list") if isinstance(data, dict) else None
        if not isinstance(records, list) or not records:
            continue
        raw = pd.DataFrame(records)
        if date_field not in raw.columns or value_field not in raw.columns:
            continue

        # Keep only the two documented columns; other API fields are dropped.
        # NOTE(review): timestamps that do not match the format become NaT and
        # are removed below — confirm the hourly endpoint never reports hour "24".
        chunk = pd.DataFrame(
            {
                "time": pd.to_datetime(raw[date_field], format=time_format, errors="coerce"),
                variable: pd.to_numeric(raw[value_field], errors="coerce"),
            }
        )
        chunk.loc[chunk[variable] <= -777, variable] = np.nan
        frames.append(chunk)

    if not frames:
        return pd.DataFrame(columns=["time", variable])

    # Combine and filter
    df_all = pd.concat(frames, ignore_index=True)
    df_all = df_all.dropna(subset=["time", variable])

    after_start = df_all["time"] >= start_dt
    if end_dt == end_dt.normalize():
        # Date-only end: keep every reading of that day, not just midnight.
        before_end = df_all["time"] < end_dt + pd.Timedelta(days=1)
    else:
        before_end = df_all["time"] <= end_dt
    df_all = df_all[after_start & before_end]

    df_all = df_all.drop_duplicates(subset="time", keep="first")
    return df_all.sort_values("time").reset_index(drop=True)
Example usage:
if __name__ == "__main__":
    # Example: daily discharge for station 1001655 over 2000-10-01 .. 2025-03-31.
    # Guarded so that importing this module does not trigger network requests.
    df_q = get_wamis_data("1001655", "discharge", start_date="2000-10-01", end_date="2025-03-31")
    print(df_q)
Metadata
Metadata
Assignees
Labels
No labels