pyducklake
A Python SDK for Ducklake, providing a pyiceberg-like API for the Ducklake lakehouse format.
Quick Start
```python
from pyducklake import Catalog, Schema, required, optional, IntegerType, StringType

# Connect to a Ducklake catalog (DuckDB, PostgreSQL, MySQL, or SQLite metadata)
catalog = Catalog("my_lake", "metadata.duckdb", data_path="./data")

# Ergonomic schema creation
schema = Schema.of(
    required("id", IntegerType()),
    optional("name", StringType()),
)

# Or with a dict (all fields optional):
schema = Schema.of({"id": IntegerType(), "name": StringType()})

# Explicit field IDs (used internally when loading from catalog):
from pyducklake import NestedField
schema = Schema(
    NestedField(field_id=1, name="id", field_type=IntegerType(), required=True),
    NestedField(field_id=2, name="name", field_type=StringType()),
)

table = catalog.create_table("users", schema)

# Write data
import pyarrow as pa
df = pa.table({"id": [1, 2, 3], "name": ["alice", "bob", "carol"]})
table.append(df)

# Read data
result = table.scan().to_arrow()
print(result)
```
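Behind `create_table`, each schema field becomes one column definition: the quoted field name, its SQL type, and `NOT NULL` when the field is required (this is what the `col_defs` expression in the `Catalog.create_table` source does). The stand-alone sketch below reproduces that translation. `Field` is a hypothetical stand-in for `NestedField`, the SQL type strings are assumed to be already converted (pyducklake uses `ducklake_type_to_sql` for that), and `quote_identifier` is an assumed re-implementation of the library helper.

```python
from __future__ import annotations

from typing import NamedTuple


class Field(NamedTuple):
    # Hypothetical stand-in for pyducklake's NestedField.
    name: str
    sql_type: str  # assumed already converted, e.g. "INTEGER" or "VARCHAR"
    required: bool = False


def quote_identifier(name: str) -> str:
    # Assumption: double-quote identifiers and double any embedded quotes.
    return '"' + name.replace('"', '""') + '"'


def create_table_sql(fqn: str, fields: list[Field]) -> str:
    """Build CREATE TABLE DDL following the same rule as Catalog.create_table."""
    col_defs = ", ".join(
        f"{quote_identifier(f.name)} {f.sql_type}{' NOT NULL' if f.required else ''}"
        for f in fields
    )
    return f"CREATE TABLE {fqn} ({col_defs})"


schema = [Field("id", "INTEGER", required=True), Field("name", "VARCHAR")]
print(create_table_sql('"my_lake"."main"."users"', schema))
```

Required fields are the only ones that receive a constraint; optional fields rely on SQL's default nullability.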
Metadata Backends
Connect to any supported metadata backend by changing the URI:
| Backend | Example |
|---|---|
| DuckDB | `Catalog("lake", "meta.duckdb")` |
| PostgreSQL | `Catalog("lake", "postgres:dbname=mydb host=localhost")` |
| MySQL | `Catalog("lake", "mysql:host=localhost database=mydb")` |
| SQLite | `Catalog("lake", "sqlite:meta.sqlite")` |
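The backend is selected only by the URI string, because `Catalog.__init__` passes it straight into DuckDB's `ATTACH 'ducklake:<uri>'` statement and appends `DATA_PATH` and `ENCRYPTED` options when they are given. The sketch below mirrors that string building so you can see the statement each table row produces. `build_attach_sql` is a hypothetical helper name, and `quote_identifier` / `escape_string_literal` are assumed re-implementations that follow the usual SQL quote-doubling convention.

```python
from __future__ import annotations


def quote_identifier(name: str) -> str:
    # Assumption: identifiers are double-quoted, embedded quotes doubled.
    return '"' + name.replace('"', '""') + '"'


def escape_string_literal(value: str) -> str:
    # Assumption: single quotes inside a string literal are doubled.
    return value.replace("'", "''")


def build_attach_sql(name: str, uri: str, data_path: str | None = None, encrypted: bool = False) -> str:
    """Hypothetical helper mirroring the ATTACH statement built in Catalog.__init__."""
    sql = f"ATTACH 'ducklake:{uri}' AS {quote_identifier(name)}"
    options: list[str] = []
    if data_path is not None:
        options.append(f"DATA_PATH '{escape_string_literal(data_path)}'")
    if encrypted:
        options.append("ENCRYPTED")
    if options:
        sql += f" ({', '.join(options)})"
    return sql


for uri in ["meta.duckdb", "postgres:dbname=mydb host=localhost", "sqlite:meta.sqlite"]:
    print(build_attach_sql("lake", uri, data_path="./data"))
```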
Key Features
- **Catalog management**: `Catalog` provides namespace and table CRUD, views, configuration, and commit metadata.
- **Table operations**: `Table` supports `append()`, `overwrite()`, `delete()`, `upsert()`, and `add_files()` for data mutation.
- **Scan API**: `DataScan` offers an immutable builder for filtered, projected, time-traveling reads with output to Arrow, pandas, Polars, DuckDB, or a streaming `RecordBatchReader`.
- **Schema evolution**: `UpdateSchema` provides `add_column()`, `drop_column()`, `rename_column()`, and `update_column()` with a builder/context-manager pattern.
- **Partitioning**: `UpdateSpec` manages hidden partitioning with identity, year, month, day, and hour transforms.
- **Sort orders**: `UpdateSortOrder` configures the sort orders applied during compaction.
- **Transactions**: `Transaction` groups multiple operations across tables into a single atomic commit.
- **Time travel**: scan at any snapshot version or timestamp via `scan().with_snapshot(id)` or `scan().with_timestamp(ts)`.
- **Change data capture**: `table_changes()`, `table_insertions()`, and `table_deletions()` query row-level changes between snapshots.
- **Inspect API**: `InspectTable` exposes snapshot history, data files, and partition metadata as Arrow tables.
- **Maintenance**: `MaintenanceTable` provides `compact()`, `expire_snapshots()`, `rewrite_data_files()`, `cleanup_files()`, and `checkpoint()`.
- **CLI**: the `pyducklake` command-line tool provides catalog inspection and maintenance from the terminal.
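An immutable builder such as `DataScan` returns a new scan from every call like `with_snapshot()` instead of mutating the receiver, so a base scan can be reused safely for several queries. The sketch below illustrates that pattern with a frozen dataclass and `dataclasses.replace`. `ScanSketch` and its SQL rendering are hypothetical, not pyducklake's implementation, and the `AT (VERSION => n)` clause is assumed from DuckLake's SQL time-travel syntax.

```python
from __future__ import annotations

from dataclasses import dataclass, replace


@dataclass(frozen=True)
class ScanSketch:
    """Hypothetical immutable scan builder; not pyducklake's DataScan."""

    table: str
    columns: tuple[str, ...] = ("*",)
    row_filter: str | None = None
    snapshot_id: int | None = None

    def select(self, *columns: str) -> ScanSketch:
        # dataclasses.replace returns a modified copy; self is never changed.
        return replace(self, columns=columns)

    def filter(self, expr: str) -> ScanSketch:
        return replace(self, row_filter=expr)

    def with_snapshot(self, snapshot_id: int) -> ScanSketch:
        return replace(self, snapshot_id=snapshot_id)

    def to_sql(self) -> str:
        sql = f"SELECT {', '.join(self.columns)} FROM {self.table}"
        if self.snapshot_id is not None:
            # Assumed DuckLake time-travel clause.
            sql += f" AT (VERSION => {self.snapshot_id})"
        if self.row_filter is not None:
            sql += f" WHERE {self.row_filter}"
        return sql


base = ScanSketch("users")
old = base.select("id", "name").with_snapshot(3)
print(base.to_sql())  # the base scan is unchanged
print(old.to_sql())
```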
Modules
| Module | Description |
|---|---|
| `pyducklake.catalog` | Catalog connection and namespace/table/view management |
| `pyducklake.table` | Table class with read, write, and metadata operations |
| `pyducklake.scan` | DataScan builder for filtered, projected reads |
| `pyducklake.schema` | Schema representation and field lookup |
| `pyducklake.schema_evolution` | UpdateSchema builder for schema changes |
| `pyducklake.types` | Ducklake type system and Arrow/SQL conversion |
| `pyducklake.expressions` | Boolean expression tree for filters |
| `pyducklake.partitioning` | Partition transforms, specs, and UpdateSpec builder |
| `pyducklake.sorting` | Sort orders and UpdateSortOrder builder |
| `pyducklake.transaction` | Multi-operation atomic transactions |
| `pyducklake.inspect` | Metadata introspection (snapshots, files, partitions) |
| `pyducklake.maintenance` | Table maintenance (compaction, expiration, cleanup) |
| `pyducklake.snapshot` | Snapshot dataclass |
| `pyducklake.exceptions` | Exception hierarchy |
| `pyducklake.cli` | Command-line interface |
```python
__version__ = "0.1.0"

from pyducklake.catalog import Catalog
from pyducklake.cdc import ChangeSet
from pyducklake.exceptions import (
    CommitFailedError,
    DucklakeError,
    NamespaceAlreadyExistsError,
    NamespaceNotEmptyError,
    NoSuchNamespaceError,
    NoSuchTableError,
    NoSuchViewError,
    TableAlreadyExistsError,
    ViewAlreadyExistsError,
)
from pyducklake.expressions import (
    AlwaysFalse,
    AlwaysTrue,
    And,
    BooleanExpression,
    EqualTo,
    GreaterThan,
    GreaterThanOrEqual,
    In,
    IsNaN,
    IsNull,
    LessThan,
    LessThanOrEqual,
    Not,
    NotEqualTo,
    NotIn,
    NotNaN,
    NotNull,
    Or,
    Reference,
)
from pyducklake.inspect import InspectTable
from pyducklake.maintenance import MaintenanceTable
from pyducklake.partitioning import (
    DAY,
    HOUR,
    IDENTITY,
    MONTH,
    UNPARTITIONED,
    YEAR,
    DayTransform,
    HourTransform,
    IdentityTransform,
    MonthTransform,
    PartitionField,
    PartitionSpec,
    Transform,
    UpdateSpec,
    YearTransform,
)
from pyducklake.scan import DataScan
from pyducklake.schema import Schema, optional, required
from pyducklake.schema_evolution import UpdateSchema
from pyducklake.snapshot import Snapshot
from pyducklake.sorting import (
    UNSORTED,
    NullOrder,
    SortDirection,
    SortField,
    SortOrder,
    UpdateSortOrder,
)
from pyducklake.table import ArrowCompatible, ArrowStreamExportable, Table, UpsertResult
from pyducklake.transaction import Transaction
from pyducklake.types import (
    BigIntType,
    BinaryType,
    BooleanType,
    DateType,
    DecimalType,
    DoubleType,
    DucklakeType,
    FloatType,
    HugeIntType,
    IntegerType,
    IntervalType,
    JSONType,
    ListType,
    MapType,
    NestedField,
    SmallIntType,
    StringType,
    StructType,
    TimestampType,
    TimestampTZType,
    TimeType,
    TinyIntType,
    UBigIntType,
    UIntegerType,
    USmallIntType,
    UTinyIntType,
    UUIDType,
    arrow_type_to_ducklake,
    ducklake_type_to_arrow,
    ducklake_type_to_sql,
)
from pyducklake.view import View

__all__ = [
    "ArrowCompatible",
    "ArrowStreamExportable",
    "AlwaysFalse",
    "AlwaysTrue",
    "And",
    "BooleanExpression",
    "Catalog",
    "ChangeSet",
    "CommitFailedError",
    "DataScan",
    "DucklakeError",
    "EqualTo",
    "GreaterThan",
    "GreaterThanOrEqual",
    "In",
    "InspectTable",
    "MaintenanceTable",
    "IsNaN",
    "IsNull",
    "LessThan",
    "LessThanOrEqual",
    "NamespaceAlreadyExistsError",
    "NamespaceNotEmptyError",
    "NoSuchNamespaceError",
    "NoSuchTableError",
    "NoSuchViewError",
    "Not",
    "NotEqualTo",
    "NotIn",
    "NotNaN",
    "NotNull",
    "Or",
    "Reference",
    "Schema",
    "optional",
    "required",
    "Snapshot",
    "Table",
    "TableAlreadyExistsError",
    "View",
    "ViewAlreadyExistsError",
    "Transaction",
    "UpdateSchema",
    "UpdateSortOrder",
    "UpdateSpec",
    "UNSORTED",
    "UpsertResult",
    "NullOrder",
    "SortDirection",
    "SortField",
    "SortOrder",
    "DAY",
    "DayTransform",
    "HOUR",
    "HourTransform",
    "IDENTITY",
    "IdentityTransform",
    "MONTH",
    "MonthTransform",
    "PartitionField",
    "PartitionSpec",
    "Transform",
    "UNPARTITIONED",
    "YEAR",
    "YearTransform",
    "BigIntType",
    "BinaryType",
    "BooleanType",
    "DateType",
    "DecimalType",
    "DoubleType",
    "DucklakeType",
    "FloatType",
    "HugeIntType",
    "IntegerType",
    "IntervalType",
    "JSONType",
    "ListType",
    "MapType",
    "NestedField",
    "SmallIntType",
    "StringType",
    "StructType",
    "TimeType",
    "TimestampTZType",
    "TimestampType",
    "TinyIntType",
    "UBigIntType",
    "UIntegerType",
    "USmallIntType",
    "UTinyIntType",
    "UUIDType",
    "arrow_type_to_ducklake",
    "ducklake_type_to_arrow",
    "ducklake_type_to_sql",
]
```
```python
@runtime_checkable
class ArrowStreamExportable(Protocol):
    """Protocol for objects implementing the Arrow PyCapsule interface."""

    def __arrow_c_stream__(self, requested_schema: Any = None) -> Any: ...
```
Protocol for objects implementing the Arrow PyCapsule interface.
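Because the protocol is `runtime_checkable`, `isinstance()` checks are structural: any object that defines `__arrow_c_stream__` passes without inheriting from anything, and only the method's presence is checked, not its signature. The self-contained illustration below redefines the protocol with the same shape; `FakeStream` is a hypothetical producer (a real one would return an Arrow PyCapsule).

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class ArrowStreamExportable(Protocol):
    """Same shape as pyducklake's protocol, redefined to stay self-contained."""

    def __arrow_c_stream__(self, requested_schema: Any = None) -> Any: ...


class FakeStream:
    # Hypothetical producer; it never inherits from the protocol.
    def __arrow_c_stream__(self, requested_schema: Any = None) -> Any:
        return None


print(isinstance(FakeStream(), ArrowStreamExportable))  # True
print(isinstance([1, 2, 3], ArrowStreamExportable))  # False
```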
```python
def AlwaysFalse() -> _AlwaysFalse:  # noqa: N802
    """Return the AlwaysFalse singleton."""
    return _AlwaysFalse()
```
Return the AlwaysFalse singleton.
```python
def AlwaysTrue() -> _AlwaysTrue:  # noqa: N802
    """Return the AlwaysTrue singleton."""
    return _AlwaysTrue()
```
Return the AlwaysTrue singleton.
```python
@dataclass(frozen=True)
class And(BooleanExpression):
    """Logical AND with short-circuit simplification."""

    left: BooleanExpression
    right: BooleanExpression

    def __new__(cls, left: BooleanExpression, right: BooleanExpression) -> BooleanExpression:  # type: ignore[misc]
        if isinstance(left, _AlwaysTrue):
            return right
        if isinstance(right, _AlwaysTrue):
            return left
        if isinstance(left, _AlwaysFalse) or isinstance(right, _AlwaysFalse):
            return AlwaysFalse()
        instance = super().__new__(cls)
        return instance

    def to_sql(self) -> str:
        return f"({self.left.to_sql()} AND {self.right.to_sql()})"

    def __repr__(self) -> str:
        return f"And(left={self.left!r}, right={self.right!r})"
```
Logical AND with short-circuit simplification.
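The simplification rules are: an `AlwaysTrue` operand is dropped, and any `AlwaysFalse` operand collapses the whole conjunction to `AlwaysFalse`. One subtlety of implementing this in `__new__` is that when `__new__` returns an object that is itself an instance of the class (for example an existing `And` passed as `right`), Python still calls `__init__` on it with the original arguments. The sketch below applies the same rules in a plain factory function instead, a deliberately swapped-in technique that avoids that re-initialization; `Pred`, `AndNode` and `and_` are hypothetical names.

```python
from __future__ import annotations

from dataclasses import dataclass


class Expr:
    def to_sql(self) -> str:
        raise NotImplementedError


class AlwaysTrue(Expr):
    def to_sql(self) -> str:
        return "TRUE"


class AlwaysFalse(Expr):
    def to_sql(self) -> str:
        return "FALSE"


@dataclass(frozen=True)
class Pred(Expr):
    # Hypothetical leaf predicate holding raw SQL text.
    sql: str

    def to_sql(self) -> str:
        return self.sql


@dataclass(frozen=True)
class AndNode(Expr):
    left: Expr
    right: Expr

    def to_sql(self) -> str:
        return f"({self.left.to_sql()} AND {self.right.to_sql()})"


def and_(left: Expr, right: Expr) -> Expr:
    """Simplifying constructor using the same rules as And.__new__."""
    if isinstance(left, AlwaysTrue):
        return right
    if isinstance(right, AlwaysTrue):
        return left
    if isinstance(left, AlwaysFalse) or isinstance(right, AlwaysFalse):
        return AlwaysFalse()
    return AndNode(left, right)


a, b = Pred("x > 1"), Pred("y = 2")
print(and_(AlwaysTrue(), a).to_sql())  # x > 1
print(and_(a, AlwaysFalse()).to_sql())  # FALSE
print(and_(AlwaysTrue(), and_(a, b)).to_sql())  # (x > 1 AND y = 2)
```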
```python
class BooleanExpression(ABC):
    """Base for all filter expressions."""

    @abstractmethod
    def to_sql(self) -> str: ...

    def __and__(self, other: BooleanExpression) -> BooleanExpression:
        return And(self, other)

    def __or__(self, other: BooleanExpression) -> BooleanExpression:
        return Or(self, other)

    def __invert__(self) -> BooleanExpression:
        return Not(self)
```
Base for all filter expressions.
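Because the base class overloads `__and__`, `__or__` and `__invert__`, filters can be combined with `&`, `|` and `~` instead of nested `And(...)`, `Or(...)` and `Not(...)` calls, and the resulting tree renders itself through `to_sql()`. The minimal self-contained sketch below shows the same pattern; the leaf class `Eq` and the exact SQL strings are assumptions, not pyducklake's output.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass


class BoolExpr(ABC):
    @abstractmethod
    def to_sql(self) -> str: ...

    def __and__(self, other: BoolExpr) -> BoolExpr:
        return AndExpr(self, other)

    def __or__(self, other: BoolExpr) -> BoolExpr:
        return OrExpr(self, other)

    def __invert__(self) -> BoolExpr:
        return NotExpr(self)


@dataclass(frozen=True)
class Eq(BoolExpr):
    # Hypothetical leaf: column = integer literal (keeps string quoting out of scope).
    column: str
    value: int

    def to_sql(self) -> str:
        return f'"{self.column}" = {self.value}'


@dataclass(frozen=True)
class AndExpr(BoolExpr):
    left: BoolExpr
    right: BoolExpr

    def to_sql(self) -> str:
        return f"({self.left.to_sql()} AND {self.right.to_sql()})"


@dataclass(frozen=True)
class OrExpr(BoolExpr):
    left: BoolExpr
    right: BoolExpr

    def to_sql(self) -> str:
        return f"({self.left.to_sql()} OR {self.right.to_sql()})"


@dataclass(frozen=True)
class NotExpr(BoolExpr):
    child: BoolExpr

    def to_sql(self) -> str:
        return f"(NOT {self.child.to_sql()})"


expr = (Eq("id", 1) | Eq("id", 2)) & ~Eq("status", 0)
print(expr.to_sql())
```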
````python
class Catalog:
    """Ducklake catalog backed by DuckDB + ducklake extension.

    Examples:

    ```pycon
    >>> import tempfile, os
    >>> tmp = tempfile.mkdtemp()
    >>> cat = Catalog("my_lake", os.path.join(tmp, "meta.duckdb"), data_path=os.path.join(tmp, "data"))
    >>> cat.name
    'my_lake'

    ```
    """

    def __init__(
        self,
        name: str,
        uri: str,
        *,
        data_path: str | None = None,
        properties: dict[str, str] | None = None,
        encrypted: bool = False,
    ) -> None:
        self._name = name
        self._uri = uri
        self._data_path = data_path
        self._encrypted = encrypted

        self._conn = duckdb.connect()
        self._conn.execute("INSTALL ducklake; LOAD ducklake;")

        # Apply connection properties (e.g., S3 configuration)
        for key, value in (properties or {}).items():
            if not self._OPTION_KEY_RE.match(key):
                raise ValueError(
                    f"Invalid property key: {key!r}. Keys must contain only alphanumeric characters and underscores."
                )
            self._conn.execute(f"SET {key} = '{escape_string_literal(value)}'")

        attach_sql = f"ATTACH 'ducklake:{uri}' AS {quote_identifier(name)}"
        options: list[str] = []
        if data_path is not None:
            options.append(f"DATA_PATH '{escape_string_literal(data_path)}'")
        if encrypted:
            options.append("ENCRYPTED")
        if options:
            attach_sql += f" ({', '.join(options)})"
        self._conn.execute(attach_sql)

    @property
    def name(self) -> str:
        return self._name

    @property
    def encrypted(self) -> bool:
        """Whether this catalog uses Parquet encryption."""
        return self._encrypted

    @property
    def connection(self) -> duckdb.DuckDBPyConnection:
        """Access the underlying DuckDB connection."""
        return self._conn

    # -- Namespace operations ------------------------------------------------

    def list_namespaces(self) -> list[str]:
        """List all namespaces (schemas)."""
        rows = self.fetchall(
            "SELECT schema_name FROM information_schema.schemata "
            f"WHERE catalog_name = '{escape_string_literal(self._name)}' "
            "ORDER BY schema_name"
        )
        return [row[0] for row in rows]

    def create_namespace(self, namespace: str) -> None:
        """CREATE SCHEMA. Raises NamespaceAlreadyExistsError if exists."""
        if self.namespace_exists(namespace):
            raise NamespaceAlreadyExistsError(f"Namespace already exists: {namespace}")
        self._execute(f"CREATE SCHEMA {quote_identifier(self._name)}.{quote_identifier(namespace)}")

    def create_namespace_if_not_exists(self, namespace: str) -> None:
        """Create namespace, no-op if already exists."""
        self._execute(f"CREATE SCHEMA IF NOT EXISTS {quote_identifier(self._name)}.{quote_identifier(namespace)}")

    def drop_namespace(self, namespace: str) -> None:
        """DROP SCHEMA. Raises NamespaceNotEmptyError if non-empty, NoSuchNamespaceError if not found."""
        if not self.namespace_exists(namespace):
            raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}")
        tables = self.list_tables(namespace)
        views = self.list_views(namespace)
        if tables or views:
            raise NamespaceNotEmptyError(f"Namespace is not empty: {namespace}")
        self._execute(f"DROP SCHEMA {quote_identifier(self._name)}.{quote_identifier(namespace)}")

    def namespace_exists(self, namespace: str) -> bool:
        rows = self.fetchall(
            "SELECT 1 FROM information_schema.schemata "
            f"WHERE catalog_name = '{escape_string_literal(self._name)}' "
            f"AND schema_name = '{escape_string_literal(namespace)}'"
        )
        return len(rows) > 0

    # -- Table operations ----------------------------------------------------

    def list_tables(self, namespace: str = "main") -> list[tuple[str, str]]:
        """List tables in namespace. Returns list of (namespace, table_name) tuples."""
        rows = self.fetchall(
            "SELECT table_schema, table_name FROM information_schema.tables "
            f"WHERE table_catalog = '{escape_string_literal(self._name)}' "
            f"AND table_schema = '{escape_string_literal(namespace)}' "
            "AND table_type != 'VIEW' "
            "ORDER BY table_name"
        )
        return [(row[0], row[1]) for row in rows]

    def create_table(
        self,
        identifier: str | tuple[str, str],
        schema: Schema,
    ) -> Table:
        """CREATE TABLE with the given schema. Raises TableAlreadyExistsError if exists.

        Examples:

        ```pycon
        >>> import tempfile, os
        >>> from pyducklake import Catalog, Schema, required, optional, IntegerType, StringType
        >>> tmp = tempfile.mkdtemp()
        >>> cat = Catalog("test", os.path.join(tmp, "m.duckdb"), data_path=os.path.join(tmp, "d"))
        >>> schema = Schema.of(required("id", IntegerType()), optional("name", StringType()))
        >>> table = cat.create_table("users", schema)
        >>> table.name
        'users'
        >>> [f.name for f in table.schema.fields]
        ['id', 'name']

        ```
        """
        namespace, table_name = self._resolve_identifier(identifier)

        if self.table_exists((namespace, table_name)):
            raise TableAlreadyExistsError(f"Table already exists: {namespace}.{table_name}")

        fqn = self.fully_qualified_name(namespace, table_name)
        col_defs = ", ".join(
            f"{quote_identifier(f.name)} {ducklake_type_to_sql(f.field_type)}{' NOT NULL' if f.required else ''}"
            for f in schema.fields
        )
        self._execute(f"CREATE TABLE {fqn} ({col_defs})")

        # Reload schema from DuckDB to get canonical types
        loaded_schema = self.build_schema_from_describe(namespace, table_name)
        from pyducklake.sorting import UNSORTED

        return Table(identifier=(namespace, table_name), schema=loaded_schema, catalog=self, sort_order=UNSORTED)

    def create_table_if_not_exists(
        self,
        identifier: str | tuple[str, str],
        schema: Schema,
    ) -> Table:
        """Create table if it doesn't exist, otherwise load and return existing."""
        namespace, table_name = self._resolve_identifier(identifier)
        if self.table_exists((namespace, table_name)):
            return self.load_table((namespace, table_name))
        return self.create_table((namespace, table_name), schema)

    def load_table(self, identifier: str | tuple[str, str]) -> Table:
        """Load table metadata. Raises NoSuchTableError if not found."""
        namespace, table_name = self._resolve_identifier(identifier)
        if not self.table_exists((namespace, table_name)):
            raise NoSuchTableError(f"Table does not exist: {namespace}.{table_name}")
        schema = self.build_schema_from_describe(namespace, table_name)
        return Table(identifier=(namespace, table_name), schema=schema, catalog=self)

    def drop_table(self, identifier: str | tuple[str, str]) -> None:
        """DROP TABLE. Raises NoSuchTableError if not found."""
        namespace, table_name = self._resolve_identifier(identifier)
        if not self.table_exists((namespace, table_name)):
            raise NoSuchTableError(f"Table does not exist: {namespace}.{table_name}")
        fqn = self.fully_qualified_name(namespace, table_name)
        self._execute(f"DROP TABLE {fqn}")

    def rename_table(self, from_identifier: str | tuple[str, str], to_identifier: str | tuple[str, str]) -> Table:
        """ALTER TABLE RENAME TO. Returns the renamed table."""
        from_ns, from_name = self._resolve_identifier(from_identifier)
        to_ns, to_name = self._resolve_identifier(to_identifier)

        if not self.table_exists((from_ns, from_name)):
            raise NoSuchTableError(f"Table does not exist: {from_ns}.{from_name}")

        if from_ns != to_ns:
            raise DucklakeError("Cross-namespace rename is not supported")

        from_fqn = self.fully_qualified_name(from_ns, from_name)
        # DuckDB RENAME TO expects just the new table name, not fully qualified
        self._execute(f"ALTER TABLE {from_fqn} RENAME TO {quote_identifier(to_name)}")
        return self.load_table((to_ns, to_name))

    def table_exists(self, identifier: str | tuple[str, str]) -> bool:
        namespace, table_name = self._resolve_identifier(identifier)
        rows = self.fetchall(
            "SELECT 1 FROM information_schema.tables "
            f"WHERE table_catalog = '{escape_string_literal(self._name)}' "
            f"AND table_schema = '{escape_string_literal(namespace)}' "
            f"AND table_name = '{escape_string_literal(table_name)}'"
        )
        return len(rows) > 0

    # -- View operations -----------------------------------------------------

    def create_view(
        self,
        identifier: str | tuple[str, str],
        sql: str,
    ) -> View:
        """Create a view in the catalog.

        Uses: CREATE VIEW {fqn} AS {sql}

        Raises ViewAlreadyExistsError if the view already exists.
        """
        namespace, view_name = self._resolve_identifier(identifier)
        if self.view_exists((namespace, view_name)):
            raise ViewAlreadyExistsError(f"View already exists: {namespace}.{view_name}")
        fqn = self.fully_qualified_name(namespace, view_name)
        self._execute(f"CREATE VIEW {fqn} AS {sql}")
        return self.load_view((namespace, view_name))

    def create_or_replace_view(
        self,
        identifier: str | tuple[str, str],
        sql: str,
    ) -> View:
        """Create or replace a view. Returns the View object."""
        namespace, view_name = self._resolve_identifier(identifier)
        fqn = self.fully_qualified_name(namespace, view_name)
        self._execute(f"CREATE OR REPLACE VIEW {fqn} AS {sql}")
        return self.load_view((namespace, view_name))

    def load_view(self, identifier: str | tuple[str, str]) -> View:
        """Load a view. Raises NoSuchViewError if not found.

        Queries the view's schema and SQL definition.
        """
        namespace, view_name = self._resolve_identifier(identifier)
        if not self.view_exists((namespace, view_name)):
            raise NoSuchViewError(f"View does not exist: {namespace}.{view_name}")
        schema = self.build_schema_from_describe(namespace, view_name)
        # Get the SQL definition
        rows = self.fetchall(
            "SELECT view_definition FROM information_schema.views "
            f"WHERE table_catalog = '{escape_string_literal(self._name)}' "
            f"AND table_schema = '{escape_string_literal(namespace)}' "
            f"AND table_name = '{escape_string_literal(view_name)}'"
        )
        sql_text = rows[0][0] if rows else ""
        return View(identifier=(namespace, view_name), schema=schema, sql=sql_text, catalog=self)

    def rename_view(self, from_identifier: str | tuple[str, str], to_identifier: str | tuple[str, str]) -> View:
        """Rename a view. Raises NoSuchViewError if not found."""
        from_ns, from_name = self._resolve_identifier(from_identifier)
        to_ns, to_name = self._resolve_identifier(to_identifier)

        if not self.view_exists((from_ns, from_name)):
            raise NoSuchViewError(f"View does not exist: {from_ns}.{from_name}")

        if from_ns != to_ns:
            raise DucklakeError("Cross-namespace rename is not supported")

        from_fqn = self.fully_qualified_name(from_ns, from_name)
        self._execute(f"ALTER VIEW {from_fqn} RENAME TO {quote_identifier(to_name)}")
        return self.load_view((to_ns, to_name))

    def drop_view(self, identifier: str | tuple[str, str]) -> None:
        """Drop a view. Raises NoSuchViewError if not found."""
        namespace, view_name = self._resolve_identifier(identifier)
        if not self.view_exists((namespace, view_name)):
            raise NoSuchViewError(f"View does not exist: {namespace}.{view_name}")
        fqn = self.fully_qualified_name(namespace, view_name)
        self._execute(f"DROP VIEW {fqn}")

    def list_views(self, namespace: str = "main") -> list[tuple[str, str]]:
        """List views in namespace. Returns list of (namespace, view_name) tuples."""
        rows = self.fetchall(
            "SELECT table_schema, table_name FROM information_schema.tables "
            f"WHERE table_catalog = '{escape_string_literal(self._name)}' "
            f"AND table_schema = '{escape_string_literal(namespace)}' "
            "AND table_type = 'VIEW' "
            "ORDER BY table_name"
        )
        return [(row[0], row[1]) for row in rows]

    def view_exists(self, identifier: str | tuple[str, str]) -> bool:
        """Check if a view exists."""
        namespace, view_name = self._resolve_identifier(identifier)
        rows = self.fetchall(
            "SELECT 1 FROM information_schema.tables "
            f"WHERE table_catalog = '{escape_string_literal(self._name)}' "
            f"AND table_schema = '{escape_string_literal(namespace)}' "
            f"AND table_name = '{escape_string_literal(view_name)}' "
            "AND table_type = 'VIEW'"
        )
        return len(rows) > 0

    # -- Internal helpers ----------------------------------------------------

    def _resolve_identifier(self, identifier: str | tuple[str, str]) -> tuple[str, str]:
        """Parse identifier into (namespace, table_name). Default namespace is 'main'."""
        if isinstance(identifier, tuple):
            return identifier
        parts = identifier.split(".", 1)
        if len(parts) == 2:
            return (parts[0], parts[1])
        return ("main", parts[0])

    def fully_qualified_name(self, namespace: str, table_name: str) -> str:
        """Returns 'catalog_name.namespace.table_name' with proper quoting."""
        return f"{quote_identifier(self._name)}.{quote_identifier(namespace)}.{quote_identifier(table_name)}"

    def _execute(self, sql: str, params: list[Any] | None = None) -> duckdb.DuckDBPyConnection:
        """Execute SQL on the connection."""
        if params:
            return self._conn.execute(sql, params)
        return self._conn.execute(sql)

    def fetchall(self, sql: str) -> list[tuple[Any, ...]]:
        """Execute and fetchall."""
        result = self._conn.execute(sql)
        rows: list[tuple[Any, ...]] = result.fetchall()
        return rows

    def build_schema_from_describe(self, namespace: str, table_name: str) -> Schema:
        """Build a Schema from information_schema.columns for an existing table."""
        rows = self.fetchall(
            "SELECT column_name, data_type, is_nullable FROM information_schema.columns "
            f"WHERE table_catalog = '{escape_string_literal(self._name)}' "
            f"AND table_schema = '{escape_string_literal(namespace)}' "
            f"AND table_name = '{escape_string_literal(table_name)}' "
            "ORDER BY ordinal_position"
        )
        # field_id_counter tracks IDs for nested types as well
        field_id_counter = [1]
        fields: list[NestedField] = []
        for col_name, col_type, is_nullable in rows:
            dtype = _duckdb_type_to_ducklake(col_type, field_id_counter)
            fid = field_id_counter[0]
            field_id_counter[0] += 1
            required = is_nullable == "NO"
            fields.append(NestedField(field_id=fid, name=col_name, field_type=dtype, required=required))
        return Schema(*fields)

    # -- Commit Metadata -----------------------------------------------------

    def set_commit_message(self, message: str, *, author: str | None = None) -> None:
        """Set commit message and optional author for the next transaction.

        Must be called inside an explicit ``BEGIN TRANSACTION`` / ``COMMIT``
        block for the metadata to be recorded on the snapshot.

        Uses: ``CALL {catalog}.set_commit_message(author, commit_message)``
        """
        cat = quote_identifier(self._name)
        author_sql = f"'{escape_string_literal(author)}'" if author is not None else "NULL"
        msg_sql = f"'{escape_string_literal(message)}'"
        self._execute(f"CALL {cat}.set_commit_message({author_sql}, {msg_sql})")

    # -- Configuration -------------------------------------------------------

    _OPTION_KEY_RE = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]*$")
    _SCOPE_PART_RE = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]*$")

    def set_option(
        self,
        key: str,
        value: str,
        *,
        scope: str | None = None,
    ) -> None:
        """Set a Ducklake configuration option.

        Args:
            key: Option key (e.g., ``'target_file_size'``).
            value: Option value.
            scope: Optional ``'schema.table'`` for table-level scope.
                ``None`` for global scope.

        Uses: ``CALL {catalog}.set_option(option, value, table_name, schema)``

        Raises:
            ValueError: If ``key`` or ``scope`` contain invalid characters.
        """
        if not self._OPTION_KEY_RE.match(key):
            raise ValueError(
                f"Invalid option key: {key!r}. Keys must contain only alphanumeric characters and underscores."
            )

        cat = quote_identifier(self._name)
        key_sql = f"'{escape_string_literal(key)}'"
        value_sql = f"'{escape_string_literal(value)}'"
        if scope is not None:
            parts = scope.split(".", 1)
            if len(parts) == 2:
                schema_name, table_name = parts
            else:
                schema_name = "main"
                table_name = parts[0]
            for part_label, part_value in [("schema", schema_name), ("table", table_name)]:
                if not self._SCOPE_PART_RE.match(part_value):
                    raise ValueError(
                        f"Invalid scope {part_label}: {part_value!r}. "
                        "Scope parts must contain only alphanumeric characters and underscores."
                    )
            self._execute(
                f"CALL {cat}.set_option("
                f"{key_sql}, {value_sql}, "
                f"table_name := '{escape_string_literal(table_name)}', "
                f"schema := '{escape_string_literal(schema_name)}')"
            )
        else:
            self._execute(f"CALL {cat}.set_option({key_sql}, {value_sql})")

    def get_options(self) -> pa.Table:
        """Get all configuration options as an Arrow table.

        Columns: option_name, description, value, scope, scope_entry
        """
        cat = quote_identifier(self._name)
        result: Any = self._conn.execute(f"SELECT * FROM {cat}.options()")
        arrow_obj: Any = result.arrow()
        if isinstance(arrow_obj, pa.Table):
            return arrow_obj
        tbl: pa.Table = arrow_obj.read_all()
        return tbl

    # -- Transaction ---------------------------------------------------------

    def begin_transaction(self) -> Transaction:
        """Begin a multi-operation transaction.

        All table operations within the transaction are committed atomically.
        """
        from pyducklake.transaction import Transaction

        return Transaction(self)

    # -- Context manager -----------------------------------------------------

    def __enter__(self) -> Catalog:
        return self

    def __exit__(self, *args: Any) -> None:
        self.close()

    def close(self) -> None:
        """Close the DuckDB connection."""
        self._conn.close()
````
Ducklake catalog backed by DuckDB + ducklake extension.
Examples:
>>> import tempfile, os
>>> tmp = tempfile.mkdtemp()
>>> cat = Catalog("my_lake", os.path.join(tmp, "meta.duckdb"), data_path=os.path.join(tmp, "data"))
>>> cat.name
'my_lake'
def __init__(
    self,
    name: str,
    uri: str,
    *,
    data_path: str | None = None,
    properties: dict[str, str] | None = None,
    encrypted: bool = False,
) -> None:
    self._name = name
    self._uri = uri
    self._data_path = data_path
    self._encrypted = encrypted

    self._conn = duckdb.connect()
    self._conn.execute("INSTALL ducklake; LOAD ducklake;")

    # Apply connection properties (e.g., S3 configuration)
    for key, value in (properties or {}).items():
        if not self._OPTION_KEY_RE.match(key):
            raise ValueError(
                f"Invalid property key: {key!r}. Keys must contain only alphanumeric characters and underscores."
            )
        self._conn.execute(f"SET {key} = '{escape_string_literal(value)}'")

    attach_sql = f"ATTACH 'ducklake:{uri}' AS {quote_identifier(name)}"
    options: list[str] = []
    if data_path is not None:
        options.append(f"DATA_PATH '{escape_string_literal(data_path)}'")
    if encrypted:
        options.append("ENCRYPTED")
    if options:
        attach_sql += f" ({', '.join(options)})"
    self._conn.execute(attach_sql)
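The constructor first applies each entry in `properties` as a `SET key = 'value'` statement on the connection and then builds a single `ATTACH` statement. The sketch below rebuilds only the `ATTACH` string so you can see how `data_path` and `encrypted` become the trailing option list. It is a standalone illustration, not pyducklake code. `quote_identifier` and `escape_string_literal` are hypothetical stand-ins, and their quoting rules (doubling `"` and `'`) are assumptions.

```python
from typing import Optional


def quote_identifier(name: str) -> str:
    # Hypothetical stand-in (assumed behavior): wrap in double quotes, doubling embedded ones.
    return '"' + name.replace('"', '""') + '"'


def escape_string_literal(value: str) -> str:
    # Hypothetical stand-in (assumed behavior): double single quotes for use inside '...'.
    return value.replace("'", "''")


def build_attach_sql(name: str, uri: str, data_path: Optional[str] = None, encrypted: bool = False) -> str:
    # Same assembly order as Catalog.__init__: base ATTACH, then optional (DATA_PATH ..., ENCRYPTED).
    attach_sql = f"ATTACH 'ducklake:{uri}' AS {quote_identifier(name)}"
    options = []
    if data_path is not None:
        options.append(f"DATA_PATH '{escape_string_literal(data_path)}'")
    if encrypted:
        options.append("ENCRYPTED")
    if options:
        attach_sql += f" ({', '.join(options)})"
    return attach_sql


print(build_attach_sql("my_lake", "postgres:dbname=mydb host=localhost", data_path="./data"))
```

When neither option is given, the parenthesized list is omitted entirely. The metadata URI is interpolated as-is, which matches the listing above.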
@property
def encrypted(self) -> bool:
    """Whether this catalog uses Parquet encryption."""
    return self._encrypted
Whether this catalog uses Parquet encryption.
@property
def connection(self) -> duckdb.DuckDBPyConnection:
    """Access the underlying DuckDB connection."""
    return self._conn
Access the underlying DuckDB connection.
def list_namespaces(self) -> list[str]:
    """List all namespaces (schemas)."""
    rows = self.fetchall(
        "SELECT schema_name FROM information_schema.schemata "
        f"WHERE catalog_name = '{escape_string_literal(self._name)}' "
        "ORDER BY schema_name"
    )
    return [row[0] for row in rows]
List all namespaces (schemas).
def create_namespace(self, namespace: str) -> None:
    """CREATE SCHEMA. Raises NamespaceAlreadyExistsError if exists."""
    if self.namespace_exists(namespace):
        raise NamespaceAlreadyExistsError(f"Namespace already exists: {namespace}")
    self._execute(f"CREATE SCHEMA {quote_identifier(self._name)}.{quote_identifier(namespace)}")
Create the namespace with CREATE SCHEMA. Raises NamespaceAlreadyExistsError if it already exists.
def create_namespace_if_not_exists(self, namespace: str) -> None:
    """Create namespace, no-op if already exists."""
    self._execute(f"CREATE SCHEMA IF NOT EXISTS {quote_identifier(self._name)}.{quote_identifier(namespace)}")
Create the namespace, or do nothing if it already exists (CREATE SCHEMA IF NOT EXISTS).
def drop_namespace(self, namespace: str) -> None:
    """DROP SCHEMA. Raises NamespaceNotEmptyError if non-empty, NoSuchNamespaceError if not found."""
    if not self.namespace_exists(namespace):
        raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}")
    tables = self.list_tables(namespace)
    views = self.list_views(namespace)
    if tables or views:
        raise NamespaceNotEmptyError(f"Namespace is not empty: {namespace}")
    self._execute(f"DROP SCHEMA {quote_identifier(self._name)}.{quote_identifier(namespace)}")
Drop the namespace with DROP SCHEMA. Raises NoSuchNamespaceError if it does not exist and NamespaceNotEmptyError if it still contains tables or views.
def list_tables(self, namespace: str = "main") -> list[tuple[str, str]]:
    """List tables in namespace. Returns list of (namespace, table_name) tuples."""
    rows = self.fetchall(
        "SELECT table_schema, table_name FROM information_schema.tables "
        f"WHERE table_catalog = '{escape_string_literal(self._name)}' "
        f"AND table_schema = '{escape_string_literal(namespace)}' "
        "AND table_type != 'VIEW' "
        "ORDER BY table_name"
    )
    return [(row[0], row[1]) for row in rows]
List tables in namespace. Returns list of (namespace, table_name) tuples.
def create_table(
    self,
    identifier: str | tuple[str, str],
    schema: Schema,
) -> Table:
    """CREATE TABLE with the given schema. Raises TableAlreadyExistsError if exists.

    Examples:

    ```pycon
    >>> import tempfile, os
    >>> from pyducklake import Catalog, Schema, required, optional, IntegerType, StringType
    >>> tmp = tempfile.mkdtemp()
    >>> cat = Catalog("test", os.path.join(tmp, "m.duckdb"), data_path=os.path.join(tmp, "d"))
    >>> schema = Schema.of(required("id", IntegerType()), optional("name", StringType()))
    >>> table = cat.create_table("users", schema)
    >>> table.name
    'users'
    >>> [f.name for f in table.schema.fields]
    ['id', 'name']

    ```
    """
    namespace, table_name = self._resolve_identifier(identifier)

    if self.table_exists((namespace, table_name)):
        raise TableAlreadyExistsError(f"Table already exists: {namespace}.{table_name}")

    fqn = self.fully_qualified_name(namespace, table_name)
    col_defs = ", ".join(
        f"{quote_identifier(f.name)} {ducklake_type_to_sql(f.field_type)}{' NOT NULL' if f.required else ''}"
        for f in schema.fields
    )
    self._execute(f"CREATE TABLE {fqn} ({col_defs})")

    # Reload schema from DuckDB to get canonical types
    loaded_schema = self.build_schema_from_describe(namespace, table_name)
    from pyducklake.sorting import UNSORTED

    return Table(identifier=(namespace, table_name), schema=loaded_schema, catalog=self, sort_order=UNSORTED)
Create a table (CREATE TABLE) with the given schema and return it. Raises TableAlreadyExistsError if the table already exists.
Examples:
>>> import tempfile, os
>>> from pyducklake import Catalog, Schema, required, optional, IntegerType, StringType
>>> tmp = tempfile.mkdtemp()
>>> cat = Catalog("test", os.path.join(tmp, "m.duckdb"), data_path=os.path.join(tmp, "d"))
>>> schema = Schema.of(required("id", IntegerType()), optional("name", StringType()))
>>> table = cat.create_table("users", schema)
>>> table.name
'users'
>>> [f.name for f in table.schema.fields]
['id', 'name']
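`create_table()` turns each schema field into a quoted column name, a SQL type, and `NOT NULL` when the field is required. The sketch below reproduces only that string-building step. `FieldSketch` is a hypothetical stand-in for `NestedField`, and the SQL type strings (`INTEGER`, `VARCHAR`) are assumptions about what `ducklake_type_to_sql()` returns for `IntegerType` and `StringType`.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class FieldSketch:
    # Hypothetical stand-in for NestedField: only what CREATE TABLE needs.
    name: str
    sql_type: str  # assumed output of ducklake_type_to_sql()
    required: bool = False


def quote_identifier(name: str) -> str:
    # Assumed quoting rule: double quotes, embedded quotes doubled.
    return '"' + name.replace('"', '""') + '"'


def column_defs(fields: List[FieldSketch]) -> str:
    # Same shape as create_table(): quoted name, SQL type, NOT NULL for required fields.
    return ", ".join(
        f"{quote_identifier(f.name)} {f.sql_type}{' NOT NULL' if f.required else ''}"
        for f in fields
    )


defs = column_defs([FieldSketch("id", "INTEGER", required=True), FieldSketch("name", "VARCHAR")])
print(f'CREATE TABLE "test"."main"."users" ({defs})')
```

The real method then reloads the schema from `information_schema.columns`, so the returned `Table` reflects the types DuckDB actually stored rather than the ones passed in.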
def create_table_if_not_exists(
    self,
    identifier: str | tuple[str, str],
    schema: Schema,
) -> Table:
    """Create table if it doesn't exist, otherwise load and return existing."""
    namespace, table_name = self._resolve_identifier(identifier)
    if self.table_exists((namespace, table_name)):
        return self.load_table((namespace, table_name))
    return self.create_table((namespace, table_name), schema)
Create the table if it does not exist; otherwise load and return the existing table. In that case the schema argument is ignored.
def load_table(self, identifier: str | tuple[str, str]) -> Table:
    """Load table metadata. Raises NoSuchTableError if not found."""
    namespace, table_name = self._resolve_identifier(identifier)
    if not self.table_exists((namespace, table_name)):
        raise NoSuchTableError(f"Table does not exist: {namespace}.{table_name}")
    schema = self.build_schema_from_describe(namespace, table_name)
    return Table(identifier=(namespace, table_name), schema=schema, catalog=self)
Load table metadata. Raises NoSuchTableError if not found.
def drop_table(self, identifier: str | tuple[str, str]) -> None:
    """DROP TABLE. Raises NoSuchTableError if not found."""
    namespace, table_name = self._resolve_identifier(identifier)
    if not self.table_exists((namespace, table_name)):
        raise NoSuchTableError(f"Table does not exist: {namespace}.{table_name}")
    fqn = self.fully_qualified_name(namespace, table_name)
    self._execute(f"DROP TABLE {fqn}")
Drop the table with DROP TABLE. Raises NoSuchTableError if it does not exist.
def rename_table(self, from_identifier: str | tuple[str, str], to_identifier: str | tuple[str, str]) -> Table:
    """ALTER TABLE RENAME TO. Returns the renamed table."""
    from_ns, from_name = self._resolve_identifier(from_identifier)
    to_ns, to_name = self._resolve_identifier(to_identifier)

    if not self.table_exists((from_ns, from_name)):
        raise NoSuchTableError(f"Table does not exist: {from_ns}.{from_name}")

    if from_ns != to_ns:
        raise DucklakeError("Cross-namespace rename is not supported")

    from_fqn = self.fully_qualified_name(from_ns, from_name)
    # DuckDB RENAME TO expects just the new table name, not fully qualified
    self._execute(f"ALTER TABLE {from_fqn} RENAME TO {quote_identifier(to_name)}")
    return self.load_table((to_ns, to_name))
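Rename a table (ALTER TABLE ... RENAME TO) and return the renamed table. Raises NoSuchTableError if the source table does not exist and DucklakeError if the target is in a different namespace.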
def table_exists(self, identifier: str | tuple[str, str]) -> bool:
    namespace, table_name = self._resolve_identifier(identifier)
    rows = self.fetchall(
        "SELECT 1 FROM information_schema.tables "
        f"WHERE table_catalog = '{escape_string_literal(self._name)}' "
        f"AND table_schema = '{escape_string_literal(namespace)}' "
        f"AND table_name = '{escape_string_literal(table_name)}'"
    )
    return len(rows) > 0
def create_view(
    self,
    identifier: str | tuple[str, str],
    sql: str,
) -> View:
    """Create a view in the catalog.

    Uses: CREATE VIEW {fqn} AS {sql}

    Raises ViewAlreadyExistsError if the view already exists.
    """
    namespace, view_name = self._resolve_identifier(identifier)
    if self.view_exists((namespace, view_name)):
        raise ViewAlreadyExistsError(f"View already exists: {namespace}.{view_name}")
    fqn = self.fully_qualified_name(namespace, view_name)
    self._execute(f"CREATE VIEW {fqn} AS {sql}")
    return self.load_view((namespace, view_name))
Create a view in the catalog.
Uses: CREATE VIEW {fqn} AS {sql}
Raises ViewAlreadyExistsError if the view already exists.
def create_or_replace_view(
    self,
    identifier: str | tuple[str, str],
    sql: str,
) -> View:
    """Create or replace a view. Returns the View object."""
    namespace, view_name = self._resolve_identifier(identifier)
    fqn = self.fully_qualified_name(namespace, view_name)
    self._execute(f"CREATE OR REPLACE VIEW {fqn} AS {sql}")
    return self.load_view((namespace, view_name))
Create or replace a view. Returns the View object.
def load_view(self, identifier: str | tuple[str, str]) -> View:
    """Load a view. Raises NoSuchViewError if not found.

    Queries the view's schema and SQL definition.
    """
    namespace, view_name = self._resolve_identifier(identifier)
    if not self.view_exists((namespace, view_name)):
        raise NoSuchViewError(f"View does not exist: {namespace}.{view_name}")
    schema = self.build_schema_from_describe(namespace, view_name)
    # Get the SQL definition
    rows = self.fetchall(
        "SELECT view_definition FROM information_schema.views "
        f"WHERE table_catalog = '{escape_string_literal(self._name)}' "
        f"AND table_schema = '{escape_string_literal(namespace)}' "
        f"AND table_name = '{escape_string_literal(view_name)}'"
    )
    sql_text = rows[0][0] if rows else ""
    return View(identifier=(namespace, view_name), schema=schema, sql=sql_text, catalog=self)
Load a view. Raises NoSuchViewError if not found.
Queries the view's schema and SQL definition.
def rename_view(self, from_identifier: str | tuple[str, str], to_identifier: str | tuple[str, str]) -> View:
    """Rename a view. Raises NoSuchViewError if not found."""
    from_ns, from_name = self._resolve_identifier(from_identifier)
    to_ns, to_name = self._resolve_identifier(to_identifier)

    if not self.view_exists((from_ns, from_name)):
        raise NoSuchViewError(f"View does not exist: {from_ns}.{from_name}")

    if from_ns != to_ns:
        raise DucklakeError("Cross-namespace rename is not supported")

    from_fqn = self.fully_qualified_name(from_ns, from_name)
    self._execute(f"ALTER VIEW {from_fqn} RENAME TO {quote_identifier(to_name)}")
    return self.load_view((to_ns, to_name))
Rename a view and return it. Raises NoSuchViewError if the view does not exist and DucklakeError if the target is in a different namespace.
def drop_view(self, identifier: str | tuple[str, str]) -> None:
    """Drop a view. Raises NoSuchViewError if not found."""
    namespace, view_name = self._resolve_identifier(identifier)
    if not self.view_exists((namespace, view_name)):
        raise NoSuchViewError(f"View does not exist: {namespace}.{view_name}")
    fqn = self.fully_qualified_name(namespace, view_name)
    self._execute(f"DROP VIEW {fqn}")
Drop a view. Raises NoSuchViewError if not found.
def list_views(self, namespace: str = "main") -> list[tuple[str, str]]:
    """List views in namespace. Returns list of (namespace, view_name) tuples."""
    rows = self.fetchall(
        "SELECT table_schema, table_name FROM information_schema.tables "
        f"WHERE table_catalog = '{escape_string_literal(self._name)}' "
        f"AND table_schema = '{escape_string_literal(namespace)}' "
        "AND table_type = 'VIEW' "
        "ORDER BY table_name"
    )
    return [(row[0], row[1]) for row in rows]
List views in namespace. Returns list of (namespace, view_name) tuples.
def view_exists(self, identifier: str | tuple[str, str]) -> bool:
    """Check if a view exists."""
    namespace, view_name = self._resolve_identifier(identifier)
    rows = self.fetchall(
        "SELECT 1 FROM information_schema.tables "
        f"WHERE table_catalog = '{escape_string_literal(self._name)}' "
        f"AND table_schema = '{escape_string_literal(namespace)}' "
        f"AND table_name = '{escape_string_literal(view_name)}' "
        "AND table_type = 'VIEW'"
    )
    return len(rows) > 0
Check if a view exists.
def fully_qualified_name(self, namespace: str, table_name: str) -> str:
    """Returns 'catalog_name.namespace.table_name' with proper quoting."""
    return f"{quote_identifier(self._name)}.{quote_identifier(namespace)}.{quote_identifier(table_name)}"
Returns 'catalog_name.namespace.table_name' with proper quoting.
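Each of the three parts is quoted independently, so names containing dots, spaces, or quotes stay a single identifier. The sketch below mirrors that composition. The quoting rule in `quote_identifier` (wrap in `"` and double embedded `"`) is an assumption about the helper, which is not shown in this section.

```python
def quote_identifier(name: str) -> str:
    # Assumed behavior: double-quote the name and escape embedded double quotes by doubling them.
    return '"' + name.replace('"', '""') + '"'


def fully_qualified_name(catalog: str, namespace: str, table_name: str) -> str:
    # Quote each part separately, then join with dots: catalog.namespace.table
    return f"{quote_identifier(catalog)}.{quote_identifier(namespace)}.{quote_identifier(table_name)}"


print(fully_qualified_name("my_lake", "main", 'odd "name"'))
```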
def fetchall(self, sql: str) -> list[tuple[Any, ...]]:
    """Execute and fetchall."""
    result = self._conn.execute(sql)
    rows: list[tuple[Any, ...]] = result.fetchall()
    return rows
Execute a SQL statement on the catalog's connection and return all result rows as tuples.
def build_schema_from_describe(self, namespace: str, table_name: str) -> Schema:
    """Build a Schema from information_schema.columns for an existing table."""
    rows = self.fetchall(
        "SELECT column_name, data_type, is_nullable FROM information_schema.columns "
        f"WHERE table_catalog = '{escape_string_literal(self._name)}' "
        f"AND table_schema = '{escape_string_literal(namespace)}' "
        f"AND table_name = '{escape_string_literal(table_name)}' "
        "ORDER BY ordinal_position"
    )
    # field_id_counter tracks IDs for nested types as well
    field_id_counter = [1]
    fields: list[NestedField] = []
    for col_name, col_type, is_nullable in rows:
        dtype = _duckdb_type_to_ducklake(col_type, field_id_counter)
        fid = field_id_counter[0]
        field_id_counter[0] += 1
        required = is_nullable == "NO"
        fields.append(NestedField(field_id=fid, name=col_name, field_type=dtype, required=required))
    return Schema(*fields)
Build a Schema from information_schema.columns for an existing table.
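For flat column types the rule is simple: rows arrive in `ordinal_position` order, each column gets the next field ID starting at 1, and `is_nullable == "NO"` marks the field as required. The sketch below applies that rule to hand-written rows. `FieldSketch` is a hypothetical stand-in for `NestedField`. It deliberately ignores nested types, which in the real method also consume IDs from the shared counter.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class FieldSketch:
    # Hypothetical stand-in for NestedField (flat types only).
    field_id: int
    name: str
    data_type: str
    required: bool


def schema_from_rows(rows: List[Tuple[str, str, str]]) -> List[FieldSketch]:
    # rows mimic (column_name, data_type, is_nullable) from information_schema.columns,
    # already ordered by ordinal_position.
    fields = []
    next_id = 1
    for col_name, col_type, is_nullable in rows:
        fields.append(FieldSketch(next_id, col_name, col_type, required=(is_nullable == "NO")))
        next_id += 1
    return fields


fields = schema_from_rows([("id", "INTEGER", "NO"), ("name", "VARCHAR", "YES")])
print(fields)
```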
Set commit message and optional author for the next transaction.
Must be called inside an explicit BEGIN TRANSACTION / COMMIT block for the metadata to be recorded on the snapshot.
Uses: CALL {catalog}.set_commit_message(author, commit_message)
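The method only emits one `CALL` statement. The author becomes a SQL `NULL` when omitted, and both strings are embedded as single-quoted literals. The sketch below reproduces that string. `escape_string_literal` is a hypothetical stand-in that is assumed to double single quotes. The catalog name is passed in already quoted.

```python
from typing import Optional


def escape_string_literal(value: str) -> str:
    # Hypothetical stand-in (assumed behavior): double embedded single quotes.
    return value.replace("'", "''")


def commit_message_sql(catalog_sql: str, message: str, author: Optional[str] = None) -> str:
    # Author first, then message, matching set_commit_message(author, commit_message).
    author_sql = f"'{escape_string_literal(author)}'" if author is not None else "NULL"
    msg_sql = f"'{escape_string_literal(message)}'"
    return f"CALL {catalog_sql}.set_commit_message({author_sql}, {msg_sql})"


print(commit_message_sql('"my_lake"', "Load Bob's batch"))
```

Per the docstring, this statement only takes effect when it runs between `BEGIN TRANSACTION` and `COMMIT`, alongside the writes whose snapshot it should annotate.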
Set a Ducklake configuration option.
Args:
key: Option key (e.g., 'target_file_size').
value: Option value.
scope: Optional 'schema.table' for table-level scope; a bare table name uses the 'main' schema. None for global scope.
Uses: CALL {catalog}.set_option(option, value, table_name, schema)
Raises:
ValueError: If key or scope contains invalid characters.
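Scope parsing splits on the first dot only and falls back to the `main` schema for a bare table name. It then validates each part against the same identifier pattern used for option keys. The standalone sketch below mirrors that logic from the listing. `parse_scope` is a hypothetical helper name, not part of pyducklake.

```python
import re
from typing import Tuple

# Same pattern as Catalog._SCOPE_PART_RE: letter or underscore, then letters, digits, underscores.
_PART_RE = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]*$")


def parse_scope(scope: str) -> Tuple[str, str]:
    """Split 'schema.table' (or a bare 'table') the way set_option() does."""
    parts = scope.split(".", 1)  # split on the first dot only
    if len(parts) == 2:
        schema_name, table_name = parts
    else:
        schema_name, table_name = "main", parts[0]
    for label, value in (("schema", schema_name), ("table", table_name)):
        if not _PART_RE.match(value):
            raise ValueError(f"Invalid scope {label}: {value!r}")
    return schema_name, table_name


print(parse_scope("sales.orders"), parse_scope("orders"))
```

Because only the first dot is split, a three-part scope such as `a.b.c` yields a table part of `b.c`, which fails validation rather than being silently truncated.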
Get all configuration options as an Arrow table.
Columns: option_name, description, value, scope, scope_entry
Begin a multi-operation transaction.
All table operations within the transaction are committed atomically.
class ChangeSet:
    """Result of a CDC query with helpers for analyzing changes."""

    def __init__(
        self,
        arrow_table: pa.Table,
        change_type_col: str | None = "change_type",
    ) -> None:
        self._table = arrow_table
        self._change_type_col = change_type_col

    # -- Accessors -------------------------------------------------------------

    def to_arrow(self) -> pa.Table:
        """Return the raw Arrow table."""
        return self._table

    def to_pandas(self) -> Any:
        """Return as pandas DataFrame.

        Requires pandas to be installed separately.
        """
        if importlib.util.find_spec("pandas") is None:
            raise ImportError("pandas is required for to_pandas(). Install it with: pip install pandas")
        return self._table.to_pandas()

    @property
    def num_rows(self) -> int:
        """Number of rows in the result."""
        result: int = self._table.num_rows
        return result

    @property
    def column_names(self) -> list[str]:
        """Column names in the result."""
        result: list[str] = self._table.column_names
        return result

    # -- Change type filtering -------------------------------------------------

    def _require_change_type(self) -> str:
        if self._change_type_col is None:
            raise ValueError(
                "This ChangeSet has no change_type column. "
                "Change type filtering is only available on table_changes() results."
            )
        return self._change_type_col

    def _filter_by_type(self, change_type: str) -> pa.Table:
        col = self._require_change_type()
        mask = pc.equal(self._table.column(col), pa.scalar(change_type))
        return self._table.filter(mask)

    def inserts(self) -> pa.Table:
        """Return only inserted rows (change_type = 'insert')."""
        return self._filter_by_type("insert")

    def deletes(self) -> pa.Table:
        """Return only deleted rows (change_type = 'delete')."""
        return self._filter_by_type("delete")

    def update_preimages(self) -> pa.Table:
        """Return pre-update row images (change_type = 'update_preimage')."""
        return self._filter_by_type("update_preimage")

    def update_postimages(self) -> pa.Table:
        """Return post-update row images (change_type = 'update_postimage')."""
        return self._filter_by_type("update_postimage")

    def updates(self) -> list[tuple[dict[str, Any], dict[str, Any]]]:
        """Return paired (old_row, new_row) for each update.

        Correlates update_preimage and update_postimage rows by rowid.
        Returns list of (preimage_dict, postimage_dict) tuples.
        """
        pre = self.update_preimages()
        post = self.update_postimages()

        pre_by_rowid: dict[Any, dict[str, Any]] = {}
        for row in pre.to_pylist():
            pre_by_rowid[row.get("rowid")] = row

        pairs: list[tuple[dict[str, Any], dict[str, Any]]] = []
        for row in post.to_pylist():
            rid = row.get("rowid")
            if rid in pre_by_rowid:
                pairs.append((pre_by_rowid[rid], row))

        return pairs

    def has_updates(self) -> bool:
        """Whether there are any update changes."""
        col = self._require_change_type()
        types = self._table.column(col).to_pylist()
        return "update_preimage" in types or "update_postimage" in types

    def summary(self) -> dict[str, int]:
        """Return count of each change type.

        Example: {"insert": 5, "delete": 2, "update_preimage": 1, "update_postimage": 1}
        """
        col = self._require_change_type()
        counts: dict[str, int] = {}
        for ct in self._table.column(col).to_pylist():
            key = str(ct)
            counts[key] = counts.get(key, 0) + 1
        return counts

    def __repr__(self) -> str:
        return f"ChangeSet(num_rows={self.num_rows}, columns={self.column_names})"
Result of a CDC query with helpers for analyzing changes.
Return the changes as a pandas DataFrame.
Requires pandas to be installed separately.
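The availability check uses `importlib.util.find_spec`, which looks a module up without importing it. A missing optional dependency therefore produces a clear `ImportError` with an install hint instead of a failure deep inside Arrow. The sketch below generalizes that guard. `require` is a hypothetical helper name, not part of pyducklake.

```python
import importlib.util


def require(module_name: str, feature: str) -> None:
    # Mirrors the guard in ChangeSet.to_pandas(): check availability without importing the module.
    if importlib.util.find_spec(module_name) is None:
        raise ImportError(f"{module_name} is required for {feature}. Install it with: pip install {module_name}")


require("json", "to_json()")  # stdlib module, so this passes silently
```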
Number of rows in the result.
Column names in the result.
Return only inserted rows (change_type = 'insert').
Return only deleted rows (change_type = 'delete').
Return pre-update row images (change_type = 'update_preimage').
Return post-update row images (change_type = 'update_postimage').
Return paired (old_row, new_row) for each update.
Correlates update_preimage and update_postimage rows by rowid. Returns list of (preimage_dict, postimage_dict) tuples.
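The pairing is a hash join on `rowid`. Pre-update images are indexed by row ID, and then each post-update image looks up its partner. The plain-dict sketch below follows the same steps. `pair_updates` is a hypothetical name. It shares two edge cases with the listing: a postimage with no matching preimage is skipped, and if one `rowid` appears in several preimages, only the last one is kept.

```python
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]


def pair_updates(preimages: List[Row], postimages: List[Row]) -> List[Tuple[Row, Row]]:
    # Index pre-update images by rowid (later rows overwrite earlier ones).
    pre_by_rowid = {row.get("rowid"): row for row in preimages}
    # Match each post-update image; unmatched postimages are dropped.
    return [(pre_by_rowid[row.get("rowid")], row) for row in postimages if row.get("rowid") in pre_by_rowid]


pre = [{"rowid": 1, "name": "bob"}, {"rowid": 2, "name": "carol"}]
post = [{"rowid": 2, "name": "caroline"}, {"rowid": 9, "name": "x"}]
print(pair_updates(pre, post))
```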
Whether the change set contains any update_preimage or update_postimage rows.
Return count of each change type.
Example: {"insert": 5, "delete": 2, "update_preimage": 1, "update_postimage": 1}
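`summary()` and `has_updates()` both scan the change-type column once. The sketch below expresses the same results over a plain list of change-type values, using `collections.Counter` in place of the hand-rolled counting loop. Both function names are hypothetical stand-ins.

```python
from collections import Counter
from typing import Dict, Iterable


def summarize(change_types: Iterable[object]) -> Dict[str, int]:
    # Same result as ChangeSet.summary(): count the str() of each change_type value.
    return dict(Counter(str(ct) for ct in change_types))


def has_updates(change_types: Iterable[object]) -> bool:
    # True if either half of an update pair is present.
    types = set(change_types)
    return "update_preimage" in types or "update_postimage" in types


print(summarize(["insert", "insert", "delete", "update_preimage", "update_postimage"]))
```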
Raised when a commit operation fails.
class DataScan:
    """Scans a Ducklake table with optional filtering, column selection, and time travel.

    Immutable builder: each method returns a new DataScan instance.
    """

    __slots__ = ("_table", "_row_filter", "_selected_fields", "_snapshot_id", "_timestamp", "_limit")

    def __init__(
        self,
        table: Table | View,
        row_filter: BooleanExpression = AlwaysTrue(),
        selected_fields: tuple[str, ...] = ("*",),
        snapshot_id: int | None = None,
        limit: int | None = None,
        timestamp: datetime | None = None,
    ) -> None:
        self._table = table
        self._row_filter = row_filter
        self._selected_fields = selected_fields
        self._snapshot_id = snapshot_id
        self._timestamp = timestamp
        self._limit = limit

    # -- Builder methods (return new DataScan) ---------------------------------

    def filter(self, expr: BooleanExpression | str) -> DataScan:
        """Add a row filter.

        If string, wrap in a raw SQL expression.
        If BooleanExpression, combine with existing filter via And.
        """
        if isinstance(expr, str):
            new_filter: BooleanExpression = RawSQL(expr)
        else:
            new_filter = expr

        combined = And(self._row_filter, new_filter)
        return DataScan(
            table=self._table,
            row_filter=combined,
            selected_fields=self._selected_fields,
            snapshot_id=self._snapshot_id,
            limit=self._limit,
            timestamp=self._timestamp,
        )

    def select(self, *fields: str) -> DataScan:
        """Select specific columns. Replaces any previous selection."""
        return DataScan(
            table=self._table,
            row_filter=self._row_filter,
            selected_fields=fields,
            snapshot_id=self._snapshot_id,
            limit=self._limit,
            timestamp=self._timestamp,
        )

    def with_snapshot(self, snapshot_id: int) -> DataScan:
        """Time travel to a specific snapshot version."""
        return DataScan(
            table=self._table,
            row_filter=self._row_filter,
            selected_fields=self._selected_fields,
            snapshot_id=snapshot_id,
            limit=self._limit,
            timestamp=None,
        )

    def with_timestamp(self, timestamp: datetime) -> DataScan:
        """Time travel to data as of a specific timestamp.

        Uses AT (TIMESTAMP => ...) syntax.
        """
        return DataScan(
            table=self._table,
            row_filter=self._row_filter,
            selected_fields=self._selected_fields,
            snapshot_id=None,
            limit=self._limit,
            timestamp=timestamp,
        )

    def with_limit(self, limit: int) -> DataScan:
        """Limit number of rows returned."""
        return DataScan(
            table=self._table,
            row_filter=self._row_filter,
            selected_fields=self._selected_fields,
            snapshot_id=self._snapshot_id,
            limit=limit,
            timestamp=self._timestamp,
        )

    # -- Terminal methods ------------------------------------------------------

    def to_arrow(self) -> pa.Table:
        """Execute scan and return PyArrow Table."""
        sql = self._build_sql()
        result = self._table.catalog.connection.execute(sql)
        return result.fetch_arrow_table()

    def to_pandas(self) -> pd.DataFrame:
        """Execute scan and return pandas DataFrame.

        Requires pandas to be installed separately.
        """
        if importlib.util.find_spec("pandas") is None:
            raise ImportError("pandas is required for to_pandas(). Install it with: pip install pandas")
        sql = self._build_sql()
        result = self._table.catalog.connection.execute(sql)
        df: pd.DataFrame = result.fetchdf()
        return df

    def to_duckdb(self, *, connection: duckdb.DuckDBPyConnection | None = None) -> duckdb.DuckDBPyRelation:
        """Execute scan and return a DuckDB relation.

        If connection is None, uses the catalog's connection.
        """
        sql = self._build_sql()
        conn = connection if connection is not None else self._table.catalog.connection
        return conn.sql(sql)

    def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
        """Execute scan and return a streaming Arrow RecordBatchReader."""
        sql = self._build_sql()
        result = self._table.catalog.connection.execute(sql)
        return result.fetch_record_batch()

    def to_polars(self) -> pl.DataFrame:
        """Execute scan and return a Polars DataFrame.

        Converts via Arrow (to_arrow() then pl.from_arrow()).
        Requires polars to be installed separately.
        """
        try:
            import polars as pl_mod
        except ImportError:
            raise ImportError("polars is required for to_polars(). Install it with: pip install polars") from None
        arrow_table = self.to_arrow()
        result = pl_mod.from_arrow(arrow_table)
        assert isinstance(result, pl_mod.DataFrame)
        return result

    def to_ray(self) -> ray.data.Dataset:
        """Execute scan and return a Ray Dataset.

        Converts via Arrow.
        Requires ray to be installed separately.
        """
        try:
            import ray.data  # pyright: ignore[reportMissingImports]
        except ImportError:
            raise ImportError("ray is required for to_ray(). Install it with: pip install 'ray[data]'") from None
        arrow_table = self.to_arrow()
        dataset: ray.data.Dataset = ray.data.from_arrow(arrow_table)
        return dataset

    def to_arrow_dataset(self) -> ds.Dataset:
        """Return scan results as a PyArrow Dataset.

        Preserves any filters, projections, and time travel settings.
        """
        import pyarrow.dataset as ds

        return ds.dataset(self.to_arrow())

    def count(self) -> int:
        """Return row count (executes SELECT COUNT(*))."""
        sql = self._build_count_sql()
        rows = self._table.catalog.fetchall(sql)
        return int(rows[0][0])

    # -- SQL generation --------------------------------------------------------

    def _build_sql(self) -> str:
        """Build the SELECT SQL statement."""
        columns = self._format_columns()
        table_ref = self._format_table_ref()
        sql = f"SELECT {columns} FROM {table_ref}"
        sql = self._append_where(sql)
        sql = self._append_limit(sql)
        return sql

    def _build_count_sql(self) -> str:
        """Build a SELECT COUNT(*) SQL statement."""
        table_ref = self._format_table_ref()
        sql = f"SELECT COUNT(*) FROM {table_ref}"
        sql = self._append_where(sql)
        sql = self._append_limit(sql)
        return sql

    def _format_columns(self) -> str:
        if self._selected_fields == ("*",):
            return "*"
        return ", ".join(f'"{col}"' for col in self._selected_fields)

    def _format_table_ref(self) -> str:
        fqn = self._table.fully_qualified_name
        if self._snapshot_id is not None and self._timestamp is not None:
            raise ValueError("Cannot set both snapshot_id and timestamp for time travel")
        if self._snapshot_id is not None:
            return f"{fqn} AT (VERSION => {self._snapshot_id})"
        if self._timestamp is not None:
            ts_str = self._timestamp.strftime("%Y-%m-%d %H:%M:%S.%f")
            return f"{fqn} AT (TIMESTAMP => '{ts_str}')"
        return fqn

    def _append_where(self, sql: str) -> str:
        if not _is_always_true(self._row_filter):
            sql += f" WHERE {self._row_filter.to_sql()}"
        return sql

    def _append_limit(self, sql: str) -> str:
        if self._limit is not None:
            sql += f" LIMIT {self._limit}"
        return sql
Scans a Ducklake table with optional filtering, column selection, and time travel.
Immutable builder: each method returns a new DataScan instance.
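The immutable-builder pattern can be illustrated without a catalog. MiniScan below is a hypothetical stand-in, not part of pyducklake. It is a frozen dataclass that copies its state on every call, as DataScan does, and assembles SELECT ... FROM ... WHERE ... LIMIT. One simplification: filters are plain SQL strings joined with AND, whereas pyducklake uses expression objects.

```python
from __future__ import annotations

from dataclasses import dataclass, replace


@dataclass(frozen=True)
class MiniScan:
    """Hypothetical stand-in for DataScan: every method returns a new instance."""

    table: str
    filters: tuple[str, ...] = ()
    fields: tuple[str, ...] = ("*",)
    limit: int | None = None

    def filter(self, expr: str) -> MiniScan:
        # Extend (never mutate) the existing filters; they are ANDed together.
        return replace(self, filters=self.filters + (expr,))

    def select(self, *fields: str) -> MiniScan:
        # Replaces any previous selection, like DataScan.select().
        return replace(self, fields=fields)

    def with_limit(self, limit: int) -> MiniScan:
        return replace(self, limit=limit)

    def build_sql(self) -> str:
        cols = "*" if self.fields == ("*",) else ", ".join(f'"{c}"' for c in self.fields)
        sql = f"SELECT {cols} FROM {self.table}"
        if self.filters:
            sql += " WHERE " + " AND ".join(f"({f})" for f in self.filters)
        if self.limit is not None:
            sql += f" LIMIT {self.limit}"
        return sql


base = MiniScan("lake.main.users")
narrowed = base.filter("id > 1").filter("name IS NOT NULL").select("id", "name").with_limit(10)
print(base.build_sql())      # the base scan is unchanged
print(narrowed.build_sql())
```

Because every step returns a fresh object, a base scan can be shared and refined in several directions without one branch affecting another.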
```python
def __init__(
    self,
    table: Table | View,
    row_filter: BooleanExpression = AlwaysTrue(),
    selected_fields: tuple[str, ...] = ("*",),
    snapshot_id: int | None = None,
    limit: int | None = None,
    timestamp: datetime | None = None,
) -> None:
    self._table = table
    self._row_filter = row_filter
    self._selected_fields = selected_fields
    self._snapshot_id = snapshot_id
    self._timestamp = timestamp
    self._limit = limit
```
```python
def filter(self, expr: BooleanExpression | str) -> DataScan:
    """Add a row filter.

    If string, wrap in a raw SQL expression.
    If BooleanExpression, combine with existing filter via And.
    """
    if isinstance(expr, str):
        new_filter: BooleanExpression = RawSQL(expr)
    else:
        new_filter = expr

    combined = And(self._row_filter, new_filter)
    return DataScan(
        table=self._table,
        row_filter=combined,
        selected_fields=self._selected_fields,
        snapshot_id=self._snapshot_id,
        limit=self._limit,
        timestamp=self._timestamp,
    )
```
Add a row filter.
A string is wrapped in a raw SQL expression; a BooleanExpression is used as-is. In both cases the new filter is combined with the existing filter via And.
```python
def select(self, *fields: str) -> DataScan:
    """Select specific columns. Replaces any previous selection."""
    return DataScan(
        table=self._table,
        row_filter=self._row_filter,
        selected_fields=fields,
        snapshot_id=self._snapshot_id,
        limit=self._limit,
        timestamp=self._timestamp,
    )
```
Select specific columns. Replaces any previous selection.
```python
def with_snapshot(self, snapshot_id: int) -> DataScan:
    """Time travel to a specific snapshot version."""
    return DataScan(
        table=self._table,
        row_filter=self._row_filter,
        selected_fields=self._selected_fields,
        snapshot_id=snapshot_id,
        limit=self._limit,
        timestamp=None,
    )
```
Time travel to a specific snapshot version. Clears any timestamp set earlier with with_timestamp().
```python
def with_timestamp(self, timestamp: datetime) -> DataScan:
    """Time travel to data as of a specific timestamp.

    Uses AT (TIMESTAMP => ...) syntax.
    """
    return DataScan(
        table=self._table,
        row_filter=self._row_filter,
        selected_fields=self._selected_fields,
        snapshot_id=None,
        limit=self._limit,
        timestamp=timestamp,
    )
```
Time travel to data as of a specific timestamp.
Uses AT (TIMESTAMP => ...) syntax and clears any snapshot set earlier with with_snapshot().
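with_snapshot() and with_timestamp() each reset the other's setting. _format_table_ref() then turns whichever one remains into a DuckDB AT (...) clause. The standalone function below is a hypothetical re-creation of that formatting logic: it emits the same strings as the DataScan source listing but does not import pyducklake. It can be used to predict the SQL a time-travel scan will issue. "lake.main.users" is a placeholder table name.

```python
from __future__ import annotations

from datetime import datetime


def format_table_ref(fqn: str, snapshot_id: int | None = None, timestamp: datetime | None = None) -> str:
    """Hypothetical mirror of DataScan._format_table_ref()."""
    if snapshot_id is not None and timestamp is not None:
        raise ValueError("Cannot set both snapshot_id and timestamp for time travel")
    if snapshot_id is not None:
        return f"{fqn} AT (VERSION => {snapshot_id})"
    if timestamp is not None:
        # Always rendered with microsecond precision (%f gives six digits).
        ts_str = timestamp.strftime("%Y-%m-%d %H:%M:%S.%f")
        return f"{fqn} AT (TIMESTAMP => '{ts_str}')"
    return fqn


print(format_table_ref("lake.main.users", snapshot_id=3))
print(format_table_ref("lake.main.users", timestamp=datetime(2025, 6, 16, 15, 24, 30)))
```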
```python
def with_limit(self, limit: int) -> DataScan:
    """Limit number of rows returned."""
    return DataScan(
        table=self._table,
        row_filter=self._row_filter,
        selected_fields=self._selected_fields,
        snapshot_id=self._snapshot_id,
        limit=limit,
        timestamp=self._timestamp,
    )
```
Limit the number of rows returned.
```python
def to_arrow(self) -> pa.Table:
    """Execute scan and return PyArrow Table."""
    sql = self._build_sql()
    result = self._table.catalog.connection.execute(sql)
    return result.fetch_arrow_table()
```
Execute scan and return PyArrow Table.
```python
def to_pandas(self) -> pd.DataFrame:
    """Execute scan and return pandas DataFrame.

    Requires pandas to be installed separately.
    """
    if importlib.util.find_spec("pandas") is None:
        raise ImportError("pandas is required for to_pandas(). Install it with: pip install pandas")
    sql = self._build_sql()
    result = self._table.catalog.connection.execute(sql)
    df: pd.DataFrame = result.fetchdf()
    return df
```
Execute scan and return pandas DataFrame.
Requires pandas to be installed separately.
```python
def to_duckdb(self, *, connection: duckdb.DuckDBPyConnection | None = None) -> duckdb.DuckDBPyRelation:
    """Execute scan and return a DuckDB relation.

    If connection is None, uses the catalog's connection.
    """
    sql = self._build_sql()
    conn = connection if connection is not None else self._table.catalog.connection
    return conn.sql(sql)
```
Execute scan and return a DuckDB relation.
If connection is None, uses the catalog's connection.
```python
def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
    """Execute scan and return a streaming Arrow RecordBatchReader."""
    sql = self._build_sql()
    result = self._table.catalog.connection.execute(sql)
    return result.fetch_record_batch()
```
Execute scan and return a streaming Arrow RecordBatchReader.
```python
def to_polars(self) -> pl.DataFrame:
    """Execute scan and return a Polars DataFrame.

    Converts via Arrow (to_arrow() then pl.from_arrow()).
    Requires polars to be installed separately.
    """
    try:
        import polars as pl_mod
    except ImportError:
        raise ImportError("polars is required for to_polars(). Install it with: pip install polars") from None
    arrow_table = self.to_arrow()
    result = pl_mod.from_arrow(arrow_table)
    assert isinstance(result, pl_mod.DataFrame)
    return result
```
Execute scan and return a Polars DataFrame.
Converts via Arrow (to_arrow() then pl.from_arrow()). Requires polars to be installed separately.
```python
def to_ray(self) -> ray.data.Dataset:
    """Execute scan and return a Ray Dataset.

    Converts via Arrow.
    Requires ray to be installed separately.
    """
    try:
        import ray.data  # pyright: ignore[reportMissingImports]
    except ImportError:
        raise ImportError("ray is required for to_ray(). Install it with: pip install 'ray[data]'") from None
    arrow_table = self.to_arrow()
    dataset: ray.data.Dataset = ray.data.from_arrow(arrow_table)
    return dataset
```
Execute scan and return a Ray Dataset.
Converts via Arrow. Requires ray to be installed separately.
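to_pandas(), to_polars() and to_ray() all follow the same pattern: they check for an optional dependency at call time and fail with an install hint if it is missing. The helper below, require_optional, is hypothetical and is not part of pyducklake. It is a minimal sketch of how that pattern could be factored out using only importlib.

```python
from __future__ import annotations

import importlib
import importlib.util
from types import ModuleType


def require_optional(module: str, method: str, pip_name: str | None = None) -> ModuleType:
    """Import an optional dependency or raise ImportError with an install hint.

    Hypothetical helper mirroring the checks in to_pandas()/to_polars()/to_ray().
    """
    # Check the top-level package first: find_spec() on a dotted name would
    # try to import the parent package and could raise instead of returning None.
    top_level = module.split(".")[0]
    if importlib.util.find_spec(top_level) is None:
        hint = pip_name or top_level
        raise ImportError(f"{top_level} is required for {method}(). Install it with: pip install {hint}")
    return importlib.import_module(module)


json_mod = require_optional("json", "to_json")
print(json_mod.dumps({"ok": True}))
```

A call such as require_optional("ray.data", "to_ray", "'ray[data]'") would then reproduce the shape of the to_ray() error message.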
```python
def to_arrow_dataset(self) -> ds.Dataset:
    """Return scan results as a PyArrow Dataset.

    Preserves any filters, projections, and time travel settings.
    """
    import pyarrow.dataset as ds

    return ds.dataset(self.to_arrow())
```
Return scan results as a PyArrow Dataset.
Runs the scan eagerly through to_arrow(), so the dataset wraps an in-memory table that already reflects any filters, projections, and time travel settings.
Base exception for all Ducklake errors.
```python
@dataclass(frozen=True)
class EqualTo(BooleanExpression):
    """Column equals value."""

    term: str
    value: Any

    def to_sql(self) -> str:
        return f"{_quote_column(self.term)} = {_format_value(self.value)}"

    def __repr__(self) -> str:
        return f"EqualTo(term={self.term!r}, value={self.value!r})"
```
Column equals value.
```python
@dataclass(frozen=True)
class GreaterThan(BooleanExpression):
    """Column greater than value."""

    term: str
    value: Any

    def to_sql(self) -> str:
        return f"{_quote_column(self.term)} > {_format_value(self.value)}"

    def __repr__(self) -> str:
        return f"GreaterThan(term={self.term!r}, value={self.value!r})"
```
Column greater than value.
```python
@dataclass(frozen=True)
class GreaterThanOrEqual(BooleanExpression):
    """Column greater than or equal to value."""

    term: str
    value: Any

    def to_sql(self) -> str:
        return f"{_quote_column(self.term)} >= {_format_value(self.value)}"

    def __repr__(self) -> str:
        return f"GreaterThanOrEqual(term={self.term!r}, value={self.value!r})"
```
Column greater than or equal to value.
```python
@dataclass(frozen=True)
class In(BooleanExpression):
    """Column value in a set of values."""

    term: str
    values: tuple[Any, ...]

    def __new__(cls, term: str, values: tuple[Any, ...]) -> BooleanExpression:  # type: ignore[misc]
        if not values:
            return AlwaysFalse()
        instance = super().__new__(cls)
        return instance

    def to_sql(self) -> str:
        vals = ", ".join(_format_value(v) for v in self.values)
        return f"{_quote_column(self.term)} IN ({vals})"

    def __repr__(self) -> str:
        return f"In(term={self.term!r}, values={self.values!r})"
```
Column value in a set of values.
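In overrides __new__ so that an empty value tuple never produces an IN () clause. Instead, constructing In with no values returns AlwaysFalse(), and NotIn likewise returns AlwaysTrue(). The sketch below is a self-contained re-creation of that trick. The bodies of _quote_column and _format_value are not shown in this section, so the double-quote and single-quote escaping used here is an assumption.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


class Expr:
    def to_sql(self) -> str:
        raise NotImplementedError


class AlwaysFalse(Expr):
    def to_sql(self) -> str:
        return "FALSE"


def quote_column(name: str) -> str:
    # Assumed rule: wrap in double quotes, doubling embedded double quotes.
    return '"' + name.replace('"', '""') + '"'


def format_value(value: Any) -> str:
    # Assumed rule: single-quote strings (doubling embedded quotes); numbers as-is.
    if isinstance(value, str):
        return "'" + value.replace("'", "''") + "'"
    return str(value)


@dataclass(frozen=True)
class In(Expr):
    term: str
    values: tuple[Any, ...]

    def __new__(cls, term: str, values: tuple[Any, ...]) -> Expr:  # type: ignore[misc]
        # An empty IN-list can never match, so return a constant instead.
        # AlwaysFalse is not an In, so Python skips In.__init__ for it.
        if not values:
            return AlwaysFalse()
        return super().__new__(cls)

    def to_sql(self) -> str:
        vals = ", ".join(format_value(v) for v in self.values)
        return f"{quote_column(self.term)} IN ({vals})"


print(In("status", ("a", "b")).to_sql())
print(In("status", ()).to_sql())  # FALSE
```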
```python
class InspectTable:
    """Metadata introspection for a Ducklake table.

    Obtained via ``table.inspect()``.
    """

    def __init__(self, table: Table) -> None:
        self._table = table

    # Methods: snapshots(), files(), history(), partitions() (listed individually).
```
Metadata introspection for a Ducklake table.
Obtained via table.inspect().
```python
def snapshots(self) -> pa.Table:
    """Return all snapshots as an Arrow table.

    Columns: snapshot_id, snapshot_time, schema_version, changes,
    author, commit_message, commit_extra_info
    """
    catalog_name = self._table.catalog.name
    result = self._table.catalog.connection.execute(
        f'SELECT * FROM "{catalog_name}".snapshots() ORDER BY snapshot_id'
    )
    return _to_arrow_table(result)
```
Return all snapshots as an Arrow table.
Columns: snapshot_id, snapshot_time, schema_version, changes, author, commit_message, commit_extra_info
```python
def files(
    self,
    snapshot_id: int | None = None,
    snapshot_time: str | None = None,
) -> pa.Table:
    """Return data files for the table, optionally at a specific snapshot.

    Columns returned: data_file, data_file_size_bytes,
    data_file_footer_size, data_file_encryption_key, delete_file,
    delete_file_size_bytes, delete_file_footer_size,
    delete_file_encryption_key.

    Args:
        snapshot_id: Fetch files at a specific snapshot version.
        snapshot_time: Fetch files at a specific timestamp
            (e.g. ``'2025-06-16 15:24:30'``).

    Raises:
        ValueError: If both ``snapshot_id`` and ``snapshot_time``
            are provided.

    Uses ``ducklake_list_files()`` function.
    """
    if snapshot_id is not None and snapshot_time is not None:
        raise ValueError("Cannot specify both snapshot_id and snapshot_time")

    from pyducklake.catalog import escape_string_literal

    catalog_name = self._table.catalog.name
    table_name = self._table.name
    schema_name = self._table.namespace
    sql = (
        f"SELECT * FROM ducklake_list_files('{escape_string_literal(catalog_name)}', "
        f"'{escape_string_literal(table_name)}', schema := '{escape_string_literal(schema_name)}'"
    )
    if snapshot_id is not None:
        sql += f", snapshot_version := {snapshot_id}::BIGINT"
    if snapshot_time is not None:
        sql += f", snapshot_time := '{escape_string_literal(snapshot_time)}'"
    sql += ")"
    result = self._table.catalog.connection.execute(sql)
    return _to_arrow_table(result)
```
Return data files for the table, optionally at a specific snapshot.
Columns returned: data_file, data_file_size_bytes, data_file_footer_size, data_file_encryption_key, delete_file, delete_file_size_bytes, delete_file_footer_size, delete_file_encryption_key.
Args:
snapshot_id: Fetch files at a specific snapshot version.
snapshot_time: Fetch files at a specific timestamp (e.g. '2025-06-16 15:24:30').
Raises:
ValueError: If both snapshot_id and snapshot_time are provided.
Uses the ducklake_list_files() function.
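files() builds its ducklake_list_files() call by string interpolation and passes every name through escape_string_literal(). That helper's body is not shown here. The sketch below assumes it applies the standard SQL rule of doubling single quotes. It re-creates the SQL assembly as a hypothetical standalone function, list_files_sql, so that the generated statement can be inspected. The catalog and table names are placeholders.

```python
from __future__ import annotations


def escape_string_literal(value: str) -> str:
    # Assumption: standard SQL escaping, where one single quote becomes two.
    return value.replace("'", "''")


def list_files_sql(
    catalog: str,
    table: str,
    schema: str,
    snapshot_id: int | None = None,
    snapshot_time: str | None = None,
) -> str:
    """Hypothetical mirror of the SQL assembled by InspectTable.files()."""
    if snapshot_id is not None and snapshot_time is not None:
        raise ValueError("Cannot specify both snapshot_id and snapshot_time")
    sql = (
        f"SELECT * FROM ducklake_list_files('{escape_string_literal(catalog)}', "
        f"'{escape_string_literal(table)}', schema := '{escape_string_literal(schema)}'"
    )
    if snapshot_id is not None:
        sql += f", snapshot_version := {snapshot_id}::BIGINT"
    if snapshot_time is not None:
        sql += f", snapshot_time := '{escape_string_literal(snapshot_time)}'"
    return sql + ")"


print(list_files_sql("my_lake", "o'brien_logs", "main", snapshot_id=4))
```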
```python
def history(self) -> pa.Table:
    """Return snapshot history as an Arrow table, newest-first."""
    catalog_name = self._table.catalog.name
    result = self._table.catalog.connection.execute(
        f'SELECT * FROM "{catalog_name}".snapshots() ORDER BY snapshot_id DESC'
    )
    return _to_arrow_table(result)
```
Return snapshot history as an Arrow table, newest-first.
```python
def partitions(self) -> pa.Table:
    """Return partition info as an Arrow table.

    Columns: partition_id, column_id, transform
    If no partitioning, returns an empty table.
    """
    from pyducklake.catalog import escape_string_literal

    catalog_name = self._table.catalog.name
    meta_schema = f"__ducklake_metadata_{catalog_name}"

    # Get table_id first
    esc_name = escape_string_literal(self._table.name)
    esc_ns = escape_string_literal(self._table.namespace)
    table_rows = self._table.catalog.fetchall(
        f'SELECT t.table_id FROM "{meta_schema}".ducklake_table t'
        f' JOIN "{meta_schema}".ducklake_schema s ON t.schema_id = s.schema_id'
        f" WHERE t.table_name = '{esc_name}'"
        f" AND s.schema_name = '{esc_ns}'"
    )
    if not table_rows:
        return pa.table(
            {
                "partition_id": pa.array([], type=pa.int64()),
                "column_id": pa.array([], type=pa.int64()),
                "transform": pa.array([], type=pa.string()),
            }
        )

    table_id = table_rows[0][0]

    try:
        rows = self._table.catalog.fetchall(
            f"SELECT pc.partition_id, pc.column_id, pc.transform "
            f'FROM "{meta_schema}".ducklake_partition_column pc '
            f'JOIN "{meta_schema}".ducklake_partition_info pi '
            f"ON pc.partition_id = pi.partition_id "
            f"AND pc.table_id = pi.table_id "
            f"WHERE pc.table_id = {table_id} "
            f"AND pi.end_snapshot IS NULL"
        )
    except (duckdb.CatalogException, duckdb.BinderException):
        rows = []

    if not rows:
        return pa.table(
            {
                "partition_id": pa.array([], type=pa.int64()),
                "column_id": pa.array([], type=pa.int64()),
                "transform": pa.array([], type=pa.string()),
            }
        )

    return pa.table(
        {
            "partition_id": pa.array([r[0] for r in rows], type=pa.int64()),
            "column_id": pa.array([r[1] for r in rows], type=pa.int64()),
            "transform": pa.array([str(r[2]) for r in rows], type=pa.string()),
        }
    )
```
Return partition info as an Arrow table.
Columns: partition_id, column_id, transform. If the table is not partitioned, returns an empty table with these columns.
```python
class MaintenanceTable:
    """Table maintenance operations. Obtained via ``table.maintenance()``."""

    def __init__(self, table: Table) -> None:
        self._table = table

    # Methods compact(), rewrite_data_files(), expire_snapshots(), cleanup_files()
    # and delete_orphaned_files() are listed individually.

    def checkpoint(self) -> None:
        """Run all maintenance operations sequentially."""
        catalog_name = self._table.catalog.name
        conn = self._table.catalog.connection
        conn.execute(f'CHECKPOINT "{catalog_name}"')
```
Table maintenance operations. Obtained via table.maintenance().
```python
def compact(
    self,
    *,
    min_file_size: int | None = None,
    max_file_size: int | None = None,
    max_compacted_files: int | None = None,
) -> None:
    """Merge small files into larger ones.

    Operates at the catalog level (ducklake does not support per-table compaction).
    """
    from pyducklake.catalog import escape_string_literal

    catalog_name = self._table.catalog.name
    conn = self._table.catalog.connection

    params: list[str] = [f"'{escape_string_literal(catalog_name)}'"]
    if min_file_size is not None:
        params.append(f"min_file_size := {min_file_size}::UBIGINT")
    if max_file_size is not None:
        params.append(f"max_file_size := {max_file_size}::UBIGINT")
    if max_compacted_files is not None:
        params.append(f"max_compacted_files := {max_compacted_files}::UBIGINT")

    sql = f"CALL ducklake_merge_adjacent_files({', '.join(params)})"
    conn.execute(sql)
```
Merge small files into larger ones.
Operates on the whole catalog, not only this table (ducklake does not support per-table compaction).
```python
def rewrite_data_files(
    self,
    *,
    delete_threshold: float | None = None,
) -> None:
    """Rewrite files with excessive deletions.

    Operates at the catalog level.
    """
    from pyducklake.catalog import escape_string_literal

    catalog_name = self._table.catalog.name
    conn = self._table.catalog.connection

    params: list[str] = [f"'{escape_string_literal(catalog_name)}'"]
    if delete_threshold is not None:
        params.append(f"delete_threshold := {delete_threshold}")

    sql = f"CALL ducklake_rewrite_data_files({', '.join(params)})"
    conn.execute(sql)
```
Rewrite files with excessive deletions.
Operates on the whole catalog, not only this table.
```python
def expire_snapshots(
    self,
    *,
    older_than: str | None = None,
    versions: int | None = None,
    dry_run: bool = False,
) -> None:
    """Remove old snapshots.

    Args:
        older_than: Timestamp string (``'YYYY-MM-DD HH:MM:SS'``).
        versions: Number of versions to keep.
        dry_run: If True, report what would be expired without acting.
    """
    from pyducklake.catalog import escape_string_literal

    catalog_name = self._table.catalog.name
    conn = self._table.catalog.connection

    params: list[str] = [f"'{escape_string_literal(catalog_name)}'"]
    if older_than is not None:
        validate_older_than(older_than)
        params.append(f"older_than := '{escape_string_literal(older_than)}'")
    if versions is not None:
        params.append(f"versions := [{versions}::UBIGINT]")
    if dry_run:
        params.append("dry_run := true")

    sql = f"CALL ducklake_expire_snapshots({', '.join(params)})"
    conn.execute(sql)
```
Remove old snapshots.
Args:
older_than: Timestamp string ('YYYY-MM-DD HH:MM:SS').
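versions: ID of a snapshot to expire. The value is sent to DuckLake as a one-element versions list, so it selects a single snapshot rather than a number of versions to keep.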
dry_run: If True, report what would be expired without acting.
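Each maintenance method is a thin wrapper that assembles a DuckLake CALL statement from its keyword arguments. The function below, expire_snapshots_sql, is a hypothetical re-creation of how expire_snapshots() builds that statement. validate_older_than() is not shown in this section, so the strict 'YYYY-MM-DD HH:MM:SS' parse used here is an assumption based on the docstring.

```python
from __future__ import annotations

from datetime import datetime


def validate_older_than(value: str) -> None:
    # Assumed validation: accept only the documented 'YYYY-MM-DD HH:MM:SS' form.
    datetime.strptime(value, "%Y-%m-%d %H:%M:%S")  # raises ValueError otherwise


def expire_snapshots_sql(
    catalog: str,
    *,
    older_than: str | None = None,
    versions: int | None = None,
    dry_run: bool = False,
) -> str:
    """Hypothetical mirror of the CALL built by MaintenanceTable.expire_snapshots()."""
    escaped = catalog.replace("'", "''")  # assumed single-quote doubling
    params = [f"'{escaped}'"]
    if older_than is not None:
        validate_older_than(older_than)
        params.append(f"older_than := '{older_than}'")
    if versions is not None:
        # The single integer is wrapped in a one-element list, as in the listing.
        params.append(f"versions := [{versions}::UBIGINT]")
    if dry_run:
        params.append("dry_run := true")
    return f"CALL ducklake_expire_snapshots({', '.join(params)})"


print(expire_snapshots_sql("my_lake", older_than="2025-06-01 00:00:00", dry_run=True))
```

Calling it with dry_run=True first is a cheap way to review what a maintenance run would issue before letting it modify the catalog.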
```python
def cleanup_files(
    self,
    *,
    older_than: str | None = None,
    dry_run: bool = False,
) -> None:
    """Delete files scheduled for removal.

    Args:
        older_than: Timestamp string (``'YYYY-MM-DD HH:MM:SS'``).
        dry_run: If True, report what would be cleaned without acting.
    """
    from pyducklake.catalog import escape_string_literal

    catalog_name = self._table.catalog.name
    conn = self._table.catalog.connection

    params: list[str] = [f"'{escape_string_literal(catalog_name)}'"]
    if older_than is not None:
        validate_older_than(older_than)
        params.append(f"older_than := '{escape_string_literal(older_than)}'")
    if dry_run:
        params.append("dry_run := true")

    sql = f"CALL ducklake_cleanup_old_files({', '.join(params)})"
    conn.execute(sql)
```
Delete files scheduled for removal.
Args:
older_than: Timestamp string ('YYYY-MM-DD HH:MM:SS').
dry_run: If True, report what would be cleaned without acting.
```python
def delete_orphaned_files(self, *, dry_run: bool = False) -> None:
    """Remove untracked files from storage."""
    from pyducklake.catalog import escape_string_literal

    catalog_name = self._table.catalog.name
    conn = self._table.catalog.connection

    params: list[str] = [f"'{escape_string_literal(catalog_name)}'"]
    if dry_run:
        params.append("dry_run := true")

    sql = f"CALL ducklake_delete_orphaned_files({', '.join(params)})"
    conn.execute(sql)
```
Remove untracked files from storage.
```python
@dataclass(frozen=True)
class IsNaN(BooleanExpression):
    """Column is NaN."""

    term: str

    def to_sql(self) -> str:
        return f"isnan({_quote_column(self.term)})"

    def __repr__(self) -> str:
        return f"IsNaN(term={self.term!r})"
```
Column is NaN.
```python
@dataclass(frozen=True)
class IsNull(BooleanExpression):
    """Column is NULL."""

    term: str

    def to_sql(self) -> str:
        return f"{_quote_column(self.term)} IS NULL"

    def __repr__(self) -> str:
        return f"IsNull(term={self.term!r})"
```
Column is NULL.
```python
@dataclass(frozen=True)
class LessThan(BooleanExpression):
    """Column less than value."""

    term: str
    value: Any

    def to_sql(self) -> str:
        return f"{_quote_column(self.term)} < {_format_value(self.value)}"

    def __repr__(self) -> str:
        return f"LessThan(term={self.term!r}, value={self.value!r})"
```
Column less than value.
```python
@dataclass(frozen=True)
class LessThanOrEqual(BooleanExpression):
    """Column less than or equal to value."""

    term: str
    value: Any

    def to_sql(self) -> str:
        return f"{_quote_column(self.term)} <= {_format_value(self.value)}"

    def __repr__(self) -> str:
        return f"LessThanOrEqual(term={self.term!r}, value={self.value!r})"
```
Column less than or equal to value.
```python
class NamespaceAlreadyExistsError(DucklakeError):
    """Raised when attempting to create a namespace that already exists."""
```
Raised when attempting to create a namespace that already exists.
```python
class NamespaceNotEmptyError(DucklakeError):
    """Raised when attempting to drop a non-empty namespace."""
```
Raised when attempting to drop a non-empty namespace.
Raised when a namespace does not exist.
Raised when a table does not exist.
Raised when a view does not exist.
```python
@dataclass(frozen=True)
class Not(BooleanExpression):
    """Logical NOT."""

    child: BooleanExpression

    def __new__(cls, child: BooleanExpression) -> BooleanExpression:  # type: ignore[misc]
        if isinstance(child, _AlwaysTrue):
            return AlwaysFalse()
        if isinstance(child, _AlwaysFalse):
            return AlwaysTrue()
        if isinstance(child, Not):
            return child.child
        instance = super().__new__(cls)
        return instance

    def to_sql(self) -> str:
        return f"(NOT {self.child.to_sql()})"

    def __repr__(self) -> str:
        return f"Not(child={self.child!r})"
```
Logical NOT.
```python
@dataclass(frozen=True)
class NotEqualTo(BooleanExpression):
    """Column not equals value."""

    term: str
    value: Any

    def to_sql(self) -> str:
        return f"{_quote_column(self.term)} != {_format_value(self.value)}"

    def __repr__(self) -> str:
        return f"NotEqualTo(term={self.term!r}, value={self.value!r})"
```
Column does not equal value.
```python
@dataclass(frozen=True)
class NotIn(BooleanExpression):
    """Column value not in a set of values."""

    term: str
    values: tuple[Any, ...]

    def __new__(cls, term: str, values: tuple[Any, ...]) -> BooleanExpression:  # type: ignore[misc]
        if not values:
            return AlwaysTrue()
        instance = super().__new__(cls)
        return instance

    def to_sql(self) -> str:
        vals = ", ".join(_format_value(v) for v in self.values)
        return f"{_quote_column(self.term)} NOT IN ({vals})"

    def __repr__(self) -> str:
        return f"NotIn(term={self.term!r}, values={self.values!r})"
```
Column value not in a set of values.
```python
@dataclass(frozen=True)
class NotNaN(BooleanExpression):
    """Column is not NaN."""

    term: str

    def to_sql(self) -> str:
        return f"NOT isnan({_quote_column(self.term)})"

    def __repr__(self) -> str:
        return f"NotNaN(term={self.term!r})"
```
Column is not NaN.
```python
@dataclass(frozen=True)
class NotNull(BooleanExpression):
    """Column is not NULL."""

    term: str

    def to_sql(self) -> str:
        return f"{_quote_column(self.term)} IS NOT NULL"

    def __repr__(self) -> str:
        return f"NotNull(term={self.term!r})"
```
Column is not NULL.
```python
@dataclass(frozen=True)
class Or(BooleanExpression):
    """Logical OR with short-circuit simplification."""

    left: BooleanExpression
    right: BooleanExpression

    def __new__(cls, left: BooleanExpression, right: BooleanExpression) -> BooleanExpression:  # type: ignore[misc]
        if isinstance(left, _AlwaysTrue) or isinstance(right, _AlwaysTrue):
            return AlwaysTrue()
        if isinstance(left, _AlwaysFalse):
            return right
        if isinstance(right, _AlwaysFalse):
            return left
        instance = super().__new__(cls)
        return instance

    def to_sql(self) -> str:
        return f"({self.left.to_sql()} OR {self.right.to_sql()})"

    def __repr__(self) -> str:
        return f"Or(left={self.left!r}, right={self.right!r})"
```
Logical OR with short-circuit simplification.
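Not and Or fold constants at construction time. Or(AlwaysTrue(), x) becomes AlwaysTrue(), Or(AlwaysFalse(), x) becomes x, and Not(Not(x)) becomes x. The sketch below re-creates those rules with hypothetical plain classes. It deliberately uses factory functions (or_, not_) instead of overriding __new__, because of a Python rule: when __new__ returns an object that is already an instance of the class, Python calls __init__ on that object again with the outer arguments. For example, Or(AlwaysFalse(), Or(a, b)) in the listing above returns the inner Or, and a frozen dataclass's generated __init__ then overwrites its fields anyway. Rendering the constants as TRUE/FALSE is an assumption.

```python
from __future__ import annotations

from dataclasses import dataclass


class Expr:
    """Base class for the hypothetical expression tree."""

    def to_sql(self) -> str:
        raise NotImplementedError


class AlwaysTrue(Expr):
    def to_sql(self) -> str:
        return "TRUE"


class AlwaysFalse(Expr):
    def to_sql(self) -> str:
        return "FALSE"


@dataclass(frozen=True)
class Raw(Expr):
    sql: str

    def to_sql(self) -> str:
        return self.sql


@dataclass(frozen=True)
class NotExpr(Expr):
    child: Expr

    def to_sql(self) -> str:
        return f"(NOT {self.child.to_sql()})"


@dataclass(frozen=True)
class OrExpr(Expr):
    left: Expr
    right: Expr

    def to_sql(self) -> str:
        return f"({self.left.to_sql()} OR {self.right.to_sql()})"


def not_(child: Expr) -> Expr:
    # Same rules as Not.__new__: constants flip, double negation cancels.
    if isinstance(child, AlwaysTrue):
        return AlwaysFalse()
    if isinstance(child, AlwaysFalse):
        return AlwaysTrue()
    if isinstance(child, NotExpr):
        return child.child
    return NotExpr(child)


def or_(left: Expr, right: Expr) -> Expr:
    # Same rules as Or.__new__: TRUE absorbs, FALSE is the identity.
    if isinstance(left, AlwaysTrue) or isinstance(right, AlwaysTrue):
        return AlwaysTrue()
    if isinstance(left, AlwaysFalse):
        return right
    if isinstance(right, AlwaysFalse):
        return left
    return OrExpr(left, right)


a, b = Raw("a > 1"), Raw("b < 2")
print(or_(AlwaysFalse(), or_(a, b)).to_sql())  # inner OR returned untouched
print(not_(not_(a)).to_sql())
```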
```python
@dataclass(frozen=True)
class Reference:
    """Column reference."""

    name: str

    def __repr__(self) -> str:
        return f"Reference('{self.name}')"
```
Column reference.
````python
class Schema:
    """Represents a Ducklake table schema."""

    __slots__ = ("_fields", "_schema_id", "_name_to_field", "_id_to_field")

    def __init__(self, *fields: NestedField, schema_id: int = 0) -> None:
        # Validate uniqueness
        seen_names: set[str] = set()
        seen_ids: set[int] = set()
        name_index: dict[str, NestedField] = {}
        id_index: dict[int, NestedField] = {}

        for f in fields:
            if f.name in seen_names:
                raise ValueError(f"Duplicate field name: {f.name!r}")
            if f.field_id in seen_ids:
                raise ValueError(f"Duplicate field_id: {f.field_id}")
            seen_names.add(f.name)
            seen_ids.add(f.field_id)
            name_index[f.name] = f
            id_index[f.field_id] = f

        self._fields = fields
        self._schema_id = schema_id
        self._name_to_field = name_index
        self._id_to_field = id_index

    # -- Alternate constructors ----------------------------------------------

    @classmethod
    def of(cls, *args: NestedField | dict[str, DucklakeType]) -> Schema:
        """Create a Schema with auto-assigned field IDs.

        Accepts either :class:`NestedField` objects (from :func:`required` /
        :func:`optional`) or a single *dict* mapping column names to types.

        Examples:
            Using ``required()`` and ``optional()`` helpers:

            ```pycon
            >>> from pyducklake import Schema, required, optional, IntegerType, StringType, DoubleType
            >>> schema = Schema.of(
            ...     required("id", IntegerType()),
            ...     optional("name", StringType()),
            ...     optional("value", DoubleType()),
            ... )
            >>> schema.column_names()
            ['id', 'name', 'value']
            >>> schema.find_field("id").required
            True

            ```

            Using a dict (all fields optional by default):

            ```pycon
            >>> schema = Schema.of({"id": IntegerType(), "name": StringType()})
            >>> schema.column_names()
            ['id', 'name']

            ```

        Args:
            *args: Either NestedField objects or a single dict[str, DucklakeType].

        Returns:
            A new Schema with field IDs assigned sequentially starting from 1.

        Raises:
            TypeError: If args are mixed types or invalid.
            ValueError: If duplicate field names are provided.
        """
        if not args:
            raise TypeError("Schema.of() requires at least one argument")

        # Single-dict form
        if len(args) == 1 and isinstance(args[0], dict):
            mapping = args[0]
            fields: list[NestedField] = []
            for i, (name, ftype) in enumerate(mapping.items(), start=1):
                if not isinstance(ftype, DucklakeType):  # pyright: ignore[reportUnnecessaryIsInstance]
                    raise TypeError(
                        f"Dict values must be DucklakeType instances, got {type(ftype).__name__} for key {name!r}"
                    )
                fields.append(NestedField(field_id=i, name=name, field_type=ftype, required=False))
            return cls(*fields)

        # NestedField form — validate types first
        raw_fields: list[NestedField] = []
        for arg in args:
            if not isinstance(arg, NestedField):
                raise TypeError(
                    f"Schema.of() arguments must be all NestedField or a single dict, got {type(arg).__name__}"
                )
            raw_fields.append(arg)

        # 1. Collect all explicit (non-sentinel) field_ids
        explicit_ids = {f.field_id for f in raw_fields if f.field_id != _SENTINEL_FIELD_ID}
        if len(explicit_ids) != sum(1 for f in raw_fields if f.field_id != _SENTINEL_FIELD_ID):
            raise ValueError("Duplicate explicit field_ids")

        # 2. Auto-assign sentinel fields, skipping explicit IDs
        next_id = 1
        resolved: list[NestedField] = []
        for f in raw_fields:
            if f.field_id == _SENTINEL_FIELD_ID:
                while next_id in explicit_ids:
                    next_id += 1
                resolved.append(
                    NestedField(
                        field_id=next_id,
                        name=f.name,
                        field_type=f.field_type,
                        required=f.required,
                        doc=f.doc,
                    )
                )
                next_id += 1
            else:
                resolved.append(f)

        return cls(*resolved)

    # -- Properties ----------------------------------------------------------

    @property
    def fields(self) -> tuple[NestedField, ...]:
        return self._fields

    @property
    def schema_id(self) -> int:
        return self._schema_id

    # -- Field lookup --------------------------------------------------------

    def find_field(self, name_or_id: str | int, case_sensitive: bool = True) -> NestedField:
        """Find a field by name or field_id. Raises ValueError if not found."""
        if isinstance(name_or_id, int):
            field = self._id_to_field.get(name_or_id)
            if field is None:
                raise ValueError(f"Field with field_id {name_or_id} not found")
            return field

        if case_sensitive:
            field = self._name_to_field.get(name_or_id)
            if field is None:
                raise ValueError(f"Field {name_or_id!r} not found")
            return field

        # Case-insensitive
        lower = name_or_id.lower()
        for f in self._fields:
            if f.name.lower() == lower:
                return f
        raise ValueError(f"Field {name_or_id!r} not found")

    def find_type(self, name_or_id: str | int, case_sensitive: bool = True) -> DucklakeType:
        """Find a field's type by name or field_id."""
        return self.find_field(name_or_id, case_sensitive=case_sensitive).field_type

    def find_column_name(self, field_id: int) -> str | None:
        """Find a column name by field_id. Returns None if not found."""
        field = self._id_to_field.get(field_id)
        return field.name if field is not None else None

    # -- Accessors -----------------------------------------------------------

    def column_names(self) -> list[str]:
        return [f.name for f in self._fields]

    def field_ids(self) -> set[int]:
        return {f.field_id for f in self._fields}

    @property
    def highest_field_id(self) -> int:
        """Highest field_id in the schema (useful for generating new IDs)."""
        if not self._fields:
            return 0
        return max(f.field_id for f in self._fields)

    # -- Conversion ----------------------------------------------------------

    def as_struct(self) -> StructType:
        """Convert to a StructType."""
        return StructType(fields=self._fields)

    def as_arrow(self) -> pa.Schema:
        """Convert to a PyArrow Schema.

        Each field carries ``PARQUET:field_id`` metadata matching Ducklake/Iceberg
        conventions for Parquet files.
        """
        arrow_fields: list[pa.Field[pa.DataType]] = []
        for f in self._fields:
            arrow_type = ducklake_type_to_arrow(f.field_type)
            arrow_field = pa.field(
                f.name,
                arrow_type,
                nullable=not f.required,
                metadata={b"PARQUET:field_id": str(f.field_id).encode()},
            )
            arrow_fields.append(arrow_field)
        return pa.schema(arrow_fields)

    # -- Projection ----------------------------------------------------------

    def select(self, *names: str, case_sensitive: bool = True) -> Schema:
        """Return a new Schema with only the selected columns.

        Field order is preserved from the original schema.
        Raises ValueError for unknown names.
        """
        # Validate all names exist first
        if case_sensitive:
            selected: set[str] = set()
            for n in names:
                if n not in self._name_to_field:
                    raise ValueError(f"Field {n!r} not found")
                selected.add(n)
            kept = tuple(f for f in self._fields if f.name in selected)
        else:
            lower_names = {n.lower() for n in names}
            # Validate
            known_lower = {f.name.lower() for f in self._fields}
            for n in names:
                if n.lower() not in known_lower:
                    raise ValueError(f"Field {n!r} not found")
            kept = tuple(f for f in self._fields if f.name.lower() in lower_names)

        return Schema(*kept, schema_id=self._schema_id)

    # -- Standard methods ----------------------------------------------------

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Schema):
            return NotImplemented
        return self._fields == other._fields and self._schema_id == other._schema_id

    def __repr__(self) -> str:
        fields_repr = ", ".join(repr(f) for f in self._fields)
        return f"Schema(fields=({fields_repr}), schema_id={self._schema_id})"

    def __len__(self) -> int:
        return len(self._fields)

    def __iter__(self) -> Iterator[NestedField]:
        return iter(self._fields)
````
Represents a Ducklake table schema.
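The Schema constructor rejects duplicate column names and duplicate field_ids in a single pass, building the name and ID lookup indexes as it goes. The sketch below mirrors that check. It uses a hypothetical minimal Field class in place of NestedField so it runs without pyducklake, and build_indexes is an illustrative name, not a library function. It also tests membership in the index dicts directly instead of keeping separate seen-sets, which gives the same result.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class Field:
    """Hypothetical stand-in for NestedField (only the attributes used here)."""

    field_id: int
    name: str


def build_indexes(*fields: Field) -> tuple[dict[str, Field], dict[int, Field]]:
    """Validate uniqueness and return (name -> field, field_id -> field) indexes."""
    name_index: dict[str, Field] = {}
    id_index: dict[int, Field] = {}
    for f in fields:
        # Names are checked before IDs, as in Schema.__init__.
        if f.name in name_index:
            raise ValueError(f"Duplicate field name: {f.name!r}")
        if f.field_id in id_index:
            raise ValueError(f"Duplicate field_id: {f.field_id}")
        name_index[f.name] = f
        id_index[f.field_id] = f
    return name_index, id_index


names, ids = build_indexes(Field(1, "id"), Field(2, "name"))
print(sorted(names), sorted(ids))  # ['id', 'name'] [1, 2]
```

Both indexes make later lookups by name or by ID a constant-time dict access.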
Schema(*fields: NestedField, schema_id: int = 0)
of(*args: NestedField | dict[str, DucklakeType]) -> Schema (classmethod)
Create a Schema with auto-assigned field IDs.
Accepts either NestedField objects (from required() / optional()) or a single dict mapping column names to types.
Examples:
Using required() and optional() helpers:
>>> from pyducklake import Schema, required, optional, IntegerType, StringType, DoubleType
>>> schema = Schema.of(
... required("id", IntegerType()),
... optional("name", StringType()),
... optional("value", DoubleType()),
... )
>>> schema.column_names()
['id', 'name', 'value']
>>> schema.find_field("id").required
True
Using a dict (all fields optional by default):
>>> schema = Schema.of({"id": IntegerType(), "name": StringType()})
>>> schema.column_names()
['id', 'name']
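Args:
    *args: Either NestedField objects or a single dict[str, DucklakeType].
Returns:
    A new Schema. In the dict form, field IDs are assigned sequentially from 1 in dict order. In the NestedField form, explicit field_ids are kept and the remaining fields receive the next unused IDs, counting up from 1 and skipping any explicit ones.
Raises:
    TypeError: If no arguments are given, the arguments mix NestedField objects and dicts, or a dict value is not a DucklakeType.
    ValueError: If duplicate field names or duplicate explicit field_ids are provided.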
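The NestedField form of Schema.of() keeps any explicit field_id and fills sentinel IDs from 1 upward, skipping IDs that are already claimed explicitly. The sketch below reproduces that loop on a hypothetical Field stand-in. SENTINEL = -1 is an assumption: the actual value of pyducklake's _SENTINEL_FIELD_ID is not shown on this page.

```python
from __future__ import annotations

from dataclasses import dataclass, replace

SENTINEL = -1  # assumed placeholder; pyducklake's _SENTINEL_FIELD_ID value is not shown here


@dataclass(frozen=True)
class Field:
    """Hypothetical stand-in for NestedField."""

    field_id: int
    name: str
    required: bool = False


def assign_ids(fields: list[Field]) -> list[Field]:
    """Mirror Schema.of(): keep explicit IDs, fill sentinel IDs from 1 upward."""
    explicit = [f.field_id for f in fields if f.field_id != SENTINEL]
    explicit_ids = set(explicit)
    if len(explicit_ids) != len(explicit):
        raise ValueError("Duplicate explicit field_ids")

    next_id = 1
    resolved: list[Field] = []
    for f in fields:
        if f.field_id == SENTINEL:
            # Skip over any ID already claimed by an explicit field.
            while next_id in explicit_ids:
                next_id += 1
            resolved.append(replace(f, field_id=next_id))
            next_id += 1
        else:
            resolved.append(f)
    return resolved


mixed = assign_ids([Field(SENTINEL, "a"), Field(1, "b"), Field(SENTINEL, "c")])
print([(f.name, f.field_id) for f in mixed])  # [('a', 2), ('b', 1), ('c', 3)]
```

Because "b" claims ID 1 explicitly, the first auto-assigned field skips to 2 even though it comes first in argument order.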
find_field(name_or_id: str | int, case_sensitive: bool = True) -> NestedField
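Find a field by name or field_id. An int is looked up as a field_id (case_sensitive has no effect); a str is matched against column names, and with case_sensitive=False the first case-insensitive match in schema order is returned. Raises ValueError if not found.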
find_type(name_or_id: str | int, case_sensitive: bool = True) -> DucklakeType
Find a field's type by name or field_id.
find_column_name(field_id: int) -> str | None
Find a column name by field_id. Returns None if not found.
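The three lookup helpers differ mainly in how they treat a miss: find_field and find_type raise ValueError, while find_column_name returns None. The sketch below reproduces those rules over a hypothetical two-field schema. It uses linear scans and type-name strings for brevity; the real class uses dict indexes and DucklakeType values.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class Field:
    """Hypothetical stand-in for NestedField."""

    field_id: int
    name: str
    field_type: str  # a type name instead of a DucklakeType, for illustration


FIELDS = (Field(1, "id", "int"), Field(2, "UserName", "string"))


def find_field(name_or_id: str | int, case_sensitive: bool = True) -> Field:
    if isinstance(name_or_id, int):
        # Integers are always treated as field_ids.
        for f in FIELDS:
            if f.field_id == name_or_id:
                return f
        raise ValueError(f"Field with field_id {name_or_id} not found")
    if case_sensitive:
        for f in FIELDS:
            if f.name == name_or_id:
                return f
    else:
        # First case-insensitive match in schema order wins.
        lower = name_or_id.lower()
        for f in FIELDS:
            if f.name.lower() == lower:
                return f
    raise ValueError(f"Field {name_or_id!r} not found")


def find_type(name_or_id: str | int, case_sensitive: bool = True) -> str:
    return find_field(name_or_id, case_sensitive).field_type


def find_column_name(field_id: int) -> str | None:
    """Unlike find_field, a miss returns None instead of raising."""
    for f in FIELDS:
        if f.field_id == field_id:
            return f.name
    return None


print(find_field(2).name)                             # UserName
print(find_type("username", case_sensitive=False))    # string
print(find_column_name(99))                           # None
```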
highest_field_id: int (property)
Highest field_id in the schema (useful for generating new IDs). Returns 0 for an empty schema.
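One plausible way to use highest_field_id when generating a new ID is to take it plus one. The sketch below shows that arithmetic on plain integers. next_field_id is a hypothetical helper: this page does not show how pyducklake itself allocates IDs for new columns.

```python
from __future__ import annotations


def highest_field_id(field_ids: list[int]) -> int:
    """Mirror Schema.highest_field_id: the maximum ID, or 0 for an empty schema."""
    return max(field_ids) if field_ids else 0


def next_field_id(field_ids: list[int]) -> int:
    """Hypothetical helper: the next unused ID when adding a column."""
    return highest_field_id(field_ids) + 1


print(next_field_id([]))         # 1
print(next_field_id([1, 2, 7]))  # 8
```

Returning 0 for an empty schema means the first generated ID is 1, which matches where Schema.of() starts numbering.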
as_struct() -> StructType
Convert to a StructType.
as_arrow() -> pa.Schema
Convert to a PyArrow Schema.
Each field carries PARQUET:field_id metadata matching Ducklake/Iceberg conventions for Parquet files, and required fields are marked non-nullable.
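as_arrow() derives two per-field properties beyond the type: nullability is the inverse of required, and the field_id is stored as ASCII bytes under the PARQUET:field_id metadata key. The dependency-free sketch below builds the (name, nullable, metadata) triples that as_arrow passes to pa.field(). It omits the DucklakeType-to-Arrow type conversion and uses a hypothetical Field stand-in.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class Field:
    """Hypothetical stand-in for NestedField."""

    field_id: int
    name: str
    required: bool


def arrow_field_specs(fields: list[Field]) -> list[tuple[str, bool, dict[bytes, bytes]]]:
    """Return (name, nullable, metadata) triples as as_arrow() passes them to pa.field()."""
    return [
        (
            f.name,
            not f.required,  # required columns become non-nullable
            {b"PARQUET:field_id": str(f.field_id).encode()},  # ID stored as ASCII bytes
        )
        for f in fields
    ]


specs = arrow_field_specs([Field(1, "id", True), Field(2, "name", False)])
print(specs[0])  # ('id', False, {b'PARQUET:field_id': b'1'})
```

With pyarrow installed, each triple would feed pa.field(name, arrow_type, nullable=nullable, metadata=metadata), where arrow_type comes from the type conversion this sketch leaves out.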
select(*names: str, case_sensitive: bool = True) -> Schema
Return a new Schema with only the selected columns.
Fields keep their order from the original schema, regardless of the order in which names are passed, and the schema_id is carried over. Raises ValueError for unknown names.
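select() first validates every requested name and then filters the original field list, so the output follows schema order rather than argument order. The sketch below applies the same logic to a plain list of column names. The function name and list-based signature are illustrative only.

```python
from __future__ import annotations


def select(columns: list[str], *names: str, case_sensitive: bool = True) -> list[str]:
    """Mirror Schema.select() on plain column names: validate, then keep schema order."""
    if case_sensitive:
        known = set(columns)
        for n in names:
            if n not in known:
                raise ValueError(f"Field {n!r} not found")
        wanted = set(names)
        return [c for c in columns if c in wanted]
    # Case-insensitive: compare lowercased names on both sides.
    known_lower = {c.lower() for c in columns}
    for n in names:
        if n.lower() not in known_lower:
            raise ValueError(f"Field {n!r} not found")
    wanted_lower = {n.lower() for n in names}
    return [c for c in columns if c.lower() in wanted_lower]


cols = ["id", "Name", "value"]
print(select(cols, "value", "id"))                 # ['id', 'value']
print(select(cols, "NAME", case_sensitive=False))  # ['Name']
```

Validating before filtering means a single unknown name fails the whole call, so a partial projection is never returned.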
optional(name: str, field_type: DucklakeType, doc: str | None = None) -> NestedField
Create an optional (nullable) field for use with Schema.of().
The field_id is set to a sentinel value and will be auto-assigned by Schema.of().
Args:
    name: Column name.
    field_type: Column type.
    doc: Optional documentation string.
Example:
>>> from pyducklake import Schema, required, optional, IntegerType, StringType
>>> schema = Schema.of(
... required("id", IntegerType()),
... optional("name", StringType()),
... )
>>> schema.column_names()
['id', 'name']
required(name: str, field_type: DucklakeType, doc: str | None = None) -> NestedField
Create a required field for use with Schema.of().
The field_id is set to a sentinel value and will be auto-assigned by Schema.of().
Args:
    name: Column name.
    field_type: Column type.
    doc: Optional documentation string.
Example:
>>> from pyducklake import Schema, required, optional, IntegerType, StringType
>>> schema = Schema.of(
... required("id", IntegerType()),
... optional("name", StringType()),
... )
```python
@dataclass(frozen=True)
class Snapshot:
    """Represents a Ducklake snapshot (committed transaction)."""

    snapshot_id: int
    timestamp: datetime
    schema_version: int | None = None
    changes: str | None = None
    author: str | None = None
    commit_message: str | None = None
```
Represents a Ducklake snapshot (committed transaction).
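Table.rollback_to_timestamp (in the Table source that follows) picks the latest snapshot whose timestamp is at or before the target. Before comparing, it copies tzinfo across when one datetime is naive and the other is timezone-aware. The sketch below isolates that selection step on a hypothetical Snap stand-in for Snapshot. It realigns the target on each iteration rather than once, which gives the same result when all snapshot timestamps share one timezone setting.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class Snap:
    """Hypothetical stand-in for Snapshot with only the fields used here."""

    snapshot_id: int
    timestamp: datetime


def latest_at_or_before(snapshots: list[Snap], ts: datetime) -> Snap | None:
    """Return the last snapshot with timestamp <= ts (input ordered by snapshot_id)."""
    candidate = None
    for snap in snapshots:
        snap_ts, target = snap.timestamp, ts
        # Borrow tzinfo from the other side so naive/aware comparison does not raise.
        if target.tzinfo is not None and snap_ts.tzinfo is None:
            snap_ts = snap_ts.replace(tzinfo=target.tzinfo)
        elif target.tzinfo is None and snap_ts.tzinfo is not None:
            target = target.replace(tzinfo=snap_ts.tzinfo)
        if snap_ts <= target:
            candidate = snap
    return candidate


utc = timezone.utc
snaps = [
    Snap(1, datetime(2024, 1, 1, tzinfo=utc)),
    Snap(2, datetime(2024, 1, 2, tzinfo=utc)),
    Snap(3, datetime(2024, 1, 3, tzinfo=utc)),
]
print(latest_at_or_before(snaps, datetime(2024, 1, 2, 12)).snapshot_id)  # 2
```

A None result corresponds to the ValueError that rollback_to_timestamp raises when no snapshot exists at or before the timestamp. Because snapshots are catalog-wide, the chosen snapshot may not have touched this particular table.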
````python
class Table:
    """Represents a loaded Ducklake table."""

    def __init__(
        self,
        identifier: tuple[str, str],
        schema: Schema,
        catalog: Catalog,
        *,
        sort_order: SortOrder | None = None,
    ) -> None:
        self._identifier = identifier
        self._schema = schema
        self._catalog = catalog
        self._sort_order_cache: SortOrder | None = sort_order

    @property
    def name(self) -> str:
        """Table name (without namespace)."""
        return self._identifier[1]

    @property
    def namespace(self) -> str:
        """Namespace (schema) name."""
        return self._identifier[0]

    @property
    def identifier(self) -> tuple[str, str]:
        """(namespace, table_name) tuple."""
        return self._identifier

    @property
    def schema(self) -> Schema:
        """Current table schema."""
        return self._schema

    @property
    def catalog(self) -> Catalog:
        """The catalog this table belongs to."""
        return self._catalog

    @property
    def fully_qualified_name(self) -> str:
        """catalog.namespace.table_name"""
        return self._catalog.fully_qualified_name(self._identifier[0], self._identifier[1])

    def current_snapshot(self) -> Snapshot | None:
        """Get current snapshot info. Returns None if table has no data."""
        snapshots = self.snapshots()
        if not snapshots:
            return None
        return snapshots[-1]

    def snapshots(self) -> list[Snapshot]:
        """List all catalog-level snapshots.

        Ducklake snapshots are catalog-wide (not per-table). Each snapshot
        represents a transaction that may have modified any table in the
        catalog. The table may not have changed in every returned snapshot.
        """
        catalog_name = self._catalog.name
        meta_schema = f"__ducklake_metadata_{catalog_name}"
        try:
            rows: list[tuple[Any, ...]] = self._catalog.fetchall(
                f"SELECT snapshot_id, snapshot_time, schema_version "
                f'FROM "{meta_schema}".ducklake_snapshot '
                f"ORDER BY snapshot_id"
            )
        except (duckdb.CatalogException, duckdb.BinderException):
            return []

        snapshots: list[Snapshot] = []
        for row in rows:
            snapshot_id = int(row[0])
            ts = row[1]
            if isinstance(ts, datetime):
                timestamp = ts
            else:
                timestamp = datetime.fromtimestamp(0, tz=timezone.utc)

            schema_version = int(row[2]) if len(row) > 2 and row[2] is not None else None

            snapshots.append(
                Snapshot(
                    snapshot_id=snapshot_id,
                    timestamp=timestamp,
                    schema_version=schema_version,
                )
            )
        return snapshots

    def refresh(self) -> Table:
        """Reload schema and metadata from the catalog. Returns self."""
        self._schema = self._catalog.build_schema_from_describe(self._identifier[0], self._identifier[1])
        self._sort_order_cache = None
        return self

    # -- Rollback --------------------------------------------------------------

    def rollback_to_snapshot(self, snapshot_id: int) -> None:
        """Roll back the table to a previous snapshot.

        This creates a new snapshot that matches the state at the given snapshot_id.
        Data written after that snapshot becomes inaccessible (but files aren't
        deleted until maintenance operations run).

        Args:
            snapshot_id: The snapshot ID to roll back to.

        Raises:
            ValueError: If snapshot_id doesn't exist.
        """
        known_ids = {s.snapshot_id for s in self.snapshots()}
        if snapshot_id not in known_ids:
            raise ValueError(f"Snapshot {snapshot_id} does not exist")

        fqn = self.fully_qualified_name
        conn = self._catalog.connection
        conn.execute(
            f"CREATE OR REPLACE TEMP TABLE _pyducklake_rollback AS SELECT * FROM {fqn} AT (VERSION => {snapshot_id})"
        )
        conn.execute(f"DELETE FROM {fqn}")
        conn.execute(f"INSERT INTO {fqn} SELECT * FROM _pyducklake_rollback")
        conn.execute("DROP TABLE _pyducklake_rollback")

    def rollback_to_timestamp(self, timestamp: datetime) -> None:
        """Roll back the table to the state at a given timestamp.

        Finds the latest snapshot at or before the timestamp and rolls back to it.

        Args:
            timestamp: The point in time to roll back to.

        Raises:
            ValueError: If no snapshot exists at or before the timestamp.
        """
        snapshots = self.snapshots()
        # Normalize timestamp once to avoid mutating across iterations
        ts = timestamp
        candidate: Snapshot | None = None
        for snap in snapshots:
            snap_ts = snap.timestamp
            # Make both offset-aware or both naive for comparison
            if ts.tzinfo is not None and snap_ts.tzinfo is None:
                snap_ts = snap_ts.replace(tzinfo=ts.tzinfo)
            elif ts.tzinfo is None and snap_ts.tzinfo is not None:
                ts = ts.replace(tzinfo=snap_ts.tzinfo)
            if snap_ts <= ts:
                candidate = snap
        if candidate is None:
            raise ValueError(f"No snapshot exists at or before {timestamp}")
        self.rollback_to_snapshot(candidate.snapshot_id)

    # -- Scan ------------------------------------------------------------------

    def scan(
        self,
        row_filter: BooleanExpression | str = AlwaysTrue(),
        selected_fields: tuple[str, ...] = ("*",),
        snapshot_id: int | None = None,
        limit: int | None = None,
    ) -> DataScan:
        """Create a scan on this table.

        Examples:

            ```pycon
            >>> import tempfile, os, pyarrow as pa
            >>> from pyducklake import Catalog, Schema, required, optional, IntegerType, StringType
            >>> tmp = tempfile.mkdtemp()
            >>> cat = Catalog("t", os.path.join(tmp, "m.duckdb"), data_path=os.path.join(tmp, "d"))
            >>> table = cat.create_table("users", Schema.of(
            ...     required("id", IntegerType()), optional("name", StringType()),
            ... ))
            >>> table.append(pa.table({"id": [1, 2, 3], "name": ["alice", "bob", "carol"]}))
            >>> table.scan().count()
            3
            >>> table.scan().select("name").to_arrow().column("name").to_pylist()
            ['alice', 'bob', 'carol']
            >>> table.scan('"id" > 1').count()
            2

            ```
        """
        from pyducklake.scan import DataScan, RawSQL

        if isinstance(row_filter, str):
            actual_filter: BooleanExpression = RawSQL(row_filter)
        else:
            actual_filter = row_filter

        return DataScan(
            table=self,
            row_filter=actual_filter,
            selected_fields=selected_fields,
            snapshot_id=snapshot_id,
            limit=limit,
        )

    # -- Add Files -------------------------------------------------------------

    def add_files(
        self,
        file_paths: list[str] | str,
        *,
        allow_missing: bool = False,
        ignore_extra_columns: bool = False,
    ) -> None:
        """Register external Parquet files with this table.

        Args:
            file_paths: Path(s) to Parquet files to register.
            allow_missing: If True, columns present in the table but
                missing from the file are filled with their initial
                default value.
            ignore_extra_columns: If True, extra columns in the file
                that aren't in the table are silently ignored.

        The files must be valid Parquet files with a compatible schema.
        """
        from pyducklake.catalog import escape_string_literal

        if isinstance(file_paths, str):
            file_paths = [file_paths]

        catalog_name = self._catalog.name
        namespace = self._identifier[0]
        table_name = self._identifier[1]

        opts = ""
        if namespace != "main":
            opts += f", schema := '{escape_string_literal(namespace)}'"
        if allow_missing:
            opts += ", allow_missing := true"
        if ignore_extra_columns:
            opts += ", ignore_extra_columns := true"

        esc_cat = escape_string_literal(catalog_name)
        esc_tbl = escape_string_literal(table_name)
        for path in file_paths:
            esc_path = escape_string_literal(path)
            self._catalog.connection.execute(
                f"CALL ducklake_add_data_files('{esc_cat}', '{esc_tbl}', '{esc_path}'{opts})"
            )

    # -- Write -----------------------------------------------------------------

    @staticmethod
    def _to_arrow_table(df: ArrowCompatible) -> pa.Table:
        """Convert input to pyarrow.Table, supporting the Arrow PyCapsule interface.

        Accepts ``pyarrow.Table`` directly, or any object implementing
        ``__arrow_c_stream__`` (e.g. Polars DataFrame, nanoarrow array,
        arro3 table).
        """
        if isinstance(df, pa.Table):
            return df
        if hasattr(df, "__arrow_c_stream__"):
            return pa.RecordBatchReader.from_stream(df).read_all()
        raise TypeError(f"Expected pyarrow.Table or object implementing __arrow_c_stream__, got {type(df).__name__}")

    def _sort_order_clause(self) -> str | None:
        """Return an ORDER BY clause if the table has a sort order, else None."""
        sort = self.sort_order
        if sort.is_unsorted:
            return None
        return ", ".join(f.to_sql() for f in sort.fields)

    def append(self, df: ArrowCompatible) -> None:
        """Append data to the table.

        Accepts ``pyarrow.Table`` or any object implementing the Arrow
        PyCapsule interface (``__arrow_c_stream__``).

        If the table has a sort order, data is sorted before insertion.

        Examples:

            ```pycon
            >>> import tempfile, os, pyarrow as pa
            >>> from pyducklake import Catalog, Schema, required, IntegerType
            >>> tmp = tempfile.mkdtemp()
            >>> cat = Catalog("t", os.path.join(tmp, "m.duckdb"), data_path=os.path.join(tmp, "d"))
            >>> table = cat.create_table("nums", Schema.of(required("n", IntegerType())))
            >>> table.append(pa.table({"n": [1, 2, 3]}))
            >>> table.append(pa.table({"n": [4, 5]}))
            >>> table.scan().count()
            5

            ```
        """
        arrow_table = self._to_arrow_table(df)
        order = self._sort_order_clause()
        conn = self._catalog.connection
        conn.register("_pyducklake_tmp_append", arrow_table)
        try:
            fqn = self.fully_qualified_name
            if order:
                conn.execute(f"INSERT INTO {fqn} SELECT * FROM _pyducklake_tmp_append ORDER BY {order}")
            else:
                conn.execute(f"INSERT INTO {fqn} SELECT * FROM _pyducklake_tmp_append")
        finally:
            conn.unregister("_pyducklake_tmp_append")

    def append_batches(
        self,
        batches: pa.RecordBatchReader | Iterator[pa.RecordBatch],
        *,
        schema: pa.Schema | None = None,
    ) -> None:
        """Append data from a stream of record batches.

        Memory-efficient alternative to :meth:`append` — processes batches
        without materializing the full dataset in memory.

        Args:
            batches: RecordBatchReader or iterator of RecordBatch.
            schema: Required if passing an iterator (not needed for RecordBatchReader).
        """
        if isinstance(batches, pa.RecordBatchReader):
            reader = batches
        else:
            if schema is None:
                raise ValueError("schema is required when passing an iterator of RecordBatch")
            reader = pa.RecordBatchReader.from_batches(schema, batches)

        order = self._sort_order_clause()
        conn = self._catalog.connection
        conn.register("_pyducklake_tmp_batches", reader)
        try:
            fqn = self.fully_qualified_name
            if order:
                conn.execute(f"INSERT INTO {fqn} SELECT * FROM _pyducklake_tmp_batches ORDER BY {order}")
            else:
                conn.execute(f"INSERT INTO {fqn} SELECT * FROM _pyducklake_tmp_batches")
        finally:
            conn.unregister("_pyducklake_tmp_batches")

    def overwrite(
        self,
        df: ArrowCompatible,
        overwrite_filter: BooleanExpression | str = AlwaysTrue(),
    ) -> None:
        """Overwrite data matching the filter, then insert new data.

        Accepts ``pyarrow.Table`` or any object implementing the Arrow
        PyCapsule interface (``__arrow_c_stream__``).

        If overwrite_filter is AlwaysTrue or not provided, truncates and inserts.
        Otherwise, deletes matching rows then inserts.

        Does not manage its own transaction — relies on the caller (or DuckDB
        auto-commit) for atomicity. This allows overwrite to be used inside
        an explicit :class:`Transaction` without nesting conflicts.
        """
        arrow_table = self._to_arrow_table(df)
        order = self._sort_order_clause()
        conn = self._catalog.connection
        conn.register("_pyducklake_tmp_overwrite", arrow_table)
        try:
            if isinstance(overwrite_filter, str):
                where: str | None = overwrite_filter
            elif overwrite_filter == AlwaysTrue():
                where = None
            else:
                where = overwrite_filter.to_sql()

            fqn = self.fully_qualified_name
            if where:
                conn.execute(f"DELETE FROM {fqn} WHERE {where}")
            else:
                conn.execute(f"DELETE FROM {fqn}")

            if order:
                conn.execute(f"INSERT INTO {fqn} SELECT * FROM _pyducklake_tmp_overwrite ORDER BY {order}")
            else:
                conn.execute(f"INSERT INTO {fqn} SELECT * FROM _pyducklake_tmp_overwrite")
        finally:
            conn.unregister("_pyducklake_tmp_overwrite")

    # -- Delete ----------------------------------------------------------------

    def delete(self, delete_filter: BooleanExpression | str) -> None:
        """Delete rows matching the filter.

        Args:
            delete_filter: Rows matching this filter are deleted.
        """
        if isinstance(delete_filter, str):
            filter_sql = delete_filter
        elif delete_filter == AlwaysFalse():
            return
        elif delete_filter == AlwaysTrue():
            filter_sql = None
        else:
            filter_sql = delete_filter.to_sql()

        fqn = self.fully_qualified_name
        if filter_sql is None:
            self._catalog.connection.execute(f"DELETE FROM {fqn}")
        else:
            self._catalog.connection.execute(f"DELETE FROM {fqn} WHERE {filter_sql}")

    # -- Upsert ----------------------------------------------------------------

    def upsert(
        self,
        df: ArrowCompatible,
        join_cols: tuple[str, ...] | list[str],
    ) -> UpsertResult:
        """Upsert data: update existing rows matching on join_cols, insert new rows.

        Accepts ``pyarrow.Table`` or any object implementing the Arrow
        PyCapsule interface (``__arrow_c_stream__``).

        Uses DuckDB's MERGE statement.
        """
        arrow_table = self._to_arrow_table(df)
        conn = self._catalog.connection
        fqn = self.fully_qualified_name
        join_col_set = set(join_cols)

        # Count rows before to determine updated vs inserted
        count_before = self.scan().count()

        conn.register("_pyducklake_tmp_upsert", arrow_table)
        try:
            on_clause = " AND ".join(f'target."{col}" = source."{col}"' for col in join_cols)

            all_cols = [field.name for field in arrow_table.schema]  # pyright: ignore[reportUnknownVariableType]
            non_join_cols = [c for c in all_cols if c not in join_col_set]

            update_set = ", ".join(f'"{col}" = source."{col}"' for col in non_join_cols)

            insert_cols = ", ".join(f'source."{col}"' for col in all_cols)

            merge_sql = f"MERGE INTO {fqn} AS target USING _pyducklake_tmp_upsert AS source ON {on_clause}"
            if update_set:
                merge_sql += f" WHEN MATCHED THEN UPDATE SET {update_set}"
            merge_sql += f" WHEN NOT MATCHED THEN INSERT VALUES ({insert_cols})"

            conn.execute(merge_sql)
        finally:
            conn.unregister("_pyducklake_tmp_upsert")

        count_after = self.scan().count()
        rows_inserted = count_after - count_before
        rows_updated = arrow_table.num_rows - rows_inserted

        return UpsertResult(rows_updated=rows_updated, rows_inserted=rows_inserted)

    # -- Partitioning ----------------------------------------------------------

    @property
    def spec(self) -> PartitionSpec:
        """Current partition spec. Returns UNPARTITIONED if not partitioned."""
        from pyducklake.catalog import escape_string_literal
        from pyducklake.partitioning import (
            DAY,
            HOUR,
            IDENTITY,
            MONTH,
            UNPARTITIONED,
            YEAR,
            PartitionField,
            PartitionSpec,
        )

        catalog_name = self._catalog.name
        meta_schema = f"__ducklake_metadata_{catalog_name}"
        try:
            rows: list[tuple[Any, ...]] = self._catalog.fetchall(
                f"SELECT c.column_name, pc.transform "
                f'FROM "{meta_schema}".ducklake_partition_column pc '
                f'JOIN "{meta_schema}".ducklake_partition_info pi '
                f"ON pc.partition_id = pi.partition_id "
                f"AND pc.table_id = pi.table_id "
                f'JOIN "{meta_schema}".ducklake_table t '
                f"ON pi.table_id = t.table_id "
                f'JOIN "{meta_schema}".ducklake_schema s '
                f"ON t.schema_id = s.schema_id "
                f'JOIN "{meta_schema}".ducklake_column c '
                f"ON pc.column_id = c.column_id "
                f"AND pc.table_id = c.table_id "
                f"WHERE t.table_name = '{escape_string_literal(self._identifier[1])}' "
                f"AND s.schema_name = '{escape_string_literal(self._identifier[0])}' "
                f"AND pi.end_snapshot IS NULL "
                f"ORDER BY pc.partition_key_index"
            )
        except (duckdb.CatalogException, duckdb.BinderException):
            return UNPARTITIONED

        if not rows:
            return UNPARTITIONED

        _transform_map = {
            "identity": IDENTITY,
            "year": YEAR,
            "month": MONTH,
            "day": DAY,
            "hour": HOUR,
        }

        fields: list[PartitionField] = []
        for col_name, transform_name in rows:
            transform = _transform_map.get(str(transform_name).lower(), IDENTITY)
            fields.append(PartitionField(source_column=str(col_name), transform=transform))

        return PartitionSpec(*fields)

    def update_spec(self) -> UpdateSpec:
        """Begin partition spec evolution. Returns an UpdateSpec builder."""
        from pyducklake.partitioning import UpdateSpec

        return UpdateSpec(self)

    # -- Sorting ---------------------------------------------------------------

    @property
    def sort_order(self) -> SortOrder:
        """Current sort order. Returns UNSORTED if not sorted."""
        if self._sort_order_cache is not None:
            return self._sort_order_cache

        result = self._fetch_sort_order()
        self._sort_order_cache = result
        return result

    def _fetch_sort_order(self) -> SortOrder:
        """Query DuckLake metadata for the current sort order."""
        from pyducklake.catalog import escape_string_literal
        from pyducklake.sorting import (
            UNSORTED,
            NullOrder,
            SortDirection,
            SortField,
            SortOrder,
        )

        catalog_name = self._catalog.name
        meta_schema = f"__ducklake_metadata_{catalog_name}"
        try:
            rows: list[tuple[Any, ...]] = self._catalog.fetchall(
                f"SELECT se.expression, se.sort_direction, se.null_order "
                f'FROM "{meta_schema}".ducklake_sort_expression se '
                f'JOIN "{meta_schema}".ducklake_sort_info si '
                f"ON se.sort_id = si.sort_id "
                f"AND se.table_id = si.table_id "
                f'JOIN "{meta_schema}".ducklake_table t '
                f"ON si.table_id = t.table_id "
                f'JOIN "{meta_schema}".ducklake_schema s '
                f"ON t.schema_id = s.schema_id "
                f"WHERE t.table_name = '{escape_string_literal(self._identifier[1])}' "
                f"AND s.schema_name = '{escape_string_literal(self._identifier[0])}' "
                f"AND si.end_snapshot IS NULL "
                f"ORDER BY se.sort_key_index"
            )
        except (duckdb.CatalogException, duckdb.BinderException):
            return UNSORTED

        if not rows:
            return UNSORTED

        fields: list[SortField] = []
        for expr, sort_dir, null_ord in rows:
            # expression is a quoted identifier like '"name"' — strip quotes
            col_name = str(expr).strip('"')
            direction = SortDirection.DESC if str(sort_dir).upper() == "DESC" else SortDirection.ASC
````
Returns an UpdateSpec builder.""" 565 from pyducklake.partitioning import UpdateSpec 566 567 return UpdateSpec(self) 568 569 # -- Sorting --------------------------------------------------------------- 570 571 @property 572 def sort_order(self) -> SortOrder: 573 """Current sort order. Returns UNSORTED if not sorted.""" 574 if self._sort_order_cache is not None: 575 return self._sort_order_cache 576 577 result = self._fetch_sort_order() 578 self._sort_order_cache = result 579 return result 580 581 def _fetch_sort_order(self) -> SortOrder: 582 """Query DuckLake metadata for the current sort order.""" 583 from pyducklake.catalog import escape_string_literal 584 from pyducklake.sorting import ( 585 UNSORTED, 586 NullOrder, 587 SortDirection, 588 SortField, 589 SortOrder, 590 ) 591 592 catalog_name = self._catalog.name 593 meta_schema = f"__ducklake_metadata_{catalog_name}" 594 try: 595 rows: list[tuple[Any, ...]] = self._catalog.fetchall( 596 f"SELECT se.expression, se.sort_direction, se.null_order " 597 f'FROM "{meta_schema}".ducklake_sort_expression se ' 598 f'JOIN "{meta_schema}".ducklake_sort_info si ' 599 f"ON se.sort_id = si.sort_id " 600 f"AND se.table_id = si.table_id " 601 f'JOIN "{meta_schema}".ducklake_table t ' 602 f"ON si.table_id = t.table_id " 603 f'JOIN "{meta_schema}".ducklake_schema s ' 604 f"ON t.schema_id = s.schema_id " 605 f"WHERE t.table_name = '{escape_string_literal(self._identifier[1])}' " 606 f"AND s.schema_name = '{escape_string_literal(self._identifier[0])}' " 607 f"AND si.end_snapshot IS NULL " 608 f"ORDER BY se.sort_key_index" 609 ) 610 except (duckdb.CatalogException, duckdb.BinderException): 611 return UNSORTED 612 613 if not rows: 614 return UNSORTED 615 616 fields: list[SortField] = [] 617 for expr, sort_dir, null_ord in rows: 618 # expression is a quoted identifier like '"name"' — strip quotes 619 col_name = str(expr).strip('"') 620 direction = SortDirection.DESC if str(sort_dir).upper() == "DESC" else SortDirection.ASC 621 
null_order = NullOrder.NULLS_FIRST if "FIRST" in str(null_ord).upper() else NullOrder.NULLS_LAST 622 fields.append( 623 SortField( 624 source_column=col_name, 625 direction=direction, 626 null_order=null_order, 627 ) 628 ) 629 630 return SortOrder(fields=tuple(fields)) 631 632 def update_sort_order(self) -> UpdateSortOrder: 633 """Begin sort order evolution. Returns an UpdateSortOrder builder.""" 634 from pyducklake.sorting import UpdateSortOrder 635 636 return UpdateSortOrder(self) 637 638 # -- Arrow Dataset --------------------------------------------------------- 639 640 def to_arrow_dataset(self, *, snapshot_id: int | None = None) -> ds.Dataset: 641 """Return this table as a PyArrow Dataset. 642 643 Enables interop with engines that consume the PyArrow dataset API 644 (DuckDB, Polars, DataFusion, Dask, etc.). 645 646 Args: 647 snapshot_id: Optional snapshot for time travel. 648 649 Returns: 650 A pyarrow.dataset.Dataset wrapping this table's data. 651 """ 652 import pyarrow.dataset as ds 653 654 scan = self.scan(snapshot_id=snapshot_id) 655 return ds.dataset(scan.to_arrow()) # pyright: ignore[reportUnknownMemberType] 656 657 # -- Inspect --------------------------------------------------------------- 658 659 def inspect(self) -> InspectTable: 660 """Return an :class:`InspectTable` for metadata introspection.""" 661 from pyducklake.inspect import InspectTable 662 663 return InspectTable(self) 664 665 # -- Maintenance ----------------------------------------------------------- 666 667 def maintenance(self) -> MaintenanceTable: 668 """Get maintenance operations for this table.""" 669 from pyducklake.maintenance import MaintenanceTable 670 671 return MaintenanceTable(self) 672 673 # -- CDC (Change Data Capture) --------------------------------------------- 674 675 def table_changes( 676 self, 677 start_snapshot: int | None = None, 678 end_snapshot: int | None = None, 679 *, 680 start_time: datetime | None = None, 681 end_time: datetime | None = None, 682 columns: 
tuple[str, ...] | list[str] | None = None, 683 filter_expr: str | None = None, 684 ) -> ChangeSet: 685 """Query all changes between two snapshots or timestamps. 686 687 Returns a ChangeSet with inserts, deletes, and update pre/post images. 688 """ 689 from pyducklake.cdc import ChangeSet 690 691 return ChangeSet( 692 self._cdc_query( 693 "ducklake_table_changes", 694 start_snapshot, 695 end_snapshot, 696 start_time=start_time, 697 end_time=end_time, 698 columns=columns, 699 filter_expr=filter_expr, 700 meta_cols=("snapshot_id", "rowid", "change_type"), 701 ), 702 change_type_col="change_type", 703 ) 704 705 def table_insertions( 706 self, 707 start_snapshot: int | None = None, 708 end_snapshot: int | None = None, 709 *, 710 start_time: datetime | None = None, 711 end_time: datetime | None = None, 712 columns: tuple[str, ...] | list[str] | None = None, 713 filter_expr: str | None = None, 714 ) -> ChangeSet: 715 """Query inserted rows between two snapshots or timestamps.""" 716 from pyducklake.cdc import ChangeSet 717 718 return ChangeSet( 719 self._cdc_query( 720 "ducklake_table_insertions", 721 start_snapshot, 722 end_snapshot, 723 start_time=start_time, 724 end_time=end_time, 725 columns=columns, 726 filter_expr=filter_expr, 727 meta_cols=(), 728 ), 729 change_type_col=None, 730 ) 731 732 def table_deletions( 733 self, 734 start_snapshot: int | None = None, 735 end_snapshot: int | None = None, 736 *, 737 start_time: datetime | None = None, 738 end_time: datetime | None = None, 739 columns: tuple[str, ...] 
| list[str] | None = None, 740 filter_expr: str | None = None, 741 ) -> ChangeSet: 742 """Query deleted rows between two snapshots or timestamps.""" 743 from pyducklake.cdc import ChangeSet 744 745 return ChangeSet( 746 self._cdc_query( 747 "ducklake_table_deletions", 748 start_snapshot, 749 end_snapshot, 750 start_time=start_time, 751 end_time=end_time, 752 columns=columns, 753 filter_expr=filter_expr, 754 meta_cols=(), 755 ), 756 change_type_col=None, 757 ) 758 759 @staticmethod 760 def _validate_cdc_bounds( 761 start_snapshot: int | None, 762 end_snapshot: int | None, 763 start_time: datetime | None, 764 end_time: datetime | None, 765 ) -> None: 766 """Validate CDC bound arguments.""" 767 has_snapshot = start_snapshot is not None or end_snapshot is not None 768 has_time = start_time is not None or end_time is not None 769 770 if has_snapshot and has_time: 771 raise ValueError( 772 "Cannot mix snapshot and timestamp bounds. " 773 "Use either (start_snapshot, end_snapshot) or (start_time, end_time)." 774 ) 775 776 if not has_snapshot and not has_time: 777 raise ValueError("Must provide either (start_snapshot, end_snapshot) or (start_time, end_time).") 778 779 if has_snapshot and start_snapshot is None: 780 raise ValueError("start_snapshot is required when using snapshot bounds.") 781 782 if has_time and start_time is None: 783 raise ValueError("start_time is required when using timestamp bounds.") 784 785 def _cdc_query( 786 self, 787 func_name: str, 788 start_snapshot: int | None, 789 end_snapshot: int | None, 790 *, 791 start_time: datetime | None = None, 792 end_time: datetime | None = None, 793 columns: tuple[str, ...] | list[str] | None = None, 794 filter_expr: str | None = None, 795 meta_cols: tuple[str, ...] 
= (), 796 ) -> pa.Table: 797 """Execute a CDC function and return the result as an Arrow table.""" 798 self._validate_cdc_bounds(start_snapshot, end_snapshot, start_time, end_time) 799 800 catalog_name = self._catalog.name 801 namespace = self._identifier[0] 802 table_name = self._identifier[1] 803 804 # Build bound arguments 805 if start_time is not None: 806 start_arg = f"'{start_time.strftime('%Y-%m-%d %H:%M:%S.%f')}'::TIMESTAMP" 807 if end_time is not None: 808 end_arg = f"'{end_time.strftime('%Y-%m-%d %H:%M:%S.%f')}'::TIMESTAMP" 809 else: 810 end_arg = "CURRENT_TIMESTAMP" 811 else: 812 assert start_snapshot is not None # validated above 813 start_arg = f"{start_snapshot}::BIGINT" 814 if end_snapshot is not None: 815 end_arg = f"{end_snapshot}::BIGINT" 816 else: 817 snap = self.current_snapshot() 818 resolved_end = snap.snapshot_id if snap is not None else start_snapshot 819 end_arg = f"{resolved_end}::BIGINT" 820 821 # Build column list 822 if columns is not None: 823 col_list = ", ".join([f'"{c}"' for c in meta_cols] + [f'"{c}"' for c in columns]) 824 else: 825 col_list = "*" 826 827 from pyducklake.catalog import escape_string_literal 828 829 esc_cat = escape_string_literal(catalog_name) 830 esc_ns = escape_string_literal(namespace) 831 esc_tbl = escape_string_literal(table_name) 832 sql = f"SELECT {col_list} FROM {func_name}('{esc_cat}', '{esc_ns}', '{esc_tbl}', {start_arg}, {end_arg})" 833 834 if filter_expr is not None: 835 sql += f" WHERE {filter_expr}" 836 837 result: Any = self._catalog.connection.execute(sql) 838 arrow_obj: Any = result.arrow() 839 if isinstance(arrow_obj, pa.Table): 840 return arrow_obj 841 tbl: pa.Table = arrow_obj.read_all() 842 return tbl 843 844 # -- Schema Evolution ------------------------------------------------------ 845 846 def update_schema(self) -> UpdateSchema: 847 """Begin schema evolution. 
Returns an UpdateSchema builder.""" 848 from pyducklake.schema_evolution import UpdateSchema 849 850 return UpdateSchema(self) 851 852 def __repr__(self) -> str: 853 return f"Table(identifier={self._identifier!r}, schema={self._schema!r})" 854 855 def __eq__(self, other: object) -> bool: 856 if not isinstance(other, Table): 857 return NotImplemented 858 return self._identifier == other._identifier and self._catalog.name == other._catalog.name
Represents a loaded Ducklake table.
    @property
    def name(self) -> str:
        """Table name (without namespace)."""
        return self._identifier[1]
Table name (without namespace).
    @property
    def namespace(self) -> str:
        """Namespace (schema) name."""
        return self._identifier[0]
Namespace (schema) name.
    @property
    def identifier(self) -> tuple[str, str]:
        """(namespace, table_name) tuple."""
        return self._identifier
(namespace, table_name) tuple.
    @property
    def catalog(self) -> Catalog:
        """The catalog this table belongs to."""
        return self._catalog
The catalog this table belongs to.
    @property
    def fully_qualified_name(self) -> str:
        """catalog.namespace.table_name"""
        return self._catalog.fully_qualified_name(self._identifier[0], self._identifier[1])
Fully qualified name of the table (catalog.namespace.table_name), as built by Catalog.fully_qualified_name().
    def current_snapshot(self) -> Snapshot | None:
        """Get current snapshot info. Returns None if table has no data."""
        snapshots = self.snapshots()
        if not snapshots:
            return None
        return snapshots[-1]
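Return the most recent catalog-level snapshot, or None if no snapshots can be listed. Because Ducklake snapshots are catalog-wide, the returned snapshot may come from a transaction that did not modify this table.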
    def snapshots(self) -> list[Snapshot]:
        """List all catalog-level snapshots.

        Ducklake snapshots are catalog-wide (not per-table). Each snapshot
        represents a transaction that may have modified any table in the
        catalog. The table may not have changed in every returned snapshot.
        """
        catalog_name = self._catalog.name
        meta_schema = f"__ducklake_metadata_{catalog_name}"
        try:
            rows: list[tuple[Any, ...]] = self._catalog.fetchall(
                f"SELECT snapshot_id, snapshot_time, schema_version "
                f'FROM "{meta_schema}".ducklake_snapshot '
                f"ORDER BY snapshot_id"
            )
        except (duckdb.CatalogException, duckdb.BinderException):
            return []

        snapshots: list[Snapshot] = []
        for row in rows:
            snapshot_id = int(row[0])
            ts = row[1]
            if isinstance(ts, datetime):
                timestamp = ts
            else:
                timestamp = datetime.fromtimestamp(0, tz=timezone.utc)

            schema_version = int(row[2]) if len(row) > 2 and row[2] is not None else None

            snapshots.append(
                Snapshot(
                    snapshot_id=snapshot_id,
                    timestamp=timestamp,
                    schema_version=schema_version,
                )
            )
        return snapshots
List all catalog-level snapshots.
Ducklake snapshots are catalog-wide (not per-table). Each snapshot represents a transaction that may have modified any table in the catalog. The table may not have changed in every returned snapshot.
    def refresh(self) -> Table:
        """Reload schema and metadata from the catalog. Returns self."""
        self._schema = self._catalog.build_schema_from_describe(self._identifier[0], self._identifier[1])
        self._sort_order_cache = None
        return self
Reload schema and metadata from the catalog. Returns self.
    def rollback_to_snapshot(self, snapshot_id: int) -> None:
        """Roll back the table to a previous snapshot.

        This creates a new snapshot that matches the state at the given snapshot_id.
        Data written after that snapshot becomes inaccessible (but files aren't
        deleted until maintenance operations run).

        Args:
            snapshot_id: The snapshot ID to roll back to.

        Raises:
            ValueError: If snapshot_id doesn't exist.
        """
        known_ids = {s.snapshot_id for s in self.snapshots()}
        if snapshot_id not in known_ids:
            raise ValueError(f"Snapshot {snapshot_id} does not exist")

        fqn = self.fully_qualified_name
        conn = self._catalog.connection
        conn.execute(
            f"CREATE OR REPLACE TEMP TABLE _pyducklake_rollback AS SELECT * FROM {fqn} AT (VERSION => {snapshot_id})"
        )
        conn.execute(f"DELETE FROM {fqn}")
        conn.execute(f"INSERT INTO {fqn} SELECT * FROM _pyducklake_rollback")
        conn.execute("DROP TABLE _pyducklake_rollback")
Roll back the table to a previous snapshot.
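This rewrites the table so that its current contents match its state at snapshot_id: all current rows are deleted and the rows visible at that snapshot are re-inserted, which creates new snapshots rather than removing existing ones. Data written after snapshot_id is no longer visible in the current state, but it remains reachable through time travel, and its files are not deleted until maintenance operations run.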
Args: snapshot_id: The snapshot ID to roll back to.
Raises: ValueError: If snapshot_id doesn't exist.
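The statement sequence that rollback_to_snapshot issues can be read straight off its source. The sketch below reproduces it as a pure function so the order is easy to inspect without a database; rollback_statements is a hypothetical helper for illustration (not part of pyducklake), and lake.main.users is a made-up table name.

```python
def rollback_statements(fqn: str, snapshot_id: int, known_ids: set[int]) -> list[str]:
    """Return, in order, the SQL a rollback to snapshot_id would run.

    Hypothetical helper mirroring Table.rollback_to_snapshot: it only builds
    strings and never touches a database.
    """
    if snapshot_id not in known_ids:
        raise ValueError(f"Snapshot {snapshot_id} does not exist")
    return [
        # 1. Copy the historical rows into a temp table via time travel.
        f"CREATE OR REPLACE TEMP TABLE _pyducklake_rollback AS "
        f"SELECT * FROM {fqn} AT (VERSION => {snapshot_id})",
        # 2. Remove every current row.
        f"DELETE FROM {fqn}",
        # 3. Re-insert the historical rows.
        f"INSERT INTO {fqn} SELECT * FROM _pyducklake_rollback",
        # 4. Drop the temp table.
        "DROP TABLE _pyducklake_rollback",
    ]


for stmt in rollback_statements("lake.main.users", 3, known_ids={0, 1, 2, 3, 4}):
    print(stmt)
```

Under DuckDB auto-commit each of these statements commits on its own, so a concurrent reader could observe the table between the DELETE and the INSERT unless the call runs inside an explicit transaction.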
    def rollback_to_timestamp(self, timestamp: datetime) -> None:
        """Roll back the table to the state at a given timestamp.

        Finds the latest snapshot at or before the timestamp and rolls back to it.

        Args:
            timestamp: The point in time to roll back to.

        Raises:
            ValueError: If no snapshot exists at or before the timestamp.
        """
        snapshots = self.snapshots()
        # Normalize timestamp once to avoid mutating across iterations
        ts = timestamp
        candidate: Snapshot | None = None
        for snap in snapshots:
            snap_ts = snap.timestamp
            # Make both offset-aware or both naive for comparison
            if ts.tzinfo is not None and snap_ts.tzinfo is None:
                snap_ts = snap_ts.replace(tzinfo=ts.tzinfo)
            elif ts.tzinfo is None and snap_ts.tzinfo is not None:
                ts = ts.replace(tzinfo=snap_ts.tzinfo)
            if snap_ts <= ts:
                candidate = snap
        if candidate is None:
            raise ValueError(f"No snapshot exists at or before {timestamp}")
        self.rollback_to_snapshot(candidate.snapshot_id)
Roll back the table to the state at a given timestamp.
Finds the latest snapshot at or before the timestamp and rolls back to it.
Args: timestamp: The point in time to roll back to.
Raises: ValueError: If no snapshot exists at or before the timestamp.
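The snapshot-selection step of rollback_to_timestamp is plain Python, so it can be exercised in isolation. The sketch below copies that loop, including the alignment of naive and timezone-aware datetimes; Snap is a hypothetical stand-in for pyducklake's Snapshot that carries only the two fields the loop reads, and the dates are made up.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class Snap:
    """Hypothetical stand-in for pyducklake's Snapshot (only the fields used here)."""

    snapshot_id: int
    timestamp: datetime


def pick_snapshot(snapshots: list[Snap], timestamp: datetime) -> Snap:
    """Return the latest snapshot taken at or before `timestamp`.

    Assumes `snapshots` is ordered by snapshot_id, as Table.snapshots() returns
    it, so the last match is the latest one.
    """
    ts = timestamp
    candidate: Snap | None = None
    for snap in snapshots:
        snap_ts = snap.timestamp
        # Make both sides offset-aware (or both naive) before comparing.
        if ts.tzinfo is not None and snap_ts.tzinfo is None:
            snap_ts = snap_ts.replace(tzinfo=ts.tzinfo)
        elif ts.tzinfo is None and snap_ts.tzinfo is not None:
            ts = ts.replace(tzinfo=snap_ts.tzinfo)
        if snap_ts <= ts:
            candidate = snap
    if candidate is None:
        raise ValueError(f"No snapshot exists at or before {timestamp}")
    return candidate


utc = timezone.utc
snaps = [
    Snap(0, datetime(2024, 1, 1, tzinfo=utc)),
    Snap(1, datetime(2024, 1, 2, tzinfo=utc)),
    Snap(2, datetime(2024, 1, 3, tzinfo=utc)),
]
# A naive target is treated as being in the snapshots' time zone (UTC here).
print(pick_snapshot(snaps, datetime(2024, 1, 2, 12, 0)).snapshot_id)  # 1
```

Since a naive timestamp is silently interpreted in the other side's time zone, passing a timezone-aware datetime avoids ambiguity.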
    def scan(
        self,
        row_filter: BooleanExpression | str = AlwaysTrue(),
        selected_fields: tuple[str, ...] = ("*",),
        snapshot_id: int | None = None,
        limit: int | None = None,
    ) -> DataScan:
        """Create a scan on this table.

        Examples:

        ```pycon
        >>> import tempfile, os, pyarrow as pa
        >>> from pyducklake import Catalog, Schema, required, optional, IntegerType, StringType
        >>> tmp = tempfile.mkdtemp()
        >>> cat = Catalog("t", os.path.join(tmp, "m.duckdb"), data_path=os.path.join(tmp, "d"))
        >>> table = cat.create_table("users", Schema.of(
        ... required("id", IntegerType()), optional("name", StringType()),
        ... ))
        >>> table.append(pa.table({"id": [1, 2, 3], "name": ["alice", "bob", "carol"]}))
        >>> table.scan().count()
        3
        >>> table.scan().select("name").to_arrow().column("name").to_pylist()
        ['alice', 'bob', 'carol']
        >>> table.scan('"id" > 1').count()
        2

        ```
        """
        from pyducklake.scan import DataScan, RawSQL

        if isinstance(row_filter, str):
            actual_filter: BooleanExpression = RawSQL(row_filter)
        else:
            actual_filter = row_filter

        return DataScan(
            table=self,
            row_filter=actual_filter,
            selected_fields=selected_fields,
            snapshot_id=snapshot_id,
            limit=limit,
        )
Create a scan on this table.
Examples:
>>> import tempfile, os, pyarrow as pa
>>> from pyducklake import Catalog, Schema, required, optional, IntegerType, StringType
>>> tmp = tempfile.mkdtemp()
>>> cat = Catalog("t", os.path.join(tmp, "m.duckdb"), data_path=os.path.join(tmp, "d"))
>>> table = cat.create_table("users", Schema.of(
... required("id", IntegerType()), optional("name", StringType()),
... ))
>>> table.append(pa.table({"id": [1, 2, 3], "name": ["alice", "bob", "carol"]}))
>>> table.scan().count()
3
>>> table.scan().select("name").to_arrow().column("name").to_pylist()
['alice', 'bob', 'carol']
>>> table.scan('"id" > 1').count()
2
    def add_files(
        self,
        file_paths: list[str] | str,
        *,
        allow_missing: bool = False,
        ignore_extra_columns: bool = False,
    ) -> None:
        """Register external Parquet files with this table.

        Args:
            file_paths: Path(s) to Parquet files to register.
            allow_missing: If True, columns present in the table but
                missing from the file are filled with their initial
                default value.
            ignore_extra_columns: If True, extra columns in the file
                that aren't in the table are silently ignored.

        The files must be valid Parquet files with a compatible schema.
        """
        from pyducklake.catalog import escape_string_literal

        if isinstance(file_paths, str):
            file_paths = [file_paths]

        catalog_name = self._catalog.name
        namespace = self._identifier[0]
        table_name = self._identifier[1]

        opts = ""
        if namespace != "main":
            opts += f", schema := '{escape_string_literal(namespace)}'"
        if allow_missing:
            opts += ", allow_missing := true"
        if ignore_extra_columns:
            opts += ", ignore_extra_columns := true"

        esc_cat = escape_string_literal(catalog_name)
        esc_tbl = escape_string_literal(table_name)
        for path in file_paths:
            esc_path = escape_string_literal(path)
            self._catalog.connection.execute(
                f"CALL ducklake_add_data_files('{esc_cat}', '{esc_tbl}', '{esc_path}'{opts})"
            )
Register external Parquet files with this table.
Args:
  file_paths: Path(s) to Parquet files to register.
  allow_missing: If True, columns present in the table but missing from the file are filled with their initial default value.
  ignore_extra_columns: If True, extra columns in the file that aren't in the table are silently ignored.
The files must be valid Parquet files with a compatible schema.
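Each path passed to add_files becomes one CALL to DuckLake's ducklake_add_data_files function, with the schema option added only for namespaces other than main. The sketch below builds those statements as strings. add_files_sql is a hypothetical helper, and escape_literal assumes that pyducklake's escape_string_literal doubles single quotes (its implementation is not shown in this section).

```python
from __future__ import annotations


def escape_literal(value: str) -> str:
    # Assumption: stands in for pyducklake.catalog.escape_string_literal by
    # doubling single quotes, the standard SQL string-literal escape.
    return value.replace("'", "''")


def add_files_sql(
    catalog: str,
    namespace: str,
    table: str,
    file_paths: list[str] | str,
    *,
    allow_missing: bool = False,
    ignore_extra_columns: bool = False,
) -> list[str]:
    """Return one CALL statement per file, as add_files() would issue them."""
    if isinstance(file_paths, str):
        file_paths = [file_paths]

    # Optional named arguments are appended after the positional ones.
    opts = ""
    if namespace != "main":
        opts += f", schema := '{escape_literal(namespace)}'"
    if allow_missing:
        opts += ", allow_missing := true"
    if ignore_extra_columns:
        opts += ", ignore_extra_columns := true"

    return [
        f"CALL ducklake_add_data_files('{escape_literal(catalog)}', "
        f"'{escape_literal(table)}', '{escape_literal(path)}'{opts})"
        for path in file_paths
    ]


print(add_files_sql("lake", "sales", "orders", "/data/o'brien.parquet", allow_missing=True)[0])
```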
    def append(self, df: ArrowCompatible) -> None:
        """Append data to the table.

        Accepts ``pyarrow.Table`` or any object implementing the Arrow
        PyCapsule interface (``__arrow_c_stream__``).

        If the table has a sort order, data is sorted before insertion.

        Examples:

        ```pycon
        >>> import tempfile, os, pyarrow as pa
        >>> from pyducklake import Catalog, Schema, required, IntegerType
        >>> tmp = tempfile.mkdtemp()
        >>> cat = Catalog("t", os.path.join(tmp, "m.duckdb"), data_path=os.path.join(tmp, "d"))
        >>> table = cat.create_table("nums", Schema.of(required("n", IntegerType())))
        >>> table.append(pa.table({"n": [1, 2, 3]}))
        >>> table.append(pa.table({"n": [4, 5]}))
        >>> table.scan().count()
        5

        ```
        """
        arrow_table = self._to_arrow_table(df)
        order = self._sort_order_clause()
        conn = self._catalog.connection
        conn.register("_pyducklake_tmp_append", arrow_table)
        try:
            fqn = self.fully_qualified_name
            if order:
                conn.execute(f"INSERT INTO {fqn} SELECT * FROM _pyducklake_tmp_append ORDER BY {order}")
            else:
                conn.execute(f"INSERT INTO {fqn} SELECT * FROM _pyducklake_tmp_append")
        finally:
            conn.unregister("_pyducklake_tmp_append")
Append data to the table.
Accepts pyarrow.Table or any object implementing the Arrow PyCapsule interface (__arrow_c_stream__).
If the table has a sort order, data is sorted before insertion.
Examples:
>>> import tempfile, os, pyarrow as pa
>>> from pyducklake import Catalog, Schema, required, IntegerType
>>> tmp = tempfile.mkdtemp()
>>> cat = Catalog("t", os.path.join(tmp, "m.duckdb"), data_path=os.path.join(tmp, "d"))
>>> table = cat.create_table("nums", Schema.of(required("n", IntegerType())))
>>> table.append(pa.table({"n": [1, 2, 3]}))
>>> table.append(pa.table({"n": [4, 5]}))
>>> table.scan().count()
5
    def append_batches(
        self,
        batches: pa.RecordBatchReader | Iterator[pa.RecordBatch],
        *,
        schema: pa.Schema | None = None,
    ) -> None:
        """Append data from a stream of record batches.

        Memory-efficient alternative to :meth:`append` — processes batches
        without materializing the full dataset in memory.

        Args:
            batches: RecordBatchReader or iterator of RecordBatch.
            schema: Required if passing an iterator (not needed for RecordBatchReader).
        """
        if isinstance(batches, pa.RecordBatchReader):
            reader = batches
        else:
            if schema is None:
                raise ValueError("schema is required when passing an iterator of RecordBatch")
            reader = pa.RecordBatchReader.from_batches(schema, batches)

        order = self._sort_order_clause()
        conn = self._catalog.connection
        conn.register("_pyducklake_tmp_batches", reader)
        try:
            fqn = self.fully_qualified_name
            if order:
                conn.execute(f"INSERT INTO {fqn} SELECT * FROM _pyducklake_tmp_batches ORDER BY {order}")
            else:
                conn.execute(f"INSERT INTO {fqn} SELECT * FROM _pyducklake_tmp_batches")
        finally:
            conn.unregister("_pyducklake_tmp_batches")
Append data from a stream of record batches.
Memory-efficient alternative to append(): processes batches without materializing the full dataset in memory.
Args:
  batches: RecordBatchReader or iterator of RecordBatch.
  schema: Required if passing an iterator (not needed for RecordBatchReader).
    def overwrite(
        self,
        df: ArrowCompatible,
        overwrite_filter: BooleanExpression | str = AlwaysTrue(),
    ) -> None:
        """Overwrite data matching the filter, then insert new data.

        Accepts ``pyarrow.Table`` or any object implementing the Arrow
        PyCapsule interface (``__arrow_c_stream__``).

        If overwrite_filter is AlwaysTrue or not provided, truncates and inserts.
        Otherwise, deletes matching rows then inserts.

        Does not manage its own transaction — relies on the caller (or DuckDB
        auto-commit) for atomicity. This allows overwrite to be used inside
        an explicit :class:`Transaction` without nesting conflicts.
        """
        arrow_table = self._to_arrow_table(df)
        order = self._sort_order_clause()
        conn = self._catalog.connection
        conn.register("_pyducklake_tmp_overwrite", arrow_table)
        try:
            if isinstance(overwrite_filter, str):
                where: str | None = overwrite_filter
            elif overwrite_filter == AlwaysTrue():
                where = None
            else:
                where = overwrite_filter.to_sql()

            fqn = self.fully_qualified_name
            if where:
                conn.execute(f"DELETE FROM {fqn} WHERE {where}")
            else:
                conn.execute(f"DELETE FROM {fqn}")

            if order:
                conn.execute(f"INSERT INTO {fqn} SELECT * FROM _pyducklake_tmp_overwrite ORDER BY {order}")
            else:
                conn.execute(f"INSERT INTO {fqn} SELECT * FROM _pyducklake_tmp_overwrite")
        finally:
            conn.unregister("_pyducklake_tmp_overwrite")
Overwrite data matching the filter, then insert new data.
Accepts pyarrow.Table or any object implementing the Arrow PyCapsule interface (__arrow_c_stream__).
If overwrite_filter is AlwaysTrue or not provided, truncates and inserts. Otherwise, deletes matching rows then inserts.
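Does not open its own transaction, so that overwrite can be used inside an explicit Transaction without nesting conflicts. The DELETE and the INSERT are issued as separate statements: they are applied atomically only inside an enclosing transaction, because under DuckDB auto-commit each statement commits on its own.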
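The sketch below mirrors how overwrite turns its filter into SQL: a string is used verbatim as the WHERE clause, AlwaysTrue deletes every row, and any other expression is rendered with to_sql(). AlwaysTrue and Expr are hypothetical stand-ins for pyducklake's expression classes, the order argument stands in for the clause derived from the table's sort order (whose exact format is not shown here), and the function only builds strings.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class AlwaysTrue:
    """Hypothetical stand-in for pyducklake's AlwaysTrue expression."""


@dataclass(frozen=True)
class Expr:
    """Hypothetical stand-in for any other BooleanExpression."""

    sql: str

    def to_sql(self) -> str:
        return self.sql


def overwrite_statements(
    fqn: str,
    overwrite_filter: AlwaysTrue | Expr | str = AlwaysTrue(),
    order: str | None = None,
) -> list[str]:
    """Return the DELETE and INSERT that overwrite() would issue, in order."""
    if isinstance(overwrite_filter, str):
        where: str | None = overwrite_filter  # raw SQL, used verbatim
    elif overwrite_filter == AlwaysTrue():
        where = None  # replace the whole table
    else:
        where = overwrite_filter.to_sql()

    delete = f"DELETE FROM {fqn} WHERE {where}" if where else f"DELETE FROM {fqn}"
    insert = f"INSERT INTO {fqn} SELECT * FROM _pyducklake_tmp_overwrite"
    if order:  # present only when the table has a sort order
        insert += f" ORDER BY {order}"
    return [delete, insert]


for stmt in overwrite_statements("lake.main.events", Expr("\"region\" = 'eu'")):
    print(stmt)
```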
    def delete(self, delete_filter: BooleanExpression | str) -> None:
        """Delete rows matching the filter.

        Args:
            delete_filter: Rows matching this filter are deleted.
        """
        if isinstance(delete_filter, str):
            filter_sql = delete_filter
        elif delete_filter == AlwaysFalse():
            return
        elif delete_filter == AlwaysTrue():
            filter_sql = None
        else:
            filter_sql = delete_filter.to_sql()

        fqn = self.fully_qualified_name
        if filter_sql is None:
            self._catalog.connection.execute(f"DELETE FROM {fqn}")
        else:
            self._catalog.connection.execute(f"DELETE FROM {fqn} WHERE {filter_sql}")
Delete rows matching the filter.
Args: delete_filter: Rows matching this filter are deleted.
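delete() resolves its filter much like overwrite does, with one extra case: AlwaysFalse returns before any SQL is issued. The classes below are hypothetical stand-ins for pyducklake's expressions, and delete_statement (also hypothetical) returns None wherever the method would execute nothing.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class AlwaysTrue:
    """Hypothetical stand-in for pyducklake's AlwaysTrue expression."""


@dataclass(frozen=True)
class AlwaysFalse:
    """Hypothetical stand-in for pyducklake's AlwaysFalse expression."""


@dataclass(frozen=True)
class Expr:
    """Hypothetical stand-in for any other BooleanExpression."""

    sql: str

    def to_sql(self) -> str:
        return self.sql


def delete_statement(fqn: str, delete_filter: AlwaysTrue | AlwaysFalse | Expr | str) -> str | None:
    """Return the DELETE that delete() would issue, or None if it issues nothing."""
    if isinstance(delete_filter, str):
        return f"DELETE FROM {fqn} WHERE {delete_filter}"  # raw SQL, used verbatim
    if delete_filter == AlwaysFalse():
        return None  # nothing can match, so no statement is executed
    if delete_filter == AlwaysTrue():
        return f"DELETE FROM {fqn}"  # remove every row
    return f"DELETE FROM {fqn} WHERE {delete_filter.to_sql()}"


print(delete_statement("lake.main.users", Expr('"id" = 1')))
print(delete_statement("lake.main.users", AlwaysFalse()))  # None
```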
    def upsert(
        self,
        df: ArrowCompatible,
        join_cols: tuple[str, ...] | list[str],
    ) -> UpsertResult:
        """Upsert data: update existing rows matching on join_cols, insert new rows.

        Accepts ``pyarrow.Table`` or any object implementing the Arrow
        PyCapsule interface (``__arrow_c_stream__``).

        Uses DuckDB's MERGE statement.
        """
        arrow_table = self._to_arrow_table(df)
        conn = self._catalog.connection
        fqn = self.fully_qualified_name
        join_col_set = set(join_cols)

        # Count rows before to determine updated vs inserted
        count_before = self.scan().count()

        conn.register("_pyducklake_tmp_upsert", arrow_table)
        try:
            on_clause = " AND ".join(f'target."{col}" = source."{col}"' for col in join_cols)

            all_cols = [field.name for field in arrow_table.schema]  # pyright: ignore[reportUnknownVariableType]
            non_join_cols = [c for c in all_cols if c not in join_col_set]

            update_set = ", ".join(f'"{col}" = source."{col}"' for col in non_join_cols)

            insert_cols = ", ".join(f'source."{col}"' for col in all_cols)

            merge_sql = f"MERGE INTO {fqn} AS target USING _pyducklake_tmp_upsert AS source ON {on_clause}"
            if update_set:
                merge_sql += f" WHEN MATCHED THEN UPDATE SET {update_set}"
            merge_sql += f" WHEN NOT MATCHED THEN INSERT VALUES ({insert_cols})"

            conn.execute(merge_sql)
        finally:
            conn.unregister("_pyducklake_tmp_upsert")

        count_after = self.scan().count()
        rows_inserted = count_after - count_before
        rows_updated = arrow_table.num_rows - rows_inserted

        return UpsertResult(rows_updated=rows_updated, rows_inserted=rows_inserted)
Upsert data: update existing rows matching on join_cols, insert new rows.
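Accepts pyarrow.Table or any object implementing the Arrow PyCapsule interface (__arrow_c_stream__).
Uses DuckDB's MERGE statement. Returns an UpsertResult in which rows_inserted is the change in the table's row count across the MERGE and rows_updated is the number of input rows minus rows_inserted.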
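The sketch below rebuilds the MERGE statement that upsert generates, together with the arithmetic behind its UpsertResult; build_merge_sql and upsert_counts are hypothetical helper names used only for illustration. Note that the INSERT branch uses a VALUES list without a column list, so, as in standard SQL, the input's columns are matched to the table's by position.

```python
from __future__ import annotations


def build_merge_sql(fqn: str, source: str, columns: list[str], join_cols: list[str]) -> str:
    """Build the MERGE statement upsert() issues (hypothetical helper)."""
    join_set = set(join_cols)
    # Match target rows to source rows on every join column.
    on_clause = " AND ".join(f'target."{c}" = source."{c}"' for c in join_cols)
    # Only non-key columns are overwritten on a match.
    update_set = ", ".join(f'"{c}" = source."{c}"' for c in columns if c not in join_set)
    # New rows are inserted positionally, in the input's column order.
    insert_vals = ", ".join(f'source."{c}"' for c in columns)

    sql = f"MERGE INTO {fqn} AS target USING {source} AS source ON {on_clause}"
    if update_set:  # omitted when every input column is a join column
        sql += f" WHEN MATCHED THEN UPDATE SET {update_set}"
    sql += f" WHEN NOT MATCHED THEN INSERT VALUES ({insert_vals})"
    return sql


def upsert_counts(count_before: int, count_after: int, input_rows: int) -> tuple[int, int]:
    """Return (rows_updated, rows_inserted) the way upsert() derives them."""
    rows_inserted = count_after - count_before
    rows_updated = input_rows - rows_inserted
    return rows_updated, rows_inserted


print(build_merge_sql("lake.main.users", "_pyducklake_tmp_upsert", ["id", "name"], ["id"]))
# A 3-row table receives 2 input rows, one new and one existing: 4 rows afterwards.
print(upsert_counts(count_before=3, count_after=4, input_rows=2))  # (1, 1)
```

Because both counts come from separate full-table counts taken before and after the MERGE, they can be skewed if another writer changes the table in between.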
    @property
    def spec(self) -> PartitionSpec:
        """Current partition spec. Returns UNPARTITIONED if not partitioned."""
        from pyducklake.catalog import escape_string_literal
        from pyducklake.partitioning import (
            DAY,
            HOUR,
            IDENTITY,
            MONTH,
            UNPARTITIONED,
            YEAR,
            PartitionField,
            PartitionSpec,
        )

        catalog_name = self._catalog.name
        meta_schema = f"__ducklake_metadata_{catalog_name}"
        try:
            rows: list[tuple[Any, ...]] = self._catalog.fetchall(
                f"SELECT c.column_name, pc.transform "
                f'FROM "{meta_schema}".ducklake_partition_column pc '
                f'JOIN "{meta_schema}".ducklake_partition_info pi '
                f"ON pc.partition_id = pi.partition_id "
                f"AND pc.table_id = pi.table_id "
                f'JOIN "{meta_schema}".ducklake_table t '
                f"ON pi.table_id = t.table_id "
                f'JOIN "{meta_schema}".ducklake_schema s '
                f"ON t.schema_id = s.schema_id "
                f'JOIN "{meta_schema}".ducklake_column c '
                f"ON pc.column_id = c.column_id "
                f"AND pc.table_id = c.table_id "
                f"WHERE t.table_name = '{escape_string_literal(self._identifier[1])}' "
                f"AND s.schema_name = '{escape_string_literal(self._identifier[0])}' "
                f"AND pi.end_snapshot IS NULL "
                f"ORDER BY pc.partition_key_index"
            )
        except (duckdb.CatalogException, duckdb.BinderException):
            return UNPARTITIONED

        if not rows:
            return UNPARTITIONED

        _transform_map = {
            "identity": IDENTITY,
            "year": YEAR,
            "month": MONTH,
            "day": DAY,
            "hour": HOUR,
        }

        fields: list[PartitionField] = []
        for col_name, transform_name in rows:
            transform = _transform_map.get(str(transform_name).lower(), IDENTITY)
            fields.append(PartitionField(source_column=str(col_name), transform=transform))

        return PartitionSpec(*fields)
Current partition spec. Returns UNPARTITIONED if not partitioned.
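The partition spec is rebuilt from DuckLake's metadata tables by mapping each stored transform name to a pyducklake transform. The sketch below reproduces that mapping with a hypothetical Transform enum standing in for IDENTITY, YEAR, MONTH, DAY, and HOUR, and with made-up metadata rows. As in the source, an unrecognized transform name falls back to identity rather than raising.

```python
from __future__ import annotations

from enum import Enum


class Transform(Enum):
    # Hypothetical stand-ins for pyducklake.partitioning's transform constants.
    IDENTITY = "identity"
    YEAR = "year"
    MONTH = "month"
    DAY = "day"
    HOUR = "hour"


_TRANSFORMS = {t.value: t for t in Transform}


def rows_to_spec(rows: list[tuple[object, object]]) -> list[tuple[str, Transform]]:
    """Map (column_name, transform) metadata rows to partition fields.

    An empty result corresponds to UNPARTITIONED.
    """
    return [
        # Names are compared case-insensitively; unknown ones become IDENTITY.
        (str(col), _TRANSFORMS.get(str(name).lower(), Transform.IDENTITY))
        for col, name in rows
    ]


print(rows_to_spec([("created_at", "YEAR"), ("region", "identity")]))
```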
    def update_spec(self) -> UpdateSpec:
        """Begin partition spec evolution. Returns an UpdateSpec builder."""
        from pyducklake.partitioning import UpdateSpec

        return UpdateSpec(self)
Begin partition spec evolution. Returns an UpdateSpec builder.
    @property
    def sort_order(self) -> SortOrder:
        """Current sort order. Returns UNSORTED if not sorted."""
        if self._sort_order_cache is not None:
            return self._sort_order_cache

        result = self._fetch_sort_order()
        self._sort_order_cache = result
        return result
Current sort order. Returns UNSORTED if not sorted.
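sort_order is read from the DuckLake metadata once and then cached on the Table; refresh() clears that cache. The sketch below mirrors how each metadata row (expression, direction, null order) becomes a sort field in the source, using hypothetical stand-in types and made-up rows. order_by_clause is an illustrative assumption: this section does not show how the table's sort order is rendered into the ORDER BY used by append and overwrite.

```python
from __future__ import annotations

from dataclasses import dataclass
from enum import Enum


class SortDirection(Enum):
    ASC = "ASC"
    DESC = "DESC"


class NullOrder(Enum):
    NULLS_FIRST = "NULLS FIRST"
    NULLS_LAST = "NULLS LAST"


@dataclass(frozen=True)
class SortField:
    # Hypothetical stand-in for pyducklake.sorting.SortField.
    source_column: str
    direction: SortDirection
    null_order: NullOrder


def parse_sort_rows(rows: list[tuple[str, str, str]]) -> tuple[SortField, ...]:
    """Turn (expression, sort_direction, null_order) rows into sort fields."""
    fields = []
    for expr, sort_dir, null_ord in rows:
        column = str(expr).strip('"')  # the expression is a quoted identifier
        direction = SortDirection.DESC if str(sort_dir).upper() == "DESC" else SortDirection.ASC
        nulls = NullOrder.NULLS_FIRST if "FIRST" in str(null_ord).upper() else NullOrder.NULLS_LAST
        fields.append(SortField(column, direction, nulls))
    return tuple(fields)


def order_by_clause(fields: tuple[SortField, ...]) -> str:
    # Illustrative assumption: one plausible ORDER BY rendering.
    return ", ".join(f'"{f.source_column}" {f.direction.value} {f.null_order.value}' for f in fields)


fields = parse_sort_rows([('"ts"', "DESC", "NULLS_LAST"), ('"id"', "asc", "nulls_first")])
print(order_by_clause(fields))  # "ts" DESC NULLS LAST, "id" ASC NULLS FIRST
```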
    def update_sort_order(self) -> UpdateSortOrder:
        """Begin sort order evolution. Returns an UpdateSortOrder builder."""
        from pyducklake.sorting import UpdateSortOrder

        return UpdateSortOrder(self)
Begin sort order evolution. Returns an UpdateSortOrder builder.
    def to_arrow_dataset(self, *, snapshot_id: int | None = None) -> ds.Dataset:
        """Return this table as a PyArrow Dataset.

        Enables interop with engines that consume the PyArrow dataset API
        (DuckDB, Polars, DataFusion, Dask, etc.).

        Args:
            snapshot_id: Optional snapshot for time travel.

        Returns:
            A pyarrow.dataset.Dataset wrapping this table's data.
        """
        import pyarrow.dataset as ds

        scan = self.scan(snapshot_id=snapshot_id)
        return ds.dataset(scan.to_arrow())  # pyright: ignore[reportUnknownMemberType]
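Return this table as a PyArrow Dataset.
Enables interop with engines that consume the PyArrow dataset API (DuckDB, Polars, DataFusion, Dask, etc.). The scan (at snapshot_id, if given) is read fully into memory with to_arrow() and then wrapped with pyarrow.dataset.dataset(), so the result is an in-memory dataset rather than a lazy view over the table's Parquet files.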
Args: snapshot_id: Optional snapshot for time travel.
Returns: A pyarrow.dataset.Dataset wrapping this table's data.
```python
def inspect(self) -> InspectTable:
    """Return an :class:`InspectTable` for metadata introspection."""
    from pyducklake.inspect import InspectTable

    return InspectTable(self)
```
Return an InspectTable for metadata introspection.
```python
def maintenance(self) -> MaintenanceTable:
    """Get maintenance operations for this table."""
    from pyducklake.maintenance import MaintenanceTable

    return MaintenanceTable(self)
```
Get maintenance operations for this table.
```python
def table_changes(
    self,
    start_snapshot: int | None = None,
    end_snapshot: int | None = None,
    *,
    start_time: datetime | None = None,
    end_time: datetime | None = None,
    columns: tuple[str, ...] | list[str] | None = None,
    filter_expr: str | None = None,
) -> ChangeSet:
    """Query all changes between two snapshots or timestamps.

    Returns a ChangeSet with inserts, deletes, and update pre/post images.
    """
    from pyducklake.cdc import ChangeSet

    return ChangeSet(
        self._cdc_query(
            "ducklake_table_changes",
            start_snapshot,
            end_snapshot,
            start_time=start_time,
            end_time=end_time,
            columns=columns,
            filter_expr=filter_expr,
            meta_cols=("snapshot_id", "rowid", "change_type"),
        ),
        change_type_col="change_type",
    )
```
Query all changes between two snapshots or timestamps.
Returns a ChangeSet with inserts, deletes, and update pre/post images.
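`table_changes` tags every row with a `change_type` column, and the ChangeSet separates rows by that value. A minimal sketch of the grouping over plain dicts, assuming the change-type labels `insert`, `delete`, `update_preimage`, and `update_postimage` used by DuckLake's `table_changes` function; the `split_changes` helper and the sample rows are hypothetical and are not the ChangeSet API:

```python
from typing import Any

# Assumed change_type labels; adjust if your DuckLake version reports others.
KNOWN_CHANGE_TYPES = ("insert", "delete", "update_preimage", "update_postimage")


def split_changes(rows: list[dict[str, Any]]) -> dict[str, list[dict[str, Any]]]:
    """Group change-feed rows by their change_type column."""
    # Pre-create every known bucket so callers can index without KeyError.
    groups: dict[str, list[dict[str, Any]]] = {kind: [] for kind in KNOWN_CHANGE_TYPES}
    for row in rows:
        # Unknown labels still get their own bucket instead of being dropped.
        groups.setdefault(row["change_type"], []).append(row)
    return groups


# Made-up rows: one row inserted, updated once, then deleted.
rows = [
    {"snapshot_id": 2, "rowid": 0, "change_type": "insert", "id": 1, "name": "alice"},
    {"snapshot_id": 3, "rowid": 0, "change_type": "update_preimage", "id": 1, "name": "alice"},
    {"snapshot_id": 3, "rowid": 0, "change_type": "update_postimage", "id": 1, "name": "alicia"},
    {"snapshot_id": 4, "rowid": 0, "change_type": "delete", "id": 1, "name": "alicia"},
]
changes = split_changes(rows)
print({kind: len(group) for kind, group in changes.items()})
```

An update appears as a pre-image/post-image pair sharing the same `snapshot_id` and `rowid`, which is what lets a consumer reconstruct the before and after values.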
```python
def table_insertions(
    self,
    start_snapshot: int | None = None,
    end_snapshot: int | None = None,
    *,
    start_time: datetime | None = None,
    end_time: datetime | None = None,
    columns: tuple[str, ...] | list[str] | None = None,
    filter_expr: str | None = None,
) -> ChangeSet:
    """Query inserted rows between two snapshots or timestamps."""
    from pyducklake.cdc import ChangeSet

    return ChangeSet(
        self._cdc_query(
            "ducklake_table_insertions",
            start_snapshot,
            end_snapshot,
            start_time=start_time,
            end_time=end_time,
            columns=columns,
            filter_expr=filter_expr,
            meta_cols=(),
        ),
        change_type_col=None,
    )
```
Query inserted rows between two snapshots or timestamps.
```python
def table_deletions(
    self,
    start_snapshot: int | None = None,
    end_snapshot: int | None = None,
    *,
    start_time: datetime | None = None,
    end_time: datetime | None = None,
    columns: tuple[str, ...] | list[str] | None = None,
    filter_expr: str | None = None,
) -> ChangeSet:
    """Query deleted rows between two snapshots or timestamps."""
    from pyducklake.cdc import ChangeSet

    return ChangeSet(
        self._cdc_query(
            "ducklake_table_deletions",
            start_snapshot,
            end_snapshot,
            start_time=start_time,
            end_time=end_time,
            columns=columns,
            filter_expr=filter_expr,
            meta_cols=(),
        ),
        change_type_col=None,
    )
```
Query deleted rows between two snapshots or timestamps.
```python
class TableAlreadyExistsError(DucklakeError):
    """Raised when attempting to create a table that already exists."""
```
Raised when attempting to create a table that already exists.
```python
class View:
    """Represents a Ducklake view."""

    def __init__(
        self,
        identifier: tuple[str, str],
        schema: Schema,
        sql: str,
        catalog: Catalog,
    ) -> None:
        self._identifier = identifier
        self._schema = schema
        self._sql = sql
        self._catalog = catalog

    @property
    def name(self) -> str:
        """View name (without namespace)."""
        return self._identifier[1]

    @property
    def namespace(self) -> str:
        """Namespace (schema) name."""
        return self._identifier[0]

    @property
    def identifier(self) -> tuple[str, str]:
        """(namespace, view_name) tuple."""
        return self._identifier

    @property
    def schema(self) -> Schema:
        """View's output schema."""
        return self._schema

    @property
    def sql_text(self) -> str:
        """The SQL definition of the view."""
        return self._sql

    @property
    def fully_qualified_name(self) -> str:
        """catalog.namespace.view_name"""
        return self._catalog.fully_qualified_name(self._identifier[0], self._identifier[1])

    @property
    def catalog(self) -> Catalog:
        """The catalog this view belongs to."""
        return self._catalog

    def scan(
        self,
        row_filter: BooleanExpression | str = AlwaysTrue(),
        selected_fields: tuple[str, ...] = ("*",),
        limit: int | None = None,
    ) -> DataScan:
        """Create a scan on this view.

        Works exactly like Table.scan() -- the view is queryable
        with filters, projections, and limits.
        """
        from pyducklake.scan import DataScan, RawSQL

        if isinstance(row_filter, str):
            actual_filter: BooleanExpression = RawSQL(row_filter)
        else:
            actual_filter = row_filter

        return DataScan(
            table=self,
            row_filter=actual_filter,
            selected_fields=selected_fields,
            limit=limit,
        )

    def to_arrow(self) -> pa.Table:
        """Read the entire view as an Arrow table. Shorthand for scan().to_arrow()."""
        return self.scan().to_arrow()

    def to_pandas(self) -> pd.DataFrame:
        """Read the entire view as a pandas DataFrame."""
        return self.scan().to_pandas()

    def to_arrow_dataset(self) -> ds.Dataset:
        """Return view results as a PyArrow Dataset."""
        return self.scan().to_arrow_dataset()

    def refresh(self) -> View:
        """Reload schema from catalog. Returns self."""
        self._schema = self._catalog.build_schema_from_describe(self._identifier[0], self._identifier[1])
        return self

    def __repr__(self) -> str:
        return f"View(identifier={self._identifier!r}, sql={self._sql!r})"

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, View):
            return NotImplemented
        return self._identifier == other._identifier and self._catalog.name == other._catalog.name
```
Represents a Ducklake view.
```python
@property
def name(self) -> str:
    """View name (without namespace)."""
    return self._identifier[1]
```
View name (without namespace).
```python
@property
def namespace(self) -> str:
    """Namespace (schema) name."""
    return self._identifier[0]
```
Namespace (schema) name.
```python
@property
def identifier(self) -> tuple[str, str]:
    """(namespace, view_name) tuple."""
    return self._identifier
```
(namespace, view_name) tuple.
```python
@property
def sql_text(self) -> str:
    """The SQL definition of the view."""
    return self._sql
```
The SQL definition of the view.
```python
@property
def fully_qualified_name(self) -> str:
    """catalog.namespace.view_name"""
    return self._catalog.fully_qualified_name(self._identifier[0], self._identifier[1])
```
Fully qualified name, in the form catalog.namespace.view_name.
```python
@property
def catalog(self) -> Catalog:
    """The catalog this view belongs to."""
    return self._catalog
```
The catalog this view belongs to.
```python
def scan(
    self,
    row_filter: BooleanExpression | str = AlwaysTrue(),
    selected_fields: tuple[str, ...] = ("*",),
    limit: int | None = None,
) -> DataScan:
    """Create a scan on this view.

    Works exactly like Table.scan() -- the view is queryable
    with filters, projections, and limits.
    """
    from pyducklake.scan import DataScan, RawSQL

    if isinstance(row_filter, str):
        actual_filter: BooleanExpression = RawSQL(row_filter)
    else:
        actual_filter = row_filter

    return DataScan(
        table=self,
        row_filter=actual_filter,
        selected_fields=selected_fields,
        limit=limit,
    )
```
Create a scan on this view.
Works exactly like Table.scan() -- the view is queryable with filters, projections, and limits.
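The only view-specific step in `scan()` is normalizing `row_filter`: a string is wrapped as raw SQL, while an expression object passes through unchanged. A standalone sketch of that dispatch, using hypothetical `BooleanExpression`, `AlwaysTrue`, and `RawSQL` stand-ins rather than pyducklake's own classes:

```python
from dataclasses import dataclass
from typing import Union


class BooleanExpression:
    """Hypothetical stand-in for pyducklake's expression base class."""


@dataclass(frozen=True)
class AlwaysTrue(BooleanExpression):
    """Hypothetical 'no filter' expression (the scan default)."""


@dataclass(frozen=True)
class RawSQL(BooleanExpression):
    """Hypothetical wrapper for a SQL predicate passed as a string."""

    sql: str


def normalize_filter(row_filter: Union[BooleanExpression, str]) -> BooleanExpression:
    # Strings are treated as raw SQL predicates; expressions are used as-is.
    if isinstance(row_filter, str):
        return RawSQL(row_filter)
    return row_filter


print(normalize_filter("amount > 100"))
print(normalize_filter(AlwaysTrue()))
```

With the signature shown above, a call such as `view.scan("amount > 100", limit=10)` (the column name is illustrative) takes the string branch, so the predicate text is passed along verbatim.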
```python
def to_arrow(self) -> pa.Table:
    """Read the entire view as an Arrow table. Shorthand for scan().to_arrow()."""
    return self.scan().to_arrow()
```
Read the entire view as an Arrow table. Shorthand for scan().to_arrow().
```python
def to_pandas(self) -> pd.DataFrame:
    """Read the entire view as a pandas DataFrame."""
    return self.scan().to_pandas()
```
Read the entire view as a pandas DataFrame.
```python
class ViewAlreadyExistsError(DucklakeError):
    """Raised when attempting to create a view that already exists."""
```
Raised when attempting to create a view that already exists.
```python
class Transaction:
    """Multi-operation transaction on a Ducklake catalog.

    Groups multiple write operations (append, overwrite, delete, schema changes)
    into a single atomic commit = single snapshot.

    Usage:
        with catalog.begin_transaction() as txn:
            tbl = txn.load_table("my_table")
            tbl.append(df1)
            tbl.delete(EqualTo("status", "old"))

            tbl2 = txn.load_table("other_table")
            tbl2.append(df2)
        # auto-commits on clean exit, rolls back on exception

    Or without context manager:
        txn = catalog.begin_transaction()
        # ... operations ...
        txn.commit()  # or txn.rollback()
    """

    def __init__(self, catalog: Catalog) -> None:
        """Begin a transaction on the catalog's connection."""
        self._catalog = catalog
        self._committed = False
        self._rolled_back = False
        self._catalog.connection.execute("BEGIN TRANSACTION")

    def load_table(self, identifier: str | tuple[str, str]) -> Table:
        """Load a table within this transaction context."""
        return self._catalog.load_table(identifier)

    def commit(self) -> None:
        """Commit the transaction."""
        if self._committed or self._rolled_back:
            raise DucklakeError("Transaction already finalized")
        self._catalog.connection.execute("COMMIT")
        self._committed = True

    def rollback(self) -> None:
        """Roll back the transaction."""
        if self._committed or self._rolled_back:
            raise DucklakeError("Transaction already finalized")
        self._catalog.connection.execute("ROLLBACK")
        self._rolled_back = True

    @property
    def is_active(self) -> bool:
        """True if the transaction has not been committed or rolled back."""
        return not self._committed and not self._rolled_back

    def __enter__(self) -> Transaction:
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: object,
    ) -> None:
        if self._committed or self._rolled_back:
            return
        if exc_type is not None:
            self.rollback()
        else:
            self.commit()
```
Multi-operation transaction on a Ducklake catalog.
Groups multiple write operations (append, overwrite, delete, schema changes) into a single atomic commit, which produces a single snapshot.
Usage:

    with catalog.begin_transaction() as txn:
        tbl = txn.load_table("my_table")
        tbl.append(df1)
        tbl.delete(EqualTo("status", "old"))

        tbl2 = txn.load_table("other_table")
        tbl2.append(df2)
    # auto-commits on clean exit, rolls back on exception

Or without a context manager:

    txn = catalog.begin_transaction()
    # ... operations ...
    txn.commit()  # or txn.rollback()
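The transaction lifecycle is a small state machine: `BEGIN TRANSACTION` at construction, then exactly one of `COMMIT` or `ROLLBACK`, with `__exit__` choosing between them based on whether an exception escaped the block. A self-contained sketch of that logic, assuming a hypothetical `FakeConnection` that records SQL instead of executing it (`SketchTransaction` is likewise a stand-in, not the pyducklake class):

```python
from __future__ import annotations


class FakeConnection:
    """Hypothetical connection that only records the SQL it receives."""

    def __init__(self) -> None:
        self.statements: list[str] = []

    def execute(self, sql: str) -> None:
        self.statements.append(sql)


class SketchTransaction:
    def __init__(self, conn: FakeConnection) -> None:
        self._conn = conn
        self._done = False
        self._conn.execute("BEGIN TRANSACTION")

    def commit(self) -> None:
        if self._done:
            raise RuntimeError("Transaction already finalized")
        self._conn.execute("COMMIT")
        self._done = True

    def rollback(self) -> None:
        if self._done:
            raise RuntimeError("Transaction already finalized")
        self._conn.execute("ROLLBACK")
        self._done = True

    def __enter__(self) -> SketchTransaction:
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        if self._done:  # already finalized explicitly inside the block
            return
        if exc_type is not None:
            self.rollback()  # returning None lets the exception propagate
        else:
            self.commit()


ok = FakeConnection()
with SketchTransaction(ok):
    pass
print(ok.statements)  # ['BEGIN TRANSACTION', 'COMMIT']

failed = FakeConnection()
try:
    with SketchTransaction(failed):
        raise ValueError("boom")
except ValueError:
    pass
print(failed.statements)  # ['BEGIN TRANSACTION', 'ROLLBACK']
```

Since `__exit__` returns `None`, the original exception is re-raised after the rollback instead of being swallowed; the real `Transaction.__exit__` shown above behaves the same way.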
```python
def __init__(self, catalog: Catalog) -> None:
    """Begin a transaction on the catalog's connection."""
    self._catalog = catalog
    self._committed = False
    self._rolled_back = False
    self._catalog.connection.execute("BEGIN TRANSACTION")
```
Begin a transaction on the catalog's connection.
```python
def load_table(self, identifier: str | tuple[str, str]) -> Table:
    """Load a table within this transaction context."""
    return self._catalog.load_table(identifier)
```
Load a table within this transaction context.
```python
def commit(self) -> None:
    """Commit the transaction."""
    if self._committed or self._rolled_back:
        raise DucklakeError("Transaction already finalized")
    self._catalog.connection.execute("COMMIT")
    self._committed = True
```
Commit the transaction.
```python
class UpdateSchema:
    """Builder for schema evolution operations.

    Obtained via table.update_schema(). Collects changes, applies on commit().
    """

    __slots__ = ("_table", "_changes")

    def __init__(self, table: Table) -> None:
        self._table = table
        self._changes: list[_SchemaChange] = []

    def add_column(
        self,
        name: str,
        field_type: DucklakeType,
        doc: str | None = None,
        required: bool = False,
    ) -> UpdateSchema:
        """Add a column. Returns self for chaining."""
        self._changes.append(_AddColumn(name=name, field_type=field_type, doc=doc, required=required))
        return self

    def drop_column(self, name: str) -> UpdateSchema:
        """Drop a column. Returns self for chaining."""
        self._changes.append(_DropColumn(name=name))
        return self

    def rename_column(self, name: str, new_name: str) -> UpdateSchema:
        """Rename a column. Returns self for chaining."""
        self._changes.append(_RenameColumn(name=name, new_name=new_name))
        return self

    def update_column(self, name: str, new_type: DucklakeType) -> UpdateSchema:
        """Change column type (lossless promotions only). Returns self for chaining."""
        self._changes.append(_UpdateColumnType(name=name, new_type=new_type))
        return self

    def set_nullability(self, name: str, required: bool) -> UpdateSchema:
        """Set or drop NOT NULL. Returns self for chaining."""
        self._changes.append(_SetNullability(name=name, required=required))
        return self

    def commit(self) -> None:
        """Execute all pending schema changes as ALTER TABLE statements.

        Refreshes the table schema afterward.
        """
        fqn = self._table.fully_qualified_name
        conn = self._table.catalog.connection

        for change in self._changes:
            sql = self._change_to_sql(fqn, change)
            conn.execute(sql)

        self._changes.clear()
        self._table.refresh()

    def _change_to_sql(self, fqn: str, change: _SchemaChange) -> str:
        if isinstance(change, _AddColumn):
            type_sql = ducklake_type_to_sql(change.field_type)
            sql = f"ALTER TABLE {fqn} ADD COLUMN {quote_identifier(change.name)} {type_sql}"
            if change.required:
                sql += " NOT NULL"
            return sql
        if isinstance(change, _DropColumn):
            return f"ALTER TABLE {fqn} DROP COLUMN {quote_identifier(change.name)}"
        if isinstance(change, _RenameColumn):
            return f"ALTER TABLE {fqn} RENAME {quote_identifier(change.name)} TO {quote_identifier(change.new_name)}"
        if isinstance(change, _UpdateColumnType):
            type_sql = ducklake_type_to_sql(change.new_type)
            return f"ALTER TABLE {fqn} ALTER {quote_identifier(change.name)} SET TYPE {type_sql}"
        # At this point, change must be _SetNullability (exhaustive match)
        if change.required:
            return f"ALTER TABLE {fqn} ALTER {quote_identifier(change.name)} SET NOT NULL"
        return f"ALTER TABLE {fqn} ALTER {quote_identifier(change.name)} DROP NOT NULL"

    # -- Context manager -------------------------------------------------------

    def __enter__(self) -> UpdateSchema:
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Auto-commit on clean exit."""
        if exc_type is None:
            self.commit()
```
Builder for schema evolution operations.
Obtained via table.update_schema(). Collects changes, applies on commit().
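`commit()` turns each queued change into one `ALTER TABLE` statement and executes them in order. A standalone sketch of that translation for three of the change kinds, assuming tuple-based change records, a pre-rendered SQL type string, and a simplified `quote_identifier` (double quotes, with embedded quotes doubled); these helpers and the table name `lake.main.users` are hypothetical, not pyducklake's internals:

```python
def quote_identifier(name: str) -> str:
    # Assumed quoting rule: wrap in double quotes and double any embedded quote.
    return '"' + name.replace('"', '""') + '"'


def change_to_sql(fqn: str, change: tuple) -> str:
    """Render one queued change as an ALTER TABLE statement."""
    kind = change[0]
    if kind == "add":
        _, name, type_sql, required = change
        sql = f"ALTER TABLE {fqn} ADD COLUMN {quote_identifier(name)} {type_sql}"
        return sql + " NOT NULL" if required else sql
    if kind == "drop":
        return f"ALTER TABLE {fqn} DROP COLUMN {quote_identifier(change[1])}"
    if kind == "rename":
        return f"ALTER TABLE {fqn} RENAME {quote_identifier(change[1])} TO {quote_identifier(change[2])}"
    raise ValueError(f"unsupported change kind: {kind}")


# Changes are queued by the builder and rendered in insertion order on commit.
pending = [("add", "email", "VARCHAR", False), ("rename", "name", "full_name")]
for stmt in (change_to_sql("lake.main.users", c) for c in pending):
    print(stmt)
```

In the source shown, the statements are executed one by one with no surrounding `BEGIN`/`COMMIT` of their own. Queue order matters: a rename followed by a change that refers to the old name will fail.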
```python
def add_column(
    self,
    name: str,
    field_type: DucklakeType,
    doc: str | None = None,
    required: bool = False,
) -> UpdateSchema:
    """Add a column. Returns self for chaining."""
    self._changes.append(_AddColumn(name=name, field_type=field_type, doc=doc, required=required))
    return self
```
Add a column. Returns self for chaining.
```python
def drop_column(self, name: str) -> UpdateSchema:
    """Drop a column. Returns self for chaining."""
    self._changes.append(_DropColumn(name=name))
    return self
```
Drop a column. Returns self for chaining.
```python
def rename_column(self, name: str, new_name: str) -> UpdateSchema:
    """Rename a column. Returns self for chaining."""
    self._changes.append(_RenameColumn(name=name, new_name=new_name))
    return self
```
Rename a column. Returns self for chaining.
```python
def update_column(self, name: str, new_type: DucklakeType) -> UpdateSchema:
    """Change column type (lossless promotions only). Returns self for chaining."""
    self._changes.append(_UpdateColumnType(name=name, new_type=new_type))
    return self
```
Change column type (lossless promotions only). Returns self for chaining.
```python
def set_nullability(self, name: str, required: bool) -> UpdateSchema:
    """Set or drop NOT NULL. Returns self for chaining."""
    self._changes.append(_SetNullability(name=name, required=required))
    return self
```
Set or drop NOT NULL. Returns self for chaining.
```python
def commit(self) -> None:
    """Execute all pending schema changes as ALTER TABLE statements.

    Refreshes the table schema afterward.
    """
    fqn = self._table.fully_qualified_name
    conn = self._table.catalog.connection

    for change in self._changes:
        sql = self._change_to_sql(fqn, change)
        conn.execute(sql)

    self._changes.clear()
    self._table.refresh()
```
Execute all pending schema changes as ALTER TABLE statements.
Refreshes the table schema afterward.
```python
class UpdateSortOrder:
    """Builder for sort order changes. Obtained via table.update_sort_order()."""

    __slots__ = ("_table", "_fields", "_clear")

    def __init__(self, table: Table) -> None:
        self._table = table
        self._fields: list[SortField] = []
        self._clear = False

    def add_field(
        self,
        source_column: str,
        direction: SortDirection = SortDirection.ASC,
        null_order: NullOrder = NullOrder.NULLS_LAST,
    ) -> UpdateSortOrder:
        """Add a sort field. Returns self for chaining."""
        self._fields.append(
            SortField(
                source_column=source_column,
                direction=direction,
                null_order=null_order,
            )
        )
        return self

    def clear(self) -> UpdateSortOrder:
        """Remove all sorting (RESET SORTED BY). Returns self."""
        self._clear = True
        self._fields.clear()
        return self

    def commit(self) -> None:
        """Apply sort order changes via ALTER TABLE."""
        fqn = self._table.fully_qualified_name
        conn = self._table.catalog.connection

        if self._clear:
            conn.execute(f"ALTER TABLE {fqn} RESET SORTED BY")
        elif self._fields:
            parts = [f.to_sql() for f in self._fields]
            sort_expr = ", ".join(parts)
            conn.execute(f"ALTER TABLE {fqn} SET SORTED BY ({sort_expr})")

        self._fields.clear()
        self._clear = False
        self._table.refresh()

    # -- Context manager -------------------------------------------------------

    def __enter__(self) -> UpdateSortOrder:
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Auto-commit on clean exit."""
        if exc_type is None:
            self.commit()
```
Builder for sort order changes. Obtained via table.update_sort_order().
```python
def add_field(
    self,
    source_column: str,
    direction: SortDirection = SortDirection.ASC,
    null_order: NullOrder = NullOrder.NULLS_LAST,
) -> UpdateSortOrder:
    """Add a sort field. Returns self for chaining."""
    self._fields.append(
        SortField(
            source_column=source_column,
            direction=direction,
            null_order=null_order,
        )
    )
    return self
```
Add a sort field. Returns self for chaining.
```python
def clear(self) -> UpdateSortOrder:
    """Remove all sorting (RESET SORTED BY). Returns self."""
    self._clear = True
    self._fields.clear()
    return self
```
Remove all sorting (RESET SORTED BY). Returns self.
```python
def commit(self) -> None:
    """Apply sort order changes via ALTER TABLE."""
    fqn = self._table.fully_qualified_name
    conn = self._table.catalog.connection

    if self._clear:
        conn.execute(f"ALTER TABLE {fqn} RESET SORTED BY")
    elif self._fields:
        parts = [f.to_sql() for f in self._fields]
        sort_expr = ", ".join(parts)
        conn.execute(f"ALTER TABLE {fqn} SET SORTED BY ({sort_expr})")

    self._fields.clear()
    self._clear = False
    self._table.refresh()
```
Apply sort order changes via ALTER TABLE.
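`commit()` has three outcomes. If `clear()` was called, it emits `RESET SORTED BY`. Otherwise, any queued fields become one `SET SORTED BY` clause. With nothing queued, no SQL is issued and only the refresh runs. A standalone sketch of that decision, assuming sort fields as plain `(column, direction, null_order)` tuples and a simplified double-quote identifier rule; `sort_order_sql` is a hypothetical helper:

```python
from typing import Optional


def quote_identifier(name: str) -> str:
    # Assumed quoting rule: double quotes, embedded quotes doubled.
    return '"' + name.replace('"', '""') + '"'


def sort_order_sql(fqn: str, fields: list[tuple[str, str, str]], clear: bool) -> Optional[str]:
    """Return the statement UpdateSortOrder.commit would run, or None for a no-op."""
    if clear:
        return f"ALTER TABLE {fqn} RESET SORTED BY"
    if fields:
        # Each field renders like SortField.to_sql(): "col" ASC NULLS LAST
        parts = [f"{quote_identifier(col)} {direction} {nulls}" for col, direction, nulls in fields]
        return f"ALTER TABLE {fqn} SET SORTED BY ({', '.join(parts)})"
    return None  # nothing queued: no statement


print(sort_order_sql("t", [("ts", "DESC", "NULLS FIRST"), ("id", "ASC", "NULLS LAST")], clear=False))
```

Because `clear()` also empties the field list, calling `add_field()` *after* `clear()` in the same builder means the clear flag still wins and the new fields are ignored.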
```python
class UpdateSpec:
    """Builder for partition spec changes.

    Obtained via table.update_spec().
    """

    __slots__ = ("_table", "_fields", "_clear")

    def __init__(self, table: Table) -> None:
        self._table = table
        self._fields: list[PartitionField] = []
        self._clear = False

    def add_field(
        self,
        source_column: str,
        transform: Transform = IDENTITY,
    ) -> UpdateSpec:
        """Add a partition field."""
        self._fields.append(PartitionField(source_column=source_column, transform=transform))
        return self

    def clear(self) -> UpdateSpec:
        """Remove all partitioning (RESET PARTITIONED BY)."""
        self._clear = True
        self._fields.clear()
        return self

    def commit(self) -> None:
        """Apply partition changes via ALTER TABLE."""
        fqn = self._table.fully_qualified_name
        conn = self._table.catalog.connection

        if self._clear:
            conn.execute(f"ALTER TABLE {fqn} RESET PARTITIONED BY")
        elif self._fields:
            parts: list[str] = []
            for field in self._fields:
                col = _quote_identifier(field.source_column)
                func = field.transform.to_sql()
                if func:
                    parts.append(f"{func}({col})")
                else:
                    parts.append(col)
            partition_expr = ", ".join(parts)
            conn.execute(f"ALTER TABLE {fqn} SET PARTITIONED BY ({partition_expr})")

        self._fields.clear()
        self._clear = False
        self._table.refresh()

    # -- Context manager -------------------------------------------------------

    def __enter__(self) -> UpdateSpec:
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Auto-commit on clean exit."""
        if exc_type is None:
            self.commit()
```
Builder for partition spec changes.
Obtained via table.update_spec().
```python
def add_field(
    self,
    source_column: str,
    transform: Transform = IDENTITY,
) -> UpdateSpec:
    """Add a partition field."""
    self._fields.append(PartitionField(source_column=source_column, transform=transform))
    return self
```
Add a partition field.
```python
def clear(self) -> UpdateSpec:
    """Remove all partitioning (RESET PARTITIONED BY)."""
    self._clear = True
    self._fields.clear()
    return self
```
Remove all partitioning (RESET PARTITIONED BY).
```python
def commit(self) -> None:
    """Apply partition changes via ALTER TABLE."""
    fqn = self._table.fully_qualified_name
    conn = self._table.catalog.connection

    if self._clear:
        conn.execute(f"ALTER TABLE {fqn} RESET PARTITIONED BY")
    elif self._fields:
        parts: list[str] = []
        for field in self._fields:
            col = _quote_identifier(field.source_column)
            func = field.transform.to_sql()
            if func:
                parts.append(f"{func}({col})")
            else:
                parts.append(col)
        partition_expr = ", ".join(parts)
        conn.execute(f"ALTER TABLE {fqn} SET PARTITIONED BY ({partition_expr})")

    self._fields.clear()
    self._clear = False
    self._table.refresh()
```
Apply partition changes via ALTER TABLE.
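The partition expression is built by wrapping each quoted column in its transform's SQL function, except for identity, whose `to_sql()` returns an empty string so the bare column is used. A standalone sketch of that rendering, assuming transforms as plain names, a simplified double-quote identifier rule, and a hypothetical table name `lake.main.events`:

```python
def quote_identifier(name: str) -> str:
    # Assumed quoting rule: double quotes, embedded quotes doubled.
    return '"' + name.replace('"', '""') + '"'


# Transform name -> what each Transform.to_sql() returns; identity is "" (no wrapper).
TRANSFORM_SQL = {"identity": "", "year": "year", "month": "month", "day": "day", "hour": "hour"}


def partition_expr(fields: list[tuple[str, str]]) -> str:
    """Render (column, transform) pairs as a SET PARTITIONED BY argument list."""
    parts: list[str] = []
    for column, transform in fields:
        col = quote_identifier(column)
        func = TRANSFORM_SQL[transform]
        parts.append(f"{func}({col})" if func else col)
    return ", ".join(parts)


expr = partition_expr([("event_ts", "year"), ("event_ts", "month"), ("region", "identity")])
print(f"ALTER TABLE lake.main.events SET PARTITIONED BY ({expr})")
# ALTER TABLE lake.main.events SET PARTITIONED BY (year("event_ts"), month("event_ts"), "region")
```

Using an empty string as the "no function" marker keeps identity on the same code path as the temporal transforms: a single truthiness check decides whether to wrap.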
```python
@dataclass(frozen=True)
class UpsertResult:
    """Result of an upsert operation."""

    rows_updated: int
    rows_inserted: int
```
Result of an upsert operation.
```python
class NullOrder(Enum):
    """Null ordering for a sort field."""

    NULLS_FIRST = "NULLS FIRST"
    NULLS_LAST = "NULLS LAST"
```
Null ordering for a sort field.
```python
class SortDirection(Enum):
    """Sort direction for a sort field."""

    ASC = "ASC"
    DESC = "DESC"
```
Sort direction for a sort field.
```python
@dataclass(frozen=True)
class SortField:
    """A single sort field."""

    source_column: str
    direction: SortDirection = SortDirection.ASC
    null_order: NullOrder = NullOrder.NULLS_LAST

    def to_sql(self) -> str:
        """E.g. '"col_name" ASC NULLS LAST'"""
        return f"{_quote_identifier(self.source_column)} {self.direction.value} {self.null_order.value}"
```
A single sort field.
```python
@dataclass(frozen=True)
class SortOrder:
    """Sort order specification for a table."""

    fields: tuple[SortField, ...] = ()

    @property
    def is_unsorted(self) -> bool:
        return len(self.fields) == 0

    def __repr__(self) -> str:
        if self.is_unsorted:
            return "SortOrder(UNSORTED)"
        fields_repr = ", ".join(f.to_sql() for f in self.fields)
        return f"SortOrder({fields_repr})"

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, SortOrder):
            return NotImplemented
        return self.fields == other.fields

    def __hash__(self) -> int:
        return hash(self.fields)
```
Sort order specification for a table.
```python
class DayTransform(Transform):
    """Extract day from a date/timestamp column."""

    __slots__ = ()

    def to_sql(self) -> str:
        return "day"

    def __repr__(self) -> str:
        return "DayTransform()"
```
Extract day from a date/timestamp column.
```python
class HourTransform(Transform):
    """Extract hour from a timestamp column."""

    __slots__ = ()

    def to_sql(self) -> str:
        return "hour"

    def __repr__(self) -> str:
        return "HourTransform()"
```
Extract hour from a timestamp column.
```python
class IdentityTransform(Transform):
    """Identity transform -- partition by raw column value."""

    __slots__ = ()

    def to_sql(self) -> str:
        return ""

    def __repr__(self) -> str:
        return "IdentityTransform()"
```
Identity transform -- partition by raw column value.
```python
class MonthTransform(Transform):
    """Extract month from a date/timestamp column."""

    __slots__ = ()

    def to_sql(self) -> str:
        return "month"

    def __repr__(self) -> str:
        return "MonthTransform()"
```
Extract month from a date/timestamp column.
```python
@dataclass(frozen=True)
class PartitionField:
    """A single partition field: a source column plus a transform."""

    source_column: str
    transform: Transform
```
A single partition field: a source column plus a transform.
```python
class PartitionSpec:
    """Partition specification for a table."""

    __slots__ = ("_fields",)

    def __init__(self, *fields: PartitionField) -> None:
        self._fields = fields

    @property
    def fields(self) -> tuple[PartitionField, ...]:
        return self._fields

    @property
    def is_unpartitioned(self) -> bool:
        return len(self._fields) == 0

    def __repr__(self) -> str:
        if self.is_unpartitioned:
            return "PartitionSpec(UNPARTITIONED)"
        fields_repr = ", ".join(
            f"{f.transform.to_sql()}({f.source_column!r})" if f.transform.to_sql() else f.source_column
            for f in self._fields
        )
        return f"PartitionSpec({fields_repr})"

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, PartitionSpec):
            return NotImplemented
        return self._fields == other._fields

    def __hash__(self) -> int:
        return hash(self._fields)
```
Partition specification for a table.
```python
class Transform(ABC):
    """Base for partition transforms."""

    __slots__ = ()

    @abstractmethod
    def to_sql(self) -> str:
        """Return SQL function name for use in SET PARTITIONED BY.

        Identity returns empty string (no function wrapper).
        """

    def __eq__(self, other: object) -> bool:
        return type(self) is type(other)

    def __hash__(self) -> int:
        return hash(type(self))

    @abstractmethod
    def __repr__(self) -> str: ...
```
Base for partition transforms.
```python
@abstractmethod
def to_sql(self) -> str:
    """Return SQL function name for use in SET PARTITIONED BY.

    Identity returns empty string (no function wrapper).
    """
```
Return SQL function name for use in SET PARTITIONED BY.
Identity returns empty string (no function wrapper).
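Transforms carry no state, so the base class defines equality and hashing by concrete type alone. As a result, any two `YearTransform()` instances are interchangeable and deduplicate in sets and dict keys. A trimmed standalone copy of the base class and two subclasses (it omits the abstract `__repr__`) illustrates the consequence:

```python
from abc import ABC, abstractmethod


class Transform(ABC):
    __slots__ = ()

    @abstractmethod
    def to_sql(self) -> str: ...

    # No per-instance state: equality and hashing depend only on the class.
    def __eq__(self, other: object) -> bool:
        return type(self) is type(other)

    def __hash__(self) -> int:
        return hash(type(self))


class IdentityTransform(Transform):
    __slots__ = ()

    def to_sql(self) -> str:
        return ""  # no function wrapper


class YearTransform(Transform):
    __slots__ = ()

    def to_sql(self) -> str:
        return "year"


print(YearTransform() == YearTransform())  # True
print(len({YearTransform(), YearTransform(), IdentityTransform()}))  # 2
```

Under this design, a new stateless transform only needs `to_sql()` (plus `__repr__` in the real base class). A parameterized transform, such as a hypothetical bucket count, would have to override `__eq__` and `__hash__` to include its parameters.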
```python
class YearTransform(Transform):
    """Extract year from a date/timestamp column."""

    __slots__ = ()

    def to_sql(self) -> str:
        return "year"

    def __repr__(self) -> str:
        return "YearTransform()"
```
Extract year from a date/timestamp column.
```python
class BigIntType(PrimitiveType):
    __slots__ = ()

    @property
    def _name(self) -> str:
        return "BigIntType"
```
Singleton primitive type for 64-bit signed integers (BIGINT).
```python
class BinaryType(PrimitiveType):
    __slots__ = ()

    @property
    def _name(self) -> str:
        return "BinaryType"
```
Singleton primitive type for binary (byte string) values.
```python
class BooleanType(PrimitiveType):
    __slots__ = ()

    @property
    def _name(self) -> str:
        return "BooleanType"
```
Singleton primitive type for boolean values (BOOLEAN).
```python
class DateType(PrimitiveType):
    __slots__ = ()

    @property
    def _name(self) -> str:
        return "DateType"
```
Singleton primitive type for calendar dates (DATE).
```python
@dataclass(frozen=True, slots=True)
class DecimalType(DucklakeType):
    """DECIMAL(precision, scale)."""

    precision: int
    scale: int

    def __post_init__(self) -> None:
        if self.precision < 1 or self.precision > 38:
            raise ValueError(f"Decimal precision must be between 1 and 38, got {self.precision}")
        if self.scale < 0:
            raise ValueError(f"Decimal scale must be non-negative, got {self.scale}")
        if self.scale > self.precision:
            raise ValueError(f"Decimal scale ({self.scale}) cannot exceed precision ({self.precision})")

    def __repr__(self) -> str:
        return f"DecimalType(precision={self.precision}, scale={self.scale})"
```
DECIMAL(precision, scale).
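The `__post_init__` checks mean a `DecimalType` is valid only when 1 ≤ precision ≤ 38 and 0 ≤ scale ≤ precision. This is checked when the object is built, not when the table is created. A standalone sketch that copies those rules into a hypothetical `DecimalSpec` class, so they can be tried without installing pyducklake:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DecimalSpec:
    """Hypothetical stand-in that applies DecimalType's validation rules."""

    precision: int
    scale: int

    def __post_init__(self) -> None:
        if not 1 <= self.precision <= 38:
            raise ValueError(f"Decimal precision must be between 1 and 38, got {self.precision}")
        if self.scale < 0:
            raise ValueError(f"Decimal scale must be non-negative, got {self.scale}")
        if self.scale > self.precision:
            raise ValueError(f"Decimal scale ({self.scale}) cannot exceed precision ({self.precision})")


def is_valid_decimal(precision: int, scale: int) -> bool:
    """True if DecimalSpec(precision, scale) can be constructed."""
    try:
        DecimalSpec(precision, scale)
    except ValueError:
        return False
    return True


print([is_valid_decimal(p, s) for p, s in [(18, 2), (38, 38), (39, 0), (10, 11)]])
```

Failing fast in the constructor keeps an invalid type from ever reaching the SQL layer, where the error would surface later and with less context.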
```python
class DoubleType(PrimitiveType):
    __slots__ = ()

    @property
    def _name(self) -> str:
        return "DoubleType"
```
Singleton primitive type for 64-bit floating-point numbers (DOUBLE).
```python
class DucklakeType(ABC):
    """Abstract base class for all Ducklake types."""

    __slots__ = ()

    @abstractmethod
    def __repr__(self) -> str: ...

    def __str__(self) -> str:
        return repr(self)
```
Abstract base class for all Ducklake types.
```python
class FloatType(PrimitiveType):
    __slots__ = ()

    @property
    def _name(self) -> str:
        return "FloatType"
```
Singleton primitive type for 32-bit floating-point numbers (FLOAT).
```python
class HugeIntType(PrimitiveType):
    __slots__ = ()

    @property
    def _name(self) -> str:
        return "HugeIntType"
```
Singleton primitive type for 128-bit signed integers (HUGEINT).
```python
class IntegerType(PrimitiveType):
    __slots__ = ()

    @property
    def _name(self) -> str:
        return "IntegerType"
```
Singleton primitive type for 32-bit signed integers (INTEGER).
```python
class IntervalType(PrimitiveType):
    __slots__ = ()

    @property
    def _name(self) -> str:
        return "IntervalType"
```
Singleton primitive type for time intervals (INTERVAL).
```python
class JSONType(PrimitiveType):
    __slots__ = ()

    @property
    def _name(self) -> str:
        return "JSONType"
```
Singleton primitive type for JSON values.
```python
@dataclass(frozen=True, slots=True)
class ListType(DucklakeType):
    """LIST type with a typed element."""

    element_id: int
    element_type: DucklakeType
    element_required: bool = True

    def __repr__(self) -> str:
        return (
            f"ListType(element_id={self.element_id}, element_type={self.element_type}, "
            f"element_required={self.element_required})"
        )
```
LIST type with a typed element.
```python
@dataclass(frozen=True, slots=True)
class MapType(DucklakeType):
    """MAP type with typed key and value."""

    key_id: int
    key_type: DucklakeType
    value_id: int
    value_type: DucklakeType
    value_required: bool = True

    def __repr__(self) -> str:
        return (
            f"MapType(key_id={self.key_id}, key_type={self.key_type}, "
            f"value_id={self.value_id}, value_type={self.value_type}, "
            f"value_required={self.value_required})"
        )
```
MAP type with typed key and value.
```python
@dataclass(frozen=True, slots=True)
class NestedField:
    """A named field within a struct, with an ID and optional documentation."""

    field_id: int
    name: str
    field_type: DucklakeType
    required: bool = False
    doc: str | None = None

    def __repr__(self) -> str:
        opt = "required" if self.required else "optional"
        return f"NestedField(field_id={self.field_id}, name={self.name!r}, type={self.field_type}, {opt})"
```
A named field within a struct, with an ID and optional documentation.
```python
class SmallIntType(PrimitiveType):
    __slots__ = ()

    @property
    def _name(self) -> str:
        return "SmallIntType"
```
Singleton primitive type for 16-bit signed integers (SMALLINT).
```python
class StringType(PrimitiveType):
    __slots__ = ()

    @property
    def _name(self) -> str:
        return "StringType"
```
Singleton primitive type for variable-length strings.
```python
@dataclass(frozen=True, slots=True)
class StructType(DucklakeType):
    """STRUCT type containing named fields."""

    fields: tuple[NestedField, ...]

    def __iter__(self) -> Iterator[NestedField]:
        return iter(self.fields)

    def __repr__(self) -> str:
        field_strs = ", ".join(repr(f) for f in self.fields)
        return f"StructType(fields=({field_strs}))"
```
STRUCT type containing named fields.
class TimeType(PrimitiveType):
    __slots__ = ()

    @property
    def _name(self) -> str:
        return "TimeType"
Singleton PrimitiveType subclass for time-of-day values (TIME).
class TimestampTZType(PrimitiveType):
    __slots__ = ()

    @property
    def _name(self) -> str:
        return "TimestampTZType"
Singleton PrimitiveType subclass for timestamps with time zone (TIMESTAMPTZ).
class TimestampType(PrimitiveType):
    __slots__ = ()

    @property
    def _name(self) -> str:
        return "TimestampType"
Singleton PrimitiveType subclass for timestamps without time zone (TIMESTAMP).
class TinyIntType(PrimitiveType):
    __slots__ = ()

    @property
    def _name(self) -> str:
        return "TinyIntType"
Singleton PrimitiveType subclass for 8-bit signed integers (TINYINT).
class UBigIntType(PrimitiveType):
    __slots__ = ()

    @property
    def _name(self) -> str:
        return "UBigIntType"
Singleton PrimitiveType subclass for 64-bit unsigned integers (UBIGINT).
class UIntegerType(PrimitiveType):
    __slots__ = ()

    @property
    def _name(self) -> str:
        return "UIntegerType"
Singleton PrimitiveType subclass for 32-bit unsigned integers (UINTEGER).
class USmallIntType(PrimitiveType):
    __slots__ = ()

    @property
    def _name(self) -> str:
        return "USmallIntType"
Singleton PrimitiveType subclass for 16-bit unsigned integers (USMALLINT).
class UTinyIntType(PrimitiveType):
    __slots__ = ()

    @property
    def _name(self) -> str:
        return "UTinyIntType"
Singleton PrimitiveType subclass for 8-bit unsigned integers (UTINYINT).
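The integer classes follow DuckDB's fixed-width naming (TINYINT, SMALLINT, UTINYINT, USMALLINT, UINTEGER, UBIGINT). Assuming each maps to the DuckDB type of the same name (the `_PRIMITIVE_TO_SQL` table is not shown here), its value range follows from the bit width n: a signed type spans -2^(n-1) to 2^(n-1) - 1, and an unsigned type spans 0 to 2^n - 1. The helper `int_range` and the `WIDTHS` table below are hypothetical illustrations of that arithmetic.

```python
def int_range(bits: int, signed: bool) -> tuple[int, int]:
    """Inclusive (min, max) for a two's-complement (signed) or unsigned integer of `bits` bits."""
    if signed:
        return -(2 ** (bits - 1)), 2 ** (bits - 1) - 1
    return 0, 2**bits - 1


# Assumed widths, matching DuckDB's integer types of the same names.
WIDTHS = {
    "TinyIntType": (8, True),
    "SmallIntType": (16, True),
    "UTinyIntType": (8, False),
    "USmallIntType": (16, False),
    "UIntegerType": (32, False),
    "UBigIntType": (64, False),
}

for name, (bits, signed) in WIDTHS.items():
    lo, hi = int_range(bits, signed)
    print(f"{name}: {lo} .. {hi}")
# TinyIntType: -128 .. 127
# SmallIntType: -32768 .. 32767
# UTinyIntType: 0 .. 255
# USmallIntType: 0 .. 65535
# UIntegerType: 0 .. 4294967295
# UBigIntType: 0 .. 18446744073709551615
```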
class UUIDType(PrimitiveType):
    __slots__ = ()

    @property
    def _name(self) -> str:
        return "UUIDType"
Singleton PrimitiveType subclass for UUID values (UUID).
def arrow_type_to_ducklake(t: pa.DataType) -> DucklakeType:
    """Convert a PyArrow DataType to a DucklakeType."""
    direct = _ARROW_TO_PRIMITIVE.get(t)
    if direct is not None:
        return direct

    if isinstance(t, pa.Decimal128Type):
        prec: int = cast(int, t.precision)
        sc: int = cast(int, t.scale)
        if prec == 38 and sc == 0:
            return HugeIntType()
        return DecimalType(prec, sc)

    if isinstance(t, pa.TimestampType):
        unit: str = cast(str, t.unit)
        tz: str | None = cast("str | None", t.tz)
        if unit != "us":
            raise TypeError(f"Unsupported timestamp unit '{unit}': only 'us' (microsecond) is supported")
        if tz is not None and tz != "UTC":
            raise TypeError(f"Unsupported timezone '{tz}': only UTC is supported")
        if tz is None:
            return TimestampType()
        return TimestampTZType()

    if isinstance(t, pa.ListType):
        elem_type = arrow_type_to_ducklake(t.value_type)
        return ListType(element_id=0, element_type=elem_type, element_required=not t.value_field.nullable)

    if isinstance(t, pa.MapType):
        key_type = arrow_type_to_ducklake(t.key_type)
        value_type = arrow_type_to_ducklake(t.item_type)
        return MapType(
            key_id=0,
            key_type=key_type,
            value_id=0,
            value_type=value_type,
            value_required=not t.item_field.nullable,
        )

    if isinstance(t, pa.StructType):
        fields = tuple(
            NestedField(
                field_id=i,
                name=t.field(i).name,
                field_type=arrow_type_to_ducklake(t.field(i).type),
                required=not t.field(i).nullable,
            )
            for i in range(t.num_fields)
        )
        return StructType(fields)

    raise TypeError(f"Cannot convert Arrow type {t} to DucklakeType")
Convert a PyArrow DataType to a DucklakeType.
def ducklake_type_to_arrow(t: DucklakeType) -> pa.DataType:
    """Convert a DucklakeType to a PyArrow DataType."""
    if isinstance(t, PrimitiveType):
        arrow_type = _PRIMITIVE_TO_ARROW.get(type(t))
        if arrow_type is not None:
            return arrow_type

    if isinstance(t, DecimalType):
        return pa.decimal128(t.precision, t.scale)

    if isinstance(t, ListType):
        element_arrow = ducklake_type_to_arrow(t.element_type)
        return pa.list_(pa.field("element", element_arrow, nullable=not t.element_required))

    if isinstance(t, MapType):
        key_arrow = ducklake_type_to_arrow(t.key_type)
        value_arrow = ducklake_type_to_arrow(t.value_type)
        return pa.map_(key_arrow, pa.field("value", value_arrow, nullable=not t.value_required))  # type: ignore[call-overload, no-any-return]

    if isinstance(t, StructType):
        fields = [pa.field(f.name, ducklake_type_to_arrow(f.field_type), nullable=not f.required) for f in t.fields]
        return pa.struct(fields)

    raise TypeError(f"Cannot convert {t} to Arrow type")
Convert a DucklakeType to a PyArrow DataType.
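`ducklake_type_to_arrow` inverts Ducklake's `required` flags into Arrow's `nullable` flags at every nesting level, naming list children `element` and map values `value`. Arrow never allows null map keys, which likely explains why `MapType` carries no `key_required` flag. The sketch below walks a small hypothetical type tree (tuples standing in for the real classes) and reports the resulting Arrow nullability for each child path; the `nullability` function and the tuple encoding are illustrative assumptions, not library code.

```python
# Hypothetical tuple-based type tree (not pyducklake's classes):
#   ("prim", name)
#   ("list", element, element_required)
#   ("map", key, value, value_required)
#   ("struct", [(field_name, field_type, required), ...])


def nullability(t: tuple, path: str = "") -> list[tuple[str, bool]]:
    """Return (child_path, arrow_nullable) pairs, applying nullable = not required."""
    out: list[tuple[str, bool]] = []
    kind = t[0]
    if kind == "list":
        _, element, element_required = t
        child = f"{path}.element"
        out.append((child, not element_required))
        out.extend(nullability(element, child))
    elif kind == "map":
        _, key, value, value_required = t
        # Arrow map keys can never be null, so the key is always non-nullable.
        out.append((f"{path}.key", False))
        out.extend(nullability(key, f"{path}.key"))
        out.append((f"{path}.value", not value_required))
        out.extend(nullability(value, f"{path}.value"))
    elif kind == "struct":
        for name, field_type, required in t[1]:
            child = f"{path}.{name}" if path else name
            out.append((child, not required))
            out.extend(nullability(field_type, child))
    # "prim" nodes have no children, so nothing is added for them.
    return out


schema = (
    "struct",
    [
        ("id", ("prim", "int32"), True),
        ("tags", ("list", ("prim", "string"), True), False),
        ("attrs", ("map", ("prim", "string"), ("prim", "int64"), False), True),
    ],
)

for child_path, nullable in nullability(schema):
    print(child_path, "nullable" if nullable else "not null")
# id not null
# tags nullable
# tags.element not null
# attrs not null
# attrs.key not null
# attrs.value nullable
```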
def ducklake_type_to_sql(t: DucklakeType) -> str:
    """Convert a DucklakeType to a DuckDB SQL type string."""
    if isinstance(t, PrimitiveType):
        sql = _PRIMITIVE_TO_SQL.get(type(t))
        if sql is not None:
            return sql

    if isinstance(t, DecimalType):
        return f"DECIMAL({t.precision}, {t.scale})"

    if isinstance(t, ListType):
        return f"{ducklake_type_to_sql(t.element_type)}[]"

    if isinstance(t, MapType):
        return f"MAP({ducklake_type_to_sql(t.key_type)}, {ducklake_type_to_sql(t.value_type)})"

    if isinstance(t, StructType):
        field_strs = ", ".join(
            f'"{f.name.replace(chr(34), chr(34) + chr(34))}" {ducklake_type_to_sql(f.field_type)}' for f in t.fields
        )
        return f"STRUCT({field_strs})"

    raise TypeError(f"Cannot convert {t} to SQL string")
Convert a DucklakeType to a DuckDB SQL type string.
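`ducklake_type_to_sql` renders nested types recursively: lists as `T[]`, maps as `MAP(K, V)`, and structs as `STRUCT("name" T, ...)`, with each field name wrapped in double quotes and any embedded double quote doubled (`chr(34)` is `"`). The stand-alone sketch below applies the same rules to a hypothetical tuple-based type tree. The primitive SQL names in it (`INTEGER`, `VARCHAR`) are assumptions, since `_PRIMITIVE_TO_SQL` is not shown, and `quote_ident` and `to_sql` are illustrative names.

```python
def quote_ident(name: str) -> str:
    # Wrap in double quotes and escape embedded quotes by doubling them.
    return '"' + name.replace('"', '""') + '"'


def to_sql(t: tuple) -> str:
    kind = t[0]
    if kind == "prim":
        return t[1]  # assumed SQL name, e.g. "INTEGER"
    if kind == "decimal":
        return f"DECIMAL({t[1]}, {t[2]})"
    if kind == "list":
        return f"{to_sql(t[1])}[]"
    if kind == "map":
        return f"MAP({to_sql(t[1])}, {to_sql(t[2])})"
    if kind == "struct":
        return "STRUCT(" + ", ".join(f"{quote_ident(n)} {to_sql(c)}" for n, c in t[1]) + ")"
    raise TypeError(f"Cannot convert {t} to SQL string")


example = (
    "struct",
    [
        ("id", ("prim", "INTEGER")),
        ('say "hi"', ("list", ("prim", "VARCHAR"))),
        ("scores", ("map", ("prim", "VARCHAR"), ("decimal", 10, 2))),
    ],
)
print(to_sql(example))
# STRUCT("id" INTEGER, "say ""hi""" VARCHAR[], "scores" MAP(VARCHAR, DECIMAL(10, 2)))
```

Doubling the quote character is the SQL-standard escape inside a quoted identifier, so field names containing spaces, reserved words, or quotes still produce valid type strings.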