Source code for pysus.api._impl.databases

"""High-level convenience functions for fetching Brazilian health data.

Each function wraps an asynchronous query/download pipeline and returns a
pandas DataFrame.  The available datasets cover disease notification (SINAN),
vital statistics (SINASC, SIM), hospital admissions (SIH), ambulatory care
(SIA), immunisation (PNI), census data (IBGE), health facilities (CNES),
and hospitalisation records (CIHA).
"""

import asyncio
from typing import Literal, cast

import pandas as pd
from pysus.api import types
from pysus.api.client import PySUS
from tqdm.asyncio import tqdm

__all__ = [
    "sinan",
    "sinasc",
    "sim",
    "sih",
    "sia",
    "pni",
    "ibge",
    "cnes",
    "ciha",
    "list_files",
]


def _fetch_data(
    dataset: str,
    group: str | None = None,
    state: str | None = None,
    year: int | list[int] | None = None,
    month: int | list[int] | None = None,
    show_progress: bool = True,
    as_dataframe: bool = False,
    **kwargs,
) -> list[str] | pd.DataFrame:
    """Query, download, and process Parquet files for a given dataset.

    Internally creates an async event loop, queries the PySUS API for matching
    files, and downloads them. By default, returns a list of local file paths.

    Parameters
    ----------
    dataset : str
        Name of the dataset (e.g. "sinan", "sinasc").
    group : str, optional
        Group or disease code to filter by.
    state : str, optional
        Two-letter state abbreviation (e.g. "RJ").
    year : int | list[int], optional
        Year or list of years to fetch.
    month : int | list[int], optional
        Month or list of months to fetch.
    show_progress : bool, optional
        Whether to display a tqdm progress bar during download.
    as_dataframe : bool, optional
        Whether to concatenate and return the data as a pandas DataFrame.
    **kwargs
        Additional arguments forwarded to :meth:`PySUS.read_parquet`.

    Returns
    -------
    list[str] | pd.DataFrame
        A list of paths to the downloaded Parquet files by default. If
        as_dataframe is True, returns a concatenated DataFrame.
    """

    async def _fetch() -> list[str] | pd.DataFrame:
        async with PySUS() as pysus:
            files = await pysus.query(
                dataset=dataset,
                group=group,
                state=state,
                year=year,
                month=month,
            )

            if not files:
                return pd.DataFrame() if as_dataframe else cast(list[str], [])

            sem = asyncio.Semaphore(3)

            async def _throttled_download(f):
                async with sem:
                    return await pysus.download(f)

            tasks = [_throttled_download(f) for f in files]

            if show_progress:
                downloaded_files = await tqdm.gather(
                    *tasks,
                    desc=f"Downloading {dataset}",
                    unit="file",
                )
            else:
                downloaded_files = await asyncio.gather(*tasks)

            paths: list[str] = [str(f.path) for f in downloaded_files]

            if as_dataframe:
                res = pysus.read_parquet(paths, **kwargs).df()
                return cast(pd.DataFrame, res)

            return paths

    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        loop = None

    if loop and loop.is_running():
        try:
            import nest_asyncio

            nest_asyncio.apply()
        except ImportError:
            msg = (
                "nest_asyncio is required when running inside Jupyter. "
                "Install it with: pip install nest_asyncio"
            )
            raise RuntimeError(msg) from None
        result = loop.run_until_complete(_fetch())
        return cast(list[str] | pd.DataFrame, result)

    result = asyncio.run(_fetch())
    return cast(list[str] | pd.DataFrame, result)


[docs] def sinan( disease: Literal[ "ACBI", "ACGR", "ANIM", "ANTR", "BOTU", "CANC", "CHAG", "CHIK", "COLE", "COQU", "DENG", "DERM", "DIFT", "ESQU", "EXAN", "FMAC", "FTIF", "HANS", "HANT", "HEPA", "IEXO", "INFL", "LEIV", "LEPT", "LERD", "LTAN", "MALA", "MENI", "MENT", "NTRA", "PAIR", "PEST", "PFAN", "PNEU", "RAIV", "SDTA", "SIFA", "SIFC", "SIFG", "SRC", "TETA", "TETN", "TOXC", "TOXG", "TRAC", "TUBE", "VARC", "VIOL", "ZIKA", ], year: int | list[int], **kwargs, ) -> list[str] | pd.DataFrame: """Fetch SINAN records for a given disease and year(s). SINAN (Sistema de Informação de Agravos de Notificação) is the Brazilian notifiable-disease information system. Parameters ---------- disease : Literal Disease code (e.g. "DENG" for dengue, "ZIKA" for zika). year : int | list[int] Year or list of years to fetch. **kwargs Additional arguments forwarded to :func:`_fetch_data`. Returns ------- list[str] | pd.DataFrame List of downloaded Parquet paths, or a DataFrame if specified. """ return _fetch_data( dataset="sinan", group=disease.upper(), year=year, **kwargs, )
[docs] def sinasc( state: types.State, year: int | list[int], group: str | None = None, **kwargs, ) -> list[str] | pd.DataFrame: """Fetch SINASC birth certificates for a given state, year(s), and group. SINASC (Sistema de Informação sobre Nascidos Vivo) is the Brazilian live birth information system. Parameters ---------- state : types.State Two-letter state abbreviation (e.g. "RJ"). year : int | list[int] Year or list of years to fetch. group : str, optional Additional grouping code. **kwargs Additional arguments forwarded to :func:`_fetch_data`. Returns ------- list[str] | pd.DataFrame List of downloaded Parquet paths, or a DataFrame if specified. """ return _fetch_data( dataset="sinasc", state=state.upper(), group=group, year=year, **kwargs, )
[docs] def sim( state: types.State, year: int | list[int], group: str | None = None, **kwargs, ) -> list[str] | pd.DataFrame: """Fetch SIM mortality records for a given state, year(s), and group. SIM (Sistema de Informação sobre Mortalidade) is the Brazilian mortality information system. Parameters ---------- state : State Two-letter state abbreviation (e.g. "RJ"). year : int | list[int] Year or list of years to fetch. group : str, optional Additional grouping code. **kwargs Additional arguments forwarded to :func:`_fetch_data`. Returns ------- list[str] | pd.DataFrame List of downloaded Parquet paths, or a DataFrame if specified. """ return _fetch_data( dataset="sim", state=state.upper(), group=group, year=year, **kwargs, )
[docs] def sih( state: types.State, year: int | list[int], month: int | list[int], group: str | None = None, **kwargs, ) -> list[str] | pd.DataFrame: """Fetch SIH hospital admissions for a state, year, month, and group. SIH (Sistema de Informação Hospitalar) is the Brazilian hospital admission information system. Parameters ---------- state : types.State Two-letter state abbreviation (e.g. "RJ"). year : int | list[int] Year or list of years to fetch. month : int | list[int] Month or list of months to fetch. group : str, optional Additional grouping code. **kwargs Additional arguments forwarded to :func:`_fetch_data`. Returns ------- list[str] | pd.DataFrame List of downloaded Parquet paths, or a DataFrame if specified. """ return _fetch_data( dataset="sih", state=state.upper(), group=group, year=year, month=month, **kwargs, )
[docs] def sia( state: types.State, year: int | list[int], month: int | list[int], group: str | None = None, **kwargs, ) -> list[str] | pd.DataFrame: """Fetch SIA ambulatory care for a state, year, month, and group. SIA (Sistema de Informação Ambulatorial) is the Brazilian ambulatory care information system. Parameters ---------- state : types.State Two-letter state abbreviation (e.g. "RJ"). year : int | list[int] Year or list of years to fetch. month : int | list[int] Month or list of months to fetch. group : str, optional Additional grouping code. **kwargs Additional arguments forwarded to :func:`_fetch_data`. Returns ------- list[str] | pd.DataFrame List of downloaded Parquet paths, or a DataFrame if specified. """ return _fetch_data( dataset="sia", state=state.upper(), group=group, year=year, month=month, **kwargs, )
[docs] def pni( state: types.State, year: int | list[int], group: str | None = None, **kwargs, ) -> list[str] | pd.DataFrame: """Fetch PNI immunisation records for a given state, year(s), and group. PNI (Programa Nacional de Imunizações) is the Brazilian national immunisation programme. Parameters ---------- state : State Two-letter state abbreviation (e.g. "RJ"). year : int | list[int] Year or list of years to fetch. group : str, optional Additional grouping code. **kwargs Additional arguments forwarded to :func:`_fetch_data`. Returns ------- list[str] | pd.DataFrame List of downloaded Parquet paths, or a DataFrame if specified. """ return _fetch_data( dataset="pni", state=state.upper(), group=group, year=year, **kwargs, )
[docs] def ibge( year: int | list[int], group: str | None = None, **kwargs, ) -> list[str] | pd.DataFrame: """Fetch IBGE census data for given year(s) and optional group. IBGE (Instituto Brasileiro de Geografia e Estatística) provides census and demographic data. Parameters ---------- year : int | list[int] Year or list of years to fetch. group : str, optional Additional grouping code. **kwargs Additional arguments forwarded to :func:`_fetch_data`. Returns ------- list[str] | pd.DataFrame List of downloaded Parquet paths, or a DataFrame if specified. """ return _fetch_data(dataset="ibge", group=group, year=year, **kwargs)
[docs] def cnes( state: types.State, year: int | list[int], month: int | list[int], group: str | None = None, **kwargs, ) -> list[str] | pd.DataFrame: """Fetch CNES health facilities for a state, year, month, and group. CNES (Cadastro Nacional de Estabelecimentos de Saúde) is the Brazilian registry of health-care facilities. Parameters ---------- state : State Two-letter state abbreviation (e.g. "RJ"). year : int | list[int] Year or list of years to fetch. month : int | list[int] Month or list of months to fetch. group : str, optional Additional grouping code. **kwargs Additional arguments forwarded to :func:`_fetch_data`. Returns ------- list[str] | pd.DataFrame List of downloaded Parquet paths, or a DataFrame if specified. """ return _fetch_data( dataset="cnes", state=state.upper(), group=group, year=year, month=month, **kwargs, )
[docs] def ciha( state: types.State, year: int | list[int], month: int | list[int], group: str | None = "CIHA", **kwargs, ) -> list[str] | pd.DataFrame: """Fetch CIHA hospitalisation records for state, year, month, and group. CIHA (Comunicação de Internação Hospitalar) provides hospitalisation records. Parameters ---------- state : State Two-letter state abbreviation (e.g. "RJ"). year : int | list[int] Year or list of years to fetch. month : int | list[int] Month or list of months to fetch. group : str, optional Additional grouping code. Default is "CIHA". **kwargs Additional arguments forwarded to :func:`_fetch_data`. Returns ------- list[str] | pd.DataFrame List of downloaded Parquet paths, or a DataFrame if specified. """ return _fetch_data( dataset="ciha", state=state.upper(), group=group, year=year, month=month, **kwargs, )
[docs] def list_files( dataset: types.DatasetName, client: types.Origin | None = None, group: str | None = None, state: str | None = None, year: int | list[int] | None = None, month: int | list[int] | None = None, **kwargs, ) -> pd.DataFrame: """List catalog files filtered by client, group, state, year, and month. Queries the PySUS API metadata and returns a DataFrame with file data without downloading the actual files. Parameters ---------- dataset : Literal Dataset name (e.g. "SINAN", "SINASC", etc.). client : Origin, optional Data source client to query. group : str, optional Group or disease code to filter by. state : str, optional Two-letter state abbreviation (e.g. "RJ"). year : int | list[int], optional Year or list of years to filter by. month : int | list[int], optional Month or list of months to filter by. **kwargs Additional arguments forwarded to :meth:`PySUS.query`. Returns ------- pd.DataFrame DataFrame with columns name, path, dataset, group, year, month, state, and modify. """ async def _list(): async with PySUS() as pysus: years = [year] if isinstance(year, int) else (year or [None]) months = [month] if isinstance(month, int) else (month or [None]) records = [] for y in years: for m in months: records.extend( await pysus.query( client=client, dataset=dataset, group=group, state=state, year=y, month=m, ) ) return [ { "name": str(r.path).split("/")[-1], "path": str(r.path), "dataset": r.dataset.name if r.dataset else None, "group": r.group.name if r.group else None, "year": r.record.year, "month": r.record.month, "state": r.record.state, "modify": r.record.origin_modified, } for r in records ] return pd.DataFrame(asyncio.run(_list()))