How We Built Schema-Per-Tenant Multitenancy in PostgreSQL with FastAPI

How do you isolate tenant data in a single PostgreSQL cluster?

Three patterns exist:

Approach                                Isolation   Overhead / trade-off
Separate databases                      Strongest   Highest: one connection pool per tenant
Shared tables with a tenant_id column   Weakest     Easy to query the wrong data; needs RLS
Separate schemas in one database        Strong      One pool, native PG namespacing

We chose schema-per-tenant. You get hard namespace isolation without the operational complexity of hundreds of databases. PostgreSQL's search_path becomes your tenant routing key. Let's get into it.


The Schema Layout

Our PostgreSQL instance holds three distinct schema categories:

┌─────────────────────────────────────────────────────────────┐
│                       PostgreSQL 16                          │
│                                                             │
│  ┌──────────┐  ┌──────────┐  ┌────────────┐  ┌──────────┐  │
│  │ shared   │  │ internal │  │ tenant_abc │  │tenant_xyz│  │
│  ├──────────┤  ├──────────┤  ├────────────┤  ├──────────┤  │
│  │ user     │  │ sysconfig│  │ contact    │  │ contact  │  │
│  │ profile  │  │ auditlog │  │ campaign   │  │ campaign │  │
│  │ tenant   │  │ apiusage │  │ wabaaccount│  │wabaaccount│ │
│  │ business │  │          │  │ phonenumber│  │phonenumber│ │
│  │ aisensy_ │  │          │  │ template   │  │ template │  │
│  │  business│  │          │  │ message    │  │ message  │  │
│  └──────────┘  └──────────┘  └────────────┘  └──────────┘  │
└─────────────────────────────────────────────────────────────┘

shared — Cross-tenant identity data. User, Profile, Tenant, Business, AiSensyBusiness. These tables have a schema argument of "shared" in SQLAlchemy and are accessed from any session via the search_path.

internal — Platform-level system tables. Never touches tenant data. Admin routes and health checks live here.

tenant_<slug> — One schema per customer. Created at the moment the tenant completes onboarding. Every row in these tables belongs exclusively to that tenant — no tenant_id discriminator column needed anywhere.

The slug derives directly from the business name at signup. "Acme Corp" becomes acme-corp (slug) and tenant_acme_corp (schema name):

# app/core/utils.py
def generate_slug_from_business_name(business_name: str) -> str:
    # Strips legal suffixes, normalises unicode, replaces spaces with hyphens
    # "Acme Corp Pvt. Ltd." → "acme-corp"
    ...

And in the onboarding route:

# app/accounts/routes/onboarding.py
tenant = Tenant(
    schema_name=f"tenant_{slug_name.replace('-', '_')}",
    slug=slug_name,
)

The slug is the public-facing URL identifier. The schema name is the PostgreSQL namespace. Hyphens become underscores because PostgreSQL schema names cannot contain hyphens without quoting.
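
The body of generate_slug_from_business_name is not shown above. A minimal sketch of what such a function could look like, using only the standard library, follows. The suffix list and the schema_name_for_slug helper are assumptions made for illustration, not the code running in production.

```python
import re
import unicodedata

# Hypothetical suffix list; the real list is not shown in this post.
_LEGAL_SUFFIXES = {"pvt", "ltd", "llc", "inc", "llp", "limited", "private"}


def generate_slug_from_business_name(business_name: str) -> str:
    """Return a lowercase ASCII slug with hyphens and no trailing legal suffixes."""
    # Fold accented characters to their closest ASCII form ("Café" -> "Cafe").
    ascii_name = (
        unicodedata.normalize("NFKD", business_name)
        .encode("ascii", "ignore")
        .decode("ascii")
    )
    # Split on anything that is not a letter or digit.
    words = [w for w in re.split(r"[^a-z0-9]+", ascii_name.lower()) if w]
    # Drop trailing suffixes only, so "Limited Edition Co" keeps "limited".
    while words and words[-1] in _LEGAL_SUFFIXES:
        words.pop()
    return "-".join(words)


def schema_name_for_slug(slug: str) -> str:
    """Hypothetical helper: same hyphen-to-underscore rule as the onboarding route."""
    return f"tenant_{slug.replace('-', '_')}"
```

Keeping the slug-to-schema rule in one helper would make it harder for the URL identifier and the schema name to drift apart.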


The Data Models

The Tenant model lives in shared and is the single source of truth for tenant identity:

class Tenant(Base):
    __tablename__ = "tenant"
    __table_args__ = {"schema": "shared"}

    id: Mapped[UUID] = mapped_column(primary_key=True, default=uuid.uuid4)
    slug: Mapped[str] = mapped_column(String, index=True, unique=True)
    schema_name: Mapped[str] = mapped_column(String, index=True, unique=True)
    onboarding_complete: Mapped[bool] = mapped_column(Boolean, default=False)
    # ...

    @property
    def tenant_url(self) -> str:
        return f"{settings.FRONTEND_HOST}/{self.slug}"

User and Profile sit in shared too. A Profile links to at most one Tenant via a nullable tenant_id FK. Before a user completes onboarding there's no tenant yet, so making that FK nullable was an intentional design choice.

Tenant-specific models carry no tenant_id column at all:

class Contact(Base):
    __tablename__ = "contact"
    __table_args__ = {"schema": "tenant"}   # ← "tenant" is a placeholder
    # No tenant_id column. The schema IS the tenant boundary.

Wait — why schema="tenant" and not schema="tenant_acme_corp"? Because SQLAlchemy model definitions are singletons loaded once at process start. The actual schema target is controlled at runtime via search_path, not at model definition time. The string "tenant" in __table_args__ is just metadata used by Alembic (more on that below).


The Middleware: Extracting Tenant from URL

All tenant-specific API calls travel through URL paths of the form:

/{tenant-slug}/api/v1/contacts
/{tenant-slug}/api/v1/campaigns

Our TenantMiddleware (a Starlette BaseHTTPMiddleware) intercepts every request, extracts the slug from the first path segment, looks up the corresponding Tenant record, and stores it in request.state.tenant.

class TenantMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        path = request.url.path

        # Public API routes resolve tenant from API key, not URL
        if path.startswith("/api/v1/public"):
            request.state.tenant = None
            return await call_next(request)

        tenant_slug = self.extract_tenant_from_path(path)

        # Default to "no tenant" so request.state.tenant is always set,
        # including when the slug does not match any tenant.
        request.state.tenant = None

        if tenant_slug:
            tenant = await self.get_tenant_by_slug(tenant_slug)
            if tenant:
                request.state.tenant = tenant
                # Rewrite path: /acme-corp/api/v1/contacts → /api/v1/contacts
                new_path = self.remove_tenant_from_path(path, tenant_slug)
                request.scope["path"] = new_path

        return await call_next(request)

The path rewrite is critical. FastAPI's router has no idea about tenant slugs; it was registered with routes like /api/v1/contacts. If we didn't strip the prefix, every tenant URL would return 404. We mutate request.scope["path"] in place, which works because the router matches against the path in the ASGI scope and BaseHTTPMiddleware passes that same scope dict downstream.

Slug Validation to Prevent Route Conflicts

A naive approach just takes the first path segment as the slug. That would accidentally treat /docs, /api, /static as tenant slugs. Our validator rejects them:

def is_valid_tenant_slug(self, slug: str) -> bool:
    if not (3 <= len(slug) <= 63):
        return False
    if not re.match(r'^[a-zA-Z0-9][a-zA-Z0-9\-_]*[a-zA-Z0-9]$', slug):
        return False

    reserved_routes = {
        'api', 'www', 'docs', 'redoc', 'static', 'assets',
        'health', 'metrics', 'auth', 'login', 'public', ...
    }
    return slug.lower() not in reserved_routes

TTL Cache: Don't Hit Postgres on Every Request

Tenant lookups happen on every single HTTP request. We cannot afford a database round-trip for each one. We built a simple thread-safe TTL cache:

from threading import Lock
from time import monotonic

class _TenantCache:
    def __init__(self, ttl: int = 60):
        self._ttl = ttl
        self._store: dict[str, tuple[float, Tenant | None]] = {}
        self._lock = Lock()

    def get(self, slug: str) -> Tenant | None:
        with self._lock:
            entry = self._store.get(slug)
            if entry and (monotonic() - entry[0]) < self._ttl:
                return entry[1]
            return None

    def put(self, slug: str, tenant: Tenant | None) -> None:
        with self._lock:
            self._store[slug] = (monotonic(), tenant)

    def invalidate(self, slug: str) -> None:
        with self._lock:
            self._store.pop(slug, None)

_tenant_cache = _TenantCache(ttl=60)

The cache is module-level, so it persists across requests within a worker process. A 60-second TTL means a newly created or deactivated tenant can take up to a minute to be reflected in every worker. Acceptable for our use case; Redis-backed caching would be the natural next step.
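
One detail worth noting: get() returns None both when nothing is cached and when a None tenant was cached. Depending on how get_tenant_by_slug uses the cache (not shown here), unknown slugs may therefore go to the database on every request. A sketch of one way to tell the two cases apart is below. It uses a sentinel, and TTLCache, _MISS, lookup and the injectable clock are all names invented for this example.

```python
from threading import Lock
from time import monotonic
from typing import Any, Callable

_MISS = object()  # distinguishes "nothing cached" from "cached None"


class TTLCache:
    def __init__(self, ttl: float = 60.0, clock: Callable[[], float] = monotonic):
        self._ttl = ttl
        self._clock = clock  # injectable so tests can control time
        self._store: dict[str, tuple[float, Any]] = {}
        self._lock = Lock()

    def get(self, key: str) -> Any:
        """Return the cached value (possibly None), or _MISS if absent or expired."""
        with self._lock:
            entry = self._store.get(key)
            if entry is not None and (self._clock() - entry[0]) < self._ttl:
                return entry[1]
            self._store.pop(key, None)  # drop expired entries eagerly
            return _MISS

    def put(self, key: str, value: Any) -> None:
        with self._lock:
            self._store[key] = (self._clock(), value)


def lookup(cache: TTLCache, slug: str, load: Callable[[str], Any]) -> Any:
    """Read-through lookup: call load() only on a true cache miss."""
    cached = cache.get(slug)
    if cached is not _MISS:
        return cached
    value = load(slug)
    cache.put(slug, value)
    return value
```

With this shape, a request for a slug that does not exist costs one database query per TTL window rather than one per request.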


The Session Dependencies: search_path as a Routing Key

The real magic happens here. PostgreSQL's search_path tells the engine which schemas to search when an unqualified table name is used. Setting search_path = tenant_acme_corp, shared, public means:

  1. First look in tenant_acme_corp — all the tenant's own tables are found here
  2. Fall back to shared — so shared.user, shared.tenant etc. are still accessible
  3. Fall back to public — for extensions

We have three session dependency types defined in app/core/deps.py:

TenantSessionDep — For Tenant-Scoped Routes

async def get_tenant_db(request: Request) -> AsyncGenerator[AsyncSession, None]:
    async with async_session_factory() as session:
        tenant = getattr(request.state, "tenant", None)

        if not tenant:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Tenant context required. Include tenant slug in URL path: /{tenant}/api/v1/..."
            )

        schema_name = _validate_schema_name(tenant.schema_name)
        session.info["tenant_schema"] = schema_name
        session.info["tenant_id"] = tenant.id
        await session.execute(
            text('SET search_path TO "' + schema_name + '", shared, public')
        )
        yield session

TenantSessionDep = Annotated[AsyncSession, Depends(get_tenant_db)]

Notice session.info["tenant_schema"] = schema_name. That's not just metadata bookkeeping — it's the key to one of the trickiest problems we ran into.

InternalSessionDep — For Cross-Tenant Routes

async def get_internal_db(request: Request) -> AsyncGenerator[AsyncSession, None]:
    async with async_session_factory() as session:
        await session.execute(
            text('SET search_path TO shared, internal, public')
        )
        yield session

InternalSessionDep = Annotated[AsyncSession, Depends(get_internal_db)]

Authentication, registration, onboarding, webhooks from external systems — all use this. No tenant context, no schema switching.

PublicApiSessionDep — For External API Keys

The public REST API authenticates via API keys rather than JWT. The API key middleware resolves the tenant and stores tenant_schema and tenant_id in request.state before the dependency runs:

async def get_public_api_db(request: Request) -> AsyncGenerator[AsyncSession, None]:
    tenant_schema = getattr(request.state, "tenant_schema", None)
    tenant_id = getattr(request.state, "tenant_id", None)

    if not tenant_schema or not tenant_id:
        raise HTTPException(status_code=403, detail="Tenant required")

    schema_name = _validate_schema_name(tenant_schema)
    async with async_session_factory() as session:
        session.info["tenant_schema"] = schema_name
        session.info["tenant_id"] = tenant_id
        await session.execute(
            text('SET search_path TO "' + schema_name + '", shared, public')
        )
        yield session

The Hardest Bug: search_path Resets After Every Commit

This took us a while to fully understand. Here's what happens in PostgreSQL:

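SET search_path applies to a connection (a PostgreSQL session), not a transaction: once the transaction that issued it commits, the setting stays on that connection until something changes it. (If that transaction rolls back instead, the SET is undone with it.) When you commit and SQLAlchemy returns the connection to the pool, the next statement that checks out that connection gets whatever search_path was set on it, which might be a completely different tenant's schema.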

In our case: tenant A's request commits, returns the connection to the pool, tenant B's request gets that same connection, we set B's search path, all good. But within a single long request that does multiple commits (e.g., onboarding), after each commit the connection goes back to the pool, a new checkout might return a different physical connection, and the search_path we set earlier is gone.

SQLAlchemy's Session event system is the correct fix. We register a listener on Session.after_begin:

@event.listens_for(Session, "after_begin")
def _reapply_tenant_search_path(session, transaction, connection):
    """
    Re-apply the tenant search_path every time a transaction begins.

    SET search_path lasts only for the physical connection it ran on.
    A session returns its connection to the pool on every commit, so the
    next statement may run on a different connection that never got the SET.
    This listener fires on every begin (including the implicit one after a
    commit) and re-applies it, keyed off the schema name stashed in session.info.
    """
    schema = session.info.get("tenant_schema")
    if schema:
        connection.exec_driver_sql(
            f'SET search_path TO "{schema}", shared, public'
        )

This fires every time a transaction begins — including the implicit begin after every commit. Because we stash tenant_schema into session.info at session creation time, the listener always knows which schema to apply even after a pool checkout returns a different physical connection.

The session.info dict is part of the Session object and survives commits. The physical connection doesn't, but session.info does. That's the key insight.
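
To make the failure mode concrete, here is a small standard-library simulation of a pool, a session, and the re-apply step. It is a toy model: FakeConnection, FakePool and FakeSession are invented for this example and only mimic the behaviour described above. They are not SQLAlchemy classes.

```python
from collections import deque


class FakeConnection:
    """Stands in for a pooled connection; search_path is per-connection state."""
    def __init__(self, name: str):
        self.name = name
        self.search_path = "public"  # placeholder default until someone runs SET


class FakePool:
    def __init__(self, size: int):
        self._idle = deque(FakeConnection(f"conn{i}") for i in range(size))

    def checkout(self) -> FakeConnection:
        return self._idle.popleft()

    def checkin(self, conn: FakeConnection) -> None:
        self._idle.append(conn)  # goes to the back, so the next checkout differs


class FakeSession:
    def __init__(self, pool: FakePool, reapply: bool):
        self.pool, self.reapply = pool, reapply
        self.info: dict[str, str] = {}  # survives commits, like Session.info
        self.conn: FakeConnection | None = None

    def begin(self) -> FakeConnection:
        if self.conn is None:
            self.conn = self.pool.checkout()
            schema = self.info.get("tenant_schema")
            if self.reapply and schema:  # plays the role of the after_begin listener
                self.conn.search_path = f"{schema}, shared, public"
        return self.conn

    def set_search_path(self, schema: str) -> None:
        self.begin().search_path = f"{schema}, shared, public"

    def commit(self) -> None:
        if self.conn is not None:
            self.pool.checkin(self.conn)
            self.conn = None


def path_after_commit(reapply: bool) -> str:
    session = FakeSession(FakePool(size=2), reapply)
    session.info["tenant_schema"] = "tenant_acme_corp"
    session.set_search_path("tenant_acme_corp")  # what the dependency does once
    session.commit()                             # connection goes back to the pool
    return session.begin().search_path           # next transaction, new checkout
```

In this model, path_after_commit(reapply=False) comes back with the placeholder default because the second transaction lands on a connection that never saw the SET. With reapply=True the tenant path is restored from session.info.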


SQL Injection in Schema Names

We were paranoid about this one. A tenant with a malicious slug could potentially inject SQL through the schema name in our SET search_path statement. We added a strict validator at every entry point:

_SCHEMA_RE = re.compile(r'^[a-zA-Z0-9_-]+$')

def _validate_schema_name(name: str) -> str:
    """Validate that a schema name is safe for SQL identifier use."""
    if not name or not _SCHEMA_RE.match(name):
        raise ValueError(f"Invalid schema name: {name}")
    return name

The schema name is always double-quoted in SQL (SET search_path TO "tenant_acme_corp"). Quoting alone would not stop a name containing a double quote from breaking out of the identifier; the regex is what guarantees that only [a-zA-Z0-9_-] characters get through. The onboarding endpoint also validates schema names against a stricter compiled regex (lowercase letters, digits, and underscores only) before attempting CREATE SCHEMA:

_SCHEMA_NAME_RE = re.compile(r"^[a-z0-9_]+$")

# In complete_onboarding:
if not _SCHEMA_NAME_RE.match(schema_name):
    logger.error(f"Invalid schema name rejected: {schema_name}")
    return APIResponse.error(message="Invalid tenant configuration", ...)
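
The two patterns above differ: one allows hyphens and uppercase, the other does not. A sketch of a single shared validator is below. It assumes the stricter pattern is the intended one. The function names and the length check are additions for this example: PostgreSQL truncates identifiers longer than 63 bytes by default, so a long slug prefixed with tenant_ could otherwise collide with another schema.

```python
import re

_SCHEMA_NAME_RE = re.compile(r"[a-z0-9_]+")  # same character set as the onboarding check
_MAX_IDENTIFIER_BYTES = 63                   # PostgreSQL's default identifier limit


def validate_schema_name(name: str) -> str:
    """Return name unchanged if it is safe to use as a schema identifier."""
    # fullmatch also rejects a trailing newline, which match() with "$" would accept.
    if not name or not _SCHEMA_NAME_RE.fullmatch(name):
        raise ValueError(f"Invalid schema name: {name!r}")
    if len(name.encode("utf-8")) > _MAX_IDENTIFIER_BYTES:
        raise ValueError(f"Schema name exceeds {_MAX_IDENTIFIER_BYTES} bytes: {name!r}")
    return name


def tenant_search_path_sql(schema_name: str) -> str:
    """Build the SET statement in one place so every caller validates first."""
    return f'SET search_path TO "{validate_schema_name(schema_name)}", shared, public'
```

Routing every SET through one builder means the session dependencies, the event listener, and the Celery context managers cannot diverge on what counts as a valid name.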

Tenant Provisioning: The Onboarding Flow

When a new business signs up, their schema doesn't exist yet. We create it at the end of onboarding in an idempotent flow that undoes its own work if a later step fails:

@router.post("/complete-onboarding")
async def complete_onboarding(session: InternalSessionDep, current_user: CurrentUser):
    # Step 1: Check if schema already exists (idempotency)
    existed = (await session.execute(
        text("SELECT 1 FROM information_schema.schemata WHERE schema_name = :s"),
        {"s": schema_name},
    )).scalar() is not None

    created_schema_this_attempt = False
    if not existed:
        conn = await session.connection()
        await conn.execute(sa.schema.CreateSchema(schema_name))
        await session.commit()
        created_schema_this_attempt = True

    # Step 2: Run Alembic migrations in a worker thread (Alembic is sync)
    try:
        await asyncio.to_thread(_run_tenant_alembic_upgrade, schema_name)
    except Exception:
        # Step 2a: Rollback schema if WE created it and migration failed
        if created_schema_this_attempt:
            conn = await session.connection()
            await conn.execute(sa.schema.DropSchema(schema_name, cascade=True))
            await session.commit()
        return APIResponse.server_error(message="Failed to create tenant schema. Please retry.")

    # Step 3: Verify sentinel tables exist (guard against silent partial migrations)
    for table_name in ("wabaaccount", "phonenumber"):
        present = (await session.execute(
            text("SELECT 1 FROM information_schema.tables WHERE table_schema = :s AND table_name = :t"),
            {"s": schema_name, "t": table_name},
        )).scalar() is not None
        if not present:
            return APIResponse.server_error(message="Tenant schema is incomplete. Please retry.")

    # Step 4: Mark onboarding complete
    tenant.onboarding_complete = True
    session.add(tenant)
    await session.commit()

Several design decisions worth calling out:

Idempotency. We check information_schema.schemata before creating. If the network drops after schema creation but before the response is delivered, the user can retry and it works correctly.

Migration in a thread. Alembic's command.upgrade is synchronous Python. Running it directly in an async def handler would block the entire event loop. asyncio.to_thread hands it to the thread pool.

Rollback only what we created. If we ran this endpoint on a pre-existing schema (e.g., admin re-running onboarding), we must not drop it on migration failure. The created_schema_this_attempt flag tracks whether this specific invocation created the schema.

Sentinel table verification. Alembic can "succeed" silently with a partially applied migration in edge cases (e.g., empty versions table, config mismatch). Checking that wabaaccount and phonenumber exist before marking onboarding_complete = True prevents us from giving a user a broken dashboard.
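
The "rollback only what we created" rule and the thread hand-off can be isolated from the database. The sketch below injects the schema operations as callables so it runs without PostgreSQL. provision_schema and its parameters are invented for this example and are not the route's real signature.

```python
import asyncio
from collections.abc import Awaitable, Callable


async def provision_schema(
    schema_name: str,
    exists: Callable[[str], Awaitable[bool]],
    create: Callable[[str], Awaitable[None]],
    drop: Callable[[str], Awaitable[None]],
    migrate: Callable[[str], None],  # blocking, like Alembic's upgrade command
) -> bool:
    """Return True on success; on failure, undo only the schema this call created."""
    created_here = False
    if not await exists(schema_name):
        await create(schema_name)
        created_here = True
    try:
        # Run the blocking migration in a worker thread, off the event loop.
        await asyncio.to_thread(migrate, schema_name)
    except Exception:
        if created_here:
            await drop(schema_name)
        return False
    return True
```

A retry after a failed first attempt starts from a clean slate, while a failure against a schema that already existed leaves that schema in place.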


The Alembic Configuration: Three Independent Migration Tracks

Managing schema migrations across three schema categories (shared, internal, and N tenant schemas) required a multi-section alembic.ini:

[DEFAULT]
script_location = app/alembic

[internal]
version_locations = app/alembic/internal/versions

[shared]
version_locations = app/alembic/shared/versions

[tenant]
version_locations = app/alembic/tenant/versions

The env.py branches based on which section is active:

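# Schema comes from cfg.attributes (programmatic), then -x schema=... (CLI),
# then the active ini section (--name), so shared/internal runs need no -x flag.
current_schema = (
    context.config.attributes.get("schema")
    or context.get_x_argument(as_dictionary=True).get("schema")
    or context.config.config_ini_section
)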

if current_schema == "internal":
    from app.core.db import Base
    target_metadata = Base.metadata

    def include_object(object, name, type_, reflected, compare_to):
        return type_ == "table" and object.schema == "internal"

elif current_schema == "shared":
    from app.accounts.models import *
    from app.publicapi.models import *
    from app.core.db import Base
    target_metadata = Base.metadata

    def include_object(object, name, type_, reflected, compare_to):
        return type_ == "table" and object.schema == "shared"

else:
    # tenant — runs against an actual tenant schema name
    from app.accounts.models import *
    from app.contacts.models import *
    from app.whatsapp.models import *
    from app.campaigns.models import *
    from app.core.db import Base
    target_metadata = Base.metadata

    def include_object(object, name, type_, reflected, compare_to):
        return type_ == "table" and (object.schema == "tenant" or object.schema is None)

And the connection setup in run_migrations_online applies the target schema to search_path:

connection.execute(text('set search_path to "%s"' % current_schema))
connection.commit()
context.configure(
    connection=connection,
    target_metadata=target_metadata,
    include_object=include_object,
    include_schemas=True,
)

Running migrations:

# Bootstrap shared schema (users, tenants, businesses)
alembic --name shared upgrade head

# Bootstrap internal schema
alembic --name internal upgrade head

# Migrate one specific tenant (used by onboarding code)
alembic --name tenant -x schema=tenant_acme_corp upgrade head

The -x schema=... flag passes the schema name as an Alembic -x argument. env.py reads it via context.get_x_argument(as_dictionary=True).get("schema").

_run_tenant_alembic_upgrade — Programmatic API

The onboarding route doesn't shell out. It uses Alembic's Python API, which is cleaner and doesn't require subprocess:

def _run_tenant_alembic_upgrade(schema_name: str) -> None:
    cfg = AlembicConfig(str(config.BASE_DIR / "alembic.ini"), ini_section="tenant")
    cfg.attributes["schema"] = schema_name
    alembic_command.upgrade(cfg, "head")

cfg.attributes["schema"] is how we pass the schema name when calling programmatically (vs the -x schema=... CLI approach). env.py reads from context.config.attributes.get("schema") first, so both paths work.


Bulk Tenant Migrations: migrate_tenants.py

Adding a new column to the contact table means running Alembic against every existing tenant schema. We have a dedicated script for this:

def _get_tenant_schema_names() -> Sequence[str]:
    engine = create_engine(str(settings.SQLALCHEMY_DATABASE_URI))
    with engine.connect() as conn:
        rows = conn.execute(
            text("SELECT schema_name FROM shared.tenant ORDER BY schema_name")
        ).fetchall()
    return [row[0] for row in rows]


def _migrate_single_tenant(schema_name: str) -> TenantMigrationResult:
    """Apply alembic upgrade head to one tenant schema. Never raises."""
    try:
        cfg = AlembicConfig("alembic.ini", ini_section="tenant")
        cfg.attributes["schema"] = schema_name
        alembic_command.upgrade(cfg, "head")
        return TenantMigrationResult(schema_name=schema_name, success=True)
    except Exception as exc:
        logger.error("tenant_migration.failed schema=%s error=%s", schema_name, exc, exc_info=True)
        return TenantMigrationResult(schema_name=schema_name, success=False, error=str(exc))


def migrate_all_tenants() -> MigrationResult:
    schema_names = _get_tenant_schema_names()
    result = MigrationResult()
    for schema_name in schema_names:
        r = _migrate_single_tenant(schema_name)
        if r.success:
            result.succeeded.append(r)
        else:
            result.failed.append(r)
    return result

Key design choices here:

  • One failure never blocks others. Each tenant is migrated independently. _migrate_single_tenant wraps everything in a try/except and returns a result object instead of raising. Tenant B's migration proceeds even if Tenant A's fails.
  • Returns a structured result. Callers (Celery beat, CI scripts, CLI) can inspect result.has_failures and decide what to do.
  • Ordered by schema name. Deterministic ordering makes log analysis easier.

In production, we run this via a Celery beat task on every deploy, before the new app code starts serving traffic. A failed migration surfaces in structured logs tagged tenant_migration.failed and gets picked up by our Sentry integration.


Celery Integration: Sync Sessions for Background Workers

Celery workers are not async. FastAPI's session dependencies use AsyncSession and are tied to an HTTP request. Neither applies to a Celery task. We provide two context managers for non-request code:

@asynccontextmanager
async def tenant_session(tenant_id: UUID | str) -> AsyncIterator[AsyncSession]:
    """Tenant-scoped AsyncSession for non-request async code."""
    async with async_session_factory() as session:
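        tenant = await session.get(Tenant, tenant_id)
        if tenant is None:
            # Fail with a clear error instead of AttributeError on None
            raise ValueError(f"Unknown tenant: {tenant_id}")
        schema_name = _validate_schema_name(tenant.schema_name)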
        session.info["tenant_schema"] = schema_name
        session.info["tenant_id"] = tenant_id
        await session.execute(
            text('SET search_path TO "' + schema_name + '", shared, public')
        )
        yield session


@contextmanager
def tenant_session_sync(tenant_id: UUID | str) -> Iterator[Session]:
    """Tenant-scoped sync Session for Celery tasks."""
    with Session(engine) as session:
        schema_name = _resolve_schema_name_sync(session, tenant_id)
        session.info["tenant_schema"] = schema_name
        session.info["tenant_id"] = tenant_id
        session.execute(
            text('SET search_path TO "' + schema_name + '", shared, public')
        )
        yield session

The sync variant uses the engine (not async_engine). We keep a dedicated sync SQLAlchemy engine running alongside the async one:

# Sync engine — Celery workers, Alembic, management scripts
engine = create_engine(
    str(config.settings.SQLALCHEMY_DATABASE_URI),
    pool_size=20,
    max_overflow=10,
    pool_recycle=300,
    pool_pre_ping=False,  # psycopg3 pre-ping needs greenlet context; disabled here
)

# Async engine — FastAPI request handlers
async_engine = create_async_engine(
    str(config.settings.SQLALCHEMY_DATABASE_URI),
    pool_size=20,
    max_overflow=10,
    pool_recycle=300,
    pool_pre_ping=True,
)

Note pool_pre_ping=False on the sync engine. psycopg3's pre-ping mechanism needs a greenlet context when running inside an asyncio event loop. Celery workers don't run in an event loop, and we don't want to pay the overhead of greenlet setup here. We rely on pool_recycle=300 and retry logic instead.

A typical Celery task looks like:

@celery_app.task
def send_campaign_batch(tenant_id: str, campaign_id: str, contact_ids: list[str]):
    with tenant_session_sync(tenant_id) as session:
        campaign = session.get(Campaign, campaign_id)
        contacts = session.query(Contact).filter(Contact.id.in_(contact_ids)).all()
        # ... process

The _reapply_tenant_search_path event listener fires here too, so even if a sync commit internally recycles the connection to a new one, the search path is re-applied.


The Dual Engine Architecture

The migration from sync SQLModel to async SQLAlchemy 2.0 gave us one non-obvious constraint: you cannot freely mix sync and async code paths with a single engine. Specifically:

  • FastAPI route handlers are async def, use AsyncSession, and must await everything
  • Celery task functions are plain def, use Session, and cannot await
  • Alembic is entirely synchronous

So we run two engines from the same database URI:

┌─────────────────────────────────────────────────────────────┐
│                    Database Connection Layer                  │
│                                                             │
│  ┌────────────────────────┐   ┌────────────────────────┐   │
│  │   async_engine         │   │       engine            │   │
│  │   (psycopg3 async)     │   │   (psycopg3 sync)       │   │
│  │   pool_size=20         │   │   pool_size=20          │   │
│  │   pool_pre_ping=True   │   │   pool_pre_ping=False   │   │
│  └───────────┬────────────┘   └────────────┬────────────┘   │
│              │                             │               │
│      FastAPI handlers                Celery workers        │
│      TenantSessionDep                tenant_session_sync   │
│      InternalSessionDep              Alembic migrations    │
│      PublicApiSessionDep             Management scripts    │
└─────────────────────────────────────────────────────────────┘

Both engines connect to the same PostgreSQL server, so their pools draw on the same max_connections budget on the database side. From the application side they're completely independent; no session object ever crosses the async/sync boundary.
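
Because each engine keeps its own pool in every process, the worst-case connection count is worth checking. The arithmetic below uses the pool settings from this post. The process counts are made-up examples, not our deployment.

```python
def max_server_connections(processes: int, pool_size: int, max_overflow: int) -> int:
    """Upper bound on connections one engine definition can open across processes."""
    return processes * (pool_size + max_overflow)


# pool_size and max_overflow come from the engine config; process counts are hypothetical.
api_connections = max_server_connections(processes=4, pool_size=20, max_overflow=10)
celery_connections = max_server_connections(processes=8, pool_size=20, max_overflow=10)
total = api_connections + celery_connections

print(api_connections, celery_connections, total)
```

With these example numbers the ceiling is far above PostgreSQL's default max_connections of 100, so the pool sizes, the process counts, or the server limit have to be tuned together.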


The Request Lifecycle: End to End

Putting it all together, here's what happens when a user GETs /acme-corp/api/v1/contacts:

1. Request arrives at uvicorn

2. CORSMiddleware → passes through

3. AuthenticationMiddleware
   → Decodes JWT → sets request.user

4. TenantMiddleware
   → Extracts "acme-corp" from path
   → Hits TTL cache → miss → async DB lookup
   → Finds Tenant(slug="acme-corp", schema_name="tenant_acme_corp")
   → request.state.tenant = tenant
   → request.scope["path"] = "/api/v1/contacts"  (prefix stripped)

5. FastAPI router matches /api/v1/contacts → contacts.list_contacts()

6. get_tenant_db() dependency runs
   → Creates AsyncSession via async_session_factory()
   → Validates schema name: "tenant_acme_corp" ✓
   → session.info["tenant_schema"] = "tenant_acme_corp"
   → await session.execute('SET search_path TO "tenant_acme_corp", shared, public')

7. list_contacts() handler executes
   → SELECT * FROM contact WHERE ... LIMIT 20
   → PostgreSQL resolves "contact" → "tenant_acme_corp"."contact"
   → Returns only acme-corp's contacts

8. Session commits → connection returned to pool
   → _reapply_tenant_search_path fires on next transaction begin

9. Response serialized and returned

As long as every query goes through one of these session dependencies, Tenant A never sees Tenant B's contacts: the search_path ensures that an unqualified SELECT * FROM contact only ever hits the requesting tenant's schema.


Tradeoffs and What We'd Do Differently

What worked extremely well:

  • Zero tenant_id columns in tenant-schema tables. Queries are cleaner. There's no WHERE tenant_id = ? clause that a developer could accidentally forget.
  • Native PostgreSQL schema namespacing is rock-solid, and there's no RLS policy to get wrong.
  • Alembic's multi-section alembic.ini handles schema-specific migration tracks elegantly.
  • The after_begin event listener pattern for re-applying search_path was the cleanest solution we found.

Sharp edges:

  • Schema proliferation. With hundreds of tenants you accumulate hundreds of schemas. pg_catalog.pg_namespace grows by one row per tenant, and pg_class and pg_attribute grow with every table and column in each schema. \dn in psql becomes noisy. This is manageable at our current scale but becomes a concern at thousands of tenants.
  • Bulk migrations are slow. Running Alembic against 500 schemas sequentially takes time. We're exploring parallelising migrate_all_tenants() with a thread pool.
  • Alembic autogenerate is tricky. When you run alembic --name tenant revision --autogenerate, it compares against whichever tenant schema you point it at. The include_object filter is critical to prevent it from generating migrations for shared tables.
  • Cross-tenant queries are awkward. If you want aggregated analytics across all tenants, you need to either use a separate data warehouse or write a custom query that UNION ALLs across schemas dynamically. We keep analytics out of the main database for now.
  • No enforcement at the Postgres level. Every session connects with the same database credentials, so isolation depends on the application setting search_path correctly. If application code does something wrong (e.g., a raw text() query without validating the schema), it could access the wrong schema. Row-Level Security or per-tenant roles with schema-level privileges would add an extra safety net, but we haven't implemented either yet.
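
For the slow bulk migrations, a thread-pool variant could look like the sketch below. migrate_one is an injected callable and the function name is invented for this example. Whether Alembic's command API is safe to call concurrently from several threads in one process is something to verify before trying this; separate processes may turn out to be the safer route.

```python
from collections.abc import Callable, Iterable
from concurrent.futures import ThreadPoolExecutor, as_completed


def migrate_in_parallel(
    schema_names: Iterable[str],
    migrate_one: Callable[[str], None],
    max_workers: int = 4,
) -> tuple[list[str], dict[str, str]]:
    """Return (succeeded schema names, {failed schema name: error message})."""
    succeeded: list[str] = []
    failed: dict[str, str] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(migrate_one, name): name for name in schema_names}
        for future in as_completed(futures):
            name = futures[future]
            try:
                future.result()  # re-raises whatever migrate_one raised
            except Exception as exc:
                failed[name] = str(exc)
            else:
                succeeded.append(name)
    # Sort so the output stays deterministic despite completion order.
    return sorted(succeeded), failed
```

Failure isolation is preserved: an exception in one future is recorded and the remaining schemas still run.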

The Middleware Stack in main.py

The order of middleware registration matters enormously. Here's ours:

middlewares = [
    Middleware(CORSMiddleware, allow_origins=settings.all_cors_origins, ...),
    Middleware(AuthenticationMiddleware, backend=OAuth2PasswordBearerAuthenticationBackend()),
    Middleware(TenantMiddleware),
]

# Public API middleware added afterwards. Each add_middleware call wraps the
# existing stack, so the last one added is the outermost and runs first.
app.add_middleware(RateLimitMiddleware)
app.add_middleware(IdempotencyMiddleware)
app.add_middleware(RequestIdMiddleware)

TenantMiddleware runs after auth so the user identity is available in request.user when the tenant lookup happens. Rate limiting is scoped to public API keys, which resolve their tenant from the key rather than from TenantMiddleware.

One critical setting:

app = FastAPI(
    ...
    redirect_slashes=False,  # Prevent 307 redirects that lose context in multi-tenant apps
)

FastAPI by default sends a 307 redirect when you hit /acme-corp/api/v1/contacts/ instead of /acme-corp/api/v1/contacts. The redirect drops the tenant prefix because the path rewriting in TenantMiddleware has already happened. Disabling slash redirects eliminates this entire class of bug.


Conclusion

Schema-per-tenant PostgreSQL multitenancy with FastAPI comes down to these core mechanisms:

  1. Tenant model in shared stores slug → schema_name mapping
  2. TenantMiddleware extracts slug, resolves Tenant via TTL cache, rewrites path
  3. Session dependencies issue SET search_path and stash tenant_schema in session.info
  4. after_begin event listener re-applies search_path after every commit/pool checkout
  5. Multi-section alembic.ini manages three independent migration tracks
  6. complete_onboarding provisions schemas via CREATE SCHEMA + Alembic programmatic API
  7. migrate_all_tenants drives per-deploy bulk schema migrations, failure-isolated per tenant
  8. Dual engine (async for FastAPI, sync for Celery) with session.info propagation

The system has been running in production without a cross-tenant data leak since launch. The complexity is concentrated in a handful of files (deps.py, middlewares.py, env.py, onboarding.py) and the rest of the application code just picks the right session dependency type.

If you're starting a similar project today, this architecture scales comfortably to hundreds of tenants on a single PostgreSQL instance. Beyond that, you'll want to evaluate PostgreSQL logical replication to secondary clusters, or re-evaluate whether separate databases per large enterprise customer makes sense for your tiered pricing model.


The Lynkist engineering team — May 2026
