# Source code for rivretrieve.australia

"""Fetcher for Australian river gauge data from BoM."""

import json
import logging
from io import StringIO
from typing import Any, Dict, Optional

import pandas as pd
import requests

from . import base, constants, utils

logger = logging.getLogger(__name__)


class AustraliaFetcher(base.RiverDataFetcher):
    """Fetches river gauge data from Australia's Bureau of Meteorology (BoM).

    Data Source: Bureau of Meteorology Water Data Online
    (http://www.bom.gov.au/waterdata/)

    Supported Variables:
        - ``constants.DISCHARGE_DAILY_MEAN`` (m³/s)
        - ``constants.STAGE_DAILY_MEAN`` (m)
    """

    # Single endpoint used for every BoM query issued by this fetcher.
    BOM_URL = "http://www.bom.gov.au/waterdata/services"

    @staticmethod
    def get_cached_metadata() -> pd.DataFrame:
        """Retrieves a DataFrame of available Australian gauge IDs and metadata.

        This method loads the metadata from a cached CSV file located in the
        ``rivretrieve/cached_site_data/`` directory.

        Returns:
            pd.DataFrame: A DataFrame indexed by gauge_id, containing site metadata.
        """
        return utils.load_cached_metadata_csv("australia")

    @staticmethod
    def get_available_variables() -> tuple[str, ...]:
        """Returns the variable identifiers this fetcher can retrieve."""
        return (constants.DISCHARGE_DAILY_MEAN, constants.STAGE_DAILY_MEAN)

    @staticmethod
    def _empty_frame(variable: str) -> pd.DataFrame:
        """Builds the empty result returned whenever no data is available."""
        return pd.DataFrame(columns=[constants.TIME_INDEX, variable])

    def _make_bom_request(self, params: Dict[str, Any]) -> Any:
        """Helper function to make requests to the BoM API.

        Args:
            params: Request-specific query parameters. They are merged over the
                fixed ``service``/``type`` parameters and win on key clashes.

        Returns:
            The response body as text when ``params["format"] == "csv"``,
            otherwise the decoded JSON payload.

        Raises:
            requests.exceptions.RequestException: If the HTTP request fails or
                the server answers with an error status.
            ValueError: If a non-CSV response body is not valid JSON.
        """
        base_params = {
            "service": "kisters",
            "type": "QueryServices",
        }
        all_params = {**base_params, **params}

        s = utils.requests_retry_session()
        try:
            response = s.get(self.BOM_URL, params=all_params)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            logger.error(f"BoM API request failed for params {params}: {e}")
            raise

        if params.get("format") == "csv":
            return response.text

        # Default format is json. Decoding is handled separately from the
        # transport errors above: recent versions of requests raise a
        # RequestException subclass on bad JSON, which would otherwise be
        # reported as a failed request. ValueError is the common base of every
        # decode error requests can raise (json or simplejson backed).
        try:
            return response.json()
        except ValueError as e:
            logger.error(f"BoM API JSON decode failed for params {params}: {e}\nResponse: {response.text}")
            raise

    def _get_timeseries_id(self, gauge_id: str, variable: str) -> Optional[str]:
        """Retrieves the timeseries ID for the given site and variable.

        Args:
            gauge_id: Station number passed to the API as ``station_no``.
            variable: One of the supported variable constants.

        Returns:
            The ``ts_id`` of the first matching timeseries, or None when the
            lookup fails or nothing matches (the reason is logged).

        Raises:
            ValueError: If ``variable`` is not supported.
        """
        bom_parameter_names = {
            constants.STAGE_DAILY_MEAN: "Water Course Level",
            constants.DISCHARGE_DAILY_MEAN: "Water Course Discharge",
        }
        if variable not in bom_parameter_names:
            raise ValueError(f"Unsupported variable: {variable}")
        bom_variable = bom_parameter_names[variable]

        # Daily Mean Quality Controlled. The same series name is used for both
        # variables instead of the per-variable "H.Merged.DailyMean" /
        # "Q.Merged.DailyMean" names.
        ts_name = "DMQaQc.Merged.DailyMean.24HR"

        params = {
            "request": "getTimeseriesList",
            "parametertype_name": bom_variable,
            "ts_name": ts_name,
            "station_no": gauge_id,
            "format": "json",
        }

        try:
            json_data = self._make_bom_request(params)

            if isinstance(json_data, list) and len(json_data) > 1:
                # Response is a list of lists, header is index 0; only the
                # first data row is used.
                header, first_row = json_data[0], json_data[1]
                ts_id = dict(zip(header, first_row)).get("ts_id")
                if ts_id:
                    return ts_id
                logger.warning(f"No ts_id found for site {gauge_id}, variable {variable}")
                return None

            if isinstance(json_data, list) and len(json_data) == 1 and json_data[0] == "No matches.":
                logger.warning(f"No matches for site {gauge_id}, variable {variable} in getTimeseriesList")
                return None

            logger.warning(f"Unexpected response from getTimeseriesList for site {gauge_id}: {json_data}")
            return None
        except Exception as e:
            # Deliberately best-effort: any lookup failure means "no series".
            logger.error(f"Error getting timeseries ID for site {gauge_id}: {e}")
            return None

    def _download_data(self, gauge_id: str, variable: str, start_date: str, end_date: str) -> Optional[str]:
        """Downloads the raw CSV data.

        Args:
            gauge_id: Station number of the gauge.
            variable: One of the supported variable constants.
            start_date: First day of the requested range, 'YYYY-MM-DD'.
            end_date: Last day of the requested range, 'YYYY-MM-DD'.

        Returns:
            The CSV response text, or None when no timeseries ID could be
            resolved or the download failed (the failure is logged).
        """
        ts_id = self._get_timeseries_id(gauge_id, variable)
        if not ts_id:
            return None

        params = {
            "request": "getTimeseriesValues",
            "datasource": "0",
            "format": "csv",
            "ts_id": ts_id,
            "from": f"{start_date}T00:00:00.000",
            "to": f"{end_date}T00:00:00.000",
            "returnfields": "Timestamp,Value,Quality Code,Interpolation Type",
            "metadata": "true",
        }

        try:
            return self._make_bom_request(params)
        except Exception as e:
            logger.error(f"Error downloading data for ts_id {ts_id}: {e}")
            return None

    def _parse_data(self, gauge_id: str, raw_data: Optional[str], variable: str) -> pd.DataFrame:
        """Parses the raw CSV data.

        The payload is expected to contain metadata lines followed by a
        commented header line starting with ``#Timestamp;Value;Quality Code``
        and semicolon-separated data rows.

        Args:
            gauge_id: Station number, used only in log messages.
            raw_data: CSV text as returned by ``_download_data``; may be None.
            variable: Name given to the value column of the result.

        Returns:
            A DataFrame indexed by ``constants.TIME_INDEX`` with one column
            named ``variable`` and rows with missing values dropped. An empty
            DataFrame with columns ``[constants.TIME_INDEX, variable]`` is
            returned when there is no input, no header, no rows, or parsing
            fails.
        """
        if not raw_data:
            return self._empty_frame(variable)

        try:
            lines = raw_data.strip().splitlines()
            header_idx = next(
                (i for i, line in enumerate(lines) if line.startswith("#Timestamp;Value;Quality Code")),
                None,
            )
            if header_idx is None:
                logger.warning(f"Could not find data header in CSV for site {gauge_id}")
                return self._empty_frame(variable)

            # Keep only the header (with its comment marker removed) and the
            # rows after it; everything before the header is metadata.
            header_line = lines[header_idx].lstrip("#")
            csv_content = "\n".join([header_line, *lines[header_idx + 1:]])

            df = pd.read_csv(StringIO(csv_content), sep=";")
            if df.empty:
                return self._empty_frame(variable)

            # Reduce timestamps to their calendar date, then convert back so
            # the index holds datetime values at midnight.
            df[constants.TIME_INDEX] = pd.to_datetime(df["Timestamp"]).dt.date
            df["Value"] = pd.to_numeric(df["Value"], errors="coerce")
            df = df.rename(columns={"Value": variable})
            df[constants.TIME_INDEX] = pd.to_datetime(df[constants.TIME_INDEX])
            return df[[constants.TIME_INDEX, variable]].dropna().set_index(constants.TIME_INDEX)
        except Exception as e:
            logger.error(f"Error parsing CSV data for site {gauge_id}: {e}")
            return self._empty_frame(variable)

    def get_data(
        self,
        gauge_id: str,
        variable: str,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
    ) -> pd.DataFrame:
        """Fetches and parses time series data for a specific gauge and variable.

        This method retrieves the requested data from the BoM API, parses it,
        and returns it in a standardized pandas DataFrame format. Download and
        parsing failures are logged and reported as an empty DataFrame rather
        than raised.

        Args:
            gauge_id: The site-specific identifier for the gauge.
            variable: The variable to fetch. Must be one of the strings listed
                in the fetcher's ``get_available_variables()`` output. These
                are typically defined in ``rivretrieve.constants``.
            start_date: Optional start date for the data retrieval in
                'YYYY-MM-DD' format. When None, the value chosen by
                ``utils.format_start_date`` is used.
            end_date: Optional end date for the data retrieval in 'YYYY-MM-DD'
                format. When None, the value chosen by
                ``utils.format_end_date`` is used.

        Returns:
            pd.DataFrame: A pandas DataFrame indexed by datetime objects
            (``constants.TIME_INDEX``) with a single column named after the
            requested ``variable``. When no data is found or an error occurs,
            an empty DataFrame with columns ``[constants.TIME_INDEX, variable]``
            is returned instead.

        Raises:
            ValueError: If the requested ``variable`` is not supported by this
                fetcher.
        """
        start_date = utils.format_start_date(start_date)
        end_date = utils.format_end_date(end_date)

        if variable not in self.get_available_variables():
            raise ValueError(f"Unsupported variable: {variable}")

        try:
            raw_data = self._download_data(gauge_id, variable, start_date, end_date)
            return self._parse_data(gauge_id, raw_data, variable)
        except Exception as e:
            logger.error(f"Failed to get data for site {gauge_id}, variable {variable}: {e}")
            return self._empty_frame(variable)