# Source code for rivretrieve.germany_berlin
"""Fetcher for Berlin river gauge data from Wasserportal Berlin."""
import logging
from io import StringIO
from typing import Optional
import numpy as np
import pandas as pd
import requests
from bs4 import BeautifulSoup
from pyproj import Transformer
from . import base, constants, utils
logger = logging.getLogger(__name__)
[docs]
class GermanyBerlinFetcher(base.RiverDataFetcher):
    """Fetches river gauge data from Wasserportal Berlin.

    Data source:
        https://wasserportal.berlin.de/

    Supported variables:
        - constants.STAGE_DAILY_MEAN (m)
        - constants.DISCHARGE_DAILY_MEAN (m³/s)
        - constants.WATER_TEMPERATURE_DAILY_MEAN (°C)
        - constants.STAGE_INSTANT (m)
        - constants.DISCHARGE_INSTANT (m³/s)

    Data description and API:
        - see https://wasserportal.berlin.de/download/

    Terms of use:
        - see https://daten.berlin.de/impressum
    """

    # HTML page holding the table of all surface-water stations.
    METADATA_URL = "https://wasserportal.berlin.de/start.php?anzeige=tabelle_ow&messanzeige=ms_all"
    # Download endpoint template; filled in by ``_download_data``.
    BASE_URL = (
        "https://wasserportal.berlin.de/station.php"
        "?anzeige=d&station={id}&thema={thema}&sreihe={frequency}&smode=c&sdatum={start_date}"
    )

    @staticmethod
    def get_cached_metadata() -> pd.DataFrame:
        """Retrieves cached metadata (if available)."""
        return utils.load_cached_metadata_csv("germany_berlin")

    def get_metadata(self) -> pd.DataFrame:
        """Downloads and parses site metadata from Wasserportal Berlin.

        Keeps all original columns, but renames and standardizes key fields.
        Converts UTM33N (EPSG:32633) → WGS84 (EPSG:4326) coordinates.

        Returns:
            One row per unique gauge id, with the standardized metadata
            columns added next to the original table columns. Rows whose UTM
            coordinates cannot be parsed are dropped. An empty DataFrame is
            returned if anything fails (the error is logged, not raised).
        """
        try:
            logger.info(f"Fetching Berlin metadata from {self.METADATA_URL}")
            resp = requests.get(self.METADATA_URL, timeout=20)
            resp.raise_for_status()

            soup = BeautifulSoup(resp.text, "html.parser")
            table = soup.find("table")
            if table is None:
                raise ValueError("No table found in metadata page.")

            # Wrap the markup in StringIO: passing literal HTML to read_html
            # is deprecated in recent pandas versions.
            df = pd.read_html(StringIO(str(table)))[0]
            df.columns = [str(c).strip() for c in df.columns]

            rename_map = {
                "Messstellen- nummer": constants.GAUGE_ID,
                "Messstellen- name": constants.STATION_NAME,
                "Gewässer": constants.RIVER,
                "Rechts- wert": "utm_easting",
                "Hoch- wert": "utm_northing",
            }
            df = df.rename(columns=rename_map)

            if "utm_easting" in df.columns and "utm_northing" in df.columns:
                # Force float so the rescaling below never writes fractional
                # values into an integer column.
                df["utm_easting"] = pd.to_numeric(df["utm_easting"], errors="coerce").astype(float)
                df["utm_northing"] = pd.to_numeric(df["utm_northing"], errors="coerce").astype(float)

                # Drop invalid coordinates
                df = df.dropna(subset=["utm_easting", "utm_northing"])

                # Fix scaling (some values are 10× larger)
                mask_large = df["utm_easting"] > 1_000_000
                df.loc[mask_large, "utm_easting"] = df.loc[mask_large, "utm_easting"] / 10

                transformer = Transformer.from_crs("EPSG:32633", "EPSG:4326", always_xy=True)
                lon, lat = transformer.transform(df["utm_easting"].values, df["utm_northing"].values)
                df[constants.LONGITUDE] = lon
                df[constants.LATITUDE] = lat
            else:
                df[constants.LONGITUDE] = np.nan
                df[constants.LATITUDE] = np.nan

            # Add standardized identification fields if the table lacks them.
            for col in (
                constants.GAUGE_ID,
                constants.STATION_NAME,
                constants.RIVER,
                constants.LATITUDE,
                constants.LONGITUDE,
            ):
                if col not in df.columns:
                    df[col] = np.nan

            # These four are always set here, overwriting any same-named column.
            df[constants.ALTITUDE] = None
            df[constants.AREA] = None
            df[constants.COUNTRY] = "Germany"
            df[constants.SOURCE] = "Wasserportal Berlin"

            # Clean and type-correct
            df[constants.GAUGE_ID] = df[constants.GAUGE_ID].astype(str).str.strip()
            df[constants.LATITUDE] = pd.to_numeric(df[constants.LATITUDE], errors="coerce")
            df[constants.LONGITUDE] = pd.to_numeric(df[constants.LONGITUDE], errors="coerce")

            # Drop duplicates
            df = df.drop_duplicates(subset=[constants.GAUGE_ID])

            logger.info(f"Fetched {len(df)} Berlin gauge metadata records.")
            return df.reset_index(drop=True)
        except Exception as e:
            # Best-effort by design: callers get an empty frame, not an exception.
            logger.error(f"Failed to fetch metadata: {e}")
            return pd.DataFrame()

    @staticmethod
    def get_available_variables() -> tuple[str, ...]:
        """Returns the variable identifiers this fetcher can download."""
        return (
            constants.STAGE_DAILY_MEAN,
            constants.DISCHARGE_DAILY_MEAN,
            constants.WATER_TEMPERATURE_DAILY_MEAN,
            constants.STAGE_INSTANT,
            constants.DISCHARGE_INSTANT,
        )

    def _download_data(
        self,
        gauge_id: str,
        variable: str,
        start_date: str,
        end_date: str,
    ) -> pd.DataFrame:
        """Downloads CSV data for a gauge and variable.

        Args:
            gauge_id: Station identifier inserted into the download URL.
            variable: One of the variables in ``get_available_variables()``.
            start_date: First requested date; reformatted to ``DD.MM.YYYY``
                for the request.
            end_date: Not sent to the server (the URL template only carries a
                start date); the end of the range is applied in ``get_data``.

        Returns:
            The CSV parsed as-is (``;`` separator, ``,`` decimal mark), or an
            empty DataFrame when the response is empty, looks like an
            HTML/error page, or cannot be parsed.

        Raises:
            ValueError: If ``variable`` has no URL mapping.
            requests.HTTPError: If the server answers with an error status.
        """
        # variable -> (thema, sreihe) URL parameters
        thema_map = {
            # Daily
            constants.STAGE_DAILY_MEAN: ("ows", "tw"),
            constants.DISCHARGE_DAILY_MEAN: ("odf", "tw"),
            constants.WATER_TEMPERATURE_DAILY_MEAN: ("owt", "tw"),
            # Instantaneous
            constants.STAGE_INSTANT: ("ows", "ew"),
            constants.DISCHARGE_INSTANT: ("odf", "ew"),
        }
        if variable not in thema_map:
            raise ValueError(f"Unsupported variable: {variable}")
        thema, frequency = thema_map[variable]

        start_date_fmt = pd.to_datetime(start_date).strftime("%d.%m.%Y")
        url = self.BASE_URL.format(id=gauge_id, thema=thema, frequency=frequency, start_date=start_date_fmt)
        logger.info(f"Fetching {variable} for {gauge_id} from {url}")

        r = requests.get(url, timeout=20)
        r.raise_for_status()

        csv_text = r.text.strip()
        # An HTML body or the word "Fehler" (German for "error") is treated
        # as "no data" rather than as CSV.
        if not csv_text or "<html" in csv_text or "Fehler" in csv_text:
            logger.warning(f"No data returned for {gauge_id} ({variable})")
            return pd.DataFrame()

        try:
            return pd.read_csv(StringIO(csv_text), sep=";", decimal=",", encoding="utf-8")
        except Exception as e:
            logger.error(f"Error parsing CSV for {gauge_id}: {e}")
            return pd.DataFrame()

    def _parse_data(self, gauge_id: str, raw_data: pd.DataFrame, variable: str) -> pd.DataFrame:
        """Parses Wasserportal CSV to standardized DataFrame.

        Args:
            gauge_id: Station identifier (not used by the parsing itself).
            raw_data: Frame as returned by ``_download_data``; it is not
                modified.
            variable: Name given to the value column of the result.

        Returns:
            A frame indexed by ``constants.TIME_INDEX`` (sorted ascending)
            with a single numeric column named ``variable``; rows with an
            unparseable time or value are dropped. If ``raw_data`` is empty or
            has fewer than two columns, an empty frame with the columns
            ``[constants.TIME_INDEX, variable]`` is returned instead.
        """
        empty = pd.DataFrame(columns=[constants.TIME_INDEX, variable])
        if raw_data.empty:
            return empty

        # Work on a copy so the caller's frame keeps its original columns.
        data = raw_data.copy()
        data.columns = [str(c).strip().lower() for c in data.columns]
        if len(data.columns) < 2:
            # A time column plus a value column are both required.
            logger.warning(f"Unexpected CSV layout for {gauge_id} ({variable})")
            return empty

        # Time column: first header mentioning "datum"/"zeit", else the first column.
        time_col = next((c for c in data.columns if "datum" in c or "zeit" in c), data.columns[0])
        # Value column: first non-object column other than the time column,
        # else the second column.
        val_col = next(
            (c for c in data.columns if c != time_col and data[c].dtype != "O"),
            data.columns[1],
        )

        data[constants.TIME_INDEX] = pd.to_datetime(data[time_col], dayfirst=True, errors="coerce")
        data[variable] = pd.to_numeric(data[val_col], errors="coerce")

        # Both stage variables are documented in metres on this class, so the
        # cm → m conversion applies to the instantaneous series as well.
        # NOTE(review): assumes the portal reports instantaneous stage in cm
        # like the daily series — confirm against a live download.
        if variable in (constants.STAGE_DAILY_MEAN, constants.STAGE_INSTANT):
            data[variable] = data[variable] / 100.0  # cm → m

        return (
            data[[constants.TIME_INDEX, variable]]
            .dropna()
            .sort_values(by=constants.TIME_INDEX)
            .set_index(constants.TIME_INDEX)
        )

    def get_data(
        self,
        gauge_id: str,
        variable: str,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
    ) -> pd.DataFrame:
        """Downloads, parses and date-filters one variable for one gauge.

        Args:
            gauge_id: Station identifier.
            variable: One of the variables in ``get_available_variables()``.
            start_date: Start of the requested range (inclusive); normalized
                by ``utils.format_start_date``.
            end_date: End of the requested range (inclusive); normalized by
                ``utils.format_end_date``. A date without a time of day covers
                that whole day.

        Returns:
            The parsed series restricted to the requested range. On any
            download or parse failure the error is logged and an empty frame
            with the columns ``[constants.TIME_INDEX, variable]`` is returned.

        Raises:
            ValueError: If ``variable`` is not supported.
        """
        start_date = utils.format_start_date(start_date)
        end_date = utils.format_end_date(end_date)

        if variable not in self.get_available_variables():
            raise ValueError(f"Unsupported variable: {variable}")

        try:
            raw_df = self._download_data(gauge_id, variable, start_date, end_date)
            df = self._parse_data(gauge_id, raw_df, variable)
            if df.empty:
                # Nothing to filter; an empty frame may not have a datetime
                # index, so comparing it against dates could fail.
                return df

            start_ts = pd.to_datetime(start_date)
            end_ts = pd.to_datetime(end_date)
            if end_ts == end_ts.normalize():
                # A date-only end would otherwise mean midnight and cut off
                # instantaneous samples taken later on the end day.
                end_ts = end_ts + pd.Timedelta(days=1) - pd.Timedelta(nanoseconds=1)
            return df.loc[(df.index >= start_ts) & (df.index <= end_ts)]
        except Exception as e:
            logger.error(f"Failed to get data for site {gauge_id}, variable {variable}: {e}")
            return pd.DataFrame(columns=[constants.TIME_INDEX, variable])