Skip to content

Real-time

Push data to clients without the client polling. SSE for server→browser broadcasts, Web Push for notifications even when the page is closed.

Server-Sent Events (SSE)

EventStream is an in-memory async queue feeding one SSE HTTP connection. ServerSentEvent encodes one frame; sse_response wraps the byte stream in a Starlette StreamingResponse with SSE-friendly headers.

# src/api/routers/events.py
import asyncio
from collections.abc import AsyncIterator

from fastapi import APIRouter
from starlette.responses import StreamingResponse

from tempest_fastapi_sdk import EventStream, sse_response

router = APIRouter()


@router.get("/events")
async def events() -> StreamingResponse:
    """Emit three SSE frames then close the stream."""
    stream = EventStream(heartbeat_seconds=15.0)

    async def producer() -> None:
        try:
            for n in range(1, 4):
                await stream.publish({"n": n}, event="counter", id=str(n))
                await asyncio.sleep(1)
        finally:
            await stream.close()

    task = asyncio.create_task(producer())

    async def lifecycle_aware() -> AsyncIterator[bytes]:
        try:
            async for chunk in stream.stream():
                yield chunk
        finally:
            # Client closed the connection (or producer finished + close()
            # was called) — cancel the producer so it doesn't leak.
            task.cancel()

    return sse_response(lifecycle_aware())

Pattern: link producer to client connection lifecycle

SSE streams are long-lived. If the client disconnects mid-stream, you don't want the producer running forever. Wrapping stream.stream() in an outer async generator gives a finally block that runs when the underlying response closes — cancel the producer there.

Browser side:

const es = new EventSource("/events");
es.addEventListener("counter", (e) => console.log("got", JSON.parse(e.data)));

heartbeat_seconds emits a : keepalive SSE comment when idle so load-balancers don't close long-lived connections. ServerSentEvent.data accepts strings, bytes or any JSON-serializable Python object — non-strings are JSON-encoded automatically. Pass retry= to hint the browser at the reconnect delay (milliseconds).

Web Push notifications

WebPushDispatcher wraps the synchronous pywebpush library in asyncio.to_thread and surfaces the two errors the application cares about: WebPushGoneError (HTTP 404/410 — delete the subscription) and WebPushError (everything else). Install with [webpush].

# src/services/notifications.py
from tempest_fastapi_sdk import (
    WebPushDispatcher,
    WebPushGoneError,
    WebPushPayloadSchema,
    WebPushSubscriptionSchema,
)


dispatcher = WebPushDispatcher(
    settings.VAPID_PRIVATE_KEY,
    vapid_subject="mailto:ops@example.com",
    ttl_seconds=60,
)


async def notify_order_paid(
    subscription: WebPushSubscriptionSchema,
    order_id: str,
) -> None:
    payload = WebPushPayloadSchema(
        title="Pagamento confirmado",
        body=f"Pedido {order_id} aprovado.",
        icon="/static/icons/order.png",
        data={"orderId": order_id, "url": f"/orders/{order_id}"},
    )
    try:
        await dispatcher.send(subscription, payload)
    except WebPushGoneError:
        # Prune the subscription from your store.
        await subscriptions_repo.delete_by_endpoint(subscription.endpoint)


async def broadcast(subs: list[WebPushSubscriptionSchema], payload: WebPushPayloadSchema) -> None:
    gone = await dispatcher.send_many(subs, payload)
    if gone:
        await subscriptions_repo.delete_by_endpoints(gone)

WebPushSubscriptionSchema round-trips the exact JSON PushSubscription.toJSON() emits in the browser (it aliases expiration_timeexpirationTime), so you can store inbound subscriptions verbatim and replay them on dispatch.