Source code for pysus.api.ducklake.client

"""High-level client for DuckLake S3-based public health dataset catalog.

Provides authentication, dataset discovery, and file download
capabilities backed by per-dataset DuckDB engines.
"""

from collections.abc import Callable
from pathlib import Path
from typing import Any

import boto3
import httpx
from anyio import sleep, to_thread
from botocore.config import Config
from pydantic import BaseModel, PrivateAttr, SecretStr
from pysus import CACHEPATH
from pysus.api.models import BaseRemoteClient, BaseRemoteFile
from pysus.api.types import DUCKLAKE
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool

from .catalog.orm.default import Dataset
from .models import DuckDataset, File


[docs] class DuckLakeCredentials(BaseModel): """Credentials for authenticating with the S3-compatible object storage. Parameters ---------- access_key : SecretStr The S3 access key ID. secret_key : SecretStr The S3 secret access key. """ access_key: SecretStr secret_key: SecretStr
[docs] class DuckLake(BaseRemoteClient): """Client for the DuckLake S3-based public health dataset catalog. Parameters ---------- endpoint : str, optional S3-compatible object storage endpoint. region : str, optional Storage region name. bucket : str, optional Bucket name containing the catalog. credentials : DuckLakeCredentials, optional Credentials for authenticated S3 operations. """ endpoint: str = "nbg1.your-objectstorage.com" region: str = "nbg1" bucket: str = "pysus" credentials: DuckLakeCredentials | None = None _s3_client: Any = PrivateAttr(default=None) _Session: Any = PrivateAttr(default=None) _datasets: list = PrivateAttr(default_factory=list) def __init__(self, engine=None, **data) -> None: """Initialize the DuckLake client. Parameters ---------- engine : object, optional Pre-configured SQLAlchemy engine for the discovery catalog. ``**data`` Fields passed to the Pydantic base model. """ super().__init__(**data) self._engine = engine self._cache_dir: Path = Path(CACHEPATH) / "ducklake" self._cache_dir.mkdir(parents=True, exist_ok=True) self._catalog_local: Path = self._cache_dir / "catalog.duckdb" self._catalog_remote: str = "public/catalog.duckdb" @property def name(self) -> str: """Return the short name of this client. Returns ------- str The client short name. """ return DUCKLAKE @property def long_name(self) -> str: """Return the human-readable name of this client. Returns ------- str The client display name. """ return "PySUS s3 Client" @property def description(self) -> str: """Return a description of this client. Returns ------- str A description string (currently empty). """ return "" # TODO: @property def catalog_path(self) -> Path: """Return the local path to the discovery catalog database. Returns ------- Path Filesystem path to the local discovery catalog file. """ return self._catalog_local @property def _catalog_url(self) -> str: """Return the remote URL of the discovery catalog.""" return f"https://{self.endpoint}/{self.bucket}/{self._catalog_remote}" @property def _is_authenticated(self) -> bool: """Return whether the client has credentials configured.""" return self.credentials is not None
[docs] async def datasets(self, **kwargs) -> list[DuckDataset]: """Return all datasets from the catalog as DuckDataset instances. Parameters ---------- ``**kwargs`` Additional filter arguments (currently unused). Returns ------- list[DuckDataset] List of all datasets in the catalog. """ if not self._Session: await self.connect() def _fetch(): with self._Session() as session: results = session.query(Dataset).all() session.expunge_all() return results records = await to_thread.run_sync(_fetch) return [DuckDataset(record=rec, client=self) for rec in records]
[docs] async def login( self, access_key: str | None = None, secret_key: str | None = None, **kwargs, ) -> None: """Authenticate with S3 credentials and reconnect to the catalog. Parameters ---------- access_key : str, optional S3 access key ID. If omitted, credentials are cleared. secret_key : str, optional S3 secret access key. If omitted, credentials are cleared. ``**kwargs`` Additional arguments (currently unused). """ if access_key and secret_key: self.credentials = DuckLakeCredentials( access_key=SecretStr(access_key), secret_key=SecretStr(secret_key), ) else: self.credentials = None await self.connect(force=True) if self._is_authenticated: self._s3_client = await to_thread.run_sync( self._get_s3_client, )
def _setup_engine(self, local_path: Path | None = None): """Create and configure a DuckDB engine with S3 settings. Parameters ---------- local_path : Path, optional Path to the catalog database file. Defaults to the discovery catalog. """ if local_path is None: local_path = self._catalog_local engine = create_engine( f"duckdb:///{local_path}", poolclass=StaticPool, ) with engine.connect() as conn: conn.exec_driver_sql("INSTALL ducklake; LOAD ducklake;") has_pysus = conn.exec_driver_sql( "SELECT 1 FROM information_schema.schemata" " WHERE schema_name = 'pysus'" ).fetchone() if has_pysus: conn.exec_driver_sql("SET search_path='pysus,main';") else: conn.exec_driver_sql("SET search_path='main';") s3_cfg = { "s3_endpoint": self.endpoint, "s3_region": self.region, "s3_url_style": "path", "s3_use_ssl": "true", } if self.credentials and self._is_authenticated: s3_cfg["s3_access_key_id"] = ( self.credentials.access_key.get_secret_value() ) s3_cfg["s3_secret_access_key"] = ( self.credentials.secret_key.get_secret_value() ) for key, value in s3_cfg.items(): conn.exec_driver_sql(f"SET {key}='{value}'") conn.commit() return engine
[docs] async def connect(self, force: bool = False) -> None: """Connect to the discovery catalog, downloading first if needed. Parameters ---------- force : bool, optional Whether to re-download and re-connect even if already connected. """ if self._engine and not force: if not self._Session: self._Session = sessionmaker(bind=self._engine) return await self._download_catalog( self._catalog_local, self._catalog_remote, ) self._engine = await to_thread.run_sync(self._setup_engine) self._Session = sessionmaker(bind=self._engine)
[docs] async def close(self, update_catalog: bool = False) -> None: """Close all datasets and dispose the discovery engine. Parameters ---------- update_catalog : bool, optional Whether to upload all per-dataset catalogs before closing. Requires authenticated credentials. """ if update_catalog: await self._upload_catalog() datasets: list["DuckDataset"] = list(self._datasets) for ds in datasets: await ds.close(update_catalog=update_catalog) self._datasets.clear() if self._engine: await to_thread.run_sync(self._engine.dispose) self._engine = None self._Session = None self._s3_client = None
async def _download( self, remote_path: str, local_path: Path, *, callback: Callable[[int, int], None] | None = None, ) -> None: """Download *remote_path* to *local_path* with streaming and retries. Parameters ---------- remote_path : str Object key within the bucket. local_path : Path Local destination path. callback : Callable[[int, int], None], optional Progress callback receiving ``(downloaded, total)`` bytes. """ url = f"https://{self.endpoint}/{self.bucket}/{remote_path}" max_retries = 5 for attempt in range(max_retries): try: async with httpx.AsyncClient(follow_redirects=True) as client: async with client.stream("GET", url) as r: r.raise_for_status() total = int(r.headers.get("Content-Length", 0)) downloaded = 0 with open(local_path, "wb") as f: async for chunk in r.aiter_bytes( chunk_size=1024 * 1024, ): await to_thread.run_sync(f.write, chunk) downloaded += len(chunk) if callback: callback(downloaded, total) return except OSError as e: if attempt < max_retries - 1: await sleep(1) else: raise e async def _download_catalog( self, local_path: Path, remote_path: str ) -> None: """Download a catalog database from remote storage with retries. Parameters ---------- local_path : Path Local destination path for the catalog file. remote_path : str Remote object key within the bucket. """ url = f"https://{self.endpoint}/{self.bucket}/{remote_path}" if local_path.exists(): try: local_size = local_path.stat().st_size except OSError: local_size = -1 else: local_size = -1 async with httpx.AsyncClient(follow_redirects=True) as client: try: head = await client.head(url) head.raise_for_status() remote_size = int(head.headers.get("content-length", 0)) except Exception: # noqa: B902 remote_size = 0 if remote_size == local_size: return await self._download(remote_path, local_path) async def _download_file( self, file: BaseRemoteFile, output: Path, callback: Callable[[int, int], None] | None = None, ) -> Path: """Download a single file from object storage to the local path.""" if not isinstance(file, File): raise ValueError("FTP File was not properly instantiated") await self._download(file.record.path, output, callback=callback) return output def _get_s3_client(self): """Create and return a boto3 S3 client for the configured endpoint.""" if not self.credentials: raise ConnectionError("S3 Credentials not found") return boto3.client( "s3", endpoint_url=f"https://{self.endpoint}", aws_access_key_id=self.credentials.access_key.get_secret_value(), aws_secret_access_key=( self.credentials.secret_key.get_secret_value() ), region_name=self.region, config=Config(signature_version="s3v4"), ) async def _upload_catalog(self) -> None: """Upload all per-dataset catalogs to remote storage. Requires authenticated credentials. """ if not self.credentials: raise PermissionError( "Admin credentials required to upload catalog.", ) datasets = await self.datasets() for ds in datasets: if not ds._catalog_local.exists(): continue _local = str(ds._catalog_local) _name = ds._catalog_name def _upload(local=_local, name=_name): self._s3_client.upload_file( local, self.bucket, name, ) await to_thread.run_sync(_upload)
DuckDataset.model_rebuild(_types_namespace={"DuckLake": DuckLake})