"Fetcher for Japanese river gauge data."
import calendar
import io
import logging
import re
from datetime import datetime, timedelta
from typing import List, Optional
import numpy as np
import pandas as pd
from bs4 import BeautifulSoup
from dateutil.relativedelta import relativedelta
from . import base, constants, utils
logger = logging.getLogger(__name__)
# Maps RivRetrieve variable to the single confirmed KIND value.
VARIABLE_KIND_MAP = {
constants.STAGE_HOURLY_MEAN: 2,
constants.STAGE_DAILY_MEAN: 3,
constants.DISCHARGE_HOURLY_MEAN: 6,
constants.DISCHARGE_DAILY_MEAN: 7,
}
[docs]
class JapanFetcher(base.RiverDataFetcher):
"""Fetches river gauge data from Japan's Ministry of Land, Infrastructure, Transport and Tourism (MLIT).
Data Source: Water Information System (http://www1.river.go.jp/)
Note: KINDs 2 and 6, described as "Daily" on the website, actually provide HOURLY data.
KINDs 3 and 7 are expected to provide true DAILY data.
This fetcher returns data at the resolution provided in the source .dat files.
Supported Variables:
- ``constants.DISCHARGE_HOURLY_MEAN`` (m³/s)
- ``constants.STAGE_HOURLY_MEAN`` (m)
- ``constants.DISCHARGE_DAILY_MEAN`` (m³/s)
- ``constants.STAGE_DAILY_MEAN`` (m)
"""
BASE_URL = "http://www1.river.go.jp"
DSP_URL = f"{BASE_URL}/cgi-bin/DspWaterData.exe"
SITE_INFO_URL = f"{BASE_URL}/cgi-bin/SiteInfo.exe"
[docs]
@staticmethod
def get_available_variables() -> tuple[str, ...]:
return (
constants.STAGE_HOURLY_MEAN,
constants.STAGE_DAILY_MEAN,
constants.DISCHARGE_HOURLY_MEAN,
constants.DISCHARGE_DAILY_MEAN,
)
def _get_kind(self, variable: str) -> int:
if variable in VARIABLE_KIND_MAP:
return VARIABLE_KIND_MAP[variable]
else:
raise ValueError(f"Unsupported variable: {variable}")
def _download_data(
self,
gauge_id: str,
variable: str,
start_date: str,
end_date: str,
) -> List[str]:
"""Downloads raw .dat file contents."""
s = utils.requests_retry_session()
kind_to_try = self._get_kind(variable)
start_dt = datetime.strptime(start_date, "%Y-%m-%d")
end_dt = datetime.strptime(end_date, "%Y-%m-%d")
dat_contents = []
headers = {"User-Agent": "Mozilla/5.0", "Referer": self.BASE_URL}
if kind_to_try in [2, 6]: # Monthly requests for hourly data
current_dt = start_dt.replace(day=1)
while current_dt <= end_dt:
year = current_dt.year
month = current_dt.month
month_str = f"{month:02d}"
last_day = calendar.monthrange(year, month)[1]
month_start_str = f"{year}{month_str}01"
month_end_str = f"{year}{month_str}{last_day}"
params = {
"KIND": kind_to_try,
"ID": gauge_id,
"BGNDATE": month_start_str,
"ENDDATE": month_end_str,
"KAWABOU": "NO",
}
try:
logger.debug(f"Fetching DspWaterData page for {gauge_id} {year}-{month_str} KIND {kind_to_try}")
response = s.get(self.DSP_URL, params=params, headers=headers)
response.raise_for_status()
response.encoding = "EUC-JP"
soup = BeautifulSoup(response.text, "html.parser")
link_tag = soup.find(re.compile("a", re.IGNORECASE), href=re.compile(r"/dat/dload/download/"))
if link_tag:
dat_url = f"{self.BASE_URL}{link_tag['href']}"
dat_response = s.get(dat_url, headers=headers)
dat_response.raise_for_status()
dat_contents.append(dat_response.content.decode("shift_jis", errors="replace"))
logger.info(f"Successfully downloaded {link_tag['href'].split('/')[-1]}")
else:
logger.warning(f"No .dat link found for {gauge_id} {year}-{month_str} KIND {kind_to_try}")
except Exception as e:
logger.error(f"Error fetching for {gauge_id} {year}-{month_str} KIND {kind_to_try}: {e}")
current_dt += relativedelta(months=1)
elif kind_to_try in [3, 7]: # Yearly requests for daily data
for year in range(start_dt.year, end_dt.year + 1):
year_start_str = f"{year}0131"
year_end_str = f"{year}1231"
params = {
"KIND": kind_to_try,
"ID": gauge_id,
"BGNDATE": year_start_str,
"ENDDATE": year_end_str,
"KAWABOU": "NO",
}
try:
logger.debug(f"Fetching DspWaterData page for {gauge_id} {year} KIND {kind_to_try}")
response = s.get(self.DSP_URL, params=params, headers=headers)
response.raise_for_status()
response.encoding = "EUC-JP"
soup = BeautifulSoup(response.text, "html.parser")
link_tag = soup.find(re.compile("a", re.IGNORECASE), href=re.compile(r"/dat/dload/download/"))
if link_tag:
dat_url = f"{self.BASE_URL}{link_tag['href']}"
dat_response = s.get(dat_url, headers=headers)
dat_response.raise_for_status()
dat_contents.append(dat_response.content.decode("shift_jis", errors="replace"))
logger.info(f"Successfully downloaded {link_tag['href'].split('/')[-1]}")
else:
logger.warning(f"No .dat link found for {gauge_id} {year} KIND {kind_to_try}")
except Exception as e:
logger.error(f"Error fetching for {gauge_id} {year} KIND {kind_to_try}: {e}")
return dat_contents
def _parse_data(
self,
gauge_id: str,
raw_data_list: List[str],
variable: str,
) -> pd.DataFrame:
"""Parses the list of monthly .dat file contents."""
if not raw_data_list:
return pd.DataFrame(columns=[constants.TIME_INDEX, variable])
kind = self._get_kind(variable)
all_dfs = []
for dat_content in raw_data_list:
try:
lines = dat_content.strip().splitlines()
data_lines = [line for line in lines if not line.startswith("#") and line.strip()]
if not data_lines:
continue
header_line = next((line for line in lines if line.startswith(",")), None)
if header_line:
try:
header_index = lines.index(header_line)
data_lines = lines[header_index + 1 :]
data_lines = [line for line in data_lines if not line.startswith("#") and line.strip()]
except ValueError:
pass
if not data_lines:
continue
csv_io = io.StringIO("\n".join(data_lines))
if kind in [2, 6]: # Hourly data format
col_names = [constants.TIME_INDEX]
for i in range(1, 25):
col_names.append(f"{i}時")
col_names.append(f"{i}時フラグ")
df = pd.read_csv(
csv_io, header=None, names=col_names, na_values=["-9999.99"], dtype={constants.TIME_INDEX: str}
)
df[constants.TIME_INDEX] = pd.to_datetime(
df[constants.TIME_INDEX], format="%Y/%m/%d", errors="coerce"
)
df = df.dropna(subset=[constants.TIME_INDEX])
value_cols = [f"{i}時" for i in range(1, 25)]
df_long = df.melt(
id_vars=[constants.TIME_INDEX], value_vars=value_cols, var_name="Hour", value_name="Value"
)
df_long["Hour"] = df_long["Hour"].str.replace("時", "").astype(int)
df_long["Value"] = pd.to_numeric(df_long["Value"], errors="coerce")
# Different values are used to encode NaN. All seem to be -9999.XX
df_long.loc[df_long["Value"] <= -9999, "Value"] = np.nan
df_long = df_long.dropna(subset=["Value"])
df_long[constants.TIME_INDEX] = df_long.apply(
lambda row: row[constants.TIME_INDEX] + timedelta(hours=row["Hour"] - 1), axis=1
)
parsed_df = df_long[[constants.TIME_INDEX, "Value"]].rename(columns={"Value": variable})
all_dfs.append(parsed_df)
elif kind in [3, 7]: # Daily data format
year = None
for line in lines:
if "年" in line:
year_match = re.search(r"(\d{4})年", line)
if year_match:
year = int(year_match.group(1))
break
if year is None:
logger.warning(f"Could not extract year from .dat file for {gauge_id} KIND {kind}")
continue
col_names = ["月"]
for i in range(1, 32):
col_names.append(f"{i}日")
col_names.append(f"{i}日フラグ")
df = pd.read_csv(
csv_io, header=None, names=col_names, na_values=[" ", "-9999.99"], encoding="utf-8"
)
month_map = {f"{i}月": i for i in range(1, 13)}
df["Month"] = df["月"].map(month_map)
df = df.dropna(subset=["Month"])
df["Year"] = year
value_cols = [f"{i}日" for i in range(1, 32)]
df_long = df.melt(
id_vars=["Year", "Month"], value_vars=value_cols, var_name="Day", value_name="Value"
)
df_long["Day"] = df_long["Day"].str.replace("日", "").astype(int)
df_long["Value"] = pd.to_numeric(df_long["Value"], errors="coerce")
# Different values are used to encode NaN. All seem to be -9999.XX
df_long.loc[df_long["Value"] <= -9999, "Value"] = np.nan
df_long = df_long.dropna(subset=["Value"])
df_long[constants.TIME_INDEX] = pd.to_datetime(df_long[["Year", "Month", "Day"]], errors="coerce")
parsed_df = df_long[[constants.TIME_INDEX, "Value"]].rename(columns={"Value": variable})
parsed_df = parsed_df.dropna(subset=[constants.TIME_INDEX])
all_dfs.append(parsed_df)
else:
logger.warning(f"Unsupported KIND {kind} for parsing in _parse_data")
continue
except Exception as e:
logger.error(f"Error parsing .dat content for {gauge_id} KIND {kind}: {e}", exc_info=True)
continue
if not all_dfs:
return pd.DataFrame(columns=[constants.TIME_INDEX, variable])
final_df = pd.concat(all_dfs, ignore_index=True)
final_df = final_df.sort_values(by=constants.TIME_INDEX)
return final_df.set_index(constants.TIME_INDEX)
[docs]
def get_data(
self,
gauge_id: str,
variable: str,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
) -> pd.DataFrame:
"""Fetches and parses time series data for a specific gauge and variable."""
start_date = utils.format_start_date(start_date)
end_date = utils.format_end_date(end_date)
if variable not in self.get_available_variables():
raise ValueError(f"Unsupported variable: {variable}")
try:
raw_data_list = self._download_data(gauge_id, variable, start_date, end_date)
if not raw_data_list:
return pd.DataFrame(columns=[constants.TIME_INDEX, variable])
df = self._parse_data(gauge_id, raw_data_list, variable)
if not df.empty:
start_date_dt = pd.to_datetime(start_date)
end_date_dt = pd.to_datetime(end_date) + timedelta(days=1) # Include end date
df = df[(df.index >= start_date_dt) & (df.index < end_date_dt)]
return df
except Exception as e:
logger.error(f"Failed to get data for site {gauge_id}, variable {variable}: {e}", exc_info=True)
return pd.DataFrame(columns=[constants.TIME_INDEX, variable])