Source code for pysus.api.ducklake.models

"""Application-level models for DuckLake remote resources.

Wraps catalog ORM records into BaseRemoteFile, BaseRemoteDataset,
and BaseRemoteGroup interfaces used by the rest of PySUS.
"""

import hashlib
from collections.abc import Callable
from datetime import datetime
from pathlib import Path
from typing import Any, Union

import anyio
from pydantic import Field
from pysus import CACHEPATH
from pysus.api.models import (
    BaseRemoteClient,
    BaseRemoteDataset,
    BaseRemoteFile,
    BaseRemoteGroup,
)

from .catalog import CatalogDataset, CatalogFile, DatasetGroup


class File(BaseRemoteFile):
    """A remote file in the DuckLake catalog with download and verification.

    Parameters
    ----------
    record : CatalogFile
        The underlying ORM record.
    type : str, optional
        File type identifier (default ``"remote"``).
    dataset : Any
        The parent dataset object.
    group : Any, optional
        The parent group object, if any.
    """

    record: CatalogFile = Field(exclude=True)
    type: str = "remote"
    dataset: Any
    group: Any = None

    @property
    def basename(self) -> str:
        """Return the file name without directory components.

        Returns
        -------
        str
            The base file name.
        """
        return self.path.name

    @property
    def extension(self) -> str:
        """Return the file extension including the leading dot.

        Returns
        -------
        str
            File extension (e.g. ``'.csv'``).
        """
        return self.path.suffix

    @property
    def size(self) -> int:
        """Return the file size in bytes.

        Returns
        -------
        int
            File size in bytes, as stored on the catalog record.
        """
        return self.record.size

    @property
    def modify(self) -> datetime:
        """Return the last-modified timestamp.

        Returns
        -------
        datetime
            The ``modified`` value stored on the catalog record.
        """
        return self.record.modified

    @property
    def rows(self) -> int:
        """Return the number of rows in the file.

        Returns
        -------
        int
            Row count stored on the catalog record.
        """
        return self.record.rows

    @property
    def sha256(self) -> str | None:
        """Return the SHA-256 hash of the file, if available.

        Returns
        -------
        str or None
            SHA-256 hex digest, or None if not recorded.
        """
        return self.record.sha256

    async def _download(
        self,
        output: Path | None = None,
        callback: Callable[[int, int], None] | None = None,
    ) -> Path:
        """Download the file from object storage to the given output path.

        Parameters
        ----------
        output : Path, optional
            Destination path. When omitted (or falsy), the file is placed
            under ``CACHEPATH`` using this file's ``name``.
        callback : Callable[[int, int], None], optional
            Forwarded unchanged to the client's ``_download_file``.

        Returns
        -------
        Path
            Whatever the client's ``_download_file`` returns.
        """
        # Any falsy ``output`` falls back to the cache location.
        target = output or CACHEPATH / self.name
        return await self.client._download_file(
            self,
            target,
            callback=callback,
        )

    async def verify(self, path: Path) -> bool:
        """Verify the file matches the recorded SHA-256 hash.

        The recorded digest is compared case-insensitively and with
        surrounding whitespace ignored, because ``hashlib`` always
        produces a lowercase hex digest while the stored value may have
        been written in a different form.

        Parameters
        ----------
        path : Path
            Path to the downloaded file on disk.

        Returns
        -------
        bool
            True if the hash matches or no hash is recorded,
            False otherwise.
        """
        recorded = self.sha256
        if not recorded:
            # Nothing to compare against: treat the file as valid.
            return True
        expected = recorded.strip().lower()

        def _digest() -> str:
            # Stream the file in fixed-size chunks so large files are
            # never loaded into memory in one piece.
            hasher = hashlib.sha256()
            with open(path, "rb") as stream:
                while chunk := stream.read(8192):
                    hasher.update(chunk)
            return hasher.hexdigest()

        # Hashing is blocking work; run it in a worker thread so the
        # event loop is not stalled.
        actual = await anyio.to_thread.run_sync(_digest)
        return actual == expected
class DuckDataset(BaseRemoteDataset):
    """A dataset from the DuckLake catalog, containing groups and files.

    Parameters
    ----------
    record : CatalogDataset
        The underlying ORM record.
    client : BaseRemoteClient
        The parent client instance.
    """

    record: CatalogDataset = Field(exclude=True)
    client: BaseRemoteClient = Field(exclude=True)

    def __repr__(self) -> str:
        """Return a string representation of the dataset.

        Returns
        -------
        str
            The dataset short name in upper case.
        """
        short_name = self.name
        return short_name.upper()

    @property
    def name(self) -> str:
        """Return the short name of the dataset.

        Returns
        -------
        str
            The ``name`` stored on the catalog record.
        """
        return self.record.name

    @property
    def long_name(self) -> str:
        """Return the human-readable name of the dataset.

        Returns
        -------
        str
            The metadata ``long_name`` when the record has metadata,
            otherwise the short name.
        """
        if self.record.dataset_metadata:
            return self.record.dataset_metadata.long_name
        return self.name

    @property
    def description(self) -> str:
        """Return the description of the dataset.

        Returns
        -------
        str
            The metadata ``description`` when the record has metadata,
            otherwise an empty string.
        """
        if self.record.dataset_metadata:
            return self.record.dataset_metadata.description
        return ""

    async def _fetch_content(self) -> list[Union["DuckGroup", File]]:
        """Fetch groups and files belonging to this dataset.

        Returns
        -------
        list of DuckGroup or File
            Wrapped groups first, followed by wrapped files. Either part
            is skipped when the record has nothing for it.
        """
        content: list[Union["DuckGroup", File]] = []

        if self.record.groups:
            for group_record in self.record.groups:
                content.append(DuckGroup(record=group_record, dataset=self))

        if self.record.files:
            for file_record in self.record.files:
                content.append(
                    File(
                        path=file_record.path,
                        record=file_record,
                        dataset=self,
                    )
                )

        return content
class DuckGroup(BaseRemoteGroup):
    """A group of related files within a DuckLake dataset.

    Parameters
    ----------
    record : DatasetGroup
        The underlying ORM record.
    dataset : DuckDataset
        The parent dataset instance.
    """

    record: DatasetGroup = Field(exclude=True)
    dataset: DuckDataset = Field(exclude=True)

    @property
    def name(self) -> str:
        """Return the short name of the group.

        Returns
        -------
        str
            The ``name`` stored on the catalog record.
        """
        return self.record.name

    @property
    def long_name(self) -> str:
        """Return the human-readable name of the group.

        Returns
        -------
        str
            The metadata ``long_name`` when the record has metadata,
            otherwise the short name.
        """
        if self.record.group_metadata:
            return self.record.group_metadata.long_name
        return self.name

    @property
    def description(self) -> str:
        """Return the description of the group.

        Returns
        -------
        str
            The metadata ``description`` when the record has metadata,
            otherwise an empty string.
        """
        if self.record.group_metadata:
            return self.record.group_metadata.description
        return ""

    async def _fetch_files(self) -> list[BaseRemoteFile]:
        """Fetch the list of files belonging to this group.

        Returns
        -------
        list of BaseRemoteFile
            One ``File`` per catalog file record, each linked back to
            this group and its parent dataset. Empty when the record has
            no files.
        """
        # Guard against a missing/empty ``files`` value, consistent with
        # DuckDataset._fetch_content, instead of failing while iterating.
        file_records = self.record.files or []
        files: list[BaseRemoteFile] = [
            File(
                path=f.path,
                record=f,
                group=self,
                dataset=self.dataset,
            )
            for f in file_records
        ]
        return files