pyducklake

A Python SDK for Ducklake (https://ducklake.select), providing a pyiceberg-like API (https://py.iceberg.apache.org/) for the Ducklake lakehouse format.

Quick Start

from pyducklake import Catalog, Schema, required, optional, IntegerType, StringType

# Connect to a Ducklake catalog (DuckDB, PostgreSQL, MySQL, or SQLite metadata)
catalog = Catalog("my_lake", "metadata.duckdb", data_path="./data")

# Ergonomic schema creation
schema = Schema.of(
    required("id", IntegerType()),
    optional("name", StringType()),
)

# Or with a dict (all fields optional):
schema = Schema.of({"id": IntegerType(), "name": StringType()})

# Explicit field IDs (used internally when loading from catalog):
from pyducklake import NestedField
schema = Schema(
    NestedField(field_id=1, name="id", field_type=IntegerType(), required=True),
    NestedField(field_id=2, name="name", field_type=StringType()),
)

table = catalog.create_table("users", schema)

# Write data
import pyarrow as pa
df = pa.table({"id": [1, 2, 3], "name": ["alice", "bob", "carol"]})
table.append(df)

# Read data
result = table.scan().to_arrow()
print(result)
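Table identifiers can be a bare name, a dotted "namespace.table" string, or a (namespace, table) tuple. Judging from the Catalog._resolve_identifier source, a bare name such as "users" lands in the main namespace, and only the first dot counts as a separator. The standalone function below mirrors that logic so the rules are easy to check. resolve_identifier is a hypothetical name used for illustration; it is not a public pyducklake function.

```python
from __future__ import annotations


def resolve_identifier(identifier: str | tuple[str, str]) -> tuple[str, str]:
    """Split an identifier into (namespace, table_name), defaulting to 'main'."""
    if isinstance(identifier, tuple):
        # Tuples are taken as already resolved.
        return identifier
    # Split on the first dot only, so "a.b.c" becomes ("a", "b.c").
    parts = identifier.split(".", 1)
    if len(parts) == 2:
        return (parts[0], parts[1])
    return ("main", parts[0])


print(resolve_identifier("users"))             # ('main', 'users')
print(resolve_identifier("analytics.events"))  # ('analytics', 'events')
```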

Metadata Backends

Connect to any supported metadata backend by changing the URI (the second argument to Catalog):

| Backend | Example |
|---------|---------|
| DuckDB | Catalog("lake", "meta.duckdb") |
| PostgreSQL | Catalog("lake", "postgres:dbname=mydb host=localhost") |
| MySQL | Catalog("lake", "mysql:host=localhost database=mydb") |
| SQLite | Catalog("lake", "sqlite:meta.sqlite") |
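Whatever the backend, the Catalog constructor attaches the URI through DuckDB's ducklake extension. It prefixes the URI with ducklake: and appends DATA_PATH and ENCRYPTED options when they are given. The sketch below rebuilds that ATTACH statement as a plain string. quote_identifier and escape_string_literal are pyducklake helpers whose bodies do not appear on this page, so the versions here assume standard SQL quoting, where embedded quotes are doubled. build_attach_sql is a hypothetical name. Unlike the constructor source, this sketch also escapes the URI.

```python
from __future__ import annotations


def quote_identifier(name: str) -> str:
    # Assumption: standard SQL identifier quoting (wrap in "", double embedded ").
    return '"' + name.replace('"', '""') + '"'


def escape_string_literal(value: str) -> str:
    # Assumption: standard SQL literal escaping (double embedded single quotes).
    return value.replace("'", "''")


def build_attach_sql(
    name: str,
    uri: str,
    data_path: str | None = None,
    encrypted: bool = False,
) -> str:
    """Rebuild the ATTACH statement issued by Catalog.__init__ (hypothetical helper)."""
    # The URI is escaped here as a precaution; the constructor interpolates it as-is.
    sql = f"ATTACH 'ducklake:{escape_string_literal(uri)}' AS {quote_identifier(name)}"
    options: list[str] = []
    if data_path is not None:
        options.append(f"DATA_PATH '{escape_string_literal(data_path)}'")
    if encrypted:
        options.append("ENCRYPTED")
    if options:
        sql += f" ({', '.join(options)})"
    return sql


print(build_attach_sql("lake", "postgres:dbname=mydb host=localhost", data_path="s3://bucket/lake/"))
# ATTACH 'ducklake:postgres:dbname=mydb host=localhost' AS "lake" (DATA_PATH 's3://bucket/lake/')
```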

Key Features

  • Catalog management — Catalog provides namespace and table CRUD, views, configuration, and commit metadata.

  • Table operations — Table supports append(), overwrite(), delete(), upsert(), and add_files() for data mutation.

  • Scan API — DataScan offers an immutable builder for filtered, projected, time-traveling reads with output to Arrow, pandas, Polars, DuckDB, or streaming RecordBatchReader.

  • Schema evolution — UpdateSchema provides add_column(), drop_column(), rename_column(), and update_column() with a builder/context-manager pattern.

  • Partitioning — UpdateSpec manages hidden partitioning with identity, year, month, day, and hour transforms.

  • Sort orders — UpdateSortOrder configures sort orders applied during compaction.

  • Transactions — Transaction groups multiple operations across tables into a single atomic commit.

  • Time travel — Scan at any snapshot version or timestamp via scan().with_snapshot(id) or scan().with_timestamp(ts).

  • Change data capture — table_changes(), table_insertions(), and table_deletions() query row-level changes between snapshots.

  • Inspect API — InspectTable exposes snapshot history, data files, and partition metadata as Arrow tables.

  • Maintenance — MaintenanceTable provides compact(), expire_snapshots(), rewrite_data_files(), cleanup_files(), and checkpoint().

  • CLI — The pyducklake command-line tool provides catalog inspection and maintenance from the terminal.
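The Scan API bullet describes DataScan as an immutable builder. Each call, such as with_snapshot(), returns a new scan instead of modifying the existing one, so a base scan can be reused safely. The sketch below shows that general pattern with a frozen dataclass and dataclasses.replace. ScanSpec and its fields are hypothetical and do not reflect DataScan's real attributes. The AT (VERSION => n) clause is an assumption based on DuckLake's time-travel syntax.

```python
from __future__ import annotations

from dataclasses import dataclass, replace


@dataclass(frozen=True)
class ScanSpec:
    """Hypothetical immutable scan description (not pyducklake's DataScan)."""

    table: str
    columns: tuple[str, ...] = ("*",)
    predicates: tuple[str, ...] = ()
    snapshot_id: int | None = None

    def select(self, *columns: str) -> ScanSpec:
        # Every builder call returns a new object; self is never mutated.
        return replace(self, columns=columns)

    def filter(self, predicate: str) -> ScanSpec:
        return replace(self, predicates=self.predicates + (predicate,))

    def with_snapshot(self, snapshot_id: int) -> ScanSpec:
        return replace(self, snapshot_id=snapshot_id)

    def to_sql(self) -> str:
        sql = f"SELECT {', '.join(self.columns)} FROM {self.table}"
        if self.snapshot_id is not None:
            sql += f" AT (VERSION => {self.snapshot_id})"  # assumed time-travel syntax
        if self.predicates:
            sql += " WHERE " + " AND ".join(self.predicates)
        return sql


base = ScanSpec("users")
recent = base.select("id", "name").filter("id > 1").with_snapshot(3)
print(base.to_sql())    # SELECT * FROM users
print(recent.to_sql())  # SELECT id, name FROM users AT (VERSION => 3) WHERE id > 1
```

Because base is never modified, it can be shared between callers that each build their own refinements.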

Modules

| Module | Description |
|--------|-------------|
| pyducklake.catalog | Catalog connection and namespace/table/view management |
| pyducklake.table | Table class with read, write, and metadata operations |
| pyducklake.scan | DataScan builder for filtered, projected reads |
| pyducklake.schema | Schema representation and field lookup |
| pyducklake.schema_evolution | UpdateSchema builder for schema changes |
| pyducklake.types | Ducklake type system and Arrow/SQL conversion |
| pyducklake.expressions | Boolean expression tree for filters |
| pyducklake.partitioning | Partition transforms, specs, and UpdateSpec builder |
| pyducklake.sorting | Sort orders and UpdateSortOrder builder |
| pyducklake.transaction | Multi-operation atomic transactions |
| pyducklake.inspect | Metadata introspection (snapshots, files, partitions) |
| pyducklake.maintenance | Table maintenance (compaction, expiration, cleanup) |
| pyducklake.snapshot | Snapshot dataclass |
| pyducklake.exceptions | Exception hierarchy |
| pyducklake.cli | Command-line interface |
__version__ = "0.1.0"

from pyducklake.catalog import Catalog
from pyducklake.cdc import ChangeSet
from pyducklake.exceptions import (
    CommitFailedError,
    DucklakeError,
    NamespaceAlreadyExistsError,
    NamespaceNotEmptyError,
    NoSuchNamespaceError,
    NoSuchTableError,
    NoSuchViewError,
    TableAlreadyExistsError,
    ViewAlreadyExistsError,
)
from pyducklake.expressions import (
    AlwaysFalse,
    AlwaysTrue,
    And,
    BooleanExpression,
    EqualTo,
    GreaterThan,
    GreaterThanOrEqual,
    In,
    IsNaN,
    IsNull,
    LessThan,
    LessThanOrEqual,
    Not,
    NotEqualTo,
    NotIn,
    NotNaN,
    NotNull,
    Or,
    Reference,
)
from pyducklake.inspect import InspectTable
from pyducklake.maintenance import MaintenanceTable
from pyducklake.partitioning import (
    DAY,
    HOUR,
    IDENTITY,
    MONTH,
    UNPARTITIONED,
    YEAR,
    DayTransform,
    HourTransform,
    IdentityTransform,
    MonthTransform,
    PartitionField,
    PartitionSpec,
    Transform,
    UpdateSpec,
    YearTransform,
)
from pyducklake.scan import DataScan
from pyducklake.schema import Schema, optional, required
from pyducklake.schema_evolution import UpdateSchema
from pyducklake.snapshot import Snapshot
from pyducklake.sorting import (
    UNSORTED,
    NullOrder,
    SortDirection,
    SortField,
    SortOrder,
    UpdateSortOrder,
)
from pyducklake.table import ArrowCompatible, ArrowStreamExportable, Table, UpsertResult
from pyducklake.transaction import Transaction
from pyducklake.types import (
    BigIntType,
    BinaryType,
    BooleanType,
    DateType,
    DecimalType,
    DoubleType,
    DucklakeType,
    FloatType,
    HugeIntType,
    IntegerType,
    IntervalType,
    JSONType,
    ListType,
    MapType,
    NestedField,
    SmallIntType,
    StringType,
    StructType,
    TimestampType,
    TimestampTZType,
    TimeType,
    TinyIntType,
    UBigIntType,
    UIntegerType,
    USmallIntType,
    UTinyIntType,
    UUIDType,
    arrow_type_to_ducklake,
    ducklake_type_to_arrow,
    ducklake_type_to_sql,
)
from pyducklake.view import View

__all__ = [
    "ArrowCompatible",
    "ArrowStreamExportable",
    "AlwaysFalse",
    "AlwaysTrue",
    "And",
    "BooleanExpression",
    "Catalog",
    "ChangeSet",
    "CommitFailedError",
    "DataScan",
    "DucklakeError",
    "EqualTo",
    "GreaterThan",
    "GreaterThanOrEqual",
    "In",
    "InspectTable",
    "MaintenanceTable",
    "IsNaN",
    "IsNull",
    "LessThan",
    "LessThanOrEqual",
    "NamespaceAlreadyExistsError",
    "NamespaceNotEmptyError",
    "NoSuchNamespaceError",
    "NoSuchTableError",
    "NoSuchViewError",
    "Not",
    "NotEqualTo",
    "NotIn",
    "NotNaN",
    "NotNull",
    "Or",
    "Reference",
    "Schema",
    "optional",
    "required",
    "Snapshot",
    "Table",
    "TableAlreadyExistsError",
    "View",
    "ViewAlreadyExistsError",
    "Transaction",
    "UpdateSchema",
    "UpdateSortOrder",
    "UpdateSpec",
    "UNSORTED",
    "UpsertResult",
    "NullOrder",
    "SortDirection",
    "SortField",
    "SortOrder",
    "DAY",
    "DayTransform",
    "HOUR",
    "HourTransform",
    "IDENTITY",
    "IdentityTransform",
    "MONTH",
    "MonthTransform",
    "PartitionField",
    "PartitionSpec",
    "Transform",
    "UNPARTITIONED",
    "YEAR",
    "YearTransform",
    "BigIntType",
    "BinaryType",
    "BooleanType",
    "DateType",
    "DecimalType",
    "DoubleType",
    "DucklakeType",
    "FloatType",
    "HugeIntType",
    "IntegerType",
    "IntervalType",
    "JSONType",
    "ListType",
    "MapType",
    "NestedField",
    "SmallIntType",
    "StringType",
    "StructType",
    "TimeType",
    "TimestampTZType",
    "TimestampType",
    "TinyIntType",
    "UBigIntType",
    "UIntegerType",
    "USmallIntType",
    "UTinyIntType",
    "UUIDType",
    "arrow_type_to_ducklake",
    "ducklake_type_to_arrow",
    "ducklake_type_to_sql",
]
ArrowCompatible = pyarrow.lib.Table | ArrowStreamExportable

@runtime_checkable
class ArrowStreamExportable(typing.Protocol):

@runtime_checkable
class ArrowStreamExportable(Protocol):
    """Protocol for objects implementing the Arrow PyCapsule interface."""

    def __arrow_c_stream__(self, requested_schema: Any = None) -> Any: ...

Protocol for objects implementing the Arrow PyCapsule interface.
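Because the protocol is decorated with @runtime_checkable, isinstance() succeeds for any object that defines an __arrow_c_stream__ method, whatever its class. The check only looks for the method's presence, not its signature. The sketch below redefines the protocol locally so it runs without pyducklake. FakeStream is a hypothetical class. Objects from other Arrow-based libraries that implement the PyCapsule stream interface should pass the same check.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class ArrowStreamExportable(Protocol):
    """Local copy of the protocol so this runs without pyducklake."""

    def __arrow_c_stream__(self, requested_schema: Any = None) -> Any: ...


class FakeStream:
    """Hypothetical producer; note it never inherits from the protocol."""

    def __arrow_c_stream__(self, requested_schema: Any = None) -> Any:
        # A real producer would return a PyCapsule wrapping an ArrowArrayStream.
        return None


print(isinstance(FakeStream(), ArrowStreamExportable))  # True
print(isinstance([1, 2, 3], ArrowStreamExportable))     # False
```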

def AlwaysFalse() -> pyducklake.expressions._AlwaysFalse:
def AlwaysFalse() -> _AlwaysFalse:  # noqa: N802
    """Return the AlwaysFalse singleton."""
    return _AlwaysFalse()

Return the AlwaysFalse singleton.

def AlwaysTrue() -> pyducklake.expressions._AlwaysTrue:
def AlwaysTrue() -> _AlwaysTrue:  # noqa: N802
    """Return the AlwaysTrue singleton."""
    return _AlwaysTrue()

Return the AlwaysTrue singleton.

@dataclass(frozen=True)
class And(pyducklake.BooleanExpression):
@dataclass(frozen=True)
class And(BooleanExpression):
    """Logical AND with short-circuit simplification."""

    left: BooleanExpression
    right: BooleanExpression

    def __new__(cls, left: BooleanExpression, right: BooleanExpression) -> BooleanExpression:  # type: ignore[misc]
        if isinstance(left, _AlwaysTrue):
            return right
        if isinstance(right, _AlwaysTrue):
            return left
        if isinstance(left, _AlwaysFalse) or isinstance(right, _AlwaysFalse):
            return AlwaysFalse()
        instance = super().__new__(cls)
        return instance

    def to_sql(self) -> str:
        return f"({self.left.to_sql()} AND {self.right.to_sql()})"

    def __repr__(self) -> str:
        return f"And(left={self.left!r}, right={self.right!r})"

Logical AND with short-circuit simplification.

And(left: BooleanExpression, right: BooleanExpression)

def to_sql(self) -> str:
    def to_sql(self) -> str:
        return f"({self.left.to_sql()} AND {self.right.to_sql()})"

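And.__new__ above applies the usual boolean identities: TRUE is neutral for AND, and FALSE absorbs it. The sketch below applies the same rules in a plain factory function instead of __new__. There is a reason to prefer a factory. When __new__ returns an object that is already an instance of the class (for example, when the right-hand operand is itself an And), Python still calls __init__ on that object with the original arguments. On a dataclass, that call overwrites the object's fields. All class and function names below are hypothetical.

```python
from __future__ import annotations

from dataclasses import dataclass


class Expr:
    """Minimal expression base (hypothetical, mirrors BooleanExpression)."""

    def to_sql(self) -> str:
        raise NotImplementedError


class TrueExpr(Expr):
    def to_sql(self) -> str:
        return "TRUE"


class FalseExpr(Expr):
    def to_sql(self) -> str:
        return "FALSE"


@dataclass(frozen=True)
class Pred(Expr):
    sql: str  # raw SQL for a leaf predicate

    def to_sql(self) -> str:
        return self.sql


@dataclass(frozen=True)
class AndNode(Expr):
    left: Expr
    right: Expr

    def to_sql(self) -> str:
        return f"({self.left.to_sql()} AND {self.right.to_sql()})"


def make_and(left: Expr, right: Expr) -> Expr:
    """Build AND with the same short-circuit rules as And.__new__."""
    if isinstance(left, TrueExpr):
        return right  # TRUE AND x == x
    if isinstance(right, TrueExpr):
        return left  # x AND TRUE == x
    if isinstance(left, FalseExpr) or isinstance(right, FalseExpr):
        return FalseExpr()  # FALSE absorbs AND
    return AndNode(left, right)


inner = make_and(Pred("id > 1"), Pred("name IS NOT NULL"))
print(make_and(TrueExpr(), inner).to_sql())  # (id > 1 AND name IS NOT NULL)
print(make_and(inner, FalseExpr()).to_sql())  # FALSE
```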
class BooleanExpression(abc.ABC):
class BooleanExpression(ABC):
    """Base for all filter expressions."""

    @abstractmethod
    def to_sql(self) -> str: ...

    def __and__(self, other: BooleanExpression) -> BooleanExpression:
        return And(self, other)

    def __or__(self, other: BooleanExpression) -> BooleanExpression:
        return Or(self, other)

    def __invert__(self) -> BooleanExpression:
        return Not(self)

Base for all filter expressions.

@abstractmethod
def to_sql(self) -> str:
    @abstractmethod
    def to_sql(self) -> str: ...

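Because BooleanExpression overloads &, | and ~, filters can be composed with Python operators instead of nested constructor calls. The standalone sketch below mirrors that design with hypothetical classes. The SQL rendering of OR and NOT is an assumption, because only the And.to_sql source appears on this page. Python's operator precedence still applies (~ binds tighter than &, which binds tighter than |), so parenthesize mixed expressions.

```python
from __future__ import annotations

from dataclasses import dataclass


class Expr:
    """Hypothetical base mirroring BooleanExpression's operator overloads."""

    def to_sql(self) -> str:
        raise NotImplementedError

    def __and__(self, other: Expr) -> Expr:
        return AndExpr(self, other)

    def __or__(self, other: Expr) -> Expr:
        return OrExpr(self, other)

    def __invert__(self) -> Expr:
        return NotExpr(self)


@dataclass(frozen=True)
class Pred(Expr):
    sql: str  # raw SQL for a leaf predicate

    def to_sql(self) -> str:
        return self.sql


@dataclass(frozen=True)
class AndExpr(Expr):
    left: Expr
    right: Expr

    def to_sql(self) -> str:
        return f"({self.left.to_sql()} AND {self.right.to_sql()})"


@dataclass(frozen=True)
class OrExpr(Expr):
    left: Expr
    right: Expr

    def to_sql(self) -> str:
        return f"({self.left.to_sql()} OR {self.right.to_sql()})"  # assumed format


@dataclass(frozen=True)
class NotExpr(Expr):
    child: Expr

    def to_sql(self) -> str:
        return f"(NOT {self.child.to_sql()})"  # assumed format


adult = Pred("age >= 18")
active = Pred("active")
banned = Pred("banned")
expr = (adult & active) | ~banned
print(expr.to_sql())  # ((age >= 18 AND active) OR (NOT banned))
```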
class Catalog:
195class Catalog:
196    """Ducklake catalog backed by DuckDB + ducklake extension.
197
198    Examples:
199
200    ```pycon
201    >>> import tempfile, os
202    >>> tmp = tempfile.mkdtemp()
203    >>> cat = Catalog("my_lake", os.path.join(tmp, "meta.duckdb"), data_path=os.path.join(tmp, "data"))
204    >>> cat.name
205    'my_lake'
206
207    ```
208    """
209
210    def __init__(
211        self,
212        name: str,
213        uri: str,
214        *,
215        data_path: str | None = None,
216        properties: dict[str, str] | None = None,
217        encrypted: bool = False,
218    ) -> None:
219        self._name = name
220        self._uri = uri
221        self._data_path = data_path
222        self._encrypted = encrypted
223
224        self._conn = duckdb.connect()
225        self._conn.execute("INSTALL ducklake; LOAD ducklake;")
226
227        # Apply connection properties (e.g., S3 configuration)
228        for key, value in (properties or {}).items():
229            if not self._OPTION_KEY_RE.match(key):
230                raise ValueError(
231                    f"Invalid property key: {key!r}. Keys must contain only alphanumeric characters and underscores."
232                )
233            self._conn.execute(f"SET {key} = '{escape_string_literal(value)}'")
234
235        attach_sql = f"ATTACH 'ducklake:{uri}' AS {quote_identifier(name)}"
236        options: list[str] = []
237        if data_path is not None:
238            options.append(f"DATA_PATH '{escape_string_literal(data_path)}'")
239        if encrypted:
240            options.append("ENCRYPTED")
241        if options:
242            attach_sql += f" ({', '.join(options)})"
243        self._conn.execute(attach_sql)
244
245    @property
246    def name(self) -> str:
247        return self._name
248
249    @property
250    def encrypted(self) -> bool:
251        """Whether this catalog uses Parquet encryption."""
252        return self._encrypted
253
254    @property
255    def connection(self) -> duckdb.DuckDBPyConnection:
256        """Access the underlying DuckDB connection."""
257        return self._conn
258
259    # -- Namespace operations ------------------------------------------------
260
261    def list_namespaces(self) -> list[str]:
262        """List all namespaces (schemas)."""
263        rows = self.fetchall(
264            "SELECT schema_name FROM information_schema.schemata "
265            f"WHERE catalog_name = '{escape_string_literal(self._name)}' "
266            "ORDER BY schema_name"
267        )
268        return [row[0] for row in rows]
269
270    def create_namespace(self, namespace: str) -> None:
271        """CREATE SCHEMA. Raises NamespaceAlreadyExistsError if exists."""
272        if self.namespace_exists(namespace):
273            raise NamespaceAlreadyExistsError(f"Namespace already exists: {namespace}")
274        self._execute(f"CREATE SCHEMA {quote_identifier(self._name)}.{quote_identifier(namespace)}")
275
276    def create_namespace_if_not_exists(self, namespace: str) -> None:
277        """Create namespace, no-op if already exists."""
278        self._execute(f"CREATE SCHEMA IF NOT EXISTS {quote_identifier(self._name)}.{quote_identifier(namespace)}")
279
280    def drop_namespace(self, namespace: str) -> None:
281        """DROP SCHEMA. Raises NamespaceNotEmptyError if non-empty, NoSuchNamespaceError if not found."""
282        if not self.namespace_exists(namespace):
283            raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}")
284        tables = self.list_tables(namespace)
285        views = self.list_views(namespace)
286        if tables or views:
287            raise NamespaceNotEmptyError(f"Namespace is not empty: {namespace}")
288        self._execute(f"DROP SCHEMA {quote_identifier(self._name)}.{quote_identifier(namespace)}")
289
290    def namespace_exists(self, namespace: str) -> bool:
291        rows = self.fetchall(
292            "SELECT 1 FROM information_schema.schemata "
293            f"WHERE catalog_name = '{escape_string_literal(self._name)}' "
294            f"AND schema_name = '{escape_string_literal(namespace)}'"
295        )
296        return len(rows) > 0
297
298    # -- Table operations ----------------------------------------------------
299
300    def list_tables(self, namespace: str = "main") -> list[tuple[str, str]]:
301        """List tables in namespace. Returns list of (namespace, table_name) tuples."""
302        rows = self.fetchall(
303            "SELECT table_schema, table_name FROM information_schema.tables "
304            f"WHERE table_catalog = '{escape_string_literal(self._name)}' "
305            f"AND table_schema = '{escape_string_literal(namespace)}' "
306            "AND table_type != 'VIEW' "
307            "ORDER BY table_name"
308        )
309        return [(row[0], row[1]) for row in rows]
310
311    def create_table(
312        self,
313        identifier: str | tuple[str, str],
314        schema: Schema,
315    ) -> Table:
316        """CREATE TABLE with the given schema. Raises TableAlreadyExistsError if exists.
317
318        Examples:
319
320        ```pycon
321        >>> import tempfile, os
322        >>> from pyducklake import Catalog, Schema, required, optional, IntegerType, StringType
323        >>> tmp = tempfile.mkdtemp()
324        >>> cat = Catalog("test", os.path.join(tmp, "m.duckdb"), data_path=os.path.join(tmp, "d"))
325        >>> schema = Schema.of(required("id", IntegerType()), optional("name", StringType()))
326        >>> table = cat.create_table("users", schema)
327        >>> table.name
328        'users'
329        >>> [f.name for f in table.schema.fields]
330        ['id', 'name']
331
332        ```
333        """
334        namespace, table_name = self._resolve_identifier(identifier)
335
336        if self.table_exists((namespace, table_name)):
337            raise TableAlreadyExistsError(f"Table already exists: {namespace}.{table_name}")
338
339        fqn = self.fully_qualified_name(namespace, table_name)
340        col_defs = ", ".join(
341            f"{quote_identifier(f.name)} {ducklake_type_to_sql(f.field_type)}{' NOT NULL' if f.required else ''}"
342            for f in schema.fields
343        )
344        self._execute(f"CREATE TABLE {fqn} ({col_defs})")
345
346        # Reload schema from DuckDB to get canonical types
347        loaded_schema = self.build_schema_from_describe(namespace, table_name)
348        from pyducklake.sorting import UNSORTED
349
350        return Table(identifier=(namespace, table_name), schema=loaded_schema, catalog=self, sort_order=UNSORTED)
351
352    def create_table_if_not_exists(
353        self,
354        identifier: str | tuple[str, str],
355        schema: Schema,
356    ) -> Table:
357        """Create table if it doesn't exist, otherwise load and return existing."""
358        namespace, table_name = self._resolve_identifier(identifier)
359        if self.table_exists((namespace, table_name)):
360            return self.load_table((namespace, table_name))
361        return self.create_table((namespace, table_name), schema)
362
363    def load_table(self, identifier: str | tuple[str, str]) -> Table:
364        """Load table metadata. Raises NoSuchTableError if not found."""
365        namespace, table_name = self._resolve_identifier(identifier)
366        if not self.table_exists((namespace, table_name)):
367            raise NoSuchTableError(f"Table does not exist: {namespace}.{table_name}")
368        schema = self.build_schema_from_describe(namespace, table_name)
369        return Table(identifier=(namespace, table_name), schema=schema, catalog=self)
370
371    def drop_table(self, identifier: str | tuple[str, str]) -> None:
372        """DROP TABLE. Raises NoSuchTableError if not found."""
373        namespace, table_name = self._resolve_identifier(identifier)
374        if not self.table_exists((namespace, table_name)):
375            raise NoSuchTableError(f"Table does not exist: {namespace}.{table_name}")
376        fqn = self.fully_qualified_name(namespace, table_name)
377        self._execute(f"DROP TABLE {fqn}")
378
379    def rename_table(self, from_identifier: str | tuple[str, str], to_identifier: str | tuple[str, str]) -> Table:
380        """ALTER TABLE RENAME TO. Returns the renamed table."""
381        from_ns, from_name = self._resolve_identifier(from_identifier)
382        to_ns, to_name = self._resolve_identifier(to_identifier)
383
384        if not self.table_exists((from_ns, from_name)):
385            raise NoSuchTableError(f"Table does not exist: {from_ns}.{from_name}")
386
387        if from_ns != to_ns:
388            raise DucklakeError("Cross-namespace rename is not supported")
389
390        from_fqn = self.fully_qualified_name(from_ns, from_name)
391        # DuckDB RENAME TO expects just the new table name, not fully qualified
392        self._execute(f"ALTER TABLE {from_fqn} RENAME TO {quote_identifier(to_name)}")
393        return self.load_table((to_ns, to_name))
394
395    def table_exists(self, identifier: str | tuple[str, str]) -> bool:
396        namespace, table_name = self._resolve_identifier(identifier)
397        rows = self.fetchall(
398            "SELECT 1 FROM information_schema.tables "
399            f"WHERE table_catalog = '{escape_string_literal(self._name)}' "
400            f"AND table_schema = '{escape_string_literal(namespace)}' "
401            f"AND table_name = '{escape_string_literal(table_name)}'"
402        )
403        return len(rows) > 0
404
405    # -- View operations -----------------------------------------------------
406
407    def create_view(
408        self,
409        identifier: str | tuple[str, str],
410        sql: str,
411    ) -> View:
412        """Create a view in the catalog.
413
414        Uses: CREATE VIEW {fqn} AS {sql}
415
416        Raises ViewAlreadyExistsError if the view already exists.
417        """
418        namespace, view_name = self._resolve_identifier(identifier)
419        if self.view_exists((namespace, view_name)):
420            raise ViewAlreadyExistsError(f"View already exists: {namespace}.{view_name}")
421        fqn = self.fully_qualified_name(namespace, view_name)
422        self._execute(f"CREATE VIEW {fqn} AS {sql}")
423        return self.load_view((namespace, view_name))
424
425    def create_or_replace_view(
426        self,
427        identifier: str | tuple[str, str],
428        sql: str,
429    ) -> View:
430        """Create or replace a view. Returns the View object."""
431        namespace, view_name = self._resolve_identifier(identifier)
432        fqn = self.fully_qualified_name(namespace, view_name)
433        self._execute(f"CREATE OR REPLACE VIEW {fqn} AS {sql}")
434        return self.load_view((namespace, view_name))
435
436    def load_view(self, identifier: str | tuple[str, str]) -> View:
437        """Load a view. Raises NoSuchViewError if not found.
438
439        Queries the view's schema and SQL definition.
440        """
441        namespace, view_name = self._resolve_identifier(identifier)
442        if not self.view_exists((namespace, view_name)):
443            raise NoSuchViewError(f"View does not exist: {namespace}.{view_name}")
444        schema = self.build_schema_from_describe(namespace, view_name)
445        # Get the SQL definition
446        rows = self.fetchall(
447            "SELECT view_definition FROM information_schema.views "
448            f"WHERE table_catalog = '{escape_string_literal(self._name)}' "
449            f"AND table_schema = '{escape_string_literal(namespace)}' "
450            f"AND table_name = '{escape_string_literal(view_name)}'"
451        )
452        sql_text = rows[0][0] if rows else ""
453        return View(identifier=(namespace, view_name), schema=schema, sql=sql_text, catalog=self)
454
455    def rename_view(self, from_identifier: str | tuple[str, str], to_identifier: str | tuple[str, str]) -> View:
456        """Rename a view. Raises NoSuchViewError if not found."""
457        from_ns, from_name = self._resolve_identifier(from_identifier)
458        to_ns, to_name = self._resolve_identifier(to_identifier)
459
460        if not self.view_exists((from_ns, from_name)):
461            raise NoSuchViewError(f"View does not exist: {from_ns}.{from_name}")
462
463        if from_ns != to_ns:
464            raise DucklakeError("Cross-namespace rename is not supported")
465
466        from_fqn = self.fully_qualified_name(from_ns, from_name)
467        self._execute(f"ALTER VIEW {from_fqn} RENAME TO {quote_identifier(to_name)}")
468        return self.load_view((to_ns, to_name))
469
470    def drop_view(self, identifier: str | tuple[str, str]) -> None:
471        """Drop a view. Raises NoSuchViewError if not found."""
472        namespace, view_name = self._resolve_identifier(identifier)
473        if not self.view_exists((namespace, view_name)):
474            raise NoSuchViewError(f"View does not exist: {namespace}.{view_name}")
475        fqn = self.fully_qualified_name(namespace, view_name)
476        self._execute(f"DROP VIEW {fqn}")
477
478    def list_views(self, namespace: str = "main") -> list[tuple[str, str]]:
        """List views in namespace. Returns list of (namespace, view_name) tuples."""
        rows = self.fetchall(
            "SELECT table_schema, table_name FROM information_schema.tables "
            f"WHERE table_catalog = '{escape_string_literal(self._name)}' "
            f"AND table_schema = '{escape_string_literal(namespace)}' "
            "AND table_type = 'VIEW' "
            "ORDER BY table_name"
        )
        return [(row[0], row[1]) for row in rows]

    def view_exists(self, identifier: str | tuple[str, str]) -> bool:
        """Check if a view exists."""
        namespace, view_name = self._resolve_identifier(identifier)
        rows = self.fetchall(
            "SELECT 1 FROM information_schema.tables "
            f"WHERE table_catalog = '{escape_string_literal(self._name)}' "
            f"AND table_schema = '{escape_string_literal(namespace)}' "
            f"AND table_name = '{escape_string_literal(view_name)}' "
            "AND table_type = 'VIEW'"
        )
        return len(rows) > 0

    # -- Internal helpers ----------------------------------------------------

    def _resolve_identifier(self, identifier: str | tuple[str, str]) -> tuple[str, str]:
        """Parse identifier into (namespace, table_name). Default namespace is 'main'."""
        if isinstance(identifier, tuple):
            return identifier
        parts = identifier.split(".", 1)
        if len(parts) == 2:
            return (parts[0], parts[1])
        return ("main", parts[0])

    def fully_qualified_name(self, namespace: str, table_name: str) -> str:
        """Returns 'catalog_name.namespace.table_name' with proper quoting."""
        return f"{quote_identifier(self._name)}.{quote_identifier(namespace)}.{quote_identifier(table_name)}"

    def _execute(self, sql: str, params: list[Any] | None = None) -> duckdb.DuckDBPyConnection:
        """Execute SQL on the connection."""
        if params:
            return self._conn.execute(sql, params)
        return self._conn.execute(sql)

    def fetchall(self, sql: str) -> list[tuple[Any, ...]]:
        """Execute and fetchall."""
        result = self._conn.execute(sql)
        rows: list[tuple[Any, ...]] = result.fetchall()
        return rows

    def build_schema_from_describe(self, namespace: str, table_name: str) -> Schema:
        """Build a Schema from information_schema.columns for an existing table."""
        rows = self.fetchall(
            "SELECT column_name, data_type, is_nullable FROM information_schema.columns "
            f"WHERE table_catalog = '{escape_string_literal(self._name)}' "
            f"AND table_schema = '{escape_string_literal(namespace)}' "
            f"AND table_name = '{escape_string_literal(table_name)}' "
            "ORDER BY ordinal_position"
        )
        # field_id_counter tracks IDs for nested types as well
        field_id_counter = [1]
        fields: list[NestedField] = []
        for col_name, col_type, is_nullable in rows:
            dtype = _duckdb_type_to_ducklake(col_type, field_id_counter)
            fid = field_id_counter[0]
            field_id_counter[0] += 1
            required = is_nullable == "NO"
            fields.append(NestedField(field_id=fid, name=col_name, field_type=dtype, required=required))
        return Schema(*fields)

    # -- Commit Metadata -----------------------------------------------------

    def set_commit_message(self, message: str, *, author: str | None = None) -> None:
        """Set commit message and optional author for the next transaction.

        Must be called inside an explicit ``BEGIN TRANSACTION`` / ``COMMIT``
        block for the metadata to be recorded on the snapshot.

        Uses: ``CALL {catalog}.set_commit_message(author, commit_message)``
        """
        cat = quote_identifier(self._name)
        author_sql = f"'{escape_string_literal(author)}'" if author is not None else "NULL"
        msg_sql = f"'{escape_string_literal(message)}'"
        self._execute(f"CALL {cat}.set_commit_message({author_sql}, {msg_sql})")

    # -- Configuration -------------------------------------------------------

    _OPTION_KEY_RE = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]*$")
    _SCOPE_PART_RE = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]*$")

    def set_option(
        self,
        key: str,
        value: str,
        *,
        scope: str | None = None,
    ) -> None:
        """Set a Ducklake configuration option.

        Args:
            key: Option key (e.g., ``'target_file_size'``).
            value: Option value.
            scope: Optional ``'schema.table'`` for table-level scope.
                   ``None`` for global scope.

        Uses: ``CALL {catalog}.set_option(option, value, table_name, schema)``

        Raises:
            ValueError: If ``key`` or ``scope`` contain invalid characters.
        """
        if not self._OPTION_KEY_RE.match(key):
            raise ValueError(
                f"Invalid option key: {key!r}. Keys must contain only alphanumeric characters and underscores."
            )

        cat = quote_identifier(self._name)
        key_sql = f"'{escape_string_literal(key)}'"
        value_sql = f"'{escape_string_literal(value)}'"
        if scope is not None:
            parts = scope.split(".", 1)
            if len(parts) == 2:
                schema_name, table_name = parts
            else:
                schema_name = "main"
                table_name = parts[0]
            for part_label, part_value in [("schema", schema_name), ("table", table_name)]:
                if not self._SCOPE_PART_RE.match(part_value):
                    raise ValueError(
                        f"Invalid scope {part_label}: {part_value!r}. "
                        "Scope parts must contain only alphanumeric characters and underscores."
                    )
            self._execute(
                f"CALL {cat}.set_option("
                f"{key_sql}, {value_sql}, "
                f"table_name := '{escape_string_literal(table_name)}', "
                f"schema := '{escape_string_literal(schema_name)}')"
            )
        else:
            self._execute(f"CALL {cat}.set_option({key_sql}, {value_sql})")

    def get_options(self) -> pa.Table:
        """Get all configuration options as an Arrow table.

        Columns: option_name, description, value, scope, scope_entry
        """
        cat = quote_identifier(self._name)
        result: Any = self._conn.execute(f"SELECT * FROM {cat}.options()")
        arrow_obj: Any = result.arrow()
        if isinstance(arrow_obj, pa.Table):
            return arrow_obj
        tbl: pa.Table = arrow_obj.read_all()
        return tbl

    # -- Transaction ---------------------------------------------------------

    def begin_transaction(self) -> Transaction:
        """Begin a multi-operation transaction.

        All table operations within the transaction are committed atomically.
        """
        from pyducklake.transaction import Transaction

        return Transaction(self)

    # -- Context manager -----------------------------------------------------

    def __enter__(self) -> Catalog:
        return self

    def __exit__(self, *args: Any) -> None:
        self.close()

    def close(self) -> None:
        """Close the DuckDB connection."""
        self._conn.close()

Ducklake catalog backed by DuckDB + ducklake extension.

Examples:

>>> import tempfile, os
>>> tmp = tempfile.mkdtemp()
>>> cat = Catalog("my_lake", os.path.join(tmp, "meta.duckdb"), data_path=os.path.join(tmp, "data"))
>>> cat.name
'my_lake'
Catalog(name: str, uri: str, *, data_path: str | None = None, properties: dict[str, str] | None = None, encrypted: bool = False)
    def __init__(
        self,
        name: str,
        uri: str,
        *,
        data_path: str | None = None,
        properties: dict[str, str] | None = None,
        encrypted: bool = False,
    ) -> None:
        self._name = name
        self._uri = uri
        self._data_path = data_path
        self._encrypted = encrypted

        self._conn = duckdb.connect()
        self._conn.execute("INSTALL ducklake; LOAD ducklake;")

        # Apply connection properties (e.g., S3 configuration)
        for key, value in (properties or {}).items():
            if not self._OPTION_KEY_RE.match(key):
                raise ValueError(
                    f"Invalid property key: {key!r}. Keys must contain only alphanumeric characters and underscores."
                )
            self._conn.execute(f"SET {key} = '{escape_string_literal(value)}'")

        attach_sql = f"ATTACH 'ducklake:{uri}' AS {quote_identifier(name)}"
        options: list[str] = []
        if data_path is not None:
            options.append(f"DATA_PATH '{escape_string_literal(data_path)}'")
        if encrypted:
            options.append("ENCRYPTED")
        if options:
            attach_sql += f" ({', '.join(options)})"
        self._conn.execute(attach_sql)
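
The constructor only builds and runs a few SQL statements before any table is touched. The sketch below replays that string assembly in isolation so the generated SQL can be inspected without DuckDB. `build_connect_sql`, `quote_ident`, and `escape_literal` are hypothetical stand-ins; the quoting rules they use (double-quoted identifiers, single quotes doubled inside literals) are an assumption about `quote_identifier` and `escape_string_literal`, not their documented behavior, and the S3 path and property are illustrative.

```python
from __future__ import annotations

import re

# Same key pattern as Catalog._OPTION_KEY_RE in the listing above.
OPTION_KEY_RE = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]*$")


def quote_ident(name: str) -> str:
    # Assumption: identifiers are double-quoted, embedded quotes doubled.
    return '"' + name.replace('"', '""') + '"'


def escape_literal(value: str) -> str:
    # Assumption: single quotes inside string literals are doubled.
    return value.replace("'", "''")


def build_connect_sql(
    name: str,
    uri: str,
    *,
    data_path: str | None = None,
    properties: dict[str, str] | None = None,
    encrypted: bool = False,
) -> list[str]:
    """Hypothetical helper: the statements Catalog.__init__ runs, in order."""
    statements = ["INSTALL ducklake; LOAD ducklake;"]
    for key, value in (properties or {}).items():
        # Keys are interpolated unquoted, so they are validated first.
        if not OPTION_KEY_RE.match(key):
            raise ValueError(f"Invalid property key: {key!r}")
        statements.append(f"SET {key} = '{escape_literal(value)}'")
    attach = f"ATTACH 'ducklake:{uri}' AS {quote_ident(name)}"
    options = []
    if data_path is not None:
        options.append(f"DATA_PATH '{escape_literal(data_path)}'")
    if encrypted:
        options.append("ENCRYPTED")
    if options:
        attach += f" ({', '.join(options)})"
    statements.append(attach)
    return statements


for stmt in build_connect_sql(
    "lake",
    "postgres:dbname=mydb host=localhost",
    data_path="s3://bucket/lake/",
    properties={"s3_region": "us-east-1"},
):
    print(stmt)
```

Note that in the listing the URI is interpolated into the `ATTACH` string without escaping, unlike `DATA_PATH`, so a URI containing a single quote would produce invalid SQL.
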
name: str
    @property
    def name(self) -> str:
        return self._name
encrypted: bool
    @property
    def encrypted(self) -> bool:
        """Whether this catalog uses Parquet encryption."""
        return self._encrypted

Whether this catalog uses Parquet encryption.

connection: _duckdb.DuckDBPyConnection
    @property
    def connection(self) -> duckdb.DuckDBPyConnection:
        """Access the underlying DuckDB connection."""
        return self._conn

Access the underlying DuckDB connection.

def list_namespaces(self) -> list[str]:
    def list_namespaces(self) -> list[str]:
        """List all namespaces (schemas)."""
        rows = self.fetchall(
            "SELECT schema_name FROM information_schema.schemata "
            f"WHERE catalog_name = '{escape_string_literal(self._name)}' "
            "ORDER BY schema_name"
        )
        return [row[0] for row in rows]

List all namespaces (schemas).

def create_namespace(self, namespace: str) -> None:
    def create_namespace(self, namespace: str) -> None:
        """CREATE SCHEMA. Raises NamespaceAlreadyExistsError if exists."""
        if self.namespace_exists(namespace):
            raise NamespaceAlreadyExistsError(f"Namespace already exists: {namespace}")
        self._execute(f"CREATE SCHEMA {quote_identifier(self._name)}.{quote_identifier(namespace)}")

Create a namespace (CREATE SCHEMA). Raises NamespaceAlreadyExistsError if it already exists.

def create_namespace_if_not_exists(self, namespace: str) -> None:
    def create_namespace_if_not_exists(self, namespace: str) -> None:
        """Create namespace, no-op if already exists."""
        self._execute(f"CREATE SCHEMA IF NOT EXISTS {quote_identifier(self._name)}.{quote_identifier(namespace)}")

Create a namespace; a no-op if it already exists.

def drop_namespace(self, namespace: str) -> None:
    def drop_namespace(self, namespace: str) -> None:
        """DROP SCHEMA. Raises NamespaceNotEmptyError if non-empty, NoSuchNamespaceError if not found."""
        if not self.namespace_exists(namespace):
            raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}")
        tables = self.list_tables(namespace)
        views = self.list_views(namespace)
        if tables or views:
            raise NamespaceNotEmptyError(f"Namespace is not empty: {namespace}")
        self._execute(f"DROP SCHEMA {quote_identifier(self._name)}.{quote_identifier(namespace)}")

Drop a namespace (DROP SCHEMA). Raises NoSuchNamespaceError if it does not exist, or NamespaceNotEmptyError if it still contains tables or views.

def namespace_exists(self, namespace: str) -> bool:
    def namespace_exists(self, namespace: str) -> bool:
        rows = self.fetchall(
            "SELECT 1 FROM information_schema.schemata "
            f"WHERE catalog_name = '{escape_string_literal(self._name)}' "
            f"AND schema_name = '{escape_string_literal(namespace)}'"
        )
        return len(rows) > 0

def list_tables(self, namespace: str = 'main') -> list[tuple[str, str]]:
    def list_tables(self, namespace: str = "main") -> list[tuple[str, str]]:
        """List tables in namespace. Returns list of (namespace, table_name) tuples."""
        rows = self.fetchall(
            "SELECT table_schema, table_name FROM information_schema.tables "
            f"WHERE table_catalog = '{escape_string_literal(self._name)}' "
            f"AND table_schema = '{escape_string_literal(namespace)}' "
            "AND table_type != 'VIEW' "
            "ORDER BY table_name"
        )
        return [(row[0], row[1]) for row in rows]

List tables in namespace. Returns list of (namespace, table_name) tuples.
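
`list_tables`, `list_views`, `table_exists`, and `view_exists` all query `information_schema.tables`, which lists tables and views together; only the `table_type` predicate differs. The in-memory model below reproduces those filters over hypothetical rows shaped like `(table_schema, table_name, table_type)`; the `'BASE TABLE'` value for ordinary tables is assumed. One consequence visible in the listings: `table_exists` has no `table_type` predicate, so it also reports a view of that name as existing.

```python
# Hypothetical rows shaped like information_schema.tables
# (table_schema, table_name, table_type) for a single catalog.
ROWS = [
    ("main", "users", "BASE TABLE"),
    ("main", "orders", "BASE TABLE"),
    ("main", "active_users", "VIEW"),
    ("sales", "invoices", "BASE TABLE"),
]


def list_tables(rows, namespace="main"):
    # Mirrors "AND table_type != 'VIEW' ORDER BY table_name".
    return sorted(
        ((s, n) for s, n, t in rows if s == namespace and t != "VIEW"),
        key=lambda r: r[1],
    )


def list_views(rows, namespace="main"):
    # Mirrors "AND table_type = 'VIEW' ORDER BY table_name".
    return sorted(
        ((s, n) for s, n, t in rows if s == namespace and t == "VIEW"),
        key=lambda r: r[1],
    )


def table_exists(rows, namespace, name):
    # No table_type predicate, so views match too.
    return any(s == namespace and n == name for s, n, _ in rows)


def view_exists(rows, namespace, name):
    return any(s == namespace and n == name and t == "VIEW" for s, n, t in rows)


print(list_tables(ROWS))  # [('main', 'orders'), ('main', 'users')]
print(list_views(ROWS))  # [('main', 'active_users')]
print(table_exists(ROWS, "main", "active_users"))  # True, although it is a view
```
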

def create_table(self, identifier: str | tuple[str, str], schema: Schema) -> Table:
    def create_table(
        self,
        identifier: str | tuple[str, str],
        schema: Schema,
    ) -> Table:
        """CREATE TABLE with the given schema. Raises TableAlreadyExistsError if exists.

        Examples:

        ```pycon
        >>> import tempfile, os
        >>> from pyducklake import Catalog, Schema, required, optional, IntegerType, StringType
        >>> tmp = tempfile.mkdtemp()
        >>> cat = Catalog("test", os.path.join(tmp, "m.duckdb"), data_path=os.path.join(tmp, "d"))
        >>> schema = Schema.of(required("id", IntegerType()), optional("name", StringType()))
        >>> table = cat.create_table("users", schema)
        >>> table.name
        'users'
        >>> [f.name for f in table.schema.fields]
        ['id', 'name']

        ```
        """
        namespace, table_name = self._resolve_identifier(identifier)

        if self.table_exists((namespace, table_name)):
            raise TableAlreadyExistsError(f"Table already exists: {namespace}.{table_name}")

        fqn = self.fully_qualified_name(namespace, table_name)
        col_defs = ", ".join(
            f"{quote_identifier(f.name)} {ducklake_type_to_sql(f.field_type)}{' NOT NULL' if f.required else ''}"
            for f in schema.fields
        )
        self._execute(f"CREATE TABLE {fqn} ({col_defs})")

        # Reload schema from DuckDB to get canonical types
        loaded_schema = self.build_schema_from_describe(namespace, table_name)
        from pyducklake.sorting import UNSORTED

        return Table(identifier=(namespace, table_name), schema=loaded_schema, catalog=self, sort_order=UNSORTED)

Create a table with the given schema (CREATE TABLE). Raises TableAlreadyExistsError if it already exists.

Examples:

>>> import tempfile, os
>>> from pyducklake import Catalog, Schema, required, optional, IntegerType, StringType
>>> tmp = tempfile.mkdtemp()
>>> cat = Catalog("test", os.path.join(tmp, "m.duckdb"), data_path=os.path.join(tmp, "d"))
>>> schema = Schema.of(required("id", IntegerType()), optional("name", StringType()))
>>> table = cat.create_table("users", schema)
>>> table.name
'users'
>>> [f.name for f in table.schema.fields]
['id', 'name']
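
`create_table` turns each schema field into a column definition: the quoted column name, its SQL type, and `NOT NULL` for required fields. The sketch below shows that assembly with plain `(name, sql_type, required)` tuples standing in for `NestedField` objects. `build_create_table_sql` is hypothetical, SQL type names are passed in directly instead of coming from `ducklake_type_to_sql`, and double-quote identifier quoting is assumed.

```python
from __future__ import annotations


def quote_ident(name: str) -> str:
    # Assumption: double-quoted identifiers with embedded quotes doubled.
    return '"' + name.replace('"', '""') + '"'


def build_create_table_sql(
    catalog: str, namespace: str, table: str, fields: list[tuple[str, str, bool]]
) -> str:
    """Hypothetical helper mirroring the col_defs join in Catalog.create_table."""
    fqn = ".".join(quote_ident(part) for part in (catalog, namespace, table))
    col_defs = ", ".join(
        f"{quote_ident(name)} {sql_type}{' NOT NULL' if required else ''}"
        for name, sql_type, required in fields
    )
    return f"CREATE TABLE {fqn} ({col_defs})"


print(build_create_table_sql("test", "main", "users", [("id", "INTEGER", True), ("name", "VARCHAR", False)]))
# CREATE TABLE "test"."main"."users" ("id" INTEGER NOT NULL, "name" VARCHAR)
```

After the DDL runs, the listing reloads the schema with `build_schema_from_describe`, so the returned `Table` carries DuckDB's canonical types rather than the objects that were passed in.
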

def create_table_if_not_exists(self, identifier: str | tuple[str, str], schema: Schema) -> Table:
    def create_table_if_not_exists(
        self,
        identifier: str | tuple[str, str],
        schema: Schema,
    ) -> Table:
        """Create table if it doesn't exist, otherwise load and return existing."""
        namespace, table_name = self._resolve_identifier(identifier)
        if self.table_exists((namespace, table_name)):
            return self.load_table((namespace, table_name))
        return self.create_table((namespace, table_name), schema)

Create the table if it does not exist; otherwise load and return the existing table, ignoring the schema argument.

def load_table(self, identifier: str | tuple[str, str]) -> Table:
    def load_table(self, identifier: str | tuple[str, str]) -> Table:
        """Load table metadata. Raises NoSuchTableError if not found."""
        namespace, table_name = self._resolve_identifier(identifier)
        if not self.table_exists((namespace, table_name)):
            raise NoSuchTableError(f"Table does not exist: {namespace}.{table_name}")
        schema = self.build_schema_from_describe(namespace, table_name)
        return Table(identifier=(namespace, table_name), schema=schema, catalog=self)

Load table metadata. Raises NoSuchTableError if not found.
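
Every method that takes an `identifier` accepts either a `(namespace, name)` tuple or a string, resolved by `_resolve_identifier` in the listing above. The function below is a copy of that method, moved to module level so it runs on its own. Because strings are split on the first dot only (`split(".", 1)`), any further dots stay in the table name.

```python
from __future__ import annotations


def resolve_identifier(identifier: str | tuple[str, str]) -> tuple[str, str]:
    """Standalone copy of Catalog._resolve_identifier."""
    if isinstance(identifier, tuple):
        return identifier  # tuples pass through unchanged
    parts = identifier.split(".", 1)  # split on the first dot only
    if len(parts) == 2:
        return (parts[0], parts[1])
    return ("main", parts[0])  # bare names go to the default namespace


print(resolve_identifier("users"))  # ('main', 'users')
print(resolve_identifier("sales.orders"))  # ('sales', 'orders')
print(resolve_identifier("a.b.c"))  # ('a', 'b.c')
print(resolve_identifier(("x", "y.z")))  # ('x', 'y.z')
```
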

def drop_table(self, identifier: str | tuple[str, str]) -> None:
    def drop_table(self, identifier: str | tuple[str, str]) -> None:
        """DROP TABLE. Raises NoSuchTableError if not found."""
        namespace, table_name = self._resolve_identifier(identifier)
        if not self.table_exists((namespace, table_name)):
            raise NoSuchTableError(f"Table does not exist: {namespace}.{table_name}")
        fqn = self.fully_qualified_name(namespace, table_name)
        self._execute(f"DROP TABLE {fqn}")

Drop a table (DROP TABLE). Raises NoSuchTableError if it does not exist.

def rename_table(self, from_identifier: str | tuple[str, str], to_identifier: str | tuple[str, str]) -> Table:
    def rename_table(self, from_identifier: str | tuple[str, str], to_identifier: str | tuple[str, str]) -> Table:
        """ALTER TABLE RENAME TO. Returns the renamed table."""
        from_ns, from_name = self._resolve_identifier(from_identifier)
        to_ns, to_name = self._resolve_identifier(to_identifier)

        if not self.table_exists((from_ns, from_name)):
            raise NoSuchTableError(f"Table does not exist: {from_ns}.{from_name}")

        if from_ns != to_ns:
            raise DucklakeError("Cross-namespace rename is not supported")

        from_fqn = self.fully_qualified_name(from_ns, from_name)
        # DuckDB RENAME TO expects just the new table name, not fully qualified
        self._execute(f"ALTER TABLE {from_fqn} RENAME TO {quote_identifier(to_name)}")
        return self.load_table((to_ns, to_name))

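Rename a table (ALTER TABLE ... RENAME TO) and return the renamed table. Raises NoSuchTableError if the source table does not exist, and DucklakeError if the target is in a different namespace.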
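
`rename_table` and `rename_view` both reject a target in another namespace and pass only the bare new name to `RENAME TO`, as the comment in the `rename_table` listing notes. The hypothetical helper below reproduces that check and the generated statement; double-quote identifier quoting is assumed, `ValueError` stands in for the library's `DucklakeError`, and the existence check the real methods run first is omitted.

```python
from __future__ import annotations


def quote_ident(name: str) -> str:
    # Assumption: double-quoted identifiers with embedded quotes doubled.
    return '"' + name.replace('"', '""') + '"'


def resolve(identifier):
    # Same rule as Catalog._resolve_identifier.
    if isinstance(identifier, tuple):
        return identifier
    parts = identifier.split(".", 1)
    return (parts[0], parts[1]) if len(parts) == 2 else ("main", parts[0])


def build_rename_sql(catalog: str, src, dst, kind: str = "TABLE") -> str:
    """Hypothetical helper: the ALTER statement rename_table/rename_view would run."""
    from_ns, from_name = resolve(src)
    to_ns, to_name = resolve(dst)
    if from_ns != to_ns:
        # The library raises DucklakeError here.
        raise ValueError("Cross-namespace rename is not supported")
    fqn = ".".join(quote_ident(part) for part in (catalog, from_ns, from_name))
    # RENAME TO takes only the new name, never a qualified one.
    return f"ALTER {kind} {fqn} RENAME TO {quote_ident(to_name)}"


print(build_rename_sql("lake", "users", "main.people"))
# ALTER TABLE "lake"."main"."users" RENAME TO "people"
```
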

def table_exists(self, identifier: str | tuple[str, str]) -> bool:
    def table_exists(self, identifier: str | tuple[str, str]) -> bool:
        namespace, table_name = self._resolve_identifier(identifier)
        rows = self.fetchall(
            "SELECT 1 FROM information_schema.tables "
            f"WHERE table_catalog = '{escape_string_literal(self._name)}' "
            f"AND table_schema = '{escape_string_literal(namespace)}' "
            f"AND table_name = '{escape_string_literal(table_name)}'"
        )
        return len(rows) > 0

def create_view(self, identifier: str | tuple[str, str], sql: str) -> View:
    def create_view(
        self,
        identifier: str | tuple[str, str],
        sql: str,
    ) -> View:
        """Create a view in the catalog.

        Uses: CREATE VIEW {fqn} AS {sql}

        Raises ViewAlreadyExistsError if the view already exists.
        """
        namespace, view_name = self._resolve_identifier(identifier)
        if self.view_exists((namespace, view_name)):
            raise ViewAlreadyExistsError(f"View already exists: {namespace}.{view_name}")
        fqn = self.fully_qualified_name(namespace, view_name)
        self._execute(f"CREATE VIEW {fqn} AS {sql}")
        return self.load_view((namespace, view_name))

Create a view in the catalog.

Uses: CREATE VIEW {fqn} AS {sql}

Raises ViewAlreadyExistsError if the view already exists.

def create_or_replace_view(self, identifier: str | tuple[str, str], sql: str) -> View:
    def create_or_replace_view(
        self,
        identifier: str | tuple[str, str],
        sql: str,
    ) -> View:
        """Create or replace a view. Returns the View object."""
        namespace, view_name = self._resolve_identifier(identifier)
        fqn = self.fully_qualified_name(namespace, view_name)
        self._execute(f"CREATE OR REPLACE VIEW {fqn} AS {sql}")
        return self.load_view((namespace, view_name))

Create or replace a view. Returns the View object.

def load_view(self, identifier: str | tuple[str, str]) -> View:
    def load_view(self, identifier: str | tuple[str, str]) -> View:
        """Load a view. Raises NoSuchViewError if not found.

        Queries the view's schema and SQL definition.
        """
        namespace, view_name = self._resolve_identifier(identifier)
        if not self.view_exists((namespace, view_name)):
            raise NoSuchViewError(f"View does not exist: {namespace}.{view_name}")
        schema = self.build_schema_from_describe(namespace, view_name)
        # Get the SQL definition
        rows = self.fetchall(
            "SELECT view_definition FROM information_schema.views "
            f"WHERE table_catalog = '{escape_string_literal(self._name)}' "
            f"AND table_schema = '{escape_string_literal(namespace)}' "
            f"AND table_name = '{escape_string_literal(view_name)}'"
        )
        sql_text = rows[0][0] if rows else ""
        return View(identifier=(namespace, view_name), schema=schema, sql=sql_text, catalog=self)

Load a view. Raises NoSuchViewError if not found.

Queries the view's schema and SQL definition.

def rename_view(self, from_identifier: str | tuple[str, str], to_identifier: str | tuple[str, str]) -> View:
    def rename_view(self, from_identifier: str | tuple[str, str], to_identifier: str | tuple[str, str]) -> View:
        """Rename a view. Raises NoSuchViewError if not found."""
        from_ns, from_name = self._resolve_identifier(from_identifier)
        to_ns, to_name = self._resolve_identifier(to_identifier)

        if not self.view_exists((from_ns, from_name)):
            raise NoSuchViewError(f"View does not exist: {from_ns}.{from_name}")

        if from_ns != to_ns:
            raise DucklakeError("Cross-namespace rename is not supported")

        from_fqn = self.fully_qualified_name(from_ns, from_name)
        self._execute(f"ALTER VIEW {from_fqn} RENAME TO {quote_identifier(to_name)}")
        return self.load_view((to_ns, to_name))

Rename a view. Raises NoSuchViewError if not found.

def drop_view(self, identifier: str | tuple[str, str]) -> None:
    def drop_view(self, identifier: str | tuple[str, str]) -> None:
        """Drop a view. Raises NoSuchViewError if not found."""
        namespace, view_name = self._resolve_identifier(identifier)
        if not self.view_exists((namespace, view_name)):
            raise NoSuchViewError(f"View does not exist: {namespace}.{view_name}")
        fqn = self.fully_qualified_name(namespace, view_name)
        self._execute(f"DROP VIEW {fqn}")

Drop a view. Raises NoSuchViewError if not found.

def list_views(self, namespace: str = 'main') -> list[tuple[str, str]]:

List views in namespace. Returns list of (namespace, view_name) tuples.

def view_exists(self, identifier: str | tuple[str, str]) -> bool:

Check if a view exists.

def fully_qualified_name(self, namespace: str, table_name: str) -> str:

Return 'catalog_name.namespace.table_name', with each part quoted as an identifier.

def fetchall(self, sql: str) -> list[tuple[typing.Any, ...]]:

Execute a SQL statement and return all result rows as a list of tuples.

def build_schema_from_describe(self, namespace: str, table_name: str) -> Schema:

Build a Schema from information_schema.columns for an existing table or view.
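
`build_schema_from_describe` threads a one-element list, `field_id_counter`, through `_duckdb_type_to_ducklake` so nested types can draw IDs from the same sequence. Because a column's type is converted before the column's own ID is read, a nested column's children receive lower IDs than the column itself. The sketch below illustrates that ordering with a hypothetical `convert_type` that only understands `T[]` list types and assumes each list element consumes exactly one ID; the real converter's rules are not shown in this listing.

```python
from __future__ import annotations


def convert_type(type_name: str, counter: list[int]) -> dict:
    """Hypothetical stand-in for _duckdb_type_to_ducklake (lists only)."""
    if type_name.endswith("[]"):
        element = convert_type(type_name[:-2], counter)
        element_id = counter[0]  # assumed: the list element takes the next ID
        counter[0] += 1
        return {"type": "list", "element_id": element_id, "element": element}
    return {"type": type_name}


def assign_field_ids(columns: list[tuple[str, str, str]]) -> list[tuple[int, str, bool]]:
    """Same ID bookkeeping as the loop in build_schema_from_describe."""
    counter = [1]  # shared, mutable counter
    fields = []
    for name, type_name, is_nullable in columns:
        convert_type(type_name, counter)  # may consume IDs for nested parts
        fid = counter[0]  # the column's own ID is taken afterwards
        counter[0] += 1
        fields.append((fid, name, is_nullable == "NO"))
    return fields


print(assign_field_ids([("id", "INTEGER", "NO"), ("tags", "VARCHAR[]", "YES"), ("name", "VARCHAR", "YES")]))
# [(1, 'id', True), (3, 'tags', False), (4, 'name', False)]
```
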

def set_commit_message(self, message: str, *, author: str | None = None) -> None:

Set commit message and optional author for the next transaction.

Must be called inside an explicit BEGIN TRANSACTION / COMMIT block for the metadata to be recorded on the snapshot.

Uses: CALL {catalog}.set_commit_message(author, commit_message)
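
`set_commit_message` renders a missing author as SQL `NULL` and escapes both strings as literals. The hypothetical helper below reproduces the generated `CALL`, assuming single quotes are escaped by doubling and the catalog name is double-quoted; per the docstring, the statement only takes effect between an explicit `BEGIN TRANSACTION` and `COMMIT`, which the statement list at the end illustrates.

```python
from __future__ import annotations


def literal(value: str) -> str:
    # Assumption: string literals escape ' as ''.
    return "'" + value.replace("'", "''") + "'"


def build_commit_message_sql(catalog: str, message: str, author: str | None = None) -> str:
    """Hypothetical helper mirroring Catalog.set_commit_message."""
    cat = '"' + catalog.replace('"', '""') + '"'
    author_sql = literal(author) if author is not None else "NULL"
    # Argument order follows the listing: author first, then message.
    return f"CALL {cat}.set_commit_message({author_sql}, {literal(message)})"


statements = [
    "BEGIN TRANSACTION",
    build_commit_message_sql("lake", "Backfill users' emails", author="etl"),
    # ... table writes would go here ...
    "COMMIT",
]
print("\n".join(statements))
```
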

def set_option(self, key: str, value: str, *, scope: str | None = None) -> None:

Set a Ducklake configuration option.

Args:
    key: Option key (e.g., 'target_file_size').
    value: Option value.
    scope: Optional 'schema.table' for table-level scope (a bare 'table' resolves to schema 'main'); None for global scope.

Uses: CALL {catalog}.set_option(option, value, table_name, schema)

Raises:
    ValueError: If key or scope contains invalid characters.
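
The validation and SQL that `set_option` produces can be traced without a database. The hypothetical `build_set_option_sql` below follows the listing: the key and each scope part must match the same identifier pattern, a scope without a dot defaults to schema `main`, and table-level options use named `table_name :=` / `schema :=` arguments. Literal escaping by doubling single quotes and a double-quoted catalog name are assumptions, and the option values are illustrative.

```python
from __future__ import annotations

import re

# Same pattern as Catalog._OPTION_KEY_RE and _SCOPE_PART_RE.
NAME_RE = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]*$")


def literal(value: str) -> str:
    # Assumption: string literals escape ' as ''.
    return "'" + value.replace("'", "''") + "'"


def build_set_option_sql(catalog: str, key: str, value: str, scope: str | None = None) -> str:
    """Hypothetical helper mirroring Catalog.set_option."""
    if not NAME_RE.match(key):
        raise ValueError(f"Invalid option key: {key!r}")
    cat = '"' + catalog.replace('"', '""') + '"'
    head = f"CALL {cat}.set_option({literal(key)}, {literal(value)}"
    if scope is None:
        return head + ")"  # global option
    parts = scope.split(".", 1)
    schema_name, table_name = parts if len(parts) == 2 else ("main", parts[0])
    for part in (schema_name, table_name):
        if not NAME_RE.match(part):
            raise ValueError(f"Invalid scope part: {part!r}")
    return head + f", table_name := {literal(table_name)}, schema := {literal(schema_name)})"


print(build_set_option_sql("lake", "target_file_size", "128MB"))
print(build_set_option_sql("lake", "target_file_size", "64MB", scope="users"))
```
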

def get_options(self) -> pyarrow.lib.Table:

Get all configuration options as an Arrow table.

Columns: option_name, description, value, scope, scope_entry

def begin_transaction(self) -> Transaction:

Begin a multi-operation transaction.

All table operations within the transaction are committed atomically.

def close(self) -> None:

Close the DuckDB connection.

class ChangeSet:
class ChangeSet:
    """Result of a CDC query with helpers for analyzing changes."""

    def __init__(
        self,
        arrow_table: pa.Table,
        change_type_col: str | None = "change_type",
    ) -> None:
        self._table = arrow_table
        self._change_type_col = change_type_col

    # -- Accessors -------------------------------------------------------------

    def to_arrow(self) -> pa.Table:
        """Return the raw Arrow table."""
        return self._table

    def to_pandas(self) -> Any:
        """Return as pandas DataFrame.

        Requires pandas to be installed separately.
        """
        if importlib.util.find_spec("pandas") is None:
            raise ImportError("pandas is required for to_pandas(). Install it with: pip install pandas")
        return self._table.to_pandas()

    @property
    def num_rows(self) -> int:
        """Number of rows in the result."""
        result: int = self._table.num_rows
        return result

    @property
    def column_names(self) -> list[str]:
        """Column names in the result."""
        result: list[str] = self._table.column_names
        return result

    # -- Change type filtering -------------------------------------------------

    def _require_change_type(self) -> str:
        if self._change_type_col is None:
            raise ValueError(
                "This ChangeSet has no change_type column. "
                "Change type filtering is only available on table_changes() results."
            )
        return self._change_type_col

    def _filter_by_type(self, change_type: str) -> pa.Table:
        col = self._require_change_type()
        mask = pc.equal(self._table.column(col), pa.scalar(change_type))
        return self._table.filter(mask)

    def inserts(self) -> pa.Table:
        """Return only inserted rows (change_type = 'insert')."""
        return self._filter_by_type("insert")

    def deletes(self) -> pa.Table:
        """Return only deleted rows (change_type = 'delete')."""
        return self._filter_by_type("delete")

    def update_preimages(self) -> pa.Table:
        """Return pre-update row images (change_type = 'update_preimage')."""
        return self._filter_by_type("update_preimage")

    def update_postimages(self) -> pa.Table:
        """Return post-update row images (change_type = 'update_postimage')."""
        return self._filter_by_type("update_postimage")

    def updates(self) -> list[tuple[dict[str, Any], dict[str, Any]]]:
        """Return paired (old_row, new_row) for each update.

        Correlates update_preimage and update_postimage rows by rowid.
        Returns list of (preimage_dict, postimage_dict) tuples.
        """
        pre = self.update_preimages()
        post = self.update_postimages()

        pre_by_rowid: dict[Any, dict[str, Any]] = {}
        for row in pre.to_pylist():
            pre_by_rowid[row.get("rowid")] = row

        pairs: list[tuple[dict[str, Any], dict[str, Any]]] = []
        for row in post.to_pylist():
            rid = row.get("rowid")
            if rid in pre_by_rowid:
                pairs.append((pre_by_rowid[rid], row))

        return pairs

    def has_updates(self) -> bool:
        """Whether there are any update changes."""
        col = self._require_change_type()
        types = self._table.column(col).to_pylist()
        return "update_preimage" in types or "update_postimage" in types

    def summary(self) -> dict[str, int]:
        """Return count of each change type.

        Example: {"insert": 5, "delete": 2, "update_preimage": 1, "update_postimage": 1}
        """
        col = self._require_change_type()
        counts: dict[str, int] = {}
        for ct in self._table.column(col).to_pylist():
            key = str(ct)
            counts[key] = counts.get(key, 0) + 1
        return counts

    def __repr__(self) -> str:
        return f"ChangeSet(num_rows={self.num_rows}, columns={self.column_names})"

Result of a CDC query with helpers for analyzing changes.

ChangeSet(arrow_table: pyarrow.lib.Table, change_type_col: str | None = 'change_type')
def to_arrow(self) -> pyarrow.lib.Table:

Return the raw Arrow table.

def to_pandas(self) -> Any:

Return as pandas DataFrame.

Requires pandas to be installed separately.

num_rows: int

Number of rows in the result.

column_names: list[str]

Column names in the result.

def inserts(self) -> pyarrow.lib.Table:

Return only inserted rows (change_type = 'insert').

def deletes(self) -> pyarrow.lib.Table:

Return only deleted rows (change_type = 'delete').

def update_preimages(self) -> pyarrow.lib.Table:

Return pre-update row images (change_type = 'update_preimage').

def update_postimages(self) -> pyarrow.lib.Table:

Return post-update row images (change_type = 'update_postimage').

def updates(self) -> list[tuple[dict[str, typing.Any], dict[str, typing.Any]]]:

Return paired (old_row, new_row) for each update.

Correlates update_preimage and update_postimage rows by rowid and returns a list of (preimage_dict, postimage_dict) tuples. Postimage rows with no matching preimage rowid are skipped.
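The pairing logic can be reproduced in plain Python on lists of dicts, such as those returned by pyarrow's Table.to_pylist(). The function name pair_updates and the sample rows are hypothetical; like updates(), the sketch assumes each row carries a "rowid" key.

```python
from typing import Any


def pair_updates(
    preimages: list[dict[str, Any]],
    postimages: list[dict[str, Any]],
) -> list[tuple[dict[str, Any], dict[str, Any]]]:
    """Hypothetical helper mirroring the rowid pairing in ChangeSet.updates()."""
    # Index pre-update images by rowid; a later row with the same rowid
    # replaces an earlier one, as with the dict built in updates().
    pre_by_rowid = {row.get("rowid"): row for row in preimages}
    # Keep only postimages whose rowid has a matching preimage.
    return [
        (pre_by_rowid[row.get("rowid")], row)
        for row in postimages
        if row.get("rowid") in pre_by_rowid
    ]


pre = [{"rowid": 7, "name": "bob"}]
post = [{"rowid": 7, "name": "robert"}, {"rowid": 9, "name": "zoe"}]
print(pair_updates(pre, post))
# [({'rowid': 7, 'name': 'bob'}, {'rowid': 7, 'name': 'robert'})]
```

Because the lookup uses row.get("rowid"), rows without a rowid column all map to the key None, and every postimage would then pair with the last preimage seen.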

def has_updates(self) -> bool:

Whether there are any update changes.

def summary(self) -> dict[str, int]:

Return count of each change type.

Example: {"insert": 5, "delete": 2, "update_preimage": 1, "update_postimage": 1}
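summary() and has_updates() reduce to simple counting over the change_type column. A minimal sketch over a plain list of change-type strings (standing in for the column's to_pylist() output); the helper names summarize and any_updates are hypothetical:

```python
from collections import Counter


def summarize(change_types: list[str]) -> dict[str, int]:
    """Count rows per change type, in first-seen order, like ChangeSet.summary()."""
    return dict(Counter(str(ct) for ct in change_types))


def any_updates(change_types: list[str]) -> bool:
    """Mirror ChangeSet.has_updates(): true if either update image is present."""
    return "update_preimage" in change_types or "update_postimage" in change_types


types = ["insert", "insert", "delete", "update_preimage", "update_postimage"]
print(summarize(types))
# {'insert': 2, 'delete': 1, 'update_preimage': 1, 'update_postimage': 1}
print(any_updates(types))  # True
```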

class CommitFailedError(pyducklake.DucklakeError):
class CommitFailedError(DucklakeError):
    """Raised when a commit operation fails."""

Raised when a commit operation fails.

class DataScan:
class DataScan:
    """Scans a Ducklake table with optional filtering, column selection, and time travel.

    Immutable builder: each method returns a new DataScan instance.
    """

    __slots__ = ("_table", "_row_filter", "_selected_fields", "_snapshot_id", "_timestamp", "_limit")

    def __init__(
        self,
        table: Table | View,
        row_filter: BooleanExpression = AlwaysTrue(),
        selected_fields: tuple[str, ...] = ("*",),
        snapshot_id: int | None = None,
        limit: int | None = None,
        timestamp: datetime | None = None,
    ) -> None:
        self._table = table
        self._row_filter = row_filter
        self._selected_fields = selected_fields
        self._snapshot_id = snapshot_id
        self._timestamp = timestamp
        self._limit = limit

    # -- Builder methods (return new DataScan) ---------------------------------

    def filter(self, expr: BooleanExpression | str) -> DataScan:
        """Add a row filter.

        If string, wrap in a raw SQL expression.
        If BooleanExpression, combine with existing filter via And.
        """
        if isinstance(expr, str):
            new_filter: BooleanExpression = RawSQL(expr)
        else:
            new_filter = expr

        combined = And(self._row_filter, new_filter)
        return DataScan(
            table=self._table,
            row_filter=combined,
            selected_fields=self._selected_fields,
            snapshot_id=self._snapshot_id,
            limit=self._limit,
            timestamp=self._timestamp,
        )

    def select(self, *fields: str) -> DataScan:
        """Select specific columns. Replaces any previous selection."""
        return DataScan(
            table=self._table,
            row_filter=self._row_filter,
            selected_fields=fields,
            snapshot_id=self._snapshot_id,
            limit=self._limit,
            timestamp=self._timestamp,
        )

    def with_snapshot(self, snapshot_id: int) -> DataScan:
        """Time travel to a specific snapshot version."""
        return DataScan(
            table=self._table,
            row_filter=self._row_filter,
            selected_fields=self._selected_fields,
            snapshot_id=snapshot_id,
            limit=self._limit,
            timestamp=None,
        )

    def with_timestamp(self, timestamp: datetime) -> DataScan:
        """Time travel to data as of a specific timestamp.

        Uses AT (TIMESTAMP => ...) syntax.
        """
        return DataScan(
            table=self._table,
            row_filter=self._row_filter,
            selected_fields=self._selected_fields,
            snapshot_id=None,
            limit=self._limit,
            timestamp=timestamp,
        )

    def with_limit(self, limit: int) -> DataScan:
        """Limit number of rows returned."""
        return DataScan(
            table=self._table,
            row_filter=self._row_filter,
            selected_fields=self._selected_fields,
            snapshot_id=self._snapshot_id,
            limit=limit,
            timestamp=self._timestamp,
        )

    # -- Terminal methods ------------------------------------------------------

    def to_arrow(self) -> pa.Table:
        """Execute scan and return PyArrow Table."""
        sql = self._build_sql()
        result = self._table.catalog.connection.execute(sql)
        return result.fetch_arrow_table()

    def to_pandas(self) -> pd.DataFrame:
        """Execute scan and return pandas DataFrame.

        Requires pandas to be installed separately.
        """
        if importlib.util.find_spec("pandas") is None:
            raise ImportError("pandas is required for to_pandas(). Install it with: pip install pandas")
        sql = self._build_sql()
        result = self._table.catalog.connection.execute(sql)
        df: pd.DataFrame = result.fetchdf()
        return df

    def to_duckdb(self, *, connection: duckdb.DuckDBPyConnection | None = None) -> duckdb.DuckDBPyRelation:
        """Execute scan and return a DuckDB relation.

        If connection is None, uses the catalog's connection.
        """
        sql = self._build_sql()
        conn = connection if connection is not None else self._table.catalog.connection
        return conn.sql(sql)

    def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
        """Execute scan and return a streaming Arrow RecordBatchReader."""
        sql = self._build_sql()
        result = self._table.catalog.connection.execute(sql)
        return result.fetch_record_batch()

    def to_polars(self) -> pl.DataFrame:
        """Execute scan and return a Polars DataFrame.

        Converts via Arrow (to_arrow() then pl.from_arrow()).
        Requires polars to be installed separately.
        """
        try:
            import polars as pl_mod
        except ImportError:
            raise ImportError("polars is required for to_polars(). Install it with: pip install polars") from None
        arrow_table = self.to_arrow()
        result = pl_mod.from_arrow(arrow_table)
        assert isinstance(result, pl_mod.DataFrame)
        return result

    def to_ray(self) -> ray.data.Dataset:
        """Execute scan and return a Ray Dataset.

        Converts via Arrow.
        Requires ray to be installed separately.
        """
        try:
            import ray.data  # pyright: ignore[reportMissingImports]
        except ImportError:
            raise ImportError("ray is required for to_ray(). Install it with: pip install 'ray[data]'") from None
        arrow_table = self.to_arrow()
        dataset: ray.data.Dataset = ray.data.from_arrow(arrow_table)
        return dataset

    def to_arrow_dataset(self) -> ds.Dataset:
        """Return scan results as a PyArrow Dataset.

        Preserves any filters, projections, and time travel settings.
        """
        import pyarrow.dataset as ds

        return ds.dataset(self.to_arrow())

    def count(self) -> int:
        """Return row count (executes SELECT COUNT(*))."""
        sql = self._build_count_sql()
        rows = self._table.catalog.fetchall(sql)
        return int(rows[0][0])

    # -- SQL generation --------------------------------------------------------

    def _build_sql(self) -> str:
        """Build the SELECT SQL statement."""
        columns = self._format_columns()
        table_ref = self._format_table_ref()
        sql = f"SELECT {columns} FROM {table_ref}"
        sql = self._append_where(sql)
        sql = self._append_limit(sql)
        return sql

    def _build_count_sql(self) -> str:
        """Build a SELECT COUNT(*) SQL statement."""
        table_ref = self._format_table_ref()
        sql = f"SELECT COUNT(*) FROM {table_ref}"
        sql = self._append_where(sql)
        sql = self._append_limit(sql)
        return sql

    def _format_columns(self) -> str:
        if self._selected_fields == ("*",):
            return "*"
        return ", ".join(f'"{col}"' for col in self._selected_fields)

    def _format_table_ref(self) -> str:
        fqn = self._table.fully_qualified_name
        if self._snapshot_id is not None and self._timestamp is not None:
            raise ValueError("Cannot set both snapshot_id and timestamp for time travel")
        if self._snapshot_id is not None:
            return f"{fqn} AT (VERSION => {self._snapshot_id})"
        if self._timestamp is not None:
            ts_str = self._timestamp.strftime("%Y-%m-%d %H:%M:%S.%f")
            return f"{fqn} AT (TIMESTAMP => '{ts_str}')"
        return fqn

    def _append_where(self, sql: str) -> str:
        if not _is_always_true(self._row_filter):
            sql += f" WHERE {self._row_filter.to_sql()}"
        return sql

    def _append_limit(self, sql: str) -> str:
        if self._limit is not None:
            sql += f" LIMIT {self._limit}"
        return sql

Scans a Ducklake table with optional filtering, column selection, and time travel.

Immutable builder: each method returns a new DataScan instance.
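The immutable-builder style can be sketched with a frozen dataclass and dataclasses.replace(). MiniScan is a hypothetical stand-in, not pyducklake's class: DataScan itself uses __slots__ and constructs a new instance explicitly in each builder method, and the table name below is made up.

```python
from dataclasses import dataclass, replace
from typing import Optional, Tuple


@dataclass(frozen=True)
class MiniScan:
    """Hypothetical stand-in illustrating DataScan's immutable builder style."""

    table: str
    selected_fields: Tuple[str, ...] = ("*",)
    limit: Optional[int] = None

    def select(self, *fields: str) -> "MiniScan":
        # Replaces any previous selection instead of appending to it.
        return replace(self, selected_fields=fields)

    def with_limit(self, limit: int) -> "MiniScan":
        return replace(self, limit=limit)

    def to_sql(self) -> str:
        if self.selected_fields == ("*",):
            cols = "*"
        else:
            cols = ", ".join(f'"{col}"' for col in self.selected_fields)
        sql = f"SELECT {cols} FROM {self.table}"
        if self.limit is not None:
            sql += f" LIMIT {self.limit}"
        return sql


base = MiniScan("lake.main.users")
narrow = base.select("id").select("id", "name").with_limit(10)
print(base.to_sql())    # SELECT * FROM lake.main.users
print(narrow.to_sql())  # SELECT "id", "name" FROM lake.main.users LIMIT 10
```

Because each call returns a new object, base is left unchanged and can be reused as the starting point for other scans.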

DataScan(table: 'Table | View', row_filter: BooleanExpression = AlwaysTrue(), selected_fields: tuple[str, ...] = ('*',), snapshot_id: int | None = None, limit: int | None = None, timestamp: datetime.datetime | None = None)
def filter(self, expr: BooleanExpression | str) -> DataScan:

Add a row filter.

A string is wrapped in a raw SQL expression (RawSQL); a BooleanExpression is used as-is. In both cases the new filter is combined with the existing filter via And, so repeated calls accumulate.
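The accumulation can be sketched with minimal stand-ins for AlwaysTrue, RawSQL, EqualTo, and And. These classes are hypothetical; in particular, how pyducklake's And renders to SQL, and whether it drops an AlwaysTrue operand, are assumptions made here to keep the output readable.

```python
from dataclasses import dataclass
from typing import Protocol, Union


class Expr(Protocol):
    def to_sql(self) -> str: ...


@dataclass(frozen=True)
class AlwaysTrue:
    def to_sql(self) -> str:
        return "TRUE"


@dataclass(frozen=True)
class RawSQL:
    sql: str

    def to_sql(self) -> str:
        return self.sql


@dataclass(frozen=True)
class EqualTo:
    term: str
    value: int

    def to_sql(self) -> str:
        return f'"{self.term}" = {self.value}'


@dataclass(frozen=True)
class And:
    left: Expr
    right: Expr

    def to_sql(self) -> str:
        # Assumption: an AlwaysTrue operand is dropped when rendering.
        if isinstance(self.left, AlwaysTrue):
            return self.right.to_sql()
        if isinstance(self.right, AlwaysTrue):
            return self.left.to_sql()
        return f"({self.left.to_sql()}) AND ({self.right.to_sql()})"


def add_filter(current: Expr, expr: Union[Expr, str]) -> And:
    """Mirror DataScan.filter(): wrap strings in RawSQL, then always AND."""
    new_filter = RawSQL(expr) if isinstance(expr, str) else expr
    return And(current, new_filter)


row_filter = add_filter(AlwaysTrue(), EqualTo("id", 1))
row_filter = add_filter(row_filter, "name LIKE 'a%'")
print(f"WHERE {row_filter.to_sql()}")
# WHERE ("id" = 1) AND (name LIKE 'a%')
```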

def select(self, *fields: str) -> DataScan:

Select specific columns. Replaces any previous selection.

def with_snapshot(self, snapshot_id: int) -> DataScan:

Time travel to a specific snapshot version.

def with_timestamp(self, timestamp: datetime.datetime) -> DataScan:

Time travel to data as of a specific timestamp.

Uses AT (TIMESTAMP => ...) syntax.
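The AT (...) clause produced by with_snapshot() and with_timestamp() can be reproduced with a standalone copy of the table-reference logic from _format_table_ref() in the source above. The function name format_table_ref and the table name are hypothetical.

```python
from datetime import datetime
from typing import Optional


def format_table_ref(
    fqn: str,
    snapshot_id: Optional[int] = None,
    timestamp: Optional[datetime] = None,
) -> str:
    """Hypothetical copy of DataScan._format_table_ref() for time travel."""
    if snapshot_id is not None and timestamp is not None:
        raise ValueError("Cannot set both snapshot_id and timestamp for time travel")
    if snapshot_id is not None:
        return f"{fqn} AT (VERSION => {snapshot_id})"
    if timestamp is not None:
        # No %z in the format, so any tzinfo is not written into the literal.
        ts_str = timestamp.strftime("%Y-%m-%d %H:%M:%S.%f")
        return f"{fqn} AT (TIMESTAMP => '{ts_str}')"
    return fqn


print(format_table_ref("lake.main.users", snapshot_id=3))
# lake.main.users AT (VERSION => 3)
print(format_table_ref("lake.main.users", timestamp=datetime(2025, 6, 16, 15, 24, 30)))
# lake.main.users AT (TIMESTAMP => '2025-06-16 15:24:30.000000')
```

with_snapshot() and with_timestamp() each reset the other setting to None, so the ValueError is only reachable when both values are passed to the DataScan constructor directly.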

def with_limit(self, limit: int) -> DataScan:

Limit number of rows returned.

def to_arrow(self) -> pyarrow.lib.Table:

Execute scan and return PyArrow Table.

def to_pandas(self) -> pandas.DataFrame:

Execute scan and return pandas DataFrame.

Requires pandas to be installed separately.
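to_pandas() checks for pandas with importlib.util.find_spec() before doing any work, while to_polars() and to_ray() attempt the import inside try/except. A minimal sketch of the find_spec() check as a reusable helper; require_optional is a hypothetical name, not part of pyducklake.

```python
import importlib.util


def require_optional(module: str, method: str) -> None:
    """Raise ImportError with an install hint if a top-level module is missing.

    Hypothetical helper mirroring the find_spec() check in to_pandas().
    """
    if importlib.util.find_spec(module) is None:
        raise ImportError(f"{module} is required for {method}(). Install it with: pip install {module}")


require_optional("json", "to_json")  # stdlib module, so no error is raised
try:
    require_optional("surely_not_installed_pkg_xyz", "to_thing")
except ImportError as exc:
    print(exc)
```

find_spec() reports availability of a top-level module without importing it. For a dotted name such as "ray.data" it imports the parent package first and raises ModuleNotFoundError if that parent is missing, so a try/except import is the more direct check for submodules.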

def to_duckdb(self, *, connection: _duckdb.DuckDBPyConnection | None = None) -> _duckdb.DuckDBPyRelation:

Execute scan and return a DuckDB relation.

If connection is None, uses the catalog's connection.

def to_arrow_batch_reader(self) -> pyarrow.lib.RecordBatchReader:

Execute scan and return a streaming Arrow RecordBatchReader.

def to_polars(self) -> polars.dataframe.frame.DataFrame:

Execute scan and return a Polars DataFrame.

Converts via Arrow (to_arrow() then pl.from_arrow()). Requires polars to be installed separately.

def to_ray(self) -> 'ray.data.Dataset':

Execute scan and return a Ray Dataset.

Converts via Arrow. Requires ray to be installed separately.

def to_arrow_dataset(self) -> pyarrow._dataset.Dataset:

Return scan results as a PyArrow Dataset.

The dataset wraps the materialized result of to_arrow(), so it reflects any filters, projections, and time travel settings on the scan.

def count(self) -> int:

Return row count (executes SELECT COUNT(*)).
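count() builds its SQL with _build_count_sql(), which appends the same WHERE and LIMIT clauses as a regular scan. In SQL, a LIMIT written after an aggregate restricts the number of result rows (here, one), not the number of rows counted, so a limit set with with_limit() does not cap the value count() returns. The sketch below shows this using the standard-library sqlite3 module as a stand-in engine (an assumption; DuckDB treats this query shape the same way), along with a subquery form that applies the limit before counting. The users table is made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER)")
conn.executemany("INSERT INTO users VALUES (?)", [(i,) for i in range(10)])

# Same shape as _build_count_sql() output for a scan with with_limit(2):
# LIMIT caps the single aggregate row, not the rows being counted.
limit_after_count = conn.execute("SELECT COUNT(*) FROM users LIMIT 2").fetchone()[0]

# Applying the limit inside a subquery caps the rows before counting.
limit_before_count = conn.execute(
    "SELECT COUNT(*) FROM (SELECT 1 FROM users LIMIT 2)"
).fetchone()[0]

conn.close()
print(limit_after_count, limit_before_count)  # 10 2
```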

class DucklakeError(builtins.Exception):
class DucklakeError(Exception):
    """Base exception for all Ducklake errors."""

Base exception for all Ducklake errors.
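Because CommitFailedError subclasses DucklakeError, a single except DucklakeError clause catches it along with any other Ducklake error. A sketch with local stand-in classes mirroring that hierarchy; the failing commit() function and its message are hypothetical, and in application code the exception classes would be imported from pyducklake instead.

```python
class DucklakeError(Exception):
    """Stand-in for pyducklake's base exception."""


class CommitFailedError(DucklakeError):
    """Stand-in for pyducklake's commit failure."""


def commit() -> None:
    # Hypothetical operation that fails while committing.
    raise CommitFailedError("commit failed")


try:
    commit()
except DucklakeError as exc:  # the base class also catches CommitFailedError
    caught = exc

print(type(caught).__name__)  # CommitFailedError
```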

@dataclass(frozen=True)
class EqualTo(pyducklake.BooleanExpression):
@dataclass(frozen=True)
class EqualTo(BooleanExpression):
    """Column equals value."""

    term: str
    value: Any

    def to_sql(self) -> str:
        return f"{_quote_column(self.term)} = {_format_value(self.value)}"

    def __repr__(self) -> str:
        return f"EqualTo(term={self.term!r}, value={self.value!r})"

Column equals value.

EqualTo(term: str, value: Any)
term: str
value: Any
def to_sql(self) -> str:
@dataclass(frozen=True)
class GreaterThan(pyducklake.BooleanExpression):
@dataclass(frozen=True)
class GreaterThan(BooleanExpression):
    """Column greater than value."""

    term: str
    value: Any

    def to_sql(self) -> str:
        return f"{_quote_column(self.term)} > {_format_value(self.value)}"

    def __repr__(self) -> str:
        return f"GreaterThan(term={self.term!r}, value={self.value!r})"

Column greater than value.

GreaterThan(term: str, value: Any)
term: str
value: Any
def to_sql(self) -> str:
@dataclass(frozen=True)
class GreaterThanOrEqual(pyducklake.BooleanExpression):
@dataclass(frozen=True)
class GreaterThanOrEqual(BooleanExpression):
    """Column greater than or equal to value."""

    term: str
    value: Any

    def to_sql(self) -> str:
        return f"{_quote_column(self.term)} >= {_format_value(self.value)}"

    def __repr__(self) -> str:
        return f"GreaterThanOrEqual(term={self.term!r}, value={self.value!r})"

Column greater than or equal to value.

GreaterThanOrEqual(term: str, value: Any)
term: str
value: Any
def to_sql(self) -> str:
@dataclass(frozen=True)
class In(pyducklake.BooleanExpression):
@dataclass(frozen=True)
class In(BooleanExpression):
    """Column value in a set of values."""

    term: str
    values: tuple[Any, ...]

    def __new__(cls, term: str, values: tuple[Any, ...]) -> BooleanExpression:  # type: ignore[misc]
        if not values:
            return AlwaysFalse()
        instance = super().__new__(cls)
        return instance

    def to_sql(self) -> str:
        vals = ", ".join(_format_value(v) for v in self.values)
        return f"{_quote_column(self.term)} IN ({vals})"

    def __repr__(self) -> str:
        return f"In(term={self.term!r}, values={self.values!r})"

Column value in a set of values.
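In.__new__ in the source above returns AlwaysFalse() instead of an In instance when values is empty, which avoids emitting an empty IN () list (a syntax error in many SQL dialects). A self-contained sketch of the same technique follows; quote_column and format_value are hypothetical stand-ins for pyducklake's private _quote_column and _format_value, whose exact quoting rules are assumed here.

```python
from dataclasses import dataclass
from typing import Any, Tuple


def quote_column(name: str) -> str:
    # Hypothetical: double-quote identifiers, doubling embedded quotes.
    return '"' + name.replace('"', '""') + '"'


def format_value(value: Any) -> str:
    # Hypothetical: single-quote strings, render other values with str().
    if isinstance(value, str):
        return "'" + value.replace("'", "''") + "'"
    return str(value)


@dataclass(frozen=True)
class AlwaysFalse:
    def to_sql(self) -> str:
        return "FALSE"


@dataclass(frozen=True)
class In:
    term: str
    values: Tuple[Any, ...]

    def __new__(cls, term: str, values: Tuple[Any, ...]):
        # An empty value list becomes a constant-false expression.
        if not values:
            return AlwaysFalse()
        return super().__new__(cls)

    def to_sql(self) -> str:
        vals = ", ".join(format_value(v) for v in self.values)
        return f"{quote_column(self.term)} IN ({vals})"


print(In("name", ("alice", "bob")).to_sql())  # "name" IN ('alice', 'bob')
print(In("id", ()).to_sql())                  # FALSE
```

Because __new__ returns an object that is not an instance of In, Python skips the dataclass __init__ for that call, so the AlwaysFalse() object is returned unchanged.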

In(term: str, values: tuple[typing.Any, ...])
term: str
values: tuple[typing.Any, ...]
def to_sql(self) -> str:
class InspectTable:
class InspectTable:
    """Metadata introspection for a Ducklake table.

    Obtained via ``table.inspect()``.
    """

    def __init__(self, table: Table) -> None:
        self._table = table

    def snapshots(self) -> pa.Table:
        """Return all snapshots as an Arrow table.

        Columns: snapshot_id, snapshot_time, schema_version, changes,
                 author, commit_message, commit_extra_info
        """
        catalog_name = self._table.catalog.name
        result = self._table.catalog.connection.execute(
            f'SELECT * FROM "{catalog_name}".snapshots() ORDER BY snapshot_id'
        )
        return _to_arrow_table(result)

    def files(
        self,
        snapshot_id: int | None = None,
        snapshot_time: str | None = None,
    ) -> pa.Table:
        """Return data files for the table, optionally at a specific snapshot.

        Columns returned: data_file, data_file_size_bytes,
        data_file_footer_size, data_file_encryption_key, delete_file,
        delete_file_size_bytes, delete_file_footer_size,
        delete_file_encryption_key.

        Args:
            snapshot_id: Fetch files at a specific snapshot version.
            snapshot_time: Fetch files at a specific timestamp
                (e.g. ``'2025-06-16 15:24:30'``).

        Raises:
            ValueError: If both ``snapshot_id`` and ``snapshot_time``
                are provided.

        Uses ``ducklake_list_files()`` function.
        """
        if snapshot_id is not None and snapshot_time is not None:
            raise ValueError("Cannot specify both snapshot_id and snapshot_time")

        from pyducklake.catalog import escape_string_literal

        catalog_name = self._table.catalog.name
        table_name = self._table.name
        schema_name = self._table.namespace
        sql = (
            f"SELECT * FROM ducklake_list_files('{escape_string_literal(catalog_name)}', "
            f"'{escape_string_literal(table_name)}', schema := '{escape_string_literal(schema_name)}'"
        )
        if snapshot_id is not None:
            sql += f", snapshot_version := {snapshot_id}::BIGINT"
        if snapshot_time is not None:
            sql += f", snapshot_time := '{escape_string_literal(snapshot_time)}'"
        sql += ")"
        result = self._table.catalog.connection.execute(sql)
        return _to_arrow_table(result)

    def history(self) -> pa.Table:
        """Return snapshot history as an Arrow table, newest-first."""
        catalog_name = self._table.catalog.name
        result = self._table.catalog.connection.execute(
            f'SELECT * FROM "{catalog_name}".snapshots() ORDER BY snapshot_id DESC'
        )
        return _to_arrow_table(result)

    def partitions(self) -> pa.Table:
        """Return partition info as an Arrow table.

        Columns: partition_id, column_id, transform
        If no partitioning, returns an empty table.
        """
        from pyducklake.catalog import escape_string_literal

        catalog_name = self._table.catalog.name
        meta_schema = f"__ducklake_metadata_{catalog_name}"

        # Get table_id first
        esc_name = escape_string_literal(self._table.name)
        esc_ns = escape_string_literal(self._table.namespace)
        table_rows = self._table.catalog.fetchall(
            f'SELECT t.table_id FROM "{meta_schema}".ducklake_table t'
            f' JOIN "{meta_schema}".ducklake_schema s ON t.schema_id = s.schema_id'
            f" WHERE t.table_name = '{esc_name}'"
            f" AND s.schema_name = '{esc_ns}'"
        )
        if not table_rows:
            return pa.table(
                {
                    "partition_id": pa.array([], type=pa.int64()),
                    "column_id": pa.array([], type=pa.int64()),
                    "transform": pa.array([], type=pa.string()),
                }
            )

        table_id = table_rows[0][0]

        try:
            rows = self._table.catalog.fetchall(
                f"SELECT pc.partition_id, pc.column_id, pc.transform "
                f'FROM "{meta_schema}".ducklake_partition_column pc '
                f'JOIN "{meta_schema}".ducklake_partition_info pi '
                f"ON pc.partition_id = pi.partition_id "
                f"AND pc.table_id = pi.table_id "
                f"WHERE pc.table_id = {table_id} "
                f"AND pi.end_snapshot IS NULL"
            )
        except (duckdb.CatalogException, duckdb.BinderException):
            rows = []

        if not rows:
            return pa.table(
                {
                    "partition_id": pa.array([], type=pa.int64()),
                    "column_id": pa.array([], type=pa.int64()),
                    "transform": pa.array([], type=pa.string()),
                }
            )

        return pa.table(
            {
                "partition_id": pa.array([r[0] for r in rows], type=pa.int64()),
                "column_id": pa.array([r[1] for r in rows], type=pa.int64()),
                "transform": pa.array([str(r[2]) for r in rows], type=pa.string()),
            }
        )

Metadata introspection for a Ducklake table.

Obtained via table.inspect().

InspectTable(table: Table)
    def __init__(self, table: Table) -> None:
        self._table = table
def snapshots(self) -> pyarrow.lib.Table:
    def snapshots(self) -> pa.Table:
        """Return all snapshots as an Arrow table.

        Columns: snapshot_id, snapshot_time, schema_version, changes,
                 author, commit_message, commit_extra_info
        """
        catalog_name = self._table.catalog.name
        result = self._table.catalog.connection.execute(
            f'SELECT * FROM "{catalog_name}".snapshots() ORDER BY snapshot_id'
        )
        return _to_arrow_table(result)

Return all snapshots as an Arrow table.

Columns: snapshot_id, snapshot_time, schema_version, changes, author, commit_message, commit_extra_info

def files(self, snapshot_id: int | None = None, snapshot_time: str | None = None) -> pyarrow.lib.Table:
    def files(
        self,
        snapshot_id: int | None = None,
        snapshot_time: str | None = None,
    ) -> pa.Table:
        """Return data files for the table, optionally at a specific snapshot.

        Columns returned: data_file, data_file_size_bytes,
        data_file_footer_size, data_file_encryption_key, delete_file,
        delete_file_size_bytes, delete_file_footer_size,
        delete_file_encryption_key.

        Args:
            snapshot_id: Fetch files at a specific snapshot version.
            snapshot_time: Fetch files at a specific timestamp
                (e.g. ``'2025-06-16 15:24:30'``).

        Raises:
            ValueError: If both ``snapshot_id`` and ``snapshot_time``
                are provided.

        Uses ``ducklake_list_files()`` function.
        """
        if snapshot_id is not None and snapshot_time is not None:
            raise ValueError("Cannot specify both snapshot_id and snapshot_time")

        from pyducklake.catalog import escape_string_literal

        catalog_name = self._table.catalog.name
        table_name = self._table.name
        schema_name = self._table.namespace
        sql = (
            f"SELECT * FROM ducklake_list_files('{escape_string_literal(catalog_name)}', "
            f"'{escape_string_literal(table_name)}', schema := '{escape_string_literal(schema_name)}'"
        )
        if snapshot_id is not None:
            sql += f", snapshot_version := {snapshot_id}::BIGINT"
        if snapshot_time is not None:
            sql += f", snapshot_time := '{escape_string_literal(snapshot_time)}'"
        sql += ")"
        result = self._table.catalog.connection.execute(sql)
        return _to_arrow_table(result)

Return data files for the table, optionally at a specific snapshot.

Columns returned: data_file, data_file_size_bytes, data_file_footer_size, data_file_encryption_key, delete_file, delete_file_size_bytes, delete_file_footer_size, delete_file_encryption_key.

Args:
    snapshot_id: Fetch files at a specific snapshot version.
    snapshot_time: Fetch files at a specific timestamp (e.g. '2025-06-16 15:24:30').

Raises:
    ValueError: If both snapshot_id and snapshot_time are provided.

Uses the ducklake_list_files() table function.

def history(self) -> pyarrow.lib.Table:
    def history(self) -> pa.Table:
        """Return snapshot history as an Arrow table, newest-first."""
        catalog_name = self._table.catalog.name
        result = self._table.catalog.connection.execute(
            f'SELECT * FROM "{catalog_name}".snapshots() ORDER BY snapshot_id DESC'
        )
        return _to_arrow_table(result)

Return snapshot history as an Arrow table, newest-first.

def partitions(self) -> pyarrow.lib.Table:
    def partitions(self) -> pa.Table:
        """Return partition info as an Arrow table.

        Columns: partition_id, column_id, transform
        If no partitioning, returns an empty table.
        """
        from pyducklake.catalog import escape_string_literal

        catalog_name = self._table.catalog.name
        meta_schema = f"__ducklake_metadata_{catalog_name}"

        # Get table_id first
        esc_name = escape_string_literal(self._table.name)
        esc_ns = escape_string_literal(self._table.namespace)
        table_rows = self._table.catalog.fetchall(
            f'SELECT t.table_id FROM "{meta_schema}".ducklake_table t'
            f' JOIN "{meta_schema}".ducklake_schema s ON t.schema_id = s.schema_id'
            f" WHERE t.table_name = '{esc_name}'"
            f" AND s.schema_name = '{esc_ns}'"
        )
        if not table_rows:
            return pa.table(
                {
                    "partition_id": pa.array([], type=pa.int64()),
                    "column_id": pa.array([], type=pa.int64()),
                    "transform": pa.array([], type=pa.string()),
                }
            )

        table_id = table_rows[0][0]

        try:
            rows = self._table.catalog.fetchall(
                f"SELECT pc.partition_id, pc.column_id, pc.transform "
                f'FROM "{meta_schema}".ducklake_partition_column pc '
                f'JOIN "{meta_schema}".ducklake_partition_info pi '
                f"ON pc.partition_id = pi.partition_id "
                f"AND pc.table_id = pi.table_id "
                f"WHERE pc.table_id = {table_id} "
                f"AND pi.end_snapshot IS NULL"
            )
        except (duckdb.CatalogException, duckdb.BinderException):
            rows = []

        if not rows:
            return pa.table(
                {
                    "partition_id": pa.array([], type=pa.int64()),
                    "column_id": pa.array([], type=pa.int64()),
                    "transform": pa.array([], type=pa.string()),
                }
            )

        return pa.table(
            {
                "partition_id": pa.array([r[0] for r in rows], type=pa.int64()),
                "column_id": pa.array([r[1] for r in rows], type=pa.int64()),
                "transform": pa.array([str(r[2]) for r in rows], type=pa.string()),
            }
        )

Return partition info as an Arrow table.

Columns: partition_id, column_id, transform. If the table is not partitioned, returns an empty table with these columns.
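partitions() reads rows from the ducklake_partition_column and ducklake_partition_info metadata tables and pivots them into one list per column before building typed pa.array columns. Every fallback path still returns all three columns, just empty, so callers can rely on a fixed schema. A stdlib-only sketch of that pivot; rows_to_columns is hypothetical, and the sample rows are illustrative rather than real catalog output.

```python
from typing import Any, Sequence


def rows_to_columns(rows: Sequence[Sequence[Any]]) -> dict[str, list[Any]]:
    """Hypothetical helper: pivot (partition_id, column_id, transform) rows into columns."""
    # An empty input still yields every key, mirroring the empty-but-typed
    # Arrow table that partitions() returns for an unpartitioned table.
    return {
        "partition_id": [int(r[0]) for r in rows],
        "column_id": [int(r[1]) for r in rows],
        "transform": [str(r[2]) for r in rows],
    }


# Illustrative rows only: one partition spec with two partition keys.
sample = [(1, 2, "identity"), (1, 5, "day")]
print(rows_to_columns(sample))
# {'partition_id': [1, 1], 'column_id': [2, 5], 'transform': ['identity', 'day']}
print(rows_to_columns([]))
# {'partition_id': [], 'column_id': [], 'transform': []}
```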

class MaintenanceTable:

Table maintenance operations. Obtained via table.maintenance().

MaintenanceTable(table: Table)
    def __init__(self, table: Table) -> None:
        self._table = table
def compact(self, *, min_file_size: int | None = None, max_file_size: int | None = None, max_compacted_files: int | None = None) -> None:
    def compact(
        self,
        *,
        min_file_size: int | None = None,
        max_file_size: int | None = None,
        max_compacted_files: int | None = None,
    ) -> None:
        """Merge small files into larger ones.

        Operates at the catalog level (ducklake does not support per-table compaction).
        """
        from pyducklake.catalog import escape_string_literal

        catalog_name = self._table.catalog.name
        conn = self._table.catalog.connection

        params: list[str] = [f"'{escape_string_literal(catalog_name)}'"]
        if min_file_size is not None:
            params.append(f"min_file_size := {min_file_size}::UBIGINT")
        if max_file_size is not None:
            params.append(f"max_file_size := {max_file_size}::UBIGINT")
        if max_compacted_files is not None:
            params.append(f"max_compacted_files := {max_compacted_files}::UBIGINT")

        sql = f"CALL ducklake_merge_adjacent_files({', '.join(params)})"
        conn.execute(sql)

Merge small files into larger ones.

Operates on the whole catalog, not only this table (Ducklake does not support per-table compaction).

def rewrite_data_files(self, *, delete_threshold: float | None = None) -> None:
    def rewrite_data_files(
        self,
        *,
        delete_threshold: float | None = None,
    ) -> None:
        """Rewrite files with excessive deletions.

        Operates at the catalog level.
        """
        from pyducklake.catalog import escape_string_literal

        catalog_name = self._table.catalog.name
        conn = self._table.catalog.connection

        params: list[str] = [f"'{escape_string_literal(catalog_name)}'"]
        if delete_threshold is not None:
            params.append(f"delete_threshold := {delete_threshold}")

        sql = f"CALL ducklake_rewrite_data_files({', '.join(params)})"
        conn.execute(sql)

Rewrite files with excessive deletions.

Operates at the catalog level.

def expire_snapshots(self, *, older_than: str | None = None, versions: int | None = None, dry_run: bool = False) -> None:
    def expire_snapshots(
        self,
        *,
        older_than: str | None = None,
        versions: int | None = None,
        dry_run: bool = False,
    ) -> None:
        """Remove old snapshots.

        Args:
            older_than: Timestamp string (``'YYYY-MM-DD HH:MM:SS'``).
            versions: Number of versions to keep.
            dry_run: If True, report what would be expired without acting.
        """
        from pyducklake.catalog import escape_string_literal

        catalog_name = self._table.catalog.name
        conn = self._table.catalog.connection

        params: list[str] = [f"'{escape_string_literal(catalog_name)}'"]
        if older_than is not None:
            validate_older_than(older_than)
            params.append(f"older_than := '{escape_string_literal(older_than)}'")
        if versions is not None:
            params.append(f"versions := [{versions}::UBIGINT]")
        if dry_run:
            params.append("dry_run := true")

        sql = f"CALL ducklake_expire_snapshots({', '.join(params)})"
        conn.execute(sql)

Remove old snapshots.

Args:
    older_than: Timestamp string ('YYYY-MM-DD HH:MM:SS').
    versions: ID of the snapshot to expire (the value is passed to DuckLake as a one-element versions list).
    dry_run: If True, report what would be expired without acting.

def cleanup_files(self, *, older_than: str | None = None, dry_run: bool = False) -> None:
    def cleanup_files(
        self,
        *,
        older_than: str | None = None,
        dry_run: bool = False,
    ) -> None:
        """Delete files scheduled for removal.

        Args:
            older_than: Timestamp string (``'YYYY-MM-DD HH:MM:SS'``).
            dry_run: If True, report what would be cleaned without acting.
        """
        from pyducklake.catalog import escape_string_literal

        catalog_name = self._table.catalog.name
        conn = self._table.catalog.connection

        params: list[str] = [f"'{escape_string_literal(catalog_name)}'"]
        if older_than is not None:
            validate_older_than(older_than)
            params.append(f"older_than := '{escape_string_literal(older_than)}'")
        if dry_run:
            params.append("dry_run := true")

        sql = f"CALL ducklake_cleanup_old_files({', '.join(params)})"
        conn.execute(sql)

Delete files scheduled for removal.

Args:
    older_than: Timestamp string ('YYYY-MM-DD HH:MM:SS').
    dry_run: If True, report what would be cleaned without acting.
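Expiring snapshots and deleting files are separate steps in DuckLake: expire_snapshots() removes snapshot metadata and schedules files that only those snapshots referenced for deletion, and cleanup_files() later deletes scheduled files. A minimal sketch of running them in that order, assuming `maint` is the MaintenanceTable returned by table.maintenance(). apply_retention is a hypothetical helper. It takes a separate cleanup cutoff on the assumption that cleanup_files()'s older_than filters on when a file was scheduled for deletion. The recording stub lets the sketch run without a catalog.

```python
from typing import Any


def apply_retention(
    maint: Any, expire_before: str, cleanup_before: str, *, dry_run: bool = False
) -> None:
    """Hypothetical helper: expire old snapshots, then delete scheduled files.

    `maint` is expected to behave like MaintenanceTable (table.maintenance()).
    """
    # Expire first: files only become eligible for cleanup once the
    # snapshots that reference them are gone.
    maint.expire_snapshots(older_than=expire_before, dry_run=dry_run)
    # Assumption: this cutoff applies to when files were scheduled for deletion.
    maint.cleanup_files(older_than=cleanup_before, dry_run=dry_run)


class RecordingMaintenance:
    """Test double that records calls instead of touching a catalog."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, dict[str, Any]]] = []

    def expire_snapshots(self, **kwargs: Any) -> None:
        self.calls.append(("expire_snapshots", kwargs))

    def cleanup_files(self, **kwargs: Any) -> None:
        self.calls.append(("cleanup_files", kwargs))


stub = RecordingMaintenance()
apply_retention(stub, "2025-06-01 00:00:00", "2025-06-08 00:00:00", dry_run=True)
print([name for name, _ in stub.calls])  # ['expire_snapshots', 'cleanup_files']
```

Against a real table the call would be apply_retention(table.maintenance(), ...); starting with dry_run=True shows what each step would touch before anything is removed.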

def delete_orphaned_files(self, *, dry_run: bool = False) -> None:
    def delete_orphaned_files(self, *, dry_run: bool = False) -> None:
        """Remove untracked files from storage."""
        from pyducklake.catalog import escape_string_literal

        catalog_name = self._table.catalog.name
        conn = self._table.catalog.connection

        params: list[str] = [f"'{escape_string_literal(catalog_name)}'"]
        if dry_run:
            params.append("dry_run := true")

        sql = f"CALL ducklake_delete_orphaned_files({', '.join(params)})"
        conn.execute(sql)

Remove untracked files from storage.

def checkpoint(self) -> None:
    def checkpoint(self) -> None:
        """Run all maintenance operations sequentially."""
        catalog_name = self._table.catalog.name
        conn = self._table.catalog.connection
        conn.execute(f'CHECKPOINT "{catalog_name}"')

Run all maintenance operations sequentially by issuing CHECKPOINT on the catalog.

@dataclass(frozen=True)
class IsNaN(pyducklake.BooleanExpression):
@dataclass(frozen=True)
class IsNaN(BooleanExpression):
    """Column is NaN."""

    term: str

    def to_sql(self) -> str:
        return f"isnan({_quote_column(self.term)})"

    def __repr__(self) -> str:
        return f"IsNaN(term={self.term!r})"

Column is NaN.

IsNaN(term: str)
term: str
def to_sql(self) -> str:
@dataclass(frozen=True)
class IsNull(pyducklake.BooleanExpression):
@dataclass(frozen=True)
class IsNull(BooleanExpression):
    """Column is NULL."""

    term: str

    def to_sql(self) -> str:
        return f"{_quote_column(self.term)} IS NULL"

    def __repr__(self) -> str:
        return f"IsNull(term={self.term!r})"

Column is NULL.

IsNull(term: str)
term: str
def to_sql(self) -> str:
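IsNull/NotNull and IsNaN/NotNaN test different things: NULL marks a missing value of any type, NaN is a floating-point value, and isnan() of a NULL is itself NULL. A filter that should treat "missing" and "not a number" alike therefore needs both kinds of predicate. The sketch below renders such combinations; missing_or_nan_sql and valid_number_sql are hypothetical helpers, and quote_column is an assumed stand-in for the private _quote_column, whose body is not shown here.

```python
def quote_column(name: str) -> str:
    # Assumption: standard SQL identifier quoting; the real _quote_column may differ.
    return '"' + name.replace('"', '""') + '"'


def missing_or_nan_sql(term: str) -> str:
    """Hypothetical helper: IsNull OR IsNaN, rendered the way those classes do."""
    col = quote_column(term)
    # isnan(NULL) evaluates to NULL, so IsNaN alone never matches missing values.
    return f"({col} IS NULL OR isnan({col}))"


def valid_number_sql(term: str) -> str:
    """Hypothetical helper: NotNull AND NotNaN."""
    col = quote_column(term)
    # NOT isnan(NULL) is also NULL, so NotNaN alone already drops missing
    # values; the explicit IS NOT NULL just makes that intent visible.
    return f"({col} IS NOT NULL AND NOT isnan({col}))"


print(missing_or_nan_sql("score"))  # ("score" IS NULL OR isnan("score"))
print(valid_number_sql("score"))    # ("score" IS NOT NULL AND NOT isnan("score"))
```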
@dataclass(frozen=True)
class LessThan(pyducklake.BooleanExpression):
@dataclass(frozen=True)
class LessThan(BooleanExpression):
    """Column less than value."""

    term: str
    value: Any

    def to_sql(self) -> str:
        return f"{_quote_column(self.term)} < {_format_value(self.value)}"

    def __repr__(self) -> str:
        return f"LessThan(term={self.term!r}, value={self.value!r})"

Column less than value.

LessThan(term: str, value: Any)
term: str
value: Any
def to_sql(self) -> str:
@dataclass(frozen=True)
class LessThanOrEqual(pyducklake.BooleanExpression):
@dataclass(frozen=True)
class LessThanOrEqual(BooleanExpression):
    """Column less than or equal to value."""

    term: str
    value: Any

    def to_sql(self) -> str:
        return f"{_quote_column(self.term)} <= {_format_value(self.value)}"

    def __repr__(self) -> str:
        return f"LessThanOrEqual(term={self.term!r}, value={self.value!r})"

Column less than or equal to value.

LessThanOrEqual(term: str, value: Any)
term: str
value: Any
def to_sql(self) -> str:
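Each comparison class pairs a quoted column with a formatted literal. The private helpers _quote_column and _format_value are not shown in this reference, so the sketch below substitutes hypothetical versions (double-quoted identifiers; NULL for None; TRUE/FALSE for booleans; numbers as-is; other values single-quoted with quotes doubled) to illustrate how LessThan and LessThanOrEqual become SQL. The literal rules are assumptions, not pyducklake's exact behavior.

```python
from typing import Any


def quote_column(name: str) -> str:
    # Assumption: standard SQL identifier quoting.
    return '"' + name.replace('"', '""') + '"'


def format_value(value: Any) -> str:
    # Assumed literal rules; bool is checked before int because bool subclasses int.
    if value is None:
        return "NULL"
    if isinstance(value, bool):
        return "TRUE" if value else "FALSE"
    if isinstance(value, (int, float)):
        return repr(value)
    return "'" + str(value).replace("'", "''") + "'"


def compare_sql(term: str, op: str, value: Any) -> str:
    """Hypothetical helper covering the < and <= predicates shown above."""
    if op not in {"<", "<="}:
        raise ValueError(f"unsupported operator: {op}")
    return f"{quote_column(term)} {op} {format_value(value)}"


print(compare_sql("age", "<=", 30))         # "age" <= 30
print(compare_sql("name", "<", "O'Brien"))  # "name" < 'O''Brien'
```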
class NamespaceAlreadyExistsError(pyducklake.DucklakeError):
class NamespaceAlreadyExistsError(DucklakeError):
    """Raised when attempting to create a namespace that already exists."""

Raised when attempting to create a namespace that already exists.

class NamespaceNotEmptyError(pyducklake.DucklakeError):
class NamespaceNotEmptyError(DucklakeError):
    """Raised when attempting to drop a non-empty namespace."""

Raised when attempting to drop a non-empty namespace.

class NoSuchNamespaceError(pyducklake.DucklakeError):
class NoSuchNamespaceError(DucklakeError):
    """Raised when a namespace does not exist."""

Raised when a namespace does not exist.

class NoSuchTableError(pyducklake.DucklakeError):
class NoSuchTableError(DucklakeError):
    """Raised when a table does not exist."""

Raised when a table does not exist.

class NoSuchViewError(pyducklake.DucklakeError):
class NoSuchViewError(DucklakeError):
    """Raised when a view does not exist."""

Raised when a view does not exist.

@dataclass(frozen=True)
class Not(pyducklake.BooleanExpression):
@dataclass(frozen=True)
class Not(BooleanExpression):
    """Logical NOT."""

    child: BooleanExpression

    def __new__(cls, child: BooleanExpression) -> BooleanExpression:  # type: ignore[misc]
        if isinstance(child, _AlwaysTrue):
            return AlwaysFalse()
        if isinstance(child, _AlwaysFalse):
            return AlwaysTrue()
        if isinstance(child, Not):
            return child.child
        instance = super().__new__(cls)
        return instance

    def to_sql(self) -> str:
        return f"(NOT {self.child.to_sql()})"

    def __repr__(self) -> str:
        return f"Not(child={self.child!r})"

Logical NOT.

Not(child: BooleanExpression)
def to_sql(self) -> str:
@dataclass(frozen=True)
class NotEqualTo(pyducklake.BooleanExpression):
@dataclass(frozen=True)
class NotEqualTo(BooleanExpression):
    """Column not equals value."""

    term: str
    value: Any

    def to_sql(self) -> str:
        return f"{_quote_column(self.term)} != {_format_value(self.value)}"

    def __repr__(self) -> str:
        return f"NotEqualTo(term={self.term!r}, value={self.value!r})"

Column not equal to value.

NotEqualTo(term: str, value: Any)
term: str
value: Any
def to_sql(self) -> str:
@dataclass(frozen=True)
class NotIn(pyducklake.BooleanExpression):
@dataclass(frozen=True)
class NotIn(BooleanExpression):
    """Column value not in a set of values."""

    term: str
    values: tuple[Any, ...]

    def __new__(cls, term: str, values: tuple[Any, ...]) -> BooleanExpression:  # type: ignore[misc]
        if not values:
            return AlwaysTrue()
        instance = super().__new__(cls)
        return instance

    def to_sql(self) -> str:
        vals = ", ".join(_format_value(v) for v in self.values)
        return f"{_quote_column(self.term)} NOT IN ({vals})"

    def __repr__(self) -> str:
        return f"NotIn(term={self.term!r}, values={self.values!r})"

Column value not in a set of values.

NotIn(term: str, values: tuple[typing.Any, ...])
term: str
values: tuple[typing.Any, ...]
def to_sql(self) -> str:
@dataclass(frozen=True)
class NotNaN(pyducklake.BooleanExpression):
@dataclass(frozen=True)
class NotNaN(BooleanExpression):
    """Column is not NaN."""

    term: str

    def to_sql(self) -> str:
        return f"NOT isnan({_quote_column(self.term)})"

    def __repr__(self) -> str:
        return f"NotNaN(term={self.term!r})"

Column is not NaN.

NotNaN(term: str)
term: str
def to_sql(self) -> str:
@dataclass(frozen=True)
class NotNull(pyducklake.BooleanExpression):
@dataclass(frozen=True)
class NotNull(BooleanExpression):
    """Column is not NULL."""

    term: str

    def to_sql(self) -> str:
        return f"{_quote_column(self.term)} IS NOT NULL"

    def __repr__(self) -> str:
        return f"NotNull(term={self.term!r})"

Column is not NULL.

NotNull(term: str)
term: str
def to_sql(self) -> str:
@dataclass(frozen=True)
class Or(pyducklake.BooleanExpression):
@dataclass(frozen=True)
class Or(BooleanExpression):
    """Logical OR with short-circuit simplification."""

    left: BooleanExpression
    right: BooleanExpression

    def __new__(cls, left: BooleanExpression, right: BooleanExpression) -> BooleanExpression:  # type: ignore[misc]
        if isinstance(left, _AlwaysTrue) or isinstance(right, _AlwaysTrue):
            return AlwaysTrue()
        if isinstance(left, _AlwaysFalse):
            return right
        if isinstance(right, _AlwaysFalse):
            return left
        instance = super().__new__(cls)
        return instance

    def to_sql(self) -> str:
        return f"({self.left.to_sql()} OR {self.right.to_sql()})"

    def __repr__(self) -> str:
        return f"Or(left={self.left!r}, right={self.right!r})"

Logical OR with short-circuit simplification.

def to_sql(self) -> str:
@dataclass(frozen=True)
class Reference:
@dataclass(frozen=True)
class Reference:
    """Column reference."""

    name: str

    def __repr__(self) -> str:
        return f"Reference('{self.name}')"

Column reference.

Reference(name: str)
name: str
class Schema:
class Schema:
    """Represents a Ducklake table schema."""

    __slots__ = ("_fields", "_schema_id", "_name_to_field", "_id_to_field")

    def __init__(self, *fields: NestedField, schema_id: int = 0) -> None:
        # Validate uniqueness
        seen_names: set[str] = set()
        seen_ids: set[int] = set()
        name_index: dict[str, NestedField] = {}
        id_index: dict[int, NestedField] = {}

        for f in fields:
            if f.name in seen_names:
                raise ValueError(f"Duplicate field name: {f.name!r}")
            if f.field_id in seen_ids:
                raise ValueError(f"Duplicate field_id: {f.field_id}")
            seen_names.add(f.name)
            seen_ids.add(f.field_id)
            name_index[f.name] = f
            id_index[f.field_id] = f

        self._fields = fields
        self._schema_id = schema_id
        self._name_to_field = name_index
        self._id_to_field = id_index

    # -- Alternate constructors ----------------------------------------------

    @classmethod
    def of(cls, *args: NestedField | dict[str, DucklakeType]) -> Schema:
        """Create a Schema with auto-assigned field IDs.

        Accepts either :class:`NestedField` objects (from :func:`required` /
        :func:`optional`) or a single *dict* mapping column names to types.

        Examples:
            Using ``required()`` and ``optional()`` helpers:

            ```pycon
            >>> from pyducklake import Schema, required, optional, IntegerType, StringType, DoubleType
            >>> schema = Schema.of(
            ...     required("id", IntegerType()),
            ...     optional("name", StringType()),
            ...     optional("value", DoubleType()),
            ... )
            >>> schema.column_names()
            ['id', 'name', 'value']
            >>> schema.find_field("id").required
            True

            ```

            Using a dict (all fields optional by default):

            ```pycon
            >>> schema = Schema.of({"id": IntegerType(), "name": StringType()})
            >>> schema.column_names()
            ['id', 'name']

            ```

        Args:
            *args: Either NestedField objects or a single dict[str, DucklakeType].

        Returns:
            A new Schema with field IDs assigned sequentially starting from 1.

        Raises:
            TypeError: If args are mixed types or invalid.
            ValueError: If duplicate field names are provided.
        """
        if not args:
            raise TypeError("Schema.of() requires at least one argument")

        # Single-dict form
        if len(args) == 1 and isinstance(args[0], dict):
            mapping = args[0]
            fields: list[NestedField] = []
            for i, (name, ftype) in enumerate(mapping.items(), start=1):
                if not isinstance(ftype, DucklakeType):  # pyright: ignore[reportUnnecessaryIsInstance]
                    raise TypeError(
                        f"Dict values must be DucklakeType instances, got {type(ftype).__name__} for key {name!r}"
                    )
                fields.append(NestedField(field_id=i, name=name, field_type=ftype, required=False))
            return cls(*fields)

        # NestedField form — validate types first
        raw_fields: list[NestedField] = []
        for arg in args:
            if not isinstance(arg, NestedField):
                raise TypeError(
                    f"Schema.of() arguments must be all NestedField or a single dict, got {type(arg).__name__}"
                )
            raw_fields.append(arg)

        # 1. Collect all explicit (non-sentinel) field_ids
        explicit_ids = {f.field_id for f in raw_fields if f.field_id != _SENTINEL_FIELD_ID}
        if len(explicit_ids) != sum(1 for f in raw_fields if f.field_id != _SENTINEL_FIELD_ID):
            raise ValueError("Duplicate explicit field_ids")

        # 2. Auto-assign sentinel fields, skipping explicit IDs
        next_id = 1
        resolved: list[NestedField] = []
        for f in raw_fields:
            if f.field_id == _SENTINEL_FIELD_ID:
                while next_id in explicit_ids:
                    next_id += 1
                resolved.append(
                    NestedField(
                        field_id=next_id,
                        name=f.name,
                        field_type=f.field_type,
                        required=f.required,
                        doc=f.doc,
                    )
                )
                next_id += 1
            else:
                resolved.append(f)

        return cls(*resolved)

    # -- Properties ----------------------------------------------------------

    @property
    def fields(self) -> tuple[NestedField, ...]:
        return self._fields

    @property
    def schema_id(self) -> int:
        return self._schema_id

    # -- Field lookup --------------------------------------------------------

    def find_field(self, name_or_id: str | int, case_sensitive: bool = True) -> NestedField:
        """Find a field by name or field_id. Raises ValueError if not found."""
        if isinstance(name_or_id, int):
            field = self._id_to_field.get(name_or_id)
            if field is None:
                raise ValueError(f"Field with field_id {name_or_id} not found")
            return field

        if case_sensitive:
            field = self._name_to_field.get(name_or_id)
            if field is None:
                raise ValueError(f"Field {name_or_id!r} not found")
            return field

        # Case-insensitive
        lower = name_or_id.lower()
        for f in self._fields:
            if f.name.lower() == lower:
                return f
        raise ValueError(f"Field {name_or_id!r} not found")

    def find_type(self, name_or_id: str | int, case_sensitive: bool = True) -> DucklakeType:
        """Find a field's type by name or field_id."""
        return self.find_field(name_or_id, case_sensitive=case_sensitive).field_type

    def find_column_name(self, field_id: int) -> str | None:
        """Find a column name by field_id. Returns None if not found."""
        field = self._id_to_field.get(field_id)
        return field.name if field is not None else None

    # -- Accessors -----------------------------------------------------------

    def column_names(self) -> list[str]:
        return [f.name for f in self._fields]

    def field_ids(self) -> set[int]:
        return {f.field_id for f in self._fields}

    @property
    def highest_field_id(self) -> int:
        """Highest field_id in the schema (useful for generating new IDs)."""
        if not self._fields:
            return 0
        return max(f.field_id for f in self._fields)

    # -- Conversion ----------------------------------------------------------

    def as_struct(self) -> StructType:
        """Convert to a StructType."""
        return StructType(fields=self._fields)

    def as_arrow(self) -> pa.Schema:
        """Convert to a PyArrow Schema.

        Each field carries ``PARQUET:field_id`` metadata matching Ducklake/Iceberg
        conventions for Parquet files.
        """
        arrow_fields: list[pa.Field[pa.DataType]] = []
        for f in self._fields:
            arrow_type = ducklake_type_to_arrow(f.field_type)
            arrow_field = pa.field(
                f.name,
                arrow_type,
272                nullable=not f.required,
273                metadata={b"PARQUET:field_id": str(f.field_id).encode()},
274            )
275            arrow_fields.append(arrow_field)
276        return pa.schema(arrow_fields)
277
278    # -- Projection ----------------------------------------------------------
279
280    def select(self, *names: str, case_sensitive: bool = True) -> Schema:
281        """Return a new Schema with only the selected columns.
282
283        Field order is preserved from the original schema.
284        Raises ValueError for unknown names.
285        """
286        # Validate all names exist first
287        if case_sensitive:
288            selected: set[str] = set()
289            for n in names:
290                if n not in self._name_to_field:
291                    raise ValueError(f"Field {n!r} not found")
292                selected.add(n)
293            kept = tuple(f for f in self._fields if f.name in selected)
294        else:
295            lower_names = {n.lower() for n in names}
296            # Validate
297            known_lower = {f.name.lower() for f in self._fields}
298            for n in names:
299                if n.lower() not in known_lower:
300                    raise ValueError(f"Field {n!r} not found")
301            kept = tuple(f for f in self._fields if f.name.lower() in lower_names)
302
303        return Schema(*kept, schema_id=self._schema_id)
304
305    # -- Standard methods ----------------------------------------------------
306
307    def __eq__(self, other: object) -> bool:
308        if not isinstance(other, Schema):
309            return NotImplemented
310        return self._fields == other._fields and self._schema_id == other._schema_id
311
312    def __repr__(self) -> str:
313        fields_repr = ", ".join(repr(f) for f in self._fields)
314        return f"Schema(fields=({fields_repr}), schema_id={self._schema_id})"
315
316    def __len__(self) -> int:
317        return len(self._fields)
318
319    def __iter__(self) -> Iterator[NestedField]:
320        return iter(self._fields)

Represents a Ducklake table schema.

Schema(*fields: NestedField, schema_id: int = 0)
    def __init__(self, *fields: NestedField, schema_id: int = 0) -> None:
        # Validate uniqueness
        seen_names: set[str] = set()
        seen_ids: set[int] = set()
        name_index: dict[str, NestedField] = {}
        id_index: dict[int, NestedField] = {}

        for f in fields:
            if f.name in seen_names:
                raise ValueError(f"Duplicate field name: {f.name!r}")
            if f.field_id in seen_ids:
                raise ValueError(f"Duplicate field_id: {f.field_id}")
            seen_names.add(f.name)
            seen_ids.add(f.field_id)
            name_index[f.name] = f
            id_index[f.field_id] = f

        self._fields = fields
        self._schema_id = schema_id
        self._name_to_field = name_index
        self._id_to_field = id_index
@classmethod
def of(cls, *args: NestedField | dict[str, DucklakeType]) -> Schema:
    @classmethod
    def of(cls, *args: NestedField | dict[str, DucklakeType]) -> Schema:
        """Create a Schema with auto-assigned field IDs.

        Accepts either :class:`NestedField` objects (from :func:`required` /
        :func:`optional`) or a single *dict* mapping column names to types.

        Examples:
            Using ``required()`` and ``optional()`` helpers:

            ```pycon
            >>> from pyducklake import Schema, required, optional, IntegerType, StringType, DoubleType
            >>> schema = Schema.of(
            ...     required("id", IntegerType()),
            ...     optional("name", StringType()),
            ...     optional("value", DoubleType()),
            ... )
            >>> schema.column_names()
            ['id', 'name', 'value']
            >>> schema.find_field("id").required
            True

            ```

            Using a dict (all fields optional by default):

            ```pycon
            >>> schema = Schema.of({"id": IntegerType(), "name": StringType()})
            >>> schema.column_names()
            ['id', 'name']

            ```

        Args:
            *args: Either NestedField objects or a single dict[str, DucklakeType].

        Returns:
            A new Schema with field IDs assigned sequentially starting from 1.

        Raises:
            TypeError: If args are mixed types or invalid.
            ValueError: If duplicate field names are provided.
        """
        if not args:
            raise TypeError("Schema.of() requires at least one argument")

        # Single-dict form
        if len(args) == 1 and isinstance(args[0], dict):
            mapping = args[0]
            fields: list[NestedField] = []
            for i, (name, ftype) in enumerate(mapping.items(), start=1):
                if not isinstance(ftype, DucklakeType):  # pyright: ignore[reportUnnecessaryIsInstance]
                    raise TypeError(
                        f"Dict values must be DucklakeType instances, got {type(ftype).__name__} for key {name!r}"
                    )
                fields.append(NestedField(field_id=i, name=name, field_type=ftype, required=False))
            return cls(*fields)

        # NestedField form — validate types first
        raw_fields: list[NestedField] = []
        for arg in args:
            if not isinstance(arg, NestedField):
                raise TypeError(
                    f"Schema.of() arguments must be all NestedField or a single dict, got {type(arg).__name__}"
                )
            raw_fields.append(arg)

        # 1. Collect all explicit (non-sentinel) field_ids
        explicit_ids = {f.field_id for f in raw_fields if f.field_id != _SENTINEL_FIELD_ID}
        if len(explicit_ids) != sum(1 for f in raw_fields if f.field_id != _SENTINEL_FIELD_ID):
            raise ValueError("Duplicate explicit field_ids")

        # 2. Auto-assign sentinel fields, skipping explicit IDs
        next_id = 1
        resolved: list[NestedField] = []
        for f in raw_fields:
            if f.field_id == _SENTINEL_FIELD_ID:
                while next_id in explicit_ids:
                    next_id += 1
                resolved.append(
                    NestedField(
                        field_id=next_id,
                        name=f.name,
                        field_type=f.field_type,
                        required=f.required,
                        doc=f.doc,
                    )
                )
                next_id += 1
            else:
                resolved.append(f)

        return cls(*resolved)

Create a Schema with auto-assigned field IDs.

Accepts either NestedField objects (from required() / optional()) or a single dict mapping column names to types.

Examples:

Using required() and optional() helpers:

>>> from pyducklake import Schema, required, optional, IntegerType, StringType, DoubleType
>>> schema = Schema.of(
...     required("id", IntegerType()),
...     optional("name", StringType()),
...     optional("value", DoubleType()),
... )
>>> schema.column_names()
['id', 'name', 'value']
>>> schema.find_field("id").required
True

Using a dict (all fields optional by default):

>>> schema = Schema.of({"id": IntegerType(), "name": StringType()})
>>> schema.column_names()
['id', 'name']

Args:
    *args: Either NestedField objects or a single dict[str, DucklakeType].

Returns:
    A new Schema. Fields without an explicit field_id are numbered sequentially from 1, skipping any explicit IDs.

Raises:
    TypeError: If no arguments are given, or if args are mixed types or invalid.
    ValueError: If duplicate field names or duplicate explicit field_ids are provided.
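The ID-assignment rule in Schema.of() is easier to follow in isolation. The sketch below re-implements it with the standard library only; Field, assign_field_ids, and the sentinel value -1 are hypothetical stand-ins (the real _SENTINEL_FIELD_ID value is not shown on this page), and the duplicate-name check that Schema.__init__ performs is left out.

```python
from __future__ import annotations

from dataclasses import dataclass, replace

# Hypothetical sentinel; the actual _SENTINEL_FIELD_ID value is an assumption.
SENTINEL = -1


@dataclass(frozen=True)
class Field:
    """Minimal stand-in for NestedField (only what ID assignment needs)."""

    name: str
    field_id: int = SENTINEL


def assign_field_ids(fields: list[Field]) -> list[Field]:
    """Give sentinel fields sequential IDs from 1, skipping explicit IDs."""
    explicit = [f.field_id for f in fields if f.field_id != SENTINEL]
    explicit_ids = set(explicit)
    if len(explicit_ids) != len(explicit):
        raise ValueError("Duplicate explicit field_ids")

    next_id = 1
    resolved: list[Field] = []
    for f in fields:
        if f.field_id == SENTINEL:
            # Step over any ID that a caller already claimed explicitly.
            while next_id in explicit_ids:
                next_id += 1
            resolved.append(replace(f, field_id=next_id))
            next_id += 1
        else:
            resolved.append(f)
    return resolved


ids = [f.field_id for f in assign_field_ids([Field("a"), Field("b", 1), Field("c")])]
print(ids)  # [2, 1, 3]
```

Because explicit IDs are collected before any auto-numbering starts, a field declared with field_id=1 keeps that ID even when it appears after an auto-numbered field.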

fields: tuple[NestedField, ...]
schema_id: int
def find_field(self, name_or_id: str | int, case_sensitive: bool = True) -> NestedField:

Find a field by name or field_id. Raises ValueError if not found.
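Lookup dispatches on the argument's type: integers go through the field_id index (case_sensitive has no effect), exact names go through the name index, and case-insensitive names fall back to a linear scan that returns the first match in schema order. A standalone sketch of that dispatch; MiniSchema and Field are hypothetical stand-ins, not pyducklake classes, and the error messages are simplified.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class Field:
    """Minimal stand-in for NestedField."""

    field_id: int
    name: str


class MiniSchema:
    """Hypothetical sketch of Schema's lookup indexes."""

    def __init__(self, *fields: Field) -> None:
        self._fields = fields
        self._by_name = {f.name: f for f in fields}
        self._by_id = {f.field_id: f for f in fields}

    def find_field(self, name_or_id: str | int, case_sensitive: bool = True) -> Field:
        if isinstance(name_or_id, int):
            # Integer keys always use the ID index.
            field = self._by_id.get(name_or_id)
        elif case_sensitive:
            field = self._by_name.get(name_or_id)
        else:
            # First field in schema order whose lowercased name matches.
            lower = name_or_id.lower()
            field = next((f for f in self._fields if f.name.lower() == lower), None)
        if field is None:
            raise ValueError(f"Field {name_or_id!r} not found")
        return field


s = MiniSchema(Field(1, "id"), Field(2, "Name"))
print(s.find_field(2).name)  # Name
print(s.find_field("name", case_sensitive=False).field_id)  # 2
```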

def find_type(self, name_or_id: str | int, case_sensitive: bool = True) -> DucklakeType:

Find a field's type by name or field_id.

def find_column_name(self, field_id: int) -> str | None:

Find a column name by field_id. Returns None if not found.

def column_names(self) -> list[str]:
def field_ids(self) -> set[int]:
highest_field_id: int

Highest field_id in the schema (useful for generating new IDs).
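The docstring's hint about generating new IDs amounts to taking one past the current maximum, with an empty schema reporting 0. A minimal sketch over plain integers; next_field_id is a hypothetical helper, and whether pyducklake's schema-evolution code allocates IDs exactly this way is an assumption this page does not confirm.

```python
from __future__ import annotations


def highest_field_id(field_ids: list[int]) -> int:
    # Same rule as the property: an empty schema reports 0.
    return max(field_ids, default=0)


def next_field_id(field_ids: list[int]) -> int:
    # Hypothetical helper: one past the maximum is guaranteed unused.
    return highest_field_id(field_ids) + 1


print(next_field_id([1, 2, 5]))  # 6
print(next_field_id([]))  # 1
```

Gaps left by dropped columns (3 and 4 above) are not reused under this rule, so an ID never gets reattached to a different column.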

def as_struct(self) -> StructType:

Convert to a StructType.

def as_arrow(self) -> pyarrow.lib.Schema:

Convert to a PyArrow Schema.

Each field carries PARQUET:field_id metadata matching Ducklake/Iceberg conventions for Parquet files.

def select(self, *names: str, case_sensitive: bool = True) -> Schema:

Return a new Schema with only the selected columns.

Field order is preserved from the original schema. Raises ValueError for unknown names.
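Because select() iterates the schema rather than the argument list, the order in which names are requested does not affect the result. A standalone sketch over a list of column names; select_columns is a hypothetical function that mirrors the validation-then-filter structure of Schema.select.

```python
from __future__ import annotations


def select_columns(columns: list[str], *names: str, case_sensitive: bool = True) -> list[str]:
    """Keep the requested columns in their original schema order."""

    def norm(name: str) -> str:
        # Case-insensitive mode compares lowercased names on both sides.
        return name if case_sensitive else name.lower()

    known = {norm(c) for c in columns}
    for n in names:
        # Validate every requested name before building the result.
        if norm(n) not in known:
            raise ValueError(f"Field {n!r} not found")
    wanted = {norm(n) for n in names}
    # Iterate the schema, not the request: request order and repeats are ignored.
    return [c for c in columns if norm(c) in wanted]


print(select_columns(["id", "name", "value"], "value", "id"))  # ['id', 'value']
print(select_columns(["id", "Name"], "NAME", case_sensitive=False))  # ['Name']
```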

def optional(name: str, field_type: DucklakeType, doc: str | None = None) -> NestedField:
def optional(name: str, field_type: DucklakeType, doc: str | None = None) -> NestedField:
    """Create an optional (nullable) field for use with :meth:`Schema.of`.

    The ``field_id`` is set to a sentinel value and will be auto-assigned
    by :meth:`Schema.of`.

    Args:
        name: Column name.
        field_type: Column type.
        doc: Optional documentation string.

    Example:

    ```pycon
    >>> from pyducklake import Schema, required, optional, IntegerType, StringType
    >>> schema = Schema.of(
    ...     required("id", IntegerType()),
    ...     optional("name", StringType()),
    ... )
    >>> schema.column_names()
    ['id', 'name']

    ```
    """
    return NestedField(field_id=_SENTINEL_FIELD_ID, name=name, field_type=field_type, required=False, doc=doc)

Create an optional (nullable) field for use with Schema.of().

The field_id is set to a sentinel value and will be auto-assigned by Schema.of().

Args:
    name: Column name.
    field_type: Column type.
    doc: Optional documentation string.

Example:

>>> from pyducklake import Schema, required, optional, IntegerType, StringType
>>> schema = Schema.of(
...     required("id", IntegerType()),
...     optional("name", StringType()),
... )
>>> schema.column_names()
['id', 'name']
def required(name: str, field_type: DucklakeType, doc: str | None = None) -> NestedField:
def required(name: str, field_type: DucklakeType, doc: str | None = None) -> NestedField:
    """Create a required field for use with :meth:`Schema.of`.

    The ``field_id`` is set to a sentinel value and will be auto-assigned
    by :meth:`Schema.of`.

    Args:
        name: Column name.
        field_type: Column type.
        doc: Optional documentation string.

    Example:

    ```pycon
    >>> from pyducklake import Schema, required, optional, IntegerType, StringType
    >>> schema = Schema.of(
    ...     required("id", IntegerType()),
    ...     optional("name", StringType()),
    ... )

    ```
    """
    return NestedField(field_id=_SENTINEL_FIELD_ID, name=name, field_type=field_type, required=True, doc=doc)

Create a required field for use with Schema.of().

The field_id is set to a sentinel value and will be auto-assigned by Schema.of().

Args:
    name: Column name.
    field_type: Column type.
    doc: Optional documentation string.

Example:

>>> from pyducklake import Schema, required, optional, IntegerType, StringType
>>> schema = Schema.of(
...     required("id", IntegerType()),
...     optional("name", StringType()),
... )
@dataclass(frozen=True)
class Snapshot:
@dataclass(frozen=True)
class Snapshot:
    """Represents a Ducklake snapshot (committed transaction)."""

    snapshot_id: int
    timestamp: datetime
    schema_version: int | None = None
    changes: str | None = None
    author: str | None = None
    commit_message: str | None = None

Represents a Ducklake snapshot (committed transaction).

Snapshot(snapshot_id: int, timestamp: datetime.datetime, schema_version: int | None = None, changes: str | None = None, author: str | None = None, commit_message: str | None = None)
snapshot_id: int
timestamp: datetime.datetime
schema_version: int | None = None
changes: str | None = None
author: str | None = None
commit_message: str | None = None
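Snapshot timestamps are what Table.rollback_to_timestamp uses to choose a target: it walks the snapshots in snapshot_id order and keeps the last one whose timestamp is at or before the requested time, borrowing tzinfo from whichever side has it when one datetime is naive. A standalone sketch of that selection; SnapshotInfo and snapshot_at are hypothetical, and unlike the method shown further down, this version normalizes each comparison independently instead of updating the target in place.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class SnapshotInfo:
    """Hypothetical stand-in with the two Snapshot fields this lookup needs."""

    snapshot_id: int
    timestamp: datetime


def snapshot_at(snapshots: list[SnapshotInfo], ts: datetime) -> SnapshotInfo | None:
    """Latest snapshot at or before ts; assumes snapshots are ordered by snapshot_id."""
    candidate = None
    for snap in snapshots:
        snap_ts, target = snap.timestamp, ts
        # Mixing naive and aware datetimes raises TypeError, so align them first.
        if target.tzinfo is not None and snap_ts.tzinfo is None:
            snap_ts = snap_ts.replace(tzinfo=target.tzinfo)
        elif target.tzinfo is None and snap_ts.tzinfo is not None:
            target = target.replace(tzinfo=snap_ts.tzinfo)
        if snap_ts <= target:
            candidate = snap
    return candidate


utc = timezone.utc
snaps = [SnapshotInfo(i, datetime(2024, 1, i, tzinfo=utc)) for i in (1, 2, 3)]
print(snapshot_at(snaps, datetime(2024, 1, 2, 12)).snapshot_id)  # 2
```

Where this returns None, rollback_to_timestamp raises ValueError instead.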
class Table:
 53class Table:
 54    """Represents a loaded Ducklake table."""
 55
 56    def __init__(
 57        self,
 58        identifier: tuple[str, str],
 59        schema: Schema,
 60        catalog: Catalog,
 61        *,
 62        sort_order: SortOrder | None = None,
 63    ) -> None:
 64        self._identifier = identifier
 65        self._schema = schema
 66        self._catalog = catalog
 67        self._sort_order_cache: SortOrder | None = sort_order
 68
 69    @property
 70    def name(self) -> str:
 71        """Table name (without namespace)."""
 72        return self._identifier[1]
 73
 74    @property
 75    def namespace(self) -> str:
 76        """Namespace (schema) name."""
 77        return self._identifier[0]
 78
 79    @property
 80    def identifier(self) -> tuple[str, str]:
 81        """(namespace, table_name) tuple."""
 82        return self._identifier
 83
 84    @property
 85    def schema(self) -> Schema:
 86        """Current table schema."""
 87        return self._schema
 88
 89    @property
 90    def catalog(self) -> Catalog:
 91        """The catalog this table belongs to."""
 92        return self._catalog
 93
 94    @property
 95    def fully_qualified_name(self) -> str:
 96        """catalog.namespace.table_name"""
 97        return self._catalog.fully_qualified_name(self._identifier[0], self._identifier[1])
 98
 99    def current_snapshot(self) -> Snapshot | None:
100        """Get current snapshot info. Returns None if table has no data."""
101        snapshots = self.snapshots()
102        if not snapshots:
103            return None
104        return snapshots[-1]
105
106    def snapshots(self) -> list[Snapshot]:
107        """List all catalog-level snapshots.
108
109        Ducklake snapshots are catalog-wide (not per-table). Each snapshot
110        represents a transaction that may have modified any table in the
111        catalog. The table may not have changed in every returned snapshot.
112        """
113        catalog_name = self._catalog.name
114        meta_schema = f"__ducklake_metadata_{catalog_name}"
115        try:
116            rows: list[tuple[Any, ...]] = self._catalog.fetchall(
117                f"SELECT snapshot_id, snapshot_time, schema_version "
118                f'FROM "{meta_schema}".ducklake_snapshot '
119                f"ORDER BY snapshot_id"
120            )
121        except (duckdb.CatalogException, duckdb.BinderException):
122            return []
123
124        snapshots: list[Snapshot] = []
125        for row in rows:
126            snapshot_id = int(row[0])
127            ts = row[1]
128            if isinstance(ts, datetime):
129                timestamp = ts
130            else:
131                timestamp = datetime.fromtimestamp(0, tz=timezone.utc)
132
133            schema_version = int(row[2]) if len(row) > 2 and row[2] is not None else None
134
135            snapshots.append(
136                Snapshot(
137                    snapshot_id=snapshot_id,
138                    timestamp=timestamp,
139                    schema_version=schema_version,
140                )
141            )
142        return snapshots
143
144    def refresh(self) -> Table:
145        """Reload schema and metadata from the catalog. Returns self."""
146        self._schema = self._catalog.build_schema_from_describe(self._identifier[0], self._identifier[1])
147        self._sort_order_cache = None
148        return self
149
150    # -- Rollback --------------------------------------------------------------
151
152    def rollback_to_snapshot(self, snapshot_id: int) -> None:
153        """Roll back the table to a previous snapshot.
154
155        This creates a new snapshot that matches the state at the given snapshot_id.
156        Data written after that snapshot becomes inaccessible (but files aren't
157        deleted until maintenance operations run).
158
159        Args:
160            snapshot_id: The snapshot ID to roll back to.
161
162        Raises:
163            ValueError: If snapshot_id doesn't exist.
164        """
165        known_ids = {s.snapshot_id for s in self.snapshots()}
166        if snapshot_id not in known_ids:
167            raise ValueError(f"Snapshot {snapshot_id} does not exist")
168
169        fqn = self.fully_qualified_name
170        conn = self._catalog.connection
171        conn.execute(
172            f"CREATE OR REPLACE TEMP TABLE _pyducklake_rollback AS SELECT * FROM {fqn} AT (VERSION => {snapshot_id})"
173        )
174        conn.execute(f"DELETE FROM {fqn}")
175        conn.execute(f"INSERT INTO {fqn} SELECT * FROM _pyducklake_rollback")
176        conn.execute("DROP TABLE _pyducklake_rollback")
177
178    def rollback_to_timestamp(self, timestamp: datetime) -> None:
179        """Roll back the table to the state at a given timestamp.
180
181        Finds the latest snapshot at or before the timestamp and rolls back to it.
182
183        Args:
184            timestamp: The point in time to roll back to.
185
186        Raises:
187            ValueError: If no snapshot exists at or before the timestamp.
188        """
189        snapshots = self.snapshots()
190        # Normalize timestamp once to avoid mutating across iterations
191        ts = timestamp
192        candidate: Snapshot | None = None
193        for snap in snapshots:
194            snap_ts = snap.timestamp
195            # Make both offset-aware or both naive for comparison
196            if ts.tzinfo is not None and snap_ts.tzinfo is None:
197                snap_ts = snap_ts.replace(tzinfo=ts.tzinfo)
198            elif ts.tzinfo is None and snap_ts.tzinfo is not None:
199                ts = ts.replace(tzinfo=snap_ts.tzinfo)
200            if snap_ts <= ts:
201                candidate = snap
202        if candidate is None:
203            raise ValueError(f"No snapshot exists at or before {timestamp}")
204        self.rollback_to_snapshot(candidate.snapshot_id)
205
206    # -- Scan ------------------------------------------------------------------
207
208    def scan(
209        self,
210        row_filter: BooleanExpression | str = AlwaysTrue(),
211        selected_fields: tuple[str, ...] = ("*",),
212        snapshot_id: int | None = None,
213        limit: int | None = None,
214    ) -> DataScan:
215        """Create a scan on this table.
216
217        Examples:
218
219        ```pycon
220        >>> import tempfile, os, pyarrow as pa
221        >>> from pyducklake import Catalog, Schema, required, optional, IntegerType, StringType
222        >>> tmp = tempfile.mkdtemp()
223        >>> cat = Catalog("t", os.path.join(tmp, "m.duckdb"), data_path=os.path.join(tmp, "d"))
224        >>> table = cat.create_table("users", Schema.of(
225        ...     required("id", IntegerType()), optional("name", StringType()),
226        ... ))
227        >>> table.append(pa.table({"id": [1, 2, 3], "name": ["alice", "bob", "carol"]}))
228        >>> table.scan().count()
229        3
230        >>> table.scan().select("name").to_arrow().column("name").to_pylist()
231        ['alice', 'bob', 'carol']
232        >>> table.scan('"id" > 1').count()
233        2
234
235        ```
236        """
237        from pyducklake.scan import DataScan, RawSQL
238
239        if isinstance(row_filter, str):
240            actual_filter: BooleanExpression = RawSQL(row_filter)
241        else:
242            actual_filter = row_filter
243
244        return DataScan(
245            table=self,
246            row_filter=actual_filter,
247            selected_fields=selected_fields,
248            snapshot_id=snapshot_id,
249            limit=limit,
250        )
251
252    # -- Add Files -------------------------------------------------------------
253
254    def add_files(
255        self,
256        file_paths: list[str] | str,
257        *,
258        allow_missing: bool = False,
259        ignore_extra_columns: bool = False,
260    ) -> None:
261        """Register external Parquet files with this table.
262
263        Args:
264            file_paths: Path(s) to Parquet files to register.
265            allow_missing: If True, columns present in the table but
266                missing from the file are filled with their initial
267                default value.
268            ignore_extra_columns: If True, extra columns in the file
269                that aren't in the table are silently ignored.
270
271        The files must be valid Parquet files with a compatible schema.
272        """
273        from pyducklake.catalog import escape_string_literal
274
275        if isinstance(file_paths, str):
276            file_paths = [file_paths]
277
278        catalog_name = self._catalog.name
279        namespace = self._identifier[0]
280        table_name = self._identifier[1]
281
282        opts = ""
283        if namespace != "main":
284            opts += f", schema := '{escape_string_literal(namespace)}'"
285        if allow_missing:
286            opts += ", allow_missing := true"
287        if ignore_extra_columns:
288            opts += ", ignore_extra_columns := true"
289
290        esc_cat = escape_string_literal(catalog_name)
291        esc_tbl = escape_string_literal(table_name)
292        for path in file_paths:
293            esc_path = escape_string_literal(path)
294            self._catalog.connection.execute(
295                f"CALL ducklake_add_data_files('{esc_cat}', '{esc_tbl}', '{esc_path}'{opts})"
296            )
297
298    # -- Write -----------------------------------------------------------------
299
300    @staticmethod
301    def _to_arrow_table(df: ArrowCompatible) -> pa.Table:
302        """Convert input to pyarrow.Table, supporting the Arrow PyCapsule interface.
303
304        Accepts ``pyarrow.Table`` directly, or any object implementing
305        ``__arrow_c_stream__`` (e.g. Polars DataFrame, nanoarrow array,
306        arro3 table).
307        """
308        if isinstance(df, pa.Table):
309            return df
310        if hasattr(df, "__arrow_c_stream__"):
311            return pa.RecordBatchReader.from_stream(df).read_all()
312        raise TypeError(f"Expected pyarrow.Table or object implementing __arrow_c_stream__, got {type(df).__name__}")
313
314    def _sort_order_clause(self) -> str | None:
315        """Return an ORDER BY clause if the table has a sort order, else None."""
316        sort = self.sort_order
317        if sort.is_unsorted:
318            return None
319        return ", ".join(f.to_sql() for f in sort.fields)
320
321    def append(self, df: ArrowCompatible) -> None:
322        """Append data to the table.
323
324        Accepts ``pyarrow.Table`` or any object implementing the Arrow
325        PyCapsule interface (``__arrow_c_stream__``).
326
327        If the table has a sort order, data is sorted before insertion.
328
329        Examples:
330
331        ```pycon
332        >>> import tempfile, os, pyarrow as pa
333        >>> from pyducklake import Catalog, Schema, required, IntegerType
334        >>> tmp = tempfile.mkdtemp()
335        >>> cat = Catalog("t", os.path.join(tmp, "m.duckdb"), data_path=os.path.join(tmp, "d"))
336        >>> table = cat.create_table("nums", Schema.of(required("n", IntegerType())))
337        >>> table.append(pa.table({"n": [1, 2, 3]}))
338        >>> table.append(pa.table({"n": [4, 5]}))
339        >>> table.scan().count()
340        5
341
342        ```
343        """
344        arrow_table = self._to_arrow_table(df)
345        order = self._sort_order_clause()
346        conn = self._catalog.connection
347        conn.register("_pyducklake_tmp_append", arrow_table)
348        try:
349            fqn = self.fully_qualified_name
350            if order:
351                conn.execute(f"INSERT INTO {fqn} SELECT * FROM _pyducklake_tmp_append ORDER BY {order}")
352            else:
353                conn.execute(f"INSERT INTO {fqn} SELECT * FROM _pyducklake_tmp_append")
354        finally:
355            conn.unregister("_pyducklake_tmp_append")
356
357    def append_batches(
358        self,
359        batches: pa.RecordBatchReader | Iterator[pa.RecordBatch],
360        *,
361        schema: pa.Schema | None = None,
362    ) -> None:
363        """Append data from a stream of record batches.
364
365        Memory-efficient alternative to :meth:`append` — processes batches
366        without materializing the full dataset in memory.
367
368        Args:
369            batches: RecordBatchReader or iterator of RecordBatch.
370            schema: Required if passing an iterator (not needed for RecordBatchReader).
371        """
372        if isinstance(batches, pa.RecordBatchReader):
373            reader = batches
374        else:
375            if schema is None:
376                raise ValueError("schema is required when passing an iterator of RecordBatch")
377            reader = pa.RecordBatchReader.from_batches(schema, batches)
378
379        order = self._sort_order_clause()
380        conn = self._catalog.connection
381        conn.register("_pyducklake_tmp_batches", reader)
382        try:
383            fqn = self.fully_qualified_name
384            if order:
385                conn.execute(f"INSERT INTO {fqn} SELECT * FROM _pyducklake_tmp_batches ORDER BY {order}")
386            else:
387                conn.execute(f"INSERT INTO {fqn} SELECT * FROM _pyducklake_tmp_batches")
388        finally:
389            conn.unregister("_pyducklake_tmp_batches")
390
391    def overwrite(
        self,
        df: ArrowCompatible,
        overwrite_filter: BooleanExpression | str = AlwaysTrue(),
    ) -> None:
        """Delete rows matching the filter, then insert the new data.

        Accepts ``pyarrow.Table`` or any object implementing the Arrow
        PyCapsule interface (``__arrow_c_stream__``).

        If overwrite_filter is AlwaysTrue or not provided, all rows are
        deleted before inserting. Otherwise, only matching rows are deleted.

        Does not manage its own transaction, so it can be used inside an
        explicit :class:`Transaction` without nesting conflicts. Under DuckDB
        auto-commit the DELETE and INSERT commit as separate statements; run
        overwrite inside a :class:`Transaction` if both must succeed or fail
        together.
        """
        arrow_table = self._to_arrow_table(df)
        order = self._sort_order_clause()
        conn = self._catalog.connection
        conn.register("_pyducklake_tmp_overwrite", arrow_table)
        try:
            if isinstance(overwrite_filter, str):
                where: str | None = overwrite_filter
            elif overwrite_filter == AlwaysTrue():
                where = None
            else:
                where = overwrite_filter.to_sql()

            fqn = self.fully_qualified_name
            if where:
                conn.execute(f"DELETE FROM {fqn} WHERE {where}")
            else:
                conn.execute(f"DELETE FROM {fqn}")

            if order:
                conn.execute(f"INSERT INTO {fqn} SELECT * FROM _pyducklake_tmp_overwrite ORDER BY {order}")
            else:
                conn.execute(f"INSERT INTO {fqn} SELECT * FROM _pyducklake_tmp_overwrite")
        finally:
            conn.unregister("_pyducklake_tmp_overwrite")

    # -- Delete ----------------------------------------------------------------

    def delete(self, delete_filter: BooleanExpression | str) -> None:
        """Delete rows matching the filter.

        Args:
            delete_filter: Rows matching this filter are deleted.
        """
        if isinstance(delete_filter, str):
            filter_sql = delete_filter
        elif delete_filter == AlwaysFalse():
            return
        elif delete_filter == AlwaysTrue():
            filter_sql = None
        else:
            filter_sql = delete_filter.to_sql()

        fqn = self.fully_qualified_name
        if filter_sql is None:
            self._catalog.connection.execute(f"DELETE FROM {fqn}")
        else:
            self._catalog.connection.execute(f"DELETE FROM {fqn} WHERE {filter_sql}")

    # -- Upsert ----------------------------------------------------------------

    def upsert(
        self,
        df: ArrowCompatible,
        join_cols: tuple[str, ...] | list[str],
    ) -> UpsertResult:
        """Upsert data: update existing rows matching on join_cols, insert new rows.

        Accepts ``pyarrow.Table`` or any object implementing the Arrow
        PyCapsule interface (``__arrow_c_stream__``).

        Uses DuckDB's MERGE statement.
        """
        arrow_table = self._to_arrow_table(df)
        conn = self._catalog.connection
        fqn = self.fully_qualified_name
        join_col_set = set(join_cols)

        # Count rows before to determine updated vs inserted
        count_before = self.scan().count()

        conn.register("_pyducklake_tmp_upsert", arrow_table)
        try:
            on_clause = " AND ".join(f'target."{col}" = source."{col}"' for col in join_cols)

            all_cols = [field.name for field in arrow_table.schema]  # pyright: ignore[reportUnknownVariableType]
            non_join_cols = [c for c in all_cols if c not in join_col_set]

            update_set = ", ".join(f'"{col}" = source."{col}"' for col in non_join_cols)

            insert_cols = ", ".join(f'source."{col}"' for col in all_cols)

            merge_sql = f"MERGE INTO {fqn} AS target USING _pyducklake_tmp_upsert AS source ON {on_clause}"
            if update_set:
                merge_sql += f" WHEN MATCHED THEN UPDATE SET {update_set}"
            merge_sql += f" WHEN NOT MATCHED THEN INSERT VALUES ({insert_cols})"

            conn.execute(merge_sql)
        finally:
            conn.unregister("_pyducklake_tmp_upsert")

        count_after = self.scan().count()
        rows_inserted = count_after - count_before
        rows_updated = arrow_table.num_rows - rows_inserted

        return UpsertResult(rows_updated=rows_updated, rows_inserted=rows_inserted)

    # -- Partitioning ----------------------------------------------------------

    @property
    def spec(self) -> PartitionSpec:
        """Current partition spec. Returns UNPARTITIONED if not partitioned."""
        from pyducklake.catalog import escape_string_literal
        from pyducklake.partitioning import (
            DAY,
            HOUR,
            IDENTITY,
            MONTH,
            UNPARTITIONED,
            YEAR,
            PartitionField,
            PartitionSpec,
        )

        catalog_name = self._catalog.name
        meta_schema = f"__ducklake_metadata_{catalog_name}"
        try:
            rows: list[tuple[Any, ...]] = self._catalog.fetchall(
                f"SELECT c.column_name, pc.transform "
                f'FROM "{meta_schema}".ducklake_partition_column pc '
                f'JOIN "{meta_schema}".ducklake_partition_info pi '
                f"ON pc.partition_id = pi.partition_id "
                f"AND pc.table_id = pi.table_id "
                f'JOIN "{meta_schema}".ducklake_table t '
                f"ON pi.table_id = t.table_id "
                f'JOIN "{meta_schema}".ducklake_schema s '
                f"ON t.schema_id = s.schema_id "
                f'JOIN "{meta_schema}".ducklake_column c '
                f"ON pc.column_id = c.column_id "
                f"AND pc.table_id = c.table_id "
                f"WHERE t.table_name = '{escape_string_literal(self._identifier[1])}' "
                f"AND s.schema_name = '{escape_string_literal(self._identifier[0])}' "
                f"AND pi.end_snapshot IS NULL "
                f"ORDER BY pc.partition_key_index"
            )
        except (duckdb.CatalogException, duckdb.BinderException):
            return UNPARTITIONED

        if not rows:
            return UNPARTITIONED

        _transform_map = {
            "identity": IDENTITY,
            "year": YEAR,
            "month": MONTH,
            "day": DAY,
            "hour": HOUR,
        }

        fields: list[PartitionField] = []
        for col_name, transform_name in rows:
            transform = _transform_map.get(str(transform_name).lower(), IDENTITY)
            fields.append(PartitionField(source_column=str(col_name), transform=transform))

        return PartitionSpec(*fields)

    def update_spec(self) -> UpdateSpec:
        """Begin partition spec evolution. Returns an UpdateSpec builder."""
        from pyducklake.partitioning import UpdateSpec

        return UpdateSpec(self)

    # -- Sorting ---------------------------------------------------------------

    @property
    def sort_order(self) -> SortOrder:
        """Current sort order. Returns UNSORTED if not sorted."""
        if self._sort_order_cache is not None:
            return self._sort_order_cache

        result = self._fetch_sort_order()
        self._sort_order_cache = result
        return result

    def _fetch_sort_order(self) -> SortOrder:
        """Query DuckLake metadata for the current sort order."""
        from pyducklake.catalog import escape_string_literal
        from pyducklake.sorting import (
            UNSORTED,
            NullOrder,
            SortDirection,
            SortField,
            SortOrder,
        )

        catalog_name = self._catalog.name
        meta_schema = f"__ducklake_metadata_{catalog_name}"
        try:
            rows: list[tuple[Any, ...]] = self._catalog.fetchall(
                f"SELECT se.expression, se.sort_direction, se.null_order "
                f'FROM "{meta_schema}".ducklake_sort_expression se '
                f'JOIN "{meta_schema}".ducklake_sort_info si '
                f"ON se.sort_id = si.sort_id "
                f"AND se.table_id = si.table_id "
                f'JOIN "{meta_schema}".ducklake_table t '
                f"ON si.table_id = t.table_id "
                f'JOIN "{meta_schema}".ducklake_schema s '
                f"ON t.schema_id = s.schema_id "
                f"WHERE t.table_name = '{escape_string_literal(self._identifier[1])}' "
                f"AND s.schema_name = '{escape_string_literal(self._identifier[0])}' "
                f"AND si.end_snapshot IS NULL "
                f"ORDER BY se.sort_key_index"
            )
        except (duckdb.CatalogException, duckdb.BinderException):
            return UNSORTED

        if not rows:
            return UNSORTED

        fields: list[SortField] = []
        for expr, sort_dir, null_ord in rows:
            # expression is a quoted identifier like '"name"' — strip quotes
            col_name = str(expr).strip('"')
            direction = SortDirection.DESC if str(sort_dir).upper() == "DESC" else SortDirection.ASC
            null_order = NullOrder.NULLS_FIRST if "FIRST" in str(null_ord).upper() else NullOrder.NULLS_LAST
            fields.append(
                SortField(
                    source_column=col_name,
                    direction=direction,
                    null_order=null_order,
                )
            )

        return SortOrder(fields=tuple(fields))

    def update_sort_order(self) -> UpdateSortOrder:
        """Begin sort order evolution. Returns an UpdateSortOrder builder."""
        from pyducklake.sorting import UpdateSortOrder

        return UpdateSortOrder(self)

    # -- Arrow Dataset ---------------------------------------------------------

    def to_arrow_dataset(self, *, snapshot_id: int | None = None) -> ds.Dataset:
        """Return this table as a PyArrow Dataset.

        Enables interop with engines that consume the PyArrow dataset API
        (DuckDB, Polars, DataFusion, Dask, etc.). The scan result is fully
        materialized in memory as a ``pyarrow.Table`` before being wrapped,
        so the dataset is not a lazy view over the table's data files.

        Args:
            snapshot_id: Optional snapshot for time travel.

        Returns:
            A pyarrow.dataset.Dataset wrapping this table's data.
        """
        import pyarrow.dataset as ds

        scan = self.scan(snapshot_id=snapshot_id)
        return ds.dataset(scan.to_arrow())  # pyright: ignore[reportUnknownMemberType]

    # -- Inspect ---------------------------------------------------------------

    def inspect(self) -> InspectTable:
        """Return an :class:`InspectTable` for metadata introspection."""
        from pyducklake.inspect import InspectTable

        return InspectTable(self)

    # -- Maintenance -----------------------------------------------------------

    def maintenance(self) -> MaintenanceTable:
        """Get maintenance operations for this table."""
        from pyducklake.maintenance import MaintenanceTable

        return MaintenanceTable(self)

    # -- CDC (Change Data Capture) ---------------------------------------------

    def table_changes(
        self,
        start_snapshot: int | None = None,
        end_snapshot: int | None = None,
        *,
        start_time: datetime | None = None,
        end_time: datetime | None = None,
        columns: tuple[str, ...] | list[str] | None = None,
        filter_expr: str | None = None,
    ) -> ChangeSet:
        """Query all changes between two snapshots or timestamps.

        Returns a ChangeSet with inserts, deletes, and update pre/post images.
        """
        from pyducklake.cdc import ChangeSet

        return ChangeSet(
            self._cdc_query(
                "ducklake_table_changes",
                start_snapshot,
                end_snapshot,
                start_time=start_time,
                end_time=end_time,
                columns=columns,
                filter_expr=filter_expr,
                meta_cols=("snapshot_id", "rowid", "change_type"),
            ),
            change_type_col="change_type",
        )

    def table_insertions(
        self,
        start_snapshot: int | None = None,
        end_snapshot: int | None = None,
        *,
        start_time: datetime | None = None,
        end_time: datetime | None = None,
        columns: tuple[str, ...] | list[str] | None = None,
        filter_expr: str | None = None,
    ) -> ChangeSet:
        """Query inserted rows between two snapshots or timestamps."""
        from pyducklake.cdc import ChangeSet

        return ChangeSet(
            self._cdc_query(
                "ducklake_table_insertions",
                start_snapshot,
                end_snapshot,
                start_time=start_time,
                end_time=end_time,
                columns=columns,
                filter_expr=filter_expr,
                meta_cols=(),
            ),
            change_type_col=None,
        )

    def table_deletions(
        self,
        start_snapshot: int | None = None,
        end_snapshot: int | None = None,
        *,
        start_time: datetime | None = None,
        end_time: datetime | None = None,
        columns: tuple[str, ...] | list[str] | None = None,
        filter_expr: str | None = None,
    ) -> ChangeSet:
        """Query deleted rows between two snapshots or timestamps."""
        from pyducklake.cdc import ChangeSet

        return ChangeSet(
            self._cdc_query(
                "ducklake_table_deletions",
                start_snapshot,
                end_snapshot,
                start_time=start_time,
                end_time=end_time,
                columns=columns,
                filter_expr=filter_expr,
                meta_cols=(),
            ),
            change_type_col=None,
        )

    @staticmethod
    def _validate_cdc_bounds(
        start_snapshot: int | None,
        end_snapshot: int | None,
        start_time: datetime | None,
        end_time: datetime | None,
    ) -> None:
        """Validate CDC bound arguments."""
        has_snapshot = start_snapshot is not None or end_snapshot is not None
        has_time = start_time is not None or end_time is not None

        if has_snapshot and has_time:
            raise ValueError(
                "Cannot mix snapshot and timestamp bounds. "
                "Use either (start_snapshot, end_snapshot) or (start_time, end_time)."
            )

        if not has_snapshot and not has_time:
            raise ValueError("Must provide either (start_snapshot, end_snapshot) or (start_time, end_time).")

        if has_snapshot and start_snapshot is None:
            raise ValueError("start_snapshot is required when using snapshot bounds.")

        if has_time and start_time is None:
            raise ValueError("start_time is required when using timestamp bounds.")

    def _cdc_query(
        self,
        func_name: str,
        start_snapshot: int | None,
        end_snapshot: int | None,
        *,
        start_time: datetime | None = None,
        end_time: datetime | None = None,
        columns: tuple[str, ...] | list[str] | None = None,
        filter_expr: str | None = None,
        meta_cols: tuple[str, ...] = (),
    ) -> pa.Table:
        """Execute a CDC function and return the result as an Arrow table."""
        self._validate_cdc_bounds(start_snapshot, end_snapshot, start_time, end_time)

        catalog_name = self._catalog.name
        namespace = self._identifier[0]
        table_name = self._identifier[1]

        # Build bound arguments
        if start_time is not None:
            start_arg = f"'{start_time.strftime('%Y-%m-%d %H:%M:%S.%f')}'::TIMESTAMP"
            if end_time is not None:
                end_arg = f"'{end_time.strftime('%Y-%m-%d %H:%M:%S.%f')}'::TIMESTAMP"
            else:
                end_arg = "CURRENT_TIMESTAMP"
        else:
            assert start_snapshot is not None  # validated above
            start_arg = f"{start_snapshot}::BIGINT"
            if end_snapshot is not None:
                end_arg = f"{end_snapshot}::BIGINT"
            else:
                snap = self.current_snapshot()
                resolved_end = snap.snapshot_id if snap is not None else start_snapshot
                end_arg = f"{resolved_end}::BIGINT"

        # Build column list
        if columns is not None:
            col_list = ", ".join([f'"{c}"' for c in meta_cols] + [f'"{c}"' for c in columns])
        else:
            col_list = "*"

        from pyducklake.catalog import escape_string_literal

        esc_cat = escape_string_literal(catalog_name)
        esc_ns = escape_string_literal(namespace)
        esc_tbl = escape_string_literal(table_name)
        sql = f"SELECT {col_list} FROM {func_name}('{esc_cat}', '{esc_ns}', '{esc_tbl}', {start_arg}, {end_arg})"

        if filter_expr is not None:
            sql += f" WHERE {filter_expr}"

        result: Any = self._catalog.connection.execute(sql)
        arrow_obj: Any = result.arrow()
        if isinstance(arrow_obj, pa.Table):
            return arrow_obj
        tbl: pa.Table = arrow_obj.read_all()
        return tbl

    # -- Schema Evolution ------------------------------------------------------

    def update_schema(self) -> UpdateSchema:
        """Begin schema evolution. Returns an UpdateSchema builder."""
        from pyducklake.schema_evolution import UpdateSchema

        return UpdateSchema(self)

    def __repr__(self) -> str:
        return f"Table(identifier={self._identifier!r}, schema={self._schema!r})"

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Table):
            return NotImplemented
        return self._identifier == other._identifier and self._catalog.name == other._catalog.name
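Table.upsert builds one MERGE statement from the incoming table's column names, then derives its UpsertResult from the row count before and after the merge. The following minimal sketch reproduces just that string-building and arithmetic in plain Python, so the generated SQL can be inspected without a catalog. build_merge_sql and split_upsert_counts are hypothetical helpers, not pyducklake API, and lake.main.users is a placeholder for the table's fully qualified name:

```python
from __future__ import annotations


def build_merge_sql(fqn: str, source: str, all_cols: list[str], join_cols: list[str]) -> str:
    """Rebuild the MERGE statement that Table.upsert issues (hypothetical helper)."""
    join_set = set(join_cols)
    # Match target and source rows on every join column.
    on_clause = " AND ".join(f'target."{c}" = source."{c}"' for c in join_cols)
    # Only non-key columns are overwritten on a match.
    non_join = [c for c in all_cols if c not in join_set]
    update_set = ", ".join(f'"{c}" = source."{c}"' for c in non_join)
    # Unmatched rows are inserted positionally, in source column order.
    insert_vals = ", ".join(f'source."{c}"' for c in all_cols)

    sql = f"MERGE INTO {fqn} AS target USING {source} AS source ON {on_clause}"
    if update_set:  # omitted when every column is a join column
        sql += f" WHEN MATCHED THEN UPDATE SET {update_set}"
    sql += f" WHEN NOT MATCHED THEN INSERT VALUES ({insert_vals})"
    return sql


def split_upsert_counts(count_before: int, count_after: int, source_rows: int) -> tuple[int, int]:
    """Return (rows_updated, rows_inserted) the way Table.upsert derives them."""
    rows_inserted = count_after - count_before
    rows_updated = source_rows - rows_inserted
    return rows_updated, rows_inserted


sql = build_merge_sql("lake.main.users", "_pyducklake_tmp_upsert", ["id", "name"], ["id"])
print(sql)
# 3 existing rows, 2 incoming rows, 4 rows afterwards -> 1 updated, 1 inserted
print(split_upsert_counts(3, 4, 2))
```

rows_updated is computed as the number of source rows minus the net change in row count. This assumes each source row either updates one existing row or inserts one new row, and that no other writer changes the table between the two counts. A matched row is counted as updated even if its values did not change.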

Represents a loaded Ducklake table.

Table(identifier: tuple[str, str], schema: Schema, catalog: Catalog, *, sort_order: SortOrder | None = None)
    def __init__(
        self,
        identifier: tuple[str, str],
        schema: Schema,
        catalog: Catalog,
        *,
        sort_order: SortOrder | None = None,
    ) -> None:
        self._identifier = identifier
        self._schema = schema
        self._catalog = catalog
        self._sort_order_cache: SortOrder | None = sort_order
name: str
    @property
    def name(self) -> str:
        """Table name (without namespace)."""
        return self._identifier[1]

Table name (without namespace).

namespace: str
    @property
    def namespace(self) -> str:
        """Namespace (schema) name."""
        return self._identifier[0]

Namespace (schema) name.

identifier: tuple[str, str]
    @property
    def identifier(self) -> tuple[str, str]:
        """(namespace, table_name) tuple."""
        return self._identifier

(namespace, table_name) tuple.

schema: Schema
    @property
    def schema(self) -> Schema:
        """Current table schema."""
        return self._schema

Current table schema.

catalog: Catalog
    @property
    def catalog(self) -> Catalog:
        """The catalog this table belongs to."""
        return self._catalog

The catalog this table belongs to.

fully_qualified_name: str
    @property
    def fully_qualified_name(self) -> str:
        """catalog.namespace.table_name"""
        return self._catalog.fully_qualified_name(self._identifier[0], self._identifier[1])

catalog.namespace.table_name

def current_snapshot(self) -> Snapshot | None:
    def current_snapshot(self) -> Snapshot | None:
        """Get the most recent catalog-level snapshot.

        Returns None if no snapshots are found. Ducklake snapshots are
        catalog-wide, so the returned snapshot may not have modified
        this table.
        """
        snapshots = self.snapshots()
        if not snapshots:
            return None
        return snapshots[-1]

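Get the most recent catalog-level snapshot.

Returns None if no snapshots are found. Ducklake snapshots are catalog-wide, so the returned snapshot may not have modified this table.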
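_cdc_query, which backs table_changes, table_insertions and table_deletions, also relies on current_snapshot(). When end_snapshot is omitted, it resolves the end bound to the current snapshot, or to start_snapshot itself if no snapshot is found. Timestamp ranges instead default the end to CURRENT_TIMESTAMP. The following sketch restates _validate_cdc_bounds and that bound-building step as one standalone function, so the accepted argument combinations and the SQL literals they produce can be checked directly. cdc_bound_args and its current_snapshot_id parameter are hypothetical, and the error messages are shortened:

```python
from __future__ import annotations

from datetime import datetime


def cdc_bound_args(
    start_snapshot: int | None = None,
    end_snapshot: int | None = None,
    *,
    start_time: datetime | None = None,
    end_time: datetime | None = None,
    current_snapshot_id: int | None = None,
) -> tuple[str, str]:
    """Validate CDC bounds and render them as SQL literals (hypothetical helper)."""
    has_snapshot = start_snapshot is not None or end_snapshot is not None
    has_time = start_time is not None or end_time is not None
    if has_snapshot and has_time:
        raise ValueError("Cannot mix snapshot and timestamp bounds.")
    if not has_snapshot and not has_time:
        raise ValueError("Must provide either snapshot or timestamp bounds.")
    if has_snapshot and start_snapshot is None:
        raise ValueError("start_snapshot is required when using snapshot bounds.")
    if has_time and start_time is None:
        raise ValueError("start_time is required when using timestamp bounds.")

    fmt = "%Y-%m-%d %H:%M:%S.%f"
    if start_time is not None:
        start_arg = f"'{start_time.strftime(fmt)}'::TIMESTAMP"
        end_arg = f"'{end_time.strftime(fmt)}'::TIMESTAMP" if end_time is not None else "CURRENT_TIMESTAMP"
    else:
        start_arg = f"{start_snapshot}::BIGINT"
        if end_snapshot is None:
            # Open-ended range: use the current snapshot, else start_snapshot itself.
            end_snapshot = current_snapshot_id if current_snapshot_id is not None else start_snapshot
        end_arg = f"{end_snapshot}::BIGINT"
    return start_arg, end_arg


print(cdc_bound_args(2, current_snapshot_id=7))
print(cdc_bound_args(start_time=datetime(2024, 5, 1, 12, 0)))
```

Because snapshots are catalog-wide, an open-ended snapshot range ends at the latest snapshot of the whole catalog, not the last one that touched this table.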

def snapshots(self) -> list[Snapshot]:
    def snapshots(self) -> list[Snapshot]:
        """List all catalog-level snapshots.

        Ducklake snapshots are catalog-wide (not per-table). Each snapshot
        represents a transaction that may have modified any table in the
        catalog. The table may not have changed in every returned snapshot.
        """
        catalog_name = self._catalog.name
        meta_schema = f"__ducklake_metadata_{catalog_name}"
        try:
            rows: list[tuple[Any, ...]] = self._catalog.fetchall(
                f"SELECT snapshot_id, snapshot_time, schema_version "
                f'FROM "{meta_schema}".ducklake_snapshot '
                f"ORDER BY snapshot_id"
            )
        except (duckdb.CatalogException, duckdb.BinderException):
            return []

        snapshots: list[Snapshot] = []
        for row in rows:
            snapshot_id = int(row[0])
            ts = row[1]
            if isinstance(ts, datetime):
                timestamp = ts
            else:
                timestamp = datetime.fromtimestamp(0, tz=timezone.utc)

            schema_version = int(row[2]) if len(row) > 2 and row[2] is not None else None

            snapshots.append(
                Snapshot(
                    snapshot_id=snapshot_id,
                    timestamp=timestamp,
                    schema_version=schema_version,
                )
            )
        return snapshots

List all catalog-level snapshots.

Ducklake snapshots are catalog-wide (not per-table). Each snapshot represents a transaction that may have modified any table in the catalog. The table may not have changed in every returned snapshot.
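snapshots() converts each row of the catalog's ducklake_snapshot metadata table into a Snapshot. It substitutes the Unix epoch (UTC) when snapshot_time is not a datetime and None when schema_version is missing, and current_snapshot() then takes the last entry. The following sketch replays that conversion on hand-written rows. The Snapshot dataclass here is a stand-in with the field names used in the source, not the pyducklake class itself, and rows_to_snapshots is a hypothetical helper:

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any


@dataclass(frozen=True)
class Snapshot:
    """Stand-in with the same fields Table.snapshots() populates."""

    snapshot_id: int
    timestamp: datetime
    schema_version: int | None


def rows_to_snapshots(rows: list[tuple[Any, ...]]) -> list[Snapshot]:
    snapshots = []
    for row in rows:
        ts = row[1]
        # Non-datetime timestamps fall back to the Unix epoch in UTC.
        timestamp = ts if isinstance(ts, datetime) else datetime.fromtimestamp(0, tz=timezone.utc)
        # schema_version may be absent or NULL.
        schema_version = int(row[2]) if len(row) > 2 and row[2] is not None else None
        snapshots.append(Snapshot(int(row[0]), timestamp, schema_version))
    return snapshots


# Rows as (snapshot_id, snapshot_time, schema_version), already ordered by snapshot_id.
rows = [(0, datetime(2024, 1, 1, 9, 0), 0), (1, None, None), (2, datetime(2024, 1, 2, 9, 0), 1)]
snaps = rows_to_snapshots(rows)
current = snaps[-1] if snaps else None  # what current_snapshot() returns
print(current)
```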

def refresh(self) -> Table:
    def refresh(self) -> Table:
        """Reload schema and metadata from the catalog. Returns self."""
        self._schema = self._catalog.build_schema_from_describe(self._identifier[0], self._identifier[1])
        self._sort_order_cache = None
        return self

Reload schema and metadata from the catalog. Returns self.

def rollback_to_snapshot(self, snapshot_id: int) -> None:
    def rollback_to_snapshot(self, snapshot_id: int) -> None:
        """Roll back the table to a previous snapshot.

        This creates a new snapshot that matches the state at the given snapshot_id.
        Rows written after that snapshot no longer appear in the current table
        state (their data files aren't deleted until maintenance operations run).

        Args:
            snapshot_id: The snapshot ID to roll back to.

        Raises:
            ValueError: If snapshot_id doesn't exist.
        """
        known_ids = {s.snapshot_id for s in self.snapshots()}
        if snapshot_id not in known_ids:
            raise ValueError(f"Snapshot {snapshot_id} does not exist")

        fqn = self.fully_qualified_name
        conn = self._catalog.connection
        conn.execute(
            f"CREATE OR REPLACE TEMP TABLE _pyducklake_rollback AS SELECT * FROM {fqn} AT (VERSION => {snapshot_id})"
        )
        conn.execute(f"DELETE FROM {fqn}")
        conn.execute(f"INSERT INTO {fqn} SELECT * FROM _pyducklake_rollback")
        conn.execute("DROP TABLE _pyducklake_rollback")

Roll back the table to a previous snapshot.

This creates a new snapshot that matches the state at the given snapshot_id. Rows written after that snapshot no longer appear in the current table state (their data files aren't deleted until maintenance operations run).

Args:
    snapshot_id: The snapshot ID to roll back to.

Raises:
    ValueError: If snapshot_id doesn't exist.
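rollback_to_snapshot does not discard later snapshots. Instead, it stages the old version (read with DuckDB's AT (VERSION => ...) clause) in a temporary table, deletes the current rows, and re-inserts the staged ones, so the rollback is recorded as new writes. The following minimal sketch shows the same stage, delete and re-insert pattern, using Python's built-in sqlite3 module as a stand-in. SQLite has no time travel, so a frozen copy table (users_v1, hypothetical) plays the role of the old snapshot, and restore_from is an illustrative helper. Unlike the method above, the sketch drops the staging table in a finally block so that a failed insert does not leave it behind:

```python
import sqlite3


def restore_from(conn: sqlite3.Connection, table: str, old_state: str) -> None:
    """Stage old_state, empty table, then re-insert the staged rows."""
    conn.execute(f"CREATE TEMP TABLE _rollback AS SELECT * FROM {old_state}")
    try:
        conn.execute(f"DELETE FROM {table}")
        conn.execute(f"INSERT INTO {table} SELECT * FROM _rollback")
    finally:
        # Always remove the staging table, even if the insert fails.
        conn.execute("DROP TABLE _rollback")


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT)")
conn.executemany("INSERT INTO users VALUES (?, ?)", [(1, "alice"), (2, "bob")])
# Stand-in for "SELECT * FROM users AT (VERSION => n)": a frozen copy of the old state.
conn.execute("CREATE TABLE users_v1 AS SELECT * FROM users")
conn.execute("INSERT INTO users VALUES (3, 'carol')")  # a later write to undo

with conn:  # commit on success, roll back on error
    restore_from(conn, "users", "users_v1")

rows = conn.execute("SELECT id, name FROM users ORDER BY id").fetchall()
print(rows)
```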

def rollback_to_timestamp(self, timestamp: datetime.datetime) -> None:
    def rollback_to_timestamp(self, timestamp: datetime) -> None:
        """Roll back the table to the state at a given timestamp.

        Finds the latest snapshot at or before the timestamp and rolls back to it.

        Args:
            timestamp: The point in time to roll back to.

        Raises:
            ValueError: If no snapshot exists at or before the timestamp.
        """
        snapshots = self.snapshots()
        # Normalize timestamp once to avoid mutating across iterations
        ts = timestamp
        candidate: Snapshot | None = None
        for snap in snapshots:
            snap_ts = snap.timestamp
            # Make both offset-aware or both naive for comparison
            if ts.tzinfo is not None and snap_ts.tzinfo is None:
                snap_ts = snap_ts.replace(tzinfo=ts.tzinfo)
            elif ts.tzinfo is None and snap_ts.tzinfo is not None:
                ts = ts.replace(tzinfo=snap_ts.tzinfo)
            if snap_ts <= ts:
                candidate = snap
        if candidate is None:
            raise ValueError(f"No snapshot exists at or before {timestamp}")
        self.rollback_to_snapshot(candidate.snapshot_id)

Roll back the table to the state at a given timestamp.

Finds the latest snapshot at or before the timestamp and rolls back to it.

Args:
    timestamp: The point in time to roll back to.

Raises:
    ValueError: If no snapshot exists at or before the timestamp.
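The snapshot search in rollback_to_timestamp can be exercised without a catalog. The following minimal sketch assumes snapshots are given as (snapshot_id, timestamp) pairs ordered by snapshot id, as Table.snapshots() returns them. latest_at_or_before is a hypothetical helper that mirrors the loop above and returns the chosen id instead of rolling back:

```python
from __future__ import annotations

from datetime import datetime, timezone


def latest_at_or_before(snapshots: list[tuple[int, datetime]], when: datetime) -> int:
    """Return the snapshot id rollback_to_timestamp would pick (hypothetical helper)."""
    ts = when
    candidate: int | None = None
    for snap_id, snap_ts in snapshots:
        # Align naive/aware values by attaching the other side's tzinfo (no conversion).
        if ts.tzinfo is not None and snap_ts.tzinfo is None:
            snap_ts = snap_ts.replace(tzinfo=ts.tzinfo)
        elif ts.tzinfo is None and snap_ts.tzinfo is not None:
            ts = ts.replace(tzinfo=snap_ts.tzinfo)
        if snap_ts <= ts:
            candidate = snap_id  # later matches replace earlier ones
    if candidate is None:
        raise ValueError(f"No snapshot exists at or before {when}")
    return candidate


utc = timezone.utc
history = [
    (1, datetime(2024, 1, 1, tzinfo=utc)),
    (2, datetime(2024, 1, 2, tzinfo=utc)),
    (3, datetime(2024, 1, 3, tzinfo=utc)),
]
print(latest_at_or_before(history, datetime(2024, 1, 2, 12, 0)))  # naive, read as UTC here
```

The alignment uses replace(tzinfo=...), so values are relabelled rather than converted: a naive argument is read as wall-clock time in the snapshots' zone. The result is the last qualifying snapshot in id order. That is the latest one only if snapshot timestamps increase with snapshot id.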

def scan(self, row_filter: BooleanExpression | str = AlwaysTrue(), selected_fields: tuple[str, ...] = ('*',), snapshot_id: int | None = None, limit: int | None = None) -> DataScan:
    def scan(
        self,
        row_filter: BooleanExpression | str = AlwaysTrue(),
        selected_fields: tuple[str, ...] = ("*",),
        snapshot_id: int | None = None,
        limit: int | None = None,
    ) -> DataScan:
        """Create a scan on this table.

        Examples:

        ```pycon
        >>> import tempfile, os, pyarrow as pa
        >>> from pyducklake import Catalog, Schema, required, optional, IntegerType, StringType
        >>> tmp = tempfile.mkdtemp()
        >>> cat = Catalog("t", os.path.join(tmp, "m.duckdb"), data_path=os.path.join(tmp, "d"))
        >>> table = cat.create_table("users", Schema.of(
        ...     required("id", IntegerType()), optional("name", StringType()),
        ... ))
        >>> table.append(pa.table({"id": [1, 2, 3], "name": ["alice", "bob", "carol"]}))
        >>> table.scan().count()
        3
        >>> table.scan().select("name").to_arrow().column("name").to_pylist()
        ['alice', 'bob', 'carol']
        >>> table.scan('"id" > 1').count()
        2

        ```
        """
        from pyducklake.scan import DataScan, RawSQL

        if isinstance(row_filter, str):
            actual_filter: BooleanExpression = RawSQL(row_filter)
        else:
            actual_filter = row_filter

        return DataScan(
            table=self,
            row_filter=actual_filter,
            selected_fields=selected_fields,
            snapshot_id=snapshot_id,
            limit=limit,
        )

Create a scan on this table.

Examples:

>>> import tempfile, os, pyarrow as pa
>>> from pyducklake import Catalog, Schema, required, optional, IntegerType, StringType
>>> tmp = tempfile.mkdtemp()
>>> cat = Catalog("t", os.path.join(tmp, "m.duckdb"), data_path=os.path.join(tmp, "d"))
>>> table = cat.create_table("users", Schema.of(
...     required("id", IntegerType()), optional("name", StringType()),
... ))
>>> table.append(pa.table({"id": [1, 2, 3], "name": ["alice", "bob", "carol"]}))
>>> table.scan().count()
3
>>> table.scan().select("name").to_arrow().column("name").to_pylist()
['alice', 'bob', 'carol']
>>> table.scan('"id" > 1').count()
2

def add_files(self, file_paths: list[str] | str, *, allow_missing: bool = False, ignore_extra_columns: bool = False) -> None:
    def add_files(
        self,
        file_paths: list[str] | str,
        *,
        allow_missing: bool = False,
        ignore_extra_columns: bool = False,
    ) -> None:
        """Register external Parquet files with this table.

        Args:
            file_paths: Path(s) to Parquet files to register.
            allow_missing: If True, columns present in the table but
                missing from the file are filled with their initial
                default value.
            ignore_extra_columns: If True, extra columns in the file
                that aren't in the table are silently ignored.

        The files must be valid Parquet files with a compatible schema.
        """
        from pyducklake.catalog import escape_string_literal

        if isinstance(file_paths, str):
            file_paths = [file_paths]

        catalog_name = self._catalog.name
        namespace = self._identifier[0]
        table_name = self._identifier[1]

        opts = ""
        if namespace != "main":
            opts += f", schema := '{escape_string_literal(namespace)}'"
        if allow_missing:
            opts += ", allow_missing := true"
        if ignore_extra_columns:
            opts += ", ignore_extra_columns := true"

        esc_cat = escape_string_literal(catalog_name)
        esc_tbl = escape_string_literal(table_name)
        for path in file_paths:
            esc_path = escape_string_literal(path)
            self._catalog.connection.execute(
                f"CALL ducklake_add_data_files('{esc_cat}', '{esc_tbl}', '{esc_path}'{opts})"
            )

Register external Parquet files with this table.

Args:
    file_paths: Path(s) to Parquet files to register.
    allow_missing: If True, columns present in the table but missing from the file are filled with their initial default value.
    ignore_extra_columns: If True, extra columns in the file that aren't in the table are silently ignored.

The files must be valid Parquet files with a compatible schema.
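add_files issues one CALL ducklake_add_data_files(...) per path and adds the schema option only for namespaces other than main. The following minimal sketch builds those statements without executing them. add_files_statements is hypothetical, and the escape_string_literal defined here assumes standard SQL escaping (doubling single quotes), which may differ from pyducklake.catalog.escape_string_literal:

```python
from __future__ import annotations


def escape_string_literal(value: str) -> str:
    # Assumption: standard SQL escaping by doubling single quotes.
    return value.replace("'", "''")


def add_files_statements(
    catalog: str,
    namespace: str,
    table: str,
    file_paths: list[str] | str,
    *,
    allow_missing: bool = False,
    ignore_extra_columns: bool = False,
) -> list[str]:
    """Build the CALL statements Table.add_files executes, one per file (hypothetical helper)."""
    if isinstance(file_paths, str):
        file_paths = [file_paths]
    opts = ""
    if namespace != "main":  # the default schema is left implicit
        opts += f", schema := '{escape_string_literal(namespace)}'"
    if allow_missing:
        opts += ", allow_missing := true"
    if ignore_extra_columns:
        opts += ", ignore_extra_columns := true"
    cat = escape_string_literal(catalog)
    tbl = escape_string_literal(table)
    return [
        f"CALL ducklake_add_data_files('{cat}', '{tbl}', '{escape_string_literal(p)}'{opts})"
        for p in file_paths
    ]


for stmt in add_files_statements("lake", "sales", "orders", ["2024/o'neil.parquet"], allow_missing=True):
    print(stmt)
```

Each file is registered by its own statement. If a call fails partway through, the earlier files therefore stay registered unless the call runs inside a transaction.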

def append(self, df: pyarrow.lib.Table | ArrowStreamExportable) -> None:
    def append(self, df: ArrowCompatible) -> None:
        """Append data to the table.

        Accepts ``pyarrow.Table`` or any object implementing the Arrow
        PyCapsule interface (``__arrow_c_stream__``).

        If the table has a sort order, data is sorted before insertion.

        Examples:

        ```pycon
        >>> import tempfile, os, pyarrow as pa
        >>> from pyducklake import Catalog, Schema, required, IntegerType
        >>> tmp = tempfile.mkdtemp()
        >>> cat = Catalog("t", os.path.join(tmp, "m.duckdb"), data_path=os.path.join(tmp, "d"))
        >>> table = cat.create_table("nums", Schema.of(required("n", IntegerType())))
        >>> table.append(pa.table({"n": [1, 2, 3]}))
        >>> table.append(pa.table({"n": [4, 5]}))
        >>> table.scan().count()
        5

        ```
        """
        arrow_table = self._to_arrow_table(df)
        order = self._sort_order_clause()
        conn = self._catalog.connection
        conn.register("_pyducklake_tmp_append", arrow_table)
        try:
            fqn = self.fully_qualified_name
            if order:
                conn.execute(f"INSERT INTO {fqn} SELECT * FROM _pyducklake_tmp_append ORDER BY {order}")
            else:
                conn.execute(f"INSERT INTO {fqn} SELECT * FROM _pyducklake_tmp_append")
        finally:
            conn.unregister("_pyducklake_tmp_append")

Append data to the table.

Accepts pyarrow.Table or any object implementing the Arrow PyCapsule interface (__arrow_c_stream__).

If the table has a sort order, data is sorted before insertion.

Examples:

>>> import tempfile, os, pyarrow as pa
>>> from pyducklake import Catalog, Schema, required, IntegerType
>>> tmp = tempfile.mkdtemp()
>>> cat = Catalog("t", os.path.join(tmp, "m.duckdb"), data_path=os.path.join(tmp, "d"))
>>> table = cat.create_table("nums", Schema.of(required("n", IntegerType())))
>>> table.append(pa.table({"n": [1, 2, 3]}))
>>> table.append(pa.table({"n": [4, 5]}))
>>> table.scan().count()
5
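append registers the Arrow data under a temporary name, runs INSERT INTO ... SELECT * FROM ... (with ORDER BY when the table has a sort order), and unregisters the name in a finally block. The sketch below mimics that flow with the standard-library sqlite3 module, using a temporary staging table in place of DuckDB's register/unregister; staged_append and the _tmp_append name are hypothetical. Because the statement uses SELECT *, columns are matched by position rather than by name.

```python
import sqlite3
from typing import Optional, Sequence


def staged_append(
    conn: sqlite3.Connection,
    target: str,
    rows: Sequence[tuple],
    order_by: Optional[str] = None,
) -> None:
    """Hypothetical sketch: append rows to `target` through a temporary staging table."""
    if not rows:
        return
    staging = "_tmp_append"  # plays the role of _pyducklake_tmp_append
    # Empty temp table with the target's columns (stand-in for conn.register).
    conn.execute(f"CREATE TEMP TABLE {staging} AS SELECT * FROM {target} WHERE 0")
    try:
        marks = ", ".join("?" for _ in rows[0])
        conn.executemany(f"INSERT INTO {staging} VALUES ({marks})", rows)
        sql = f"INSERT INTO {target} SELECT * FROM {staging}"
        if order_by:
            # Sort the whole batch before it reaches the target table.
            sql += f" ORDER BY {order_by}"
        conn.execute(sql)
    finally:
        # Drop the staging table even if the insert fails (stand-in for conn.unregister).
        conn.execute(f"DROP TABLE {staging}")


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE nums (n INTEGER NOT NULL)")
staged_append(conn, "nums", [(3,), (1,), (2,)], order_by="n")
print([n for (n,) in conn.execute("SELECT n FROM nums ORDER BY rowid")])  # [1, 2, 3]
```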
def append_batches(self, batches: pyarrow.lib.RecordBatchReader | Iterator[pyarrow.lib.RecordBatch], *, schema: pyarrow.lib.Schema | None = None) -> None:
    def append_batches(
        self,
        batches: pa.RecordBatchReader | Iterator[pa.RecordBatch],
        *,
        schema: pa.Schema | None = None,
    ) -> None:
        """Append data from a stream of record batches.

        Memory-efficient alternative to :meth:`append` — processes batches
        without materializing the full dataset in memory.

        Args:
            batches: RecordBatchReader or iterator of RecordBatch.
            schema: Required if passing an iterator (not needed for RecordBatchReader).
        """
        if isinstance(batches, pa.RecordBatchReader):
            reader = batches
        else:
            if schema is None:
                raise ValueError("schema is required when passing an iterator of RecordBatch")
            reader = pa.RecordBatchReader.from_batches(schema, batches)

        order = self._sort_order_clause()
        conn = self._catalog.connection
        conn.register("_pyducklake_tmp_batches", reader)
        try:
            fqn = self.fully_qualified_name
            if order:
                conn.execute(f"INSERT INTO {fqn} SELECT * FROM _pyducklake_tmp_batches ORDER BY {order}")
            else:
                conn.execute(f"INSERT INTO {fqn} SELECT * FROM _pyducklake_tmp_batches")
        finally:
            conn.unregister("_pyducklake_tmp_batches")

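Append data from a stream of record batches.

Memory-efficient alternative to append(): the batches are streamed to DuckDB through a RecordBatchReader instead of first being collected into a single pyarrow.Table. If the table has a sort order, the INSERT uses ORDER BY, which sorts the full input before it is written.

Args:
  batches: RecordBatchReader or iterator of RecordBatch.
  schema: Required if passing an iterator; ignored for a RecordBatchReader, which carries its own schema.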
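append_batches accepts either a reader that carries its own schema or a bare iterator that needs an explicit schema. The sketch below models that dispatch and batch-at-a-time consumption with sqlite3; BatchReader is a hypothetical stand-in for pa.RecordBatchReader, and batches are lists of row tuples rather than Arrow record batches. Unlike this loop, pyducklake hands the whole reader to DuckDB in a single INSERT.

```python
from __future__ import annotations

import sqlite3
from dataclasses import dataclass
from typing import Iterable, Iterator, Sequence

# Assumption: a "batch" is modeled as a list of row tuples.
Batch = Sequence[tuple]


@dataclass
class BatchReader:
    """Hypothetical stand-in for pa.RecordBatchReader: a schema plus a batch stream."""

    schema: Sequence[str]
    batches: Iterable[Batch]

    def __iter__(self) -> Iterator[Batch]:
        return iter(self.batches)


def append_batches(
    conn: sqlite3.Connection,
    target: str,
    batches: BatchReader | Iterable[Batch],
    *,
    schema: Sequence[str] | None = None,
) -> int:
    """Insert batches one at a time and return the number of rows written."""
    if isinstance(batches, BatchReader):
        reader = batches  # a reader carries its own schema, so `schema` is ignored
    else:
        if schema is None:
            raise ValueError("schema is required when passing an iterator of batches")
        reader = BatchReader(schema, batches)

    marks = ", ".join("?" for _ in reader.schema)
    written = 0
    for batch in reader:  # only the current batch is held by this loop
        conn.executemany(f"INSERT INTO {target} VALUES ({marks})", batch)
        written += len(batch)
    return written


def generate_batches() -> Iterator[Batch]:
    yield [(1, "alice"), (2, "bob")]
    yield [(3, "carol")]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT)")
print(append_batches(conn, "users", generate_batches(), schema=["id", "name"]))  # 3
```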

def overwrite(self, df: pyarrow.lib.Table | ArrowStreamExportable, overwrite_filter: BooleanExpression | str = AlwaysTrue()) -> None:
    def overwrite(
        self,
        df: ArrowCompatible,
        overwrite_filter: BooleanExpression | str = AlwaysTrue(),
    ) -> None:
        """Overwrite data matching the filter, then insert new data.

        Accepts ``pyarrow.Table`` or any object implementing the Arrow
        PyCapsule interface (``__arrow_c_stream__``).

        If overwrite_filter is AlwaysTrue or not provided, truncates and inserts.
        Otherwise, deletes matching rows then inserts.

        Does not manage its own transaction — relies on the caller (or DuckDB
        auto-commit) for atomicity.  This allows overwrite to be used inside
        an explicit :class:`Transaction` without nesting conflicts.
        """
        arrow_table = self._to_arrow_table(df)
        order = self._sort_order_clause()
        conn = self._catalog.connection
        conn.register("_pyducklake_tmp_overwrite", arrow_table)
        try:
            if isinstance(overwrite_filter, str):
                where: str | None = overwrite_filter
            elif overwrite_filter == AlwaysTrue():
                where = None
            else:
                where = overwrite_filter.to_sql()

            fqn = self.fully_qualified_name
            if where:
                conn.execute(f"DELETE FROM {fqn} WHERE {where}")
            else:
                conn.execute(f"DELETE FROM {fqn}")

            if order:
                conn.execute(f"INSERT INTO {fqn} SELECT * FROM _pyducklake_tmp_overwrite ORDER BY {order}")
            else:
                conn.execute(f"INSERT INTO {fqn} SELECT * FROM _pyducklake_tmp_overwrite")
        finally:
            conn.unregister("_pyducklake_tmp_overwrite")

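Overwrite data matching the filter, then insert new data.

Accepts pyarrow.Table or any object implementing the Arrow PyCapsule interface (__arrow_c_stream__).

If overwrite_filter is AlwaysTrue or not provided, every row is deleted before the insert. Otherwise, the rows matching the filter are deleted and then the new data is inserted. A string filter is used verbatim as the SQL WHERE clause.

Does not manage its own transaction, so overwrite can be used inside an explicit Transaction without nesting conflicts. Under DuckDB auto-commit, however, the DELETE and the INSERT run as separate statements and commit independently; the overwrite is atomic only when it runs inside a Transaction or another transaction opened by the caller.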
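Because the DELETE and the INSERT are separate statements, an enclosing transaction is what makes an overwrite all-or-nothing. The sketch below demonstrates the effect with sqlite3, where "with conn:" commits on success and rolls back on error; the overwrite function here is a hypothetical stand-in, not pyducklake's method. In pyducklake the equivalent is calling overwrite inside catalog.begin_transaction().

```python
import sqlite3
from typing import Optional, Sequence


def overwrite(
    conn: sqlite3.Connection,
    target: str,
    rows: Sequence[tuple],
    where: Optional[str] = None,
) -> None:
    """Hypothetical sketch: delete matching rows and insert replacements as one unit."""
    # `with conn:` commits if the block succeeds and rolls back if it raises,
    # so a failed INSERT also undoes the DELETE.
    with conn:
        if where:
            conn.execute(f"DELETE FROM {target} WHERE {where}")
        else:
            conn.execute(f"DELETE FROM {target}")
        if rows:
            marks = ", ".join("?" for _ in rows[0])
            conn.executemany(f"INSERT INTO {target} VALUES ({marks})", rows)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER NOT NULL, name TEXT)")
with conn:
    conn.executemany("INSERT INTO users VALUES (?, ?)", [(1, "alice"), (2, "bob")])

overwrite(conn, "users", [(3, "carol")], where="id = 1")

try:
    # The NULL id violates NOT NULL, so the full-table DELETE is rolled back as well.
    overwrite(conn, "users", [(None, "dave")])
except sqlite3.IntegrityError:
    pass

print(conn.execute("SELECT id, name FROM users ORDER BY id").fetchall())
# [(2, 'bob'), (3, 'carol')]
```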

def delete(self, delete_filter: BooleanExpression | str) -> None:
    def delete(self, delete_filter: BooleanExpression | str) -> None:
        """Delete rows matching the filter.

        Args:
            delete_filter: Rows matching this filter are deleted.
        """
        if isinstance(delete_filter, str):
            filter_sql = delete_filter
        elif delete_filter == AlwaysFalse():
            return
        elif delete_filter == AlwaysTrue():
            filter_sql = None
        else:
            filter_sql = delete_filter.to_sql()

        fqn = self.fully_qualified_name
        if filter_sql is None:
            self._catalog.connection.execute(f"DELETE FROM {fqn}")
        else:
            self._catalog.connection.execute(f"DELETE FROM {fqn} WHERE {filter_sql}")

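Delete rows matching the filter.

Args:
  delete_filter: Rows matching this filter are deleted. A string is used verbatim as the SQL WHERE clause, AlwaysTrue() deletes every row, and AlwaysFalse() is a no-op that issues no statement.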
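The filter handling in delete reduces to four cases. The sketch below reproduces them as a pure function that returns the SQL instead of executing it; AlwaysTrue, AlwaysFalse, and EqualTo are simplified hypothetical stand-ins for pyducklake's expression classes, and their to_sql output is an assumption.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


# Hypothetical stand-ins for pyducklake's expression classes (not its real API).
@dataclass(frozen=True)
class AlwaysTrue:
    pass


@dataclass(frozen=True)
class AlwaysFalse:
    pass


@dataclass(frozen=True)
class EqualTo:
    column: str
    value: Any

    def to_sql(self) -> str:
        # Assumption: strings become quote-escaped literals, other values use str().
        if isinstance(self.value, str):
            literal = "'" + self.value.replace("'", "''") + "'"
        else:
            literal = str(self.value)
        return f'"{self.column}" = {literal}'


def delete_sql(fqn: str, delete_filter: Any) -> str | None:
    """Return the DELETE statement for a filter, or None when nothing should run."""
    if isinstance(delete_filter, str):
        return f"DELETE FROM {fqn} WHERE {delete_filter}"  # raw SQL, used verbatim
    if delete_filter == AlwaysFalse():
        return None  # matches no rows, so no statement is issued
    if delete_filter == AlwaysTrue():
        return f"DELETE FROM {fqn}"  # unconditional delete
    return f"DELETE FROM {fqn} WHERE {delete_filter.to_sql()}"


print(delete_sql("lake.main.users", EqualTo("status", "old")))
```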

def upsert(self, df: pyarrow.lib.Table | ArrowStreamExportable, join_cols: tuple[str, ...] | list[str]) -> UpsertResult:
    def upsert(
        self,
        df: ArrowCompatible,
        join_cols: tuple[str, ...] | list[str],
    ) -> UpsertResult:
        """Upsert data: update existing rows matching on join_cols, insert new rows.

        Accepts ``pyarrow.Table`` or any object implementing the Arrow
        PyCapsule interface (``__arrow_c_stream__``).

        Uses DuckDB's MERGE statement.
        """
        arrow_table = self._to_arrow_table(df)
        conn = self._catalog.connection
        fqn = self.fully_qualified_name
        join_col_set = set(join_cols)

        # Count rows before to determine updated vs inserted
        count_before = self.scan().count()

        conn.register("_pyducklake_tmp_upsert", arrow_table)
        try:
            on_clause = " AND ".join(f'target."{col}" = source."{col}"' for col in join_cols)

            all_cols = [field.name for field in arrow_table.schema]  # pyright: ignore[reportUnknownVariableType]
            non_join_cols = [c for c in all_cols if c not in join_col_set]

            update_set = ", ".join(f'"{col}" = source."{col}"' for col in non_join_cols)

            insert_cols = ", ".join(f'source."{col}"' for col in all_cols)

            merge_sql = f"MERGE INTO {fqn} AS target USING _pyducklake_tmp_upsert AS source ON {on_clause}"
            if update_set:
                merge_sql += f" WHEN MATCHED THEN UPDATE SET {update_set}"
            merge_sql += f" WHEN NOT MATCHED THEN INSERT VALUES ({insert_cols})"

            conn.execute(merge_sql)
        finally:
            conn.unregister("_pyducklake_tmp_upsert")

        count_after = self.scan().count()
        rows_inserted = count_after - count_before
        rows_updated = arrow_table.num_rows - rows_inserted

        return UpsertResult(rows_updated=rows_updated, rows_inserted=rows_inserted)

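Upsert data: update existing rows matching on join_cols, insert new rows.

Accepts pyarrow.Table or any object implementing the Arrow PyCapsule interface (__arrow_c_stream__).

Uses DuckDB's MERGE statement. New rows are inserted with INSERT VALUES in the input's column order, without a column list, so the input should have the same columns in the same order as the table.

The returned UpsertResult is derived from row counts taken before and after the MERGE: rows_inserted is the growth in table size, and rows_updated is the number of input rows minus rows_inserted. If every column is a join column, no UPDATE clause is generated, yet matched rows are still counted as updated.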
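The sketch below rebuilds the MERGE statement from the listing as a pure function, together with the before/after arithmetic behind UpsertResult. build_merge_sql, upsert_counts, and the _tmp_upsert source name are hypothetical; the SQL shape follows the listing above.

```python
from __future__ import annotations

from typing import Sequence


def build_merge_sql(
    fqn: str,
    all_cols: Sequence[str],
    join_cols: Sequence[str],
    source: str = "_tmp_upsert",
) -> str:
    """Hypothetical helper mirroring the MERGE built by upsert()."""
    join_set = set(join_cols)
    on_clause = " AND ".join(f'target."{c}" = source."{c}"' for c in join_cols)
    update_set = ", ".join(f'"{c}" = source."{c}"' for c in all_cols if c not in join_set)
    insert_vals = ", ".join(f'source."{c}"' for c in all_cols)

    sql = f"MERGE INTO {fqn} AS target USING {source} AS source ON {on_clause}"
    if update_set:  # omitted when every column is a join column
        sql += f" WHEN MATCHED THEN UPDATE SET {update_set}"
    # No column list: values are matched to table columns by position.
    sql += f" WHEN NOT MATCHED THEN INSERT VALUES ({insert_vals})"
    return sql


def upsert_counts(input_rows: int, count_before: int, count_after: int) -> tuple[int, int]:
    """Return (rows_updated, rows_inserted) the way upsert() derives them."""
    inserted = count_after - count_before
    updated = input_rows - inserted  # every matched input row counts as "updated"
    return updated, inserted


print(build_merge_sql("lake.main.users", ["id", "name"], ["id"]))
print(upsert_counts(3, 5, 6))  # (2, 1)
```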

spec: PartitionSpec
    @property
    def spec(self) -> PartitionSpec:
        """Current partition spec. Returns UNPARTITIONED if not partitioned."""
        from pyducklake.catalog import escape_string_literal
        from pyducklake.partitioning import (
            DAY,
            HOUR,
            IDENTITY,
            MONTH,
            UNPARTITIONED,
            YEAR,
            PartitionField,
            PartitionSpec,
        )

        catalog_name = self._catalog.name
        meta_schema = f"__ducklake_metadata_{catalog_name}"
        try:
            rows: list[tuple[Any, ...]] = self._catalog.fetchall(
                f"SELECT c.column_name, pc.transform "
                f'FROM "{meta_schema}".ducklake_partition_column pc '
                f'JOIN "{meta_schema}".ducklake_partition_info pi '
                f"ON pc.partition_id = pi.partition_id "
                f"AND pc.table_id = pi.table_id "
                f'JOIN "{meta_schema}".ducklake_table t '
                f"ON pi.table_id = t.table_id "
                f'JOIN "{meta_schema}".ducklake_schema s '
                f"ON t.schema_id = s.schema_id "
                f'JOIN "{meta_schema}".ducklake_column c '
                f"ON pc.column_id = c.column_id "
                f"AND pc.table_id = c.table_id "
                f"WHERE t.table_name = '{escape_string_literal(self._identifier[1])}' "
                f"AND s.schema_name = '{escape_string_literal(self._identifier[0])}' "
                f"AND pi.end_snapshot IS NULL "
                f"ORDER BY pc.partition_key_index"
            )
        except (duckdb.CatalogException, duckdb.BinderException):
            return UNPARTITIONED

        if not rows:
            return UNPARTITIONED

        _transform_map = {
            "identity": IDENTITY,
            "year": YEAR,
            "month": MONTH,
            "day": DAY,
            "hour": HOUR,
        }

        fields: list[PartitionField] = []
        for col_name, transform_name in rows:
            transform = _transform_map.get(str(transform_name).lower(), IDENTITY)
            fields.append(PartitionField(source_column=str(col_name), transform=transform))

        return PartitionSpec(*fields)

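Current partition spec. Returns UNPARTITIONED if the table is not partitioned or its partition metadata cannot be read. Transform names other than identity, year, month, day, and hour fall back to identity.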
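The spec property turns each (column_name, transform) metadata row into a PartitionField, defaulting to identity for unknown transform names. A standard-library sketch of that mapping follows; the plain strings are hypothetical stand-ins for IDENTITY, YEAR, MONTH, DAY, and HOUR, and rows_to_spec returns tuples instead of PartitionField objects.

```python
from __future__ import annotations

# Hypothetical string stand-ins for pyducklake.partitioning's transform constants.
IDENTITY, YEAR, MONTH, DAY, HOUR = "identity", "year", "month", "day", "hour"

_TRANSFORM_MAP = {"identity": IDENTITY, "year": YEAR, "month": MONTH, "day": DAY, "hour": HOUR}


def rows_to_spec(rows: list[tuple[object, object]]) -> list[tuple[str, str]]:
    """Map (column_name, transform) rows to (column, transform) pairs."""
    # Rows are assumed to be ordered by partition_key_index, as the query requests.
    # Matching is case-insensitive; unknown names silently become identity.
    return [(str(col), _TRANSFORM_MAP.get(str(t).lower(), IDENTITY)) for col, t in rows]


print(rows_to_spec([("ts", "YEAR"), ("region", "identity")]))
```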

def update_spec(self) -> UpdateSpec:
    def update_spec(self) -> UpdateSpec:
        """Begin partition spec evolution. Returns an UpdateSpec builder."""
        from pyducklake.partitioning import UpdateSpec

        return UpdateSpec(self)

Begin partition spec evolution. Returns an UpdateSpec builder.

sort_order: SortOrder
    @property
    def sort_order(self) -> SortOrder:
        """Current sort order. Returns UNSORTED if not sorted."""
        if self._sort_order_cache is not None:
            return self._sort_order_cache

        result = self._fetch_sort_order()
        self._sort_order_cache = result
        return result

Current sort order. Returns UNSORTED if not sorted.
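sort_order fetches the sort order from the catalog once and serves later reads from _sort_order_cache. The hypothetical class below isolates that memoization pattern: _fetch_sort_order is replaced by a stub that counts its calls, and how pyducklake invalidates the cache is not shown in this section.

```python
from __future__ import annotations


class CachedSortOrder:
    """Hypothetical class showing the lazy-cache pattern used by Table.sort_order."""

    def __init__(self) -> None:
        self._sort_order_cache: str | None = None
        self.fetches = 0  # counts metadata lookups, for illustration only

    def _fetch_sort_order(self) -> str:
        self.fetches += 1
        return "UNSORTED"  # stub standing in for a catalog metadata query

    @property
    def sort_order(self) -> str:
        # Serve the cached value once it has been fetched.
        if self._sort_order_cache is not None:
            return self._sort_order_cache
        result = self._fetch_sort_order()
        self._sort_order_cache = result
        return result


obj = CachedSortOrder()
obj.sort_order
obj.sort_order
print(obj.fetches)  # 1
```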

def update_sort_order(self) -> UpdateSortOrder:
    def update_sort_order(self) -> UpdateSortOrder:
        """Begin sort order evolution. Returns an UpdateSortOrder builder."""
        from pyducklake.sorting import UpdateSortOrder

        return UpdateSortOrder(self)

Begin sort order evolution. Returns an UpdateSortOrder builder.

def to_arrow_dataset(self, *, snapshot_id: int | None = None) -> pyarrow._dataset.Dataset:
    def to_arrow_dataset(self, *, snapshot_id: int | None = None) -> ds.Dataset:
        """Return this table as a PyArrow Dataset.

        Enables interop with engines that consume the PyArrow dataset API
        (DuckDB, Polars, DataFusion, Dask, etc.).

        Args:
            snapshot_id: Optional snapshot for time travel.

        Returns:
            A pyarrow.dataset.Dataset wrapping this table's data.
        """
        import pyarrow.dataset as ds

        scan = self.scan(snapshot_id=snapshot_id)
        return ds.dataset(scan.to_arrow())  # pyright: ignore[reportUnknownMemberType]

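Return this table as a PyArrow Dataset.

Enables interop with engines that consume the PyArrow dataset API (DuckDB, Polars, DataFusion, Dask, etc.). The scan is read fully into memory with to_arrow() and then wrapped as an in-memory dataset, so this is not a lazy view over the table's Parquet files.

Args:
  snapshot_id: Optional snapshot for time travel.

Returns:
  A pyarrow.dataset.Dataset wrapping this table's data.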

def inspect(self) -> InspectTable:
    def inspect(self) -> InspectTable:
        """Return an :class:`InspectTable` for metadata introspection."""
        from pyducklake.inspect import InspectTable

        return InspectTable(self)

Return an InspectTable for metadata introspection.

def maintenance(self) -> MaintenanceTable:
    def maintenance(self) -> MaintenanceTable:
        """Get maintenance operations for this table."""
        from pyducklake.maintenance import MaintenanceTable

        return MaintenanceTable(self)

Get maintenance operations for this table.

def table_changes(self, start_snapshot: int | None = None, end_snapshot: int | None = None, *, start_time: datetime.datetime | None = None, end_time: datetime.datetime | None = None, columns: tuple[str, ...] | list[str] | None = None, filter_expr: str | None = None) -> ChangeSet:
    def table_changes(
        self,
        start_snapshot: int | None = None,
        end_snapshot: int | None = None,
        *,
        start_time: datetime | None = None,
        end_time: datetime | None = None,
        columns: tuple[str, ...] | list[str] | None = None,
        filter_expr: str | None = None,
    ) -> ChangeSet:
        """Query all changes between two snapshots or timestamps.

        Returns a ChangeSet with inserts, deletes, and update pre/post images.
        """
        from pyducklake.cdc import ChangeSet

        return ChangeSet(
            self._cdc_query(
                "ducklake_table_changes",
                start_snapshot,
                end_snapshot,
                start_time=start_time,
                end_time=end_time,
                columns=columns,
                filter_expr=filter_expr,
                meta_cols=("snapshot_id", "rowid", "change_type"),
            ),
            change_type_col="change_type",
        )

Query all changes between two snapshots or timestamps.

Returns a ChangeSet with inserts, deletes, and update pre/post images.
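table_changes requests the snapshot_id, rowid, and change_type metadata columns and passes change_type to ChangeSet. The sketch below shows one way rows could be grouped by that column; split_changes is a hypothetical helper, not the ChangeSet API, and the four change-type values are an assumption based on DuckLake's change-feed functions.

```python
from __future__ import annotations

from collections import defaultdict

# Assumption: the change_type values reported by DuckLake's table_changes function.
CHANGE_TYPES = ("insert", "delete", "update_preimage", "update_postimage")


def split_changes(
    rows: list[dict[str, object]], change_type_col: str = "change_type"
) -> dict[str, list[dict[str, object]]]:
    """Group change rows by their change type; every known type gets a key."""
    grouped: dict[str, list[dict[str, object]]] = defaultdict(list)
    for row in rows:
        grouped[str(row[change_type_col])].append(row)
    return {kind: grouped.get(kind, []) for kind in CHANGE_TYPES}


rows = [
    {"snapshot_id": 2, "rowid": 0, "change_type": "insert", "id": 1, "name": "alice"},
    {"snapshot_id": 3, "rowid": 0, "change_type": "update_preimage", "id": 1, "name": "alice"},
    {"snapshot_id": 3, "rowid": 0, "change_type": "update_postimage", "id": 1, "name": "alicia"},
]
changes = split_changes(rows)
print({kind: len(group) for kind, group in changes.items()})
```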

def table_insertions(self, start_snapshot: int | None = None, end_snapshot: int | None = None, *, start_time: datetime.datetime | None = None, end_time: datetime.datetime | None = None, columns: tuple[str, ...] | list[str] | None = None, filter_expr: str | None = None) -> ChangeSet:
    def table_insertions(
        self,
        start_snapshot: int | None = None,
        end_snapshot: int | None = None,
        *,
        start_time: datetime | None = None,
        end_time: datetime | None = None,
        columns: tuple[str, ...] | list[str] | None = None,
        filter_expr: str | None = None,
    ) -> ChangeSet:
        """Query inserted rows between two snapshots or timestamps."""
        from pyducklake.cdc import ChangeSet

        return ChangeSet(
            self._cdc_query(
                "ducklake_table_insertions",
                start_snapshot,
                end_snapshot,
                start_time=start_time,
                end_time=end_time,
                columns=columns,
                filter_expr=filter_expr,
                meta_cols=(),
            ),
            change_type_col=None,
        )

Query inserted rows between two snapshots or timestamps.

def table_deletions(self, start_snapshot: int | None = None, end_snapshot: int | None = None, *, start_time: datetime.datetime | None = None, end_time: datetime.datetime | None = None, columns: tuple[str, ...] | list[str] | None = None, filter_expr: str | None = None) -> ChangeSet:
    def table_deletions(
        self,
        start_snapshot: int | None = None,
        end_snapshot: int | None = None,
        *,
        start_time: datetime | None = None,
        end_time: datetime | None = None,
        columns: tuple[str, ...] | list[str] | None = None,
        filter_expr: str | None = None,
    ) -> ChangeSet:
        """Query deleted rows between two snapshots or timestamps."""
        from pyducklake.cdc import ChangeSet

        return ChangeSet(
            self._cdc_query(
                "ducklake_table_deletions",
                start_snapshot,
                end_snapshot,
                start_time=start_time,
                end_time=end_time,
                columns=columns,
                filter_expr=filter_expr,
                meta_cols=(),
            ),
            change_type_col=None,
        )

Query deleted rows between two snapshots or timestamps.

def update_schema(self) -> UpdateSchema:
    def update_schema(self) -> UpdateSchema:
        """Begin schema evolution. Returns an UpdateSchema builder."""
        from pyducklake.schema_evolution import UpdateSchema

        return UpdateSchema(self)

Begin schema evolution. Returns an UpdateSchema builder.
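UpdateSchema records changes and turns each one into an ALTER TABLE statement on commit(). The sketch below mirrors that builder for three of its operations. SchemaChanges, the change dataclasses, and the escaping rule in quote_identifier are hypothetical, and column types are passed as SQL strings instead of DucklakeType objects. Since the statements run one by one, grouping them inside a Transaction is what makes a multi-change evolution all-or-nothing.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable, Union


def quote_identifier(name: str) -> str:
    # Assumption: identifiers are double-quoted, with embedded quotes doubled.
    return '"' + name.replace('"', '""') + '"'


@dataclass(frozen=True)
class AddColumn:
    name: str
    type_sql: str  # assumption: a SQL type name instead of a DucklakeType
    required: bool = False


@dataclass(frozen=True)
class DropColumn:
    name: str


@dataclass(frozen=True)
class RenameColumn:
    name: str
    new_name: str


Change = Union[AddColumn, DropColumn, RenameColumn]


def change_to_sql(fqn: str, change: Change) -> str:
    if isinstance(change, AddColumn):
        sql = f"ALTER TABLE {fqn} ADD COLUMN {quote_identifier(change.name)} {change.type_sql}"
        return f"{sql} NOT NULL" if change.required else sql
    if isinstance(change, DropColumn):
        return f"ALTER TABLE {fqn} DROP COLUMN {quote_identifier(change.name)}"
    return f"ALTER TABLE {fqn} RENAME {quote_identifier(change.name)} TO {quote_identifier(change.new_name)}"


class SchemaChanges:
    """Hypothetical builder: records changes, emits ALTER TABLE statements on commit."""

    def __init__(self, fqn: str) -> None:
        self._fqn = fqn
        self._changes: list[Change] = []

    def add_column(self, name: str, type_sql: str, required: bool = False) -> SchemaChanges:
        self._changes.append(AddColumn(name, type_sql, required))
        return self

    def drop_column(self, name: str) -> SchemaChanges:
        self._changes.append(DropColumn(name))
        return self

    def rename_column(self, name: str, new_name: str) -> SchemaChanges:
        self._changes.append(RenameColumn(name, new_name))
        return self

    def commit(self, execute: Callable[[str], object]) -> None:
        # One statement per change, in the order the changes were recorded.
        for change in self._changes:
            execute(change_to_sql(self._fqn, change))
        self._changes.clear()


executed = []
(
    SchemaChanges("lake.main.users")
    .add_column("email", "VARCHAR")
    .rename_column("name", "full_name")
    .drop_column("legacy")
    .commit(executed.append)
)
print("\n".join(executed))
```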

class TableAlreadyExistsError(pyducklake.DucklakeError):
class TableAlreadyExistsError(DucklakeError):
    """Raised when attempting to create a table that already exists."""

Raised when attempting to create a table that already exists.

class View:
class View:
    """Represents a Ducklake view."""

    def __init__(
        self,
        identifier: tuple[str, str],
        schema: Schema,
        sql: str,
        catalog: Catalog,
    ) -> None:
        self._identifier = identifier
        self._schema = schema
        self._sql = sql
        self._catalog = catalog

    @property
    def name(self) -> str:
        """View name (without namespace)."""
        return self._identifier[1]

    @property
    def namespace(self) -> str:
        """Namespace (schema) name."""
        return self._identifier[0]

    @property
    def identifier(self) -> tuple[str, str]:
        """(namespace, view_name) tuple."""
        return self._identifier

    @property
    def schema(self) -> Schema:
        """View's output schema."""
        return self._schema

    @property
    def sql_text(self) -> str:
        """The SQL definition of the view."""
        return self._sql

    @property
    def fully_qualified_name(self) -> str:
        """catalog.namespace.view_name"""
        return self._catalog.fully_qualified_name(self._identifier[0], self._identifier[1])

    @property
    def catalog(self) -> Catalog:
        """The catalog this view belongs to."""
        return self._catalog

    def scan(
        self,
        row_filter: BooleanExpression | str = AlwaysTrue(),
        selected_fields: tuple[str, ...] = ("*",),
        limit: int | None = None,
    ) -> DataScan:
        """Create a scan on this view.

        Works exactly like Table.scan() -- the view is queryable
        with filters, projections, and limits.
        """
        from pyducklake.scan import DataScan, RawSQL

        if isinstance(row_filter, str):
            actual_filter: BooleanExpression = RawSQL(row_filter)
        else:
            actual_filter = row_filter

        return DataScan(
            table=self,
            row_filter=actual_filter,
            selected_fields=selected_fields,
            limit=limit,
        )

    def to_arrow(self) -> pa.Table:
        """Read the entire view as an Arrow table. Shorthand for scan().to_arrow()."""
        return self.scan().to_arrow()

    def to_pandas(self) -> pd.DataFrame:
        """Read the entire view as a pandas DataFrame."""
        return self.scan().to_pandas()

    def to_arrow_dataset(self) -> ds.Dataset:
        """Return view results as a PyArrow Dataset."""
        return self.scan().to_arrow_dataset()

    def refresh(self) -> View:
        """Reload schema from catalog. Returns self."""
        self._schema = self._catalog.build_schema_from_describe(self._identifier[0], self._identifier[1])
        return self

    def __repr__(self) -> str:
        return f"View(identifier={self._identifier!r}, sql={self._sql!r})"

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, View):
            return NotImplemented
        return self._identifier == other._identifier and self._catalog.name == other._catalog.name

Represents a Ducklake view.

View(identifier: tuple[str, str], schema: Schema, sql: str, catalog: Catalog)
name: str

View name (without namespace).

namespace: str

Namespace (schema) name.

identifier: tuple[str, str]

(namespace, view_name) tuple.

schema: Schema

View's output schema.

sql_text: str

The SQL definition of the view.

fully_qualified_name: str

Fully qualified name in the form catalog.namespace.view_name.

catalog: Catalog

The catalog this view belongs to.

def scan(self, row_filter: BooleanExpression | str = AlwaysTrue(), selected_fields: tuple[str, ...] = ('*',), limit: int | None = None) -> DataScan:

Create a scan on this view.

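Works like Table.scan(): the view is queryable with filters, projections, and limits, and a string row_filter is wrapped in RawSQL. Unlike Table.scan(), it takes no snapshot_id argument for time travel.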
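A view accepts the same filter, projection, and limit arguments as a table scan. The sketch below composes those three into a query against a sqlite3 view; scan_view is hypothetical, and the SQL that DataScan actually generates is not shown in this section.

```python
import sqlite3
from typing import Optional, Sequence


def scan_view(
    conn: sqlite3.Connection,
    view: str,
    row_filter: str = "1 = 1",
    selected_fields: Sequence[str] = ("*",),
    limit: Optional[int] = None,
) -> list:
    """Hypothetical sketch: query a view with a filter, a projection, and a limit."""
    sql = f"SELECT {', '.join(selected_fields)} FROM {view} WHERE {row_filter}"
    if limit is not None:
        sql += f" LIMIT {int(limit)}"
    return conn.execute(sql).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT, active INTEGER)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?, ?)",
    [(1, "alice", 1), (2, "bob", 0), (3, "carol", 1)],
)
conn.execute("CREATE VIEW active_users AS SELECT id, name FROM users WHERE active = 1")
print(scan_view(conn, "active_users", row_filter="id > 1", selected_fields=("name",)))
# [('carol',)]
```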

def to_arrow(self) -> pyarrow.lib.Table:

Read the entire view as an Arrow table. Shorthand for scan().to_arrow().

def to_pandas(self) -> pandas.DataFrame:

Read the entire view as a pandas DataFrame.

def to_arrow_dataset(self) -> pyarrow._dataset.Dataset:

Return view results as a PyArrow Dataset.

def refresh(self) -> View:

Reload schema from catalog. Returns self.

class ViewAlreadyExistsError(pyducklake.DucklakeError):
class ViewAlreadyExistsError(DucklakeError):
    """Raised when attempting to create a view that already exists."""

Raised when attempting to create a view that already exists.

class Transaction:
class Transaction:
    """Multi-operation transaction on a Ducklake catalog.

    Groups multiple write operations (append, overwrite, delete, schema changes)
    into a single atomic commit = single snapshot.

    Usage:
        with catalog.begin_transaction() as txn:
            tbl = txn.load_table("my_table")
            tbl.append(df1)
            tbl.delete(EqualTo("status", "old"))

            tbl2 = txn.load_table("other_table")
            tbl2.append(df2)
        # auto-commits on clean exit, rolls back on exception

    Or without context manager:
        txn = catalog.begin_transaction()
        # ... operations ...
        txn.commit()  # or txn.rollback()
    """

    def __init__(self, catalog: Catalog) -> None:
        """Begin a transaction on the catalog's connection."""
        self._catalog = catalog
        self._committed = False
        self._rolled_back = False
        self._catalog.connection.execute("BEGIN TRANSACTION")

    def load_table(self, identifier: str | tuple[str, str]) -> Table:
        """Load a table within this transaction context."""
        return self._catalog.load_table(identifier)

    def commit(self) -> None:
        """Commit the transaction."""
        if self._committed or self._rolled_back:
            raise DucklakeError("Transaction already finalized")
        self._catalog.connection.execute("COMMIT")
        self._committed = True

    def rollback(self) -> None:
        """Roll back the transaction."""
        if self._committed or self._rolled_back:
            raise DucklakeError("Transaction already finalized")
        self._catalog.connection.execute("ROLLBACK")
        self._rolled_back = True

    @property
    def is_active(self) -> bool:
        """True if the transaction has not been committed or rolled back."""
        return not self._committed and not self._rolled_back

    def __enter__(self) -> Transaction:
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: object,
    ) -> None:
        if self._committed or self._rolled_back:
            return
        if exc_type is not None:
            self.rollback()
        else:
            self.commit()

Multi-operation transaction on a Ducklake catalog.

Groups multiple write operations (append, overwrite, delete, schema changes) into a single atomic commit, which produces a single snapshot.

Usage:

    with catalog.begin_transaction() as txn:
        tbl = txn.load_table("my_table")
        tbl.append(df1)
        tbl.delete(EqualTo("status", "old"))

        tbl2 = txn.load_table("other_table")
        tbl2.append(df2)
    # auto-commits on clean exit, rolls back on exception

Or without a context manager:

    txn = catalog.begin_transaction()
    # ... operations ...
    txn.commit()  # or txn.rollback()

Transaction(catalog: Catalog)

Begin a transaction on the catalog's connection.

def load_table(self, identifier: str | tuple[str, str]) -> Table:

Load a table within this transaction context.

def commit(self) -> None:

Commit the transaction.

def rollback(self) -> None:

Roll back the transaction.

is_active: bool

True if the transaction has not been committed or rolled back.
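The Transaction listing guards against double finalization and chooses between commit and rollback in __exit__. The sketch below reproduces that logic over a sqlite3 connection opened with isolation_level=None, so explicit BEGIN/COMMIT statements are allowed; TransactionError is a hypothetical stand-in for DucklakeError, and the connection stands in for the catalog's DuckDB connection.

```python
from __future__ import annotations

import sqlite3


class TransactionError(Exception):
    """Hypothetical stand-in for DucklakeError."""


class Transaction:
    def __init__(self, conn: sqlite3.Connection) -> None:
        self._conn = conn
        self._committed = False
        self._rolled_back = False
        self._conn.execute("BEGIN TRANSACTION")  # opened eagerly, as in the listing

    @property
    def is_active(self) -> bool:
        return not self._committed and not self._rolled_back

    def commit(self) -> None:
        if not self.is_active:
            raise TransactionError("Transaction already finalized")
        self._conn.execute("COMMIT")
        self._committed = True

    def rollback(self) -> None:
        if not self.is_active:
            raise TransactionError("Transaction already finalized")
        self._conn.execute("ROLLBACK")
        self._rolled_back = True

    def __enter__(self) -> Transaction:
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        if not self.is_active:
            return  # already finalized explicitly inside the block
        if exc_type is not None:
            self.rollback()  # returning None lets the exception propagate
        else:
            self.commit()


conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE t (n INTEGER)")
with Transaction(conn):
    conn.execute("INSERT INTO t VALUES (1)")
    conn.execute("INSERT INTO t VALUES (2)")

try:
    with Transaction(conn):
        conn.execute("INSERT INTO t VALUES (3)")
        raise RuntimeError("boom")
except RuntimeError:
    pass

print(conn.execute("SELECT COUNT(*) FROM t").fetchone()[0])  # 2
```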

class UpdateSchema:
class UpdateSchema:
    """Builder for schema evolution operations.

    Obtained via table.update_schema(). Collects changes, applies on commit().
    """

    __slots__ = ("_table", "_changes")

    def __init__(self, table: Table) -> None:
        self._table = table
        self._changes: list[_SchemaChange] = []

    def add_column(
        self,
        name: str,
        field_type: DucklakeType,
        doc: str | None = None,
        required: bool = False,
    ) -> UpdateSchema:
        """Add a column. Returns self for chaining."""
        self._changes.append(_AddColumn(name=name, field_type=field_type, doc=doc, required=required))
        return self

    def drop_column(self, name: str) -> UpdateSchema:
        """Drop a column. Returns self for chaining."""
        self._changes.append(_DropColumn(name=name))
        return self

    def rename_column(self, name: str, new_name: str) -> UpdateSchema:
        """Rename a column. Returns self for chaining."""
        self._changes.append(_RenameColumn(name=name, new_name=new_name))
        return self

    def update_column(self, name: str, new_type: DucklakeType) -> UpdateSchema:
        """Change column type (lossless promotions only). Returns self for chaining."""
        self._changes.append(_UpdateColumnType(name=name, new_type=new_type))
        return self

    def set_nullability(self, name: str, required: bool) -> UpdateSchema:
        """Set or drop NOT NULL. Returns self for chaining."""
        self._changes.append(_SetNullability(name=name, required=required))
        return self

    def commit(self) -> None:
        """Execute all pending schema changes as ALTER TABLE statements.

        Refreshes the table schema afterward.
        """
        fqn = self._table.fully_qualified_name
        conn = self._table.catalog.connection

        for change in self._changes:
            sql = self._change_to_sql(fqn, change)
            conn.execute(sql)

        self._changes.clear()
        self._table.refresh()

    def _change_to_sql(self, fqn: str, change: _SchemaChange) -> str:
        if isinstance(change, _AddColumn):
            type_sql = ducklake_type_to_sql(change.field_type)
            sql = f"ALTER TABLE {fqn} ADD COLUMN {quote_identifier(change.name)} {type_sql}"
            if change.required:
                sql += " NOT NULL"
            return sql
        if isinstance(change, _DropColumn):
            return f"ALTER TABLE {fqn} DROP COLUMN {quote_identifier(change.name)}"
        if isinstance(change, _RenameColumn):
            return f"ALTER TABLE {fqn} RENAME {quote_identifier(change.name)} TO {quote_identifier(change.new_name)}"
        if isinstance(change, _UpdateColumnType):
            type_sql = ducklake_type_to_sql(change.new_type)
            return f"ALTER TABLE {fqn} ALTER {quote_identifier(change.name)} SET TYPE {type_sql}"
        # At this point, change must be _SetNullability (exhaustive match)
        if change.required:
            return f"ALTER TABLE {fqn} ALTER {quote_identifier(change.name)} SET NOT NULL"
        return f"ALTER TABLE {fqn} ALTER {quote_identifier(change.name)} DROP NOT NULL"

    # -- Context manager -------------------------------------------------------

    def __enter__(self) -> UpdateSchema:
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Auto-commit on clean exit."""
        if exc_type is None:
            self.commit()

Builder for schema evolution operations.

Obtained via table.update_schema(). Collects changes, applies on commit().
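The translation performed by _change_to_sql can be previewed without a catalog. The following is a standalone re-implementation for illustration, not the library code: the change records are simplified stand-ins for the private _AddColumn, _DropColumn, and related classes, column types are passed as SQL strings (VARCHAR, BIGINT) instead of DucklakeType values run through ducklake_type_to_sql, and quote_ident is a hypothetical helper assumed to quote identifiers the way quote_identifier does (double quotes, embedded quotes doubled). The table name lake.main.users is also made up.

```python
from dataclasses import dataclass
from typing import Union


def quote_ident(name: str) -> str:
    # Assumption: standard SQL identifier quoting, as quote_identifier is presumed to do.
    return '"' + name.replace('"', '""') + '"'


@dataclass(frozen=True)
class AddColumn:
    name: str
    type_sql: str  # simplification: a SQL type string instead of a DucklakeType
    required: bool = False


@dataclass(frozen=True)
class DropColumn:
    name: str


@dataclass(frozen=True)
class RenameColumn:
    name: str
    new_name: str


@dataclass(frozen=True)
class UpdateColumnType:
    name: str
    type_sql: str


@dataclass(frozen=True)
class SetNullability:
    name: str
    required: bool


Change = Union[AddColumn, DropColumn, RenameColumn, UpdateColumnType, SetNullability]


def change_to_sql(fqn: str, change: Change) -> str:
    """Mirror of UpdateSchema._change_to_sql: one ALTER TABLE statement per change."""
    col = quote_ident(change.name)
    if isinstance(change, AddColumn):
        sql = f"ALTER TABLE {fqn} ADD COLUMN {col} {change.type_sql}"
        if change.required:
            sql += " NOT NULL"
        return sql
    if isinstance(change, DropColumn):
        return f"ALTER TABLE {fqn} DROP COLUMN {col}"
    if isinstance(change, RenameColumn):
        return f"ALTER TABLE {fqn} RENAME {col} TO {quote_ident(change.new_name)}"
    if isinstance(change, UpdateColumnType):
        return f"ALTER TABLE {fqn} ALTER {col} SET TYPE {change.type_sql}"
    # Only SetNullability remains.
    action = "SET NOT NULL" if change.required else "DROP NOT NULL"
    return f"ALTER TABLE {fqn} ALTER {col} {action}"


pending = [
    AddColumn("email", "VARCHAR"),
    RenameColumn("name", "full_name"),
    UpdateColumnType("id", "BIGINT"),
    SetNullability("email", required=True),
    DropColumn("legacy"),
]
for change in pending:
    print(change_to_sql("lake.main.users", change))
```

Because every change becomes its own statement, executed in the order the builder methods were called, a later change has to refer to a column by the name it has at that point; using the old name after a rename would presumably fail at execution time.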

UpdateSchema(table: Table)
def add_column(self, name: str, field_type: DucklakeType, doc: str | None = None, required: bool = False) -> UpdateSchema:

Add a column. Returns self for chaining.

def drop_column(self, name: str) -> UpdateSchema:

Drop a column. Returns self for chaining.

def rename_column(self, name: str, new_name: str) -> UpdateSchema:

Rename a column. Returns self for chaining.

def update_column(self, name: str, new_type: DucklakeType) -> UpdateSchema:

Change column type (lossless promotions only). Returns self for chaining.

def set_nullability(self, name: str, required: bool) -> UpdateSchema:

Set or drop NOT NULL. Returns self for chaining.

def commit(self) -> None:

Execute all pending schema changes as ALTER TABLE statements.

Refreshes the table schema afterward.
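UpdateSchema (like UpdateSortOrder and UpdateSpec) commits from __exit__ only when the with block finishes without an exception; otherwise nothing is executed, the exception propagates, and the collected changes stay pending on the builder. The sketch reproduces that control flow with a hypothetical RecordingBuilder that records statements in a list instead of calling conn.execute(); it illustrates the pattern and is not part of pyducklake.

```python
from typing import Any, List


class RecordingBuilder:
    """Hypothetical stand-in reproducing UpdateSchema's commit-on-clean-exit behavior."""

    def __init__(self) -> None:
        self.pending: List[str] = []
        self.executed: List[str] = []

    def add(self, stmt: str) -> "RecordingBuilder":
        self.pending.append(stmt)
        return self  # chaining, as with add_column() and friends

    def commit(self) -> None:
        # The real commit() runs conn.execute() per change, then clears and refreshes.
        self.executed.extend(self.pending)
        self.pending.clear()

    def __enter__(self) -> "RecordingBuilder":
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        if exc_type is None:  # commit only on a clean exit
            self.commit()
        # Returning None (falsy) lets any exception propagate.


ok = RecordingBuilder()
with ok as b:
    b.add("ADD email").add("DROP legacy")
print(ok.executed)  # ['ADD email', 'DROP legacy']

failed = RecordingBuilder()
try:
    with failed as b:
        b.add("ADD email")
        raise RuntimeError("validation failed")
except RuntimeError:
    pass
print(failed.executed, failed.pending)  # [] ['ADD email']
```

With the real builder the same shape is a with table.update_schema() as update: block whose body calls update.add_column(...), update.rename_column(...), and so on. Note that commit() sends its ALTER TABLE statements one at a time and does not open a transaction of its own, so if a later statement fails, the earlier ones have presumably already taken effect unless the caller wrapped the work in a transaction.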

class UpdateSortOrder:
class UpdateSortOrder:
    """Builder for sort order changes. Obtained via table.update_sort_order()."""

    __slots__ = ("_table", "_fields", "_clear")

    def __init__(self, table: Table) -> None:
        self._table = table
        self._fields: list[SortField] = []
        self._clear = False

    def add_field(
        self,
        source_column: str,
        direction: SortDirection = SortDirection.ASC,
        null_order: NullOrder = NullOrder.NULLS_LAST,
    ) -> UpdateSortOrder:
        """Add a sort field. Returns self for chaining."""
        self._fields.append(
            SortField(
                source_column=source_column,
                direction=direction,
                null_order=null_order,
            )
        )
        return self

    def clear(self) -> UpdateSortOrder:
        """Remove all sorting (RESET SORTED BY). Returns self."""
        self._clear = True
        self._fields.clear()
        return self

    def commit(self) -> None:
        """Apply sort order changes via ALTER TABLE."""
        fqn = self._table.fully_qualified_name
        conn = self._table.catalog.connection

        if self._clear:
            conn.execute(f"ALTER TABLE {fqn} RESET SORTED BY")
        elif self._fields:
            parts = [f.to_sql() for f in self._fields]
            sort_expr = ", ".join(parts)
            conn.execute(f"ALTER TABLE {fqn} SET SORTED BY ({sort_expr})")

        self._fields.clear()
        self._clear = False
        self._table.refresh()

    # -- Context manager -------------------------------------------------------

    def __enter__(self) -> UpdateSortOrder:
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Auto-commit on clean exit."""
        if exc_type is None:
            self.commit()

Builder for sort order changes. Obtained via table.update_sort_order().

UpdateSortOrder(table: Table)
def add_field(self, source_column: str, direction: SortDirection = <SortDirection.ASC: 'ASC'>, null_order: NullOrder = <NullOrder.NULLS_LAST: 'NULLS LAST'>) -> UpdateSortOrder:

Add a sort field. Returns self for chaining.

def clear(self) -> UpdateSortOrder:

Remove all sorting (RESET SORTED BY). Returns self.

def commit(self) -> None:

Apply sort order changes via ALTER TABLE.
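The branching in commit() has a consequence that is easy to miss: clear() sets a flag that is checked before the pending fields and is only reset by commit(), so add_field() calls made after clear() on the same builder are discarded and only RESET SORTED BY runs. The standalone sketch mirrors that decision logic for illustration; SortSpec and quote_ident are hypothetical stand-ins for SortField and the library's identifier quoting (assumed to be double quotes with embedded quotes doubled).

```python
from dataclasses import dataclass
from typing import List, Optional


def quote_ident(name: str) -> str:
    # Assumed quoting rule; the library's _quote_identifier is not shown on this page.
    return '"' + name.replace('"', '""') + '"'


@dataclass(frozen=True)
class SortSpec:
    """Stand-in for SortField: column, direction, null ordering."""

    column: str
    direction: str = "ASC"
    null_order: str = "NULLS LAST"

    def to_sql(self) -> str:
        return f"{quote_ident(self.column)} {self.direction} {self.null_order}"


def sort_order_sql(fqn: str, fields: List[SortSpec], clear: bool) -> Optional[str]:
    """Mirror of UpdateSortOrder.commit(): the statement it would execute, if any."""
    if clear:
        # The clear flag wins even if fields were added after clear().
        return f"ALTER TABLE {fqn} RESET SORTED BY"
    if fields:
        sort_expr = ", ".join(field.to_sql() for field in fields)
        return f"ALTER TABLE {fqn} SET SORTED BY ({sort_expr})"
    return None  # nothing pending: commit() only refreshes the table


print(sort_order_sql("t", [SortSpec("ts", "DESC", "NULLS FIRST"), SortSpec("id")], clear=False))
print(sort_order_sql("t", [SortSpec("id")], clear=True))
```

To switch to a different ordering, a fresh builder with only add_field() calls issues a single SET SORTED BY statement listing every field in the order they were added.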

class UpdateSpec:
class UpdateSpec:
    """Builder for partition spec changes.

    Obtained via table.update_spec().
    """

    __slots__ = ("_table", "_fields", "_clear")

    def __init__(self, table: Table) -> None:
        self._table = table
        self._fields: list[PartitionField] = []
        self._clear = False

    def add_field(
        self,
        source_column: str,
        transform: Transform = IDENTITY,
    ) -> UpdateSpec:
        """Add a partition field."""
        self._fields.append(PartitionField(source_column=source_column, transform=transform))
        return self

    def clear(self) -> UpdateSpec:
        """Remove all partitioning (RESET PARTITIONED BY)."""
        self._clear = True
        self._fields.clear()
        return self

    def commit(self) -> None:
        """Apply partition changes via ALTER TABLE."""
        fqn = self._table.fully_qualified_name
        conn = self._table.catalog.connection

        if self._clear:
            conn.execute(f"ALTER TABLE {fqn} RESET PARTITIONED BY")
        elif self._fields:
            parts: list[str] = []
            for field in self._fields:
                col = _quote_identifier(field.source_column)
                func = field.transform.to_sql()
                if func:
                    parts.append(f"{func}({col})")
                else:
                    parts.append(col)
            partition_expr = ", ".join(parts)
            conn.execute(f"ALTER TABLE {fqn} SET PARTITIONED BY ({partition_expr})")

        self._fields.clear()
        self._clear = False
        self._table.refresh()

    # -- Context manager -------------------------------------------------------

    def __enter__(self) -> UpdateSpec:
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Auto-commit on clean exit."""
        if exc_type is None:
            self.commit()

Builder for partition spec changes.

Obtained via table.update_spec().

UpdateSpec(table: Table)
def add_field(self, source_column: str, transform: Transform = IdentityTransform()) -> UpdateSpec:

Add a partition field.

def clear(self) -> UpdateSpec:

Remove all partitioning (RESET PARTITIONED BY).

def commit(self) -> None:

Apply partition changes via ALTER TABLE.
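The partition expression is assembled field by field: transforms whose to_sql() returns a function name wrap the quoted column, while the identity transform, whose to_sql() returns an empty string, leaves the column bare. Below is a standalone sketch of that assembly. It is not the library code: each field is given as a hypothetical (source_column, transform_sql) pair instead of a PartitionField, and quote_ident is assumed to quote the way _quote_identifier does.

```python
from typing import List, Optional, Tuple


def quote_ident(name: str) -> str:
    # Assumed quoting rule (double quotes, embedded quotes doubled).
    return '"' + name.replace('"', '""') + '"'


def partition_sql(fqn: str, fields: List[Tuple[str, str]], clear: bool = False) -> Optional[str]:
    """Mirror of UpdateSpec.commit(); transform_sql is "" for identity, else year/month/day/hour."""
    if clear:
        return f"ALTER TABLE {fqn} RESET PARTITIONED BY"
    if not fields:
        return None  # nothing pending
    parts = []
    for column, func in fields:
        col = quote_ident(column)
        # Identity: bare column; any other transform: func(col).
        parts.append(f"{func}({col})" if func else col)
    return f"ALTER TABLE {fqn} SET PARTITIONED BY ({', '.join(parts)})"


print(partition_sql("t", [("created_at", "year"), ("created_at", "month"), ("region", "")]))
```

With the real builder this corresponds to something like table.update_spec().add_field("created_at", YEAR).add_field("created_at", MONTH).add_field("region").commit(), using the module-level YEAR and MONTH transform constants.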

UNSORTED = SortOrder(UNSORTED)
@dataclass(frozen=True)
class UpsertResult:
@dataclass(frozen=True)
class UpsertResult:
    """Result of an upsert operation."""

    rows_updated: int
    rows_inserted: int

Result of an upsert operation.

UpsertResult(rows_updated: int, rows_inserted: int)
rows_updated: int
rows_inserted: int
class NullOrder(enum.Enum):
class NullOrder(Enum):
    """Null ordering for a sort field."""

    NULLS_FIRST = "NULLS FIRST"
    NULLS_LAST = "NULLS LAST"

Null ordering for a sort field.

NULLS_FIRST = <NullOrder.NULLS_FIRST: 'NULLS FIRST'>
NULLS_LAST = <NullOrder.NULLS_LAST: 'NULLS LAST'>
class SortDirection(enum.Enum):
class SortDirection(Enum):
    """Sort direction for a sort field."""

    ASC = "ASC"
    DESC = "DESC"

Sort direction for a sort field.

ASC = <SortDirection.ASC: 'ASC'>
DESC = <SortDirection.DESC: 'DESC'>
@dataclass(frozen=True)
class SortField:
@dataclass(frozen=True)
class SortField:
    """A single sort field."""

    source_column: str
    direction: SortDirection = SortDirection.ASC
    null_order: NullOrder = NullOrder.NULLS_LAST

    def to_sql(self) -> str:
        """E.g. '"col_name" ASC NULLS LAST'"""
        return f"{_quote_identifier(self.source_column)} {self.direction.value} {self.null_order.value}"

A single sort field.

SortField(source_column: str, direction: SortDirection = <SortDirection.ASC: 'ASC'>, null_order: NullOrder = <NullOrder.NULLS_LAST: 'NULLS LAST'>)
source_column: str
direction: SortDirection = <SortDirection.ASC: 'ASC'>
null_order: NullOrder = <NullOrder.NULLS_LAST: 'NULLS LAST'>
def to_sql(self) -> str:

E.g. '"col_name" ASC NULLS LAST'

@dataclass(frozen=True)
class SortOrder:
@dataclass(frozen=True)
class SortOrder:
    """Sort order specification for a table."""

    fields: tuple[SortField, ...] = ()

    @property
    def is_unsorted(self) -> bool:
        return len(self.fields) == 0

    def __repr__(self) -> str:
        if self.is_unsorted:
            return "SortOrder(UNSORTED)"
        fields_repr = ", ".join(f.to_sql() for f in self.fields)
        return f"SortOrder({fields_repr})"

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, SortOrder):
            return NotImplemented
        return self.fields == other.fields

    def __hash__(self) -> int:
        return hash(self.fields)

Sort order specification for a table.

SortOrder(fields: tuple[SortField, ...] = ())
fields: tuple[SortField, ...] = ()
is_unsorted: bool
DAY = DayTransform()
class DayTransform(pyducklake.Transform):
class DayTransform(Transform):
    """Extract day from a date/timestamp column."""

    __slots__ = ()

    def to_sql(self) -> str:
        return "day"

    def __repr__(self) -> str:
        return "DayTransform()"

Extract day from a date/timestamp column.

def to_sql(self) -> str:

Return SQL function name for use in SET PARTITIONED BY.

Identity returns empty string (no function wrapper).

HOUR = HourTransform()
class HourTransform(pyducklake.Transform):
class HourTransform(Transform):
    """Extract hour from a timestamp column."""

    __slots__ = ()

    def to_sql(self) -> str:
        return "hour"

    def __repr__(self) -> str:
        return "HourTransform()"

Extract hour from a timestamp column.

def to_sql(self) -> str:

Return SQL function name for use in SET PARTITIONED BY.

Identity returns empty string (no function wrapper).

IDENTITY = IdentityTransform()
class IdentityTransform(pyducklake.Transform):
class IdentityTransform(Transform):
    """Identity transform -- partition by raw column value."""

    __slots__ = ()

    def to_sql(self) -> str:
        return ""

    def __repr__(self) -> str:
        return "IdentityTransform()"

Identity transform -- partition by raw column value.

def to_sql(self) -> str:

Return SQL function name for use in SET PARTITIONED BY.

Identity returns empty string (no function wrapper).

MONTH = MonthTransform()
class MonthTransform(pyducklake.Transform):
class MonthTransform(Transform):
    """Extract month from a date/timestamp column."""

    __slots__ = ()

    def to_sql(self) -> str:
        return "month"

    def __repr__(self) -> str:
        return "MonthTransform()"

Extract month from a date/timestamp column.

def to_sql(self) -> str:

Return SQL function name for use in SET PARTITIONED BY.

Identity returns empty string (no function wrapper).

@dataclass(frozen=True)
class PartitionField:
@dataclass(frozen=True)
class PartitionField:
    """A single partition field: a source column plus a transform."""

    source_column: str
    transform: Transform

A single partition field: a source column plus a transform.

PartitionField(source_column: str, transform: Transform)
source_column: str
transform: Transform
class PartitionSpec:
class PartitionSpec:
    """Partition specification for a table."""

    __slots__ = ("_fields",)

    def __init__(self, *fields: PartitionField) -> None:
        self._fields = fields

    @property
    def fields(self) -> tuple[PartitionField, ...]:
        return self._fields

    @property
    def is_unpartitioned(self) -> bool:
        return len(self._fields) == 0

    def __repr__(self) -> str:
        if self.is_unpartitioned:
            return "PartitionSpec(UNPARTITIONED)"
        fields_repr = ", ".join(
            f"{f.transform.to_sql()}({f.source_column!r})" if f.transform.to_sql() else f.source_column
            for f in self._fields
        )
        return f"PartitionSpec({fields_repr})"

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, PartitionSpec):
            return NotImplemented
        return self._fields == other._fields

    def __hash__(self) -> int:
        return hash(self._fields)

Partition specification for a table.

PartitionSpec(*fields: PartitionField)
fields: tuple[PartitionField, ...]
is_unpartitioned: bool
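PartitionSpec.__repr__ accounts for the odd-looking module listing UNPARTITIONED = PartitionSpec(UNPARTITIONED): an empty spec prints the word UNPARTITIONED instead of an empty argument list (SortOrder does the same with UNSORTED). Non-empty specs print transformed columns as func('col') and identity columns bare, without quotes. The sketch mirrors that formatting on hypothetical (source_column, transform_sql) pairs; spec_repr is an illustrative helper, not the library class.

```python
from typing import List, Tuple


def spec_repr(fields: List[Tuple[str, str]]) -> str:
    """Mirror of PartitionSpec.__repr__; transform_sql is "" for the identity transform."""
    if not fields:
        return "PartitionSpec(UNPARTITIONED)"
    # Transformed columns use repr() (quoted); identity columns are printed as-is.
    rendered = ", ".join(f"{func}({col!r})" if func else col for col, func in fields)
    return f"PartitionSpec({rendered})"


print(spec_repr([]))                               # PartitionSpec(UNPARTITIONED)
print(spec_repr([("ts", "day"), ("region", "")]))  # PartitionSpec(day('ts'), region)
```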
class Transform(abc.ABC):
class Transform(ABC):
    """Base for partition transforms."""

    __slots__ = ()

    @abstractmethod
    def to_sql(self) -> str:
        """Return SQL function name for use in SET PARTITIONED BY.

        Identity returns empty string (no function wrapper).
        """

    def __eq__(self, other: object) -> bool:
        return type(self) is type(other)

    def __hash__(self) -> int:
        return hash(type(self))

    @abstractmethod
    def __repr__(self) -> str: ...

Base for partition transforms.

@abstractmethod
def to_sql(self) -> str:

Return SQL function name for use in SET PARTITIONED BY.

Identity returns empty string (no function wrapper).
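Transform defines __eq__ and __hash__ in terms of the concrete class only, so any two instances of the same transform compare equal and hash alike. This is what makes a fresh DayTransform() interchangeable with the DAY constant inside PartitionField and PartitionSpec, which compare by their fields. The sketch re-creates the pattern in isolation with stand-in classes of the same names (not imported from pyducklake) and omits the abstract __repr__ for brevity.

```python
from abc import ABC, abstractmethod


class Transform(ABC):
    __slots__ = ()

    @abstractmethod
    def to_sql(self) -> str: ...

    def __eq__(self, other: object) -> bool:
        # Equality is purely by concrete class: all DayTransform() instances are equal.
        return type(self) is type(other)

    def __hash__(self) -> int:
        return hash(type(self))


class IdentityTransform(Transform):
    __slots__ = ()

    def to_sql(self) -> str:
        return ""  # no function wrapper


class DayTransform(Transform):
    __slots__ = ()

    def to_sql(self) -> str:
        return "day"


DAY = DayTransform()
print(DAY == DayTransform())                               # True
print(DAY == IdentityTransform())                          # False
print(len({DAY, DayTransform(), IdentityTransform()}))     # 2
```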

UNPARTITIONED = PartitionSpec(UNPARTITIONED)
YEAR = YearTransform()
class YearTransform(pyducklake.Transform):
class YearTransform(Transform):
    """Extract year from a date/timestamp column."""

    __slots__ = ()

    def to_sql(self) -> str:
        return "year"

    def __repr__(self) -> str:
        return "YearTransform()"

Extract year from a date/timestamp column.

def to_sql(self) -> str:

Return SQL function name for use in SET PARTITIONED BY.

Identity returns empty string (no function wrapper).

class BigIntType(pyducklake.types.PrimitiveType):
class BigIntType(PrimitiveType):
    __slots__ = ()

    @property
    def _name(self) -> str:
        return "BigIntType"

Singleton primitive type (subclass of PrimitiveType).

class BinaryType(pyducklake.types.PrimitiveType):
class BinaryType(PrimitiveType):
    __slots__ = ()

    @property
    def _name(self) -> str:
        return "BinaryType"

Singleton primitive type (subclass of PrimitiveType).

class BooleanType(pyducklake.types.PrimitiveType):
class BooleanType(PrimitiveType):
    __slots__ = ()

    @property
    def _name(self) -> str:
        return "BooleanType"

Singleton primitive type (subclass of PrimitiveType).

class DateType(pyducklake.types.PrimitiveType):
class DateType(PrimitiveType):
    __slots__ = ()

    @property
    def _name(self) -> str:
        return "DateType"

Singleton primitive type (subclass of PrimitiveType).

@dataclass(frozen=True, slots=True)
class DecimalType(pyducklake.DucklakeType):
@dataclass(frozen=True, slots=True)
class DecimalType(DucklakeType):
    """DECIMAL(precision, scale)."""

    precision: int
    scale: int

    def __post_init__(self) -> None:
        if self.precision < 1 or self.precision > 38:
            raise ValueError(f"Decimal precision must be between 1 and 38, got {self.precision}")
        if self.scale < 0:
            raise ValueError(f"Decimal scale must be non-negative, got {self.scale}")
        if self.scale > self.precision:
            raise ValueError(f"Decimal scale ({self.scale}) cannot exceed precision ({self.precision})")

    def __repr__(self) -> str:
        return f"DecimalType(precision={self.precision}, scale={self.scale})"

DECIMAL(precision, scale).

DecimalType(precision: int, scale: int)
precision: int
scale: int
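In DECIMAL(precision, scale), precision is the total number of significant digits and scale is how many of them sit after the decimal point, so the largest magnitude a column can hold is 10^(precision - scale) - 10^(-scale); DECIMAL(5, 2), for example, tops out at 999.99. The sketch applies the same validation rules as DecimalType.__post_init__ and computes that bound with Python's decimal module; check_decimal and max_magnitude are hypothetical helper names, not pyducklake functions.

```python
from decimal import Decimal, localcontext


def check_decimal(precision: int, scale: int) -> None:
    """Same validation rules as DecimalType.__post_init__."""
    if precision < 1 or precision > 38:
        raise ValueError(f"Decimal precision must be between 1 and 38, got {precision}")
    if scale < 0:
        raise ValueError(f"Decimal scale must be non-negative, got {scale}")
    if scale > precision:
        raise ValueError(f"Decimal scale ({scale}) cannot exceed precision ({precision})")


def max_magnitude(precision: int, scale: int) -> Decimal:
    """Largest absolute value representable as DECIMAL(precision, scale)."""
    check_decimal(precision, scale)
    with localcontext() as ctx:
        ctx.prec = 80  # enough working digits to stay exact up to precision 38
        return Decimal(10) ** (precision - scale) - Decimal(10) ** -scale


print(max_magnitude(5, 2))   # 999.99
print(max_magnitude(18, 0))  # 999999999999999999
try:
    check_decimal(10, 12)
except ValueError as exc:
    print(exc)  # Decimal scale (12) cannot exceed precision (10)
```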
class DoubleType(pyducklake.types.PrimitiveType):
class DoubleType(PrimitiveType):
    __slots__ = ()

    @property
    def _name(self) -> str:
        return "DoubleType"

Singleton primitive type (subclass of PrimitiveType).

class DucklakeType(abc.ABC):
class DucklakeType(ABC):
    """Abstract base class for all Ducklake types."""

    __slots__ = ()

    @abstractmethod
    def __repr__(self) -> str: ...

    def __str__(self) -> str:
        return repr(self)

Abstract base class for all Ducklake types.

class FloatType(pyducklake.types.PrimitiveType):
class FloatType(PrimitiveType):
    __slots__ = ()

    @property
    def _name(self) -> str:
        return "FloatType"

Singleton primitive type (subclass of PrimitiveType).

class HugeIntType(pyducklake.types.PrimitiveType):
class HugeIntType(PrimitiveType):
    __slots__ = ()

    @property
    def _name(self) -> str:
        return "HugeIntType"

Singleton primitive type (subclass of PrimitiveType).

class IntegerType(pyducklake.types.PrimitiveType):
class IntegerType(PrimitiveType):
    __slots__ = ()

    @property
    def _name(self) -> str:
        return "IntegerType"

Singleton primitive type (subclass of PrimitiveType).

class IntervalType(pyducklake.types.PrimitiveType):
class IntervalType(PrimitiveType):
    __slots__ = ()

    @property
    def _name(self) -> str:
        return "IntervalType"

Singleton primitive type (subclass of PrimitiveType).

class JSONType(pyducklake.types.PrimitiveType):
class JSONType(PrimitiveType):
    __slots__ = ()

    @property
    def _name(self) -> str:
        return "JSONType"

Singleton primitive type (subclass of PrimitiveType).

@dataclass(frozen=True, slots=True)
class ListType(pyducklake.DucklakeType):
@dataclass(frozen=True, slots=True)
class ListType(DucklakeType):
    """LIST type with a typed element."""

    element_id: int
    element_type: DucklakeType
    element_required: bool = True

    def __repr__(self) -> str:
        return (
            f"ListType(element_id={self.element_id}, element_type={self.element_type}, "
            f"element_required={self.element_required})"
        )

LIST type with a typed element.

ListType(element_id: int, element_type: DucklakeType, element_required: bool = True)
element_id: int
element_type: DucklakeType
element_required: bool
@dataclass(frozen=True, slots=True)
class MapType(pyducklake.DucklakeType):
@dataclass(frozen=True, slots=True)
class MapType(DucklakeType):
    """MAP type with typed key and value."""

    key_id: int
    key_type: DucklakeType
    value_id: int
    value_type: DucklakeType
    value_required: bool = True

    def __repr__(self) -> str:
        return (
            f"MapType(key_id={self.key_id}, key_type={self.key_type}, "
            f"value_id={self.value_id}, value_type={self.value_type}, "
            f"value_required={self.value_required})"
        )

MAP type with typed key and value.

MapType(key_id: int, key_type: DucklakeType, value_id: int, value_type: DucklakeType, value_required: bool = True)
key_id: int
key_type: DucklakeType
value_id: int
value_type: DucklakeType
value_required: bool
@dataclass(frozen=True, slots=True)
class NestedField:
@dataclass(frozen=True, slots=True)
class NestedField:
    """A named field within a struct, with an ID and optional documentation."""

    field_id: int
    name: str
    field_type: DucklakeType
    required: bool = False
    doc: str | None = None

    def __repr__(self) -> str:
        opt = "required" if self.required else "optional"
        return f"NestedField(field_id={self.field_id}, name={self.name!r}, type={self.field_type}, {opt})"

A named field within a struct, with an ID and optional documentation.

NestedField(field_id: int, name: str, field_type: DucklakeType, required: bool = False, doc: str | None = None)
field_id: int
name: str
field_type: DucklakeType
required: bool
doc: str | None
class SmallIntType(pyducklake.types.PrimitiveType):
class SmallIntType(PrimitiveType):
    __slots__ = ()

    @property
    def _name(self) -> str:
        return "SmallIntType"

Singleton primitive type (subclass of PrimitiveType).

class StringType(pyducklake.types.PrimitiveType):
class StringType(PrimitiveType):
    __slots__ = ()

    @property
    def _name(self) -> str:
        return "StringType"

Singleton primitive type (subclass of PrimitiveType).

@dataclass(frozen=True, slots=True)
class StructType(pyducklake.DucklakeType):
@dataclass(frozen=True, slots=True)
class StructType(DucklakeType):
    """STRUCT type containing named fields."""

    fields: tuple[NestedField, ...]

    def __iter__(self) -> Iterator[NestedField]:
        return iter(self.fields)

    def __repr__(self) -> str:
        field_strs = ", ".join(repr(f) for f in self.fields)
        return f"StructType(fields=({field_strs}))"

STRUCT type containing named fields.

StructType(fields: tuple[NestedField, ...])
fields: tuple[NestedField, ...]
class TimeType(pyducklake.types.PrimitiveType):
class TimeType(PrimitiveType):
    __slots__ = ()

    @property
    def _name(self) -> str:
        return "TimeType"

Singleton primitive type (subclass of PrimitiveType).

class TimestampTZType(pyducklake.types.PrimitiveType):
class TimestampTZType(PrimitiveType):
    __slots__ = ()

    @property
    def _name(self) -> str:
        return "TimestampTZType"

Singleton primitive type (subclass of PrimitiveType).

class TimestampType(pyducklake.types.PrimitiveType):
class TimestampType(PrimitiveType):
    __slots__ = ()

    @property
    def _name(self) -> str:
        return "TimestampType"

Singleton primitive type (subclass of PrimitiveType).

class TinyIntType(pyducklake.types.PrimitiveType):
class TinyIntType(PrimitiveType):
    __slots__ = ()

    @property
    def _name(self) -> str:
        return "TinyIntType"

Singleton primitive type (subclass of PrimitiveType).

class UBigIntType(pyducklake.types.PrimitiveType):
class UBigIntType(PrimitiveType):
    __slots__ = ()

    @property
    def _name(self) -> str:
        return "UBigIntType"

Singleton primitive type (subclass of PrimitiveType).

class UIntegerType(pyducklake.types.PrimitiveType):
class UIntegerType(PrimitiveType):
    __slots__ = ()

    @property
    def _name(self) -> str:
        return "UIntegerType"

Singleton primitive type (subclass of PrimitiveType).

class USmallIntType(pyducklake.types.PrimitiveType):
class USmallIntType(PrimitiveType):
    __slots__ = ()

    @property
    def _name(self) -> str:
        return "USmallIntType"

Singleton primitive type (subclass of PrimitiveType).

class UTinyIntType(pyducklake.types.PrimitiveType):
class UTinyIntType(PrimitiveType):
    __slots__ = ()

    @property
    def _name(self) -> str:
        return "UTinyIntType"

Singleton primitive type (subclass of PrimitiveType).

class UUIDType(pyducklake.types.PrimitiveType):
class UUIDType(PrimitiveType):
    __slots__ = ()

    @property
    def _name(self) -> str:
        return "UUIDType"

Singleton primitive type (subclass of PrimitiveType).

def arrow_type_to_ducklake(t: pyarrow.lib.DataType) -> DucklakeType:
def arrow_type_to_ducklake(t: pa.DataType) -> DucklakeType:
    """Convert a PyArrow DataType to a DucklakeType."""
    direct = _ARROW_TO_PRIMITIVE.get(t)
    if direct is not None:
        return direct

    if isinstance(t, pa.Decimal128Type):
        prec: int = cast(int, t.precision)
        sc: int = cast(int, t.scale)
        if prec == 38 and sc == 0:
            return HugeIntType()
        return DecimalType(prec, sc)

    if isinstance(t, pa.TimestampType):
        unit: str = cast(str, t.unit)
        tz: str | None = cast("str | None", t.tz)
        if unit != "us":
            raise TypeError(f"Unsupported timestamp unit '{unit}': only 'us' (microsecond) is supported")
        if tz is not None and tz != "UTC":
            raise TypeError(f"Unsupported timezone '{tz}': only UTC is supported")
        if tz is None:
            return TimestampType()
        return TimestampTZType()

    if isinstance(t, pa.ListType):
        elem_type = arrow_type_to_ducklake(t.value_type)
        return ListType(element_id=0, element_type=elem_type, element_required=not t.value_field.nullable)

    if isinstance(t, pa.MapType):
        key_type = arrow_type_to_ducklake(t.key_type)
        value_type = arrow_type_to_ducklake(t.item_type)
        return MapType(
            key_id=0,
            key_type=key_type,
            value_id=0,
            value_type=value_type,
            value_required=not t.item_field.nullable,
        )

    if isinstance(t, pa.StructType):
        fields = tuple(
            NestedField(
                field_id=i,
                name=t.field(i).name,
                field_type=arrow_type_to_ducklake(t.field(i).type),
                required=not t.field(i).nullable,
            )
            for i in range(t.num_fields)
        )
        return StructType(fields)

    raise TypeError(f"Cannot convert Arrow type {t} to DucklakeType")

Convert a PyArrow DataType to a DucklakeType.
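The decimal and timestamp branches carry the only non-trivial scalar rules: `decimal128(38, 0)` is read back as `HugeIntType`, and timestamps must have microsecond precision with either no timezone or exactly `"UTC"`. The standalone sketch below mirrors just those two rules with plain strings so that it runs without pyarrow or pyducklake. The helper names `map_decimal` and `map_timestamp` are hypothetical.

```python
from typing import Optional


def map_decimal(precision: int, scale: int) -> str:
    """Mirror of the Decimal128 branch: DECIMAL(38, 0) is read as HugeIntType."""
    if precision == 38 and scale == 0:
        return "HugeIntType()"
    return f"DecimalType({precision}, {scale})"


def map_timestamp(unit: str, tz: Optional[str]) -> str:
    """Mirror of the Timestamp branch: microseconds only, naive or exactly "UTC"."""
    if unit != "us":
        raise TypeError(f"Unsupported timestamp unit '{unit}': only 'us' (microsecond) is supported")
    if tz is not None and tz != "UTC":
        raise TypeError(f"Unsupported timezone '{tz}': only UTC is supported")
    return "TimestampType()" if tz is None else "TimestampTZType()"


print(map_decimal(38, 0))          # HugeIntType()
print(map_decimal(10, 2))          # DecimalType(10, 2)
print(map_timestamp("us", "UTC"))  # TimestampTZType()
```

Because the timezone check is an exact string comparison, columns tagged with an equivalent zone such as `"Etc/UTC"` or `"+00:00"` are rejected. Nanosecond timestamps (the pandas default) must likewise be cast to `pa.timestamp("us")` or `pa.timestamp("us", tz="UTC")` before conversion.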

def ducklake_type_to_arrow(t: DucklakeType) -> pyarrow.lib.DataType:
def ducklake_type_to_arrow(t: DucklakeType) -> pa.DataType:
    """Convert a DucklakeType to a PyArrow DataType."""
    if isinstance(t, PrimitiveType):
        arrow_type = _PRIMITIVE_TO_ARROW.get(type(t))
        if arrow_type is not None:
            return arrow_type

    if isinstance(t, DecimalType):
        return pa.decimal128(t.precision, t.scale)

    if isinstance(t, ListType):
        element_arrow = ducklake_type_to_arrow(t.element_type)
        return pa.list_(pa.field("element", element_arrow, nullable=not t.element_required))

    if isinstance(t, MapType):
        key_arrow = ducklake_type_to_arrow(t.key_type)
        value_arrow = ducklake_type_to_arrow(t.value_type)
        return pa.map_(key_arrow, pa.field("value", value_arrow, nullable=not t.value_required))  # type: ignore[call-overload, no-any-return]

    if isinstance(t, StructType):
        fields = [pa.field(f.name, ducklake_type_to_arrow(f.field_type), nullable=not f.required) for f in t.fields]
        return pa.struct(fields)

    raise TypeError(f"Cannot convert {t} to Arrow type")

Convert a DucklakeType to a PyArrow DataType.
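The nested branches all follow one pattern: convert the child type recursively, then wrap it in an Arrow field whose `nullable` flag is the inverse of the Ducklake `required` flag. List children are named `element` and map values are named `value`. The sketch below mirrors that recursion for primitives, lists, and structs using a hypothetical dataclass model (`Prim`, `ListOf`, `Field`, `StructOf`). It renders type strings loosely modelled on pyarrow's notation, so it runs without either library.

```python
from dataclasses import dataclass
from typing import Tuple, Union


@dataclass(frozen=True)
class Prim:
    arrow_name: str  # e.g. "int32", "string"


@dataclass(frozen=True)
class ListOf:
    element: "T"
    element_required: bool


@dataclass(frozen=True)
class Field:
    name: str
    field_type: "T"
    required: bool = False


@dataclass(frozen=True)
class StructOf:
    fields: Tuple[Field, ...]


T = Union[Prim, ListOf, StructOf]


def _field(name: str, t: T, nullable: bool) -> str:
    # "not null" marks a non-nullable field, i.e. a required Ducklake field.
    return f"{name}: {to_arrow_str(t)}" + ("" if nullable else " not null")


def to_arrow_str(t: T) -> str:
    if isinstance(t, Prim):
        return t.arrow_name
    if isinstance(t, ListOf):
        # A required element becomes a non-nullable child field named "element".
        return f"list<{_field('element', t.element, not t.element_required)}>"
    if isinstance(t, StructOf):
        inner = ", ".join(_field(f.name, f.field_type, not f.required) for f in t.fields)
        return f"struct<{inner}>"
    raise TypeError(f"Cannot convert {t} to Arrow type")


schema = StructOf((
    Field("id", Prim("int32"), required=True),
    Field("tags", ListOf(Prim("string"), element_required=False)),
))
print(to_arrow_str(schema))  # struct<id: int32 not null, tags: list<element: string>>
```

Note that the reverse conversion assigns `element_id=0`, `key_id=0`, `value_id=0`, and index-based struct field IDs, so a round trip through Arrow preserves types and nullability but not the original field IDs.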

def ducklake_type_to_sql(t: DucklakeType) -> str:
def ducklake_type_to_sql(t: DucklakeType) -> str:
    """Convert a DucklakeType to a DuckDB SQL type string."""
    if isinstance(t, PrimitiveType):
        sql = _PRIMITIVE_TO_SQL.get(type(t))
        if sql is not None:
            return sql

    if isinstance(t, DecimalType):
        return f"DECIMAL({t.precision}, {t.scale})"

    if isinstance(t, ListType):
        return f"{ducklake_type_to_sql(t.element_type)}[]"

    if isinstance(t, MapType):
        return f"MAP({ducklake_type_to_sql(t.key_type)}, {ducklake_type_to_sql(t.value_type)})"

    if isinstance(t, StructType):
        field_strs = ", ".join(
            f'"{f.name.replace(chr(34), chr(34) + chr(34))}" {ducklake_type_to_sql(f.field_type)}' for f in t.fields
        )
        return f"STRUCT({field_strs})"

    raise TypeError(f"Cannot convert {t} to SQL string")

Convert a DucklakeType to a DuckDB SQL type string.
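Nested types are rendered bottom-up. Lists append `[]`, maps become `MAP(key, value)`, decimals keep their precision and scale, and struct field names are double-quoted with any embedded `"` doubled so that arbitrary column names survive as DuckDB identifiers. The standalone helpers below (hypothetical names) reproduce those string rules. The primitive spellings `INTEGER` and `VARCHAR` are assumed DuckDB names, not values read from `_PRIMITIVE_TO_SQL`.

```python
from typing import List, Tuple


def quote_ident(name: str) -> str:
    # Same rule as the STRUCT branch: wrap in double quotes, double any embedded quote.
    return '"' + name.replace('"', '""') + '"'


def list_sql(element_sql: str) -> str:
    return f"{element_sql}[]"


def map_sql(key_sql: str, value_sql: str) -> str:
    return f"MAP({key_sql}, {value_sql})"


def decimal_sql(precision: int, scale: int) -> str:
    return f"DECIMAL({precision}, {scale})"


def struct_sql(fields: List[Tuple[str, str]]) -> str:
    # fields: (column name, already-rendered SQL type) pairs
    return "STRUCT(" + ", ".join(f"{quote_ident(name)} {sql}" for name, sql in fields) + ")"


ddl = struct_sql([
    ("id", "INTEGER"),
    ('say "hi"', list_sql("VARCHAR")),
    ("attrs", map_sql("VARCHAR", decimal_sql(10, 2))),
])
print(ddl)  # STRUCT("id" INTEGER, "say ""hi""" VARCHAR[], "attrs" MAP(VARCHAR, DECIMAL(10, 2)))
```

Unlike the Arrow conversion, `ducklake_type_to_sql` emits no `NOT NULL` markers, so `required` flags are not reflected in the SQL type string.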