# tempest-fastapi-sdk > tempest-fastapi-sdk are the shared FastAPI + SQLAlchemy 2.0 (async) + Pydantic v2 building blocks used across Tempest services: BaseAppSettings, BaseModel, BaseRepository[Model], BaseService, BaseController, base and pagination schemas, the AppException hierarchy + handlers, AsyncDatabaseManager, AlembicHelper, utilities (PasswordUtils, JWTUtils, EmailUtils, UploadUtils, StoredFileServiceMixin, Brazilian document/phone helpers), JWT auth (UserAuthService + make_jwt_user_dependency), MinIO/S3 storage, a FastStream broker and TaskIQ tasks. Optional features ship as extras ([auth], [email], [upload], [minio], [cache], [queue], [tasks], [metrics], [webpush], [all]); Python >= 3.11. --- # tempest-fastapi-sdk Source: https://mauriciobenjamin700.github.io/tempest-fastapi-sdk/ # tempest-fastapi-sdk > Blocos compartilhados de FastAPI / SQLAlchemy / Pydantic usados em todos os serviços backend do Tempest. **Comece todo projeto com a mesma fundação opinativa já pronta.** ![Python](https://img.shields.io/badge/python-3.11%20%7C%203.12%20%7C%203.13-blue) ![License](https://img.shields.io/badge/license-MIT-green) ![PyPI](https://img.shields.io/pypi/v/tempest-fastapi-sdk) --- ## O que você ganha de fábrica !!! tip "Use o SDK sempre que for copiar e colar isto" `BaseModel` SQLAlchemy, `BaseUserModel` + `BaseUserTokenModel` abstratos, `BaseRepository` async com `bulk_create_values`/`bulk_upsert`, schemas Pydantic + paginação (offset + cursor), envelope de exceções + handlers, mixins de settings com `title`/`description`/`examples`, managers async de DB / Redis / RabbitMQ / TaskIQ, validadores brasileiros, utilitários de JWT / senha / e-mail (com templates Jinja2) / upload (com backends pluggable Local + MinIO), Server-Sent Events, Web Push, painel admin estilo Django, fluxo de auth bundled (signup / activate / login / reset), OAuth2/OIDC (Google/GitHub + genérico), middlewares de CSRF / Idempotency / BodySize / Prometheus / RateLimit, `HTTPClient` httpx tipado com retry + circuit-breaker, MinIO/S3 (`AsyncMinIOClient`), hook de Alembic que reordena colunas-base e uma CLI única (`tempest db`, `tempest user`, `tempest new`, `tempest generate`). | Módulo | Exporta | | --- | --- | | `tempest_fastapi_sdk.admin` | `AdminSite`, `AdminModel`, `make_admin_router`, `UserModelAuthBackend` | | `tempest_fastapi_sdk.api` | `register_exception_handlers`, `apply_cors`, `RequestIDMiddleware`, `IdempotencyMiddleware`, `BodySizeLimitMiddleware`, `CSRFMiddleware`, `PrometheusMiddleware`, `make_health_router`, `make_logs_router`, `make_prometheus_router`, OAuth (`GoogleOAuthClient` / `GitHubOAuthClient` / `OIDCProvider`), dependências JWT/role/permissão, `HardenedStaticFiles`, `RateLimitMiddleware`, `WebhookSignatureVerifier`, `run_server` | | `tempest_fastapi_sdk.auth` | `UserAuthService`, `make_auth_router`, schemas (`SignupSchema`, `LoginSchema`, `PasswordResetRequestSchema`, …) | | `tempest_fastapi_sdk.cache` | `AsyncRedisManager`, `@cached` | | `tempest_fastapi_sdk.controllers` | `BaseController` | | `tempest_fastapi_sdk.core` | `JSONFormatter`, `configure_logging`, contexto de request-ID, `BaseStrEnum` / `BaseIntEnum` | | `tempest_fastapi_sdk.db` | `BaseModel`, `BaseUserModel`, `BaseUserTokenModel`, `UserTokenPurpose`, `BaseRepository` (com `bulk_*`), `AsyncDatabaseManager`, `AlembicHelper`, `AuditMixin`, `SoftDeleteMixin`, `reorder_base_columns_first`, `compose_hooks` | | `tempest_fastapi_sdk.exceptions` | hierarquia `AppException` (404 / 409 / 401 / 403 / 422 / 429 / arquivo grande demais / tipo de arquivo inválido / JWT) | | `tempest_fastapi_sdk.queue` | `AsyncBrokerManager` (FastStream / RabbitMQ) | | `tempest_fastapi_sdk.schemas` | `BaseSchema`, `BaseResponseSchema`, `BasePaginationFilterSchema`, `BasePaginationSchema`, paginação por cursor | | `tempest_fastapi_sdk.services` | `BaseService` | | `tempest_fastapi_sdk.settings` | `BaseAppSettings`, `ServerSettings`, `DatabaseSettings`, `RedisSettings`, `RabbitMQSettings`, `JWTSettings`, `AuthSettings`, `CORSSettings`, `EmailSettings`, `LogSettings`, `TokenSettings`, `UploadSettings`, `WebPushSettings`, `TaskIQSettings` | | `tempest_fastapi_sdk.sse` | `EventStream`, `ServerSentEvent`, `sse_response` | | `tempest_fastapi_sdk.storage` | `AsyncMinIOClient`, `ObjectStat` | | `tempest_fastapi_sdk.tasks` | `AsyncTaskBrokerManager`, `AsyncTaskScheduler` | | `tempest_fastapi_sdk.testing` | `test_session`, `test_database`, helpers de SQLite em memória | | `tempest_fastapi_sdk.utils` | `PasswordUtils`, `JWTUtils`, `EmailUtils` (com `render_template`), `UploadUtils`, `LocalUploadStorage`, `MinIOUploadStorage`, `HTTPClient`, `RetryPolicy`, `MetricsUtils`, `LogUtils`, `AttemptThrottle`, `DownloadUtils`, helpers BR, helpers de token opaco | | `tempest_fastapi_sdk.webpush` | `WebPushDispatcher`, `WebPushPayloadSchema`, `WebPushSubscriptionSchema` | ## Início rápido em cinco minutos ```bash # 1. Instale o SDK com todos os extras pip install "tempest-fastapi-sdk[all]" # 2. Gere um novo serviço no diretório atual tempest new . # 3. Sincronize as deps + rode o smoke test uv sync uv run pytest ``` !!! example "O que o `tempest new` produz" ```text my-service/ ├── main.py # one-liner que importa run de src.server ├── pyproject.toml ├── .env.example └── src/ ├── server.py # entrypoint uvicorn + app no nível do módulo ├── api/ # routers, dependencies, factory do app ├── controllers/ # orquestração fina sobre os services ├── services/ # lógica de negócio ├── schemas/ # DTOs de request/response ├── db/ │ ├── models/ │ └── repositories/ └── core/ # settings + constants + exceptions ``` Continue com **[Instalação »](installation.md)** para o passo a passo por extra, **[Arquitetura »](architecture.md)** para entender o fatiamento em camadas, ou vá direto para o **[Tutorial »](tutorial.md)**. ## Status | Superfície | Estado | | --- | --- | | Python | 3.11 / 3.12 / 3.13 (matriz testada no CI) | | Testes | 630+ casos de pytest, cobertura ≥ 89 % | | Type-checking | `mypy --strict`, `py.typed` distribuído (PEP 561) | | Lint / format | `ruff` (check + fix + format) | | Pipeline de release | publicação confiável no PyPI a cada tag `vX.Y.Z` | --- # Instalação Source: https://mauriciobenjamin700.github.io/tempest-fastapi-sdk/installation/ # Instalação ## Resumo ```bash pip install tempest-fastapi-sdk ``` Requer **Python 3.11+**. !!! tip "Use o `uv`" `uv add tempest-fastapi-sdk` é mais rápido e já escreve no `pyproject.toml` para você. ## Extras opcionais Os helpers mais ricos puxam dependências de terceiros que só são necessárias quando você de fato usa o helper. Escolha os extras que o seu serviço consome: | Extra | Puxa | Habilita | | --- | --- | --- | | `[auth]` | `bcrypt`, `PyJWT` | `PasswordUtils`, `JWTUtils`, fluxo bundled `UserAuthService` + `make_auth_router` | | `[email]` | `aiosmtplib`, `jinja2`, `email-validator` | `EmailUtils` (com `render_template` + templates Jinja2) | | `[upload]` | `aiofiles`, `python-multipart` | `UploadUtils`, `DownloadUtils`, `LocalUploadStorage` | | `[cache]` | `redis` | `AsyncRedisManager` + `@cached` + `RedisIdempotencyStore` | | `[webpush]` | `pywebpush`, `cryptography` | `WebPushDispatcher` | | `[metrics]` | `psutil`, `nvidia-ml-py` | `MetricsUtils` | | `[queue]` | `faststream[rabbit]` | `AsyncBrokerManager` | | `[tasks]` | `taskiq`, `taskiq-aio-pika` | `AsyncTaskBrokerManager`, `AsyncTaskScheduler` | | `[admin]` | `jinja2`, `itsdangerous` | `AdminSite`, `AdminModel`, `make_admin_router` | | `[minio]` | `minio` | `AsyncMinIOClient`, `MinIOUploadStorage` | | `[http]` | `httpx` | `HTTPClient` + `RetryPolicy` + circuit-breaker | | `[prometheus]` | `prometheus-client` | `PrometheusMiddleware`, `make_prometheus_router`, `make_prometheus_registry` | | `[mfa]` | `pyotp` | `TOTPHelper` + endpoints MFA/2FA (TOTP) do fluxo bundled de auth | | `[sqlite]` | `aiosqlite` | driver async SQLite para URLs `sqlite+aiosqlite://` (default de dev) | | `[postgres]` | `asyncpg` | driver async PostgreSQL para URLs `postgresql+asyncpg://` (produção) | | `[all]` | tudo acima | todos os helpers | === "Subconjunto (recomendado)" ```bash pip install "tempest-fastapi-sdk[auth,upload,cache]" ``` === "Tudo" ```bash pip install "tempest-fastapi-sdk[all]" ``` === "uv add" ```bash uv add "tempest-fastapi-sdk[auth,upload,postgres]>=0.41.0" ``` === "pyproject.toml" ```toml dependencies = [ "tempest-fastapi-sdk[auth,upload,postgres]>=0.41.0", ] ``` !!! warning "O SDK não traz driver de banco por padrão" `sqlalchemy[asyncio]` é dependência core, mas o DBAPI async é escolha do seu deploy: instale `[sqlite]` (`aiosqlite`, default de dev) ou `[postgres]` (`asyncpg`, produção). Sem nenhum, o engine levanta `ModuleNotFoundError` do driver na primeira conexão. Serviços criados com `tempest new` já pinam `aiosqlite` e carregam uma linha `asyncpg` comentada no `pyproject.toml`. !!! info "Imports preguiçosos" Desde a 0.7.1 toda dependência opcional é importada de forma preguiçosa na primeira instanciação, então `import tempest_fastapi_sdk` funciona mesmo quando só um subconjunto de extras está instalado. Instanciar um helper cujo extra está faltando levanta `ImportError` com uma dica clara apontando para o extra certo. ## CLI A CLI `tempest` vem na instalação base (sem extra): ```bash tempest --version # mostra a versão instalada do SDK tempest new # gera um serviço em camadas no diretório atual tempest new myproject # gera dentro de ./myproject tempest generate --docker # regenera docker-compose.yaml a partir dos extras já escolhidos tempest db init # bootstrapa diretório alembic (alembic.ini sem credenciais) tempest db revision -m "msg" # autogenerate revision aplicando o reorder hook tempest db upgrade # roda upgrade até head (lê DATABASE_URL do .env) tempest db downgrade -1 # volta uma revisão tempest db current # mostra revisão atual tempest db history # log de revisões tempest user create --email admin@local --admin # `--email` obrigatório; senha pedida interativamente tempest user list --admin # lista somente os admins (omita `--admin` pra listar todos) tempest fix # ruff check --fix . + ruff format . tempest check # lint + fmt-check + mypy + pytest ``` Veja **[Receitas → CLI »](recipes/cli.md)** para o detalhamento completo. ## Verifique a instalação ```bash python -c "import tempest_fastapi_sdk; print(tempest_fastapi_sdk.__version__)" ``` ## Política de versões do Python | Python | Status | | --- | --- | | 3.13 | Matriz principal do CI | | 3.12 | Suportado | | 3.11 | Suportado (mínimo) | | 3.10 e anteriores | Não suportado (usa a sintaxe `X \| None` do PEP 604) | --- # Arquitetura Source: https://mauriciobenjamin700.github.io/tempest-fastapi-sdk/architecture/ # Arquitetura O SDK impõe um fatiamento estrito **router → controller → service → repository**. Todo projeto Tempest segue o mesmo formato, então um desenvolvedor jogado em um repositório novo encontra o arquivo que precisa logo de primeira. ## As quatro camadas ```mermaid flowchart LR subgraph HTTP Router["📡 Router\n(api/routers/)"] end subgraph Coordenação Controller["🎼 Controller\n(controllers/)"] end subgraph Domínio Service["📐 Service\n(services/)"] end subgraph Dados Repository["🗄️ Repository\n(db/repositories/)"] Model["🧱 Modelo SQLAlchemy\n(db/models/)"] end DB[(PostgreSQL / SQLite)] Router -->|"Depends()"| Controller Controller -->|orquestra| Service Service -->|regras de domínio| Repository Repository -->|"SELECT/INSERT/UPDATE"| Model Model -->|SQLAlchemy async| DB ``` ## O que vive onde !!! abstract "Responsabilidades de cada camada" | Camada | Responsável por | NUNCA toca | | --- | --- | --- | | **Router** | Verbos HTTP, status codes, schemas de request/response, `Depends()` | DB, lógica de negócio | | **Controller** | Coordenação entre múltiplos services, política transversal (log de auditoria, emissão de outbox, notificação downstream) | DB, formato de request/response | | **Service** | Regras de domínio (unicidade, estado derivado, fluxo transacional) | HTTP, tipos do SQLAlchemy | | **Repository** | Queries SQLAlchemy async cruas, CRUD + filtro + paginação | Regras de domínio, HTTP | O repository **DEVE** ser uma subclasse (ou instância) de [`BaseRepository[ModelType]`][tempest_fastapi_sdk.BaseRepository]. O service **DEVE** ser uma subclasse de [`BaseService[RepositoryT, ResponseT]`][tempest_fastapi_sdk.BaseService]. O controller **DEVE** ser uma subclasse de [`BaseController[ServiceT, ResponseT]`][tempest_fastapi_sdk.BaseController] — mesmo quando todo método é um pass-through, porque o controller é a costura para adicionar coordenação entre services mais tarde. ## Layout obrigatório do projeto ```text / ├── main.py # ONE-LINER: from src.server import run; run() └── src/ (ou app/) ├── __init__.py # re-exporta run de src.server ├── server.py # uvicorn.run() programático + app FastAPI no nível do módulo ├── api/ │ ├── app.py # factory create_app() — middleware + handlers + wiring (magro) │ ├── routers/ # endpoints HTTP, sem lógica de negócio │ ├── dependencies/ # PACOTE (auth.py + resources.py + controllers.py / services.py) │ └── docs/ # customização do OpenAPI ├── controllers/ # orquestra entre services ├── services/ # camada de lógica de negócio ├── schemas/ # DTOs de request/response Pydantic v2 ├── core/ # settings.py + constants + exceptions + logging ├── db/ (opcional) │ ├── models/ # modelos ORM SQLAlchemy │ └── repositories/ # camada de acesso a dados ├── utils/ (opcional) # helpers stateless compartilhados ├── queue/ (opcional) # consumers/publishers FastStream └── tasks/ (opcional) # tarefas em background TaskIQ ``` !!! warning "Regras inegociáveis" - `main.py` na raiz do serviço é um **one-liner** que importa `run` de `src.server`. Nunca `subprocess.run(["uvicorn", ...])`. - `src/server.py` expõe tanto uma função `run()` quanto a instância `app` importável. - `api/dependencies/` é **sempre um pacote**, nunca um arquivo plano. A auth vive em `auth.py`; os provedores factory vivem em `controllers.py` (ou `services.py` quando ainda não há camada de controller). - **Singletons de infra (db / storage / mail) vivem em `dependencies/resources.py`**, construídos uma vez (`db = AsyncDatabaseManager(**settings.database_kwargs())`) e acessados via provedores `get_db` / `get_session` / `get_storage` / `get_mailer`. O `app.py` **importa** esses recursos para o lifespan e o wiring dos routers — não os constrói inline. Isso mantém o `app.py` magro e dá um único dono do ciclo de vida dos recursos. - Routers recebem controllers (e sessões/recursos) via `Depends`, nunca construídos inline. - Endpoints meta (`/health`, `/tool-spec`) ficam no **prefixo raiz**; endpoints de negócio ficam sob `/api/`. ## Ciclo de vida da requisição ```mermaid sequenceDiagram autonumber participant C as Cliente participant M as RequestIDMiddleware participant L as RateLimitMiddleware participant R as Router participant Ctl as Controller participant S as Service participant Repo as Repository participant DB as PostgreSQL C->>M: requisição HTTP M->>M: vincula X-Request-ID a um contextvar M->>L: encaminha L->>L: verifica cota de janela deslizante L->>R: encaminha R->>R: valida o schema Pydantic R->>Ctl: Depends(get_user_controller) Ctl->>S: orquestra S->>Repo: filtra / pagina / adiciona Repo->>DB: SELECT/INSERT/UPDATE DB-->>Repo: linhas Repo-->>S: instâncias ORM S-->>Ctl: ResponseSchema Ctl-->>R: ResponseSchema R->>M: serializa Pydantic → JSON M-->>C: resposta HTTP + header X-Request-ID ``` Cada passo tem um dono claro — o **router nunca conversa com o SQLAlchemy**, o **repository nunca levanta exceções HTTP** (ele levanta a `not_found_exception` configurada no `__init__`, e o exception handler a transforma no envelope JSON). ## Envelope de exceções O SDK traz [`AppException`][tempest_fastapi_sdk.AppException] + [`register_exception_handlers`][tempest_fastapi_sdk.register_exception_handlers] para que todo erro no seu serviço serialize no mesmo formato JSON: ```json { "detail": "Usuário não encontrado", "code": "USER_NOT_FOUND", "details": {"user_id": "01923..."} } ``` O frontend ramifica no `code` (estável, legível por máquina), nunca no `detail` (que pode estar traduzido). ## Para onde ir agora | Você quer… | Leia | | --- | --- | | Construir uma feature passo a passo | **[Tutorial »](tutorial.md)** | | Conectar um helper específico | **[Receitas »](recipes/index.md)** | | Consultar a assinatura de uma classe | **[Referência »](reference.md)** | | Atualizar de uma versão antiga | **[Guia de migração »](migration.md)** | ## Camadas de controllers & services `BaseService[RepositoryT, ResponseT]` e `BaseController[ServiceT, ResponseT]` são esqueletos genéricos que casam com o fatiamento do SDK (router → controller → service → repository). Eles expõem métodos CRUD pass-through para que endpoints simples possam herdar deles sem sobrescrever nada; você sobrescreve apenas os métodos que precisam de orquestração. O que você herda ao subclassear `BaseService[RepositoryT, ResponseT]`: | Método | Retorna | Notas | | --- | --- | --- | | `get_by_id(id)` | `ResponseT` | Aguarda `repository.get_by_id` + `repository.map_to_response`. Levanta `repository.not_found_exception` quando não encontra. | | `get_or_none(filters)` | `ResponseT \| None` | Mesmo formato, retorna `None` em vez de levantar. | | `list(filters=None, order_by=None, ascending=True)` | `list[ResponseT]` | Retorna `[]` quando não há correspondência (nunca levanta). | | `paginate(filters=None, order_by=None, page=1, page_size=20, ascending=True)` | `dict` com `items` mapeados + `total`/`page`/`page_size`/`pages`. | Paginação por offset via `repository.paginate`. | | `count(filters=None)` | `int` | Pass-through para `repository.count`. | | `exists(filters)` | `bool` | Pass-through para `repository.exists`. | | `update(id, data)` | `ResponseT` | Busca por id, copia os campos presentes em `data` (tipado por `UpdateT`, o 3º parâmetro genérico opcional — default `BaseSchema`) na instância, persiste e mapeia. `to_dict()` descarta unset/`None`, então serve PUT e PATCH. | | `delete(id)` | `None` | Hard delete via `repository.delete`. | `map_to_response` é aguardado com `await` quando retorna uma coroutine, então mappers async funcionam de forma transparente — sem precisar sobrescrever o método. O que você herda ao subclassear `BaseController[ServiceT, ResponseT]`: | Método | Encaminha para | Notas | | --- | --- | --- | | `get_by_id(id)` | `service.get_by_id` | Mesmo tipo de retorno do service. | | `list(filters, order_by, ascending)` | `service.list` | Igual. | | `paginate(filters, order_by, page, page_size, ascending)` | `service.paginate` | Igual. | | `count(filters)` | `service.count` | Igual. | | `update(id, data)` | `service.update` | Igual. | | `delete(id)` | `service.delete` | Igual. | Quando um caso de uso precisa de regras de domínio, sobrescreva o método herdado no service. Quando um caso de uso precisa coordenar mais de um service, sobrescreva o método herdado (ou adicione um novo) no controller. O router nunca cresce — ele só depende do controller. ```python # src/services/user_service.py from uuid import UUID from tempest_fastapi_sdk import BaseService from src.db.repositories import UserRepository from src.schemas.user import UserCreate, UserResponse, UserUpdate from src.utils.security import password_utils class UserService(BaseService[UserRepository, UserResponse]): """Business logic for the user feature.""" async def signup(self, data: UserCreate) -> UserResponse: # Business logic — hash the password, then delegate to the repo. instance = self.repository.map_to_model( { "name": data.name, "email": data.email, "password_hash": password_utils.hash(data.password), }, ) created = await self.repository.add(instance) return self.repository.map_to_response(created) # src/controllers/user_controller.py from tempest_fastapi_sdk import BaseController from src.schemas.user import UserCreate, UserResponse from src.services.user_service import UserService class UserController(BaseController[UserService, UserResponse]): """Thin orchestration over UserService.""" async def signup(self, data: UserCreate) -> UserResponse: # Pass-through today; the controller is the seam to add # cross-service coordination later (audit log, outbox event, # downstream notification, etc.) without touching the router. return await self.service.signup(data) # src/api/dependencies/controllers.py from fastapi import Depends from sqlalchemy.ext.asyncio import AsyncSession from src.api.app import db from src.controllers.user_controller import UserController from src.db.repositories import UserRepository from src.services.user_service import UserService def get_user_controller( session: AsyncSession = Depends(db.session_dependency), ) -> UserController: # UserRepository é uma subclasse de BaseRepository[UserModel] cujo # __init__ injeta `model=UserModel` via super().__init__(session, model=UserModel). # Veja o tutorial para o esqueleto completo: # https://mauriciobenjamin700.github.io/tempest-fastapi-sdk/tutorial/#6-repository return UserController(UserService(UserRepository(session))) # src/api/routers/users.py from fastapi import APIRouter, Depends, status from src.api.dependencies.controllers import get_user_controller from src.controllers.user_controller import UserController from src.schemas.user import UserCreate, UserResponse router = APIRouter(prefix="/users", tags=["users"]) @router.post( "/", response_model=UserResponse, status_code=status.HTTP_201_CREATED, ) async def create_user( data: UserCreate, controller: UserController = Depends(get_user_controller), ) -> UserResponse: return await controller.signup(data) ``` Mantenha os controllers presentes mesmo quando só fazem pass-through — o grafo de imports fica uniforme entre os serviços, então adicionar política transversal mais tarde não muda a assinatura do router. --- # Tutorial — construindo a feature *Users* Source: https://mauriciobenjamin700.github.io/tempest-fastapi-sdk/tutorial/ # Tutorial — construindo a feature *Users* Este tutorial passa pela conexão da feature **Users** usando todas as convenções do SDK. Ao final você terá: - Um `UserModel` SQLAlchemy com colunas de auditoria + soft-delete - Schemas Pydantic para create / update / response / filter - Um repository, service e controller herdando das bases do SDK - Routers com controllers injetados via `Depends` - Subclasses de exceção de domínio serializadas pelo exception handler do SDK - Um `GET /users` paginado e um `POST /users` protegido por JWT !!! tip "Para os impacientes" Se você só quer copiar o layout, gere-o: `tempest new my-service`. A CLI entrega o mesmo esqueleto que este tutorial percorre. !!! info "Já tem fluxo de auth pronto" Este tutorial mostra como **construir** signup/login com `BaseRepository` + `BaseService` + `BaseController` — é a base para qualquer feature. Para o **fluxo de auth completo** (signup + activation por email + login com JWT + reset de senha), o SDK fornece `UserAuthService` + `make_auth_router` desde v0.31.0; pule para a receita **[Auth flow »](recipes/auth-flow.md)** quando quiser usar o atalho em vez de implementar manualmente. !!! info "Acompanhando" Todo snippet é **standalone** — cole-o no caminho de arquivo mostrado no comentário. A árvore completa do projeto é o [layout obrigatório de projeto da Arquitetura →](architecture.md#layout-obrigatorio-do-projeto). Vamos construir uma feature `Users` completa do zero, de ponta a ponta. Todo arquivo abaixo é algo que você escreve no seu projeto; os primitivos do SDK são importados. ### 1. Layout do projeto O layout canônico que todo serviço Python distribuído contra este SDK deve adotar — `main.py` é um one-liner, `src/server.py` expõe tanto `run()` quanto o `app` importável (ou o re-exporta de `src/api/app.py`), `api/dependencies/` é **sempre um pacote** (auth + provedores factory), `controllers/` é obrigatório mesmo quando é só um pass-through fino, e `repositories/` vive **sob** `db/`. ```text my-service/ ├── main.py # one-liner: from src.server import run; run() └── src/ ├── __init__.py # re-exporta `run` de src.server ├── server.py # uvicorn.run(...) programático + `app` no nível do módulo ├── core/ │ ├── __init__.py │ ├── settings.py # Settings(BaseAppSettings, mixins...) │ └── exceptions.py # exceções de domínio (UserNotFoundError, ...) ├── db/ │ ├── __init__.py # re-exporta BaseModel + todo modelo │ ├── models/ │ │ ├── __init__.py │ │ └── user.py # UserModel(BaseModel) │ └── repositories/ │ ├── __init__.py │ └── user.py # UserRepository(BaseRepository[UserModel]) ├── schemas/ │ ├── __init__.py │ └── user.py # UserCreate/Update/Response/Filter ├── services/ │ ├── __init__.py │ └── user.py # UserService — lógica de negócio ├── controllers/ │ ├── __init__.py │ └── user.py # UserController — orquestração (pass-through fino OK) └── api/ ├── __init__.py ├── app.py # create_app() — middleware, CORS, exception handlers, routers ├── routers/ │ ├── __init__.py │ └── users.py └── dependencies/ # SEMPRE um pacote, nunca um módulo plano ├── __init__.py ├── auth.py # dependências X-Token / current_user / require_role ├── resources.py # singletons de infra (db/storage/mail) + get_db/get_session └── controllers.py # factories get__controller / get__service ``` Cada `__init__.py` re-exporta todo símbolo público do seu diretório para que os consumidores sempre façam `from src.schemas import UserCreateSchema` (não `from src.schemas.user import UserCreateSchema`). Isso mantém os refactors indolores — mova arquivos sem quebrar imports. Se o seu serviço ainda não tem controllers/services/repositories, **ainda assim distribua pacotes vazios com os nomes certos** — a uniformidade importa mais do que pular um diretório. Descarte `db/`, `utils/`, `queue/` ou `tasks/` só quando o serviço genuinamente não precisa de persistência/utilitários/mensageria. ### 2. Settings, server, factory do app & entrypoint Quatro arquivos mapeiam em quatro responsabilidades: | Arquivo | Responsabilidade | | --- | --- | | `src/core/settings.py` | `Settings(BaseAppSettings, ...mixins)` — uma fonte única de verdade para env vars. | | `src/api/dependencies/resources.py` | singletons de infra (`db = AsyncDatabaseManager(**settings.database_kwargs())`, e — opt-in — storage/mail) + provedores `get_db` / `get_session`. Dono único do ciclo de vida dos recursos. | | `src/api/app.py` | factory `create_app()` magra — middleware + CORS + exception handlers + includes de router + instância `app` no nível do módulo. **Importa** os recursos de `dependencies`, não os constrói. | | `src/server.py` | `run()` invocando `uvicorn.run("src.api.app:app", ...)` programaticamente, mais re-exporta `app` para que runners externos (gunicorn, CLI do uvicorn) possam importá-lo. | | `main.py` | Entry point do processo — uma única linha sob `if __name__ == "__main__":` chamando `run()`. | ```python # src/core/settings.py from tempest_fastapi_sdk import BaseAppSettings, DatabaseSettings, ServerSettings class Settings(ServerSettings, DatabaseSettings, BaseAppSettings): """All environment-driven configuration lives here. BaseAppSettings ships `env_file=.env`, `extra=ignore`, `case_sensitive=True`, `frozen=True` and `str_strip_whitespace=True`. ServerSettings adds SERVER_HOST/PORT/RELOAD, DatabaseSettings adds DATABASE_URL/ECHO/POOL_*. """ JWT_SECRET: str JWT_ALGORITHM: str = "HS256" JWT_TTL_HOURS: int = 1 SMTP_HOST: str = "localhost" SMTP_PORT: int = 587 SMTP_USERNAME: str | None = None SMTP_PASSWORD: str | None = None SMTP_FROM_ADDR: str = "noreply@example.com" UPLOAD_DIR: str = "./var/uploads" settings = Settings() ``` Os recursos de infra (o banco agora; storage / mail quando entrarem) vivem em `src/api/dependencies/resources.py`, construídos **uma vez** e acessados por provedores `get_*`. O `app.py` só os **importa** — assim ele não engorda com construção de recursos: ```python # src/api/dependencies/resources.py from tempest_fastapi_sdk import AsyncDatabaseManager from src.core.settings import settings db = AsyncDatabaseManager(**settings.database_kwargs()) get_session = db.session_dependency # Depends(get_session) -> AsyncSession def get_db() -> AsyncDatabaseManager: """Return the process-wide database manager.""" return db ``` ```python # src/api/app.py from collections.abc import AsyncIterator from contextlib import asynccontextmanager from fastapi import FastAPI from tempest_fastapi_sdk import ( RequestIDMiddleware, make_health_router, register_exception_handlers, ) from src.api.dependencies.resources import db from src.api.routers import users @asynccontextmanager async def lifespan(_: FastAPI) -> AsyncIterator[None]: """Connect on startup, dispose on shutdown.""" await db.connect() try: yield finally: await db.disconnect() def create_app() -> FastAPI: """Build and configure the FastAPI app.""" app = FastAPI(title="my-service", version="0.1.0", lifespan=lifespan) app.add_middleware(RequestIDMiddleware) register_exception_handlers(app) # Meta endpoints sit at the root prefix. app.include_router(make_health_router(db=db, version="0.1.0")) # Business endpoints sit under /api/. app.include_router(users.router, prefix="/api") return app app = create_app() ``` ```python # src/server.py from tempest_fastapi_sdk import run_server from src.api.app import app # noqa: F401 — re-exported for external runners from src.core.settings import settings def run() -> None: """Start the API server programmatically.""" run_server("src.api.app:app", settings=settings) __all__: list[str] = ["app", "run"] ``` `run_server` lê `SERVER_HOST` / `SERVER_PORT` / `SERVER_RELOAD` de `settings` (caindo para `127.0.0.1` / `8000` / `False`) e encaminha quaisquer kwargs extras (`workers=`, `log_config=`, `ssl_*=`) literalmente para `uvicorn.run`. Veja a [receita de ponto de entrada programático do servidor](recipes/http.md#ponto-de-entrada-programatico-do-servidor). ```python # src/__init__.py from src.server import run __all__: list[str] = ["run"] ``` ```python # main.py from src.server import run if __name__ == "__main__": run() ``` Defaults de bind: `127.0.0.1` para serviços internos (o default `ServerSettings.SERVER_HOST` do SDK), `0.0.0.0` só quando o serviço é consumido por uma origem separada (ex.: um dev server de frontend). Nunca inicie o uvicorn via `subprocess.run(["uvicorn", ...])` — sempre passe por `run_server` (ou `uvicorn.run("src.api.app:app", ...)` diretamente) para que reload, tratamento de sinais e shutdown gracioso se comportem corretamente. ### 3. Modelo ORM ```python # src/db/models/user.py from sqlalchemy import String from sqlalchemy.orm import Mapped, mapped_column from tempest_fastapi_sdk import BaseModel class UserModel(BaseModel): """One row per registered user. Inherits from BaseModel, so it automatically gets: - id (UUID v4, cross-DB portable via sqlalchemy.Uuid) - is_active (bool, soft-delete flag) - created_at, updated_at (timezone-aware TIMESTAMP, set by Python AND the DB so the instance attribute is populated right after flush) - __tablename__ = "user" (auto: class name without "Model" suffix, snake-cased; override by assigning __tablename__ explicitly) - __eq__/__hash__ by (type, id) so the same row across sessions compares equal - to_dict(exclude, include, remove_none) and update_from_dict(data, allowed_fields) helpers """ name: Mapped[str] = mapped_column(String(64), nullable=False) email: Mapped[str] = mapped_column(String(128), unique=True, nullable=False) password_hash: Mapped[str] = mapped_column(String(128), nullable=False) ``` Re-exporte: ```python # src/db/models/__init__.py from src.db.models.user import UserModel __all__: list[str] = ["UserModel"] ``` ```python # src/db/__init__.py from src.db.models import UserModel from tempest_fastapi_sdk import BaseModel __all__: list[str] = ["BaseModel", "UserModel"] ``` > **Dica:** Sempre importe os modelos em `src/db/__init__.py`. O SQLAlchemy precisa "ver" todo modelo antes de `BaseModel.metadata` ficar completa, para que o autogenerate do Alembic e o `create_tables()` funcionem corretamente. ### 4. Schemas O padrão de nomenclatura recomendado: um schema `*Create`, `*Update`, `*Response` e `*Filter` por recurso. ```python # src/schemas/user.py from pydantic import EmailStr, Field from tempest_fastapi_sdk import ( BasePaginationFilterSchema, BaseResponseSchema, BaseSchema, ) class UserCreateSchema(BaseSchema): """Payload for POST /users.""" name: str = Field(min_length=1, max_length=64) email: EmailStr password: str = Field(min_length=8, max_length=128) class UserUpdateSchema(BaseSchema): """Partial payload for PATCH /users/{id}. Every field optional.""" name: str | None = Field(default=None, min_length=1, max_length=64) email: EmailStr | None = None class UserResponseSchema(BaseResponseSchema): """Outbound representation. Inherits id/is_active/created_at/updated_at from BaseResponseSchema (timestamps already normalized to UTC by the field validator). """ name: str email: EmailStr class UserFilterSchema(BasePaginationFilterSchema): """Query-string filters for GET /users. Inherits page/page_size/order_by/ascending/is_active from BasePaginationFilterSchema. Add domain-level filters below. """ name: str | None = None # ILIKE %name% search email: EmailStr | None = None # exact-match filter ``` ```python # src/schemas/__init__.py from src.schemas.user import ( UserCreateSchema, UserFilterSchema, UserResponseSchema, UserUpdateSchema, ) __all__: list[str] = [ "UserCreateSchema", "UserFilterSchema", "UserResponseSchema", "UserUpdateSchema", ] ``` ### 5. Exceções de domínio O SDK entrega `NotFoundException`, `ConflictException`, etc. genéricos. Subclasse-os por domínio para que o matching `isinstance` / `except DomainError` fique explícito. `message` / `code` / `status_code` a nível de classe são defaults aos quais o construtor recorre — você também pode sobrescrever qualquer um deles no ponto do raise sem subclassear: ```python # src/core/exceptions.py from tempest_fastapi_sdk import ConflictException, NotFoundException class UserNotFoundError(NotFoundException): """Subclass kept only for ``except UserNotFoundError`` matching.""" message: str = "Usuário não encontrado" code: str = "USER_NOT_FOUND" class UserEmailAlreadyTakenError(ConflictException): message: str = "Já existe um usuário com esse e-mail" code: str = "USER_EMAIL_TAKEN" ``` Para códigos pontuais você não precisa de uma subclasse — passe-os ao construtor: ```python raise NotFoundException( "Pedido não encontrado", code="ORDER_NOT_FOUND", details={"order_id": str(order_id)}, ) ``` O exception handler do SDK ([`register_exception_handlers`](#2-settings-server-factory-do-app-entrypoint)) os serializa para: ```json { "detail": "Usuário não encontrado", "code": "USER_NOT_FOUND", "details": {} } ``` O frontend ramifica no `code`, não na mensagem (que pode estar traduzida). ### 6. Repository Para CRUD simples você não precisa de uma subclasse nenhuma — instancie `BaseRepository` diretamente e vincule o modelo pelo construtor: ```python # anywhere a session is in scope from tempest_fastapi_sdk import BaseRepository from src.db.models import UserModel repository = BaseRepository(session, model=UserModel) await repository.add( UserModel( email="ana@example.com", name="Ana", password_hash="", ) ) ``` Subclasse quando quiser embutir mensagens específicas de domínio, trocar a exceção de not-found, sobrescrever os métodos de mapeamento ou adicionar queries custom. A assinatura do construtor (não os atributos de classe) é o contrato: ```python # src/db/repositories/user.py from sqlalchemy.ext.asyncio import AsyncSession from tempest_fastapi_sdk import BaseRepository from src.core.exceptions import UserNotFoundError from src.db.models import UserModel from src.schemas import UserResponseSchema class UserRepository(BaseRepository[UserModel]): """Data-access layer for users.""" def __init__(self, session: AsyncSession) -> None: super().__init__( session, model=UserModel, not_found_exception=UserNotFoundError, not_found_message="Usuário não encontrado", create_conflict_message="Já existe um usuário com esse e-mail", update_conflict_message="Conflito ao atualizar usuário", ) def map_to_schema(self, instance: UserModel) -> UserResponseSchema: return UserResponseSchema.model_validate(instance) def map_to_response(self, instance: UserModel) -> UserResponseSchema: return self.map_to_schema(instance) ``` O repo base te dá 20+ métodos de graça — veja a [tabela de referência](reference.md#tempest_fastapi_sdk.db.repository.BaseRepository) abaixo. Adicione queries custom em cima do mesmo `UserRepository`: ```python # src/db/repositories/user.py (continued) class UserRepository(BaseRepository[UserModel]): # ... __init__ and mappers above ... # ──────── custom queries on top of the inherited bulk + read methods ──────── async def get_by_email(self, email: str) -> UserModel: """Look up a user by email. Raises ``UserNotFoundError`` on miss.""" return await self.get({"email": email}) ``` O bloco destacado (sob o comentário divisor) é o que você tipicamente adiciona por projeto — tudo acima dele é o boilerplate de que a classe base já cuida. ### 7. Service O service é onde as regras de negócio vivem. Ele chama um ou mais repositories e nunca toca em tipos de HTTP ou SQLAlchemy diretamente. Herde de `BaseService[RepositoryT, ResponseT]`. Fazer isso te dá `get_by_id`, `get_or_none`, `list`, `paginate`, `count`, `exists` e `delete` de graça — cada um já conectado a `repository.map_to_response` (sync ou async). Sobrescreva só os métodos que precisam de lógica de domínio; adicione novos para casos de uso que a base não cobre (signup, reset de senha, etc.): ```python # src/services/user.py from uuid import UUID from sqlalchemy.ext.asyncio import AsyncSession from tempest_fastapi_sdk import BaseService, PasswordUtils from src.core.exceptions import UserEmailAlreadyTakenError from src.db.repositories import UserRepository from src.schemas import UserCreateSchema, UserResponseSchema, UserUpdateSchema class UserService(BaseService[UserRepository, UserResponseSchema]): """Business logic for the user domain. Inherits the canonical read-path methods (``get_by_id`` / ``list`` / ``paginate`` / ``count`` / ``exists`` / ``delete``) from :class:`BaseService` and adds the write-path methods that need domain rules (uniqueness check, password hashing, mass-assignment guard). """ def __init__( self, repository: UserRepository, *, passwords: PasswordUtils, ) -> None: """Initialize the service. Args: repository (UserRepository): User-domain repository. passwords (PasswordUtils): Shared bcrypt helper. """ super().__init__(repository) self.passwords: PasswordUtils = passwords # ──────── overrides: domain rules live here ──────── async def signup(self, data: UserCreateSchema) -> UserResponseSchema: """Create a new user, enforcing email uniqueness + hashing the password.""" if await self.repository.exists({"email": data.email}): raise UserEmailAlreadyTakenError() instance = self.repository.map_to_model( { **data.to_dict(exclude=["password"]), "password_hash": self.passwords.hash(data.password), }, ) instance = await self.repository.add(instance) return self.repository.map_to_response(instance) async def update( self, user_id: UUID, data: UserUpdateSchema, ) -> UserResponseSchema: """Apply a partial update, whitelisting the columns that may change.""" instance = await self.repository.get_by_id(user_id) instance.update_from_dict( data.to_dict(), allowed_fields={"name", "email"}, # prevents mass-assignment ) instance = await self.repository.update(instance) return self.repository.map_to_response(instance) async def soft_delete(self, user_id: UUID) -> None: """Flip ``is_active=False`` instead of hard-deleting.""" await self.repository.soft_delete(user_id) ``` Os métodos que você **não** escreve — `get_by_id(user_id)`, `get_or_none(filters)`, `list(filters=None, order_by=None, ascending=True)`, `paginate(filters=None, order_by=None, page=1, page_size=20, ascending=True)`, `count(filters)`, `exists(filters)`, `delete(user_id)` — já existem na base, já aguardam um `map_to_response` async, e já retornam o `UserResponseSchema` tipado declarado no parâmetro genérico. Quando o caso de uso precisa de um pipeline custom (joins, projeções, fan-out transacional), sobrescreva o método herdado. A assinatura continua a mesma, então o controller não percebe: ```python class UserService(BaseService[UserRepository, UserResponseSchema]): # ... __init__ and overrides above ... async def list( # override the inherited pass-through self, filters: dict[str, Any] | None = None, order_by: Any | None = None, ascending: bool = True, ) -> list[UserResponseSchema]: """List active users only — domain rule baked into the base method.""" merged: dict[str, Any] = {**(filters or {}), "is_active": True} return await super().list(filters=merged, order_by=order_by, ascending=ascending) ``` ### 8. Controller Mesmo quando não há orquestração a fazer, `controllers/` existe como um **pass-through fino** para que o grafo de imports fique uniforme entre os serviços. No dia em que um caso de uso precisar coordenar dois services (ou fazer fan-out para uma fila), o controller já está lá. Herde de `BaseController[ServiceT, ResponseT]`. A base encaminha `get_by_id`, `list`, `paginate`, `count` e `delete` para o service por você — você só declara métodos que adicionam coordenação entre services ou que não existem no service (casos de uso custom como `signup`): ```python # src/controllers/user.py from uuid import UUID from tempest_fastapi_sdk import BaseController from src.schemas import UserCreateSchema, UserResponseSchema, UserUpdateSchema from src.services.user import UserService class UserController(BaseController[UserService, UserResponseSchema]): """Orchestrate user use cases. Today every method is a thin pass-through to ``UserService``. As soon as a use case needs to coordinate more than one service — e.g. signup also sends a welcome email and enqueues a CRM sync — the orchestration lives here, not in the router and not in the service. """ # ──────── new methods for use cases the base doesn't cover ──────── async def signup(self, data: UserCreateSchema) -> UserResponseSchema: """Create a user and (eventually) trigger downstream side effects.""" return await self.service.signup(data) async def update( self, user_id: UUID, data: UserUpdateSchema, ) -> UserResponseSchema: """Domain-specific partial update — distinct from the base ``delete``.""" return await self.service.update(user_id, data) async def soft_delete(self, user_id: UUID) -> None: """Soft-delete instead of the inherited hard ``delete``.""" await self.service.soft_delete(user_id) ``` `get_by_id` / `list` / `paginate` / `count` não são redeclarados — `BaseController` já os expõe. Quando o dia da coordenação entre services chegar, sobrescreva o pass-through no lugar: ```python class UserController(BaseController[UserService, UserResponseSchema]): # ... methods above ... async def signup(self, data: UserCreateSchema) -> UserResponseSchema: """Create the user, send a welcome email, enqueue the CRM sync.""" user = await self.service.signup(data) await self.emails.send_welcome(user) # second dependency await self.tasks.enqueue("crm.user.created", {"id": str(user.id)}) return user ``` A assinatura do router nunca muda — só o corpo do controller cresce. ### 9. Provedores de dependência `api/dependencies/` é **sempre um pacote**. `auth.py` hospeda as dependências de segredo compartilhado / usuário atual; `resources.py` hospeda os singletons de infra (`db`, e — opt-in — storage/mail) com os provedores `get_db` / `get_session`; `controllers.py` (ou `services.py` quando ainda não há camada de controller) hospeda os provedores factory dos quais os routers dependem. Nunca construa controllers, services ou recursos de infra inline dentro do arquivo do router (nem do `app.py`). ```python # src/api/dependencies/controllers.py from fastapi import Depends from sqlalchemy.ext.asyncio import AsyncSession from tempest_fastapi_sdk import PasswordUtils from src.api.dependencies.resources import get_session from src.controllers.user import UserController from src.db.repositories import UserRepository from src.services.user import UserService # Stateless utilities — instantiate once per process. _passwords: PasswordUtils = PasswordUtils() def get_user_controller( session: AsyncSession = Depends(get_session), ) -> UserController: """Wire repository → service → controller for a single request.""" repository = UserRepository(session) service = UserService(repository=repository, passwords=_passwords) return UserController(service=service) ``` ```python # src/api/dependencies/__init__.py from src.api.dependencies.controllers import get_user_controller __all__: list[str] = ["get_user_controller"] ``` ### 10. Router Routers recebem controllers via `Depends` do FastAPI — sem construção inline, sem lógica de negócio, sem chamadas de DB. Endpoints de negócio ficam sob `/api/` (o prefixo é adicionado no ponto do include em `src/api/app.py`); endpoints meta (`/health`, `/tool-spec`) ficam no prefixo raiz. ```python # src/api/routers/users.py from uuid import UUID from fastapi import APIRouter, Depends, status from tempest_fastapi_sdk import BasePaginationSchema from src.api.dependencies import get_user_controller from src.controllers.user import UserController from src.schemas import ( UserCreateSchema, UserFilterSchema, UserResponseSchema, UserUpdateSchema, ) router = APIRouter(prefix="/users", tags=["users"]) @router.post( "", response_model=UserResponseSchema, status_code=status.HTTP_201_CREATED, ) async def create_user( data: UserCreateSchema, controller: UserController = Depends(get_user_controller), ) -> UserResponseSchema: return await controller.signup(data) @router.get("/{user_id}", response_model=UserResponseSchema) async def get_user( user_id: UUID, controller: UserController = Depends(get_user_controller), ) -> UserResponseSchema: return await controller.get_by_id(user_id) @router.patch("/{user_id}", response_model=UserResponseSchema) async def update_user( user_id: UUID, data: UserUpdateSchema, controller: UserController = Depends(get_user_controller), ) -> UserResponseSchema: return await controller.update(user_id, data) @router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT) async def delete_user( user_id: UUID, controller: UserController = Depends(get_user_controller), ) -> None: await controller.soft_delete(user_id) @router.get("", response_model=BasePaginationSchema[UserResponseSchema]) async def list_users( filters: UserFilterSchema = Depends(), controller: UserController = Depends(get_user_controller), ) -> BasePaginationSchema[UserResponseSchema]: result = await controller.paginate( filters=filters.get_conditions(), order_by=filters.order_by, page=filters.page, page_size=filters.page_size, ascending=filters.ascending, ) return BasePaginationSchema[UserResponseSchema](**result) ``` ### 11. Paginação O contrato de paginação é imposto de ponta a ponta pelos primitivos do SDK: - `UserFilterSchema(BasePaginationFilterSchema)` parseia `?page=&page_size=&order_by=&ascending=&is_active=&name=` da query string e expõe `.get_conditions()` retornando só os filtros de domínio (sem as chaves de paginação). - `UserRepository.paginate(...)` roda a query com o dict de filtro + ordenação + offset/limit + contagem, retornando o dict `{items, total, page, page_size, pages}` que você embrulha em `BasePaginationSchema[UserResponseSchema]`. - `BasePaginationSchema[UserResponseSchema]` embrulha o resultado para que o OpenAPI documente o formato da resposta corretamente. ```http GET /api/users?page=2&page_size=20&order_by=name&ascending=true&is_active=true&name=ana ``` Retorna: ```json { "items": [ {"id": "...", "name": "Ana ...", "email": "...", ...}, ... ], "total": 142, "page": 2, "page_size": 20, "pages": 8 } ``` --- --- # Receitas Source: https://mauriciobenjamin700.github.io/tempest-fastapi-sdk/recipes/ # Receitas Passo a passo curtos no estilo "quero conectar X". Cada página começa com **qual problema resolve**, **quando recorrer a ela** e um exemplo de código completo que você pode copiar literalmente. !!! tip "Quando ler o quê" - Só precisa consultar uma assinatura? Pule para a **[Referência »](../reference.md)**. - Construindo um serviço novo do zero? Siga primeiro o **[Tutorial »](../tutorial.md)** linear. - Conectando uma peça específica do SDK? Você está no lugar certo — escolha a receita abaixo. | Tema | Cobre | | --- | --- | | **[Banco de dados »](database.md)** | `BaseModel`, `AsyncDatabaseManager`, `BaseRepository` (CRUD + filtros + bulk), paginação offset/cursor, mixins, `AlembicHelper`, `SlowQueryLogger` | | **[Multi-tenant »](multi-tenant.md)** | `TenantScopedRepository` — isolamento por `tenant_id` em toda query | | **[Audit trail »](audit-trail.md)** | `BaseAuditLogModel`, `add_audited` / `update_audited` / `delete_audited`, `snapshot_model` / `diff_snapshots` | | **[Camada HTTP »](http.md)** | `apply_cors`, `RequestIDMiddleware`, `RateLimitMiddleware`, `make_health_router`, dependências de JWT / role / permissão, verificador de assinatura de webhook, headers Link de paginação, router de tool-spec | | **[HTTP client (saída) »](http-client.md)** | `HTTPClient` — httpx tipado com retry/backoff, circuit-breaker, X-Request-ID; `RetryPolicy`, `CircuitOpenError` | | **[Cache »](cache.md)** | `AsyncRedisManager`, decorator `@cached`, `CacheInvalidator` (tag/namespace) | | **[Feature flags »](feature-flags.md)** | `FeatureFlags`, backends env/Redis/composto, `make_flag_dependency` | | **[Tempo real »](realtime.md)** | Server-Sent Events (`EventStream`, `sse_response`) | | **[Fila e Tarefas »](queue-tasks.md)** | FastStream (`AsyncBrokerManager`), TaskIQ (`AsyncTaskBrokerManager`), `AsyncTaskScheduler`, outbox transacional | | **[Outbox transacional »](outbox.md)** | `BaseOutboxModel`, `OutboxRelay`, `save_with_outbox` — eventos confiáveis | | **[Email transacional »](email.md)** | `EmailUtils` — SMTP, corpo texto/HTML, anexos, templates Jinja2 | | **[Web Push »](webpush.md)** | `WebPushDispatcher`, schemas VAPID, broadcast com poda | | **[Visão computacional (ONNX) »](vision.md)** | `Detector` / `Classifier` / `Segmenter` + schemas de predição | | **[Logging »](logging.md)** | `LogUtils`, logging JSON estruturado, propagação de request-ID | | **[Métricas »](metrics.md)** | `MetricsUtils` — snapshots de CPU / RAM / disco / GPU | | **[Observabilidade (tracing) »](observability.md)** | `setup_tracing` (OpenTelemetry), `SlowQueryLogger` | | **[Painel admin »](admin.md)** | `AdminSite`, `AdminModel`, `make_admin_router`, `BaseUserModel` | | **[Downloads »](downloads.md)** | `DownloadUtils` — `file_response`, `stream`, `build_content_disposition`, anti path-traversal | | **[Utilitários »](utilities.md)** | `utcnow`/`to_utc`, `modify_dict`, `get_client_ip`, tokens opacos (`generate_opaque_token`) | | **[Testes »](testing.md)** | `test_session`, `test_database`, SQLite em memória, fixtures pytest | | **[Deploy seguro »](deploy-safety.md)** | `AlembicHelper.safe_upgrade` (barra DROPs), `GracefulShutdownMiddleware` | | **[CLI »](cli.md)** | `tempest new` / `db` (+ `seed`) / `user` / `secrets rotate` / `lint` / `fix` / `format` / `type` / `test` / `check` | | **[Segurança »](security.md)** | `AttemptThrottle`, helpers de token opaco, `HardenedStaticFiles`, headers de segurança | | **[Helpers brasileiros »](br-helpers.md)** | validação + normalização de CPF / CNPJ / CEP / telefone | ## Anatomia de uma receita Toda receita segue o mesmo formato de quatro seções para você bater o olho: 1. **O que resolve** — um parágrafo em linguagem simples. 2. **Quando usar** — lista de situações + quando *não* usar. 3. **O código** — completo, executável, com anotações `# 1. setup` / `# 2. wire` / `# 3. test`. 4. **Pegadinhas** — ressalvas de produção, defaults de segurança, notas de escala. Se você encontrar uma receita que não segue esse formato, [abra uma issue](https://github.com/mauriciobenjamin700/tempest-fastapi-sdk/issues/new) — tratamos regressões de doc como regressões de código. --- # Banco de dados Source: https://mauriciobenjamin700.github.io/tempest-fastapi-sdk/recipes/database/ # Banco de dados Esta é a camada que toda service Tempest usa para falar com PostgreSQL (produção) ou SQLite (desenvolvimento/testes) sobre **SQLAlchemy 2.0 async**. Ela existe para que você nunca reescreva a mesma engine, a mesma sessão por request, o mesmo CRUD e a mesma paginação em cada projeto. São quatro peças, e você vai conhecê-las uma de cada vez: | Peça | Símbolo | Para quê | | --- | --- | --- | | Modelo base | `BaseModel` | As quatro colunas canônicas (`id` / `is_active` / `created_at` / `updated_at`) + helpers de serialização. | | Conexão | `AsyncDatabaseManager` | Engine, pool, sessão por request, `health_check`. | | Repository | `BaseRepository[Model]` | CRUD async, filtros por convenção, operações em lote, paginação. | | Migrações | `AlembicHelper` | Bootstrap do Alembic, autogenerate, gate de drift no CI. | Mais três opcionais que entram quando o domínio pede: os **mixins** (`SoftDeleteMixin`, `AuditMixin`, `MFAMixin`), a **paginação por cursor** e o **`SlowQueryLogger`**. !!! tip "Como ler esta página" Ela é progressiva. Comece pelo modelo, conecte o banco, suba um repository, aprenda os filtros, então paginação, migrações e observabilidade. Cada bloco de código é um arquivo completo — copie, cole, rode. Se você só quer a referência da API, pule para [Referência »](../reference.md). --- ## 1. O modelo base Todo modelo da sua service herda de `BaseModel`. Você ganha quatro colunas sem escrever nenhuma: ```python # src/db/models/user.py from sqlalchemy.orm import Mapped, mapped_column from tempest_fastapi_sdk import BaseModel class UserModel(BaseModel): """Users table.""" name: Mapped[str] = mapped_column() email: Mapped[str] = mapped_column(unique=True) password_hash: Mapped[str] = mapped_column() ``` Isso já cria a tabela `user` com **sete** colunas: as três suas (`name`, `email`, `password_hash`) mais as quatro herdadas: | Coluna | Tipo | Padrão | Papel | | --- | --- | --- | --- | | `id` | `UUID` (v4) | `uuid4()` | Chave primária, portável entre Postgres/SQLite/MySQL/MSSQL. | | `is_active` | `bool` | `True` | Flag de soft-delete rápido. | | `created_at` | `datetime` (tz-aware) | `utcnow()` no flush | Carimbo de criação. | | `updated_at` | `datetime` (tz-aware) | `utcnow()` no `onupdate` | Carimbo da última escrita. | !!! info "Por que o nome da tabela é `user` e não `UserModel`?" `BaseModel` deriva `__tablename__` da classe automaticamente: tira o sufixo `Model` e converte para `snake_case`. `UserModel` → `user`, `OrderItemModel` → `order_item`. Você sempre pode fixar `__tablename__ = "users"` explicitamente — a declaração explícita vence o automático. ### Convenção de nomes de constraints `BaseModel.metadata` já vem configurado com `NAMING_CONVENTION`. Isso faz toda PK/FK/índice/unique/check receber um nome **determinístico** — `ix_user_email`, `uq_user_email`, `fk_order_user_id_user` — igual em toda máquina e todo engine. !!! check "O ganho real está nas migrações" Sem nomes determinísticos, o `alembic revision --autogenerate` inventa identificadores aleatórios e cada desenvolvedor gera um diff diferente para o mesmo schema. Com a convenção, o autogenerate só emite **diffs de schema reais** — sem churn de nomes. ### Helpers que vêm de graça Toda instância de `BaseModel` ganha: ```python # Serializar para dict (útil em logs/testes) data: dict[str, Any] = user.to_dict(exclude=["password_hash"]) # Atribuir vários campos de uma vez, com whitelist contra mass-assignment user.update_from_dict( payload.model_dump(exclude_unset=True), allowed_fields={"name", "email"}, # id/role nunca são escritos ) ``` `__eq__` e `__hash__` comparam por `(tipo, id)`, então a mesma linha carregada em sessões diferentes é igual — prático em testes e `set`s. Linhas ainda não persistidas (`id is None`) caem para identidade Python. !!! warning "Use sempre `allowed_fields` em payloads externos" `update_from_dict` sem `allowed_fields` aceita qualquer coluna mapeada. Para corpos de PATCH vindos do cliente, passe a whitelist — é a defesa contra mass-assignment em colunas sensíveis (`id`, `role`, `is_active`). **Recap:** herde `BaseModel`, declare só as colunas do seu domínio, e o SDK entrega id/timestamps/soft-delete, nomes de constraint determinísticos e helpers de serialização. --- ## 2. Conectando ao banco `AsyncDatabaseManager` é instanciado **uma vez** por aplicação e cuida da engine, do pool e da fábrica de sessões. Coloque-o nas dependências de infraestrutura, não dentro do `app.py`: ```python # src/api/dependencies/resources.py from tempest_fastapi_sdk import AsyncDatabaseManager from src.core.settings import settings db = AsyncDatabaseManager( settings.DATABASE_URL, echo=settings.DEBUG, # ecoa SQL no stdout em dev pool_size=10, # ignorado para SQLite max_overflow=20, pool_recycle=3600, ) ``` Ele detecta o backend pela URL (`make_url`), então SQLite ganha `check_same_thread=False` automaticamente e os parâmetros de pool são ignorados — não há truque de substring. ### Uma sessão por request Use `session_dependency` como dependência do FastAPI. Ela entrega uma sessão por request e **não** faz commit no sucesso — o commit é responsabilidade da camada de repository/service: ```python # src/api/dependencies/resources.py (continuação) from typing import Annotated from fastapi import Depends from sqlalchemy.ext.asyncio import AsyncSession SessionDep = Annotated[AsyncSession, Depends(db.session_dependency)] ``` ```python # src/api/routers/user.py from uuid import UUID from fastapi import APIRouter from src.api.dependencies.resources import SessionDep from src.db.repositories import UserRepository from src.schemas import UserResponse router = APIRouter(prefix="/api/users", tags=["users"]) @router.get("/{user_id}", response_model=UserResponse) async def get_user(user_id: UUID, session: SessionDep) -> UserResponse: """Fetch a single user by id.""" repository = UserRepository(session) return repository.map_to_response(await repository.get_by_id(user_id)) ``` ### Ciclo de vida no lifespan Abra e feche a engine junto com a aplicação: ```python # src/api/app.py from collections.abc import AsyncIterator from contextlib import asynccontextmanager from fastapi import FastAPI from src.api.dependencies.resources import db @asynccontextmanager async def lifespan(app: FastAPI) -> AsyncIterator[None]: """Open the database on startup, dispose it on shutdown.""" await db.connect() yield await db.disconnect() ``` ### Health check `health_check()` roda um `SELECT 1` e engole qualquer exceção, devolvendo só `True`/`False` — perfeito para `/health`: ```python @router.get("/health") async def health() -> dict[str, object]: """Liveness + database probe.""" return { "status": "ok", "database": await db.health_check(), "url": db.db_url_safe, # credenciais mascaradas } ``` !!! info "Outras formas de obter sessão" - `db.get_session_context()` — context manager que faz **commit** no sucesso e rollback no erro. Use em scripts e tasks de background. - `db.get_session()` — sessão crua; você fecha. - `db.create_tables()` / `db.drop_tables()` — só para testes e dev local; em produção o schema é do Alembic. !!! danger "Nunca logue `db_url`, sempre `db_url_safe`" A URL crua carrega usuário e senha. `db_url_safe` renderiza `postgresql+asyncpg://***@host/db`. A URL crua fica num atributo privado justamente para não vazar em `repr()` ou log acidental. **Recap:** um `AsyncDatabaseManager` por app, em `resources.py`; `session_dependency` injeta a sessão por request; `connect`/`disconnect` no lifespan; `health_check` + `db_url_safe` no `/health`. --- ## 3. O repository `BaseRepository[Model]` é o coração da camada. Ele encapsula o CRUD async, os filtros, as operações em lote e a paginação. Há dois jeitos de usá-lo. ### Modo direto — CRUD puro Quando você não tem query custom, instancie direto: ```python from tempest_fastapi_sdk import BaseRepository from src.db.models import UserModel repository = BaseRepository(session, model=UserModel) user = await repository.get_by_id(user_id) ``` ### Modo subclasse — quando há queries próprias Subclassifique para adicionar consultas do domínio e os três mappers que traduzem ORM ↔ DTO. **O construtor é o contrato** — você repassa `model` para `super().__init__`, não há atributos de classe mágicos: ```python # src/db/repositories/user.py from typing import Any from sqlalchemy.ext.asyncio import AsyncSession from tempest_fastapi_sdk import BaseRepository from src.db.models import UserModel from src.schemas import UserResponse class UserRepository(BaseRepository[UserModel]): """Data access for the user domain.""" def __init__(self, session: AsyncSession) -> None: """Bind the repository to a session and the user model. Args: session (AsyncSession): The async database session. """ super().__init__( session, model=UserModel, not_found_message="Usuário não encontrado", create_conflict_message="E-mail já cadastrado", ) def map_to_response(self, instance: UserModel) -> UserResponse: """Map an ORM row to its API response schema. Args: instance (UserModel): The persisted user row. Returns: UserResponse: The serializable response DTO. """ return UserResponse.model_validate(instance) def map_to_model(self, data: dict[str, Any]) -> UserModel: """Build an ORM instance from a plain payload. Args: data (dict[str, Any]): Column-value pairs. Returns: UserModel: The unpersisted instance. """ return UserModel(**data) ``` !!! tip "Mensagens de erro por repository" Os kwargs `not_found_message`, `create_conflict_message`, `update_conflict_message`, `bulk_create_conflict_message` e `bulk_update_conflict_message` customizam o texto das exceções. Sem eles, o SDK gera mensagens a partir de `Model.__name__` (`"User not found"`, `"Conflict creating User"`). Passe `not_found_exception=` para subir uma exceção de domínio mais rica que o `NotFoundException` padrão. ### O CRUD que você ganha Lembrando da convenção de coleções do projeto: lookups de **registro único** levantam 404; lookups de **coleção** devolvem `[]`. ```python # Leitura — registro único (404 quando não acha) user = await repository.get_by_id(user_id) user = await repository.get({"email": "a@b.com"}) # Leitura — pode não existir (None, sem 404) user = await repository.get_or_none({"email": "a@b.com"}) first = await repository.first({"is_active": True}) # Leitura — coleção (sempre [], nunca 404) users = await repository.list({"is_active": True}) # Existência / contagem exists = await repository.exists({"email": "a@b.com"}) total = await repository.count({"is_active": True}) # "Esse valor já é de OUTRO registro?" — validação de unicidade no update taken = await repository.exists_excluding( {"email": "a@b.com"}, exclude_id=user.id ) # id-ou-instância → instância (sem if isinstance espalhado nas services) user = await repository.resolve(user_or_id) # Escrita created = await repository.add( UserModel(name="Ana", email="ana@x.com", password_hash="...") ) updated = await repository.update(user) # commita mutações numa instância anexada # Remoção await repository.delete(user_id) # hard delete (404 se não existe) await repository.delete_many({"is_active": False}) # retorna contagem await repository.delete_batch([id1, id2, id3]) # por PK, retorna contagem # Soft-delete via flag is_active (não precisa do SoftDeleteMixin) await repository.soft_delete(user_id) # is_active = False await repository.restore(user_id) # is_active = True ``` !!! note "`update` espera uma instância anexada" O fluxo típico é: `get_by_id` → mutar com `update_from_dict` → `repository.update(instance)`. Não construa um modelo solto e mande para o `update` — ele persiste mutações de algo já carregado na sessão. !!! tip "`resolve` e `exists_excluding` — dois ajudantes que você vai usar sempre" **`resolve(id_ou_instância)`** resolve o velho dilema: seu método recebe `UUID | UserModel` e você não quer escrever `if isinstance(x, UUID): ... else: ...` em toda service. O `resolve` faz isso por você — passa um `UUID`, ele busca (404 se não existir); passa uma instância, ele devolve a mesma. Uma linha: ```python user_model = await self.repository.resolve(user) # user é UUID OU UserModel ``` **`exists_excluding(filtros, exclude_id=...)`** responde a pergunta "esse e-mail/telefone/username já é de **outra** pessoa?" — exatamente o que você precisa ao **atualizar** um campo único. O `exists` normal diria `True` até para o próprio registro; o `exists_excluding` ignora o id que você passar: ```python if await self.repository.exists_excluding( {"phone": new_phone}, exclude_id=user.id ): raise UserWithPhoneExistsException(phone=new_phone) ``` Passe `exclude_id=None` no cadastro (quando ainda não há registro a excluir) — aí ele se comporta igual ao `exists`. **Recap:** instancie direto para CRUD puro, subclassifique para queries + mappers. 404 só em lookup único; coleção devolve `[]`. `soft_delete` mexe na flag `is_active`; o `SoftDeleteMixin` (seção 6) adiciona um carimbo `deleted_at` quando você precisa de auditoria temporal. --- ## 4. Filtros por convenção Todos os métodos que recebem `filters: dict[str, Any]` passam pelo mesmo motor. Um valor `None` **sempre pula** a condição (filtro ausente ≠ `WHERE col IS NULL`). As convenções: | Chave / valor | SQL gerado | Exemplo | | --- | --- | --- | | `name` (str) | `ILIKE %value%` case-insensitive | `{"name": "ana"}` | | `bool` | `col.is_(value)` | `{"is_active": True}` | | `list` | `col.in_(values)` | `{"id": [id1, id2]}` | | `date` | `func.date(col) == value` (dia inteiro) | `{"created_at": hoje}` | | `start_in` / `end_in` (date) | range no `date`/`created_at` | `{"start_in": d1, "end_in": d2}` | | `__` | comparação `gt`/`gte`/`lt`/`lte`/`ne` | `{"updated_at__gt": marca}` | | qualquer outra coluna | `col == value` | `{"email": "a@b.com"}` | ```python # "ativos atualizados depois da marca d'água" — precisão de timestamp changed = await repository.list({ "is_active": True, "updated_at__gt": watermark, }) # "criados entre duas datas" — dia inteiro report = await repository.list({"start_in": inicio, "end_in": fim}) # busca textual + pertinência a um conjunto hits = await repository.list({"name": "silva", "id": selected_ids}) ``` !!! info "`start_in`/`end_in` vs `__gt`/`__lt`" `start_in`/`end_in` casam por **dia inteiro** (`func.date`) contra a coluna `date` do modelo (ou `created_at` se não houver). Os sufixos `__op` são **precisos no timestamp** — é o que queries de delta-sync usam. Escolha por precisão. !!! tip "Filtros vêm de um schema, não de strings soltas" Na prática você não monta esse dict à mão. `BasePaginationFilterSchema` (e suas subclasses) expõem `.get_conditions()`, que devolve o dict já limpo de `None`. O router recebe o filtro via `Depends()`. **Recap:** um dict, convenções previsíveis, `None` pula. Strings em `name` viram busca ILIKE; sufixos `__op` dão comparações precisas; `None` nunca vira `IS NULL`. --- ## 5. Operações em lote Para volume, o ORM linha-a-linha é caro. O repository oferece duas famílias: as que **mantêm** a unit-of-work (instâncias atualizadas de volta) e as que a **contornam** (uma única instrução, sem refresh). ```python # Mantém a UoW — instâncias anexadas e atualizadas created = await repository.add_all([m1, m2, m3]) # vários INSERTs, 1 tx updated = await repository.update_many([u1, u2]) # vários UPDATEs, 1 tx # Contorna a UoW — uma instrução, escala melhor (>= 50 linhas) n = await repository.bulk_create_values([ {"name": "A", "email": "a@x.com", "password_hash": "..."}, {"name": "B", "email": "b@x.com", "password_hash": "..."}, ]) # INSERT ... VALUES (...), (...) — devolve nº de linhas n = await repository.bulk_update( filters={"is_active": False}, values={"is_active": True}, ) # UPDATE ... WHERE — devolve nº de linhas afetadas n = await repository.bulk_upsert( rows=[{"sku": "ABC", "price": 10}, {"sku": "DEF", "price": 20}], conflict_columns=["sku"], # precisa de índice UNIQUE update_columns=["price"], # None = atualiza tudo menos PK + conflito ) # INSERT ... ON CONFLICT DO UPDATE — Postgres e SQLite ``` !!! warning "`bulk_update` recusa filtro vazio" Passar `filters={}` levanta `ValueError` — é a trava contra um UPDATE acidental na tabela inteira. Para realmente atualizar todas as linhas, passe uma condição explícita sempre verdadeira. !!! danger "`bulk_*` não atualiza a sessão" `bulk_create_values`, `bulk_update` e `bulk_upsert` emitem uma instrução crua e **não** refrescam nem anexam instâncias à sessão. Use quando você não precisa dos objetos ORM de volta. Se precisar das instâncias, use `add_all` / `update_many`. !!! note "`bulk_upsert` é específico de dialeto" Postgres e SQLite têm upsert nativo. Outros dialetos levantam `NotImplementedError` — caia para um loop `SELECT FOR UPDATE` + `UPDATE`. **Recap:** `add_all`/`update_many` quando você quer as instâncias de volta; `bulk_*` quando quer throughput. Filtro vazio em `bulk_update` é erro proposital. --- ## 6. Soft-delete e auditoria (mixins) Os mixins são **opt-in**: você os mistura ao lado de `BaseModel` só quando o domínio pede. `SoftDeleteMixin` adiciona `deleted_at` (+ `mark_deleted()` / `mark_restored()` / `is_deleted`). `AuditMixin` adiciona `created_by` / `updated_by` (+ `stamp_created_by` / `stamp_updated_by`). ```python # src/db/models/user.py from sqlalchemy.orm import Mapped, mapped_column from tempest_fastapi_sdk import AuditMixin, BaseModel, SoftDeleteMixin class UserModel(BaseModel, SoftDeleteMixin, AuditMixin): """Users — soft-deletable and audited.""" name: Mapped[str] = mapped_column() email: Mapped[str] = mapped_column(unique=True) password_hash: Mapped[str] = mapped_column() ``` A filtragem é responsabilidade de quem chama — o mixin **não** instala um filtro global. Esconda linhas soft-deleted passando `deleted_at=None`, ou filtrando na subclasse. Carimbar auditoria pertence ao service, onde o usuário atual está em escopo: ```python # src/services/user.py from uuid import UUID from sqlalchemy import select from tempest_fastapi_sdk import BaseService from src.db.models import UserModel from src.db.repositories import UserRepository from src.schemas import UserResponse, UserUpdateSchema class UserService(BaseService[UserRepository, UserResponse]): """Business logic for the user domain.""" async def list_alive(self) -> list[UserResponse]: """Return only rows where ``deleted_at IS NULL``. ``_apply_filters`` skips ``None`` by design (filtro ausente != ``IS NULL``), so the ``IS NULL`` clause must be issued as a raw SQLAlchemy query bound to the same session. Returns: list[UserResponse]: The alive users. """ result = await self.repository.session.execute( select(UserModel).where(UserModel.deleted_at.is_(None)) ) instances = result.scalars().all() return [self.repository.map_to_response(i) for i in instances] async def update( self, user_id: UUID, data: UserUpdateSchema, *, actor_id: UUID, ) -> UserResponse: """Apply a partial update and stamp ``updated_by`` with the actor. Args: user_id (UUID): Primary key of the row to update. data (UserUpdateSchema): The partial payload. actor_id (UUID): The acting user, written to ``updated_by``. Returns: UserResponse: The updated user. """ instance = await self.repository.get_by_id(user_id) instance.update_from_dict(data.model_dump(exclude_unset=True)) instance.stamp_updated_by(actor_id) updated = await self.repository.update(instance) return self.repository.map_to_response(updated) ``` !!! tip "Dois carimbos de delete, propósitos diferentes" Use `repository.soft_delete(id)` (flag `is_active`) quando o booleano já basta. Use os helpers do `SoftDeleteMixin` (`mark_deleted` → `deleted_at`) quando precisa **saber quando** o delete aconteceu — auditoria, políticas de retenção. !!! info "MFA é outro mixin opt-in" `MFAMixin` adiciona `totp_secret` / `totp_enabled_at` ao modelo de usuário quando o projeto liga o fluxo MFA bundled. Detalhes em [MFA (TOTP / 2FA) »](mfa.md). **Recap:** mixins entram só quando o domínio precisa; a filtragem de soft-delete é sua (`deleted_at IS NULL` via query crua); o carimbo de auditoria mora no service. --- ## 7. Paginação O SDK pagina de duas formas, **ambas embutidas no repository**. Você quase nunca escreve a query de paginação à mão. ### Offset — quando o cliente quer "página 3 de 12" ```python # src/db/repositories/user.py — método de conveniência from typing import Any from tempest_fastapi_sdk import BasePaginationSchema from src.schemas import UserResponse UserPage = BasePaginationSchema[UserResponse] class UserRepository(BaseRepository[UserModel]): # ... __init__ + mappers ... async def list_page( self, *, filters: dict[str, Any] | None = None, page: int = 1, page_size: int = 20, ) -> UserPage: """Return one offset-paginated page of users. Args: filters (dict[str, Any] | None): Filter conditions. page (int): 1-indexed page number. page_size (int): Items per page. Returns: UserPage: Items + total + page metadata. """ result = await self.paginate( filters=filters, page=page, page_size=page_size, ) return UserPage( items=[self.map_to_response(i) for i in result["items"]], total=result["total"], page=result["page"], page_size=result["page_size"], pages=result["pages"], ) ``` `BaseRepository.paginate` devolve um `dict` com `items` / `total` / `page` / `page_size` / `pages`. O total é computado da **mesma** query filtrada, então joins custom ainda reportam total correto. Quando `order_by` é `None`, ordena por `created_at desc`. !!! tip "Encaminhe o schema sem desempacotar à mão" O par `get_conditions()` / `get_pagination_conditions()` cobre os dois lados do filtro: o primeiro devolve só os filtros de domínio, o segundo só as chaves de paginação (`page`, `page_size`, `order_by`, `ascending`). Assim o service repassa o filtro direto, sem `**f` — que vazaria filtros de domínio (`is_active`, etc.) como kwargs que o repository não aceita: ```python data = await repo.paginate( filters=f.get_conditions(), **f.get_pagination_conditions(), ) ``` `CursorPaginationFilterSchema` tem o mesmo par (com `cursor` / `limit` no lugar de `page` / `page_size`). ### Cursor — quando a tabela é grande A paginação por cursor escala melhor que offset em tabelas grandes (sem `COUNT(*)`, estável sob inserts concorrentes) ao custo de perder acesso aleatório. **Já está pronta** em `cursor_paginate` — ordena por `(order_by, id)` e codifica o cursor opaco automaticamente: ```python # src/db/repositories/user.py from typing import Any from tempest_fastapi_sdk import CursorPaginationSchema from src.schemas import UserResponse UserCursorPage = CursorPaginationSchema[UserResponse] class UserRepository(BaseRepository[UserModel]): # ... __init__ + mappers ... async def cursor_page( self, *, cursor: str | None = None, limit: int = 20, ascending: bool = False, filters: dict[str, Any] | None = None, ) -> UserCursorPage: """Return one cursor-paginated page of users. Args: cursor (str | None): Opaque cursor from the previous page. limit (int): Max items in the page. ascending (bool): Sort direction. filters (dict[str, Any] | None): Filter conditions. Returns: UserCursorPage: Items + next_cursor + has_more. """ result = await self.cursor_paginate( filters=filters, cursor=cursor, limit=limit, order_by="created_at", ascending=ascending, ) return UserCursorPage( items=[self.map_to_response(i) for i in result["items"]], next_cursor=result["next_cursor"], has_more=result["has_more"], limit=result["limit"], ) ``` Router, com o filtro vindo de um schema via `Depends()`: ```python # src/api/routers/user.py from fastapi import APIRouter, Depends from tempest_fastapi_sdk import CursorPaginationFilterSchema from src.api.dependencies.resources import SessionDep from src.db.repositories import UserCursorPage, UserRepository router = APIRouter(prefix="/api/users", tags=["users"]) class UserCursorFilter(CursorPaginationFilterSchema): """Cursor filter for the user listing.""" name: str | None = None # ILIKE %value% pela convenção do repository @router.get("/", response_model=UserCursorPage) async def list_users( session: SessionDep, f: UserCursorFilter = Depends(), ) -> UserCursorPage: """List users, cursor-paginated.""" repository = UserRepository(session) return await repository.cursor_page( cursor=f.cursor, limit=f.limit, ascending=f.ascending, filters=f.get_conditions(), ) ``` !!! info "O cursor é opaco" `next_cursor` é JSON em base64 url-safe. O cliente nunca o inspeciona; ele devolve o valor literalmente até `next_cursor` virar `null`. Por baixo, `cursor_paginate` usa `encode_cursor`/`decode_cursor` e uma comparação de tupla `(order_by, id)` estável no Postgres. !!! tip "Para sincronização offline-first, há um terceiro modo" `changes_since` + `SyncPaginationSchema` fazem paginação de delta (rows alteradas desde uma marca d'água). Veja [Offline sync »](offline-sync.md). **Recap:** `paginate` (offset) para navegação por página; `cursor_paginate` para feeds/tabelas grandes. Ambos prontos — você só mapeia o resultado para o schema de resposta. --- ## 8. Migrações Alembic `AlembicHelper` embrulha o Alembic com uma config curada (timezone UTC, arquivos com prefixo de data, `target_metadata` já ligado, modo batch). Fluxo completo: bootstrap → revisão → aplicar → gate de CI. ### Bootstrap, uma vez por projeto ```python # scripts/alembic_init.py from tempest_fastapi_sdk import AlembicHelper from src.core.settings import settings helper = AlembicHelper(config_path="alembic.ini", db_url=settings.DATABASE_URL) helper.init( directory="alembic", metadata_module="src.db.models", # expõe BaseModel metadata_attr="BaseModel", db_url=settings.DATABASE_URL, ) ``` ```bash uv run python scripts/alembic_init.py ``` Cria: ```text alembic.ini # config curada pelo SDK (UTC, prefixo de data, post-write hooks) alembic/ ├── env.py # template do SDK (target_metadata, compare_type, batch) ├── script.py.mako └── versions/ ``` ### Gerar revisões ```python # scripts/make_migration.py import sys from tempest_fastapi_sdk import AlembicHelper from src.core.settings import settings helper = AlembicHelper("alembic.ini", db_url=settings.DATABASE_URL) helper.revision(message=sys.argv[1], autogenerate=True) ``` ```bash uv run python scripts/make_migration.py "add users table" ``` O arquivo cai em `alembic/versions/2026_05_16_1432-ae12cd34_add_users_table.py` — o prefixo de data ordena cronologicamente e torna conflitos de merge óbvios. !!! check "Migrações já saem lint-clean" O `alembic.ini` que o `init()` escreve inclui `[post_write_hooks]` que roda `ruff check --fix` e depois `ruff format` em cada revisão. Sem isso, os arquivos do Alembic falham no `tempest lint` (`W291` no `Revises:` vazio, `E501` em `sa.Column(...)` longas). Os hooks usam a config de `ruff` do **seu** projeto. Requer `ruff` no `PATH` — já é dependência de dev em todo scaffold `tempest new`. ### Aplicar no startup ```python # src/api/app.py — dentro do lifespan import asyncio from tempest_fastapi_sdk import AlembicHelper from src.api.dependencies.resources import db from src.core.settings import settings @asynccontextmanager async def lifespan(app: FastAPI) -> AsyncIterator[None]: """Run pending migrations, then serve.""" helper = AlembicHelper("alembic.ini", db_url=settings.DATABASE_URL) await asyncio.to_thread(helper.upgrade) await db.connect() yield await db.disconnect() ``` !!! warning "Migrações destrutivas: use `safe_upgrade`" `helper.pending_destructive_ops()` lista DROPs de coluna/tabela pendentes; `helper.safe_upgrade()` levanta `DestructiveMigrationError` em vez de apagar dados silenciosamente. O guia completo de deploy (migração + shutdown gracioso) está em [Deploy seguro »](deploy-safety.md). ### Gate de CI — o schema deve casar com os modelos ```python # scripts/check_migrations.py import sys from tempest_fastapi_sdk import AlembicHelper from src.core.settings import settings helper = AlembicHelper("alembic.ini", db_url=settings.DATABASE_URL) if not helper.check(): print("Schema drift detected — run make_migration.py and commit.") sys.exit(1) print("Schema is in sync.") ``` ```yaml # .github/workflows/ci.yml - name: Check migrations are in sync run: uv run python scripts/check_migrations.py ``` !!! info "Colunas base sempre primeiro" O `env.py` do SDK instala o hook `reorder_base_columns_first`, então toda migração gerada lista `id` / `is_active` / `created_at` / `updated_at` antes das suas colunas — diffs consistentes entre pessoas. !!! check "Coluna `NOT NULL` nova não explode mais (v0.67.0)" Adicionar uma coluna `NOT NULL` numa tabela que **já tem linhas** estoura no Postgres com `NotNullViolationError: column "x" contains null values` — porque um `default=` Python só roda no insert do ORM, nunca como DDL. O SDK agora instala um segundo hook, `backfill_non_nullable_defaults`: toda coluna adicionada que seja `nullable=False`, **sem** `server_default`, mas **com** um `default` escalar no model, recebe um `server_default` derivado desse default — então a migração gerada backfilla as linhas existentes na mesma instrução. ```python # No model — só o default Python: is_professional: Mapped[bool] = mapped_column(default=False) ``` ```python # A migração gerada agora sai assim (note o server_default): op.add_column( "users", sa.Column( "is_professional", sa.Boolean(), nullable=False, server_default=sa.text("false"), ), ) ``` Cobre `bool` / `int` / `float` / `str` / `Enum` (usa `.value`). **Não** age quando o default é callable (`uuid4`, `func.now()`) ou inexistente — esses precisam de uma migração de dados escrita à mão, porque o SDK não tem como inferir um valor de backfill seguro. Já tem um `env.py` antigo? Atualize o import + wiring para os dois hooks compostos: ```python # alembic/env.py from tempest_fastapi_sdk.db.alembic_hooks import ( backfill_non_nullable_defaults, compose_hooks, reorder_base_columns_first, ) _process_revision_directives = compose_hooks( reorder_base_columns_first, backfill_non_nullable_defaults, ) # ...e passe-o em context.configure(process_revision_directives=...) ``` Para uma migração **já gerada** que estourou, adicione o `server_default=sa.text("...")` na mão no `op.add_column` (ou backfille + `alter_column` para remover o default depois). **Recap:** `init` uma vez, `revision --autogenerate` por mudança, `upgrade` no startup, `check` no CI, `safe_upgrade` para proteger dados. --- ## 9. Detectando queries lentas `SlowQueryLogger` registra um listener na engine e emite uma linha de log para toda instrução acima de um limiar. Anexe uma vez no boot: ```python # src/api/app.py — depois de db.connect() from tempest_fastapi_sdk.db import SlowQueryLogger from src.api.dependencies.resources import db @asynccontextmanager async def lifespan(app: FastAPI) -> AsyncIterator[None]: """Connect, instrument slow queries, then serve.""" await db.connect() slow = SlowQueryLogger(db.engine, threshold_ms=200.0) slow.attach() yield await db.disconnect() ``` | Parâmetro | Padrão | Para quê | | --- | --- | --- | | `threshold_ms` | `500.0` | Instruções neste tempo ou acima são logadas. | | `level` | `logging.WARNING` | Nível das linhas de slow-query. | | `log_parameters` | `False` | Inclui os bind params na linha. **Só em dev** — podem carregar PII. | | `explain` | `False` | Roda `EXPLAIN` e anexa o plano. Custa um round-trip por query lenta. | !!! danger "`log_parameters=True` só em desenvolvimento" Os bind parameters podem conter segredos e PII. Mantenha `False` em produção — o padrão já é seguro. **Recap:** `SlowQueryLogger(db.engine, threshold_ms=...).attach()` no lifespan transforma queries lentas em linhas de log acionáveis, com `EXPLAIN` opcional para investigar planos. --- ## Próximos passos Esta página cobriu o núcleo. Os recursos avançados de banco têm receitas dedicadas: - [Multi-tenant »](multi-tenant.md) — `TenantScopedRepository` para isolamento por tenant. - [Audit trail »](audit-trail.md) — `BaseAuditLogModel`, `add_audited` / `update_audited` / `delete_audited` (quem mudou o quê, na mesma tx). - [Outbox transacional »](outbox.md) — `BaseOutboxModel` + `OutboxRelay`, `save_with_outbox` para publicar eventos atomicamente com a escrita. - [Offline sync »](offline-sync.md) — `changes_since` + paginação de delta para clientes offline-first. - [Deploy seguro »](deploy-safety.md) — migrações destrutivas + shutdown gracioso. - [Testes »](testing.md) — SQLite em memória, fixtures, `create_tables`. --- # Multi-tenant (TenantScopedRepository) Source: https://mauriciobenjamin700.github.io/tempest-fastapi-sdk/recipes/multi-tenant/ # Multi-tenant (TenantScopedRepository) Num banco multi-tenant de **schema compartilhado**, as linhas de todos os tenants moram na mesma tabela, separadas por uma coluna `tenant_id`. O perigo é claro: esquecer **um** `WHERE tenant_id = ?` e o tenant A lê (ou apaga) os dados do tenant B. `TenantScopedRepository` tira esse risco da mesa — você amarra o `tenant_id` na construção e ele injeta o filtro em **toda** leitura e carimba em **toda** escrita. Os call sites não têm como errar. !!! info "Onde isso encaixa" É um `BaseRepository` que se comporta igualzinho — mesma API ([Banco de dados](database.md)). A diferença é invisível pro chamador: o escopo de tenant é automático. ## 1. O modelo precisa da coluna de tenant ```python from uuid import UUID from sqlalchemy import String, Uuid from sqlalchemy.orm import Mapped, mapped_column from tempest_fastapi_sdk import BaseModel class OrderModel(BaseModel): """Pedido — isolado por tenant.""" __tablename__ = "order" tenant_id: Mapped[UUID] = mapped_column(Uuid(), nullable=False, index=True) total: Mapped[int] = mapped_column(nullable=False) ``` ## 2. Construa o repositório amarrado ao tenant O `tenant_id` normalmente vem do JWT / sessão / header do request: ```python from uuid import UUID from sqlalchemy.ext.asyncio import AsyncSession from tempest_fastapi_sdk import TenantScopedRepository from src.db.models import OrderModel def get_order_repo( session: AsyncSession, tenant_id: UUID ) -> TenantScopedRepository[OrderModel]: """Repositório de pedidos travado no tenant do request.""" return TenantScopedRepository(session, model=OrderModel, tenant_id=tenant_id) ``` Se o modelo não tiver a coluna `tenant_id`, o construtor levanta `AttributeError` na hora — você descobre o erro no boot, não em produção. Coluna com outro nome? Passe `tenant_field="org_id"`. ## 3. Use como qualquer repositório Toda leitura já vem filtrada; toda escrita já vem carimbada: ```python from uuid import UUID from tempest_fastapi_sdk import TenantScopedRepository from src.db.models import OrderModel async def list_orders(repo: TenantScopedRepository[OrderModel]) -> list[OrderModel]: """Só os pedidos DESTE tenant — sem WHERE manual.""" return await repo.list() # WHERE tenant_id = async def create_order( repo: TenantScopedRepository[OrderModel], total: int ) -> OrderModel: """tenant_id é carimbado automaticamente no insert.""" return await repo.add(OrderModel(total=total)) ``` ### Acesso cruzado é impossível, mesmo por id `get_by_id`, `delete` e `delete_batch` também são escopados. Um id de outro tenant simplesmente **não casa** — é indistinguível de uma linha que nunca existiu: ```python from uuid import UUID from tempest_fastapi_sdk import TenantScopedRepository from src.db.models import OrderModel async def fetch(repo: TenantScopedRepository[OrderModel], order_id: UUID) -> OrderModel: """Levanta NotFound se o pedido for de OUTRO tenant — sem vazar existência.""" return await repo.get_by_id(order_id) ``` `delete_many({})` apaga só as linhas **deste** tenant, nunca a tabela inteira — o predicado de tenant é sempre adicionado. !!! warning "Queries cruas são responsabilidade sua" Uma subclasse que monta o próprio `select(...)` sem passar por `_apply_filters` — ou um `query=` pré-montado passado pro `paginate` — **não** é escopado automaticamente. Nesses casos, adicione `.where(self.tenant_column == self.tenant_id)` você mesmo. ## Recap - O modelo declara `tenant_id` (ou outra coluna, via `tenant_field=`). - `TenantScopedRepository(session, model=..., tenant_id=...)` injeta o filtro em toda leitura e carimba toda escrita. - `get_by_id` / `delete` / `delete_batch` / `delete_many` são escopados — acesso cruzado entre tenants é impossível pelos métodos do repositório. - Construtor valida a existência da coluna de tenant no boot. --- # Audit trail Source: https://mauriciobenjamin700.github.io/tempest-fastapi-sdk/recipes/audit-trail/ # Audit trail `AuditMixin` guarda **quem** mexeu por último (`created_by` / `updated_by`) e o `BaseModel` guarda **quando** (`created_at` / `updated_at`). Nenhum dos dois guarda o **histórico** das mudanças. O audit trail adiciona um log append-only: uma linha por create / update / delete, com o ator, a ação e um diff antes/depois das colunas alteradas. A linha de auditoria é gravada na **mesma transação** da mudança (reusa a maquinaria do outbox), então uma entrada de auditoria nunca referencia uma mudança que foi revertida. ## A tabela de auditoria Subclasse `BaseAuditLogModel` e escolha um `__tablename__` (`audit_log` por convenção), igual ao `BaseOutboxModel`: ```python from tempest_fastapi_sdk import BaseAuditLogModel class AuditLogModel(BaseAuditLogModel): """Log append-only de mutações por entidade.""" __tablename__ = "audit_log" ``` Herda os quatro campos canônicos (`id`, `is_active`, `created_at`, `updated_at`) mais: `entity` (nome do model), `entity_id` (id da linha, como texto), `action` (`AuditAction`), `actor` (quem fez, ou `None`), `changes` (o diff em JSON) e `context` (metadados opcionais — request id, ip, motivo). ## Ligando no repository Passe `audit_model=` no repository e use as variantes auditadas. Elas gravam a linha de negócio **e** a de auditoria juntas: ```python from typing import Any from sqlalchemy.ext.asyncio import AsyncSession from tempest_fastapi_sdk import BaseRepository from src.db.models import AuditLogModel, ProductModel class ProductRepository(BaseRepository[ProductModel]): """Repository de produtos com trilha de auditoria.""" def __init__(self, session: AsyncSession) -> None: """Inicializa o repository. Args: session (AsyncSession): A sessão async do banco. """ super().__init__(session, model=ProductModel, audit_model=AuditLogModel) ``` ### Create ```python product = await repo.add_audited(ProductModel(name="Widget"), actor=str(user.id)) # grava o produto + uma entrada CREATE com {"after": {...}} ``` ### Update — tire um snapshot antes de mutar `update_audited` precisa do estado **anterior** para calcular o diff. Tire o snapshot com `repo.snapshot(...)` antes de alterar a instância: ```python async def rename_product(repo: ProductRepository, product_id: UUID, name: str) -> None: """Renomeia um produto registrando o diff na auditoria. Args: repo (ProductRepository): O repository de produtos. product_id (UUID): O id do produto. name (str): O novo nome. Raises: NotFoundException: Se o produto não existe. """ product = await repo.get_by_id(product_id) before = repo.snapshot(product) # ← antes de mutar product.name = name await repo.update_audited(product, before, actor=str(user.id)) # grava uma entrada UPDATE com {"name": {"before": "...", "after": "..."}} ``` ### Delete ```python await repo.delete_audited(product, actor=str(user.id)) # apaga a linha + grava uma entrada DELETE com {"before": {...}} ``` !!! warning "Mesma transação" As três variantes commitam a linha de negócio e a de auditoria **juntas**. Se a auditoria falhar, a mudança é revertida — nunca fica meia gravada. Repositories sem `audit_model` levantam `RuntimeError` ao chamar os métodos auditados. ## Helpers avulsos Fora do repository, `snapshot_model(instance)` e `diff_snapshots(before, after)` ficam disponíveis, e `BaseAuditLogModel.for_create / for_update / for_delete` constroem a entrada (sem adicionar à sessão) quando você quer controlar a gravação manualmente. ## Recapitulando - `BaseAuditLogModel` (subclasse com `__tablename__`) + `AuditAction`. - `repo = Repository(session, model=..., audit_model=AuditLogModel)`. - `add_audited` / `update_audited(model, before)` / `delete_audited` — negócio + auditoria na mesma tx. - `repo.snapshot(model)` antes de mutar; `snapshot_model` / `diff_snapshots` para uso manual. --- # Camada HTTP Source: https://mauriciobenjamin700.github.io/tempest-fastapi-sdk/recipes/http/ # Camada HTTP Middlewares, dependências, routers e composição de middleware para a superfície da API. ## Bootstrap da aplicação [A seção 2 do tutorial](../tutorial.md#2-settings-server-factory-do-app-entrypoint) mostra o `create_app()` mínimo. Esta receita é a versão **estendida**, conectando tudo que `tempest_fastapi_sdk.api` entrega — exception handlers, CORS, middleware de request-ID, o health router com checks extras, uma dependência de token de segredo compartilhado e um manager extra de Redis — tudo a partir da mesma localização canônica `src/api/app.py`. O padrão de bootstrap continua idêntico; só o conteúdo de `create_app()` cresce. ```python # src/api/app.py from collections.abc import AsyncIterator from contextlib import asynccontextmanager from fastapi import Depends, FastAPI from tempest_fastapi_sdk import ( AsyncDatabaseManager, RequestIDMiddleware, apply_cors, configure_logging, make_health_router, make_token_dependency, register_exception_handlers, ) from tempest_fastapi_sdk.cache import AsyncRedisManager from src.core.settings import settings configure_logging(level=settings.LOG_LEVEL, json_output=settings.LOG_JSON) db = AsyncDatabaseManager( settings.DATABASE_URL, echo=settings.DATABASE_ECHO, pool_size=settings.DATABASE_POOL_SIZE, max_overflow=settings.DATABASE_MAX_OVERFLOW, pool_recycle=settings.DATABASE_POOL_RECYCLE, ) redis = AsyncRedisManager(settings.REDIS_URL) require_token = make_token_dependency(settings.TOKEN_SECRET) @asynccontextmanager async def lifespan(_: FastAPI) -> AsyncIterator[None]: await db.connect() await redis.connect() try: yield finally: await redis.disconnect() await db.disconnect() def create_app() -> FastAPI: """Build and configure the FastAPI app.""" app = FastAPI( title="my-service", version=settings.VERSION, lifespan=lifespan, ) app.add_middleware(RequestIDMiddleware) apply_cors(app, settings) register_exception_handlers(app) # Meta endpoints at the root prefix. app.include_router( make_health_router( db=db, checks={"redis": redis.health_check}, version=settings.VERSION, ), ) # Business endpoints under /api/, guarded by the shared secret. from src.api.routers import users app.include_router( users.router, prefix="/api", dependencies=[Depends(require_token)], ) return app app = create_app() ``` Pontos-chave: - `src/server.py` e `main.py` (one-liner) ficam exatamente como na [seção 2 do tutorial](../tutorial.md#2-settings-server-factory-do-app-entrypoint) — só `create_app()` muda quando você adiciona primitivos. Nunca inicie o uvicorn via `subprocess.run(["uvicorn", ...])`; sempre importe `app` de `src.api.app` ou chame `uvicorn.run("src.api.app:app", ...)` programaticamente de `src/server.py`. - `RequestIDMiddleware` lê/escreve `X-Request-ID` e semeia `request_id_ctx` para que toda linha de log emitida durante a requisição carregue o ID de correlação. - `apply_cors(app, settings)` lê os defaults de `CORSSettings`; passe overrides nomeados para mudanças pontuais. - `register_exception_handlers(app)` conecta três handlers, cada um com seu nível de log: - `AppException` → envelope `{detail, code, details}` + log `INFO` (4xx) ou `ERROR` + traceback + `500.log` (5xx). - `HTTPException` → mantém o body padrão do Starlette (`{"detail"}`) em 4xx com log `INFO`; em 5xx aplica o envelope SDK + traceback + `500.log`. - `Exception` (catch-all) → envelope SDK + traceback + `500.log` (corrige o default do Starlette, que devolve só `"Internal Server Error"` sem log). Todos os handlers respeitam `RequestIDMiddleware`: a linha de log carrega o `request_id`, e o envelope expõe ele em `details` para correlacionar com o cliente. Passe `log_traceback=False` se um APM (Sentry, OpenTelemetry) já estiver capturando a trace. - `make_health_router(db=db, checks={"redis": redis.health_check}, version=...)` monta `GET /health/liveness` e `GET /health/readiness` (retorna `503` quando algum check falha) no prefixo raiz. - `make_token_dependency(secret)` retorna uma dependência async que valida `X-Token` via `hmac.compare_digest`; passe uma string vazia para desabilitar no dev. A dependência vive ao lado do resto da cola de auth em `src/api/dependencies/auth.py` quando crescer além do one-liner acima. ### Mensagens de erro localizadas (i18n) Por padrão o `detail` do envelope é a mensagem literal da exceção (em inglês nos built-ins). Para devolver a mensagem **no idioma do cliente** sem traduzir em cada `raise`, passe um `MessageCatalog` para `register_exception_handlers`: ```python # src/api/app.py from tempest_fastapi_sdk import default_message_catalog, register_exception_handlers def create_app() -> FastAPI: app = FastAPI(...) register_exception_handlers( app, catalog=default_message_catalog(), # ← PT-BR + EN-US embutidos default_locale="pt-BR", ) ... ``` O handler negocia o locale a partir do header `Accept-Language` (ordenado por `q`), cai em `default_locale` quando nada casa, e resolve a **chave** da exceção — `message_key` se definido, senão o `code` — contra o catálogo. Sem catálogo, ou quando a chave não existe, mantém o `detail` literal (zero quebra de compatibilidade). ```python # Mesmo NotFoundException, idioma decidido pelo Accept-Language do cliente: # Accept-Language: pt-BR → {"detail": "Recurso não encontrado", "code": "NOT_FOUND"} # Accept-Language: en-US → {"detail": "Resource not found", "code": "NOT_FOUND"} ``` Para códigos de domínio (e mensagens com parâmetros), estenda o catálogo com `merge` e passe `message_params` no `raise`: ```python # src/core/i18n.py from tempest_fastapi_sdk import MessageCatalog, default_message_catalog CATALOG: MessageCatalog = default_message_catalog().merge( { "pt-BR": {"USER_NOT_FOUND": "Usuário {email} não encontrado"}, "en-US": {"USER_NOT_FOUND": "User {email} not found"}, } ) ``` ```python # src/services/user.py from tempest_fastapi_sdk import NotFoundException def require_user(email: str) -> None: """Raise a localized 404 carrying the offending e-mail. Args: email (str): The e-mail that was not found. Raises: NotFoundException: Always — keyed to ``USER_NOT_FOUND`` so the handler localizes it from the request locale. """ raise NotFoundException( "User not found", # fallback literal code="USER_NOT_FOUND", message_params={"email": email}, ) ``` !!! tip "A chave segue o `code` por padrão" Você raramente passa `message_key` — ele cai no `code` da exceção. Defina `message_key` só quando quiser desacoplar a string traduzida do código de erro. Um template que referencia um parâmetro ausente volta sem interpolar, em vez de estourar. ## Dependências JWT bearer / usuário atual / role Quatro factories de dependência vivem em `tempest_fastapi_sdk.api.dependencies.auth` — escolha o nível de abstração que você precisa. | Factory | O que você ganha | | --- | --- | | `make_token_dependency(secret)` | Valida o header de segredo compartilhado `X-Token` (tempo constante). | | `make_bearer_token_dependency(tokens, soft=False)` | Decodifica `Authorization: Bearer ` e retorna o dict de claims. | | `make_jwt_user_dependency(tokens, user_loader, soft=False, subject_claim="sub")` | Decodifica o bearer JWT, aguarda `user_loader(subject)`, retorna o usuário carregado. | | `make_role_dependency(tokens, ["admin"], require_all=False, roles_claim="roles")` / `make_permission_dependency(tokens, ["users:write"], require_all=True, permissions_claim="permissions")` | Decodifica o bearer JWT e controla a rota por roles / permissões. | !!! tip "Usa o flow bundled? Pule o `load_user`" Se você monta auth com `UserAuthService` + `make_auth_router`, não precisa escrever `load_user` nem instanciar um `JWTUtils` aqui — chame `auth_service.current_user_dependency()` (e `.current_user_dependency(soft=True)`), que reusa o `JWTUtils` interno do service. Veja a [receita de auth »](auth-flow.md#pegando-o-current_user-da-requisicao). O exemplo abaixo é a montagem manual, pra quando você **não** usa o service. ```python # src/api/dependencies/auth.py from uuid import UUID from tempest_fastapi_sdk import ( JWTUtils, make_bearer_token_dependency, make_jwt_user_dependency, make_permission_dependency, make_role_dependency, ) from src.api.app import db from src.core.settings import settings from src.db.models import UserModel from src.db.repositories import UserRepository tokens = JWTUtils( secret=settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM, ) async def load_user(subject: str) -> UserModel: """Resolve the JWT subject (a UUID string) to a persisted user.""" async with db.get_session_context() as session: repo = UserRepository(session) return await repo.get_by_id(UUID(subject)) require_bearer = make_bearer_token_dependency(tokens) get_current_user = make_jwt_user_dependency(tokens, load_user) get_current_user_or_none = make_jwt_user_dependency(tokens, load_user, soft=True) require_admin = make_role_dependency(tokens, ["admin"]) require_users_write = make_permission_dependency(tokens, ["users:write"]) ``` ```python # src/api/routers/users.py from fastapi import APIRouter, Depends from src.api.dependencies.auth import ( get_current_user, require_admin, require_users_write, ) router = APIRouter(prefix="/users", tags=["users"]) @router.get("/me") async def me(current: UserModel = Depends(get_current_user)) -> UserResponseSchema: return UserResponseSchema.model_validate(current) @router.delete("/{user_id}", dependencies=[Depends(require_admin)]) async def delete_user(user_id: UUID) -> None: ... @router.patch( "/{user_id}/permissions", dependencies=[Depends(require_users_write)], ) async def update_perms(user_id: UUID) -> None: ... ``` `soft=True` retorna `None` em vez de levantar em tokens ausentes/inválidos — útil para endpoints que funcionam tanto autenticados quanto anônimos. `subject_claim` é `"sub"` por padrão, mas pode ser qualquer claim custom (`"user_id"`, `"uid"`, ...). As dependências de role aceitam uma string ou uma lista de strings no claim do JWT; `require_all=True` exige cada role/permissão listada, `False` (default para roles, sobrescrito para permissões) exige qualquer uma. ## Middleware de rate limit `RateLimitMiddleware` é um limitador de janela deslizante — cada chave única (IP do cliente por padrão) é permitida no máximo `max_requests` requisições dentro de cada janela `window_seconds`. Requisições que excedem ganham um `429 Too Many Requests` com um header `Retry-After`. Dois eixos são plugáveis: o **store** (memória ou Redis) e a **chave** (IP, usuário, tenant, API key) — veja abaixo. ```python # src/api/app.py from tempest_fastapi_sdk import RateLimitMiddleware def create_app() -> FastAPI: app = FastAPI(...) app.add_middleware( RateLimitMiddleware, max_requests=120, window_seconds=60.0, exempt_paths=("/health/liveness", "/health/readiness"), ) ... ``` ### Limite por usuário / tenant / API key Por padrão a chave é o IP do cliente. Para limitar **por principal** (usuário autenticado, tenant, API key), passe um `key_func`. O SDK traz factories prontas: | Factory | Chave gerada | Uso | | --- | --- | --- | | `key_by_ip(trusted_header=...)` | `ip:` | Por IP (default). | | `key_by_jwt_subject(jwt)` | `user:` | Por usuário autenticado (claim `sub`). | | `key_by_jwt_claim(jwt, "tenant_id", scope="tenant")` | `tenant:` | Por claim arbitrária do token. | | `key_by_header("x-api-key", scope="apikey")` | `apikey:` | Por valor de header. | !!! warning "O middleware roda antes das dependencies" O `RateLimitMiddleware` executa **antes** das `Depends` do FastAPI resolverem — então o usuário autenticado pela sua dependency de auth ainda não existe quando a chave é calculada. Por isso as factories `key_by_jwt_*` decodificam o bearer **do request cru** (via `JWTUtils.decode_or_none`, sem levantar exceção). Tráfego anônimo cai de volta no IP, então continua limitado. ```python # src/api/app.py from fastapi import FastAPI from tempest_fastapi_sdk import RateLimitMiddleware, key_by_jwt_subject from src.api.dependencies.resources import get_jwt_utils def create_app() -> FastAPI: app = FastAPI(...) app.add_middleware( RateLimitMiddleware, max_requests=600, window_seconds=60.0, key_func=key_by_jwt_subject(get_jwt_utils()), # ← limite por usuário exempt_paths=("/health/liveness", "/health/readiness"), ) return app ``` ### Estado distribuído com Redis O store padrão (`MemoryRateLimitStore`) conta **em processo** — correto para um único worker. Para deploys multi-réplica, passe `store=RedisRateLimitStore(redis)`: cada chave vira um sorted set e um único script Lua poda os expirados, conta e adiciona o novo hit **atomicamente** (sem corrida entre contar e adicionar). Em erro do Redis, `fail_open=True` (default) libera a requisição em vez de derrubar todo mundo. ```python # src/api/app.py from redis.asyncio import Redis from tempest_fastapi_sdk import ( RateLimitMiddleware, RedisRateLimitStore, key_by_jwt_subject, ) from src.api.dependencies.resources import get_jwt_utils def create_app() -> FastAPI: redis: Redis = Redis.from_url("redis://localhost:6379/0") app = FastAPI(...) app.add_middleware( RateLimitMiddleware, max_requests=600, window_seconds=60.0, key_func=key_by_jwt_subject(get_jwt_utils()), store=RedisRateLimitStore(redis), # ← compartilhado entre réplicas exempt_paths=("/health/liveness", "/health/readiness"), ) return app ``` A semântica de janela deslizante é idêntica nos dois stores; só muda onde os contadores vivem. Ainda dá para empurrar o rate limiting para a borda (nginx / Cloudflare / AWS WAF) quando preferir. ## Verificação de assinatura de webhook `WebhookSignatureVerifier` valida webhooks de entrada assinados com HMAC (estilo Stripe / GitHub) e expõe uma dependência FastAPI que lê o corpo cru, checa a assinatura com `hmac.compare_digest` e entrega os bytes do corpo para que o handler da rota possa reparsear sem reler o stream. ```python # src/api/dependencies/webhooks.py from tempest_fastapi_sdk import WebhookSignatureVerifier from src.core.settings import settings github = WebhookSignatureVerifier( secret=settings.GITHUB_WEBHOOK_SECRET, algorithm="sha256", header_name="X-Hub-Signature-256", prefix="sha256=", ) stripe = WebhookSignatureVerifier( secret=settings.STRIPE_WEBHOOK_SECRET, algorithm="sha256", header_name="Stripe-Signature", encoding="hex", ) ``` ```python # src/api/routers/webhooks.py from fastapi import APIRouter, Depends from src.api.dependencies.webhooks import github router = APIRouter(prefix="/webhooks", tags=["webhooks"]) @router.post("/github") async def github_event(body: bytes = Depends(github.dependency())) -> None: payload = json.loads(body) ... ``` Suporta encodings `hex` (default) e `base64`, qualquer algoritmo hashlib garantido entre plataformas, e um `prefix` opcional (ex.: `"sha256="`) removido antes da comparação. Use o imperativo `verifier.verify(body, signature)` de handlers de fila quando a validação acontece fora do pipeline FastAPI. Para provedores que assinam com uma chave privada RSA (Apple App Store, Google Play, serviços enterprise custom), troque `WebhookSignatureVerifier` por `RSAWebhookSignatureVerifier` — mesma superfície `verify(body, signature)`, mas valida a assinatura contra uma chave pública codificada em PEM. Usa `RSASSA-PKCS1-v1_5` sobre SHA-256/384/512 (configurável via `algorithm=`). Requer o pacote `cryptography` (instalado com o extra `[webpush]`). ```python from tempest_fastapi_sdk import RSAWebhookSignatureVerifier apple = RSAWebhookSignatureVerifier( public_key_pem=settings.APPLE_PUBLIC_KEY_PEM, header_name="X-Apple-Signature", algorithm="sha256", ) # Em handlers de fila / fora do FastAPI: ok: bool = apple.verify(raw_body_bytes, base64_signature_header_value) ``` ## Headers Link de paginação `build_pagination_link_header` emite um header `Link` RFC 8288 com os rels `first` / `prev` / `next` / `last` — combine-o com (ou use no lugar de) o wrapper de corpo `BasePaginationSchema` para clientes REST que esperam headers no estilo GitHub. Os query parameters existentes na URL base são preservados. ```python from fastapi import Request, Response from tempest_fastapi_sdk import ( BasePaginationSchema, build_pagination_link_header, ) @router.get("", response_model=list[UserResponseSchema]) async def list_users( request: Request, response: Response, filters: UserFilterSchema = Depends(), controller: UserController = Depends(get_user_controller), ) -> list[UserResponseSchema]: result = await controller.paginate( filters=filters.get_conditions(), order_by=filters.order_by, page=filters.page, page_size=filters.page_size, ascending=filters.ascending, ) page = BasePaginationSchema[UserResponseSchema](**result) response.headers["Link"] = build_pagination_link_header( str(request.url), page=page.page, page_size=page.page_size, pages=page.pages, ) response.headers["X-Total-Count"] = str(page.total) return page.items ``` Ajuste `page_param=` / `size_param=` quando seu serviço usa nomes de query parameter não-padrão (ex.: `offset` / `limit`). Passe `extra_params={"sort": "name"}` para embutir o estado atual de sort/filtro em cada link. ## Router de tool-spec `make_tool_spec_router(spec)` monta um endpoint `GET /tool-spec` expondo um manifesto legível por máquina no prefixo raiz — pensado para ficar ao lado de `/health/liveness` para que callers externos possam descobrir capacidades sem parsear o documento OpenAPI completo. ```python # src/api/app.py from tempest_fastapi_sdk import ( make_health_router, make_tool_spec_router, ) def _tool_spec() -> dict[str, object]: """Computed per request — keeps version + counts in sync with state.""" return { "service": "my-service", "version": settings.VERSION, "tools": [ {"path": "/api/users", "method": "GET", "summary": "List users"}, {"path": "/api/orders", "method": "POST", "summary": "Place order"}, ], } def create_app() -> FastAPI: app = FastAPI(...) app.include_router(make_health_router(db=db)) app.include_router(make_tool_spec_router(_tool_spec)) ... return app ``` Passe um dict (servido literalmente), um callable sync (chamado a cada requisição) ou um callable async (aguardado). Sobrescreva `path=` para expor o manifesto em uma URL diferente ou `tag=` para agrupá-lo sob uma tag OpenAPI diferente. ## Ponto de entrada programático do servidor `run_server` é o helper canônico importado de `src/server.py`. Ele centraliza os defaults de `host` / `port` / `reload` — puxando valores de um objeto `settings` no estilo `ServerSettings` quando presente — e mantém o ponto de entrada em uma única linha. ```python # src/server.py from tempest_fastapi_sdk import run_server from src.api.app import app # noqa: F401 — re-exported for external runners from src.core.settings import settings def run() -> None: """Start the API server programmatically.""" run_server("src.api.app:app", settings=settings) __all__: list[str] = ["app", "run"] ``` ```python # main.py from src.server import run if __name__ == "__main__": run() ``` A ordem de resolução de cada kwarg é `argumento explícito → settings.SERVER_* → default do SDK` (`"127.0.0.1"` / `8000` / `False`). Kwargs extras do uvicorn (`workers=`, `log_config=`, `ssl_*=`) são encaminhados literalmente. ## Composição de mixins de settings `BaseAppSettings` é a base `pydantic-settings` configurada. O SDK também expõe mixins componíveis para as dependências mais comuns; escolha os que o serviço precisa e ponha `BaseAppSettings` no **final** da MRO para que seu `model_config` vença. ```python # src/core/settings.py from pydantic import Field from tempest_fastapi_sdk import ( BaseAppSettings, CORSSettings, DatabaseSettings, EmailSettings, JWTSettings, LogSettings, RabbitMQSettings, RedisSettings, ServerSettings, TaskIQSettings, TokenSettings, UploadSettings, WebPushSettings, ) class Settings( ServerSettings, LogSettings, DatabaseSettings, RedisSettings, RabbitMQSettings, TaskIQSettings, JWTSettings, CORSSettings, EmailSettings, UploadSettings, TokenSettings, WebPushSettings, BaseAppSettings, ): """Service-wide settings.""" VERSION: str = Field(default="0.0.0") settings = Settings() ``` Cada mixin é dono do seu próprio prefixo de env var — escolha só os que o serviço precisa: | Mixin | Env vars | | --- | --- | | `ServerSettings` | `SERVER_HOST`, `SERVER_PORT`, `SERVER_RELOAD`, `SERVER_DEBUG` | | `LogSettings` | `LOG_LEVEL`, `LOG_JSON` | | `DatabaseSettings` | `DATABASE_URL`, `DATABASE_ECHO`, `DATABASE_POOL_SIZE`, `DATABASE_MAX_OVERFLOW`, `DATABASE_POOL_RECYCLE` | | `RedisSettings` | `REDIS_URL`, `REDIS_DECODE_RESPONSES` | | `RabbitMQSettings` | `RABBITMQ_URL`, `RABBITMQ_PREFETCH_COUNT` | | `TaskIQSettings` | `TASKIQ_BROKER_URL`, `TASKIQ_RESULT_BACKEND_URL` | | `JWTSettings` | `JWT_SECRET`, `JWT_ALGORITHM`, `JWT_ACCESS_TTL_SECONDS`, `JWT_REFRESH_TTL_SECONDS`, `JWT_ISSUER` | | `CORSSettings` | `CORS_ORIGINS`, `CORS_ALLOW_CREDENTIALS`, `CORS_ALLOW_METHODS`, `CORS_ALLOW_HEADERS`, `CORS_EXPOSE_HEADERS`, `CORS_MAX_AGE` | | `EmailSettings` | `SMTP_HOST`, `SMTP_PORT`, `SMTP_USERNAME`, `SMTP_PASSWORD`, `SMTP_FROM_ADDR`, `SMTP_USE_TLS`, `SMTP_USE_SSL`, `SMTP_TIMEOUT_SECONDS` | | `UploadSettings` | `UPLOAD_DIR`, `UPLOAD_MAX_SIZE_BYTES`, `UPLOAD_ALLOWED_EXTENSIONS`, `UPLOAD_ALLOWED_MIMETYPES` | | `TokenSettings` | `TOKEN_SECRET` | | `WebPushSettings` | `VAPID_PUBLIC_KEY`, `VAPID_PRIVATE_KEY`, `VAPID_SUBJECT`, `WEBPUSH_DEFAULT_TTL_SECONDS` | > **Mudança que quebra na 0.8.0:** `ServerSettings` antes expunha os campos crus `HOST` / `PORT` / `DEBUG` / `LOG_LEVEL` / `LOG_JSON`. Eles foram renomeados para `SERVER_HOST` / `SERVER_PORT` / `SERVER_RELOAD` / `SERVER_DEBUG`, e `LOG_LEVEL` / `LOG_JSON` migraram para o novo mixin `LogSettings`. Atualize tanto o seu arquivo `.env` (nomes de env var) quanto qualquer código lendo `settings.HOST` etc. ## Autenticação Signup + login + rota protegida de ponta a ponta usando `PasswordUtils` e `JWTUtils`. Requer o extra `[auth]`. #### Conecte os singletons utilitários ```python # src/core/security.py from datetime import timedelta from tempest_fastapi_sdk import JWTUtils, PasswordUtils from src.core.settings import settings passwords = PasswordUtils(rounds=12) tokens = JWTUtils( secret=settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM, default_ttl=timedelta(seconds=settings.JWT_ACCESS_TTL_SECONDS), issuer="my-app", ) ``` #### Signup Reutilize o `UserService.create` definido no tutorial — ele já faz hash da senha. #### Login ```python # src/schemas/auth.py from pydantic import EmailStr from tempest_fastapi_sdk import BaseSchema class LoginSchema(BaseSchema): email: EmailStr password: str class TokenResponseSchema(BaseSchema): access_token: str token_type: str = "bearer" ``` ```python # src/services/auth.py from sqlalchemy.ext.asyncio import AsyncSession from tempest_fastapi_sdk import JWTUtils, PasswordUtils, UnauthorizedException from src.db.repositories import UserRepository from src.schemas.auth import LoginSchema, TokenResponseSchema class AuthService: def __init__( self, session: AsyncSession, passwords: PasswordUtils, tokens: JWTUtils, ) -> None: self.repo = UserRepository(session) self.passwords = passwords self.tokens = tokens async def login(self, data: LoginSchema) -> TokenResponseSchema: user = await self.repo.get_or_none({"email": data.email}) if user is None or not self.passwords.verify( data.password, user.password_hash ): # Same error for both cases — don't leak which one failed. raise UnauthorizedException(message="E-mail ou senha inválidos") token = self.tokens.encode({"sub": str(user.id)}) return TokenResponseSchema(access_token=token) ``` ```python # src/api/routers/auth.py from fastapi import APIRouter, Depends from sqlalchemy.ext.asyncio import AsyncSession from src.api.app import db from src.core.security import passwords, tokens from src.schemas.auth import LoginSchema, TokenResponseSchema from src.services.auth import AuthService router = APIRouter(prefix="/auth", tags=["auth"]) def get_auth_service( session: AsyncSession = Depends(db.session_dependency), ) -> AuthService: return AuthService(session, passwords, tokens) @router.post("/login", response_model=TokenResponseSchema) async def login( data: LoginSchema, service: AuthService = Depends(get_auth_service), ) -> TokenResponseSchema: return await service.login(data) ``` #### Proteja uma rota — dependência JWT Use `make_jwt_user_dependency` para conectar o esquema bearer + decode do JWT + carga do usuário em uma chamada. A única costura é `user_loader(subject)`, um callable async que mapeia o claim de subject do JWT para o seu `UserModel` de domínio. ```python # src/api/dependencies/auth.py from uuid import UUID from tempest_fastapi_sdk import make_jwt_user_dependency from src.api.app import db from src.core.security import tokens from src.db.models import UserModel from src.db.repositories import UserRepository async def load_user(subject: str) -> UserModel: """Resolve the JWT subject (a UUID string) to a persisted user. Opens its own session so the dependency stays request-scope-agnostic (the loader is called once per request, and SDK exceptions raised inside translate to the canonical 401/404 envelope). """ async with db.get_session_context() as session: repo = UserRepository(session) return await repo.get_by_id(UUID(subject)) get_current_user = make_jwt_user_dependency(tokens, load_user) get_current_user_or_none = make_jwt_user_dependency(tokens, load_user, soft=True) ``` ```python # Use in any route @router.get("/me", response_model=UserResponseSchema) async def me(current: UserModel = Depends(get_current_user)) -> UserResponseSchema: return UserResponseSchema.model_validate(current) ``` #### Auth suave (usuário opcional) `get_current_user_or_none` acima já usa `soft=True` — ele retorna `None` em vez de levantar em um token ausente ou inválido, para que endpoints funcionem tanto autenticados quanto anônimos: ```python @router.get("/feed") async def feed( current: UserModel | None = Depends(get_current_user_or_none), ) -> FeedResponseSchema: return await feed_service.list(viewer=current) ``` Por baixo dos panos, `soft=True` chama `tokens.decode_or_none` (sem exceção em tokens expirados/inválidos) e pula o loader quando o subject está ausente. --- ## Upload de arquivos Endpoint de avatar com validação + limpeza. Requer o extra `[upload]`. ```python # src/core/storage.py from tempest_fastapi_sdk import UploadUtils from src.core.settings import settings avatar_storage = UploadUtils( f"{settings.UPLOAD_DIR}/avatars", max_size_bytes=5 * 1024 * 1024, # 5 MiB allowed_extensions={"png", "jpg", "jpeg", "webp"}, allowed_mimetypes={"image/png", "image/jpeg", "image/webp"}, verify_magic_bytes=True, # sniff bytes, reject polyglots ) ``` `verify_magic_bytes=True` lê os primeiros bytes de cada upload e confirma que o arquivo *realmente é* um dos tipos permitidos — um payload HTML+JS enviado como `image/png` é rejeitado mesmo que sua extensão e header `Content-Type` pareçam válidos. Só ative quando todo formato aceito é um que o `sniff_mime` reconhece (JPEG, PNG, GIF, BMP, WebP, PDF); caso contrário, um upload legítimo mas não-snifável seria recusado. Para controle mais fino, passe um predicado `content_validator` para `save()` (`save(file, content_validator=lambda b: sniff_mime(b) in {"image/png"})`), e passe `filename="..."` para um nome determinístico e endereçável (ex.: `f"{user_id}.jpg"`) em vez do UUID padrão. ```python # src/api/routers/users.py (extension) from fastapi import UploadFile from src.api.dependencies import get_user_controller from src.controllers.user import UserController from src.core.storage import avatar_storage @router.post("/{user_id}/avatar", response_model=UserResponseSchema) async def upload_avatar( user_id: UUID, file: UploadFile, current: UserModel = Depends(get_current_user), controller: UserController = Depends(get_user_controller), ) -> UserResponseSchema: if current.id != user_id: raise ForbiddenException(message="Só pode editar o próprio avatar") path = await avatar_storage.save(file, subdir=str(user_id)) return await controller.set_avatar(user_id, str(path)) ``` Adicione `set_avatar` tanto ao service quanto ao controller (o controller fica como um pass-through fino a menos que orquestração seja necessária — ex.: disparar um evento de "avatar atualizado"): ```python # src/services/user.py class UserService: async def set_avatar(self, user_id: UUID, path: str) -> UserResponseSchema: user = await self.repo.get_by_id(user_id) # Delete previous file when replacing. if user.avatar_path and user.avatar_path != path: await avatar_storage.delete(user.avatar_path) user.avatar_path = path user = await self.repo.update(user) return self.repo.map_to_response(user) # src/controllers/user.py class UserController: async def set_avatar(self, user_id: UUID, path: str) -> UserResponseSchema: return await self.service.set_avatar(user_id, path) ``` `UploadUtils.save()` levanta `FileTooLargeException` (413) ou `InvalidFileTypeException` (415) na rejeição — o exception handler do SDK já retorna o status code certo com um campo `code` na resposta. #### Servindo o arquivo de volta Uploads em disco local são melhor servidos por um upstream (nginx / Caddy) para que o FastAPI não fique transmitindo bytes. Para dev: ```python from fastapi.staticfiles import StaticFiles app.mount( "/static/uploads", StaticFiles(directory=settings.UPLOAD_DIR), name="uploads", ) ``` Construa a URL pública no schema de resposta: ```python class UserResponseSchema(BaseResponseSchema): name: str email: EmailStr avatar_url: str | None = None @field_validator("avatar_url", mode="before") @classmethod def _absolute_url(cls, value: str | None) -> str | None: if value is None: return None # avatar_path stored as relative path → public URL return f"/static/uploads/{value}" ``` #### Servindo arquivos privados pela API (`DownloadUtils`) Quando um arquivo deve ficar **atrás de auth** — faturas, contratos, exames médicos — uma URL pública `/static` o vaza para qualquer um que descubra o caminho. `DownloadUtils` transmite os bytes pelo próprio endpoint, para que os mesmos `Depends(get_current_user)` / checks de permissão que guardam toda outra rota guardem o download também. Nenhum link público é exposto. Não precisa de **nenhum extra** (usa `FileResponse` / `StreamingResponse` do Starlette, que vêm com o FastAPI). ```python # src/core/storage.py from tempest_fastapi_sdk import DownloadUtils from src.core.settings import settings invoice_files = DownloadUtils(f"{settings.UPLOAD_DIR}/invoices") ``` ```python # src/api/routers/invoices.py from fastapi.responses import FileResponse from src.api.dependencies import get_invoice_controller from src.controllers.invoice import InvoiceController from src.core.storage import invoice_files @router.get("/{invoice_id}/file") async def download_invoice( invoice_id: UUID, current: UserModel = Depends(get_current_user), controller: InvoiceController = Depends(get_invoice_controller), ) -> FileResponse: invoice = await controller.get_by_id(invoice_id) if invoice.owner_id != current.id: raise ForbiddenException(message="Fatura de outro usuário") # base_dir confines the read — a stored "../../etc/passwd" path 404s. return invoice_files.file_response( invoice.file_path, # relative to base_dir filename=f"fatura-{invoice.number}.pdf", as_attachment=True, # force a download dialog ) ``` Qualquer caminho relativo que escape de `base_dir` (traversal `../`, caminhos absolutos, escapes via symlink) levanta `NotFoundException` (404) em vez de vazar o arquivo — o mesmo 404 que você ganha para um arquivo genuinamente ausente, então callers nunca distinguem "proibido" de "ausente". `file_response` adivinha o tipo MIME pelo nome do arquivo (sobrescreva com `media_type=`), e `as_attachment=False` serve **inline** (ex.: pré-visualizar um PDF no navegador). Para payloads construídos na hora — um relatório gerado, um zip em memória, bytes descriptografados — use `stream()` em vez de tocar o disco: ```python import io from fastapi.responses import StreamingResponse from src.core.storage import invoice_files @router.get("/{invoice_id}/receipt.csv") async def download_receipt( invoice_id: UUID, current: UserModel = Depends(get_current_user), controller: InvoiceController = Depends(get_invoice_controller), ) -> StreamingResponse: csv_bytes: bytes = await controller.render_receipt_csv(invoice_id, current.id) return invoice_files.stream( csv_bytes, # bytes, or a (sync/async) byte generator filename="recibo.csv", ) ``` `stream()` aceita `bytes` cru, um `Iterable[bytes]` sync ou um `AsyncIterable[bytes]`, então um export grande pode ser entregue pedaço a pedaço sem bufferizar tudo em memória. Ambos os métodos definem um `Content-Disposition` seguro em UTF-8 (nomes de arquivo não-ASCII sobrevivem via o parâmetro `filename*` da RFC 5987); `build_content_disposition()` é exportado se você precisar definir esse header em uma resposta feita à mão. --- ## E-mail transacional Fluxo de reset de senha usando `EmailUtils` + um JWT de vida curta. Requer o extra `[email]`. ```python # src/core/mailer.py from tempest_fastapi_sdk import EmailUtils from src.core.settings import settings mailer = EmailUtils( host=settings.SMTP_HOST, port=settings.SMTP_PORT, from_addr=settings.SMTP_FROM_ADDR, username=settings.SMTP_USERNAME, password=settings.SMTP_PASSWORD, use_starttls=True, ) ``` ```python # src/services/password_reset.py from datetime import timedelta from tempest_fastapi_sdk import EmailUtils, JWTUtils, NotFoundException from src.db.repositories import UserRepository class PasswordResetService: def __init__( self, repo: UserRepository, tokens: JWTUtils, mailer: EmailUtils, ) -> None: self.repo = repo self.tokens = tokens self.mailer = mailer async def request_reset(self, email: str) -> None: """Send a password-reset link to `email`. Always returns silently — don't reveal whether the email is registered or not (avoids account enumeration). """ user = await self.repo.get_or_none({"email": email}) if user is None: return token = self.tokens.encode( {"sub": str(user.id), "purpose": "password_reset"}, ttl=timedelta(minutes=15), ) reset_url = f"https://my-app.com/reset-password?token={token}" await self.mailer.send( to=user.email, subject="Reset your password", body=f"Click here to reset your password: {reset_url}", html=f'

Click here to reset.

', ) async def consume_reset( self, token: str, new_password: str, passwords: PasswordUtils, ) -> None: # `decode` raises InvalidTokenException / ExpiredTokenException # (both 401). Caught by the SDK handler. payload = self.tokens.decode(token) if payload.get("purpose") != "password_reset": raise InvalidTokenException() user = await self.repo.get_by_id(UUID(payload["sub"])) user.password_hash = passwords.hash(new_password) await self.repo.update(user) ``` --- --- # HTTP client (saída) Source: https://mauriciobenjamin700.github.io/tempest-fastapi-sdk/recipes/http-client/ # HTTP client (saída) `HTTPClient` é um wrapper tipado sobre o `httpx.AsyncClient` para **chamar serviços externos** com retry + backoff exponencial, circuit-breaker, timeouts padrão e propagação do `X-Request-ID`. É a contraparte de saída do [middleware HTTP](http.md) (que cuida do tráfego de entrada). Requer o extra `[http]` (`httpx`). ## Uso básico O client é seguro pra compartilhar entre requests no mesmo event loop — ele reusa o connection pool interno. Use como async context manager (ou guarde um singleton em [`resources.py`](../architecture.md) e feche no lifespan). ```python from typing import Any from tempest_fastapi_sdk import HTTPClient client = HTTPClient(base_url="https://api.example.com", timeout=10.0) async def fetch_user(user_id: str) -> dict[str, Any]: """GET /users/{id} no serviço externo.""" async with client: response = await client.get(f"/users/{user_id}") response.raise_for_status() return response.json() ``` Métodos: `get` / `post` / `put` / `patch` / `delete` (e `request` genérico), todos repassando kwargs pro httpx (`json=`, `params=`, `headers=`, ...) e devolvendo um `httpx.Response`. ## Retry + backoff + circuit-breaker Passe um `RetryPolicy` e ajuste os limites do breaker na construção: ```python from tempest_fastapi_sdk import CircuitOpenError, HTTPClient, RetryPolicy client = HTTPClient( base_url="https://api.example.com", timeout=5.0, retry_policy=RetryPolicy( max_attempts=3, # 1 tentativa + 2 retries backoff_initial_seconds=0.5, # 0.5s, 1s, 2s... (exponencial) backoff_max_seconds=8.0, # teto por espera ), failure_threshold=5, # abre o circuito após 5 falhas seguidas recovery_seconds=30.0, # meio-aberto após 30s default_headers={"X-Api-Key": "..."}, propagate_request_id=True, # encaminha o X-Request-ID do request atual ) async def call() -> None: try: async with client: await client.post("/charge", json={"amount": 100}) except CircuitOpenError: # O circuito está aberto — não martele o upstream caído. ... ``` - **Retry**: refeito em erros transitórios (timeouts, 5xx, falhas de conexão) até `max_attempts`, com backoff exponencial limitado por `backoff_max_seconds`. - **Circuit-breaker**: após `failure_threshold` falhas consecutivas o circuito **abre** e as chamadas levantam `CircuitOpenError` imediatamente (sem tocar a rede) até passar `recovery_seconds`, quando entra em meio-aberto pra testar. - **Request-ID**: com `propagate_request_id=True`, o `X-Request-ID` do request em curso (via `RequestIDMiddleware`) é repassado ao upstream, costurando os logs ponta-a-ponta. !!! tip "Guarde como singleton em resources.py" Crie o `HTTPClient` uma vez (em `src/api/dependencies/resources.py`), exponha um `get_http_client`, e feche no lifespan com `await client.aclose()` — assim o connection pool é reaproveitado entre requests. ## Recap - `HTTPClient` = `httpx.AsyncClient` tipado + retry/backoff/circuit-breaker + X-Request-ID. - Extra `[http]`. Métodos `get/post/put/patch/delete/request` → `httpx.Response`. - `RetryPolicy(max_attempts, backoff_initial_seconds, backoff_max_seconds)` controla o retry. - `failure_threshold` / `recovery_seconds` controlam o breaker; `CircuitOpenError` quando aberto. - Compartilhe um singleton e feche com `aclose()` no shutdown. --- # Cache Source: https://mauriciobenjamin700.github.io/tempest-fastapi-sdk/recipes/cache/ # Cache Primitivos de cache apoiados em Redis. Requer o extra `[cache]`. ## AsyncRedisManager `AsyncRedisManager` embrulha `redis.asyncio` com a mesma superfície de connect/disconnect/health-check do `AsyncDatabaseManager`. Instale com `[cache]`. ```python # src/api/app.py from collections.abc import AsyncIterator from contextlib import asynccontextmanager from fastapi import FastAPI from tempest_fastapi_sdk.cache import AsyncRedisManager from src.core.settings import settings cache = AsyncRedisManager(**settings.redis_kwargs()) # url + decode_responses @asynccontextmanager async def lifespan(app: FastAPI) -> AsyncIterator[None]: await cache.connect() # primeira chamada — sem isso, .client levanta RuntimeError try: yield finally: await cache.disconnect() app = FastAPI(lifespan=lifespan) # Uso direto (dentro de um handler, depois do startup do lifespan) await cache.client.set("user:123:name", "Ana", ex=300) name = await cache.client.get("user:123:name") # Dependência FastAPI — entrega o client ativo. from fastapi import Depends from redis.asyncio import Redis @router.get("/cached") async def cached_endpoint( redis: Redis = Depends(cache.client_dependency), ) -> dict[str, str]: value = await redis.get("greeting") or "hello" return {"value": value} ``` Conecte o health check no router canônico com `make_health_router(checks={"redis": cache.health_check})` para que as readiness probes falhem quando o Redis cair. ## Decorator @cached `@cached(redis, ttl=..., key_prefix=...)` memoiza o resultado de uma função async no Redis. As chaves de cache são derivadas do `__qualname__` da função mais um SHA-256 de args/kwargs; passe `key_prefix=` para dar namespace às entradas. Para invalidar **antes** do TTL, use tags/namespace (abaixo) em vez de scan de prefixo. ```python from tempest_fastapi_sdk.cache import AsyncRedisManager, cached from src.core.settings import settings redis = AsyncRedisManager(settings.REDIS_URL) @cached(redis, ttl=300, key_prefix="users:") async def get_user_profile(user_id: str) -> dict[str, str]: """Hits Redis on warm cache; runs the body once every 5 minutes.""" return await load_from_db(user_id) # Pula o cache (leitura E escrita) seletivamente em algumas chamadas @cached( redis, ttl=60, skip_cache=lambda args, kwargs: kwargs.get("fresh") is True, ) async def list_orders(user_id: str, *, fresh: bool = False) -> list[dict]: ... ``` Defaults: `ttl=300` segundos (`0` desabilita a expiração), `serializer=json.dumps` / `deserializer=json.loads`. Sobrescreva `serializer` / `deserializer` para payloads não-JSON (modelos Pydantic — passe `model_dump_json` / `MyModel.model_validate_json`, ou use `pickle.dumps` / `pickle.loads` para objetos arbitrários). Valores corrompidos no cache caem de volta para rodar a função embrulhada e emitem um warning no logger do SDK. ### Invalidação por tag / namespace Esperar o TTL não serve quando um dado **muda**. Marque cada entrada com um `namespace` (um balde por decorator) e/ou `tags` (rótulos finos, estáticos ou derivados dos argumentos da chamada). No write, a chave da entrada também entra num SET Redis por rótulo; o `CacheInvalidator` apaga todas as entradas de um rótulo de uma vez. ```python from typing import Any from tempest_fastapi_sdk.cache import AsyncRedisManager, CacheInvalidator, cached redis = AsyncRedisManager(settings.REDIS_URL) @cached( redis, ttl=300, key_prefix="users:", namespace="profiles", # balde coarse tags=lambda args, kwargs: [f"user:{kwargs['user_id']}"], # rótulo por usuário ) async def get_profile(*, user_id: int) -> dict[str, Any]: """Lê do banco; fica em cache por usuário.""" return await load_profile(user_id) ``` Quando o usuário muda, o service que faz a mutação derruba só as entradas dele: ```python async def update_profile(user_id: int, data: dict[str, Any]) -> None: """Atualiza o perfil e invalida o cache só desse usuário. Args: user_id (int): O usuário alterado. data (dict[str, Any]): Os campos novos. """ await save_profile(user_id, data) invalidator = CacheInvalidator(redis, key_prefix="users:") # mesmo prefixo! await invalidator.invalidate_tag(f"user:{user_id}") ``` O `CacheInvalidator` expõe `invalidate_namespace(ns)`, `invalidate_tag(tag)`, `invalidate_tags(*tags)` (dedupe entre tags) e `invalidate_keys(*keys)` (chaves cruas) — cada um devolve o número de entradas apagadas. !!! warning "Use o mesmo `key_prefix`" O `CacheInvalidator` precisa do **mesmo `key_prefix`** dos decorators `@cached` que ele invalida — é assim que os nomes dos SETs de registro batem. Os SETs de registro herdam o TTL da entrada, então se auto-limpam depois do membro mais novo expirar; apagar uma chave já expirada é no-op inofensivo. --- # Feature flags Source: https://mauriciobenjamin700.github.io/tempest-fastapi-sdk/recipes/feature-flags/ # Feature flags Ligue e desligue funcionalidades **sem redeploy**: rollouts graduais, kill-switches, beta por trás de um flag. O SDK traz um serviço `FeatureFlags` sobre backends plugáveis (env estático, Redis em runtime, ou os dois em camadas) e uma dependência FastAPI que protege rotas. ## Começo rápido ```python from tempest_fastapi_sdk import FeatureFlags, MemoryFeatureFlagBackend flags = FeatureFlags(MemoryFeatureFlagBackend({"new_checkout": True})) if await flags.is_enabled("new_checkout"): ... # caminho novo ``` `is_enabled(name)` devolve o valor do flag, ou o `default` do serviço (`False`) quando o flag não existe. Passe `default=True` por chamada para inverter isso pontualmente. Toggle com `enable` / `disable` / `set`, e liste tudo com `all()`. ## Backends | Backend | Quando usar | | --- | --- | | `MemoryFeatureFlagBackend(initial=...)` | Testes e dev (em processo). | | `EnvFeatureFlagBackend(prefix="FEATURE_")` | Config **estática** — `new_checkout` lê `FEATURE_NEW_CHECKOUT`. Read-only (`set` levanta). | | `RedisFeatureFlagBackend(redis_client, key="feature_flags")` | Toggle em **runtime**, compartilhado entre réplicas (um hash Redis). | | `CompositeFeatureFlagBackend([redis, env])` | Camadas: o override do Redis vence o default do env. | !!! info "Valores aceitos como verdadeiros" `1`, `true`, `yes`, `on`, `t`, `y` (case-insensitive) viram `True`; qualquer outra coisa é `False`. Vale para env e Redis. ### Produção: Redis sobre env O padrão recomendado usa o env como default estático (versionado no deploy) e o Redis como override de runtime (o time liga/desliga sem subir release): ```python from redis.asyncio import Redis from tempest_fastapi_sdk import ( CompositeFeatureFlagBackend, EnvFeatureFlagBackend, FeatureFlags, RedisFeatureFlagBackend, ) def build_flags(redis: Redis) -> FeatureFlags: """Monta o serviço de flags com override Redis sobre default de env. Args: redis (Redis): Cliente async de Redis conectado. Returns: FeatureFlags: O serviço pronto para injetar. """ backend = CompositeFeatureFlagBackend( [ RedisFeatureFlagBackend(redis, key="feature_flags"), # runtime EnvFeatureFlagBackend(prefix="FEATURE_"), # default ] ) return FeatureFlags(backend) ``` ## Protegendo uma rota `make_flag_dependency(flags, name)` devolve uma dependência async que deixa a rota passar só quando o flag está ligado. Caso contrário levanta `AppException` no envelope do SDK — `404` por padrão, para a feature simplesmente "não existir": ```python from fastapi import APIRouter, Depends from tempest_fastapi_sdk import make_flag_dependency from src.api.dependencies.resources import get_flags router = APIRouter() flags = get_flags() @router.get( "/checkout/v2", dependencies=[Depends(make_flag_dependency(flags, "new_checkout"))], ) async def checkout_v2() -> dict[str, bool]: """Só responde quando ``new_checkout`` está ligado.""" return {"ok": True} ``` Para um **kill-switch** de algo legado, inverta o gate com `enabled=False` (a rota só responde enquanto o flag está desligado): ```python @router.get( "/legacy", dependencies=[ Depends(make_flag_dependency(flags, "legacy_disabled", enabled=False)), ], ) async def legacy() -> dict[str, bool]: """Para de responder no instante em que ``legacy_disabled`` é ligado.""" return {"ok": True} ``` `status_code`, `detail` e `code` são configuráveis — use `status_code=403` quando quiser sinalizar "existe mas é proibido" em vez de esconder com `404`. ## Recapitulando - `FeatureFlags(backend, default=False)` — `is_enabled` / `enable` / `disable` / `set` / `all`. - Backends: `Memory` (dev), `Env` (estático read-only), `Redis` (runtime), `Composite` (camadas). - `make_flag_dependency(flags, name, enabled=True, status_code=404)` protege rotas. - Override Redis sobre default de env é o padrão de produção — toggle sem redeploy. --- # Tempo real Source: https://mauriciobenjamin700.github.io/tempest-fastapi-sdk/recipes/realtime/ # Tempo real Empurre dados para os clientes sem que o cliente fique fazendo polling. SSE para broadcasts servidor→navegador com a página aberta; para notificações que chegam mesmo com a página fechada, veja a receita dedicada de [Web Push](webpush.md). ## Server-Sent Events (SSE) SSE (servidor → navegador, uma conexão HTTP de longa duração) tem **receita própria** agora — endpoint, broadcast pra vários clientes e alinhamento com o `tempest-react-sdk`: **[Server-Sent Events (SSE) »](sse.md)** ## Notificações Web Push Web Push (notificações que chegam mesmo com a página fechada) tem receita própria — veja **[Web Push »](webpush.md)**. --- # WebSocket router Source: https://mauriciobenjamin700.github.io/tempest-fastapi-sdk/recipes/websocket/ # WebSocket router Desde v0.33.0 o SDK fornece `make_websocket_router` + `WebSocketHub` — abstração equivalente a SSE mas **bidirecional**, com bearer auth no handshake, heartbeat ping/pong automático e registro centralizado pra broadcast / per-user / por tópico. ## O que o router resolve WebSocket bare do FastAPI te dá `await ws.receive_json()` / `await ws.send_json()` — só isso. Tudo o que vem depois é boilerplate que **todo** projeto reimplementa: 1. **Auth no handshake** — browser não pode setar header `Authorization` no construtor `new WebSocket(...)`. Sobram dois caminhos: query param (`?token=`) ou subprotocol (`Sec-WebSocket-Protocol: bearer,`). O SDK aceita os dois. 2. **Heartbeat** — load balancers (Nginx, AWS ALB) fecham conexões "ociosas" depois de 60s. Sem ping/pong, o cliente vê a conexão "viva" enquanto o servidor já a perdeu. 3. **Registry compartilhado** — pra fazer `broadcast("orders", payload)` ou `send_to(user_id, payload)` de qualquer handler HTTP, você precisa de uma estrutura global indexada por user_id + tópicos. 4. **Cleanup determinístico** — quando o cliente cai (refresh, fechou aba, perdeu wifi), as estruturas precisam ser limpas senão vazam memória. O `make_websocket_router` resolve os 4 itens; seu handler só vê a conexão pronta + o hub pra fan-out. ## Conteúdo da receita 1. **[Setup mínimo](#setup-minimo)** — wire de 3 objetos (`WebSocketHub`, `bearer_resolver`, `make_websocket_router`). 2. **[Bearer auth — query vs subprotocol](#bearer-auth)** — quando usar cada. 3. **[Cliente JavaScript / browser](#cliente-javascript)** — `new WebSocket(...)` com heartbeat + reconnect. 4. **[Broadcast / send_to / topics](#broadcast)** — fan-out via `WebSocketHub`. 5. **[Heartbeat e codes de fechamento](#heartbeat)** — códigos 4401/4408/4429 e como o cliente reage. 6. **[Settings (`WebSocketSettings`)](#settings)** — flags + defaults. 7. **[Trade-offs e quando NÃO usar](#trade-offs)** — single-process, fan-out multi-replica, escolha SSE vs WS. --- ## Setup mínimo Três objetos: o **hub** (estado em memória), o **resolver** (token → user UUID) e o **handler** (loop de mensagens). ```python # src/api/app.py from uuid import UUID from fastapi import FastAPI, WebSocket from tempest_fastapi_sdk import ( JWTUtils, WSEnvelope, WebSocketConnection, WebSocketHub, WebSocketSettings, make_websocket_router, ) from src.core.settings import settings ws_settings = WebSocketSettings() hub = WebSocketHub(max_per_user=ws_settings.WS_MAX_CONNECTIONS_PER_USER) tokens = JWTUtils(secret=settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM) async def bearer_resolver(token: str) -> UUID | None: """Decode JWT and return the subject (user id) — None on bad token.""" try: payload = tokens.decode(token) except Exception: # noqa: BLE001 — any decode failure = reject return None return UUID(payload["sub"]) async def handler( ws: WebSocket, connection: WebSocketConnection, hub: WebSocketHub, ) -> None: """Bidirectional loop — every connection runs this until disconnect.""" while True: message = await ws.receive_json() envelope = WSEnvelope.model_validate(message) if envelope.type == "pong": continue # heartbeat — handled by the router if envelope.type == "subscribe": await hub.subscribe(connection.connection_id, envelope.data["topic"]) continue if envelope.type == "chat.message": # Broadcast pra todo mundo subscrito em `chat:` await hub.broadcast( WSEnvelope( type="chat.message", data={ "from": str(connection.user_id), "text": envelope.data["text"], }, ), topic=envelope.data["room"], ) app = FastAPI() app.include_router( make_websocket_router( handler, hub=hub, bearer_resolver=bearer_resolver, settings=ws_settings, path="/ws", ) ) ``` Pronto. Agora `ws://localhost:8000/ws?token=` aceita conexões; `hub.broadcast(...)` e `hub.send_to(...)` ficam disponíveis em **qualquer handler HTTP** do mesmo app pra empurrar eventos pros sockets. --- ## Bearer auth O SDK aceita o token de dois lugares — em ordem de preferência: | Mecanismo | Browser-friendly | Aparece em logs? | Quando usar | |---|---|---|---| | `Sec-WebSocket-Protocol: bearer,` | **Sim** (via 2º arg do `new WebSocket(...)`) | Não (header) | **Preferido** — funciona no browser, esconde o token de logs de proxy. | | `?token=` query string | Sim (URL nativa) | Sim (request log, Referer, history) | Só quando precisa de fallback ou um cliente mais limitado. | Quando ambos vêm, **subprotocol vence**. Resolver retornando `None` → o SDK fecha o socket com código `4401` antes do handler rodar. --- ## Cliente JavaScript ```javascript // Preferido — subprotocol bearer const ws = new WebSocket("wss://api.example.com/ws", ["bearer", jwtToken]); ws.addEventListener("open", () => { ws.send(JSON.stringify({ type: "subscribe", data: { topic: "chat:lobby" } })); }); ws.addEventListener("message", (event) => { const envelope = JSON.parse(event.data); // Heartbeat — responda imediato ou o servidor te derruba em 60s if (envelope.type === "ping") { ws.send(JSON.stringify({ type: "pong", data: {} })); return; } // Sua app if (envelope.type === "chat.message") { console.log("got", envelope.data); } }); // Reconnect com backoff exponencial ws.addEventListener("close", (event) => { const code = event.code; if (code === 4401) { // token inválido/expirado → redirect pro login window.location.href = "/login"; return; } setTimeout(() => connect(), Math.min(30_000, 1_000 * 2 ** attempts++)); }); ``` --- ## Broadcast `WebSocketHub` expõe três patterns: ```python # 1. send_to — todos os sockets de um usuário (multi-tab) await hub.send_to(user_id, WSEnvelope(type="notification", data={"text": "..."})) # 2. broadcast com topic — só quem se inscreveu naquele tópico await hub.broadcast( WSEnvelope(type="order.paid", data={"id": str(order_id)}), topic=f"order:{order_id}", ) # 3. broadcast sem topic — TODO mundo conectado (use raramente) await hub.broadcast( WSEnvelope(type="system.announcement", data={"text": "Servidor em manutenção"}), ) ``` Subscription lifecycle controlada pelo handler: ```python await hub.subscribe(connection.connection_id, "order:01HE...") # ... mais tarde await hub.unsubscribe(connection.connection_id, "order:01HE...") ``` Sockets mortos são detectados na hora do `send_to`/`broadcast` (a chamada `send_json` falha) — o hub remove automaticamente do registry. --- ## Heartbeat A cada `WS_HEARTBEAT_SECONDS` (default 30s) o SDK envia: ```json {"type": "ping", "data": {}, "request_id": null} ``` O cliente **deve** responder com `{"type": "pong", "data": {}}` dentro de `WS_HEARTBEAT_TIMEOUT_SECONDS` (default 60s) — caso contrário, o socket é fechado com código `4408` (Request Timeout custom no espaço 4000-4999 reservado pra apps). Códigos de fechamento que o router emite: | Código | Quando | |---|---| | `1000` | Saída normal (handler retornou ou cliente desconectou limpo) | | `1009` | Frame inbound maior que `WS_MAX_MESSAGE_BYTES` | | `4401` | Token inválido / expirado / faltando no handshake | | `4408` | Heartbeat timeout — cliente não respondeu `pong` | | `4429` | Limite `WS_MAX_CONNECTIONS_PER_USER` excedido — conexão **mais antiga** do user é evictada | --- ## Settings Mixe `WebSocketSettings` na sua classe `Settings`: ```python # src/core/settings.py from tempest_fastapi_sdk import BaseAppSettings, WebSocketSettings class Settings(WebSocketSettings, BaseAppSettings): pass ``` ```bash # .env WS_HEARTBEAT_SECONDS=30 # default WS_HEARTBEAT_TIMEOUT_SECONDS=60 # default WS_MAX_CONNECTIONS_PER_USER=5 # default WS_MAX_MESSAGE_BYTES=65536 # 64 KiB default ``` --- ## Trade-offs **Single-process por design.** `WebSocketHub` guarda estado em memória do processo. Em deploy multi-réplica: - **Opção 1 — Sticky sessions**: load balancer roteia o mesmo cliente sempre pra mesma réplica. Funciona, mas perde balanceamento. - **Opção 2 — Fan-out via pub/sub** (futuro v0.34+): handler HTTP publica num Redis pub/sub / RabbitMQ topic, e cada réplica do hub re-emite pros seus sockets locais. Surface idêntica, plumbing transparente. **Não shipped ainda** — pra v0.33.0 use Opção 1 ou rode 1 réplica do serviço WS atrás de um balanceador HTTP separado. **Quando preferir SSE em vez de WebSocket:** - Só servidor → cliente (notificações, status de pedido, dashboards live). - Cliente raramente envia (1 request/min). - Quer reconnect automático "grátis" — `EventSource` reconecta sozinho com backoff; WebSocket exige código custom. - Atrás de proxy/CDN que **não** suporta WebSocket bem (alguns ALBs / Cloudflare em planos baixos). **Quando WebSocket é a escolha certa:** - Bidirecional intenso (chat, colaboração simultânea, jogos, drawing apps). - Latência ultra-baixa em ambas direções. - Protocolo customizado por message-type que SSE não modela bem. - Volume de mensagens cliente → servidor é alto. ## Próximos passos - **[Auth flow »](auth-flow.md)** — o JWT que vai no `?token=` ou no subprotocol vem direto do `POST /auth/login` do `UserAuthService`. - **[Tempo real (SSE) »](realtime.md)** — quando só servidor → cliente serve. - **[Cache »](cache.md)** — Redis pub/sub futuro pra fan-out multi-réplica. --- # Fila e Tarefas Source: https://mauriciobenjamin700.github.io/tempest-fastapi-sdk/recipes/queue-tasks/ # Fila e Tarefas Trabalho em background — filas de mensagens at-least-once (FastStream/RabbitMQ), filas de tarefas (TaskIQ), schedulers periódicos e o padrão de outbox transacional. ## Filas de mensagens — FastStream `AsyncBrokerManager` embrulha qualquer broker FastStream (RabbitMQ, Kafka, NATS, Redis Streams) com uma superfície uniforme de connect/disconnect/health-check. A instância do broker é injetada para que o SDK não fixe um único transporte. Instale com `[queue]` (puxa `faststream[rabbit]`). Escolha o extra FastStream correspondente para outros transportes. ```python # src/queue/__init__.py from faststream.rabbit import RabbitBroker from pydantic import BaseModel from tempest_fastapi_sdk.queue import AsyncBrokerManager from src.core.settings import settings broker = RabbitBroker(settings.RABBITMQ_URL) queue = AsyncBrokerManager(broker) class OrderMessage(BaseModel): order_id: str user_id: str @broker.subscriber("orders.paid") async def handle_order_paid(msg: OrderMessage) -> None: await mark_order_paid(msg.order_id, msg.user_id) # src/api/app.py lifespan await queue.connect() ... await queue.disconnect() # Publique de qualquer lugar da aplicação await queue.publish(OrderMessage(order_id="abc", user_id="x"), queue="orders.paid") ``` O manager expõe: - `connect()` / `disconnect()` — idempotentes; seguros de chamar a partir do lifespan do FastAPI. - `publish(message, *args, **kwargs)` — passthrough para `broker.publish` com uma guarda `RuntimeError` quando o broker não está iniciado. - `lifespan()` — context manager async que lida com start/stop, útil para scripts curtos. - `broker_dependency` — `Depends` do FastAPI que entrega o broker ativo. - `health_check()` / `is_connected` — verdadeiro enquanto o broker está iniciado. Conecte-o no health router com `make_health_router(checks={"queue": queue.health_check})`. ## Tarefas em background — TaskIQ `AsyncTaskBrokerManager` embrulha qualquer broker TaskIQ (AioPika para RabbitMQ, Redis, in-memory para testes). Instale com `[tasks]` (puxa `taskiq` + `taskiq-aio-pika`). ```python # src/tasks/__init__.py from taskiq_aio_pika import AioPikaBroker from tempest_fastapi_sdk.tasks import AsyncTaskBrokerManager from src.core.settings import settings tasks = AsyncTaskBrokerManager(AioPikaBroker(settings.TASKIQ_BROKER_URL)) @tasks.task async def send_welcome_email(to: str, name: str) -> None: await email_utils.send( to=to, subject="Bem-vindo!", body=f"Olá, {name} — sua conta foi criada.", ) # src/api/app.py lifespan await tasks.connect() ... await tasks.disconnect() # Enfileire de um handler de request await send_welcome_email.kiq(to=user.email, name=user.name) ``` `register_task(callable, task_name=..., **kwargs)` registra uma função sem a sintaxe de decorator — útil ao conectar callables de terceiros que você não pode decorar no ponto de definição. Para testes, troque o broker por `taskiq.InMemoryBroker()` para que as tarefas executem de forma síncrona. As mesmas guarda de lifespan do manager de fila se aplicam: `connect()`/`disconnect()`/`lifespan()`/`broker_dependency`/`health_check()`/`is_connected`. ## Scheduler de tarefas periódicas `AsyncTaskScheduler` embrulha `taskiq.TaskiqScheduler` + `LabelScheduleSource` para que tarefas periódicas sejam declaradas com decorators ao lado de tarefas normais e o scheduler seja dirigido pelo lifespan do FastAPI. Ele **não executa os corpos das tarefas** — ele as enfileira no mesmo broker que o `AsyncTaskBrokerManager` embrulha, então um processo worker precisa estar rodando para consumi-las. Requer o extra `[tasks]`. ```python # src/tasks/__init__.py from datetime import timedelta from taskiq_aio_pika import AioPikaBroker from tempest_fastapi_sdk.tasks import AsyncTaskBrokerManager, AsyncTaskScheduler from src.core.settings import settings # Use TASKIQ_BROKER_URL (de TaskIQSettings) quando o scheduler / # broker de tarefas for um broker diferente da fila FastStream # (RABBITMQ_URL). Reutilize a mesma URL do RabbitMQ quando # compartilharem o broker — ambas env vars podem apontar pro mesmo valor. broker = AioPikaBroker(settings.TASKIQ_BROKER_URL) tasks = AsyncTaskBrokerManager(broker) scheduler = AsyncTaskScheduler(broker) @tasks.task async def reconcile_invoices(batch_size: int = 100) -> None: """Background task — kicked by handlers or the scheduler.""" ... @scheduler.cron("*/5 * * * *") # every five minutes async def heartbeat() -> None: """Liveness ping written to the audit log.""" ... @scheduler.cron("0 9 * * MON-FRI", cron_offset="-03:00") # 09:00 BRT, weekdays async def daily_digest() -> None: ... @scheduler.interval(seconds=30) # every 30s async def poll_remote_queue() -> None: ... @scheduler.interval(timedelta(minutes=15)) async def warm_cache() -> None: ... ``` Conecte-o ao lifespan do app, ao lado do manager de broker: ```python # src/api/app.py @asynccontextmanager async def lifespan(_: FastAPI) -> AsyncIterator[None]: await tasks.connect() await scheduler.connect() await scheduler.run_in_background() # dev / single-process services try: yield finally: await scheduler.disconnect() await tasks.disconnect() ``` Superfície de decorators: | Método | Quando usar | | --- | --- | | `@scheduler.cron("*/5 * * * *", cron_offset=None)` | Expressão cron; passe `cron_offset` (string como `"-03:00"` ou `timedelta`) para ancorar a um timezone diferente de UTC. | | `@scheduler.interval(seconds=30)` / `@scheduler.interval(timedelta(...))` | Recorrência em intervalo fixo. | | `@scheduler.schedule([{...}, {...}])` | Lista crua de schedule do TaskIQ — combine triggers, use `time` de uma vez só, etc. | | `scheduler.register(func, schedule=[...], task_name=...)` | Registro sem sintaxe de decorator (callables de terceiros). | Deploys de produção com múltiplos workers devem rodar a CLI standalone do scheduler em vez de `run_in_background()`, para que só um scheduler esteja ativo no cluster: ```bash taskiq scheduler src.tasks:scheduler.scheduler ``` (`scheduler.scheduler` é a instância interna `TaskiqScheduler` exposta em `AsyncTaskScheduler`.) O processo worker continua o mesmo: ```bash taskiq worker src.tasks:tasks.broker ``` Os controles de ciclo de vida espelham o manager de broker: `connect()` / `disconnect()` / `lifespan()` / `run_in_background()` / `health_check()` / `is_connected`. ## Padrão outbox dispatcher O padrão de outbox transacional mantém uma tabela "a publicar" no mesmo banco das linhas de domínio, para que escrever a linha e registrar o efeito colateral aconteçam em uma única transação. Um worker lê o outbox em ordem e publica no RabbitMQ (FastStream) / TaskIQ, marcando cada linha como despachada só depois que o broker dá ACK. Crashes entre o commit e o publish reproduzem com segurança no próximo poll. O SDK **não** traz um primitivo dedicado `OutboxDispatcher` — a implementação é curta, opinativa e se beneficia de ficar na fronteira `db/models/` + `tasks/` do serviço. Use a receita abaixo. ```python # src/db/models/outbox.py from sqlalchemy import JSON, String from sqlalchemy.orm import Mapped, mapped_column from tempest_fastapi_sdk import BaseModel class OutboxEventModel(BaseModel): """One row per domain event waiting to be published.""" topic: Mapped[str] = mapped_column(String(128), nullable=False, index=True) payload: Mapped[dict] = mapped_column(JSON, nullable=False) status: Mapped[str] = mapped_column( String(16), nullable=False, default="pending", index=True, ) # is_active / created_at / updated_at come from BaseModel. ``` ```python # src/db/repositories/outbox.py from sqlalchemy import select, update from tempest_fastapi_sdk import BaseRepository from src.db.models import OutboxEventModel class OutboxRepository(BaseRepository[OutboxEventModel]): def __init__(self, session: AsyncSession) -> None: super().__init__(session, model=OutboxEventModel) async def claim_pending(self, *, limit: int = 100) -> list[OutboxEventModel]: """Lock-free claim — fine for single-worker dispatcher.""" stmt = ( select(OutboxEventModel) .where(OutboxEventModel.status == "pending") .order_by(OutboxEventModel.created_at) .limit(limit) ) result = await self.session.execute(stmt) return list(result.scalars().all()) async def mark_dispatched(self, ids: list[str]) -> None: await self.session.execute( update(OutboxEventModel) .where(OutboxEventModel.id.in_(ids)) .values(status="dispatched"), ) await self.session.commit() ``` ```python # src/services/orders.py — lado produtor from src.db.models import OrderModel, OutboxEventModel class OrderService: async def place_order(self, data: OrderCreateSchema) -> OrderResponseSchema: order = OrderModel(**data.to_dict()) self.repo.session.add(order) # Same transaction as the order row. self.repo.session.add( OutboxEventModel( topic="orders.placed", payload={"order_id": str(order.id), "amount": order.amount}, ), ) await self.repo.session.flush() await self.repo.session.commit() return self.repo.map_to_response(order) ``` ```python # src/tasks/__init__.py — lado dispatcher from tempest_fastapi_sdk.tasks import AsyncTaskScheduler from src.api.app import broker as queue_broker # FastStream AsyncBrokerManager from src.api.app import db, taskiq_broker scheduler = AsyncTaskScheduler(taskiq_broker) @scheduler.interval(seconds=5) async def dispatch_outbox() -> None: """Poll the outbox and publish each pending event.""" async with db.get_session_context() as session: repo = OutboxRepository(session) events = await repo.claim_pending(limit=100) if not events: return dispatched: list[str] = [] for event in events: try: await queue_broker.publish(event.payload, event.topic) dispatched.append(str(event.id)) except Exception: # noqa: BLE001 — retry on next tick continue if dispatched: await repo.mark_dispatched(dispatched) ``` Trade-offs para ter em mente: - **A ordem é best-effort.** Quando um lote contém um publish que falha, todo evento posterior no mesmo lote ainda roda — mas eles continuam sendo publicados em ordem de `created_at`. Se a ordem estrita importa, pare na primeira falha. - **Dispatcher único.** O `claim_pending` ingênuo não trava linhas; rodar múltiplos workers dispatcher vai publicar em duplicidade. Use `SELECT ... FOR UPDATE SKIP LOCKED` no PostgreSQL quando precisar escalar horizontalmente. - **Retenção.** Adicione um job periódico no estilo `TRUNCATE` para apagar linhas `dispatched` mais antigas que N dias, senão a tabela de outbox cresce sem limite. - **At-least-once.** Os consumidores devem ser idempotentes — o dispatcher pode crashar depois de publicar mas antes do `mark_dispatched`. --- # Outbox transacional (eventos confiáveis) Source: https://mauriciobenjamin700.github.io/tempest-fastapi-sdk/recipes/outbox/ # Outbox transacional (eventos confiáveis) Quando um handler **grava uma linha** e **publica um evento**, fazer as duas coisas como operações independentes é frágil: se o processo morre *depois* do commit mas *antes* do publish, o evento some; se morre depois do publish mas antes do commit, sobra um evento fantasma apontando pra uma linha que nunca existiu. Isso é o **dual-write problem**. O padrão **outbox** resolve: grave a linha de negócio **e** uma linha `outbox` na **mesma transação**. Ou as duas comitam, ou nenhuma. Um *relay* separado lê as linhas pendentes e publica no broker, marcando cada uma como enviada. O broker pode ficar minutos fora do ar — os eventos esperam, duráveis, na tabela. !!! info "Onde isso encaixa" Complementa o [`AsyncBrokerManager`](queue-tasks.md): o broker *publica*, o outbox *garante* que o evento existe pra ser publicado. O relay usa qualquer callable de publish — então funciona com FastStream, webhook, o que for. ## 1. A tabela outbox `BaseOutboxModel` é abstrata — o projeto cria a tabela concreta (igual a `BaseUserModel`): ```python from tempest_fastapi_sdk import BaseOutboxModel class OutboxModel(BaseOutboxModel): """Tabela de eventos pendentes do serviço.""" __tablename__ = "outbox" ``` Ela já traz `topic`, `payload` (JSON), `status`, `attempts`, `max_attempts`, `available_at`, `sent_at` e `last_error` — além das quatro colunas canônicas do `BaseModel` (`id` / `is_active` / `created_at` / `updated_at`). Gere a migration com o [`AlembicHelper`](database.md) como qualquer outra tabela. ## 2. Gravar de forma atômica No service/repository, use `save_with_outbox` em vez de `add`: ele insere o modelo de negócio **e** o evento numa transação só. ```python from tempest_fastapi_sdk import BaseRepository from src.db.models import OrderModel, OutboxModel async def place_order(repo: BaseRepository[OrderModel], data: dict[str, object]) -> OrderModel: """Cria o pedido e enfileira o evento na mesma transação.""" order = OrderModel(**data) event = OutboxModel.new_event("orders.created", {"order": data}) return await repo.save_with_outbox(order, event) ``` Se o `commit` falhar (ex.: constraint única), **as duas** linhas são revertidas — nunca sobra evento órfão. ## 3. Drenar e publicar (o relay) `OutboxRelay` lê as linhas pendentes e chama o seu callable de publish. Ele não importa nenhum broker específico — você passa a função: ```python import asyncio from tempest_fastapi_sdk import AsyncDatabaseManager, BaseOutboxModel, OutboxRelay from src.db.models import OutboxModel async def run_relay(db: AsyncDatabaseManager, broker: object) -> None: """Publica eventos pendentes continuamente.""" async def publish(event: BaseOutboxModel) -> None: """Encaminha um evento pro broker.""" await broker.publish(event.payload, event.topic) # type: ignore[attr-defined] relay: OutboxRelay = OutboxRelay(db, model=OutboxModel, publish=publish) await relay.run(poll_interval=1.0) # loop até a task ser cancelada ``` Rode o relay como um processo/worker separado (ou uma task no lifespan). Cada evento publicado vira `status="sent"` com `sent_at` preenchido. ### Falhas e retry Se o `publish` levanta exceção, o relay **não** marca como enviado: ele incrementa `attempts`, guarda o erro em `last_error` e reagenda o evento com backoff exponencial (`available_at` no futuro). Quando `attempts` chega em `max_attempts`, a linha vira `status="failed"` e fica na tabela pra inspeção manual (nunca mais é retentada automaticamente). !!! tip "Múltiplos workers" Em PostgreSQL/MySQL o relay trava o lote com `FOR UPDATE SKIP LOCKED`, então você pode rodar **vários** workers de relay sem publicar o mesmo evento duas vezes. Em SQLite (sem lock de linha) ele cai pra um `SELECT` simples — use um worker só. ### Drenar uma vez (testes / cron) Pra cenários sem loop (um teste, um cron job), chame `drain_once()`, que devolve quantos eventos foram publicados: ```python published: int = await relay.drain_once() ``` ## Recap - `BaseOutboxModel` → tabela concreta `OutboxModel(__tablename__="outbox")`. - `repo.save_with_outbox(model, event)` grava negócio + evento **atômico**. - `OutboxRelay(db, model=..., publish=...).run()` publica os pendentes, com retry/backoff e marcação `sent` / `failed`. - `OutboxModel.new_event(topic, payload)` monta o evento; `drain_once()` drena um lote pra testes/cron. --- # Email transacional Source: https://mauriciobenjamin700.github.io/tempest-fastapi-sdk/recipes/email/ # Email transacional Envio de email via SMTP com `EmailUtils` — corpo texto + alternativa HTML, anexos, e renderização de templates Jinja2. Requer o extra `[email]` (`aiosmtplib` + `jinja2` + `email-validator`). ## Configuração `EmailSettings` traz os campos SMTP prontos (`SMTP_HOST`, `SMTP_PORT`, `SMTP_USERNAME`, `SMTP_PASSWORD`, `SMTP_FROM_ADDR`, `SMTP_USE_TLS`, `SMTP_USE_SSL`, `SMTP_TIMEOUT_SECONDS`). Componha no seu `Settings` e use `email_kwargs()` para construir o utilitário sem mapear campo a campo: ```python # src/core/mailer.py from tempest_fastapi_sdk import EmailUtils from src.core.settings import settings mailer = EmailUtils(**settings.email_kwargs()) ``` `email_kwargs()` faz a ponte dos nomes SMTP para os do `EmailUtils`: `SMTP_USE_TLS` (STARTTLS, porta 587) → `use_starttls`; `SMTP_USE_SSL` (TLS desde o início, porta 465) → `use_tls`. !!! tip "STARTTLS vs SSL/TLS — oportunístico por padrão" Porta **587** → `SMTP_USE_TLS=true` (default): conecta em texto e faz upgrade via STARTTLS. Porta **465** → `SMTP_USE_SSL=true`: conecta já cifrado. O STARTTLS é **oportunístico**: o `EmailUtils` só faz o upgrade quando o servidor anuncia STARTTLS, então um servidor plain (**MailHog em `:1025`**, ou `:25`) funciona de cara — sem mais `SMTPException: SMTP STARTTLS extension not supported by server.` (desde a v0.38.1). Para forçar texto puro sem nem tentar o upgrade, use `SMTP_USE_TLS=false`; o `.env.example` gerado pelo `tempest new` com `[email]` já vem assim para o MailHog. ## Produção: SMTP real e credenciais Em produção o SMTP **não é opcional** — você manda email por um provedor real (Gmail/Workspace, AWS SES, SendGrid, Mailgun, ...) e isso exige **host, porta, usuário e senha**. A regra de ouro: essas credenciais **vêm sempre do ambiente** (`.env` / secret manager / variáveis do container), **nunca** ficam no código nem versionadas no Git. !!! danger "Credencial SMTP é segredo — trate como senha de banco" - `SMTP_PASSWORD` **nunca** entra no repositório. Mantenha `.env` no `.gitignore` e versione só um `.env.example` com valores fake. - Para Gmail/Workspace **não use a senha da conta** — gere uma **App Password** () com 2FA ligado. A senha normal falha com `535 Authentication failed`. - O `SMTP_FROM_ADDR` deve ser um endereço de um domínio que você controla e autenticou (SPF/DKIM/DMARC), senão o email cai em spam ou é rejeitado. Os serviços de produção já seguem esse padrão — declare os campos SMTP no seu `Settings` e leia tudo do ambiente: ```python # src/core/settings.py from pydantic import Field from tempest_fastapi_sdk import BaseAppSettings, EmailSettings class Settings(BaseAppSettings, EmailSettings): """Settings da aplicação. SMTP herdado de EmailSettings.""" FRONTEND_URL: str = Field( default="http://localhost:3000", description="Base URL do frontend (usada nos links dos emails).", ) settings: Settings = Settings() ``` ```ini # .env.example (commitado — valores fake) # .env real (NÃO commitado) tem as credenciais de verdade SMTP_HOST=smtp.gmail.com SMTP_PORT=587 SMTP_USERNAME=seu_email@example.com SMTP_PASSWORD="sua_app_password" SMTP_FROM_ADDR=seu_email@example.com SMTP_USE_TLS=true # STARTTLS na 587 SMTP_USE_SSL=false # use true (e PORT=465) para TLS implícito ``` Provedores comuns e a combinação de porta/TLS: | Provedor | `SMTP_HOST` | `SMTP_PORT` | `SMTP_USE_TLS` (STARTTLS) | `SMTP_USE_SSL` (TLS implícito) | | ------------------- | ---------------------- | ----------- | ------------------------- | ------------------------------ | | Gmail / Workspace | `smtp.gmail.com` | `587` | `true` | `false` | | Gmail (TLS direto) | `smtp.gmail.com` | `465` | `false` | `true` | | AWS SES | `email-smtp..amazonaws.com` | `587` | `true` | `false` | | SendGrid | `smtp.sendgrid.net` | `587` | `true` | `false` | | MailHog (dev local) | `localhost` | `1025` | `false` | `false` | !!! note "Como alofans-api e transport-backend fazem" São os dois padrões em produção: **alofans-api** usa **Gmail na porta 587 (STARTTLS) com App Password** — `SMTP_USE_TLS=true`. O **transport-backend** usa **porta 465 (TLS implícito)** — `SMTP_USE_SSL=true`, `SMTP_PORT=465`. Os dois leem `SMTP_*` do ambiente e nunca embutem a senha no código. Escolha STARTTLS (587) ou SSL (465) conforme o seu provedor; **não ligue os dois ao mesmo tempo**. ## Enviar um email Cada `send()` abre uma conexão SMTP nova. `to` aceita uma string ou um iterável; o `body` (texto puro) é sempre enviado e o `html` vira a alternativa multipart quando presente. ```python await mailer.send( to="ana@example.com", subject="Bem-vinda!", body="Sua conta foi criada.", html="

Sua conta foi criada.

", ) ``` Parâmetros opcionais por mensagem: `cc`, `bcc`, `attachments` (`Iterable[Path]`), `reply_to` e `from_addr` (sobrescreve o remetente padrão). Qualquer erro SMTP é re-levantado como `aiosmtplib.errors.SMTPException` para o chamador tratar. ## Templates Jinja2 Passe `template_dir=` na construção e renderize com `render_template()` — o ambiente Jinja2 é criado preguiçosamente na primeira chamada e memoizado. Autoescape liga para `.html` / `.htm` / `.xml`. ```python # src/core/mailer.py mailer = EmailUtils( host=settings.SMTP_HOST, port=settings.SMTP_PORT, from_addr=settings.SMTP_FROM_ADDR, template_dir="src/templates/emails", ) html: str = mailer.render_template( "welcome.html", {"user_name": "Ana", "app_url": "https://app.example.com"}, ) await mailer.send( to="ana@example.com", subject="Bem-vinda!", body="Bem-vinda, Ana!", html=html, ) ``` !!! info "Templates bundled do fluxo de auth" O fluxo bundled de auth (`make_auth_router`) já manda email de ativação e reset de senha usando templates embutidos (`activation.html`, `password_reset.html`). Coloque arquivos de mesmo nome no seu `template_dir` para sobrescrevê-los. Veja [Auth flow](auth-flow.md). ## Exemplo: reset de senha Serviço que manda um link de reset com um JWT de vida curta. Note que `request_reset` retorna em silêncio para um email não cadastrado — evita enumeração de contas. ```python # src/services/password_reset.py from datetime import timedelta from tempest_fastapi_sdk import EmailUtils, JWTUtils from src.db.repositories import UserRepository class PasswordResetService: def __init__( self, repo: UserRepository, tokens: JWTUtils, mailer: EmailUtils, ) -> None: self.repo: UserRepository = repo self.tokens: JWTUtils = tokens self.mailer: EmailUtils = mailer async def request_reset(self, email: str) -> None: """Manda um link de reset para ``email`` (silencioso se não existe).""" user = await self.repo.get_or_none({"email": email}) if user is None: return token: str = self.tokens.encode( {"sub": str(user.id), "purpose": "password_reset"}, ttl=timedelta(minutes=15), ) reset_url: str = f"https://app.example.com/reset-password?token={token}" await self.mailer.send( to=user.email, subject="Redefina sua senha", body=f"Abra para redefinir: {reset_url}", html=f'

Clique aqui para redefinir.

', ) ``` ## Recap - Instale `[email]` e componha `EmailSettings` no seu `Settings`. - Uma instância de `EmailUtils` por app; `send()` é async e abre conexão por chamada. - `body` texto é obrigatório; `html` é a alternativa multipart opcional. - `render_template()` (com `template_dir`) gera o HTML a partir de Jinja2. - O fluxo bundled de auth já envia ativação/reset com templates sobrescrevíveis. --- # Web Push Source: https://mauriciobenjamin700.github.io/tempest-fastapi-sdk/recipes/webpush/ # Web Push Notificações Web Push (assinadas com VAPID) para navegadores via `WebPushDispatcher`. Embrulha o `pywebpush` síncrono em `asyncio.to_thread` e expõe os dois erros que a aplicação realmente trata: `WebPushGoneError` (HTTP 404/410 — apague a inscrição) e `WebPushError` (qualquer outra falha). Requer o extra `[webpush]` (`pywebpush` + `cryptography`). ## Configuração VAPID `WebPushSettings` traz `VAPID_PUBLIC_KEY`, `VAPID_PRIVATE_KEY` e `VAPID_SUBJECT`. A chave **pública** vai para o frontend (no `pushManager.subscribe`); a **privada** assina cada push no backend. O `sub` deve ser um URI `mailto:` ou `https:`. ```python # src/services/notifications.py from tempest_fastapi_sdk import WebPushDispatcher from src.core.settings import settings # settings.webpush_kwargs() -> vapid_private_key + vapid_subject + ttl_seconds dispatcher = WebPushDispatcher(**settings.webpush_kwargs()) ``` ## Tabela + serviço (recomendado) Para guardar os aparelhos do usuário e entregar com poda automática, o SDK traz a **tabela base** `BaseWebPushSubscriptionModel` (uma linha por device, `endpoint` único) e o **serviço base** `WebPushSubscriptionService` (salva, remove e envia, podando as mortas sozinho). Igual ao padrão de auth, o SDK fornece a linha abstrata e o projeto cria a tabela concreta com a FK pro seu `UserModel`: ```python # src/db/models/web_push_subscription.py from sqlalchemy import ForeignKey from sqlalchemy.orm import Mapped, mapped_column from uuid import UUID from tempest_fastapi_sdk import BaseWebPushSubscriptionModel class WebPushSubscriptionModel(BaseWebPushSubscriptionModel): """Inscrição Web Push de um device do usuário.""" __tablename__ = "web_push_subscriptions" user_id: Mapped[UUID] = mapped_column( ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True, ) ``` Monte o serviço com um `BaseRepository` da tabela + o dispatcher VAPID: ```python # src/api/dependencies/resources.py from tempest_fastapi_sdk import BaseRepository, WebPushSubscriptionService from src.core.settings import settings from src.db.models import WebPushSubscriptionModel def get_push_service(session: AsyncSession) -> WebPushSubscriptionService: repo = BaseRepository(session, model=WebPushSubscriptionModel) dispatcher = WebPushDispatcher(**settings.webpush_kwargs()) return WebPushSubscriptionService(repo, dispatcher) ``` O serviço expõe: | Método | O que faz | | --- | --- | | `subscribe(user_id, subscription, *, user_agent=None)` | Salva a inscrição, **idempotente por `endpoint`** — re-subscribe atualiza, não duplica. | | `unsubscribe(endpoint)` | Remove a inscrição (no-op se não existe). | | `list_for_user(user_id)` | Lista os devices do usuário. | | `notify_user(user_id, payload)` | Envia pra todos os devices e **poda os mortos** (404/410) antes de retornar. Devolve quantos receberam. | ## Alinhado com o tempest-react-sdk O `WebPushClient` do [`tempest-react-sdk`](https://github.com/mauriciobenjamin700/tempest-react-sdk) chama `onSubscribe(subscription)` e `onUnsubscribe(subscription)` com o `PushSubscription.toJSON()` cru. Esse JSON é exatamente o `WebPushSubscriptionSchema` (aliasa `expiration_time` ↔ `expirationTime`), então o front bate direto nos endpoints abaixo: ```python # src/api/routers/push.py from fastapi import APIRouter, Depends, status from tempest_fastapi_sdk import WebPushSubscriptionSchema, WebPushSubscriptionService router = APIRouter(prefix="/api/push", tags=["push"]) @router.post("/subscribe", status_code=status.HTTP_201_CREATED) async def subscribe( subscription: WebPushSubscriptionSchema, user: CurrentUser, # sua dependency de auth service: WebPushSubscriptionService = Depends(get_push_service), ) -> dict[str, str]: """Recebe o onSubscribe do WebPushClient e persiste o device.""" await service.subscribe(user.id, subscription) return {"status": "subscribed"} @router.post("/unsubscribe", status_code=status.HTTP_200_OK) async def unsubscribe( subscription: WebPushSubscriptionSchema, service: WebPushSubscriptionService = Depends(get_push_service), ) -> dict[str, str]: """Recebe o onUnsubscribe e remove o device.""" await service.unsubscribe(subscription.endpoint) return {"status": "unsubscribed"} ``` ### Router pronto (opt-in) Não quer escrever os dois endpoints? `make_web_push_router` monta `/subscribe` + `/unsubscribe` já ligados ao serviço — estilo `make_auth_router`. Você só injeta como o serviço e o usuário atual são resolvidos: ```python # src/api/app.py from tempest_fastapi_sdk import BaseRepository, WebPushSubscriptionService, make_web_push_router from src.api.dependencies import get_current_user_id, get_session from src.core.settings import settings from src.db.models import WebPushSubscriptionModel def _service(session: AsyncSession) -> WebPushSubscriptionService: repo = BaseRepository(session, model=WebPushSubscriptionModel) return WebPushSubscriptionService(repo, WebPushDispatcher(**settings.webpush_kwargs())) app.include_router( make_web_push_router( service_factory=_service, session_factory=get_session, current_user_id=get_current_user_id, # dependency -> UUID ) ) # POST /api/push/subscribe (201) e POST /api/push/unsubscribe (200) ``` O `User-Agent` da requisição vira o rótulo do device (`store_user_agent=True`, default). Ambos os endpoints exigem autenticação via `current_user_id`. Enviar pra um usuário (todos os devices, poda automática embutida): ```python delivered: int = await service.notify_user( user.id, {"title": "Pagamento confirmado", "body": "Pedido aprovado."}, ) ``` ## Enviar uma notificação (dispatcher direto) O `payload` aceita `WebPushPayloadSchema`, `dict`, `str` ou `bytes` (models e dicts viram JSON). Trate `WebPushGoneError` para podar a inscrição morta do seu store. ```python from tempest_fastapi_sdk import ( WebPushGoneError, WebPushPayloadSchema, WebPushSubscriptionSchema, ) async def notify_order_paid( subscription: WebPushSubscriptionSchema, order_id: str, ) -> None: payload = WebPushPayloadSchema( title="Pagamento confirmado", body=f"Pedido {order_id} aprovado.", icon="/static/icons/order.png", data={"orderId": order_id, "url": f"/orders/{order_id}"}, ) try: await dispatcher.send(subscription, payload) except WebPushGoneError: await subscriptions_repo.delete_by_endpoint(subscription.endpoint) ``` ## Broadcast com poda automática `send_many()` dispara o mesmo payload concorrentemente (`asyncio.gather`) e **retorna os endpoints mortos** (404/410) para você remover — outras falhas são logadas, não levantadas. ```python async def broadcast( subs: list[WebPushSubscriptionSchema], payload: WebPushPayloadSchema, ) -> None: gone: list[str] = await dispatcher.send_many(subs, payload) if gone: await subscriptions_repo.delete_by_endpoints(gone) ``` !!! warning "Sempre pode as inscrições mortas" Inscrições expiram quando o usuário troca de device ou revoga a permissão. Ignorar `WebPushGoneError` / o retorno do `send_many` acumula endpoints zumbis e desperdiça dispatch. Apague-os assim que o push service responder 404/410. ## Recap - Instale `[webpush]` e configure `WebPushSettings` (chaves VAPID). - Chave pública → frontend; privada → assina os pushes no backend. - Tabela `BaseWebPushSubscriptionModel` (1 linha por device, `endpoint` único) + `WebPushSubscriptionService` (`subscribe`/`unsubscribe`/`notify_user`) — o caminho recomendado, com poda automática. - O JSON do `WebPushClient` (tempest-react-sdk) é o próprio `WebPushSubscriptionSchema` — `subscribe`/`unsubscribe` batem direto. - `make_web_push_router` monta `/subscribe` + `/unsubscribe` prontos (estilo `make_auth_router`) se você não quiser escrever as rotas. - Caminho baixo nível: `send()` para um destino, `send_many()` para broadcast (retorna mortos); trate `WebPushGoneError` (404/410) podando o store. --- # Logging Source: https://mauriciobenjamin700.github.io/tempest-fastapi-sdk/recipes/logging/ # Logging `configure_logging` instala um handler JSON no logger raiz que emite registros JSON de uma linha carregando o request ID ativo. `LogUtils` é uma fachada fina que adiciona métodos por nível aceitando `**fields` estruturados. ```python from tempest_fastapi_sdk import LogUtils, configure_logging from tempest_fastapi_sdk.core import get_request_id # Imperativo — chame uma vez durante o bootstrap. configure_logging(level="INFO", json_output=True) # Fachada — útil para singletons de serviço. log = LogUtils("app.users", level="INFO") log.info("user_created", user_id=str(user.id), email=user.email) log.warning("login_throttled", ip="1.2.3.4", attempts=5) try: risky() except RuntimeError: log.exception("risky_failed", op="reconcile") # appends traceback # Exponha o ID de correlação fora da linha de log, se necessário. request_id = get_request_id() ``` Saída JSON (uma linha — formatada aqui para legibilidade): ```json { "timestamp": "2026-05-16T20:14:33.412Z", "level": "INFO", "logger": "app.users", "message": "user_created", "request_id": "d83e4b0c-7c2f-4bd6-aaa1-7d4f6cf5e5e9", "user_id": "9c1a5b2d-...", "email": "ana@example.com" } ``` O middleware aceita um nome de header customizado (`RequestIDMiddleware(app, header_name="X-Correlation-ID")`); o mesmo header é ecoado de volta em toda resposta. ## Arquivos por nível + `500.log` isolado **Por padrão, o SDK escreve simultaneamente no stdout E em `logs/`** (um arquivo JSON por nível). Cada arquivo recebe **apenas o seu próprio nível** (correspondência exata — um `ERROR` nunca cai no `warning.log`), então toda severidade vira um fluxo isolado e fácil de inspecionar com `grep`. ```python from tempest_fastapi_sdk import configure_logging # Defaults — stdout + logs/{debug,info,warning,error,critical,500}.log configure_logging(level="INFO") # Customizar diretório configure_logging(level="INFO", log_dir="/var/log/myapp") # Desligar arquivos (stdout puro — útil em serverless ou FS read-only) configure_logging(level="INFO", file_output=False) # Desligar stdout (sidecar coleta de disco) configure_logging(level="INFO", stdout=False) ``` !!! warning "Não desligue os dois" `configure_logging(stdout=False, file_output=False)` lança `ValueError` — silenciar todos os handlers deixa a aplicação cega. !!! check "Log em arquivo é best-effort — nunca derruba o boot" Se o `log_dir` não puder ser criado ou seus arquivos não puderem ser abertos (FS read-only, falta de permissão de escrita, container endurecido, serverless, CI), o SDK **pula** os handlers de arquivo, emite um aviso (no logger quando o stdout está ligado, senão direto no `stderr`) e segue rodando só com stdout — em vez de morrer no import com `PermissionError: [Errno 13] ... 'logs'`. Para abrir mão do log em arquivo de forma explícita, passe `file_output=False`. O resultado em disco: ```text logs/ ├── debug.log # só registros DEBUG ├── info.log # só registros INFO ├── warning.log # só registros WARNING ├── error.log # só registros ERROR (um 500 também cai aqui) ├── critical.log # só registros CRITICAL └── 500.log # só erros 500 não tratados (isolado) ``` !!! danger "Erros 500 são graves — por isso ganham arquivo próprio" O handler catch-all registrado por `register_exception_handlers` marca toda exceção não tratada com o `extra` `http_500=True`. O `configure_logging(log_dir=...)` roteia esses registros para um `500.log` dedicado, **além** do `error.log`. Assim a falha mais grave nunca fica soterrada no meio dos outros erros. !!! tip "Sempre nos logs, nunca no body" O traceback vai para os arquivos/terminal via logging — **não** para o corpo da resposta. O body de um 500 é só o envelope genérico (`{"detail": "Internal server error", "code": "INTERNAL_SERVER_ERROR"}`). Veja [Camada HTTP](http.md) para os flags `log_traceback` / `include_traceback`. !!! note "Arquivos são sempre JSON" Os handlers de arquivo usam o `JSONFormatter` independente de `json_output`, para que o endpoint `/logs` consiga parseá-los. O `json_output` controla apenas o formato do stdout. No scaffold, o diretório vem de `LOG_DIR` (padrão `"logs"`; deixe vazio para desativar o log em arquivo). Adicione `logs/` ao `.gitignore`. ## Lendo logs por HTTP — `make_logs_router` `make_logs_router` monta `GET /logs`, que lê os arquivos JSON em disco e devolve um `BasePaginationSchema[LogEntrySchema]` paginado (mais recentes primeiro). ```python from tempest_fastapi_sdk import make_logs_router app.include_router( make_logs_router(log_dir="logs", token_secret=settings.TOKEN_SECRET), ) ``` !!! warning "Proteja o endpoint em produção" O payload expõe tracebacks e metadados de request. O endpoint é protegido por um header de segredo compartilhado `X-Token` via `make_token_dependency`. Um `TOKEN_SECRET` vazio **desativa** a checagem (apenas dev) — nunca exponha `/logs` sem auth em produção. Exemplos de consulta: ```bash # Últimos 20 registros de todos os níveis curl -H "X-Token: $TOKEN_SECRET" "http://localhost:8000/logs" # Só os 500 isolados, página 1, 50 por página curl -H "X-Token: $TOKEN_SECRET" "http://localhost:8000/logs?source=500&page_size=50" # Erros mencionando "timeout" numa janela de tempo curl -H "X-Token: $TOKEN_SECRET" \ "http://localhost:8000/logs?source=error&q=timeout&start=2026-05-31T00:00:00Z" ``` Parâmetros de query: | Parâmetro | Valores | Descrição | | --- | --- | --- | | `source` | `all` (padrão), `debug`, `info`, `warning`, `error`, `critical`, `500` | Qual arquivo ler. `all` mescla todos os níveis; `500` retorna só os 500 isolados. | | `q` | texto | Substring (case-insensitive) na mensagem. | | `start` / `end` | ISO-8601 | Limita os registros a uma janela de tempo. | | `page` / `page_size` | inteiros | Paginação (1-indexada). | !!! check "Recap" - `configure_logging(log_dir=...)` → stdout **+** um arquivo por nível. - Exatidão por nível: cada arquivo só recebe a sua severidade. - `500.log` isola erros 500 não tratados (marcador `http_500`). - `make_logs_router` serve esses arquivos paginados e autenticados. ## Enums base `BaseStrEnum` / `BaseIntEnum` estendem o `Enum` da stdlib com helpers ajustados para o round-trip Pydantic + SQLAlchemy (lookup por valor, herança serializável `str` / `int` em JSON, `__contains__` que aceita valores crus). Use-os em todo enum que cruza a fronteira da API. ```python from tempest_fastapi_sdk import BaseIntEnum, BaseStrEnum class OrderStatus(BaseStrEnum): PENDING = "pending" PAID = "paid" SHIPPED = "shipped" CANCELLED = "cancelled" class Priority(BaseIntEnum): LOW = 0 NORMAL = 1 HIGH = 2 assert OrderStatus.PENDING == "pending" # str inheritance assert "paid" in OrderStatus # raw value membership assert OrderStatus("paid") is OrderStatus.PAID # canonical lookup assert Priority.NORMAL + 1 == Priority.HIGH # int math ``` Por herdarem de `str` / `int`, o Pydantic os serializa de forma transparente como o valor subjacente e o SQLAlchemy consegue persisti-los pela coluna `Enum` padrão sem um conversor extra. --- # Observabilidade (tracing + slow queries) Source: https://mauriciobenjamin700.github.io/tempest-fastapi-sdk/recipes/observability/ # Observabilidade (tracing + slow queries) Logs te dizem **o que** aconteceu num serviço; tracing distribuído te diz **onde** o tempo foi gasto numa request que cruza vários serviços, e o `SlowQueryLogger` te aponta **qual** query está arrastando o p99. Esta receita cobre os dois. !!! info "Onde isso encaixa" O [`RequestIDMiddleware`](http.md) correlaciona **logs** por request; o OpenTelemetry correlaciona **spans** entre serviços. Eles se complementam — use os dois juntos. ## Tracing distribuído com OpenTelemetry `setup_tracing` instala um provider OpenTelemetry e auto-instrumenta as camadas mais comuns de um serviço Tempest: FastAPI (requests de entrada), SQLAlchemy (queries) e httpx (chamadas de saída). Requer o extra `[otel]`: ```bash pip install "tempest-fastapi-sdk[otel]" ``` Chame uma vez no startup, depois que a app existe e (quando quiser tracear queries) depois que o banco conectou: ```python from fastapi import FastAPI from tempest_fastapi_sdk import AsyncDatabaseManager, setup_tracing app: FastAPI = FastAPI() db: AsyncDatabaseManager = AsyncDatabaseManager("postgresql+asyncpg://...") @app.on_event("startup") async def _startup() -> None: """Conecta o banco e liga o tracing.""" await db.connect() setup_tracing( app, service_name="orders-api", otlp_endpoint="http://otel-collector:4317", sqlalchemy_engine=db.engine, ) ``` Pronto: cada request vira um span pai, cada query e cada chamada httpx vira um span filho, e o trace inteiro aparece no Jaeger / Tempo / Honeycomb sob o nome `orders-api`. ### Sem coletor (debug local) Passe `otlp_endpoint=None` pra instalar um exportador de console — os spans saem no stdout, sem precisar subir um coletor: ```python from fastapi import FastAPI from tempest_fastapi_sdk import setup_tracing app: FastAPI = FastAPI() setup_tracing(app, service_name="orders-api", otlp_endpoint=None) ``` ### Amostragem (sampling) Em produção com tráfego alto, traçar 100% das requests é caro. Passe `sample_ratio` pra amostrar uma fração (decisão head-based, propagada pra spans filhos): ```python from fastapi import FastAPI from tempest_fastapi_sdk import setup_tracing app: FastAPI = FastAPI() setup_tracing( app, service_name="orders-api", otlp_endpoint="http://otel-collector:4317", sample_ratio=0.1, # ~10% das requests resource_attributes={"deployment.environment": "prod"}, ) ``` !!! tip "Argumentos, não env vars" O endpoint, o sampling e os atributos vêm dos **argumentos** da função — o call site é a única fonte de verdade. Nada de configurar metade no código e metade em `OTEL_*` env vars. !!! note "Instrumentação best-effort" SQLAlchemy e httpx só são instrumentados se os pacotes `opentelemetry-instrumentation-sqlalchemy` / `...-httpx` estiverem instalados (o extra `[otel]` já traz os dois). Se faltarem, a instrumentação é pulada em silêncio em vez de quebrar o boot. ## Slow query logger `SlowQueryLogger` registra um listener nos eventos do engine SQLAlchemy e emite uma linha de log toda vez que uma statement passa de um limite configurável. É a forma mais barata de achar o N+1 ou o índice faltando. **Não precisa de extra** — usa só SQLAlchemy. ```python import logging from tempest_fastapi_sdk import AsyncDatabaseManager, SlowQueryLogger db: AsyncDatabaseManager = AsyncDatabaseManager("postgresql+asyncpg://...") async def wire_slow_query_log() -> None: """Liga o log de queries lentas no startup.""" await db.connect() slow: SlowQueryLogger = SlowQueryLogger( db.engine, threshold_ms=200.0, # loga queries >= 200ms level=logging.WARNING, ) slow.attach() ``` Cada query lenta vira uma linha tipo: ```text WARNING ... slow query: 312.4ms >= 200.0ms threshold | SELECT users.id, ... ``` ### Parâmetros e EXPLAIN (só em dev) Por padrão os bind parameters **não** entram no log (costumam carregar PII/segredos). Em desenvolvimento, ligue `log_parameters=True` e/ou `explain=True` pra ver o plano de execução: ```python import logging from tempest_fastapi_sdk import SlowQueryLogger slow: SlowQueryLogger = SlowQueryLogger( db.engine, threshold_ms=50.0, log_parameters=True, # inclui os binds — dev only explain=True, # roda EXPLAIN e anexa o plano — custa 1 round-trip ) slow.attach() ``` !!! warning "EXPLAIN custa um round-trip" Com `explain=True` cada query lenta dispara um `EXPLAIN` extra. Deixe desligado em produção, ligue só quando estiver caçando um plano ruim. Pra desligar (ex.: num shutdown ou teste), chame `slow.detach()`. ## Recap - `setup_tracing(app, service_name=..., otlp_endpoint=...)` liga tracing distribuído com auto-instrumentação de FastAPI/SQLAlchemy/httpx — extra `[otel]`. - `otlp_endpoint=None` exporta spans pro console (debug local); `sample_ratio` controla a amostragem. - `SlowQueryLogger(engine, threshold_ms=...).attach()` loga queries lentas sem extra nenhum; parâmetros e `EXPLAIN` ficam atrás de flags opt-in. --- # Object storage — MinIO / S3 Source: https://mauriciobenjamin700.github.io/tempest-fastapi-sdk/recipes/storage/ # Object storage — MinIO / S3 `AsyncMinIOClient` é uma fachada async sobre o pacote oficial `minio`. Cobre o que serviço FastAPI típico precisa: bucket (ensure/exists/list/remove), object I/O (put/get/stream/stat/list/remove/copy) e presigned URLs (GET/PUT). Operações avançadas (versioning, lifecycle XML, SSE-KMS, multipart fine-tuning) ficam acessíveis via atributo `.client`. !!! tip "Por que esse wrapper existe" `minio-py` é **síncrono**. Chamar `client.put_object(...)` direto dentro de uma rota FastAPI bloqueia o event loop durante o upload inteiro. O wrapper envolve cada chamada em `asyncio.to_thread`, então o loop continua respondendo enquanto a operação roda no executor. ## Instalação ```bash pip install "tempest-fastapi-sdk[minio]" # ou: uv add "tempest-fastapi-sdk[minio]" ``` O pacote `minio` é lazy-loaded — só carrega quando `AsyncMinIOClient` é instanciado. Projetos sem storage não precisam do extra. ## Configuração via settings mixin ```python from tempest_fastapi_sdk import ( BaseAppSettings, MinIOSettings, ServerSettings, ) class Settings( ServerSettings, MinIOSettings, BaseAppSettings, ): """Service settings — herda MinIO defaults.""" ``` `.env`: ```bash MINIO_ENDPOINT=minio.internal:9000 MINIO_ACCESS_KEY=... MINIO_SECRET_KEY=... MINIO_SECURE=true MINIO_REGION=us-east-1 MINIO_DEFAULT_BUCKET=uploads ``` ## Wiring no `create_app()` ```python from contextlib import asynccontextmanager from collections.abc import AsyncIterator from fastapi import FastAPI from tempest_fastapi_sdk import AsyncMinIOClient from src.core.settings import settings # settings.minio_kwargs() mapeia MINIO_* -> endpoint/access_key/secret_key/ # default_bucket/secure/region, então não precisa repetir campo a campo. storage = AsyncMinIOClient(**settings.minio_kwargs()) @asynccontextmanager async def lifespan(_: FastAPI) -> AsyncIterator[None]: """Garante que o bucket padrão existe antes de servir tráfego.""" await storage.ensure_bucket() yield def create_app() -> FastAPI: """Build the configured FastAPI instance.""" return FastAPI(lifespan=lifespan) ``` ## Receitas ### Upload de `UploadFile` (FastAPI) ```python from fastapi import APIRouter, UploadFile from src.api.app import storage router = APIRouter() @router.post("/files") async def upload_file(file: UploadFile) -> dict[str, str]: """Persiste o arquivo recebido no bucket padrão.""" body = await file.read() etag = await storage.put_object( file.filename or "unnamed", body, content_type=file.content_type or "application/octet-stream", metadata={"original-name": file.filename or ""}, ) return {"key": file.filename or "unnamed", "etag": etag} ``` ### Streaming de download !!! tip "Atalho: `download_response` (ou `DownloadUtils`)" O `AsyncMinIOClient.download_response(key, ...)` já faz stat + stream + Content-Disposition/Type/Length numa chamada só — e o [`DownloadUtils(minio)`](downloads.md) embrulha isso. O exemplo manual abaixo é só pra mostrar as peças. Use para arquivos grandes — chunk-a-chunk evita carregar tudo em memória: ```python from fastapi import APIRouter from starlette.responses import Response from src.api.dependencies.resources import storage router = APIRouter() @router.get("/files/{key}") async def download_file(key: str) -> Response: """Stream do objeto no bucket padrão (uma chamada).""" return await storage.download_response(key) ``` ### Presigned URL — upload direto do browser Padrão recomendado pra arquivos grandes: o cliente faz `PUT` direto no MinIO/S3, os bytes não passam pelo FastAPI. ```python from datetime import timedelta from uuid import uuid4 from fastapi import APIRouter from pydantic import BaseModel from src.api.app import storage router = APIRouter() class PresignedUploadResponse(BaseModel): key: str url: str @router.post("/uploads/presign") async def presign_upload() -> PresignedUploadResponse: """Devolve URL temporária pro cliente fazer PUT direto.""" key = f"uploads/{uuid4().hex}" url = await storage.presigned_put_url(key, expires=timedelta(minutes=15)) return PresignedUploadResponse(key=key, url=url) ``` Cliente JS: ```javascript const { key, url } = await fetch("/uploads/presign", { method: "POST" }).then(r => r.json()); await fetch(url, { method: "PUT", body: file }); ``` ### Presigned URL — download temporário Para servir arquivos privados sem rotear bytes pela API: ```python from datetime import timedelta from fastapi import APIRouter from src.api.app import storage router = APIRouter() @router.get("/files/{key}/url") async def get_download_url(key: str) -> dict[str, str]: """URL de download válida por 1 hora.""" url = await storage.presigned_get_url(key, expires=timedelta(hours=1)) return {"url": url} ``` ### Endpoint público separado para presigned URLs *(v0.88.0+)* Cenário comum em produção: o backend fala com o MinIO por uma **rede privada rápida** (`servus-storage:9000`, sem TLS), mas o **browser** não alcança esse host — precisa de um host **público com HTTPS**. Se você assinar a presigned URL com o endpoint interno, o link vem com `servus-storage:9000` e o navegador não abre. Solução: `MINIO_PUBLIC_ENDPOINT`. As presigned URLs (`presigned_get_url` / `presigned_put_url`) passam a ser **assinadas contra o host público**, enquanto **todas as operações servidor→MinIO continuam no endpoint interno**. ```bash # .env MINIO_ENDPOINT=servus-storage:9000 # rede interna Docker (ops) MINIO_SECURE=false MINIO_PUBLIC_ENDPOINT=https://storage.example.com # browser (presigned) # MINIO_PUBLIC_SECURE=true # opcional; https:// já implica true ``` !!! info "Por que dois clients e não um replace de host" A presigned URL é assinada (SigV4) incluindo o header `Host`. Trocar o host **depois** de assinar invalida a assinatura. Por isso o SDK mantém um segundo `minio.Minio` (mesmas credenciais) só para **assinar** contra o host público — o `AsyncMinIOClient.client` interno segue fazendo put/get/stat/ensure_bucket pela rede privada. !!! tip "Sem `MINIO_PUBLIC_ENDPOINT`" Comportamento inalterado: presigned URLs são assinadas com `MINIO_ENDPOINT` (modo endpoint único). O split é 100% opt-in. O proxy do host público precisa rotear para a **API S3 do MinIO (porta 9000)** com TLS e repassar o `Host` correto (a assinatura valida o host). ### Listar objetos por prefixo ```python from fastapi import APIRouter from src.api.app import storage router = APIRouter() @router.get("/files") async def list_files(prefix: str = "") -> list[str]: """Lista chaves no bucket padrão sob ``prefix``.""" return await storage.list_objects(prefix) ``` `list_objects` devolve `[]` quando nada bate — em linha com a convenção do SDK ("nenhum match não é erro"). ### Copiar / mover ```python await storage.copy_object("uploads/draft-1", "uploads/final-1") await storage.remove_object("uploads/draft-1") ``` ## Quando NÃO usar `AsyncMinIOClient` - Quando você precisa de operações **fora** das listadas (SSE-KMS, ACLs S3 v2, bucket replication). Use `storage.client.` direto — `minio-py` continua acessível. - Para uploads gigantes (> 5 GiB) com retomada — `minio-py` faz multipart automático mas não suporta `tus` ou resume. Considere `tus.io` separadamente. ## Próximos passos - O backend pluggable de upload `MinIOUploadStorage` está disponível desde a v0.24.0 — para o pipeline que alterna disco local ↔ MinIO/S3 via flag de settings, veja a [receita de uploads](uploads.md). --- # Uploads — disco local + S3 / MinIO Source: https://mauriciobenjamin700.github.io/tempest-fastapi-sdk/recipes/uploads/ # Uploads — disco local + S3 / MinIO `UploadUtils` escolhe o backend **uma vez no construtor**: passe uma **pasta** para gravar em disco local, ou um **`AsyncMinIOClient`** para gravar num bucket S3/MinIO. O resto do código de upload é idêntico nos dois casos. Requer o extra `[upload]` (e `[minio]` quando usar MinIO). !!! warning "Mudança em v0.41.0 (breaking)" O backend agora vem no construtor — o antigo `save(file, storage=...)` por chamada **foi removido**. `save()` devolve a **key** de storage (relativa), e `delete()` virou **async**. Veja a migração no fim. !!! tip "Validação fica no `UploadUtils`" Tamanho, extensão, MIME, magic bytes e `content_validator` são validados no `UploadUtils` antes de qualquer byte ir pro backend — o storage só recebe dados já validados. ## Disco local ```python from fastapi import APIRouter, UploadFile from tempest_fastapi_sdk import UploadUtils router = APIRouter() uploads = UploadUtils("var/uploads", max_size_bytes=10 * 1024 * 1024) @router.post("/files") async def upload(file: UploadFile) -> dict[str, str]: """Valida e grava em disco; devolve a key (relativa ao base dir).""" key = await uploads.save(file) return {"key": str(key)} ``` ## MinIO / S3 Passe o `AsyncMinIOClient` direto — nada mais muda: ```python from tempest_fastapi_sdk import AsyncMinIOClient, UploadUtils from src.core.settings import settings minio = AsyncMinIOClient(**settings.minio_kwargs()) uploads = UploadUtils(minio, max_size_bytes=10 * 1024 * 1024) # idêntico ao caso local: key = await uploads.save(file, filename="logo.png") # grava no bucket ``` !!! tip "Centralize em `resources.py`" Construa o `uploads` (e o `minio`) uma vez em [`src/api/dependencies/resources.py`](../architecture.md) e injete via `Depends(get_uploads)`, em vez de instanciar por request. ## Restringir extensões (allowlist) Passe `allowed_extensions` no construtor com o conjunto de extensões que você aceita. Tudo fora da lista é rejeitado com **HTTP 415** (`InvalidFileTypeException`) **antes de qualquer byte ser lido** — então um `.zip` malicioso nunca chega ao backend nem ocupa memória: ```python from tempest_fastapi_sdk import UploadUtils # Só modelos ONNX — qualquer outra extensão é bloqueada. uploads = UploadUtils( "var/models", allowed_extensions={".onnx", ".ort"}, max_size_bytes=200 * 1024 * 1024, ) ``` ```python @router.post("/models") async def upload_model(file: UploadFile) -> dict[str, str]: """Aceita só .onnx / .ort; um .zip levanta 415 aqui dentro do save().""" key = await uploads.save(file) # file.zip -> InvalidFileTypeException (415) return {"key": str(key)} ``` !!! info "Ponto e case são normalizados" `{".onnx", ".ort"}`, `{"onnx", "ort"}` e `{".ONNX"}` são equivalentes — o `UploadUtils` tira o ponto inicial e baixa pra minúsculo. A extensão vem de `Path(file.filename).suffix`, então `modelo.ONNX` passa e `pacote.zip` não. !!! warning "Extensão não é o conteúdo" Conferir extensão impede o engano honesto e o `.zip` óbvio, mas o nome do arquivo é controlado pelo cliente. Pra formatos com assinatura conhecida (imagens, PDF) ligue `verify_magic_bytes=True` + `allowed_mimetypes={...}` pra casar os **bytes reais** contra a allowlist. Formatos binários sem assinatura no `sniff_mime` (como `.onnx` / `.ort`) **devem** manter `verify_magic_bytes=False` (o default) — senão o sniff não reconhece a assinatura e rejeita tudo. Pra esses, valide o conteúdo com um `content_validator=...` no `save()`. ### Via settings (`.env`) Quando preferir configurar por ambiente, o `UploadSettings` já expõe `UPLOAD_ALLOWED_EXTENSIONS` (e `UPLOAD_ALLOWED_MIMETYPES`): ```bash # .env UPLOAD_ALLOWED_EXTENSIONS=[".onnx", ".ort"] UPLOAD_MAX_SIZE_BYTES=209715200 ``` ```python uploads = UploadUtils( settings.UPLOAD_DIR, allowed_extensions=settings.UPLOAD_ALLOWED_EXTENSIONS, max_size_bytes=settings.UPLOAD_MAX_SIZE_BYTES, ) ``` ## Alternar por settings Escolha o argumento do construtor conforme uma flag do seu `Settings` — não precisa de backend pluggável manual: ```python # src/api/dependencies/resources.py from tempest_fastapi_sdk import AsyncMinIOClient, UploadUtils from src.core.settings import settings if settings.UPLOAD_BACKEND == "minio": uploads = UploadUtils(AsyncMinIOClient(**settings.minio_kwargs())) else: uploads = UploadUtils(settings.UPLOAD_DIR) ``` (`UPLOAD_BACKEND` é um campo do seu `Settings`; o SDK só carrega `UPLOAD_DIR` / `UPLOAD_MAX_SIZE_BYTES` / `UPLOAD_ALLOWED_EXTENSIONS` / `UPLOAD_ALLOWED_MIMETYPES` via `UploadSettings`.) ## Operações comuns ```python key = await uploads.save(file, filename="logo.png") # -> Path("logo.png") removed = await uploads.delete(key) # async; True/False ``` ### Trocar um arquivo (avatar, anexo) — `replace` O caso clássico: o usuário manda uma foto de perfil nova e você quer **gravar a nova e apagar a antiga**. Em vez de fazer `save` + `delete` na mão (e arriscar apagar pelo backend errado), use `replace`: ```python # old_key é o que está salvo hoje no model (pode ser None no 1º upload) new_key = await uploads.replace( user.profile_picture, file, filename=f"{user.id}.jpg" ) user.profile_picture = str(new_key) ``` !!! tip "A ordem importa — e o `replace` acerta pra você" O `replace` **grava a nova primeiro** e só então apaga a antiga. Se a validação reprovar o arquivo novo (extensão/MIME/tamanho), a antiga fica **intacta** — você nunca fica sem imagem nenhuma. Passe `old_key=None` no primeiro upload (não há nada pra apagar) e o método só salva. Tudo passa pelo **mesmo backend** configurado (local ou MinIO), evitando o erro de salvar num e apagar no outro. Para **baixar** o que foi enviado (local ou MinIO), use o [`DownloadUtils`](downloads.md) — ele aceita o mesmo backend no construtor. ## Quando usar presigned PUT direto Pra arquivos > 50 MB, evite buffer em memória — mande o cliente fazer `PUT` direto no MinIO via URL presigned. Veja [Storage MinIO/S3](storage.md#presigned-url-upload-direto-do-browser). ## Migração de < v0.41.0 - `UploadUtils("./dir")` continua igual (disco local). - `UploadUtils("./tmp")` + `save(file, storage=MinIOUploadStorage(client))` → vira `UploadUtils(client)` + `save(file)`. - `save()` agora devolve a **key** (relativa), não um caminho absoluto — guarde a key e use `DownloadUtils.download(key)` pra servir. - `utils.delete(path)` (sync) → `await utils.delete(key)` (async). --- # Arquivo no serviço — `StoredFileServiceMixin` Source: https://mauriciobenjamin700.github.io/tempest-fastapi-sdk/recipes/stored-files/ # Arquivo no serviço — `StoredFileServiceMixin` Uma entidade quase sempre carrega **uma** chave de storage: o avatar do usuário, o banner de um evento, a capa de um produto, um anexo. E todo serviço que cuida disso reescreve a mesma dança na mão: 1. resolver a entidade, 2. subir o arquivo novo e apagar o antigo, 3. gravar a chave nova no model, 4. dar `commit`, 5. devolver uma URL temporária de download. O `StoredFileServiceMixin` faz esse fluxo **uma vez**, parametrizado pelo **nome do campo** — então um mesmo serviço cuida de vários campos de arquivo sem repetição. Ele monta por cima do [`UploadUtils`](uploads.md) (upload + validação) e do [`AsyncMinIOClient`](storage.md) (presigned URL); requer os extras `[upload]` e `[minio]`. !!! info "Escopo: o caso comum" Cobre **uma chave por campo → URL presigned**. Thumbnails, variantes (S/M/L), bucket público/CDN e galerias (um-para-muitos) ficam de fora — pra esses, componha o `UploadUtils` direto. ## Misturando no seu serviço O mixin não constrói nada: ele lê dois colaboradores de `self` — `upload_utils` e `storage`. Quem mistura continua dono da configuração (tamanho, tipos aceitos, bucket): ```python from tempest_fastapi_sdk import ( AsyncMinIOClient, BaseService, StoredFileServiceMixin, UploadUtils, ) from src.db.models import UserModel from src.db.repositories import UserRepository from src.schemas import UserResponseSchema class UserService( BaseService[UserRepository, UserResponseSchema], StoredFileServiceMixin[UserModel], ): def __init__( self, repository: UserRepository, storage: AsyncMinIOClient, upload_utils: UploadUtils, ) -> None: super().__init__(repository) self.storage = storage self.upload_utils = upload_utils ``` A ordem das bases importa: `BaseService` traz o `repository`; o mixin só adiciona os métodos de arquivo por cima. ## Trocar o arquivo — `set_file` ```python async def update_profile_picture( self, user: UUID | UserModel, image: UploadFile ) -> UserResponseSchema: """Sobe a foto nova, apaga a antiga e devolve o perfil com a URL.""" updated = await self.set_file( user, image, field="profile_picture", subdir="profiles" ) response = await self._map_to_response(updated) response.profile_picture_url = await self.file_url(updated.profile_picture) return response ``` Foi isso. Comparado às ~13 linhas manuais, o `set_file` resolve a entidade, chama `replace` (grava a nova **antes** de apagar a antiga), grava a chave e dá `commit` — tudo num passo. !!! tip "Seguro com o usuário autenticado" O `set_file` re-resolve a entidade na sessão do request via `repository.resolve()`. Se você passar o `UserModel` que veio de `get_current_user` (que em apps mal-fiados vinha *detached*), o `resolve` o reanexa antes da escrita — sem o `InvalidRequestError: Instance is not persistent within this Session`. ## Servir a URL — `file_url` ```python url = await self.file_url(user.profile_picture) # 1h de validade url = await self.file_url(user.profile_picture, expires=timedelta(minutes=5)) ``` Devolve `None` quando a chave é vazia, então você pode jogar o resultado direto num campo do schema de resposta sem `if`: ```python response.profile_picture_url = await self.file_url(updated.profile_picture) ``` ## Remover o arquivo — `clear_file` ```python updated = await self.clear_file(user, field="profile_picture") ``` Apaga o objeto do storage e zera o campo. Quando o campo já está vazio, é um **no-op**: a entidade volta sem `commit` e sem chamar o storage. ## Vários campos? Mesmo mixin `field=` é só um argumento — o mesmo serviço cuida de quantos campos quiser: ```python await self.set_file(event, cover, field="cover_image", subdir="events/covers") await self.set_file(event, banner, field="banner_image", subdir="events/banners") ``` ## Recapitulando - Misture `StoredFileServiceMixin[Model]` no serviço e exponha `upload_utils` + `storage`. - `set_file(ref, file, *, field, subdir=...)` → sobe, troca a antiga, persiste. Detach-safe. - `file_url(key, *, expires=...)` → URL presigned ou `None`. - `clear_file(ref, *, field)` → apaga + zera (no-op se já vazio). - Caso comum (uma chave + presigned). Pra resize/variantes/galeria, use o [`UploadUtils`](uploads.md) direto. --- # Downloads Source: https://mauriciobenjamin700.github.io/tempest-fastapi-sdk/recipes/downloads/ # Downloads `DownloadUtils` serve arquivos para download/inline — de **disco local** ou direto de um **bucket MinIO/S3**. Escolha o backend **uma vez no construtor** (igual ao [Uploads](uploads.md)): passe uma pasta, ou um `AsyncMinIOClient`. Depois chame `download(key)` — funciona igual nos dois. Faz parte do SDK base (sem extra; MinIO precisa do `[minio]`). No modo local há **proteção contra path traversal**: qualquer caminho que escape do `base_dir` (`../`, absoluto, symlink) levanta `NotFoundException` — o mesmo 404 de arquivo inexistente, então o cliente nunca distingue "não existe" de "proibido". ## Disco local ```python # src/api/routers/files.py from fastapi import APIRouter from starlette.responses import Response from tempest_fastapi_sdk import DownloadUtils router = APIRouter(prefix="/files", tags=["files"]) downloads = DownloadUtils("var/uploads") @router.get("/{name}") async def download(name: str) -> Response: """Baixa um arquivo de var/uploads (forçando download).""" return await downloads.download(name) ``` ## MinIO / S3 Mesmo código, só muda o construtor — `download(key)` faz proxy do objeto do bucket (sem cair em disco, sem carregar inteiro na memória): ```python from tempest_fastapi_sdk import AsyncMinIOClient, DownloadUtils from src.core.settings import settings minio = AsyncMinIOClient(**settings.minio_kwargs()) downloads = DownloadUtils(minio) @router.get("/files/{name}") async def download(name: str) -> Response: """Baixa o objeto do bucket via streaming, atrás do auth da app.""" return await downloads.download(name, subdir="invoices") ``` Parâmetros do `download`: `subdir=` (pasta local / prefixo da key), `filename=` (nome mostrado ao cliente), `media_type=` (senão vem do content-type do objeto / extensão), `as_attachment=False` (servir **inline** — ex.: abrir um PDF no navegador), `headers=`. !!! tip "Proxy (app) vs presigned (direto)" `download()` faz o **proxy** pela app — ideal quando o download precisa passar pelo auth ou o MinIO não é público. Quando o cliente pode falar direto com o MinIO, prefira `presigned_get_url` (veja [Storage](storage.md)) e devolva um redirect — offload total do tráfego. ## Servir um arquivo do disco (controle fino) No modo local, `file_response` dá controle direto e devolve um `FileResponse` transmitido em chunks pelo Starlette (suporta range requests): ```python return downloads.file_response(name, subdir="invoices", as_attachment=False) ``` Parâmetros: `subdir=`, `filename=`, `media_type=`, `as_attachment=`, `headers=`. (Só no modo local; num `DownloadUtils` MinIO levanta `RuntimeError` — use `download()`.) !!! danger "Path traversal é bloqueado por construção" `downloads.file_response("../../etc/passwd")` levanta `NotFoundException` (404), não vaza o arquivo. Sempre construa o `DownloadUtils` com um `base_dir` dedicado a conteúdo servível. ## Transmitir bytes gerados na hora Quando o payload é produzido em runtime (relatório, zip em memória, bytes descriptografados) e **não** vem do disco, use `stream` — aceita `bytes`, um iterável sync ou um async-iterable: ```python from collections.abc import AsyncIterator from fastapi.responses import StreamingResponse @router.get("/report.csv") async def report() -> StreamingResponse: """Gera um CSV sob demanda e baixa como report.csv.""" async def rows() -> AsyncIterator[bytes]: yield b"id,name\n" for i in range(1000): yield f"{i},item-{i}\n".encode() return downloads.stream(rows(), filename="report.csv", media_type="text/csv") ``` ## Header `Content-Disposition` Para montar o header manualmente (fora do `DownloadUtils`), use `build_content_disposition` — ela escapa o nome do arquivo corretamente (RFC 5987, com fallback ASCII): ```python from tempest_fastapi_sdk import build_content_disposition header: str = build_content_disposition("relatório 2026.pdf", as_attachment=True) # -> attachment; filename="relatorio 2026.pdf"; filename*=UTF-8''relat%C3%B3rio%202026.pdf ``` ## Recap - `DownloadUtils(pasta)` ou `DownloadUtils(minio_client)` — backend no construtor. - `await downloads.download(key, ...)` — unificado: `FileResponse` (local) ou streaming (MinIO). - `stream(content, filename=...)` para bytes/geradores produzidos na hora (qualquer modo). - `file_response(...)` é local-only (controle fino); MinIO usa `download()`. - `as_attachment=False` serve inline; `as_attachment=True` (default) força download. - Local: path traversal vira `NotFoundException` — seguro por construção. --- # Idempotência Source: https://mauriciobenjamin700.github.io/tempest-fastapi-sdk/recipes/idempotency/ # Idempotência `IdempotencyMiddleware` implementa o padrão `Idempotency-Key` usado por Stripe, AWS, GitHub e Plaid: o cliente envia um header único, o servidor processa **uma vez** e devolve a mesma resposta a qualquer retry, sem duplicar linha no banco / cobrar duas vezes. ## Como funciona 1. Cliente envia `POST /charge` com `Idempotency-Key: chk_`. 2. Middleware processa, salva a resposta completa indexada por `(method, path, key)`. 3. Cliente retentou? Middleware devolve a **mesma resposta cacheada**. Handler não roda de novo. Só verbos mutantes (`POST` / `PUT` / `PATCH` / `DELETE`) são elegíveis — `GET` é naturalmente idempotente. !!! warning "Opt-in por requisição" Sem o header, o middleware deixa passar normal. Endpoints existentes não quebram — só quem precisar da garantia envia o header. ## Setup mínimo (single-replica / dev) ```python from fastapi import FastAPI from tempest_fastapi_sdk import ( IdempotencyMiddleware, MemoryIdempotencyStore, ) app = FastAPI() app.add_middleware( IdempotencyMiddleware, store=MemoryIdempotencyStore(), ttl_seconds=24 * 3600, ) ``` `MemoryIdempotencyStore` guarda em dict local — funciona só pra uma réplica. Pra produção use Redis. ## Setup produção (multi-réplica via Redis) ```python from fastapi import FastAPI from redis.asyncio import Redis from tempest_fastapi_sdk import ( IdempotencyMiddleware, RedisIdempotencyStore, ) from src.core.settings import settings redis = Redis.from_url(settings.REDIS_URL) app = FastAPI() app.add_middleware( IdempotencyMiddleware, store=RedisIdempotencyStore(redis, prefix="idem:"), ttl_seconds=24 * 3600, ) ``` Stripe usa 24h por padrão — coerente com retry exponencial do lado do cliente. ## Cliente ```python import uuid import httpx async def create_charge(amount_cents: int) -> dict[str, object]: """POST idempotente com retry automático.""" key = uuid.uuid4().hex async with httpx.AsyncClient() as c: for _ in range(3): try: r = await c.post( "https://api/charge", json={"amount_cents": amount_cents}, headers={"Idempotency-Key": key}, timeout=10, ) return r.json() except httpx.ReadTimeout: continue raise RuntimeError("3 retries failed") ``` Em qualquer das 3 tentativas que chegar ao servidor, o resultado final é o mesmo recurso criado uma única vez — réplicas restantes recebem a resposta cacheada. ## Quando usar - Pagamento / cobrança - Envio de webhook (cliente retenta com mesmo key) - Operações de side-effect externo (envio de email, SMS) - Qualquer `POST /create` cujo retry pode duplicar registro ## Quando NÃO usar - `GET` (já idempotente) - Operações trivialmente reentrantes (`PATCH` que reescreve mesmo valor) - Quando a duplicação não tem consequência (logs, métricas) ## Backend customizado Implemente o protocolo `IdempotencyStore`: ```python from tempest_fastapi_sdk import CachedResponse, IdempotencyStore class DynamoIdempotencyStore: """Exemplo de backend DynamoDB.""" async def get(self, key: str) -> CachedResponse | None: ... async def set( self, key: str, response: CachedResponse, *, ttl_seconds: int, ) -> None: ... # Funciona com o middleware igual aos backends nativos: assert isinstance(DynamoIdempotencyStore(), IdempotencyStore) ``` --- # Sync offline-first (delta) Source: https://mauriciobenjamin700.github.io/tempest-fastapi-sdk/recipes/offline-sync/ # Sync offline-first (delta) Apps móveis e PWAs trabalham offline: capturam dados sem rede e sincronizam quando a conexão volta. O backend precisa responder uma pergunta só — *"o que mudou desde a última vez que falei com você?"* — e isso inclui **registros deletados**, senão eles ficam órfãos para sempre no aparelho. Este recipe monta esse fluxo bidirecional (push + pull) com `BaseRepository.changes_since`, sem reescrever lógica de cursor por projeto. ## O problema O cliente guarda os dados localmente (IndexedDB, SQLite) e mantém um **watermark**: o instante do último sync bem-sucedido. No próximo sync ele quer: 1. **Push** — enviar o que criou/editou offline. Como pode reenviar (retry, rede instável), a escrita precisa ser **idempotente**. 2. **Pull** — receber tudo que mudou no servidor desde o watermark, incluindo exclusões, para espelhar localmente. ## O modelo Use o `id` gerado pelo cliente como chave primária (idempotência de graça) e misture `SoftDeleteMixin` para que exclusões virem **tombstones** em vez de sumirem da query. ```python from uuid import UUID from sqlalchemy import ForeignKey, String from sqlalchemy.orm import Mapped, mapped_column from tempest_fastapi_sdk import BaseModel, SoftDeleteMixin class AnalysisModel(BaseModel, SoftDeleteMixin): """Uma análise sincronizável, com id vindo do dispositivo.""" __tablename__ = "analyses" user_id: Mapped[UUID] = mapped_column( ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True, ) animal_id: Mapped[str] = mapped_column(String(120), nullable=False, default="") notes: Mapped[str] = mapped_column(String(2000), nullable=False, default="") ``` `BaseModel` já entrega `id`, `created_at`, `updated_at` e `is_active`; `SoftDeleteMixin` adiciona `deleted_at` + `is_deleted` + `mark_deleted()`. ## O repositório `changes_since` é o único método novo de que você precisa. Crie um repositório fino para mapear linha → schema: ```python from sqlalchemy.ext.asyncio import AsyncSession from tempest_fastapi_sdk import BaseRepository class AnalysisRepository(BaseRepository[AnalysisModel]): """Acesso a dados das análises sincronizáveis.""" def __init__(self, session: AsyncSession) -> None: super().__init__(session, model=AnalysisModel) ``` ## Push idempotente O `id` é do cliente, então "upsert por id" não duplica em retry: ```python from uuid import UUID async def upsert_analysis( repo: AnalysisRepository, *, user_id: UUID, analysis_id: UUID, animal_id: str, notes: str, ) -> AnalysisModel: """Cria ou atualiza uma análise, idempotente por id do cliente. Args: repo (AnalysisRepository): O repositório de análises. user_id (UUID): O dono do registro. analysis_id (UUID): O id gerado no dispositivo (chave primária). animal_id (str): Identificador do animal (brinco / herd id). notes (str): Observações livres. Returns: AnalysisModel: A linha persistida. """ existing = await repo.get_or_none({"id": analysis_id, "user_id": user_id}) if existing is not None: existing.animal_id = animal_id existing.notes = notes return await repo.update(existing) return await repo.add( AnalysisModel( id=analysis_id, user_id=user_id, animal_id=animal_id, notes=notes, ) ) ``` ## Pull (o delta) `changes_since(since)` devolve só o que mudou após o watermark, em ordem crescente de `updated_at`, paginado por cursor, **com os tombstones**: ```python from datetime import datetime from uuid import UUID async def pull_changes( repo: AnalysisRepository, *, user_id: UUID, since: datetime | None, cursor: str | None = None, ) -> dict[str, object]: """Retorna as análises que mudaram desde o watermark do cliente. Args: repo (AnalysisRepository): O repositório de análises. user_id (UUID): Escopo do dono — nunca sincronize sem ele. since (datetime | None): Watermark do último sync. None faz o sync completo (primeira vez). cursor (str | None): Cursor da página anterior; None pega a primeira página. Returns: dict[str, object]: O envelope de cursor + `server_time`. """ return await repo.changes_since( since, filters={"user_id": user_id}, cursor=cursor, limit=100, ) ``` !!! danger "Sempre passe o escopo do dono" `changes_since` **não** filtra por usuário sozinho. Passe sempre `filters={"user_id": user_id}` (ou o escopo do tenant), senão um cliente puxa o delta do mundo inteiro. ## O endpoint `SyncFilterSchema` e `SyncPaginationSchema` casam exatamente com os argumentos e o retorno de `changes_since`: ```python from typing import Annotated from uuid import UUID from fastapi import APIRouter, Depends, Query from tempest_fastapi_sdk import SyncFilterSchema, SyncPaginationSchema router = APIRouter(prefix="/api/analyses", tags=["sync"]) @router.get("/changes") async def get_changes( filters: Annotated[SyncFilterSchema, Query()], repo: AnalysisRepository = Depends(get_analysis_repository), user_id: UUID = Depends(get_current_user_id), ) -> SyncPaginationSchema[AnalysisResponseSchema]: """Endpoint de pull: o delta desde o watermark do cliente.""" page = await repo.changes_since( filters.since, filters={"user_id": user_id}, cursor=filters.cursor, limit=filters.limit, include_deleted=filters.include_deleted, ) return SyncPaginationSchema[AnalysisResponseSchema]( items=[AnalysisResponseSchema.model_validate(r) for r in page["items"]], next_cursor=page["next_cursor"], has_more=page["has_more"], limit=page["limit"], server_time=page["server_time"], ) ``` ## O protocolo de watermark Esta é a parte que costuma dar bug. Siga à risca: 1. **Primeiro sync:** chame com `since=None`. Drene todas as páginas via `next_cursor` até `has_more` ser `False`. 2. **Guarde o `server_time` da resposta** como próximo `since` — **não** use o maior `updated_at` dos itens, nem o relógio do dispositivo. 3. **Próximo sync:** mande aquele `server_time` como `since`. O filtro é `updated_at > since` (estrito). !!! tip "Por que `server_time` e não o relógio do cliente" O `server_time` é capturado no servidor **antes** da query rodar. Como ele é um marco do próprio relógio do banco, qualquer linha escrita depois tem `updated_at` maior e aparece no pull seguinte — imune ao clock skew do aparelho. !!! warning "Tombstones não são opcionais" Deixe `include_deleted=True` (padrão). Um pull que esconde os deletados deixa linhas excluídas presas no dispositivo para sempre, porque o cliente nunca fica sabendo que elas saíram. ## Filtros de comparação `changes_since` é açúcar em cima de um recurso mais geral: o sufixo `__` em qualquer `filters`. Disponível em `list`, `paginate`, `cursor_paginate`, `count` etc. ```python # updated_at > watermark (precisão de timestamp) await repo.list(filters={"updated_at__gt": watermark}) # faixa: 1 <= value <= 10 await repo.list(filters={"value__gte": 1, "value__lte": 10}) # diferente de await repo.list(filters={"status__ne": "archived"}) ``` Operadores: `gt`, `gte`, `lt`, `lte`, `ne`. Um valor `None` ignora a condição, igual a qualquer outro filtro. !!! note "Diferente de `start_in` / `end_in`" `start_in` / `end_in` filtram por **dia inteiro** sobre `created_at`. Os operadores `__gt` etc. são por **timestamp**, em qualquer coluna — é o que o watermark de sync precisa. ## Recap - Use o **id do cliente como PK** → push vira upsert idempotente. - Misture **`SoftDeleteMixin`** → exclusões viram tombstones que o pull entrega. - **`changes_since(since, filters={"user_id": ...})`** é o pull inteiro: delta por `updated_at`, ordem estável, cursor e tombstones. - Persista o **`server_time`** da resposta como próximo `since` — não o relógio do dispositivo. - Por baixo, os operadores **`__gt/gte/lt/lte/ne`** funcionam em qualquer `filters`. --- # Bundled auth flow (signup / activate / login / reset) Source: https://mauriciobenjamin700.github.io/tempest-fastapi-sdk/recipes/auth-flow/ # Bundled auth flow (signup / activate / login / reset) Desde v0.31.0 o SDK fornece o ciclo completo de conta local — signup com email/senha, ativação por link, login com JWT pair, reset de senha — via `UserAuthService` + `make_auth_router`. **Endpoints prontos pra mount** (incluindo `POST /auth/refresh` desde v0.65.0), templates Jinja2 bundled, settings flags controlando se o link sai por e-mail ou no body da resposta, e quatro modos pré-pensados pra dev / staging / produção / CI. ## Conteúdo da receita 1. **[Setup mínimo](#setup-minimo)** — instalação dos extras + wiring de quatro objetos (`AsyncDatabaseManager`, `EmailUtils`, `UserAuthService`, `make_auth_router`). 2. **[UserTokenModel concreto](#usertokenmodel-concreto)** — `BaseUserTokenModel` é abstrato, projeto cria a tabela final. 3. **[Endpoints](#endpoints)** — tabela dos 5 endpoints + payload + comportamento. 4. **[Settings — variáveis de ambiente](#settings-variaveis-de-ambiente)** — env vars em **seis grupos** (JWT, política de senha, fluxo de e-mail, TTL, URLs/templates, páginas backend) — cada uma em tabela tipada, não num blob. 5. **[Anatomia de um e-mail: como link, template e URL se encaixam](#anatomia-de-um-e-mail)** — desambigua os três conceitos que mais confundem. 6. **[Cinco modos de operação](#cinco-modos-de-operacao)** — produção, dev com SMTP local (Mailhog / smtp4dev), dev sem SMTP, CI sem ativação e **backend-only** (links e páginas servidas direto pelo backend). 7. **[Mailhog vs smtp4dev — qual escolher pra dev local](#mailhog-vs-smtp4dev)** — comparativo + receitas docker-compose copy-paste. 8. **[Customizando os templates de e-mail](#customizando-templates)** — override do `activation.html` e `password_reset.html` + variáveis disponíveis no contexto Jinja2. 9. **[Segurança](#seguranca)** — como o token é armazenado, TTL, anti-enumeração. 10. **[Próximos passos](#proximos-passos)**. --- ## Setup mínimo Requer: - `[auth]` (bcrypt + PyJWT) — obrigatório, sempre. - `[email]` (aiosmtplib + Jinja2 + email-validator) — opcional; quando ausente, os links vão no body da resposta em vez de e-mail. ```bash uv add "tempest-fastapi-sdk[auth,email]>=0.31.0" ``` ```python # src/api/app.py from tempest_fastapi_sdk import ( AsyncDatabaseManager, EmailUtils, UserAuthService, make_auth_router, ) from src.core.settings import settings from src.db.models import UserModel, UserTokenModel db = AsyncDatabaseManager(settings.DATABASE_URL) # EmailUtils — só instancie se [email] estiver instalado E você quiser e-mail # real (modos A e B abaixo). Nos modos C e D, passe email=None pro service. emails = EmailUtils( host=settings.SMTP_HOST, port=settings.SMTP_PORT, username=settings.SMTP_USERNAME, password=settings.SMTP_PASSWORD, from_addr=settings.SMTP_FROM_ADDR, template_dir="emails", # diretório onde seus templates custom moram ) auth_service = UserAuthService( db=db, # necessário pra current_user_dependency (ver seção final) user_model=UserModel, token_model=UserTokenModel, auth_settings=settings, # mistura AuthSettings (ver seção 4) jwt_settings=settings, # mistura JWTSettings email=emails, # ou None — controla envio real vs link no body ) app.include_router( make_auth_router( auth_service, session_factory=db.session_dependency, ), ) ``` !!! tip "TL;DR de quatro objetos" `AsyncDatabaseManager` → conexão. `EmailUtils` → SMTP + Jinja2. `UserAuthService` → regras de negócio (5 métodos). `make_auth_router` → cola tudo em 5 endpoints HTTP. --- ## UserTokenModel concreto `BaseUserTokenModel` é abstrato — projeto cria a tabela concreta porque a FK pra `users` precisa do nome da sua tabela. Exemplo `src/db/models/user_token.py`: ```python from uuid import UUID from sqlalchemy import ForeignKey from sqlalchemy.orm import Mapped, mapped_column from tempest_fastapi_sdk import BaseUserTokenModel class UserTokenModel(BaseUserTokenModel): """Concrete token table for activation / reset / email-verification.""" __tablename__ = "user_tokens" user_id: Mapped[UUID] = mapped_column( ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True, ) ``` E importar em `src/db/models/__init__.py` pra Alembic ver: ```python from src.db.models.user import UserModel from src.db.models.user_token import UserTokenModel __all__: list[str] = ["UserModel", "UserTokenModel"] ``` Gerar migration (a primeira vez, bootstrape o Alembic com `tempest db init`): ```bash # Só na primeira vez — gera alembic/, alembic.ini e env.py: uv run tempest db init # A partir daí, ciclo normal de revisão: uv run tempest db revision -m "users + user_tokens" uv run tempest db upgrade ``` --- ## Endpoints | Método | Path | Body / Output | Comportamento | |--------|------|---------------|---------------| | POST | `/auth/signup` | `SignupSchema` → `SignupResponseSchema` | Cria user. Emite e-mail (modos A/B) **ou** devolve link no body (modo C). Se `AUTH_AUTO_ACTIVATE=True`, user nasce ativo e JWT pair volta direto (modo D). | | POST | `/auth/activate/{token}` | — → `ActivationResponseSchema` | Consome token + `is_active=True` + emite JWT pair. | | POST | `/auth/login` | `LoginSchema` → `LoginResponseSchema` | Email + senha → JWT pair. Erros genéricos (não enumera contas). | | POST | `/auth/password-reset/request` | `PasswordResetRequestSchema` → `PasswordResetResponseSchema` | Sempre HTTP 202 + corpo genérico. Link via e-mail (A/B) ou no body (C). | | POST | `/auth/password-reset/confirm` | `PasswordResetConfirmSchema` → `LoginResponseSchema` | Consome token + grava nova senha + emite JWT pair. | | POST | `/auth/password-change` | `PasswordChangeSchema` → `204` | **Autenticado** (bearer token). Troca a própria senha: confirma a senha atual + grava a nova. Sem token de e-mail. | | POST | `/auth/refresh` *(v0.65.0+)* | `RefreshSchema` → `LoginResponseSchema` | Troca um **refresh token** válido por um JWT pair novo. **Sem email/senha.** Rejeita access token replayado (401) e conta inativa (403). | !!! tip "`password-reset/confirm` vs `password-change` — qual é qual?" São fluxos **diferentes**, não confunda: - **`/auth/password-reset/confirm`** — o usuário **esqueceu** a senha. Ele não está logado; prova identidade com o **token** que recebeu por e-mail. (Veja `/auth/password-reset/request` antes.) - **`/auth/password-change`** — o usuário **lembra** a senha e está **logado**. Manda o `access_token` no header `Authorization: Bearer …` e reconfirma a `current_password`. Não envolve e-mail nem token de reset. Retorna **204** e os tokens atuais continuam válidos. ### Renovando a sessão com o refresh token O `access_token` é curto por design (`JWT_ACCESS_TTL_SECONDS`, 1 h por padrão). Quando ele expira, **não force o usuário a logar de novo** — troque o `refresh_token` (longo, 7 dias por padrão) por um par novo em `POST /auth/refresh`: ```bash curl -X POST localhost:8000/auth/refresh \ -H "Content-Type: application/json" \ -d '{"refresh_token": "eyJhbGciOi…"}' ``` ```json { "user_id": "7d8e4d5a-9f4b-4c3a-bd0a-1234567890ab", "access_token": "eyJhbGciOi…(novo)", "refresh_token": "eyJhbGciOi…(novo)", "mfa_required": false, "mfa_token": null } ``` O endpoint decodifica o token, exige que ele realmente carregue a claim `refresh` (um **access** token replayado aqui é rejeitado com **401**), resolve o `sub` pra um usuário **ativo** e emite um par novo. !!! warning "Os dois tokens rotacionam" A resposta traz um `refresh_token` **novo**. Persista esse e descarte o que você enviou. No modo **stateless** (default) o SDK emite JWTs, então o par antigo não é revogado — ele continua válido até o próprio `exp`. !!! tip "Precisa de revogação real? O SDK já entrega" Não escreva sua própria tabela de refresh tokens — desde a **v0.66.0** o SDK oferece refresh tokens **DB-backed opacos** com rotação single-use, **detecção de reuso** (token roubado revoga a família inteira) e `POST /auth/logout`. É opt-in: passe um `refresh_token_model` pro `UserAuthService`. Veja a receita [Refresh tokens (rotação/revogação)](refresh-tokens.md). !!! tip "Quando o refresh token também expira" Aí não tem renovação possível — o **401** é definitivo e o cliente cai de volta no `POST /auth/login` com email + senha. Sem frontend? O método de serviço por trás do endpoint é público — `await service.refresh_tokens(session, refresh_token=...)` devolve `(user, access_token, refresh_token)`. --- ## Settings — variáveis de ambiente Toda a config do flow vem de mixins de settings. Mixe-as na sua classe `Settings`: ```python # src/core/settings.py from tempest_fastapi_sdk import ( AuthSettings, BaseAppSettings, DatabaseSettings, EmailSettings, JWTSettings, ServerSettings, ) class Settings( ServerSettings, DatabaseSettings, EmailSettings, JWTSettings, AuthSettings, BaseAppSettings, ): pass settings: Settings = Settings() ``` !!! info "Nome do atributo **é** o nome da env var" Cada atributo das tabelas abaixo é lido de uma variável de ambiente de **mesmo nome**, case-sensitive, **sem prefixo**. `AUTH_PASSWORD_MIN_LENGTH` no `.env` → `settings.AUTH_PASSWORD_MIN_LENGTH`. Todos têm default — você só seta o que quer mudar. As variáveis se dividem em **dois mixins** e **seis grupos de concern**. Elas estão separadas de propósito: senha não é a mesma coisa que e-mail, e autenticação (JWT) não é a mesma coisa que ativação de conta. ### Grupo 1 — Autenticação / JWT (`JWTSettings`) Controla a assinatura e validade dos tokens que o login devolve. **É o mesmo `JWT_SECRET` que a dependency `get_current_user` usa pra validar** (veja [Pegando o `current_user`](#pegando-o-current_user-da-requisicao)). | Env var | Tipo | Default | O que faz | |---------|------|---------|-----------| | `JWT_SECRET` | `str` (≥32 bytes) | `change-me-…-32` | Segredo HMAC que assina o JWT. **Obrigatório trocar em produção.** | | `JWT_ALGORITHM` | `str` | `HS256` | Algoritmo JOSE. `HS256`/`HS512` (segredo simétrico) ou `RS256` (par de chaves). | | `JWT_ACCESS_TTL_SECONDS` | `int` (≥1) | `3600` | Validade do **access token** (1 h). Curto por design — renove via refresh. | | `JWT_REFRESH_TTL_SECONDS` | `int` (≥1) | `604800` | Validade do **refresh token** (7 dias). | | `JWT_ISSUER` | `str \| None` | `None` | Claim `iss`. `None` omite o claim. | !!! danger "`JWT_SECRET` default vaza tokens" O default `change-me-change-me-change-me-32` existe só pra subir local. Em produção, **qualquer um** com o default consegue forjar um JWT válido. Gere um segredo forte (`openssl rand -base64 48`) e injete por secret manager — nunca commite. ### Grupo 2 — Política de senha (`AuthSettings`) | Env var | Tipo | Default | O que faz | |---------|------|---------|-----------| | `AUTH_PASSWORD_MIN_LENGTH` | `int` (≥1) | `12` | Comprimento mínimo aceito no signup **e** no reset. | | `AUTH_PASSWORD_REQUIRE_COMPLEXITY` | `bool` | `false` | `true` = exige 1 minúscula + 1 maiúscula + 1 dígito + 1 caractere especial. | Os dois interagem — **é aqui que costuma confundir**. A regra exata: - **`complexity=false` (default):** só o comprimento importa. Qualquer senha com `≥ AUTH_PASSWORD_MIN_LENGTH` caracteres passa, sem exigência de composição. - **`complexity=true`:** além das 4 classes de caracteres, o piso de comprimento **efetivo** vira `max(AUTH_PASSWORD_MIN_LENGTH, 8)`. Ou seja, um `AUTH_PASSWORD_MIN_LENGTH` abaixo de 8 é **ignorado** enquanto a complexidade está ligada. Tabela de decisão: | `MIN_LENGTH` | `REQUIRE_COMPLEXITY` | Senha aceita quando | |--------------|---------------------|---------------------| | `12` | `false` | `≥ 12` chars, qualquer composição | | `4` | `false` | `≥ 4` chars, qualquer composição (piso baixo, dev-only) | | `4` | `true` | `≥ 8` chars (piso 8 **sobrescreve** o 4) **+** as 4 classes | | `16` | `true` | `≥ 16` chars **+** as 4 classes | !!! warning "O piso é fonte única da verdade" Os schemas de request (`SignupSchema`, `PasswordResetConfirmSchema`) **não** impõem limite próprio de comprimento — eles delegam pra essas duas vars. Baixar `AUTH_PASSWORD_MIN_LENGTH` pra `4` realmente afrouxa a validação na rota também. Não há um segundo limite escondido no schema "te protegendo". ### Grupo 3 — Controle do fluxo de e-mail (`AuthSettings`) Decidem **se** e **como** o link chega ao usuário. Mapeiam direto nos [cinco modos de operação](#cinco-modos-de-operacao). | Env var | Tipo | Default | O que faz | |---------|------|---------|-----------| | `AUTH_AUTO_ACTIVATE` | `bool` | `false` | `true` = user nasce ativo, pula activation, signup devolve JWT pair direto (Modo D). **Nunca em produção.** | | `AUTH_RETURN_TOKEN_IN_RESPONSE` | `bool` | `false` | `true` = link de ativação/reset vai no corpo JSON em vez do e-mail (Modo C). | ### Grupo 4 — TTL dos tokens de conta (`AuthSettings`) Validade dos tokens **de uso único** (ativação / reset) — distintos dos JWT do grupo 1. | Env var | Tipo | Default | O que faz | |---------|------|---------|-----------| | `AUTH_ACTIVATION_TTL_SECONDS` | `int` (≥60) | `604800` | Validade do token de ativação (7 dias). | | `AUTH_PASSWORD_RESET_TTL_SECONDS` | `int` (≥60) | `3600` | Validade do token de reset (1 h). Curto é mais seguro. | ### Grupo 5 — URLs e templates de e-mail (`AuthSettings`) | Env var | Tipo | Default | O que faz | |---------|------|---------|-----------| | `AUTH_ACTIVATION_URL_TEMPLATE` | `str` | `http://localhost:3000/activate?token={token}` | URL que vai no e-mail; `{token}` é substituído. **Aponta pro frontend** (exceto no Modo E). | | `AUTH_PASSWORD_RESET_URL_TEMPLATE` | `str` | `http://localhost:3000/reset-password?token={token}` | Idem, para reset. | | `AUTH_ACTIVATION_TEMPLATE` | `str` | `activation.html` | Nome do arquivo Jinja2 do **HTML do e-mail** de ativação, resolvido no `template_dir` do `EmailUtils`. | | `AUTH_PASSWORD_RESET_TEMPLATE` | `str` | `password_reset.html` | Idem, para reset. | !!! warning "URL template ≠ Jinja2 template" `*_URL_TEMPLATE` é uma string `.format()` com `{token}` — é o **link**. `*_TEMPLATE` é o nome de um arquivo `.html` — é o **e-mail que embrulha o link**. Confundir os dois é o erro nº 1. Detalhe completo em [Anatomia de um e-mail](#anatomia-de-um-e-mail). ### Grupo 6 — Páginas renderizadas pelo backend (Modo E, `AuthSettings`) Só relevantes quando `AUTH_BACKEND_LINKS=true`. Veja o [Modo E](#cinco-modos-de-operacao) para o fluxo completo. | Env var | Tipo | Default | O que faz | |---------|------|---------|-----------| | `AUTH_BACKEND_LINKS` | `bool` | `false` | `true` = monta 3 endpoints HTML extras; o link do e-mail aponta pro **backend**, não pro frontend. | | `AUTH_LOGIN_URL` | `str \| None` | `None` | URL de login no botão "Ir pro login" das páginas de sucesso. `None` esconde o botão. | | `AUTH_ACTIVATION_SUCCESS_TEMPLATE` | `str` | `activation_success.html` | Página HTML de ativação OK. | | `AUTH_ACTIVATION_ERROR_TEMPLATE` | `str` | `activation_error.html` | Página HTML de ativação com erro. | | `AUTH_PASSWORD_RESET_FORM_TEMPLATE` | `str` | `password_reset_form.html` | Form HTML de nova senha. | | `AUTH_PASSWORD_RESET_SUCCESS_TEMPLATE` | `str` | `password_reset_success.html` | Página HTML de reset OK. | | `AUTH_PASSWORD_RESET_ERROR_TEMPLATE` | `str` | `password_reset_error.html` | Página HTML de reset com erro. | ### Grupo 7 — Idioma dos e-mails e páginas (`AuthSettings`) | Env var | Tipo | Default | O que faz | |---------|------|---------|-----------| | `AUTH_DEFAULT_LOCALE` | `str` | `pt-BR` | Idioma dos **e-mails** e **páginas HTML** bundled. Aceita `pt-BR` e `en-US` (normalizado: `PT-BR`, `pt_br`, `ptbr` → `pt-BR`). | Tem uma seção inteira só pra isso, explicada bem devagar: [Idioma dos e-mails e páginas (i18n)](#idioma-dos-e-mails-e-paginas-i18n). ### Grupo 8 — Entrega de token: bearer / cookie / both (`AuthSettings`) *(v0.87.0+)* | Env var | Tipo | Default | O que faz | |---------|------|---------|-----------| | `AUTH_TOKEN_DELIVERY` | `"bearer" \| "cookie" \| "both"` | `bearer` | Como o login/refresh devolvem o par JWT. Veja [Entrega de token](#entrega-de-token). | | `AUTH_COOKIE_SECURE` | `bool` | `true` | Marca os cookies como `Secure` (só trafegam via HTTPS). **Desligue só em HTTP puro** — senão o browser descarta o cookie. | | `AUTH_COOKIE_SAMESITE` | `"lax" \| "strict" \| "none"` | `lax` | SPA cross-site precisa `none` (+ `Secure=true`). | | `AUTH_COOKIE_DOMAIN` | `str \| None` | `None` | `Domain` do cookie. `None` = host exato. Use `.example.com` pra compartilhar entre subdomínios. | | `AUTH_ACCESS_COOKIE_NAME` | `str` | `access_token` | Nome do cookie do access token. | | `AUTH_REFRESH_COOKIE_NAME` | `str` | `refresh_token` | Nome do cookie do refresh token (escopado ao path do endpoint de refresh). | !!! note "MFA / TOTP tem suas próprias vars" Quando `AUTH_MFA_ENABLED=true`, o `AuthSettings` ainda expõe `AUTH_MFA_ISSUER`, `AUTH_MFA_RECOVERY_CODES_COUNT`, `AUTH_MFA_TOKEN_TTL_SECONDS` e `AUTH_MFA_VERIFY_WINDOW`. Ficam fora do escopo desta receita (signup/activate/login/reset) — são cobertos na receita de MFA. --- ## Anatomia de um e-mail Três conceitos diferentes que parecem o mesmo. Eis o que cada um faz, exatamente uma vez, em pseudo-código: ```text 1. SDK gera um token opaco aleatório (string de 64 chars). 2. AUTH_ACTIVATION_URL_TEMPLATE.format(token=…) → link com o token embutido. 3. Renderiza AUTH_ACTIVATION_TEMPLATE (Jinja2 HTML) passando { user, activation_url, expires_at, expires_at_str }. 4. EmailUtils.send(to=user.email, subject=..., html=). ``` Em prosa: - **Token opaco** — string aleatória que o SDK gera, hasheia (SHA-256) e grava na tabela `user_tokens`. O plaintext sai pelo e-mail **uma única vez**; o banco só guarda o hash. - **URL template** (`AUTH_ACTIVATION_URL_TEMPLATE`) — formato literal pra montar a URL que vai pro usuário clicar. **Aponta pro frontend, não pro backend.** O frontend recebe `?token=…`, capta da query string e chama `POST /auth/activate/{token}` no backend. - **Jinja2 template** (`AUTH_ACTIVATION_TEMPLATE`) — nome do arquivo HTML dentro do `template_dir` do `EmailUtils`. É **o HTML do e-mail**, não a URL. Recebe o contexto `{ user, activation_url, expires_at, expires_at_str }` e renderiza o markup final. Use `{{ expires_at_str }}` no template — é a data de expiração já formatada e curta (ex.: `21/06/2026 23:25 (UTC)`, sem segundos); `expires_at` continua disponível como `datetime` cru se você quiser formatar do seu jeito. !!! warning "URL template ≠ Jinja2 template" `AUTH_ACTIVATION_URL_TEMPLATE` é uma string Python `.format()`-style — só tem o placeholder `{token}`. **Não confunda** com o arquivo `.html` que o Jinja2 renderiza. A URL formatada **é injetada como variável** no contexto do Jinja2 sob o nome `activation_url`, e o template HTML embrulha ela num botão. Fluxo visual: ```mermaid sequenceDiagram participant U as Usuário participant F as Frontend participant API as Backend (SDK) participant E as SMTP participant DB as Banco U->>F: preenche email + senha F->>API: POST /auth/signup API->>DB: INSERT user (is_active=false) + INSERT token (hash, TTL) API->>API: token plaintext + AUTH_ACTIVATION_URL_TEMPLATE.format(token=…) API->>API: render Jinja2 (user, activation_url, expires_at) alt AUTH_RETURN_TOKEN_IN_RESPONSE=false API->>E: SMTP send (HTML renderizado) API->>F: 201 + {message: "check your email"} else AUTH_RETURN_TOKEN_IN_RESPONSE=true API->>F: 201 + {activation_url: "https://app/activate?token=..."} end Note right of F: usuário (ou dev) abre a URL F->>API: POST /auth/activate/{token} API->>DB: hash(token) match? expirou? já usou? API->>F: 200 + JWT pair ``` --- ## Idioma dos e-mails e páginas (i18n) Desde a **v0.59.0**, os e-mails e as páginas HTML que o SDK já traz prontos falam **dois idiomas**: 🇧🇷 **Português do Brasil (`pt-BR`)** — que é o **padrão** — e 🇺🇸 **Inglês americano (`en-US`)**. Você não precisa criar nenhum template pra isso funcionar. 🚀 ### A regra de ouro (decore só isso) Existem **duas coisas diferentes** que escolhem o idioma, e elas funcionam de jeitos diferentes. Preste atenção: | O que | Como o idioma é escolhido | |-------|---------------------------| | **E-mails** (ativação, reset) | **Sempre** usam `AUTH_DEFAULT_LOCALE`. Ponto final. | | **Páginas HTML** (Modo E, backend) | Usam o `Accept-Language` do **navegador** do usuário; se o navegador não disser nada, caem no `AUTH_DEFAULT_LOCALE`. | !!! info "Por que o e-mail não negocia idioma?" Quando o SDK **monta** o e-mail, não existe nenhum navegador pedindo nada — é um processo de servidor mandando uma mensagem. Não tem de onde "adivinhar" o idioma. Por isso o e-mail é sempre fixo no `AUTH_DEFAULT_LOCALE`. Já a **página HTML** é aberta por um navegador de verdade, que manda o cabeçalho `Accept-Language` dizendo "eu prefiro português" — aí dá pra respeitar a preferência da pessoa. ### Passo 1 — escolher o idioma padrão Só uma variável de ambiente. É isso: ```env # .env AUTH_DEFAULT_LOCALE=pt-BR # padrão — pode até omitir ``` Quer tudo em inglês? Troque por: ```env AUTH_DEFAULT_LOCALE=en-US ``` !!! tip "Não precisa acertar a caixa/formato exato" O valor é normalizado pra você. Tudo isso vira `pt-BR`: `pt-BR`, `PT-BR`, `pt_br`, `ptbr`, `pt`. E tudo isso vira `en-US`: `en-US`, `EN_us`, `enus`, `en`. Se você digitar algo que o SDK não conhece (tipo `klingon`), ele cai no padrão `pt-BR` em vez de quebrar. ### Passo 2 — (opcional) deixar a página HTML seguir o navegador Isso **já vem ligado de graça** no Modo E (`AUTH_BACKEND_LINKS=true`). Você não faz nada. Quando o usuário clica no link do e-mail e o navegador dele está em português, ele vê a página em português; se estiver em inglês, vê em inglês. Se o navegador não mandar `Accept-Language`, a página usa o `AUTH_DEFAULT_LOCALE`. ```text Navegador em pt-BR → Accept-Language: pt-BR → página em Português Navegador em en-US → Accept-Language: en-US → página em Inglês Navegador sem header → cai no AUTH_DEFAULT_LOCALE ``` ### Passo 3 — (opcional) traduzir/customizar você mesmo Os templates bundled ficam em subpastas por idioma (`pt-BR/`, `en-US/`). Pra trocar **só** o texto/visual de um idioma, crie um arquivo de mesmo nome na subpasta certa do seu `template_dir` (ex.: `template_dir/pt-BR/activation_success.html`). A ordem de busca completa está na dica "Override por idioma" mais abaixo, no **Modo E**. ### Bônus — data de expiração curta e legível Antes, o e-mail mostrava a expiração crua e feia assim: ```text This link expires at 2026-06-21 23:25:49.742054+00:00 ``` Agora o SDK injeta no template a variável `expires_at_str`, já formatada e **sem segundos**, no formato do idioma: | Idioma | Como aparece | |--------|--------------| | `pt-BR` | `21/06/2026 23:25 (UTC)` | | `en-US` | `2026-06-21 23:25 (UTC)` | Nos seus templates custom, use `{{ expires_at_str }}` (curto e bonito). Se quiser formatar do seu jeito, o `datetime` cru ainda está disponível em `{{ expires_at }}`. !!! check "Recapitulando" - **Uma variável** manda no idioma dos e-mails: `AUTH_DEFAULT_LOCALE`. - **Páginas HTML** seguem o navegador (Accept-Language) e caem no `AUTH_DEFAULT_LOCALE` quando não há header. - **Padrão é `pt-BR`.** Coloque `en-US` se quiser inglês. - Use `{{ expires_at_str }}` pra mostrar a expiração sem segundos. --- ## Cinco modos de operação | Modo | Quando usar | Flags | Onde o link aparece | |------|-------------|-------|--------------------| | **A. Produção (SPA)** | SaaS público, e-mail real, frontend SPA dono das páginas | `AUTH_AUTO_ACTIVATE=false`
`AUTH_RETURN_TOKEN_IN_RESPONSE=false`
`AUTH_BACKEND_LINKS=false`
SMTP real (Mailgun, SES, Postmark…) | Inbox real → frontend processa o token | | **B. Dev local com SMTP fake** | Desenvolvimento diário sem mandar e-mail real | `AUTH_AUTO_ACTIVATE=false`
`AUTH_RETURN_TOKEN_IN_RESPONSE=false`
SMTP apontando pra Mailhog (`localhost:1025`) ou smtp4dev (`localhost:2525`) | UI web do Mailhog/smtp4dev em `localhost:8025` / `localhost:5000` | | **C. Dev sem SMTP** | Validação rápida sem subir nenhum container de e-mail | `AUTH_AUTO_ACTIVATE=false`
`AUTH_RETURN_TOKEN_IN_RESPONSE=true`
`email=None` ou SMTP inválido | Body da resposta HTTP do signup | | **D. CI / testes** | Suite de testes que não exercita activation | `AUTH_AUTO_ACTIVATE=true` | Nenhum — signup já devolve JWT pair | | **E. Backend-only** *(v0.32.0+)* | Quer 100% de controle no backend — sem responsabilidade no frontend. Ideal pra APIs sem SPA, MVPs, intranets. | `AUTH_BACKEND_LINKS=true`
URL templates apontam pro **backend** (`https://api.example.com/auth/activate/{token}`)
`AUTH_LOGIN_URL=https://app.example.com/login` (opcional, mostra botão "Ir pro login" nas páginas HTML) | Backend renderiza HTML success/error direto — usuário só clica no link do e-mail | ### Modo A — produção ```bash AUTH_AUTO_ACTIVATE=false AUTH_RETURN_TOKEN_IN_RESPONSE=false SMTP_HOST=smtp.mailgun.org SMTP_PORT=587 SMTP_USERNAME=postmaster@mg.example.com SMTP_PASSWORD=... # secret, não commitar SMTP_FROM_ADDR=noreply@example.com AUTH_ACTIVATION_URL_TEMPLATE=https://app.example.com/activate?token={token} AUTH_PASSWORD_RESET_URL_TEMPLATE=https://app.example.com/reset?token={token} ``` Fluxo: signup → e-mail real chega no inbox → usuário clica → frontend chama `POST /auth/activate/{token}` → login. ### Modo B — dev com SMTP local (Mailhog ou smtp4dev) Mesmo `.env` do modo A, mas apontando o SMTP para um container local que **intercepta** os e-mails em vez de mandar de verdade. **Use este modo no dia-a-dia** — o fluxo é idêntico ao de produção, então você pega bugs de template, encoding, charset, etc. ao mesmo tempo que evita spammar e-mail real. ```bash # .env.dev AUTH_AUTO_ACTIVATE=false AUTH_RETURN_TOKEN_IN_RESPONSE=false SMTP_HOST=localhost SMTP_PORT=1025 # Mailhog SMTP padrão SMTP_USERNAME= # vazio — Mailhog não autentica SMTP_PASSWORD= SMTP_FROM_ADDR=dev@local AUTH_ACTIVATION_URL_TEMPLATE=http://localhost:5173/activate?token={token} AUTH_PASSWORD_RESET_URL_TEMPLATE=http://localhost:5173/reset?token={token} ``` Abra `http://localhost:8025` (Mailhog) ou `http://localhost:5000` (smtp4dev) pra ver os e-mails interceptados. Veja a seção **[Mailhog vs smtp4dev](#mailhog-vs-smtp4dev)** abaixo. ### Modo C — dev sem SMTP (link no body) Sem container de SMTP nenhum. O signup devolve o link de ativação no JSON da resposta: ```bash AUTH_AUTO_ACTIVATE=false AUTH_RETURN_TOKEN_IN_RESPONSE=true AUTH_ACTIVATION_URL_TEMPLATE=http://localhost:5173/activate?token={token} ``` Request: ```bash curl -X POST localhost:8000/auth/signup \ -H 'content-type: application/json' \ -d '{"email":"dev@local","password":"abcdefghijkl","name":"Dev"}' ``` Resposta (formato real do `SignupResponseSchema`): ```json { "user_id": "0193e9ea-7c4b-7c8e-bc05-2a3a8d9f7e10", "activation_required": true, "activation_url": "http://localhost:5173/activate?token=aBcD...xYz", "access_token": null, "refresh_token": null } ``` Cole a URL no navegador / curl pra exercitar `POST /auth/activate/{token}`. ### Modo D — CI / testes (skip total) ```bash AUTH_AUTO_ACTIVATE=true ``` Signup pula ativação inteira e devolve `{access_token, refresh_token}` direto. Use **só em testes** ou quando o produto for interno e cada usuário já é confiável. ### Modo E — backend-only (v0.32.0+) Quando você prefere que **toda** a experiência do link aconteça no backend, sem nenhuma página no frontend, ative `AUTH_BACKEND_LINKS=True`. O router passa a montar **três endpoints HTML** adicionais — `GET /auth/activate/{token}`, `GET /auth/password-reset/{token}` e `POST /auth/password-reset/{token}` (form-encoded). O e-mail aponta o usuário direto pra esses endpoints; o backend ativa a conta / processa o reset / renderiza HTML success ou error — usando templates Jinja2 bundled que você pode shadowar. ```bash # .env — Modo E (backend-only) AUTH_BACKEND_LINKS=true AUTH_AUTO_ACTIVATE=false AUTH_RETURN_TOKEN_IN_RESPONSE=false # IMPORTANTE: URL templates apontam pro BACKEND, não pro frontend. AUTH_ACTIVATION_URL_TEMPLATE=https://api.example.com/auth/activate/{token} AUTH_PASSWORD_RESET_URL_TEMPLATE=https://api.example.com/auth/password-reset/{token} # Opcional: URL do seu login. Quando setado, aparece um botão "Ir pro login" # nas páginas de success/error renderizadas pelo backend. Quando null, # o botão é omitido (puro server-side, zero acoplamento com frontend). AUTH_LOGIN_URL=https://app.example.com/login SMTP_HOST=smtp.mailgun.org SMTP_PORT=587 SMTP_FROM_ADDR=noreply@example.com ``` !!! danger "Link dá 404? Alinhe o template com o prefixo onde o router está montado" Os endpoints acima são **relativos ao ponto onde você monta o `make_auth_router`**. Se você inclui o router com um prefixo — comum pra separar rotas de negócio: ```python app.include_router(make_auth_router(...), prefix="/api") ``` então a rota real de ativação vira `GET /api/auth/activate/{token}`, **não** `/auth/activate/{token}`. Mas o `AUTH_ACTIVATION_URL_TEMPLATE` é uma string literal — ele **não** sabe do prefixo. Se o template apontar pra `.../auth/activate/{token}` (sem `/api`), o link do e-mail bate numa rota inexistente e responde **404**, mesmo com o signup retornando `201`. ```bash # ❌ router montado com prefix="/api", mas o template não tem /api → 404 AUTH_ACTIVATION_URL_TEMPLATE=https://api.example.com/auth/activate/{token} # ✅ template alinhado com o prefixo real de montagem AUTH_ACTIVATION_URL_TEMPLATE=https://api.example.com/api/auth/activate/{token} ``` Duas checagens ao configurar o Modo E: **(1)** o host é o **domínio público** do backend (nunca `localhost` — o link roda no browser do usuário, não no servidor); **(2)** o path inclui **todo prefixo** com que você montou o router. Fluxo: ```mermaid sequenceDiagram participant U as Usuário participant E as Inbox participant API as Backend participant DB as Banco U->>API: POST /auth/signup API->>DB: INSERT user (is_active=false) + token (hash, TTL) API->>E: e-mail com link https://api.example.com/auth/activate/{token} U->>E: clica no link E->>API: GET /auth/activate/{token} API->>DB: hash(token) válido? não-usado? não-expirado? alt token válido API->>DB: is_active=true + token.used_at=now API->>U: HTML activation_success.html (botão "Ir pro login" se AUTH_LOGIN_URL setado) else token inválido / expirado API->>U: HTML activation_error.html (HTTP 400) end ``` Password reset segue padrão similar: GET renderiza form HTML; POST (form-encoded) consome o token e renderiza success/error. **Templates HTML bundled (override droppando o mesmo nome no `template_dir`):** | Template | Endpoint que renderiza | Variáveis Jinja2 disponíveis | |----------|------------------------|------------------------------| | `activation_success.html` | `GET /auth/activate/{token}` (sucesso) | `user`, `login_url` | | `activation_error.html` | `GET /auth/activate/{token}` (falha) | `reason`, `login_url` | | `password_reset_form.html` | `GET /auth/password-reset/{token}` | `user`, `form_action`, `min_length`, `error`, `login_url` | | `password_reset_success.html` | `POST /auth/password-reset/{token}` (sucesso) | `user`, `login_url` | | `password_reset_error.html` | `POST /auth/password-reset/{token}` (token inválido) | `reason`, `login_url` | **Pra override:** passe `template_dir` no `make_auth_router` e crie arquivos de mesmo nome. ```python app.include_router( make_auth_router( auth_service, session_factory=db.session_dependency, template_dir="src/templates/auth", # opcional ), ) ``` !!! tip "Override por idioma (desde v0.59.0)" Os templates bundled agora moram em **subpastas por idioma** (`pt-BR/` e `en-US/`). Você tem duas formas de override, e o SDK procura nesta ordem (a primeira que existir vence): 1. `template_dir//activation_success.html` — override **só daquele idioma** (ex.: `src/templates/auth/pt-BR/...`). 2. `template_dir/activation_success.html` — override **flat**, vale pra todos os idiomas (compatível com versões anteriores à 0.59.0; continua funcionando sem mudar nada). Ou seja: se você já tinha templates flat, **não precisa mexer em nada**. Se quiser um visual diferente por idioma, crie a subpasta. **Trade-offs do Modo E:** - ✅ **Zero dependência do frontend** — backend é fonte única da verdade do fluxo de auth. - ✅ **MVP em minutos** — sem precisar criar rotas SPA pra processar tokens. - ✅ **Funciona em projetos sem frontend** — APIs públicas, intranets, ferramentas internas. - ⚠️ **JWT não é entregue automaticamente** — após ativação, o usuário precisa fazer login manualmente (clicando em "Ir pro login" e usando as credenciais). Por design: zero leak de token em URL, history, ou server logs. - ⚠️ **Requer `[email]` extra** (Jinja2) pra renderizar as páginas HTML — mesma dependência do template de e-mail. - ⚠️ **CSRF na form de reset** — o form HTML usa POST tradicional sem token CSRF. Aceita a request porque o token de reset é one-shot + TTL curto + bound a um user específico, mas considere acoplar `CSRFMiddleware` se atacantes conseguirem prever URLs ativas. Os endpoints **JSON** (`POST /auth/activate/{token}`, `POST /auth/password-reset/confirm`) continuam montados — você pode usar Modo E + manter SPA endpoints ao mesmo tempo. --- ## Entrega de token *(v0.87.0+)* Por padrão o login devolve `access_token` / `refresh_token` **no corpo** da resposta e o cliente reenvia como `Authorization: Bearer `. É ótimo pra apps mobile/API, mas um SPA no browser precisa guardar o token em algum lugar acessível por JS — exposto a XSS. O `AUTH_TOKEN_DELIVERY` deixa você escolher. | Modo | O que muda | Pra quem | |------|------------|----------| | `bearer` *(default)* | Tokens **só no body**. Comportamento histórico, retrocompatível. | Mobile, APIs, clientes que mandam `Authorization`. | | `cookie` | Tokens setados como cookies **`HttpOnly`** nos mesmos paths (`/auth/login`, `/auth/refresh`, `/auth/logout`); o body volta com os tokens `null`. | SPA no browser — o token nunca fica visível pro JS (defesa contra XSS). | | `both` | Endpoints bearer ficam em `/auth/*` **e** um conjunto cookie paralelo é montado em `/auth/cookie/*`. | Um backend que serve web (cookie) **e** mobile (bearer) ao mesmo tempo. | !!! danger "Cookie `Secure` exige HTTPS" Com `AUTH_COOKIE_SECURE=true` (default) o browser **só** reenvia o cookie por HTTPS. Servindo o backend em **HTTP puro**, o cookie é descartado e a sessão nunca persiste (login parece funcionar mas nada fica logado). Em produção, ponha TLS na frente e mantenha `true`; em HTTP local de dev, `AUTH_COOKIE_SECURE=false`. ### Modo cookie ```bash # .env AUTH_TOKEN_DELIVERY=cookie AUTH_COOKIE_SECURE=true # false só em HTTP puro AUTH_COOKIE_SAMESITE=lax # "none" (+Secure) se o SPA for cross-site ``` ```python app.include_router(make_auth_router(auth_service, session_factory=db.session_dependency)) ``` Fluxo no frontend — **não guarda token nenhum**, só chama os endpoints com `credentials: "include"`: ```javascript // login: o browser grava os cookies HttpOnly sozinho await fetch("/auth/login", { method: "POST", credentials: "include", headers: { "Content-Type": "application/json" }, body: JSON.stringify({ email, password }), }); // requests autenticadas: o cookie access viaja automático await fetch("/api/me", { credentials: "include" }); // renovar sessão: o cookie refresh é lido no backend, sem body await fetch("/auth/refresh", { method: "POST", credentials: "include" }); // logout: limpa os cookies (e revoga o refresh se houver refresh_token_model) await fetch("/auth/logout", { method: "POST", credentials: "include" }); ``` A dependency de `current_user` (veja [Pegando o `current_user`](#pegando-o-current_user-da-requisicao)) passa a **ler o access token do cookie** automaticamente quando a entrega envolve cookie — o header `Authorization` ainda tem prioridade se vier. ### Modo both ```bash AUTH_TOKEN_DELIVERY=both ``` Monta os dois conjuntos, sem colisão de rota: ```text # bearer (body): POST /auth/login POST /auth/refresh POST /auth/logout # cookie (HttpOnly): POST /auth/cookie/login POST /auth/cookie/refresh POST /auth/cookie/logout ``` !!! info "O que continua no body" A entrega cookie cobre o ciclo **login / refresh / logout**. Ativação (`POST /auth/activate/{token}`), auto-login no signup (`AUTH_AUTO_ACTIVATE`) e o `POST /auth/mfa/verify` ainda retornam o par JWT no body, independente do `AUTH_TOKEN_DELIVERY`. !!! tip "CORS com credenciais" Pra um SPA em outra origem enviar/receber cookies, o backend precisa de `allow_credentials=True` no CORS **e** `AUTH_COOKIE_SAMESITE=none` + `AUTH_COOKIE_SECURE=true` (portanto HTTPS). Same-origin (frontend servido do mesmo domínio da API) roda com o default `lax`. --- ## Mailhog vs smtp4dev Os dois interceptam SMTP local e renderizam os e-mails numa UI web. Diferenças relevantes: | Aspecto | Mailhog | smtp4dev | |---------|---------|----------| | Imagem Docker | `mailhog/mailhog:latest` | `rnwood/smtp4dev:latest` | | Porta SMTP padrão | `1025` | `2525` (configurável) | | Porta da UI | `8025` | `5000` | | Tamanho da imagem | ~10 MB | ~120 MB (.NET) | | Multi-conta / multi-inbox | não — uma única caixa | sim — filtra por destinatário | | API HTTP / REST | sim (`/api/v2/messages`) | sim (Swagger built-in) | | Validação de DKIM / SPF | não | sim | | Manutenção upstream | arquivado em 2020, ainda funciona | ativa | **Sugestão:** comece com Mailhog (mais leve, zero-config) e migre pra smtp4dev quando precisar de multi-inbox ou inspeção de DKIM. Para o ciclo signup → activate → reset, **Mailhog é suficiente**. ### `docker-compose.yaml` — Mailhog ```yaml services: mailhog: image: mailhog/mailhog:latest container_name: mailhog ports: - "1025:1025" # SMTP — aponte SMTP_HOST aqui - "8025:8025" # UI web ``` `SMTP_PORT=1025`, abra `http://localhost:8025`. ### `docker-compose.yaml` — smtp4dev ```yaml services: smtp4dev: image: rnwood/smtp4dev:latest container_name: smtp4dev ports: - "2525:25" # SMTP — aponte SMTP_HOST aqui - "5000:80" # UI web environment: - ServerOptions__HostName=smtp4dev ``` `SMTP_PORT=2525`, abra `http://localhost:5000`. !!! tip "Já tem `tempest generate --docker`?" Em v0.32+ o gerador de docker-compose vai aceitar `--with mailhog` como atalho. Hoje (v0.31.x) você cola um dos blocos acima no `docker-compose.yaml` gerado pela CLI. --- ## Customizando templates O SDK ship dois templates Jinja2 bundled (`activation.html` + `password_reset.html`) — HTML responsivo, inline styles, mobile-friendly. Você nunca precisa mexer neles pra um MVP funcionar. Quando quiser branding próprio, basta criar um arquivo com o **mesmo nome** dentro do `template_dir` que você passou no `EmailUtils`: ```text emails/ # ← template_dir="emails" ├── activation.html # override do default do SDK └── password_reset.html # override do default do SDK ``` `EmailUtils` usa um `ChoiceLoader` interno do Jinja2 que procura **primeiro** no seu diretório e **só cai** no template bundled se não achar. Você pode sobrescrever um, o outro, ou ambos — sem precisar copiar o template inteiro. ### Variáveis disponíveis no contexto Jinja2 | Variável | Tipo | Em quais templates | Exemplo | |----------|------|--------------------|---------| | `user` | instância de `UserModel` | ambos | `{{ user.email }}`, `{{ user.name }}` (quando seu modelo expõe a coluna) | | `activation_url` | `str` | `activation.html` | `https://app.example.com/activate?token=aBcD...xYz` | | `reset_url` | `str` | `password_reset.html` | `https://app.example.com/reset?token=aBcD...xYz` | | `expires_at` | `datetime` (UTC, timezone-aware) | ambos | o valor cru, se quiser formatar do seu jeito | | `expires_at_str` | `str` | ambos | **recomendado** — já formatado e curto, sem segundos: `21/06/2026 23:25 (UTC)` | !!! tip "Prefira `expires_at_str`" Use `{{ expires_at_str }}` em vez de `{{ expires_at }}` — os templates bundled fazem isso. Ele já vem localizado (conforme `AUTH_DEFAULT_LOCALE`) e sem os segundos/microssegundos barulhentos. O `expires_at` cru continua disponível se você precisar de um formato próprio. ### Exemplo: `emails/activation.html` enxuto ```html

Bem-vindo(a){% if user.name %}, {{ user.name }}{% endif %}!

Para ativar sua conta, clique no botão abaixo:

Ativar conta

Link válido até {{ expires_at_str }}. Se você não criou esta conta, ignore este e-mail.

``` !!! note "O Jinja2 só roda quando há e-mail real" Nos modos C (`AUTH_RETURN_TOKEN_IN_RESPONSE=true`) e D (`AUTH_AUTO_ACTIVATE=true`) o template Jinja2 **não é renderizado** — o link sai cru no JSON, sem HTML. Só os modos A e B (e-mail SMTP real ou interceptado) exercitam o template. --- ## Segurança - **Token armazenado como hash SHA-256.** O plaintext sai pelo e-mail uma única vez; o banco nunca tem como reproduzir o token original. Vazamento da tabela `user_tokens` **não** permite ativação retroativa. - **One-shot.** `used_at` é carimbado no consume; replay rejeitado com `UnauthorizedException`. - **TTL-bounded.** `expires_at` calculado a partir de `AUTH_ACTIVATION_TTL_SECONDS` / `AUTH_PASSWORD_RESET_TTL_SECONDS`. Tokens expirados rejeitados. - **Anti-enumeração.** `POST /auth/password-reset/request` retorna sempre HTTP 202 + corpo genérico, independente de o e-mail existir ou não. `POST /auth/login` levanta a mesma `UnauthorizedException` para email-errado vs senha-errada. - **Password floor aplicado duas vezes.** `SignupSchema` valida no input; `UserAuthService` revalida antes do hash — defesa em profundidade caso alguém bypasse o schema. --- ## Pegando o `current_user` da requisição `make_auth_router` **emite** o JWT pair (login/activate devolvem `access_token` + `refresh_token`). Mas e depois? Quando o frontend manda `Authorization: Bearer ` nas **suas próprias** rotas, você precisa de uma dependency que decodifica o token e resolve o usuário. Desde a v0.49.0, o próprio `UserAuthService` constrói essa dependency — `current_user_dependency()`. Ela: 1. Lê `Authorization: Bearer ` via `HTTPBearer`. 2. Decodifica e verifica o JWT com **o mesmo `JWTUtils` que o service usou pra assinar** — sem segundo segredo pra manter sincronizado. 3. Pega o `sub` (id do usuário) do payload, abre uma sessão a partir do `db=` e devolve o `UserModel` persistido. ### 1. Declare a dependency uma vez O service já tem `user_model`, `JWTUtils` e a sessão — então não precisa escrever `load_user` à mão. Junte as duas variantes no `src/api/dependencies/auth.py`: ```python # src/api/dependencies/auth.py from src.api.app import auth_service get_current_user = auth_service.current_user_dependency() get_current_user_or_none = auth_service.current_user_dependency(soft=True) ``` !!! info "Requer `db=` no `UserAuthService`" `current_user_dependency` resolve o usuário abrindo a própria sessão, então o service precisa ter sido criado com `db=` (o `AsyncDatabaseManager` do [Setup mínimo](#setup-minimo)). Como reusa o `self.jwt` interno, o token é validado com o **mesmo** segredo que assinou — o footgun de `JWT_SECRET` divergente some. ??? note "Sem `UserAuthService`? Monte a dependency na mão" Se o seu serviço não usa o flow bundled, a primitiva `make_jwt_user_dependency` aceita qualquer `JWTUtils` + um `user_loader` async de um argumento: ```python from uuid import UUID from tempest_fastapi_sdk import JWTUtils, make_jwt_user_dependency from src.api.app import db from src.core.settings import settings from src.db.models import UserModel from src.db.repositories import UserRepository tokens: JWTUtils = JWTUtils( secret=settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM, ) async def load_user(subject: str) -> UserModel: """Resolve o subject do JWT (uma string UUID) para o usuário persistido.""" async with db.get_session_context() as session: repo: UserRepository = UserRepository(session) return await repo.get_by_id(UUID(subject)) get_current_user = make_jwt_user_dependency(tokens, load_user) get_current_user_or_none = make_jwt_user_dependency(tokens, load_user, soft=True) ``` Atenção: aqui o `tokens` **precisa** usar o mesmo `JWT_SECRET` / `JWT_ALGORITHM` do login, senão todo token válido é rejeitado. ### 2. Injete na rota com `Depends` ```python # src/api/routers/users.py from fastapi import APIRouter, Depends from src.api.dependencies.auth import get_current_user from src.db.models import UserModel from src.schemas import UserResponseSchema router: APIRouter = APIRouter(prefix="/users", tags=["users"]) @router.get("/me") async def me(current: UserModel = Depends(get_current_user)) -> UserResponseSchema: """Devolve o usuário dono do bearer token da requisição.""" return UserResponseSchema.model_validate(current) ``` `current` **é** o `UserModel` que o service resolveu — tipado, persistido, pronto pra uso. Token ausente ou inválido → `401 UnauthorizedException` antes do corpo da rota rodar. ### 3. Auth opcional — `soft=True` Para rotas que funcionam logado **e** anônimo (ex.: feed público que personaliza se houver login), use a variante `soft` — ela devolve `None` em vez de levantar: ```python @router.get("/feed") async def feed( current: UserModel | None = Depends(get_current_user_or_none), ) -> list[PostResponseSchema]: """Feed público; personaliza o ranking quando há usuário logado.""" if current is None: return await feed_service.public() return await feed_service.personalized(current.id) ``` !!! tip "Role e permission são o próximo passo" Quando a rota precisa de **papel** (`admin`) ou **permissão** (`users:write`) e não só "estar logado", troque por `make_role_dependency` / `make_permission_dependency`. Veja a [receita HTTP »](http.md) — mesma `JWTUtils`, mesmo padrão de `Depends`. ### 4. Guards imperativos — checagem dentro do service / controller As deps acima gateiam a **rota** (antes do handler rodar). Mas e quando você já tem o user em mãos, mais fundo na pilha (service, controller), e quer só **assertar** uma condição antes de continuar? Desde a v0.50.0 o SDK traz três guards prontos — sem reescrever `if user is None: raise ...` em todo serviço: ```python from tempest_fastapi_sdk import ( require_active, require_admin, require_authenticated, ) ``` | Guard | Levanta se | Status HTTP | |-------|------------|-------------| | `require_authenticated(user)` | `user is None` | 401 `UnauthorizedException` | | `require_active(user)` | `None`, ou `not user.is_active` | 401 / 403 `ForbiddenException` | | `require_admin(user)` | `None`, ou `not user.is_admin` | 401 / 403 `ForbiddenException` | O detalhe que importa: cada um **devolve o user já estreitado** — sem `None` e com o tipo concreto preservado — então o resto da função para de ver `| None`: ```python class ReportService: async def delete_all(self, current: UserModel | None) -> None: """Só admin apaga relatórios.""" admin: UserModel = require_admin(current) # 401/403, ou devolve tipado await self.repository.purge(by=admin.id) # `admin` não é mais `| None` ``` Combina direto com o `current_user_dependency(soft=True)`: a rota passa `UserModel | None`, e o guard decide no service. !!! tip "Já tem o `auth_service`? Use os mirrors estáticos" Os mesmos guards existem como staticmethods em `UserAuthService` — `auth_service.require_admin(current)` — pra quando você já injeta o service e não quer um import extra. Mesma semântica, mesma exceção. --- ## Próximos passos - **[Idempotência »](idempotency.md)** — proteja `POST /auth/signup` de retentativas que duplicariam linha. - **[Storage MinIO/S3 »](storage.md)** — anexar avatar / foto de perfil já no signup. - **[Logging »](logging.md)** — `request_id` já propaga automaticamente em cada log emitido durante o flow. - **[Métricas »](metrics.md)** — `PrometheusMiddleware` conta `/auth/*` separadamente sem config extra. --- # MFA / 2FA com TOTP (Authenticator) Source: https://mauriciobenjamin700.github.io/tempest-fastapi-sdk/recipes/mfa/ # MFA / 2FA com TOTP (Authenticator) Desde **v0.35.0** o fluxo de auth bundled suporta **autenticação de dois fatores** com apps Authenticator (Google Authenticator, 1Password, Authy, etc.) seguindo o padrão **TOTP (RFC 6238)**. Você ganha quatro endpoints prontos, códigos de recuperação single-use, e um login de dois passos — tudo controlado por um kill-switch global. ## Conteúdo da receita 1. **[Como funciona em 30 segundos](#como-funciona)** — o modelo mental do fluxo de dois passos. 2. **[Setup](#setup)** — extra `[mfa]`, colunas novas no `UserModel`, tabela de recovery codes. 3. **[Wiring](#wiring)** — passar `recovery_code_model` pro `make_auth_router`. 4. **[Os quatro endpoints](#endpoints)** — enroll / confirm / verify / disable. 5. **[Login de dois passos](#login-de-dois-passos)** — como `POST /auth/login` muda quando MFA está ativo. 6. **[Settings (`AuthSettings`)](#settings)** — flag a flag. 7. **[Usando só o `UserAuthService` (sem o router)](#service-direto)** — pra quem monta os próprios endpoints. 8. **[Segurança](#seguranca)**. 9. **[Próximos passos](#proximos-passos)**. --- ## Como funciona TOTP é o código de 6 dígitos que rola a cada 30 segundos no seu app Authenticator. O servidor e o app compartilham um **segredo** (gerado no enrollment); ambos derivam o mesmo código do relógio atual. Nenhum SMS, nenhuma rede — pura matemática local. O fluxo tem dois momentos: - **Enrollment (uma vez)** — o usuário logado pede um segredo, escaneia o QR code, e confirma digitando o primeiro código. A partir daí MFA está ativo. - **Login (toda vez)** — senha valida o passo 1, mas em vez do JWT pair o backend devolve um `mfa_token` curto. O usuário digita o código do Authenticator; o passo 2 troca `mfa_token` + código pelo JWT pair real. !!! info "Por que dois passos e não tudo de uma vez?" Separar mantém a senha e o segundo fator desacoplados. O `mfa_token` (TTL de 5 min por padrão) carrega só o `sub` do usuário — interceptá-lo sozinho não basta pra logar, porque ainda falta o código do Authenticator. --- ## Setup Requer o extra `[mfa]` (instala `pyotp`), por cima de `[auth]`: ```bash uv add "tempest-fastapi-sdk[auth,mfa]>=0.35.0" ``` ### Colunas via `MFAMixin` As colunas do MFA (`totp_secret`, `totp_enabled_at`) **não** moram no `BaseUserModel` — elas vêm de um mixin opt-in, `MFAMixin`. Misture-o no seu `UserModel` só quando for adotar MFA, assim projetos que nunca ligam a feature não carregam colunas mortas: ```python # src/db/models/user.py from tempest_fastapi_sdk import BaseUserModel, MFAMixin class UserModel(MFAMixin, BaseUserModel): """Concrete user table — MFAMixin adiciona totp_secret / totp_enabled_at.""" __tablename__ = "users" ``` !!! note "Ordem do MRO" O mixin vem **antes** do `BaseUserModel` na lista de bases — mesmo padrão de `AuditMixin` / `SoftDeleteMixin`. O mixin também expõe a property `is_mfa_active` (`totp_enabled_at is not None`). !!! warning "Migration obrigatória" `totp_secret` e `totp_enabled_at` são colunas novas. Rode `uv run tempest db revision -m "mfa columns"` + `uv run tempest db upgrade` antes de ligar a flag. ### Tabela de recovery codes Códigos de recuperação salvam o usuário que perdeu o celular. São **single-use**, mostrados **uma vez** no enrollment, e o banco guarda só o hash SHA-256 de cada um. `BaseUserRecoveryCodeModel` é abstrato — use o helper `make_user_recovery_code_model` pra criar a tabela concreta amarrada à sua tabela de users: ```python # src/db/models/__init__.py from tempest_fastapi_sdk import make_user_recovery_code_model from src.db.models.user import UserModel from src.db.models.user_token import UserTokenModel UserRecoveryCodeModel = make_user_recovery_code_model( user_table="users", tablename="user_recovery_codes", class_name="UserRecoveryCodeModel", ) __all__: list[str] = [ "UserModel", "UserTokenModel", "UserRecoveryCodeModel", ] ``` ??? note "Prefere subclassar à mão?" O helper é só açúcar. O equivalente explícito: ```python from uuid import UUID from sqlalchemy import ForeignKey from sqlalchemy.orm import Mapped, mapped_column from tempest_fastapi_sdk import BaseUserRecoveryCodeModel class UserRecoveryCodeModel(BaseUserRecoveryCodeModel): __tablename__ = "user_recovery_codes" user_id: Mapped[UUID] = mapped_column( ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True, ) ``` --- ## Wiring Ligue a flag `AUTH_MFA_ENABLED` e passe o `recovery_code_model` pro router. Sem o modelo, o router levanta `RuntimeError` no build — é uma trava proposital: ```python # src/api/app.py from tempest_fastapi_sdk import ( AsyncDatabaseManager, UserAuthService, make_auth_router, ) from src.core.settings import settings from src.db.models import UserModel, UserTokenModel, UserRecoveryCodeModel db = AsyncDatabaseManager(settings.DATABASE_URL) auth_service = UserAuthService( user_model=UserModel, token_model=UserTokenModel, auth_settings=settings, # mistura AuthSettings (AUTH_MFA_* abaixo) jwt_settings=settings, email=None, ) app.include_router( make_auth_router( auth_service, session_factory=db.session_dependency, recovery_code_model=UserRecoveryCodeModel, # obrigatório quando MFA on ), ) ``` !!! tip "Kill-switch global" Com `AUTH_MFA_ENABLED=False` (default), os endpoints `/auth/mfa/*` respondem `404` e o login ignora qualquer `totp_secret` persistido — útil pra desligar MFA na emergência de um Authenticator fora do ar, sem mexer no banco. --- ## Endpoints Os quatro só são montados quando `AUTH_MFA_ENABLED=True`: | Método | Path | Auth | Body / Output | Comportamento | |--------|------|------|---------------|---------------| | POST | `/auth/mfa/enroll` | Bearer JWT | — → `MFAEnrollResponseSchema` | Gera segredo + URI do QR + N recovery codes. **Mostrados só uma vez.** Não ativa MFA ainda. | | POST | `/auth/mfa/confirm` | Bearer JWT | `MFAConfirmSchema` | Confirma o enrollment com o primeiro código. A partir daqui MFA está ativo. | | POST | `/auth/mfa/verify` | — | `MFAVerifySchema` → `LoginResponseSchema` | Passo 2 do login: troca `mfa_token` + código pelo JWT pair. | | POST | `/auth/mfa/disable` | Bearer JWT | `MFADisableSchema` | Desliga MFA. Exige senha **e** código ativo (TOTP ou recovery). | ### Fluxo de enrollment ```python import httpx BASE = "http://localhost:8000" access = "" headers = {"Authorization": f"Bearer {access}"} # 1. Enroll — devolve segredo, URI do QR e os recovery codes (uma vez!) r = httpx.post(f"{BASE}/auth/mfa/enroll", headers=headers) data = r.json() print(data["provisioning_uri"]) # renderize como QR code print(data["recovery_codes"]) # mostre pro usuário salvar OFFLINE # 2. Usuário escaneia o QR no Authenticator e digita o código gerado: code = input("Código do Authenticator: ") httpx.post(f"{BASE}/auth/mfa/confirm", headers=headers, json={"code": code}) # 204 No Content → MFA ativo ``` !!! danger "Os recovery codes aparecem UMA vez" A resposta de `enroll` é a única vez que o `secret` e os `recovery_codes` saem em plaintext. Chamar `enroll` de novo **rotaciona** o segredo e **invalida** todos os códigos anteriores. Mostre-os com destaque e instrua o usuário a guardar offline. --- ## Login de dois passos Quando o usuário tem MFA ativo, `POST /auth/login` **não** devolve mais o JWT pair direto — devolve `mfa_required=True` + um `mfa_token` curto: ```python import httpx BASE = "http://localhost:8000" # Passo 1 — senha r1 = httpx.post( f"{BASE}/auth/login", json={"email": "ana@example.com", "password": "strong-pass-12-chars"}, ) body = r1.json() # { # "user_id": "...", # "access_token": null, # "refresh_token": null, # "mfa_required": true, # "mfa_token": "eyJhbGciOi..." # } # Passo 2 — código do Authenticator (ou um recovery code) code = input("Código do Authenticator: ") r2 = httpx.post( f"{BASE}/auth/mfa/verify", json={"mfa_token": body["mfa_token"], "code": code}, ) tokens = r2.json() # { "access_token": "...", "refresh_token": "...", "mfa_required": false } ``` Para usuários **sem** MFA (ou com o kill-switch desligado), `POST /auth/login` continua devolvendo o JWT pair direto, com `mfa_required=False` — o frontend simplesmente checa esse campo e ramifica. ```mermaid sequenceDiagram participant F as Frontend participant API as Backend (SDK) participant DB as Banco F->>API: POST /auth/login (email + senha) API->>DB: valida credenciais alt usuário tem MFA ativo API->>F: 200 {mfa_required: true, mfa_token: "..."} F->>API: POST /auth/mfa/verify (mfa_token + código) API->>DB: valida código TOTP ou recovery code API->>F: 200 {access_token, refresh_token} else MFA desligado / não-inscrito API->>F: 200 {access_token, refresh_token, mfa_required: false} end ``` --- ## Settings Mixe `AuthSettings` na sua classe `Settings` (como no [recipe de auth flow](auth-flow.md#settings-variaveis-de-ambiente)) e configure por env: ```bash # .env — MFA AUTH_MFA_ENABLED=true # kill-switch global (default false) AUTH_MFA_ISSUER=Acme Inc. # nome mostrado no Authenticator AUTH_MFA_RECOVERY_CODES_COUNT=10 # quantos códigos gerar no enroll (2..50) AUTH_MFA_TOKEN_TTL_SECONDS=300 # TTL do mfa_token entre passo 1 e 2 (30..900) AUTH_MFA_VERIFY_WINDOW=1 # tolerância de drift, em passos de 30s (0..4) ``` | Setting | Default | O que faz | |---------|---------|-----------| | `AUTH_MFA_ENABLED` | `False` | Liga os endpoints `/auth/mfa/*` e o login de dois passos. | | `AUTH_MFA_ISSUER` | `"Tempest"` | Label ao lado do email no app Authenticator. Use o nome do seu produto. | | `AUTH_MFA_RECOVERY_CODES_COUNT` | `10` | Quantidade de recovery codes gerados no enrollment. | | `AUTH_MFA_TOKEN_TTL_SECONDS` | `300` | Vida do `mfa_token` intermediário (5 min). | | `AUTH_MFA_VERIFY_WINDOW` | `1` | Janela de tolerância pro relógio do usuário. `1` aceita passo anterior + atual + próximo (90s). `0` é estrito; acima de `2` enfraquece. | --- ## Service direto Se você monta os próprios endpoints (sem `make_auth_router`), os seis métodos do `UserAuthService` cobrem o ciclo inteiro: ```python from sqlalchemy.ext.asyncio import AsyncSession from src.db.models import UserModel, UserRecoveryCodeModel async def enroll_user(service: UserAuthService, session: AsyncSession, user: UserModel) -> None: """Gera segredo + recovery codes e mostra ao usuário (uma vez).""" secret, provisioning_uri, recovery_codes = await service.mfa_enroll( session, user=user, recovery_code_model=UserRecoveryCodeModel, ) await session.commit() # renderize provisioning_uri como QR; mostre recovery_codes async def confirm_user( service: UserAuthService, session: AsyncSession, user: UserModel, code: str ) -> None: """Ativa MFA depois que o usuário prova que escaneou o QR.""" await service.mfa_confirm(session, user=user, code=code) await session.commit() ``` Superfície completa: | Método | Assinatura (resumida) | Retorno | |--------|-----------------------|---------| | `is_mfa_enrolled` | `(user) -> bool` | `True` se MFA ativo (e kill-switch ligado). | | `issue_mfa_token` | `(user) -> str` | JWT curto que liga passo 1 e passo 2. | | `mfa_enroll` | `(session, *, user, recovery_code_model) -> tuple[str, str, list[str]]` | `(secret, provisioning_uri, recovery_codes)`. | | `mfa_confirm` | `(session, *, user, code) -> None` | Ativa MFA. | | `mfa_verify` | `(session, *, mfa_token, code, recovery_code_model) -> UserModel` | Usuário autenticado (mint o JWT depois). | | `mfa_disable` | `(session, *, user, password, code, recovery_code_model) -> None` | Limpa segredo + códigos. | --- ## Segurança - **Segredo TOTP persistido no `UserModel`.** Considere criptografar a coluna `totp_secret` em repouso (Postgres `pgcrypto` ou um wrapper Fernet no nível da aplicação). - **Recovery codes guardados como hash SHA-256.** O plaintext sai uma única vez no enrollment; vazamento da tabela não rende códigos usáveis. - **Recovery codes são single-use.** `used_at` é carimbado no consume; replay rejeitado. - **`disable` exige senha + código.** Uma sessão sequestrada não consegue desligar MFA sozinha — precisa da senha **e** de um fator ativo. - **`mfa_token` é curto e bound ao usuário.** TTL de 5 min por padrão; carrega `purpose: "mfa_pending"` + o `sub`. Tokens de outro propósito são rejeitados em `mfa_verify`. - **Verificação em tempo constante.** `TOTPHelper.verify` delega ao `pyotp`, que compara o código com `hmac.compare_digest`. --- ## Próximos passos - **[Auth flow (signup/reset) »](auth-flow.md)** — o fluxo de conta local que o MFA estende. - **[Sessões server-side »](sessions.md)** — alternativa ao JWT, combinável com MFA no passo 1. - **[Segurança »](security.md)** — CSRF, rate-limit e body-size limit pros endpoints de auth. --- # Sessões server-side Source: https://mauriciobenjamin700.github.io/tempest-fastapi-sdk/recipes/sessions/ # Sessões server-side Desde v0.34.0 o SDK fornece o ciclo completo de autenticação baseada em **sessões server-side** — alternativa ao fluxo JWT do `UserAuthService`. O cookie carrega apenas um id opaco; estado real (user_id, TTL, metadata do cliente, payload da app) vive num **`SessionStore`** plugável (Memory pra dev/testes, Redis pra produção). ## JWT vs sessões server-side | Aspecto | JWT (`UserAuthService`) | Sessions (`SessionAuth`) | |---|---|---| | Estado | stateless (no cliente) | stateful (no Redis/Memory) | | Cookie size | ~500 B – 1 KB (JWT) | 64 B (opaque id) | | Revogação | espera token expirar (~1h típico) | **instantânea** (delete da row) | | Logout global | precisa de blocklist ou rotacionar JWT_SECRET | `revoke_all(user_id)` num call | | CSRF | precisa de header bearer custom | cookie HttpOnly + double-submit token nativo | | Multi-device UI ("logado em 3 lugares") | sem state → impossível direto | `list_sessions(user_id)` trivial | | Multi-replica | trivial (verify-only) | exige Redis (ou sticky) | | Latência por request | nenhuma DB (decode CPU) | 1 hit Redis (~0.5ms LAN) | **Use sessions quando:** SaaS B2C, painel admin, fluxo SSR (HTMX/Django-like), revogação instantânea é requisito, UI de "dispositivos ativos" é feature. **Use JWT quando:** APIs públicas consumidas por mobile/SPA, microservices stateless, escala alta sem dependência de Redis. ## Conteúdo da receita 1. **[Setup mínimo](#setup-minimo)** — wire de 4 objetos (`SessionStore`, `SessionAuth`, `SessionMiddleware`, `make_session_router`). 2. **[Endpoints bundled](#endpoints)** — login / logout / me / list / revoke. 3. **[Settings (`SessionSettings`)](#settings)** — flags + defaults. 4. **[Stores](#stores)** — `MemorySessionStore` vs `RedisSessionStore`. 5. **[Como o middleware injeta a sessão](#middleware)** — `request.state.session` + dependency. 6. **[Segurança](#seguranca)** — anti-fixation rotation, hash-at-rest, anti-enumeração, CSRF. 7. **[Trade-offs e quando NÃO usar](#trade-offs)** — multi-replica, mobile, edge. --- ## Setup mínimo Quatro objetos compõem o fluxo. Mount uma vez no `app.py`: ```python # src/api/app.py from fastapi import FastAPI from tempest_fastapi_sdk import ( AsyncDatabaseManager, RedisSessionStore, SessionAuth, SessionMiddleware, SessionSettings, make_session_router, register_exception_handlers, ) from tempest_fastapi_sdk.cache import AsyncRedisManager from src.core.settings import settings from src.db.models import UserModel db = AsyncDatabaseManager(settings.DATABASE_URL) cache = AsyncRedisManager(settings.REDIS_URL) session_settings = SessionSettings() session_store = RedisSessionStore(cache.client, prefix=f"{settings.APP_NAME}:") session_auth = SessionAuth( user_model=UserModel, store=session_store, settings=session_settings, ) def create_app() -> FastAPI: app = FastAPI(title="my-app") register_exception_handlers(app) # Order matters: middleware ANTES dos routers. app.add_middleware( SessionMiddleware, session_auth=session_auth, settings=session_settings, ) app.include_router( make_session_router( session_auth, session_factory=db.session_dependency, ) ) return app app = create_app() ``` Pronto. O usuário faz `POST /auth/session/login` com email+senha; o SDK seta o cookie HttpOnly+Secure; toda request subsequente que carrega o cookie tem `request.state.session` populado. --- ## Endpoints Cinco endpoints bundled cobrindo o ciclo todo: | Método | Path | Body / Output | Comportamento | |---|---|---|---| | POST | `/auth/session/login` | `SessionLoginSchema` → `SessionResponseSchema` | Verifica bcrypt. Mint nova sessão. Seta `Set-Cookie: tempest_session=; HttpOnly; Secure; SameSite=Lax`. Se já havia cookie, **rotaciona** (anti-fixation). | | POST | `/auth/session/logout` | — → `204 No Content` | Revoga a sessão atual + limpa cookie. Idempotente. | | GET | `/auth/session/me` | — → `Session` | Retorna a sessão atual (`user_id`, timestamps, ip, user_agent, data). `401` quando sem cookie. | | GET | `/auth/session/list` | — → `list[SessionSummarySchema]` | Lista todas as sessões ativas do usuário (UI "dispositivos ativos"). Marca a atual com `is_current=True`. | | DELETE | `/auth/session/{id}` | — → `204 No Content` | Revoga uma sessão específica pelo public id (32 chars do hash). Se for a própria, limpa o cookie. | --- ## Settings Mixe `SessionSettings` na sua `Settings`: ```python from tempest_fastapi_sdk import BaseAppSettings, SessionSettings class Settings(SessionSettings, BaseAppSettings): pass ``` ```bash # .env SESSION_TTL_SECONDS=86400 # 24h (default) SESSION_SLIDING=true # refresh expires_at a cada hit (default) SESSION_COOKIE_NAME=tempest_session SESSION_COOKIE_DOMAIN= # None = exato host SESSION_COOKIE_PATH=/ SESSION_COOKIE_SECURE=true # HTTPS only — false só pra dev HTTP SESSION_COOKIE_HTTPONLY=true # JavaScript não lê — sempre true SESSION_COOKIE_SAMESITE=lax # lax / strict / none SESSION_ROTATE_ON_LOGIN=true # anti-fixation ``` --- ## Stores ### `MemorySessionStore` — dev/testes ```python from tempest_fastapi_sdk import MemorySessionStore session_store = MemorySessionStore() ``` State no dict do processo. **Não escala** — restart do uvicorn limpa tudo, uma réplica não vê sessões da outra. Use em testes e localdev. ### `RedisSessionStore` — produção ```python from tempest_fastapi_sdk import RedisSessionStore from tempest_fastapi_sdk.cache import AsyncRedisManager cache = AsyncRedisManager(settings.REDIS_URL) session_store = RedisSessionStore(cache.client, prefix="myapp:") ``` Schema interno: - `myapp:sess:` — JSON da `Session`, TTL = `expires_at - now` - `myapp:user:` — Redis SET de hashes das sessões do user (índice pra `list_by_user` / `delete_by_user`) TTL é gerenciado pelo Redis automaticamente — sem janitor process. **Requer `[cache]` extra** (`redis` async client). ### Customizado Qualquer classe que implemente o protocol `SessionStore` (5 métodos async) plugga out-of-the-box — DynamoDB, Postgres table, Memcached, etc. --- ## Middleware `SessionMiddleware` roda **antes** dos routers, lê o cookie, resolve via store, popula `request.state.session`: ```python @router.get("/profile") async def profile(session: Session = Depends(make_session_dependency(required=True))): return {"user_id": str(session.user_id), "data": session.data} ``` **`required=True`** (default): sem cookie → `UnauthorizedException` → resposta `401` no envelope SDK. **`required=False`**: handler aceita ambos — `session` é `Session | None`. Use em endpoints públicos que adaptam conteúdo pra logged-in users. Acesso direto (sem dependency): ```python @router.get("/anything") async def handler(request: Request) -> dict: s: Session | None = request.state.session return {"authenticated": s is not None} ``` --- ## Segurança - **Hash at rest**: cookie carrega plaintext de 32 bytes URL-safe; store guarda só SHA-256. Vazamento da tabela `sessions` **não** dá login. - **Session-fixation prevention**: `SESSION_ROTATE_ON_LOGIN=True` (default) — login bem-sucedido sempre mint id novo, mesmo que o browser já tivesse um. Fecha o vetor "atacante planta cookie conhecido antes do login". - **CSRF nativo via SameSite**: `SESSION_COOKIE_SAMESITE=lax` (default) bloqueia POST cross-site. Combine com [`CSRFMiddleware`](security.md) pra GET-state-changing endpoints e form-submission. - **HttpOnly + Secure**: `SESSION_COOKIE_HTTPONLY=True` + `SESSION_COOKIE_SECURE=True` por default. JavaScript não lê (anti-XSS); browser não envia em HTTP. - **Sliding TTL com floor**: `SESSION_SLIDING=True` (default) refresh a cada hit, mas `created_at` permanece — você pode forçar logout absoluto após N dias via job que limpa rows com `created_at < now - 30d`. - **Anti-enumeração**: `/auth/session/login` rejeita email-errado e senha-errada com o **mesmo** `UnauthorizedException` + mesmo timing approximado (bcrypt sempre roda). - **Revogação instantânea**: `revoke_all(user_id)` no password-change / suspeita de compromisso → logout em todos os dispositivos no próximo request. --- ## Trade-offs **Quando NÃO usar:** - **API pública pra mobile** — apps nativos não dão atenção a cookies; bearer JWT no header `Authorization` continua melhor. - **Microservices stateless** — cada réplica decode JWT sem hit em DB. Sessions exige Redis compartilhado. - **Edge/CDN auth** — Cloudflare Workers etc. validam JWT no edge sem chegar no origin. Session exige roundtrip ao backend. **Quando combinar JWT + Session:** Possível. SPA web usa cookie de sessão; mobile do mesmo backend usa `UserAuthService.login` → JWT. Os dois flows coexistem sem conflito — `UserAuthService` e `SessionAuth` falam com o mesmo `UserModel`, diferem só no pós-verify (mint JWT vs mint Session). ## Próximos passos - **[Auth flow »](auth-flow.md)** — fluxo JWT bundled (signup / activate / reset). Sessions cobre só login/logout. - **[Segurança »](security.md)** — `CSRFMiddleware` pra blindar POST contra ataques cross-site mesmo com SameSite=lax. - **[Cache »](cache.md)** — `AsyncRedisManager` que alimenta o `RedisSessionStore`. --- # Métricas Source: https://mauriciobenjamin700.github.io/tempest-fastapi-sdk/recipes/metrics/ # Métricas O SDK oferece **dois caminhos complementares** de métricas: 1. **Prometheus de RED/USE para HTTP** (`PrometheusMiddleware` + `make_prometheus_router`) — escuta cada request, calcula `http_requests_total` + histograma de latência + `http_requests_in_flight`, expõe tudo num `GET /metrics` no formato texto do Prometheus pronto pra ser raspado pelo seu Prometheus / Grafana / Datadog. 2. **Snapshots de sistema sob demanda** (`MetricsUtils`) — coleta CPU / memória / disco / GPU NVIDIA pra um endpoint custom (debug page interna, /oncall, etc.). Sem export Prometheus integrado — o objetivo é dar a foto instantânea. Use o **#1** em produção sempre. Adicione o **#2** quando precisar inspecionar o host onde a app roda. ## #1 Prometheus HTTP — extra `[prometheus]` Instale com `[prometheus]` (puxa `prometheus-client`). O middleware mede toda request; o router serve o endpoint de scrape. ```bash uv add "tempest-fastapi-sdk[prometheus]>=0.32.0" ``` ```python # src/api/app.py from fastapi import FastAPI from tempest_fastapi_sdk import ( PrometheusMiddleware, make_prometheus_registry, make_prometheus_router, ) def create_app() -> FastAPI: app = FastAPI(title="my-service") # Registry isolado por app — evita colisões com outros prometheus-client globais. registry = make_prometheus_registry() app.add_middleware(PrometheusMiddleware, registry=registry) app.include_router(make_prometheus_router(registry=registry)) return app ``` Pronto. `GET /metrics` agora devolve algo como: ```text # HELP http_requests_total Total HTTP requests # TYPE http_requests_total counter http_requests_total{method="GET",path="/api/users",status="200"} 142.0 http_requests_total{method="POST",path="/auth/signup",status="201"} 7.0 # HELP http_request_duration_seconds HTTP request latency # TYPE http_request_duration_seconds histogram http_request_duration_seconds_bucket{le="0.005",method="GET",path="/api/users"} 89.0 ... http_requests_in_flight{method="GET",path="/api/users"} 2.0 ``` Buckets padrão (`DEFAULT_LATENCY_BUCKETS`) cobrem 5ms → 30s — adequado pra APIs típicas. Sobrescreva com `PrometheusMiddleware(registry=..., buckets=(0.001, 0.005, 0.025, 0.1, 0.5, 2, 10))` quando seu workload é mais granular. !!! tip "Path normalization" O `path` label usa o template da rota (`/api/users/{user_id}`), não o path concreto, pra não explodir a cardinalidade com UUIDs únicos. Isso vem do FastAPI/Starlette — você não precisa configurar nada. ### Scrape config `prometheus.yml`: ```yaml scrape_configs: - job_name: my-service metrics_path: /metrics static_configs: - targets: ["my-service:8000"] ``` Ou em compose: ```yaml services: my-service: image: ... ports: ["8000:8000"] prometheus: image: prom/prometheus:latest volumes: - ./prometheus.yml:/etc/prometheus/prometheus.yml ports: ["9090:9090"] ``` ## #2 Snapshot de sistema — extra `[metrics]` `MetricsUtils` coleta uso de CPU, memória, disco e GPU NVIDIA via `psutil` + `pynvml`. Todo método tem variante sync e async (o wrapper async roda o mesmo código via `asyncio.to_thread`). A amostragem de GPU degrada graciosamente para `[]` quando `pynvml` ou os drivers da NVIDIA estão ausentes. Instale com `[metrics]`. ```python # src/api/routers/system.py from typing import Any from fastapi import APIRouter from tempest_fastapi_sdk import MetricsUtils router = APIRouter() @router.get("/system-metrics") async def system_metrics() -> dict[str, Any]: """JSON snapshot. NOT the Prometheus endpoint — that one is /metrics.""" snapshot = await MetricsUtils.snapshot_async(disk_paths=["/", "/data"]) return snapshot.to_dict() ``` !!! warning "Não monte no path `/metrics`" Esse endpoint **não** é o do Prometheus — montar no mesmo path conflita com o `make_prometheus_router` quando os dois estão ativos. Use `/system-metrics`, `/admin/sysinfo`, ou algum prefixo restrito de oncall. !!! warning "`MetricsUtils.cpu(interval=...)` bloqueia o event loop" A chamada sync gasta `interval` segundos amostrando — o wrapper `cpu_async` evita o bloqueio rodando em thread. Sempre prefira `MetricsUtils.snapshot_async()` em handlers. ### Coletores individuais ```python snapshot = await MetricsUtils.snapshot_async(disk_paths=["/"]) print(snapshot.cpu.percent, snapshot.memory.percent) for disk in snapshot.disks: print(disk.path, disk.percent) for gpu in snapshot.gpus: print(gpu.name, gpu.utilization_percent, gpu.memory_used_bytes) ``` Os coletores individuais também estão disponíveis: `MetricsUtils.cpu(interval=...)`, `MetricsUtils.memory()`, `MetricsUtils.disk(path)`, `MetricsUtils.disks(paths)`, `MetricsUtils.gpus()` — e suas variantes `*_async`. Cada um retorna uma dataclass tipada (`CPUMetrics`, `MemoryMetrics`, `DiskMetrics`, `GPUMetrics`, `SystemMetrics`) com um helper `to_dict()` para serialização JSON. --- # Painel admin Source: https://mauriciobenjamin700.github.io/tempest-fastapi-sdk/recipes/admin/ # Painel admin UI de gerenciamento no estilo Django montada sob `/admin`. Operadores entram com uma linha de usuário do banco (sem store de senha de admin separado) e navegam por todo modelo registrado pelo navegador, então a porta do banco pode ficar fechada em redes privadas. O painel é completo (paridade com o Django admin): list view com busca / filtros ricos por campo (enum / FK / range de data) / colunas ordenáveis, CRUD completo (criar / editar / excluir), ações em massa, export CSV/JSON, widgets FK-select, dashboard com contagens de linhas + métricas de sistema, MFA TOTP opcional no login, campos de upload de arquivo/imagem, e trilha de auditoria carimbando `created_by` / `updated_by`. Ainda no roadmap: edição inline/relacionada. Requer o extra `[admin]`: ```bash pip install "tempest-fastapi-sdk[admin]" ``` #### 1. Modelo de usuário Subclasse `BaseUserModel` para ganhar as quatro colunas que o backend de auth do admin espera (`email`, `hashed_password`, `is_admin`, `last_login_at`) em cima da linha padrão do `BaseModel`: ```python # src/db/models/user.py from tempest_fastapi_sdk import BaseUserModel class UserModel(BaseUserModel): __tablename__ = "users" # scaffold convention; admin slug derives from __tablename__ ``` `set_password()` / `check_password()` delegam ao `PasswordUtils`; `normalize_email()` deixa minúsculo e remove espaços. O `is_active` padrão (herdado do `BaseModel`) e o `is_admin` (default `False`) controlam o acesso — somente linhas `is_active=True` E `is_admin=True` podem entrar. Faça o bootstrap do primeiro admin pela sua CLI / migração / script de seed. O script completo conecta um `AsyncDatabaseManager`, abre uma sessão, insere a linha e dá commit — exatamente o mesmo padrão que seus repositories seguem em runtime: ```python # scripts/create_admin.py import asyncio from tempest_fastapi_sdk import AsyncDatabaseManager from src.core.settings import settings from src.db.models import UserModel async def main() -> None: db = AsyncDatabaseManager(settings.DATABASE_URL) await db.connect() try: async with db.get_session_context() as session: # ──────── the only admin-specific lines ──────── admin = UserModel(email="root@example.com", is_admin=True) admin.set_password("hunter2") # bcrypt via PasswordUtils session.add(admin) await session.commit() finally: await db.disconnect() if __name__ == "__main__": asyncio.run(main()) ``` As quatro linhas destacadas sob o comentário divisor são o único código de bootstrap específico de admin; tudo ao redor é o ciclo de vida async de DB padrão que o SDK já usa. #### 2. Registre suas classes de admin `AdminModel` é uma instância de configuração tipada simples — a assinatura do construtor é o contrato (sem mágica de atributo de classe / metaclass), e todo campo aceita um atributo de coluna SQLAlchemy real (`UserModel.email`), então erros de digitação aparecem no seu editor em vez de em runtime. Os defaults funcionam de cara; passe os campos que quiser para enriquecer a list view: ```python # src/admin/site.py from sqlalchemy import desc from tempest_fastapi_sdk import AdminModel, AdminSite from src.db.models import UserModel, OrderModel site = AdminSite( title="MyApp Admin", brand="servus-backend-admin", # texto centralizado no topo (opcional; default = title) index_subtitle="Site administration", site_url="https://myapp.com", # optional outbound "View site" link ) site.register(AdminModel( model=UserModel, list_display=[UserModel.email, UserModel.is_admin, UserModel.is_active, UserModel.last_login_at], list_filter=[UserModel.is_active, UserModel.is_admin], search_fields=[UserModel.email], readonly_fields=[UserModel.id, UserModel.hashed_password, UserModel.created_at, UserModel.updated_at], ordering=desc(UserModel.created_at), page_size=25, )) ``` Toda referência a campo também aceita uma string simples (`list_display=["email", ...]`) para configuração dinâmica, e `ordering` aceita uma coluna (ascendente), `desc(column)` / `asc(column)`, ou uma string no estilo Django `"-created_at"`. `register` retorna a instância e levanta `ValueError` em slug duplicado. Os slugs derivam por padrão do `__tablename__` do modelo, para que URLs e tabelas do banco fiquem em sincronia. !!! info "Filtros automáticos por tipo de coluna" Cada campo em `list_filter` vira o widget certo conforme o tipo da coluna: **boolean** → dropdown Sim/Não; **enum** → dropdown com os membros; **FK** (cujo destino tem `AdminModel` registrado) → dropdown das linhas relacionadas (label pelo `search_fields`); **date/datetime** → dois inputs de data (de/até, range inclusivo); qualquer outra coluna → input de texto (igualdade). Tudo preserva busca/ordenação/paginação na URL. !!! tip "Marca centralizada e customizável" O nome exibido no centro do header vem de `brand` (opcional). Sem ele, cai no `title` — então sites existentes não mudam. Use `brand` para mostrar um nome distinto (ex.: `"servus-backend-admin"`) centralizado no topo de toda página. A sidebar é fixa e **sobrepõe header e footer** no desktop (z-index maior) — comportamento automático do CSS embutido, sem config. #### 2b. Atalho — registrar todos os modelos de uma vez (`automap`) Em vez de um `register` por tabela, aponte `automap` para o pacote dos modelos e o SDK descobre e registra **todo `BaseModel` concreto** automaticamente. Bases abstratas (`BaseUserModel` e cia. — sem `__tablename__`) são puladas sozinhas: ```python # src/admin/site.py from tempest_fastapi_sdk import AdminModel, AdminSite site = AdminSite(title="MyApp Admin", brand="servus-backend-admin") # Carrega TODAS as tabelas de src/db/models de uma vez: site.automap("src.db.models") ``` Misture os dois estilos: registre à mão os modelos que precisam de config própria, depois deixe o `automap` preencher o resto (ele pula slugs já registrados por padrão): ```python # UserModel ganha config caprichada... site.register(AdminModel( model=UserModel, list_display=[UserModel.email, UserModel.is_admin], search_fields=[UserModel.email], )) # ...e o automap registra o resto com os defaults. site.automap("src.db.models") ``` `automap` aceita: `exclude=[...]` (classe, nome de classe ou nome de tabela para esconder um modelo), `skip_registered=False` (levanta `ValueError` em colisão, igual `register`), e `**admin_kwargs` aplicados a todos (`page_size=50`, `can_delete=False`, ...). Para introspecção sem registrar, use a função `discover_models("src.db.models")` direto. !!! warning "Config uniforme" Os `**admin_kwargs` do `automap` valem para **todos** os modelos descobertos. Quando um modelo precisa de `list_display` / `search_fields` próprios, registre-o à mão **antes** do `automap` (com `skip_registered=True`, o default). #### 3. Monte o router ```python # src/api/app.py from fastapi import FastAPI from tempest_fastapi_sdk import UserModelAuthBackend, make_admin_router from src.admin.site import site from src.api.dependencies import db # singleton de src/api/dependencies/resources.py from src.core.settings import settings from src.db.models import UserModel app = FastAPI() app.include_router( make_admin_router( site, db=db, auth_backend=UserModelAuthBackend(UserModel), secret_key=settings.JWT_SECRET, # scaffold reuses JWT_SECRET — pelo menos 32 bytes prefix="/admin", cookie_secure=not settings.DEBUG, # True in production HTTPS show_logs=True, # liga a página de logs + item na sidebar log_dir=settings.LOG_DIR, # mesmo dir passado pro configure_logging ) ) ``` `make_admin_router` monta: - `GET /admin/login`, `POST /admin/login`, `POST /admin/logout` — fluxo de auth. - `GET/POST /admin/mfa` — desafio TOTP (segundo fator) entre a senha e o acesso, para principais com MFA habilitado. - `GET /admin/` — dashboard: card por modelo com **contagem de linhas** + Browse/New, e um **painel de métricas** (CPU/RAM/disco via `MetricsUtils`). Painel ligado por default, omitido sem o extra `[metrics]`, desligável com `make_admin_router(show_metrics=False)`. - `GET /admin/logs` — **logs da aplicação** (quando `show_logs=True`): lê os arquivos JSON estruturados escritos pelo `configure_logging(log_dir=…)`, com filtro por fonte (`?source=`), busca em texto (`?q=`) e paginação. Badges coloridos por nível. Quando ainda não há arquivos de log, mostra um estado vazio. - `GET /admin/m/{slug}/` — list view com paginação + busca em texto livre (`?q=`) + filtros por campo (`?filter_=value`) + **ordenação por coluna** clicável (`?sort=&dir=asc|desc`). - `GET /admin/m/{slug}/export.csv` / `export.json` — **exporta** o resultado atual (respeitando busca/filtros/ordenação) como CSV ou JSON. Limite de linhas via `make_admin_router(export_max_rows=…)` (default 5000). - `POST /admin/m/{slug}/bulk` — **ações em massa** (delete / activate / deactivate + suas **ações customizadas**) nas linhas selecionadas. - `GET/POST /admin/m/{slug}/new` — **criar** registro (quando `can_create`). - `GET /admin/m/{slug}/{identity}` — detail view com botões Edit/Delete. - `GET/POST /admin/m/{slug}/{identity}/edit` — **editar** registro (quando `can_edit`). - `POST /admin/m/{slug}/{identity}/delete` — **excluir** registro (quando `can_delete`). - `GET /admin/static/{path}` — assets CSS/HTMX embutidos. !!! info "Escrita (CRUD) + permissões" Create/edit/delete são controlados por flags no `AdminModel`: `can_create` / `can_edit` / `can_delete` (todas `True` por default; uma view desativada responde `404`). Todo POST de escrita carrega o token CSRF da sessão, validado no servidor (`403` em mismatch). Os **widgets de campo** são derivados do tipo da coluna — texto / textarea (strings longas) / number / checkbox / `datetime-local` / date / `select` para enums — com validação de obrigatórios + erros por campo re-renderizados no formulário. **Ações em massa**: a list view mostra checkboxes por linha + select-all e uma barra de ação (delete / activate / deactivate) que opera nas linhas marcadas via `POST .../bulk` (CSRF + flags `can_delete`/`can_edit`), apoiada em `BaseRepository.delete_batch` / `bulk_update`. **FK-select**: uma coluna FK cujo destino tem `AdminModel` registrado vira um dropdown das linhas relacionadas (igual ao FK select do Django) no formulário, em vez de um input UUID cru. O label da opção vem do primeiro `search_fields` do admin referenciado (fallback: atributo `name`/`title`/`email`, depois o id). Limitado a 1000 linhas; FK para tabela não-gerenciada continua input UUID. **MFA no login**: um principal com MFA habilitado (colunas `totp_secret`/`totp_enabled_at` do `MFAMixin`) passa por um desafio TOTP em `/admin/mfa` depois da senha — só um código válido libera o acesso. Habilite passando um usuário com MFA via `UserModelAuthBackend(UserModel, mfa_issuer=...)`; backends customizados sobrescrevem `mfa_enabled`/`verify_mfa`. **Audit trail**: create/edit pelo admin carimba `created_by`/`updated_by` (do `AuditMixin`) com o id do admin atuante; o detail mostra um painel **Audit** com timestamps e — quando o modelo tem as colunas de auditoria — o ator (UUID resolvido para nome via o auth backend). Modelos sem `AuditMixin` mostram só os timestamps. Ainda **não** incluídos (fases futuras do roadmap): inline/related editing. ## Ações customizadas (`@admin_action`) Além das 3 fixas (activate / deactivate / delete), você registra **ações próprias** — uma função async decorada com `@admin_action` e passada em `AdminModel(actions=[...])`. Cada uma vira uma opção no dropdown de ações em massa, operando nas linhas marcadas. ```python from tempest_fastapi_sdk import ( AdminActionContext, AdminActionResult, AdminModel, admin_action, ) @admin_action(label="Enviar boas-vindas") async def send_welcome(ctx: AdminActionContext) -> AdminActionResult: """Roda nas linhas selecionadas; a mensagem é exibida na list view.""" users = await ctx.repository.list(filters={"id": ctx.ids}) for user in users: await mailer.send_welcome(user.email) return AdminActionResult(f"{len(users)} e-mails enviados.") site.register(AdminModel(model=UserModel, actions=[send_welcome])) ``` O handler recebe um `AdminActionContext` com: | Campo | O que é | | --- | --- | | `ids` | Identidades das linhas marcadas. | | `repository` | `BaseRepository` do modelo, na sessão do request. | | `db_session` | A sessão DB (pra trabalho além do repositório). | | `request` | O request inbound. | | `session` | A sessão do admin autenticado. | | `principal` | A linha do usuário admin que disparou a ação. | Retorne um `AdminActionResult(message, category="success"|"error"|"warning")` pra exibir um banner na list view (ou `None` pra não mostrar nada). A função fica **diretamente chamável/testável** — o decorator só anexa metadados. Use `name=` pra fixar o identificador (default: nome da função) e `dangerous=True` pra marcar ação destrutiva. ## Campo de upload de arquivo / imagem Uma coluna `String` que guarda o caminho/chave de um arquivo pode virar um **input de upload** no formulário. Liste a coluna em `upload_fields` e passe um `upload_storage` (os backends que o SDK já tem — `LocalUploadStorage` / `MinIOUploadStorage`). No submit, o arquivo é salvo no storage e a **chave retornada** é gravada na coluna. ```python from tempest_fastapi_sdk import AdminModel from tempest_fastapi_sdk.utils import LocalUploadStorage site.register(AdminModel( model=DocumentModel, upload_fields=[DocumentModel.attachment], # coluna String que guarda a chave upload_storage=LocalUploadStorage("media/"), # ou MinIOUploadStorage(...) )) ``` - O form vira `multipart/form-data` automaticamente quando há `upload_fields`. - **Create**: arquivo obrigatório só se a coluna for `NOT NULL` e sem default. - **Edit**: sem arquivo novo → mantém o valor atual (mostra "Current: …"); com arquivo → substitui. - A coluna guarda a **chave** do storage (`//.`); use o `upload_storage` (ou `UploadUtils`) pra servir/baixar depois. !!! warning "`upload_fields` exige `upload_storage`" Registrar `upload_fields` sem `upload_storage` levanta `ValueError` na construção do `AdminModel` — sem storage não há onde gravar o arquivo. !!! tip "Navegação por sidebar + burger" Toda página autenticada tem uma **sidebar** persistente: Dashboard, um link por modelo registrado (agrupados em "Models") e, com `show_logs=True`, "Logs" em "System". O item da página atual fica destacado. No **desktop** a sidebar fica sempre visível à esquerda; no **mobile** (≤768px) ela vira off-canvas, aberta pelo ícone **burger** no header e fechada tocando no scrim — tudo CSS puro, sem JS. !!! info "Página de logs (`show_logs=True`)" `GET /admin/logs` lê os arquivos JSON estruturados que o `configure_logging(log_dir=…)` grava. Passe o **mesmo** `log_dir` para `make_admin_router`. A página oferece filtro por fonte (`all`/`debug`/`info`/`warning`/`error`/`critical`/`500`), busca por substring na mensagem e paginação, com badges coloridos por nível. É **opt-in** (`show_logs=False` por default) porque o payload expõe tracebacks e metadados de request — só habilite atrás do login do admin. Sem arquivos no `log_dir`, a página mostra um estado vazio. !!! tip "Responsivo por padrão" Os templates + CSS embutidos são responsivos: em telas estreitas (≤600px) o header empilha, busca/filtros/ações viram full-width, as tabelas ganham scroll horizontal (nunca quebram o layout) e o grid do detail colapsa para uma coluna. Headers de coluna são clicáveis para alternar a ordenação (▲/▼). #### 4. Defaults de segurança de sessão `SignedCookieSessionStore` usa `itsdangerous.TimestampSigner` (HMAC-SHA256) para assinar um único cookie: - `HttpOnly` sempre definido. - `Secure` marcado quando `cookie_secure=True` (padrão; desligue no dev HTTP local). - `SameSite=Lax` (`"lax"`/`"strict"`/`"none"` aceitos). - Tempo de vida padrão `8h`; cookies expirados ou adulterados são rejeitados silenciosamente. - Um token CSRF por sessão é gerado no login e exigido por todo POST de formulário (login, logout, criar, editar, excluir, ações em massa). - `secret_key` deve ter ao menos 32 bytes — chaves curtas levantam `ValueError` no momento da construção. !!! danger "Login em loop? É o `Secure` do cookie sobre HTTP puro" Se o `POST /admin/login` responde `303` (parece sucesso), mas o `GET /admin/` seguinte redireciona de volta pro login — repetindo pra sempre — o cookie de sessão **não está voltando**. Causa quase certa: `cookie_secure=True` enquanto o admin é servido por **HTTP puro** (sem TLS na frente). O browser recusa gravar um cookie `Secure` em conexão não-HTTPS, então nenhuma sessão persiste. ```python # ❌ Atado a DEBUG: em produção DEBUG=false → cookie_secure=True, # mas se não houver HTTPS na frente, o login entra em loop. make_admin_router(..., cookie_secure=not settings.DEBUG) # ✅ Controle dedicado, independente de DEBUG: make_admin_router(..., cookie_secure=settings.ADMIN_COOKIE_SECURE) ``` **Correção certa:** ponha HTTPS na frente (nginx/Caddy terminando TLS) e deixe `cookie_secure=True` — o cookie da sessão admin não deve trafegar em claro. **Paliativo** só quando o admin roda mesmo em HTTP (intranet, MVP): `cookie_secure=False`, ciente de que a sessão vai sem `Secure`. Não amarre esse flag ao `DEBUG` — ligar debug em produção é pior que o problema original. #### 5. Plugue um backend de auth customizado `AdminAuthBackend` é uma ABC, então troque o default por LDAP / OAuth / IAM externo subclasseando: ```python from typing import Any from sqlalchemy.ext.asyncio import AsyncSession from tempest_fastapi_sdk import AdminAuthBackend, AdminAuthError class OAuthAdminBackend(AdminAuthBackend): async def authenticate( self, session: AsyncSession, *, identifier: str, password: str, ) -> Any: principal = await my_oauth_client.authenticate(identifier, password) if not principal.has_role("admin"): raise AdminAuthError("not an admin") return principal async def load_principal( self, session: AsyncSession, principal_id: str, ) -> Any | None: return await my_oauth_client.get_user(principal_id) def principal_id(self, principal: Any) -> str: return principal.sub def display_name(self, principal: Any) -> str: return principal.email ``` Passe a instância via `auth_backend=` e o resto do pipeline do admin (sessões, dashboard, list, detail) segue funcionando sem mudanças. #### 6. Customizar a aparência — `AdminTheme` O CSS do admin é todo dirigido por **CSS custom properties** em `:root`. Em vez de forkar a folha de estilo, você passa um `AdminTheme` com **parâmetros tipados e documentados** — cores, logo, favicon, fonte, raio, rodapé, modo escuro — e a SDK injeta um bloco `