Python & FastAPI · 13 min read · By Liyabona Saki

FastAPI + Kafka — Build Real-Time Event Systems

Build event-driven systems with FastAPI and Apache Kafka using aiokafka — producers, consumers, schemas, retries and exactly-once-ish semantics.

Introduction

Kafka is the spine of most modern event-driven systems. Pairing it with FastAPI gives you HTTP APIs in front of a real-time event stream — perfect for order pipelines, notifications, audit logs and analytics. This tutorial uses aiokafka and walks through producer, consumer, schemas and operational pitfalls.

For the Java equivalent, see Spring Boot + Kafka — Build a Real-Time Messaging System.

Key takeaways

  • Use aiokafka to avoid blocking the FastAPI event loop.
  • Producers should set acks=all and enable_idempotence=True in production.
  • Consumers must handle at-least-once semantics — design idempotent handlers.
  • Validate every event with Pydantic before processing.
  • Always commit offsets after successful processing, not before.

Setup

bash
pip install "aiokafka[zstd]" fastapi uvicorn pydantic orjson

Producer wired into FastAPI's lifespan

```python
# app/events/producer.py
from contextlib import asynccontextmanager

import orjson
from aiokafka import AIOKafkaProducer
from fastapi import FastAPI

producer: AIOKafkaProducer | None = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start one shared producer at startup; flush and close it at shutdown."""
    global producer
    producer = AIOKafkaProducer(
        bootstrap_servers="kafka:9092",
        enable_idempotence=True,
        acks="all",
        compression_type="zstd",
    )
    await producer.start()
    try:
        yield
    finally:
        await producer.stop()

app = FastAPI(lifespan=lifespan)

async def publish(topic: str, key: str, value: dict):
    # send_and_wait returns once the broker has acknowledged the record
    await producer.send_and_wait(topic, orjson.dumps(value), key=key.encode())
```

Endpoint that emits an event

python
@app.post("/orders")
async def create_order(payload: OrderCreate):
    order = await svc.create(payload)
    await publish("orders.created", key=str(order.id), value=order.model_dump(mode="json"))
    return order
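
The endpoint passes `key=str(order.id)`. Records with the same key are routed to the same partition, which is what keeps the events of one order in sequence. The sketch below illustrates the idea only: `toy_partition` is a hypothetical helper built on CRC32, whereas Kafka's default partitioner hashes keys with murmur2, so the partition numbers here will not match a real cluster.

```python
import zlib

def toy_partition(key: bytes, num_partitions: int) -> int:
    """Map a key to a partition index (illustrative stand-in, not Kafka's murmur2)."""
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    return zlib.crc32(key) % num_partitions

# Group a few (order_id, event_type) pairs by the partition their key maps to.
events = [("42", "created"), ("7", "created"), ("42", "paid"), ("42", "shipped")]
placement: dict[int, list[tuple[str, str]]] = {}
for order_id, kind in events:
    partition = toy_partition(order_id.encode(), 6)
    placement.setdefault(partition, []).append((order_id, kind))

# All events for order 42 sit in one partition, in the order they were sent.
order_42 = [e for e in placement[toy_partition(b"42", 6)] if e[0] == "42"]
```

Choosing the order id as the key trades even load distribution for per-order ordering; a handful of very busy keys can make one partition hotter than the rest.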

Consumer service

```python
# consumer/main.py
import asyncio

from aiokafka import AIOKafkaConsumer

from app.schemas.events import OrderCreated

async def run():
    consumer = AIOKafkaConsumer(
        "orders.created",
        bootstrap_servers="kafka:9092",
        group_id="notifications",
        enable_auto_commit=False,
        auto_offset_reset="earliest",
    )
    await consumer.start()
    try:
        async for msg in consumer:
            event = OrderCreated.model_validate_json(msg.value)
            try:
                await handle(event)  # handle() is your application's own coroutine
                await consumer.commit()
            except Exception:
                # leave offset uncommitted → message will be re-delivered
                raise
    finally:
        await consumer.stop()

asyncio.run(run())
```
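
The consumer above re-raises on any handler failure, so recovery depends on a restart and redelivery. For transient errors (a flaky downstream API, a brief database outage) it is often cheaper to retry in-process first. The following is a minimal sketch of exponential backoff with jitter; `retry_async` and its default values are assumptions for illustration, not part of aiokafka.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")

async def retry_async(
    op: Callable[[], Awaitable[T]],
    attempts: int = 3,
    base_delay: float = 0.1,
    max_delay: float = 5.0,
) -> T:
    """Await op(); on failure sleep base_delay * 2**n (plus jitter) and try again."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            return await op()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts: let the caller decide what happens next
            delay = min(max_delay, base_delay * 2 ** attempt)
            # Jitter spreads retries out so many consumers do not retry in lockstep.
            await asyncio.sleep(delay + random.uniform(0, delay / 2))
```

Inside the consumer loop this could be used as `await retry_async(lambda: handle(event))` followed by `await consumer.commit()`. Keep the worst-case total retry time well below the consumer's poll interval limit, otherwise the group may rebalance while the handler is still retrying.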

Schemas

Define event contracts as Pydantic models in a shared package versioned alongside producer + consumer. For polyglot teams, Avro + Schema Registry is the right next step.
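
As a rough sketch of what such a contract might look like with Pydantic v2: the field names below (`schema_version`, `event_id`, `occurred_at`, `order_id`, `total`) are assumptions for illustration, since the tutorial does not show the real `OrderCreated` model.

```python
from datetime import datetime, timezone
from decimal import Decimal
from uuid import UUID, uuid4

from pydantic import BaseModel, Field, ValidationError

class OrderCreated(BaseModel):
    """Hypothetical event contract; adapt the fields to your own domain."""
    schema_version: int = 1                      # bump on breaking changes
    event_id: UUID = Field(default_factory=uuid4)  # useful as a dedupe key
    occurred_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    order_id: int
    total: Decimal

# Producer side: serialize to bytes, as they would appear in msg.value.
event = OrderCreated(order_id=42, total=Decimal("19.99"))
raw = event.model_dump_json().encode()

# Consumer side: parse and validate in one step.
decoded = OrderCreated.model_validate_json(raw)

# A malformed payload raises ValidationError instead of reaching the handler.
try:
    OrderCreated.model_validate_json(b'{"order_id": "not-a-number"}')
except ValidationError as exc:
    problems = exc.errors()  # one entry per failing field
```

Carrying an explicit `event_id` and `schema_version` in every event is a design choice rather than a Kafka requirement; it makes deduplication and gradual schema changes easier to reason about.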

Production best practices

  • Tune partitions for parallelism — a topic with one partition cannot scale beyond one consumer.
  • Keep handlers idempotent: use a dedupe table keyed by event id.
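  • Set max_poll_interval_ms (aiokafka's name for Kafka's max.poll.interval.ms) higher than your slowest handler; otherwise the group treats the consumer as failed and rebalances its partitions.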
  • Use a dead-letter topic for poison messages; never drop silently.
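
The dedupe and dead-letter points above can be combined in one processing step. This is a sketch under stated assumptions: `process_once`, the in-memory `seen_ids` set, and the choice to treat `ValueError` as a permanent failure are all hypothetical. In a real service the set would typically be a database table with a unique constraint on the event id, and `dead_letter` would publish to a separate topic.

```python
import asyncio
from typing import Awaitable, Callable

Handler = Callable[[dict], Awaitable[None]]
DeadLetter = Callable[[dict, str], Awaitable[None]]

async def process_once(
    event: dict,
    seen_ids: set[str],
    handler: Handler,
    dead_letter: DeadLetter,
) -> str:
    """Return 'duplicate', 'processed' or 'dead-lettered' for one event."""
    event_id = event.get("event_id")
    if event_id is None:
        await dead_letter(event, "missing event_id")
        return "dead-lettered"
    if event_id in seen_ids:
        return "duplicate"  # redelivery: the side effects already happened
    try:
        await handler(event)
    except ValueError as exc:  # assumed here to mean a permanent, poison failure
        await dead_letter(event, str(exc))
        return "dead-lettered"
    seen_ids.add(event_id)  # record the id only after the handler succeeded
    return "processed"
```

All three outcomes leave the message fully dealt with, so the offset can be committed afterwards. Any other exception propagates and leaves the offset uncommitted, which matches the redelivery behaviour of the consumer shown earlier.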

Common mistakes

  • Auto-committing offsets — you'll lose messages on crash mid-processing.
  • Doing heavy work inside the producer endpoint — push to Kafka, return 202, process async.
  • Sharing a consumer group across unrelated services — they'll steal each other's messages.
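
To see why the commit order matters, here is a small pure-Python simulation with no Kafka involved. It is a simplified model: real auto-commit runs on a timer rather than per message, so `commit_first=True` represents the worst case in which the offset was stored before the handler finished. The `consume` function and its parameters are hypothetical.

```python
def consume(log, committed, commit_first, crash_at=None):
    """Process log[committed:] and return (handled_messages, committed_offset).

    crash_at is the offset at which the worker dies before its handler finishes.
    """
    handled = []
    for offset in range(committed, len(log)):
        if commit_first:
            committed = offset + 1  # offset stored before the work is done
        if offset == crash_at:
            return handled, committed  # simulated crash mid-processing
        handled.append(log[offset])
        if not commit_first:
            committed = offset + 1  # offset stored only after success
    return handled, committed

log = ["a", "b", "c"]

# Commit before processing: the restart resumes after "b", so "b" is lost.
first, position = consume(log, 0, commit_first=True, crash_at=1)
second, _ = consume(log, position, commit_first=True)
lost = first + second

# Commit after processing: the restart resumes at "b", so "b" is redelivered.
first, position = consume(log, 0, commit_first=False, crash_at=1)
second, _ = consume(log, position, commit_first=False)
kept = first + second
```

Committing after processing removes the loss but not the duplicates: a crash between the handler finishing and the commit call still causes a redelivery, which is why the handlers need to be idempotent.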

Tools used in this tutorial

This tutorial uses the following tools. Versions matter less than the role each one plays, so swap freely as long as you keep the responsibilities the same.

  • Python 3.12+
  • FastAPI
  • Uvicorn / Gunicorn
  • Pydantic v2
  • SQLAlchemy 2.x
  • Docker
  • Kubernetes

Real-world production context

In production, a Python backend like this rarely runs as a single uvicorn process on a laptop. Teams build the app into a container image, push it to a registry, and run it behind a managed load balancer with autoscaling, health checks and centralized logging. Async FastAPI services typically use multiple Uvicorn workers managed by Gunicorn, a managed Postgres for state, Redis for cache and rate-limit counters, and an object store for uploads. The reason cloud hosting becomes a hard requirement is operational: zero-downtime rollouts, log aggregation, metrics, and TLS at the edge are not optional once real users depend on the API.

Recommended tools & deployment options

Once the tutorial works on your machine, the next question is *where do I run this for real?* These are the platforms most Python backend teams reach for:

  • DigitalOcean — the simplest path from a working FastAPI container to a public URL. App Platform deploys directly from a Dockerfile, managed Postgres and Redis are one click away, and pricing is predictable. A common way to deploy the setup in this tutorial is using a cloud provider like DigitalOcean when you want to ship quickly without learning a full cloud SDK.
  • AWS — the default for enterprise workloads. ECS Fargate or EKS run containers without you managing servers, RDS handles Postgres, and CloudWatch covers logs and metrics.
  • Docker — the packaging format every modern deploy target understands. Build once, run the same image locally, in CI and in production.
  • Kubernetes (managed: EKS, DOKS, GKE) — the right choice once you have more than a handful of services, need rolling updates, autoscaling and policy-driven networking.

A VPS or managed cloud service is required to run this architecture end-to-end — uvicorn --reload is for development, not for serving traffic.

FAQ

Kafka vs Redis Streams vs RabbitMQ? Kafka for high throughput + replay. Redis Streams for simple, single-cluster pipelines. RabbitMQ for classic task queues.

Next steps & related tutorials

Keep the momentum going with the next tutorial in this learning path: Deploy FastAPI Applications to Kubernetes.

Architecture

Kafka Producer → Topic → Consumer

[Diagram: a Producer Service publishes to a Kafka topic (partitions 0–N). Consumer A and Consumer B (group: orders) and an Analytics Worker (group: analytics) subscribe to the topic. Data stores shown: PostgreSQL and a data warehouse (S3 / Snowflake).]
The producer publishes messages to a partitioned topic. Consumers in the same group share partitions; offsets track progress for at-least-once delivery.
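
The group behaviour described here can be illustrated with a toy assignment function. This is a simplified round-robin model, not the algorithm of any real Kafka assignor (range, round-robin and sticky assignors differ in detail); `assign` and the member names are hypothetical.

```python
def assign(partitions: int, members: list[str]) -> dict[str, list[int]]:
    """Spread partition indexes across the members of one consumer group."""
    if not members:
        raise ValueError("a group needs at least one member")
    assignment: dict[str, list[int]] = {m: [] for m in members}
    for p in range(partitions):
        assignment[members[p % len(members)]].append(p)
    return assignment

# Two consumers in the same group split the topic's partitions between them.
orders_group = assign(4, ["consumer-a", "consumer-b"])

# A different group gets its own full view of the same topic.
analytics_group = assign(4, ["analytics-worker"])

# With a single partition, a second consumer in the group has nothing to read.
single_partition = assign(1, ["consumer-a", "consumer-b"])
```

The last case is the reason partition count caps parallelism within a group: adding consumers beyond the number of partitions only adds idle members.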

TL;DR

Key takeaways

  • Understand the core concepts behind FastAPI + Kafka — Build Real-Time Event Systems in a production context.
  • Apply the patterns to real Python & FastAPI systems, not just toy examples.
  • Recognize the trade-offs, failure modes, and operational concerns before adopting them.
  • Get a clear path to the next step — related tutorials, tools, and reference architectures.

Avoid these

Common mistakes

  • 1. Copy-pasting code without understanding the trade-offs

    It's tempting to ship a snippet from a blog post into production, but Python & FastAPI patterns only work when the failure modes are understood. Always reason about timeouts, retries, and consistency.

  • 2. Skipping observability from day one

    Structured logs, metrics, and traces are not optional. Wire them in before you ship — debugging Python & FastAPI systems without them is painful and expensive.

  • 3. Optimizing too early

    Premature caching, sharding, or microservice extraction adds operational cost. Validate the bottleneck with real measurements first.

  • 4. Ignoring security defaults

    Secrets in env files, open management ports, missing RBAC — these are the most common production incidents. Treat security as part of the definition of done.

Ship it safely

Production best practices

Apply these before promoting FastAPI + Kafka — Build Real-Time Event Systems to a real production environment.

Scalability

Design Python & FastAPI services to scale horizontally. Keep request handlers stateless, push session and cache state to external stores (Redis, the database), and benchmark p95/p99 latency under realistic load before tuning.
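
For the benchmarking step, tail percentiles can be computed from raw latency samples with the standard library alone. This is a minimal sketch; `latency_report` is a hypothetical helper, and it assumes you have already collected per-request latencies in milliseconds from a load test.

```python
import statistics

def latency_report(samples_ms: list[float]) -> dict[str, float]:
    """Summarize latency samples as p50, p95 and p99 (needs at least two samples)."""
    # quantiles(..., n=100) returns the 99 cut points between percentiles.
    cuts = statistics.quantiles(samples_ms, n=100)
    return {"p50": cuts[49], "p95": cuts[94], "p99": cuts[98]}

# Synthetic samples for illustration only: 1 ms, 2 ms, ... 1000 ms.
report = latency_report([float(ms) for ms in range(1, 1001)])
```

Averages hide the slow requests that users actually notice, which is why the tail percentiles are the numbers worth tracking before and after any tuning.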

Monitoring & Observability

Emit metrics (RED/USE), structured JSON logs, and distributed traces from day one. Wire dashboards and alerts to SLOs you actually care about — error rate, latency, saturation — not vanity metrics.

Logging

Log with correlation IDs, never log secrets or PII, and centralize logs (ELK, Loki, CloudWatch). Use levels deliberately: INFO for state changes, WARN for recoverable issues, ERROR for incidents.
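
One way to attach a correlation ID to every log line is a context variable read by a JSON formatter. The sketch below uses only the standard library; `correlation_id`, `JsonFormatter` and `build_logger` are hypothetical names, and the set of fields is an assumption you would extend for your own logs.

```python
import json
import logging
from contextvars import ContextVar

# Set once per request or per consumed message; "-" means no ID was set.
correlation_id: ContextVar[str] = ContextVar("correlation_id", default="-")

class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object including the current correlation ID."""

    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "correlation_id": correlation_id.get(),
        })

def build_logger(name: str = "consumer") -> logging.Logger:
    """Return a logger that writes JSON lines to stderr."""
    logger = logging.getLogger(name)
    if not logger.handlers:  # avoid stacking handlers on repeated calls
        handler = logging.StreamHandler()
        handler.setFormatter(JsonFormatter())
        logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger
```

In a consumer, calling `correlation_id.set(...)` with the event's id before invoking the handler would tag every log line produced while that event is being processed, because context variables follow the running asyncio task.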

Security

Apply least-privilege IAM, rotate secrets through a vault, validate every input, and patch dependencies on a schedule. For HTTP services, enable TLS everywhere and set sensible security headers.

Testing

Layer unit, integration, and contract tests. Run them in CI on every PR, and add smoke tests post-deploy. For Python & FastAPI systems, also run chaos and load tests before a major release.

Reliability & Rollouts

Ship with health checks, readiness probes, graceful shutdown, and a rollback strategy. Prefer canary or blue/green deploys over big-bang releases.

Questions

Frequently asked questions

Is this tutorial up to date?

Yes. This tutorial was last reviewed and updated on May 26, 2026. We revisit popular Python & FastAPI tutorials regularly to keep them aligned with current best practices.

What level is this tutorial aimed at?

It is written for working developers with some backend experience. Beginners can still follow along, and senior engineers will find production-grade patterns and trade-off discussions.

Do I need to follow every step in order?

The walkthrough is sequential because each step depends on the previous one. If you only need a specific concept, the table of contents at the top of the article lets you jump straight to that section.

Where can I find the source code?

Code samples are inlined in the tutorial. When a companion repository is published it will be linked at the top of this page.

