Skip to content

Database

Everything that touches PostgreSQL/SQLite via SQLAlchemy 2.0 async — base model, async repository, mixins, migrations, cursor pagination.

Audit & soft-delete mixins

SoftDeleteMixin adds a deleted_at timestamp column with mark_deleted() / mark_restored() / is_deleted helpers. AuditMixin adds created_by / updated_by UUID columns with stamp_created_by(user_id) / stamp_updated_by(user_id) helpers. Mix them in alongside BaseModel:

# src/db/models/user.py
from sqlalchemy.orm import Mapped, mapped_column

from tempest_fastapi_sdk import AuditMixin, BaseModel, SoftDeleteMixin


class UserModel(BaseModel, SoftDeleteMixin, AuditMixin):
    """Users — soft-deletable and audited."""

    name: Mapped[str] = mapped_column()
    email: Mapped[str] = mapped_column(unique=True)
    password_hash: Mapped[str] = mapped_column()

Filtering is the caller's responsibility — the mixin doesn't install a global filter. Hide soft-deleted rows from list endpoints by passing deleted_at=None (or filtering in your repository subclass). Stamping audit columns belongs to the service layer where the current user is in scope. Both patterns live inside the service:

# src/services/user.py
from uuid import UUID

from tempest_fastapi_sdk import BaseService

from src.db.repositories import UserRepository
from src.schemas import UserResponse, UserUpdateSchema


class UserService(BaseService[UserRepository, UserResponse]):
    """Business logic for the user domain."""

    # ──────── soft-delete-aware read ────────

    async def list_alive(self) -> list[UserResponse]:
        """Return only rows where ``deleted_at IS NULL``.

        ``BaseRepository._apply_filters`` skips ``None`` values by design
        (a missing filter ≠ ``WHERE col IS NULL``), so an ``IS NULL`` clause
        must be issued as a raw SQLAlchemy query bound to the same session.
        """
        from sqlalchemy import select

        result = await self.repository.session.execute(
            select(UserModel).where(UserModel.deleted_at.is_(None))
        )
        instances = result.scalars().all()
        return [self.repository.map_to_response(i) for i in instances]

    # ──────── audit-stamped update ────────

    async def update(
        self,
        user_id: UUID,
        data: UserUpdateSchema,
        *,
        actor_id: UUID,
    ) -> UserResponse:
        """Apply a partial update and stamp ``updated_by`` with the actor."""
        instance = await self.repository.get_by_id(user_id)
        instance.update_from_dict(data.model_dump(exclude_unset=True))
        instance.stamp_updated_by(actor_id)
        updated = await self.repository.update(instance)
        return self.repository.map_to_response(updated)

The two highlighted methods under the divider comments are the only soft-delete- and audit-specific code the consumer writes — the columns and helpers (mark_deleted / mark_restored / stamp_updated_by) come from the mixins.

Use the mixin's helpers (mark_deleted / mark_restored) when you want the deleted_at semantics; use BaseRepository.soft_delete(id) when the existing is_active flag is enough.

Cursor pagination

Cursor pagination scales better than offset pagination on big tables (no COUNT(*), stable under concurrent inserts) at the cost of losing random-access. The SDK provides CursorPaginationFilterSchema, CursorPaginationSchema[T] and the opaque encode_cursor / decode_cursor helpers.

# src/schemas/user.py
from tempest_fastapi_sdk import CursorPaginationFilterSchema, CursorPaginationSchema

from src.schemas.user import UserResponse


class UserCursorFilter(CursorPaginationFilterSchema):
    name: str | None = None  # ILIKE %value% via repository convention


UserCursorPage = CursorPaginationSchema[UserResponse]

Repository helper (cursor over created_at + id tie-break):

# src/db/repositories/user.py
from datetime import datetime
from typing import Any
from uuid import UUID

from sqlalchemy import asc, desc, select, tuple_
from sqlalchemy.ext.asyncio import AsyncSession

from tempest_fastapi_sdk import BaseRepository, decode_cursor, encode_cursor

from src.db.models.user import UserModel
from src.schemas.user import UserResponse


class UserRepository(BaseRepository[UserModel]):
    def __init__(self, session: AsyncSession) -> None:
        super().__init__(session, model=UserModel)

    async def cursor_page(
        self,
        *,
        cursor: str | None,
        limit: int,
        ascending: bool,
        filters: dict[str, Any] | None = None,
    ) -> UserCursorPage:
        query = select(UserModel)
        if filters:
            query = self._apply_filters(query, filters)

        order = asc if ascending else desc
        query = query.order_by(order(UserModel.created_at), order(UserModel.id))

        if cursor is not None:
            state = decode_cursor(cursor)
            # The cursor was encoded as `created_at.isoformat()` — decode back
            # to a datetime so the tuple comparison stays type-consistent on
            # Postgres (which rejects str-vs-timestamp comparisons).
            cursor_ts = datetime.fromisoformat(state["value"])
            cursor_id = UUID(state["id"])
            # SQLAlchemy needs `tuple_()` to express (col_a, col_b) > (val_a, val_b)
            # — bare Python tuples on mapped columns raise at composition time.
            cmp = tuple_(UserModel.created_at, UserModel.id) > tuple_(
                cursor_ts, cursor_id
            )
            query = query.where(cmp if ascending else ~cmp)

        query = query.limit(limit + 1)  # peek one ahead to set has_more
        result = await self.session.execute(query)
        rows = list(result.unique().scalars().all())
        has_more = len(rows) > limit
        rows = rows[:limit]
        next_cursor = (
            encode_cursor(
                {"id": str(rows[-1].id), "value": rows[-1].created_at.isoformat()},
            )
            if has_more and rows
            else None
        )
        return UserCursorPage(
            items=[self.map_to_response(r) for r in rows],
            next_cursor=next_cursor,
            has_more=has_more,
            limit=limit,
        )

Router:

@router.get("/", response_model=UserCursorPage)
async def list_users(
    f: UserCursorFilter = Depends(),
    controller: UserController = Depends(get_user_controller),
) -> UserCursorPage:
    return await controller.service.repository.cursor_page(
        cursor=f.cursor,
        limit=f.limit,
        ascending=f.ascending,
        filters=f.get_conditions(),
    )

The cursor is opaque base64-url-safe JSON — clients never inspect it; they pass back the value verbatim until next_cursor becomes null.

Alembic migrations

Full workflow: bootstrap → first migration → apply → CI gate.

Bootstrap once per project

# scripts/alembic_init.py
from tempest_fastapi_sdk import AlembicHelper

from src.core.settings import settings

helper = AlembicHelper(config_path="alembic.ini", db_url=settings.DATABASE_URL)
helper.init(
    directory="alembic",
    metadata_module="src.db.models",        # exposes BaseModel
    metadata_attr="BaseModel",
    db_url=settings.DATABASE_URL,
)

Run once: uv run python scripts/alembic_init.py.

This creates:

alembic.ini                 # SDK-curated config (UTC timezone, date-prefixed file template)
alembic/
├── env.py                  # SDK template (already wires target_metadata, compare_type, batch mode)
├── script.py.mako
└── versions/

Author migrations

# scripts/make_migration.py
import sys

from tempest_fastapi_sdk import AlembicHelper

from src.core.settings import settings

helper = AlembicHelper("alembic.ini", db_url=settings.DATABASE_URL)
helper.revision(
    message=sys.argv[1],
    autogenerate=True,
)
uv run python scripts/make_migration.py "add users table"

Generated file lands at alembic/versions/2026_05_16_1432-ae12cd34_add_users_table.py — the date prefix means files sort chronologically and merge conflicts are obvious.

Migrations come out lint-clean

The alembic.ini that init() writes includes a [post_write_hooks] block that runs ruff check --fix then ruff format on every freshly generated revision. Without it, the files Alembic emits fail tempest lint with W291 (trailing whitespace on the Revises: line when down_revision is None) and E501 (over-length sa.Column(...) lines). The hooks resolve your project's ruff config, so every autofixable rule (I, UP, E501, …) is fixed at generation time. Requires ruff on PATH — already a dev dependency in every tempest new scaffold.

Apply on startup

# src/api/app.py — extend lifespan
import asyncio

from tempest_fastapi_sdk import AlembicHelper


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    # Run pending migrations before serving traffic.
    helper = AlembicHelper("alembic.ini", db_url=settings.DATABASE_URL)
    await asyncio.to_thread(helper.upgrade)

    await db.connect()
    yield
    await db.disconnect()

CI gate — schema must match models

# scripts/check_migrations.py
import sys

from tempest_fastapi_sdk import AlembicHelper

from src.core.settings import settings

helper = AlembicHelper("alembic.ini", db_url=settings.DATABASE_URL)
if not helper.check():
    print("Schema drift detected — run make_migration.py and commit.")
    sys.exit(1)
print("Schema is in sync.")
# .github/workflows/ci.yml
- name: Check migrations are in sync
  run: uv run python scripts/check_migrations.py