"""Internal domain models for datasets, groups, and files from dados.gov.br."""
import asyncio
import pathlib
import re
from abc import abstractmethod
from collections.abc import Callable
from datetime import datetime
from typing import Any
import httpx
from dateparser import parse # type: ignore[import-untyped]
from pydantic import PrivateAttr
from pysus import CACHEPATH
from pysus.api.models import BaseRemoteDataset, BaseRemoteFile, BaseRemoteGroup
from pysus.api.types import State
from .client import ConjuntoDados, DadosGov, Recurso
_FORMAT_RE = re.compile(r"[._](csv|json|xml)(\.zip)?$", re.IGNORECASE)
def _dedup_entries(
entries: list[tuple[str, Any, dict]],
) -> list[tuple[str, Any, dict]]:
"""If the same file exists in CSV, JSON and XML, keep only CSV."""
grouped: dict[str, list[tuple[str, str, Any, dict]]] = {}
for filename, recurso, metadata in entries:
m = _FORMAT_RE.search(filename)
if m:
stem = filename[: m.start()]
fmt = m.group(1).lower()
grouped.setdefault(stem, []).append(
(fmt, filename, recurso, metadata)
)
else:
grouped.setdefault(filename, []).append(
("", filename, recurso, metadata)
)
result: list[tuple[str, Any, dict]] = []
for _, items in grouped.items():
formats = {fmt for fmt, _, _, _ in items}
if "csv" in formats:
for fmt, filename, recurso, metadata in items:
if fmt == "csv":
result.append((filename, recurso, metadata))
else:
for _, filename, recurso, metadata in items:
result.append((filename, recurso, metadata))
return result
# [docs]  (link marker left over from the rendered documentation page; not code)
# Strong references to in-flight background metadata fetches. The event loop
# keeps only weak references to tasks, so a task that nothing else references
# may be garbage-collected before it finishes.
_PENDING_METADATA_TASKS: set[asyncio.Task[Any]] = set()


class File(BaseRemoteFile):
    """A downloadable file from a dados.gov.br dataset."""

    record: Recurso
    type: str = "File"
    _metadata: dict[str, Any] = PrivateAttr(default_factory=dict)

    def __init__(self, **data: Any) -> None:
        """Initialize the File with optional metadata.

        Parameters
        ----------
        **data
            Keyword arguments forwarded to the parent constructor, plus an
            optional ``_metadata`` dict that is stored on the private
            attribute ``_metadata``.
        """
        # ``_metadata`` is removed from ``data`` and assigned by hand after
        # construction. ``None`` is normalised to an empty dict so the
        # ``year`` / ``month`` / ``state`` lookups never fail.
        metadata = data.pop("_metadata", None) or {}
        super().__init__(**data)
        self._metadata = metadata

    def __repr__(self) -> str:
        """Return the file basename as its string representation."""
        return self.basename

    def model_post_init(self, __context: Any) -> None:
        """Schedule a metadata fetch if size or modify date is missing.

        If either ``api_size`` or ``last_modified`` on the record is falsy
        and an event loop is currently running, ``self.fetch_metadata()`` is
        scheduled on that loop as a background task. When no loop is running
        nothing is scheduled.

        Parameters
        ----------
        __context : Any
            Pydantic validation context (unused).
        """
        if self.record.api_size and self.record.last_modified:
            return
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No running loop (synchronous construction): best-effort only.
            return
        task = loop.create_task(self.fetch_metadata())
        # Hold a strong reference until the task is done (see module note).
        _PENDING_METADATA_TASKS.add(task)
        task.add_done_callback(_PENDING_METADATA_TASKS.discard)

    @property
    def extension(self) -> str:
        """Return the file extension.

        The record's ``file_name`` is used when set; otherwise the last path
        segment of the record's URL, with any query string removed.

        Returns
        -------
        str
            The file extension (e.g., ``".csv"``, ``".zip"``).
        """
        if self.record.file_name:
            return pathlib.Path(self.record.file_name).suffix
        return pathlib.Path(self.record.url.split("/")[-1].split("?")[0]).suffix

    @property
    def size(self) -> int:
        """Return the file size in bytes.

        Returns
        -------
        int
            The file size, or 0 if unknown.
        """
        return self.record.api_size or 0

    @property
    def modify(self) -> datetime:
        """Return the last modification date.

        Returns
        -------
        datetime
            The last modification datetime.

        Raises
        ------
        ValueError
            If the modification date has not been set.
        """
        m = self.record.last_modified
        if not m:
            raise ValueError("File requires a modify date")
        return m

    @property
    def year(self) -> int | None:
        """Return the inferred year from metadata.

        Returns
        -------
        int or None
            The year if present in metadata, otherwise None.
        """
        return self._metadata.get("year")

    @property
    def month(self) -> int | None:
        """Return the inferred month from metadata.

        Returns
        -------
        int or None
            The month if present in metadata, otherwise None.
        """
        return self._metadata.get("month")

    @property
    def state(self) -> State | None:
        """Return the inferred state from metadata.

        Returns
        -------
        State or None
            The state abbreviation if present in metadata, otherwise None.
        """
        return self._metadata.get("state")

    async def _download(
        self,
        output: pathlib.Path | None = None,
        callback: Callable[[int, int], None] | None = None,
    ) -> pathlib.Path:
        """Download the file to a local path.

        Parameters
        ----------
        output : pathlib.Path, optional
            Destination path. Defaults to ``CACHEPATH / self.name``.
        callback : Callable[[int, int], None], optional
            Passed through unchanged to the client's ``_download_file``.

        Returns
        -------
        pathlib.Path
            Whatever the client's ``_download_file`` returns.
        """
        if not output:
            output = CACHEPATH / self.name
        return await self.client._download_file(self, output, callback=callback)

    async def fetch_size(self) -> int:
        """Fetch the remote file size and update the local record.

        Makes a HEAD request and, if the server answers 405, falls back to a
        GET with a ``Range: bytes=0-0`` header. For a 206 (partial) response
        the total size is read from ``Content-Range``, because
        ``Content-Length`` then only describes the returned byte range. For
        any other 2xx response ``Content-Length`` is used. Non-2xx responses
        are ignored so that the length of an error page is never recorded as
        the file size.

        Returns
        -------
        int
            The file size in bytes, or 0 if the size could not be determined.
        """
        url = str(self.path)
        try:
            async with httpx.AsyncClient(
                follow_redirects=True,
                timeout=3,
            ) as client:
                response = await client.head(url)
                if response.status_code == 405:
                    response = await client.get(
                        url, headers={"Range": "bytes=0-0"}
                    )

                status = response.status_code
                if status == 206:
                    # e.g. "bytes 0-0/12345"; the total may also be "*".
                    content_range = response.headers.get("Content-Range", "")
                    total = content_range.rpartition("/")[2].strip()
                    remote_size = int(total) if total.isdecimal() else 0
                elif 200 <= status < 300:
                    remote_size = int(response.headers.get("Content-Length", 0))
                else:
                    remote_size = 0

                if remote_size > 0:
                    self.record.api_size = remote_size
                    return remote_size
                return 0
        except Exception:  # noqa: B902
            # Deliberately best-effort: any network or parsing failure
            # simply means "size unknown".
            return 0
# [docs]  (link marker left over from the rendered documentation page; not code)
class Group(BaseRemoteGroup):
    """A set of files that belong to one dataset record."""

    record: ConjuntoDados
    _formatter: Callable[[str], dict[str, Any]] | None = PrivateAttr(
        default=None
    )

    def __init__(
        self,
        record: ConjuntoDados,
        dataset: BaseRemoteDataset,
        formatter: Callable | None = None,
    ):
        """Create a group from an API record and its owning dataset.

        Parameters
        ----------
        record : ConjuntoDados
            The API response record backing this group.
        dataset : BaseRemoteDataset
            The dataset that owns this group.
        formatter : Callable, optional
            Called with a filename to produce a metadata dict for it.
        """
        super().__init__(
            record=record, dataset=dataset  # type: ignore[call-arg]
        )
        self._formatter = formatter

    def __repr__(self):
        """Represent the group by its name."""
        return self.name

    @property
    def name(self) -> str:
        """Return the group's name, honouring dataset-level aliases.

        Returns
        -------
        str
            The entry for the record's slug in the dataset's
            ``group_aliases`` mapping when there is one (an absent mapping
            counts as empty), otherwise the slug itself.
        """
        raw_slug = self.record.slug
        alias_map = getattr(self.dataset, "group_aliases", {})
        return alias_map.get(raw_slug, raw_slug)

    @property
    def long_name(self) -> str:
        """Return the title of the underlying API record.

        Returns
        -------
        str
            ``record.title``.
        """
        return self.record.title

    @property
    def description(self) -> str:
        """Return the group description, which is always empty.

        Returns
        -------
        str
            An empty string.
        """
        return ""

    async def _fetch_files(self) -> list[BaseRemoteFile]:
        """Turn the record's resources into File objects.

        PDF resources and resources whose filename starts with ``get_`` are
        skipped. The remaining entries are passed through ``_dedup_entries``
        before the File objects are built.
        """
        candidates: list[tuple[str, Any, dict]] = []
        for resource in self.record.resources:
            # Prefer the declared file name; otherwise take the last URL
            # segment with the query string removed.
            fname = resource.file_name
            if not fname:
                fname = resource.url.split("/")[-1].split("?")[0]

            unwanted = fname.lower().endswith(".pdf") or fname.startswith(
                "get_"
            )
            if unwanted:
                continue

            info = {}
            if self._formatter:
                try:
                    info = self._formatter(fname)
                except NotImplementedError:
                    # Formatter not implemented: keep the empty metadata.
                    pass
            candidates.append((fname, resource, info))

        return [
            File(
                record=resource,
                dataset=self.dataset,
                group=self,
                path=resource.url,
                _metadata=info,
            )
            for _, resource, info in _dedup_entries(candidates)
        ]
# [docs]  (link marker left over from the rendered documentation page; not code)
class Dataset(BaseRemoteDataset):
    """A health dataset served by dados.gov.br.

    Concrete subclasses list the API dataset IDs they cover in ``ids`` and
    may provide a :meth:`formatter` that derives metadata from file names.
    """

    # API dataset identifiers; one Group is built per entry.
    ids: list[str] = []
    client: "DadosGov"
    # Maps a group slug to the name shown for it.
    group_aliases: dict[str, str] = {}

    def __repr__(self):
        """Represent the dataset by its name."""
        return self.name

    async def _fetch_content(self) -> list[Group]:
        """Fetch one Group per configured dataset ID, in order.

        The records are requested one after another through the client's
        ``get_dataset``. An empty ``ids`` yields an empty list.
        """
        api: "DadosGov" = self.client
        return [
            Group(
                record=await api.get_dataset(dataset_id),
                dataset=self,
                formatter=self.formatter,
            )
            for dataset_id in (self.ids or ())
        ]