Source code for rivretrieve.canada

"""Fetcher for Canadian river gauge data from HYDAT."""

import io
import logging
import os
import re
import shutil
import sqlite3
import zipfile
from datetime import datetime
from pathlib import Path
from typing import Any, Optional

import pandas as pd
from bs4 import BeautifulSoup

from . import base, constants, utils

logger = logging.getLogger(__name__)


[docs] class CanadaFetcher(base.RiverDataFetcher): """Fetches river gauge data from Canada's National Hydrometric Program (HYDAT). Data Source: HYDAT Database (https://collaboration.cmc.ec.gc.ca/cmc/hydrometrics/www/) This fetcher downloads the entire HYDAT SQLite database on first use. Supported Variables: - ``constants.DISCHARGE_DAILY_MEAN`` (m³/s) - ``constants.STAGE_DAILY_MEAN`` (m) """ HYDAT_URL = "https://collaboration.cmc.ec.gc.ca/cmc/hydrometrics/www/" DATA_DIR = Path(os.path.dirname(__file__)) / "data" HYDAT_PATH = DATA_DIR / "Hydat.sqlite3"
[docs] @staticmethod def get_cached_metadata() -> pd.DataFrame: """Retrieves a DataFrame of available Canadian gauge IDs and metadata. This method loads the metadata from a cached CSV file located in the ``rivretrieve/cached_site_data/`` directory. Returns: pd.DataFrame: A DataFrame indexed by gauge_id, containing site metadata. """ return utils.load_cached_metadata_csv("canada")
[docs] @staticmethod def get_available_variables() -> tuple[str, ...]: return (constants.DISCHARGE_DAILY_MEAN, constants.STAGE_DAILY_MEAN)
def _find_latest_hydat_link(self) -> Optional[str]: s = utils.requests_retry_session() try: response = s.get(self.HYDAT_URL) response.raise_for_status() soup = BeautifulSoup(response.text, "lxml") links = soup.find_all("a") latest_date = None latest_link = None for link in links: href = link.get("href") if href: match = re.match(r"Hydat_sqlite3_(\d{8})\.zip", href) if match: date_str = match.group(1) try: current_date = datetime.strptime(date_str, "%Y%m%d") if latest_date is None or current_date > latest_date: latest_date = current_date latest_link = self.HYDAT_URL + href except ValueError: continue return latest_link except Exception as e: logger.error(f"Error finding latest HYDAT link: {e}") return None def _download_hydat(self) -> bool: """Downloads and extracts the latest HYDAT SQLite database.""" logger.info("Checking for latest HYDAT database...") latest_link = self._find_latest_hydat_link() if not latest_link: logger.error("Could not find download link for HYDAT database.") return False zip_filename = latest_link.split("/")[-1] sqlite_filename = zip_filename.replace(".zip", ".sqlite3") self.HYDAT_PATH = self.DATA_DIR / sqlite_filename if self.HYDAT_PATH.exists(): logger.info(f"Latest HYDAT database {sqlite_filename} already exists at {self.HYDAT_PATH}") return True logger.info(f"Downloading {zip_filename}...") s = utils.requests_retry_session() try: # HYDAT no longer requires license click-through on this new base URL response = s.get(latest_link, stream=True, timeout=300) response.raise_for_status() self.DATA_DIR.mkdir(parents=True, exist_ok=True) with zipfile.ZipFile(io.BytesIO(response.content)) as zf: extracted = False for name in zf.namelist(): if name.endswith(".sqlite3"): zf.extract(name, path=self.DATA_DIR) # Rename to the expected versioned filename shutil.move(self.DATA_DIR / name, self.HYDAT_PATH) extracted = True break if not extracted: logger.error(f"No .sqlite3 file found in {zip_filename}.") return False logger.info(f"Successfully downloaded and extracted HYDAT to {self.HYDAT_PATH}") return True except Exception as e: logger.error(f"Error downloading or extracting HYDAT: {e}") if self.HYDAT_PATH.exists(): # Clean up partial extraction os.remove(self.HYDAT_PATH) return False def _get_hydat_connection(self): if not self.HYDAT_PATH.exists(): if not self._download_hydat(): raise FileNotFoundError("Failed to download HYDAT database.") return sqlite3.connect(self.HYDAT_PATH)
[docs] def get_data( self, gauge_id: str, variable: str, start_date: Optional[str] = None, end_date: Optional[str] = None, ) -> pd.DataFrame: """Fetches and parses time series data for a specific gauge and variable. This method retrieves the requested data from the provider's API or data source, parses it, and returns it in a standardized pandas DataFrame format. Args: gauge_id: The site-specific identifier for the gauge. variable: The variable to fetch. Must be one of the strings listed in the fetcher's ``get_available_variables()`` output. These are typically defined in ``rivretrieve.constants``. start_date: Optional start date for the data retrieval in 'YYYY-MM-DD' format. If None, data is fetched from the earliest available date. end_date: Optional end date for the data retrieval in 'YYYY-MM-DD' format. If None, data is fetched up to the latest available date. Returns: pd.DataFrame: A pandas DataFrame indexed by datetime objects (``constants.TIME_INDEX``) with a single column named after the requested ``variable``. The DataFrame will be empty if no data is found for the given parameters. Raises: ValueError: If the requested ``variable`` is not supported by this fetcher. requests.exceptions.RequestException: If a network error occurs during data download. Exception: For other unexpected errors during data fetching or parsing. """ start_date = utils.format_start_date(start_date) end_date = utils.format_end_date(end_date) if variable not in self.get_available_variables(): raise ValueError(f"Unsupported variable: {variable}") var_map = { constants.DISCHARGE_DAILY_MEAN: {"table": "DLY_FLOWS", "prefix": "FLOW"}, constants.STAGE_DAILY_MEAN: {"table": "DLY_LEVELS", "prefix": "LEVEL"}, } table = var_map[variable]["table"] value_prefix = var_map[variable]["prefix"] try: conn = self._get_hydat_connection() start_dt = pd.to_datetime(start_date) end_dt = pd.to_datetime(end_date) query = f""" SELECT * FROM {table} WHERE STATION_NUMBER = ? AND YEAR BETWEEN ? AND ? """ df = pd.read_sql_query(query, conn, params=(gauge_id, start_dt.year, end_dt.year)) conn.close() if df.empty: return pd.DataFrame(columns=[constants.TIME_INDEX, variable]) # Unpivot day columns day_cols = [f"{value_prefix}{i}" for i in range(1, 32)] id_vars = ["STATION_NUMBER", "YEAR", "MONTH"] # Ensure all day columns exist, add if not for col in day_cols: if col not in df.columns: df[col] = None df_long = pd.melt( df, id_vars=id_vars, value_vars=day_cols, var_name="Day_Col", value_name=variable, ) df_long["DAY"] = df_long["Day_Col"].str.replace(value_prefix, "").astype(int) # Create Date column date_cols = ["YEAR", "MONTH", "DAY"] df_long[constants.TIME_INDEX] = pd.to_datetime(df_long[date_cols], errors="coerce") df_long = df_long.dropna(subset=[constants.TIME_INDEX]) # Filter by date range df_long = df_long[(df_long[constants.TIME_INDEX] >= start_dt) & (df_long[constants.TIME_INDEX] <= end_dt)] df_long[variable] = pd.to_numeric(df_long[variable], errors="coerce") return ( df_long[[constants.TIME_INDEX, variable]] .dropna() .sort_values(by=constants.TIME_INDEX) .set_index(constants.TIME_INDEX) ) except Exception as e: logger.error(f"Error querying or processing HYDAT for site {gauge_id}, variable {variable}: {e}") return pd.DataFrame(columns=[constants.TIME_INDEX, variable])
# These are not used for Canada as data is local def _download_data(self, gauge_id: str, variable: str, start_date: str, end_date: str) -> Any: return None def _parse_data(self, gauge_id: str, raw_data: Any, variable: str) -> pd.DataFrame: return pd.DataFrame()