✅ Installation

pip install datafruit

Step 1: Define Base Tables

Create a simple schema using Python classes. You don't run this file yourself; it's just declarative config.

# models.py
import datafruit as dft
from typing import Optional

class users(dft.Table):
    id: Optional[int] = dft.Field(primary_key=True)
    name: str
    email: str
    is_active: bool = True

class orders(dft.Table):
    id: Optional[int] = dft.Field(primary_key=True)
    user_id: int = dft.Field(foreign_key="users.id")
    amount: float
    status: str = "pending"

databases = {
    "warehouse": dft.PostgresDB("postgresql://localhost/warehouse", tables=[users, orders])
}

dft.export(databases=databases)
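
In a real project you may not want to hardcode credentials in models.py. Below is a minimal sketch of one common pattern, assuming PostgresDB accepts any standard connection string; DATABASE_URL is a hypothetical environment variable name, not something datafruit requires.

# models.py (alternative) -- read the DSN from the environment instead of
# hardcoding it. DATABASE_URL is a hypothetical variable name; use whatever
# your deployment provides. Falls back to the local DSN from the example above.
import os

DATABASE_URL = os.environ.get("DATABASE_URL", "postgresql://localhost/warehouse")

databases = {
    "warehouse": dft.PostgresDB(DATABASE_URL, tables=[users, orders])
}

dft.export(databases=databases)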
Now preview the pending changes:

$ dft plan

Planning schema changes...
Target: warehouse

Datafruit will perform the following actions:

+ Table: orders
 + Add column id (INTEGER)
 + Add column user_id (INTEGER)
 + Add column amount (FLOAT)
 + Add column status (VARCHAR)

+ Table: users
 + Add column id (INTEGER)
 + Add column name (VARCHAR)
 + Add column email (VARCHAR)
 + Add column is_active (BOOLEAN)

 Plan saved to .dft/plan.json
Run 'dft apply' to apply these changes.
$ dft apply

Applying schema changes...
 Successfully applied changes to 'warehouse'

Step 2: Add Views and Models with @query

Use @dft.query(db=..., type=...) to define SQL transformations. The type argument controls how the result is materialized:

materialization type   meaning
view                   Stored as a SQL view
table                  Materialized and written to a table

# models.py (continued)
db = databases["warehouse"]

@dft.query(db=db, type="view")
def active_users():
    return f"SELECT * FROM {users} WHERE is_active = true"

@dft.query(db=db, type="view")
def completed_orders():
    return f"""
    SELECT * FROM {orders}
    WHERE status = 'completed' AND amount > 0
    """

@dft.query(db=db, type="table")
def user_stats():
    return f"""
    SELECT 
        u.id AS user_id,
        COUNT(o.id) AS total_orders,
        SUM(o.amount) AS total_spent,
        AVG(o.amount) AS avg_order_value
    FROM {active_users} u
    LEFT JOIN {completed_orders} o ON u.id = o.user_id
    GROUP BY u.id
    """

Since user_stats is materialized as a table, also define its schema:

class user_stats(dft.Table):
    user_id: int = dft.Field(primary_key=True)
    total_orders: int
    total_spent: float
    avg_order_value: float

Step 3: Plan, Apply, Run

$ dft plan

Planning schema changes...
Target: warehouse

+ View: active_users
+ View: completed_orders
+ Table: user_stats
 + Add column user_id (INTEGER)
 + Add column total_orders (INTEGER)
 + Add column total_spent (FLOAT)
 + Add column avg_order_value (FLOAT)

 Plan saved to .dft/plan.json
$ dft apply

Applying schema changes...
 Created views: active_users, completed_orders
 Created table: user_stats
 Schema synced
$ dft run user_stats

 Executing DAG:
 active_users (view)
 completed_orders (view)
 user_stats (table)

 user_stats completed (1,247 rows)
 Data written to table: user_stats
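
To sanity-check the materialized output outside of datafruit, any ordinary Postgres client works. Here is a minimal sketch using SQLAlchemy, which is an illustrative choice rather than part of datafruit; it assumes the same warehouse DSN and a PostgreSQL driver such as psycopg2 installed.

# check_user_stats.py -- quick row-count check against the materialized table.
# Plain SQLAlchemy, independent of datafruit; run after `dft run user_stats`.
import sqlalchemy as sa

engine = sa.create_engine("postgresql://localhost/warehouse")

with engine.connect() as conn:
    count = conn.execute(sa.text("SELECT COUNT(*) FROM user_stats")).scalar_one()
    print(f"user_stats contains {count} rows")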

🧠 Auto-Suggest Missing Schema

If you forget to define the user_stats class:

$ dft plan

 Computation graph valid

⚠️ Table 'user_stats' is declared as persistent, but has no schema defined.

Suggested schema:

class user_stats(dft.Table):
    user_id: int = dft.Field(primary_key=True)
    total_orders: int
    total_spent: float
    avg_order_value: float

Add this to your models and re-run:
  dft plan
  dft apply

🛑 Safe by Default: Detecting Breakage

If you rename a column in users:

class users(dft.Table):
    id: Optional[int] = dft.Field(primary_key=True)
    name: str
    email_address: str  # renamed from 'email'
    is_active: bool = True
$ dft plan

~ Table: users
 ~ Rename: email -> email_address

BREAKING CHANGE DETECTED:

× View 'active_users' references users.email
× Query 'user_stats' depends on active_users

Fix downstream queries before applying.
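
One way to fix it, sketched with the same decorator pattern used above, is to make the downstream view reference the renamed column explicitly and then re-plan:

# models.py -- update the downstream view so it matches the renamed column.
@dft.query(db=db, type="view")
def active_users():
    return f"""
    SELECT id, name, email_address, is_active
    FROM {users}
    WHERE is_active = true
    """

Once the downstream queries agree with the new column name, re-running dft plan and dft apply should no longer flag the breakage.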