✅ Installation
Step 1: Define Base Tables
Create a simple schema using Python classes. This file isn’t executed—just declarative config.
```python
# models.py
import datafruit as dft
from typing import Optional

class users(dft.Table):
    id: Optional[int] = dft.Field(primary_key=True)
    name: str
    email: str
    is_active: bool = True

class orders(dft.Table):
    id: Optional[int] = dft.Field(primary_key=True)
    user_id: int = dft.Field(foreign_key="users.id")
    amount: float
    status: str = "pending"

databases = {
    "warehouse": dft.PostgresDB("postgresql://localhost/warehouse", tables=[users, orders])
}

dft.export(databases=databases)
```
Preview and apply the changes:

```
$ dft plan
Planning schema changes...
Target: warehouse
Datafruit will perform the following actions:
+ Table: orders
│ + Add column id (INTEGER)
│ + Add column user_id (INTEGER)
│ + Add column amount (FLOAT)
│ + Add column status (VARCHAR)
+ Table: users
│ + Add column id (INTEGER)
│ + Add column name (VARCHAR)
│ + Add column email (VARCHAR)
│ + Add column is_active (BOOLEAN)
✓ Plan saved to .dft/plan.json
Run 'dft apply' to apply these changes.
```

```
$ dft apply
Applying schema changes...
✓ Successfully applied changes to 'warehouse'
```
Step 2: Add Views and Models with @query
Use `@query(db=..., type=...)` to define transformations:

| materialization type | meaning |
| --- | --- |
| `view` | Stored as a SQL view |
| `table` | Materialized and written to a table |
```python
# models.py (continued)
db = databases["warehouse"]

@dft.query(db=db, type="view")
def active_users():
    return f"SELECT * FROM {users} WHERE is_active = true"

@dft.query(db=db, type="view")
def completed_orders():
    return f"""
    SELECT * FROM {orders}
    WHERE status = 'completed' AND amount > 0
    """

@dft.query(db=db, type="table")
def user_stats():
    return f"""
    SELECT
        u.id AS user_id,
        COUNT(o.id) AS total_orders,
        SUM(o.amount) AS total_spent,
        AVG(o.amount) AS avg_order_value
    FROM {active_users} u
    LEFT JOIN {completed_orders} o ON u.id = o.user_id
    GROUP BY u.id
    """
```
And define the table schema for `user_stats`:
```python
class user_stats(dft.Table):
    user_id: int = dft.Field(primary_key=True)
    total_orders: int
    total_spent: float
    avg_order_value: float
```
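In the queries above, interpolating a table class like `{users}` or another query like `{active_users}` renders as the relation's name in the generated SQL, which is presumably also how the dependency DAG that `dft run` prints is inferred. The sketch below is not datafruit's implementation; it is a self-contained, hypothetical illustration of that pattern (every name in it is made up):

```python
# Illustration only -- NOT datafruit's internals. A minimal sketch of how
# interpolating table classes / query objects into f-strings can yield relation
# names and reveal dependencies. All names here (TableMeta, Query, query) are
# hypothetical.

_touched = []  # relations interpolated while rendering the current query

class TableMeta(type):
    def __str__(cls):
        _touched.append(cls.__name__)
        return cls.__name__  # f"{users}" renders as "users"

class Table(metaclass=TableMeta):
    pass

class Query:
    def __init__(self, fn, kind):
        self.fn, self.name, self.kind = fn, fn.__name__, kind

    def __str__(self):
        _touched.append(self.name)
        return self.name  # f"{active_users}" renders as "active_users"

    def render(self):
        """Return (sql, upstream relations referenced by the f-string)."""
        _touched.clear()
        sql = self.fn()
        return sql, list(_touched)

def query(kind="view"):
    def wrap(fn):
        return Query(fn, kind)
    return wrap

class users(Table):
    pass

@query(kind="view")
def active_users():
    return f"SELECT * FROM {users} WHERE is_active = true"

@query(kind="table")
def user_stats():
    return f"SELECT user_id, COUNT(*) FROM {active_users} GROUP BY user_id"

print(active_users.render())  # ('SELECT * FROM users WHERE is_active = true', ['users'])
print(user_stats.render())    # ('SELECT user_id, COUNT(*) FROM active_users GROUP BY user_id', ['active_users'])
```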
Step 3: Plan, Apply, Run
```
$ dft plan
Planning schema changes...
Target: warehouse
+ View: active_users
+ View: completed_orders
+ Table: user_stats
│ + Add column user_id (INTEGER)
│ + Add column total_orders (INTEGER)
│ + Add column total_spent (FLOAT)
│ + Add column avg_order_value (FLOAT)
✓ Plan saved to .dft/plan.json
```

```
$ dft apply
Applying schema changes...
✓ Created views: active_users, completed_orders
✓ Created table: user_stats
✓ Schema synced
```

```
$ dft run user_stats
→ Executing DAG:
• active_users (view)
• completed_orders (view)
• user_stats (table)
✓ user_stats completed (1,247 rows)
✓ Data written to table: user_stats
```
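To double-check the output, you can query the warehouse directly. This is not a datafruit command, just plain SQLAlchemy pointed at the same connection URL used in `models.py`:

```python
# Optional sanity check with plain SQLAlchemy (not part of datafruit):
# peek at the freshly materialized user_stats table.
from sqlalchemy import create_engine, text

engine = create_engine("postgresql://localhost/warehouse")  # same URL as models.py

with engine.connect() as conn:
    rows = conn.execute(
        text("SELECT user_id, total_orders, total_spent, avg_order_value "
             "FROM user_stats ORDER BY total_spent DESC LIMIT 5")
    ).fetchall()

for row in rows:
    print(row)
```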
🧠 Auto-Suggest Missing Schema
If you forget the `user_stats` class:
```
$ dft plan
✓ Computation graph valid
⚠️ Table 'user_stats' is declared as persistent, but has no schema defined.
Suggested schema:

class user_stats(dft.Table):
    user_id: int = dft.Field(primary_key=True)
    total_orders: int
    total_spent: float
    avg_order_value: float

Add this to your models and re-run:
dft plan
dft apply
```
🛑 Safe by Default: Detecting Breakage
If you rename a column in `users`:
```python
class users(dft.Table):
    id: Optional[int] = dft.Field(primary_key=True)
    name: str
    email_address: str  # renamed from 'email'
    is_active: bool = True
```
```
$ dft plan
~ Table: users
│ ~ Rename: email → email_address
BREAKING CHANGE DETECTED:
× View 'active_users' references users.email
× Query 'user_stats' depends on active_users
Fix downstream queries before applying.
```
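One way to resolve this (a sketch, not the only fix) is to update the downstream queries to use the new column name, then re-run `dft plan` and `dft apply`. For example, `active_users` could select explicit columns so the rename is reflected where it first surfaces:

```python
# Sketch of one possible fix: select the renamed column explicitly in the
# downstream view, then re-plan and re-apply. Adjust any other query that
# still refers to 'email'.
@dft.query(db=db, type="view")
def active_users():
    return f"""
    SELECT id, name, email_address, is_active
    FROM {users}
    WHERE is_active = true
    """
```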