Skip to content

FleetPilot — API Conventions

FastAPI backend patterns for VS Code Claude sessions on fleetpilot-backend.


Architecture split

Layer Handles Does NOT handle
FastAPI GCS operations, Claude API, external services (DVLA, Credas), Cloud Tasks triggers Standard CRUD
Supabase JS (frontend) All standard CRUD — vehicles, customers, bookings, violations, incidents Anything requiring server-side secrets

Active routers: documents, violations (scan endpoints), incidents (partial), internal

Never route standard CRUD through FastAPI. Supabase JS server actions are the correct pattern.


FastAPI route pattern

@router.get("/things")
async def list_things(
    tenant: Annotated[dict, Depends(get_current_tenant)],
    status: Optional[str] = Query(default=None),
):
    params = {
        "tenant_id": f"eq.{tenant['tenant_id']}",
        "archived_at": "is.null",
        "order": "created_at.desc",
    }
    if status:
        params["status"] = f"eq.{status}"

    async with httpx.AsyncClient() as client:
        resp = await client.get(
            f"{os.environ['SUPABASE_URL']}/rest/v1/things",
            headers=supabase_headers(),
            params=params,
        )
    if resp.status_code != 200:
        raise HTTPException(status_code=502, detail="Failed to fetch")
    return [ThingResponse(**row) for row in resp.json()]

Pydantic models

Three models per entity:

class ThingCreate(BaseModel):
    # Required fields for POST

class ThingUpdate(BaseModel):
    # All fields Optional for PATCH
    # Use exclude_unset=True on dump

class ThingResponse(BaseModel):
    # Response shape — includes computed fields

For PATCH: body.model_dump(exclude_unset=True, mode="json") This ensures datetime/date objects serialise to ISO strings.


Auth dependency

from auth import get_current_tenant

async def my_route(tenant: Annotated[dict, Depends(get_current_tenant)]):
    tenant_id = tenant['tenant_id']
    # Every route has access to tenant_id via the JWT

The get_current_tenant dependency: 1. Reads Authorization: Bearer <token> header 2. Verifies Supabase JWT using SUPABASE_JWT_SECRET 3. Extracts tenant_id from token claims 4. Returns 401 if missing or invalid


Supabase headers

def supabase_headers() -> dict:
    return {
        "apikey": os.environ["SUPABASE_SERVICE_KEY"],
        "Authorization": f"Bearer {os.environ['SUPABASE_SERVICE_KEY']}",
        "Content-Type": "application/json",
        "Prefer": "return=representation",
    }

Error responses

# 404
raise HTTPException(status_code=404, detail="Thing not found")

# 409 conflict
raise HTTPException(status_code=409, detail="Vehicle already booked for these dates")

# 502 upstream failure
raise HTTPException(status_code=502, detail="Failed to fetch from database")

# 422 validation
raise HTTPException(status_code=422, detail="No fields to update")

NEVER return raw Supabase error messages. Generic messages only.


Soft deletes

Archive via PATCH to set archived_at:

async with httpx.AsyncClient() as client:
    resp = await client.patch(
        f"{SUPABASE_URL}/rest/v1/things",
        headers=supabase_headers(),
        params={"id": f"eq.{thing_id}", "tenant_id": f"eq.{tenant_id}"},
        json={"archived_at": datetime.utcnow().isoformat()},
    )

All list queries filter "archived_at": "is.null".


Internal endpoints (Cloud Tasks / Cloud Scheduler)

Internal endpoints use X-Internal-Secret header instead of JWT:

@router.post("/internal/jobs/process-pcn")
async def process_pcn(request: Request):
    secret = request.headers.get("X-Internal-Secret")
    if secret != os.environ["INTERNAL_SECRET"]:
        raise HTTPException(status_code=403)
    # ...

Pattern for job processing: 1. Check status is 'queued' (idempotency guard) 2. Set status to 'processing' 3. Do the work 4. Set status to 'done' or 'needs_attention' 5. If crash mid-flight: Cloud Scheduler reset cron recovers stuck 'processing' jobs after 10 min


Cloud Tasks enqueueing

from google.cloud import tasks_v2

def enqueue_process_pcn_task(job_id: str, tenant_id: str) -> Optional[str]:
    if not os.environ.get("GCP_PROJECT_ID"):
        print(f"GCP_PROJECT_ID not set — Cloud Tasks task not enqueued for job {job_id}")
        return None  # Dev mode — silent skip

    client = tasks_v2.CloudTasksClient()
    # ... create task pointing to /internal/jobs/process-pcn
    # Raises on API failure (caller should catch and set job to needs_attention)

GCS operations

from google.cloud import storage
import json, os

def get_gcs_client() -> storage.Client:
    sa_json = os.environ["GCS_SERVICE_ACCOUNT_JSON"]
    credentials = service_account.Credentials.from_service_account_info(
        json.loads(sa_json)
    )
    return storage.Client(credentials=credentials)

def generate_signed_upload_url(bucket: str, object_path: str, content_type: str, expires_seconds=3600) -> str:
    client = get_gcs_client()
    blob = client.bucket(bucket).blob(object_path)
    return blob.generate_signed_url(
        version="v4",
        expiration=timedelta(seconds=expires_seconds),
        method="PUT",  # Must be PUT for direct browser upload
        content_type=content_type,
    )

GCS bucket: saas-rental-492618-documents Object path pattern: {tenant_id}/{entity_type}/{entity_id}/{doc_type}/{uuid}_{filename}


Migration conventions

Migration files: {timestamp}_{description}.sql Next migration number: check migrations/ folder for the latest timestamp. Apply: supabase db push after every backend PR that includes migrations.

Always use: - public.current_tenant_id() for RLS policies - set_updated_at() trigger for updated_at columns - SECURITY DEFINER SET search_path = public on triggers


Testing

# tests/test_things.py — follows conftest.py fixture pattern
# Uses respx to mock Supabase HTTP calls
# Run: pytest tests/ -v

Test files: test_documents.py, test_violations.py, test_incidents.py