Ir para o conteúdo

Tempo real

Empurre dados para os clientes sem que o cliente fique fazendo polling. SSE para broadcasts servidor→navegador, Web Push para notificações mesmo com a página fechada.

Server-Sent Events (SSE)

EventStream é uma fila async em memória que alimenta uma conexão HTTP SSE. ServerSentEvent codifica um frame; sse_response embrulha o stream de bytes em um StreamingResponse do Starlette com headers amigáveis ao SSE.

# src/api/routers/events.py
import asyncio
from collections.abc import AsyncIterator

from fastapi import APIRouter
from starlette.responses import StreamingResponse

from tempest_fastapi_sdk import EventStream, sse_response

router = APIRouter()


@router.get("/events")
async def events() -> StreamingResponse:
    """Emit three SSE frames then close the stream."""
    stream = EventStream(heartbeat_seconds=15.0)

    async def producer() -> None:
        try:
            for n in range(1, 4):
                await stream.publish({"n": n}, event="counter", id=str(n))
                await asyncio.sleep(1)
        finally:
            await stream.close()

    task = asyncio.create_task(producer())

    async def lifecycle_aware() -> AsyncIterator[bytes]:
        try:
            async for chunk in stream.stream():
                yield chunk
        finally:
            # Client closed the connection (or producer finished + close()
            # was called) — cancel the producer so it doesn't leak.
            task.cancel()

    return sse_response(lifecycle_aware())

Pattern: link producer to client connection lifecycle

SSE streams are long-lived. If the client disconnects mid-stream, you don't want the producer running forever. Wrapping stream.stream() in an outer async generator gives a finally block that runs when the underlying response closes — cancel the producer there.

No navegador:

const es = new EventSource("/events");
es.addEventListener("counter", (e) => console.log("got", JSON.parse(e.data)));

heartbeat_seconds emite um comentário SSE : keepalive quando ocioso, para que load-balancers não fechem conexões de longa duração. ServerSentEvent.data aceita strings, bytes ou qualquer objeto Python serializável em JSON — não-strings são codificados em JSON automaticamente. Passe retry= para sugerir ao navegador o atraso de reconexão (em milissegundos).

Notificações Web Push

WebPushDispatcher embrulha a biblioteca síncrona pywebpush em asyncio.to_thread e expõe os dois erros que importam para a aplicação: WebPushGoneError (HTTP 404/410 — apague a inscrição) e WebPushError (todo o resto). Instale com [webpush].

# src/services/notifications.py
from tempest_fastapi_sdk import (
    WebPushDispatcher,
    WebPushGoneError,
    WebPushPayloadSchema,
    WebPushSubscriptionSchema,
)


dispatcher = WebPushDispatcher(
    settings.VAPID_PRIVATE_KEY,
    vapid_subject="mailto:ops@example.com",
    ttl_seconds=60,
)


async def notify_order_paid(
    subscription: WebPushSubscriptionSchema,
    order_id: str,
) -> None:
    payload = WebPushPayloadSchema(
        title="Pagamento confirmado",
        body=f"Pedido {order_id} aprovado.",
        icon="/static/icons/order.png",
        data={"orderId": order_id, "url": f"/orders/{order_id}"},
    )
    try:
        await dispatcher.send(subscription, payload)
    except WebPushGoneError:
        # Prune the subscription from your store.
        await subscriptions_repo.delete_by_endpoint(subscription.endpoint)


async def broadcast(subs: list[WebPushSubscriptionSchema], payload: WebPushPayloadSchema) -> None:
    gone = await dispatcher.send_many(subs, payload)
    if gone:
        await subscriptions_repo.delete_by_endpoints(gone)

WebPushSubscriptionSchema faz round-trip exato do JSON que PushSubscription.toJSON() emite no navegador (ele faz o alias expiration_timeexpirationTime), então você pode armazenar inscrições recebidas literalmente e reproduzi-las no envio.