Source code for pysus.api.ducklake.catalog

"""SQLAlchemy ORM models for the DuckLake catalog schema.

Defines tables for datasets, groups, files, and columns stored
in the pysus schema of the local DuckDB catalog.
"""

import enum
from datetime import datetime
from typing import Optional

from sqlalchemy import (
    Boolean,
    Column,
    DateTime,
    Enum,
    ForeignKey,
    Index,
    Integer,
    Sequence,
    String,
    Table,
)
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship


[docs] class Base(DeclarativeBase): """Base class for all DuckLake catalog ORM models.""" pass
file_columns = Table( "file_columns", Base.metadata, Column( "file_id", Integer, ForeignKey("pysus.files.id"), primary_key=True, ), Column( "column_id", Integer, ForeignKey("pysus.dataset_columns.id"), primary_key=True, ), schema="pysus", )
[docs] class CatalogTable(Base): """Abstract base for catalog tables sharing the pysus schema.""" __abstract__ = True __table_args__: tuple = ({"schema": "pysus"},)
[docs] class Origin(enum.Enum): """Origin type for a dataset. Attributes ---------- FTP : str Dataset sourced from the FTP server. API : str Dataset sourced from an API. """ FTP = "ftp" API = "api"
[docs] class CatalogDataset(CatalogTable): """ORM model for the datasets table, representing a dataset collection. Parameters ---------- id : int, optional Primary key (auto-generated by sequence). name : str Unique short name for the dataset. long_name : str Human-readable full name. description : str, optional Optional description of the dataset contents. origin : Origin Whether the dataset originates from FTP or an API. """ __tablename__ = "datasets" id = Column( Integer, Sequence("datasets_id_seq", schema="pysus"), primary_key=True, ) name = Column(String, nullable=False, unique=True, index=True) long_name = Column(String, nullable=False) description = Column(String, nullable=True) origin = Column(Enum(Origin), nullable=False) groups = relationship( "DatasetGroup", back_populates="dataset", cascade="all, delete-orphan", ) files = relationship( "CatalogFile", back_populates="dataset", cascade="all, delete-orphan", ) columns = relationship( "ColumnDefinition", back_populates="dataset", cascade="all, delete-orphan", )
[docs] class ColumnDefinition(CatalogTable): """ORM model for dataset column metadata. Parameters ---------- id : int, optional Primary key (auto-generated by sequence). dataset_id : int Foreign key referencing the parent dataset. name : str Column name. type : str Column data type string. description : str, optional Optional description of the column. nullable : bool, optional Whether the column allows null values. """ __tablename__ = "dataset_columns" id = Column( Integer, Sequence("columns_id_seq", schema="pysus"), primary_key=True, ) dataset_id = Column( Integer, ForeignKey("pysus.datasets.id"), nullable=False, index=True, ) name = Column(String, nullable=False) type = Column(String, nullable=False) description = Column(String, nullable=True) nullable = Column(Boolean, nullable=False, default=True) dataset = relationship("CatalogDataset", back_populates="columns") files = relationship( "CatalogFile", secondary=file_columns, back_populates="columns", ) __table_args__ = ( Index("ix_columns_dataset_name", "dataset_id", "name"), {"schema": "pysus"}, )
[docs] class DatasetGroup(CatalogTable): """ORM model for dataset groups, grouping related files within a dataset. Parameters ---------- id : int, optional Primary key (auto-generated by sequence). name : str Short name for the group. dataset_id : int Foreign key referencing the parent dataset. long_name : str Human-readable full name. description : str, optional Optional description of the group contents. """ __tablename__ = "dataset_groups" id = Column( Integer, Sequence("groups_id_seq", schema="pysus"), primary_key=True, ) name = Column(String, nullable=False) dataset_id = Column( Integer, ForeignKey("pysus.datasets.id"), nullable=False, index=True, ) long_name = Column(String, nullable=False) description = Column(String, nullable=True) dataset = relationship("CatalogDataset", back_populates="groups") files = relationship( "CatalogFile", back_populates="group", cascade="all, delete-orphan", ) __table_args__ = ( Index("ix_groups_dataset_name", "dataset_id", "name"), {"schema": "pysus"}, )
[docs] class CatalogFile(CatalogTable): """ORM model for the files table, representing individual data files. Parameters ---------- id : int, optional Primary key (auto-generated by sequence). dataset_id : int Foreign key referencing the parent dataset. group_id : int, optional Foreign key referencing the parent group. path : str Object storage path to the file. size : int File size in bytes. rows : int Number of rows in the file. modified : datetime Timestamp of the last known modification. origin_modified : datetime, optional Original modification timestamp from the source. origin_path : str Original source path of the file. sha256 : str, optional SHA-256 hex digest for integrity verification. year : int, optional Data year associated with the file. month : int, optional Data month associated with the file. state : str, optional Two-letter state code associated with the file. """ __tablename__ = "files" id: Mapped[int] = mapped_column( Integer, Sequence("files_id_seq", schema="pysus"), primary_key=True, ) dataset_id: Mapped[int] = mapped_column( Integer, ForeignKey("pysus.datasets.id"), nullable=False, index=True ) group_id: Mapped[int | None] = mapped_column( Integer, ForeignKey("pysus.dataset_groups.id"), nullable=True, index=True, ) path: Mapped[str] = mapped_column(String, nullable=False, unique=True) size: Mapped[int] = mapped_column(Integer, nullable=False) rows: Mapped[int] = mapped_column(Integer, nullable=False) modified: Mapped[datetime] = mapped_column(DateTime, nullable=False) origin_modified: Mapped[datetime | None] = mapped_column( DateTime, nullable=True, ) origin_path: Mapped[str] = mapped_column(String, nullable=False) sha256: Mapped[str | None] = mapped_column( String(64), nullable=True, index=True, ) year: Mapped[int | None] = mapped_column( Integer, nullable=True, index=True, ) month: Mapped[int | None] = mapped_column( Integer, nullable=True, index=True, ) state: Mapped[str | None] = mapped_column( String(2), nullable=True, index=True, ) dataset: Mapped["CatalogDataset"] = relationship( "CatalogDataset", back_populates="files", ) group: Mapped[Optional["DatasetGroup"]] = relationship( "DatasetGroup", back_populates="files", ) columns: Mapped[list["ColumnDefinition"]] = relationship( "ColumnDefinition", secondary=file_columns, back_populates="files", cascade="all, delete", ) __table_args__ = ( Index("ix_files_dataset_group", "dataset_id", "group_id"), Index("ix_files_temporal", "year", "month"), Index( "ix_files_lookup", "dataset_id", "group_id", "year", "month", "state", ), {"schema": "pysus"}, )