# FleetPilot — API Conventions
FastAPI backend patterns for VS Code Claude sessions on fleetpilot-backend.
## Architecture split
| Layer | Handles | Does NOT handle |
|---|---|---|
| FastAPI | GCS operations, Claude API, external services (DVLA, Credas), Cloud Tasks triggers | Standard CRUD |
| Supabase JS (frontend) | All standard CRUD — vehicles, customers, bookings, violations, incidents | Anything requiring server-side secrets |
Active routers: documents, violations (scan endpoints), incidents (partial), internal
Never route standard CRUD through FastAPI. Supabase JS server actions are the correct pattern.
## FastAPI route pattern

```python
import os
from typing import Annotated, Optional

import httpx
from fastapi import APIRouter, Depends, HTTPException, Query

router = APIRouter()

@router.get("/things")
async def list_things(
    tenant: Annotated[dict, Depends(get_current_tenant)],
    status: Optional[str] = Query(default=None),
):
    # get_current_tenant and supabase_headers are defined later in this doc
    params = {
        "tenant_id": f"eq.{tenant['tenant_id']}",
        "archived_at": "is.null",
        "order": "created_at.desc",
    }
    if status:
        params["status"] = f"eq.{status}"
    async with httpx.AsyncClient() as client:
        resp = await client.get(
            f"{os.environ['SUPABASE_URL']}/rest/v1/things",
            headers=supabase_headers(),
            params=params,
        )
    if resp.status_code != 200:
        raise HTTPException(status_code=502, detail="Failed to fetch")
    return [ThingResponse(**row) for row in resp.json()]
```
## Pydantic models

Three models per entity:

```python
from pydantic import BaseModel

class ThingCreate(BaseModel):
    """Required fields for POST."""

class ThingUpdate(BaseModel):
    """All fields Optional for PATCH; use exclude_unset=True on dump."""

class ThingResponse(BaseModel):
    """Response shape — includes computed fields."""
```
For PATCH, dump the body with `body.model_dump(exclude_unset=True, mode="json")`. This ensures datetime/date objects serialise to ISO strings.
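
A minimal PATCH handler sketch following these conventions. The route path, the `ThingUpdate` fields, and the exact error wording are illustrative; the 404/422/502 handling mirrors the patterns elsewhere in this doc.

```python
@router.patch("/things/{thing_id}")
async def update_thing(
    thing_id: str,
    body: ThingUpdate,
    tenant: Annotated[dict, Depends(get_current_tenant)],
):
    # exclude_unset keeps omitted fields out of the PATCH; mode="json" serialises dates
    updates = body.model_dump(exclude_unset=True, mode="json")
    if not updates:
        raise HTTPException(status_code=422, detail="No fields to update")
    async with httpx.AsyncClient() as client:
        resp = await client.patch(
            f"{os.environ['SUPABASE_URL']}/rest/v1/things",
            headers=supabase_headers(),
            params={"id": f"eq.{thing_id}", "tenant_id": f"eq.{tenant['tenant_id']}"},
            json=updates,
        )
    if resp.status_code != 200:
        raise HTTPException(status_code=502, detail="Failed to update")
    rows = resp.json()  # Prefer: return=representation gives back the updated rows
    if not rows:
        raise HTTPException(status_code=404, detail="Thing not found")
    return ThingResponse(**rows[0])
```
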
## Auth dependency

```python
from typing import Annotated

from fastapi import Depends

from auth import get_current_tenant

async def my_route(tenant: Annotated[dict, Depends(get_current_tenant)]):
    tenant_id = tenant['tenant_id']
    # Every route has access to tenant_id via the JWT
```
The `get_current_tenant` dependency (a sketch follows the list):
1. Reads the `Authorization: Bearer <token>` header
2. Verifies the Supabase JWT using `SUPABASE_JWT_SECRET`
3. Extracts `tenant_id` from the token claims
4. Returns 401 if missing or invalid
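
A sketch of roughly what that dependency looks like, assuming PyJWT and a `tenant_id` claim directly on the token. The real auth.py may read the claim from a different place (e.g. `app_metadata`) and return extra keys.

```python
import os

import jwt  # PyJWT
from fastapi import HTTPException, Request

async def get_current_tenant(request: Request) -> dict:
    auth_header = request.headers.get("Authorization", "")
    if not auth_header.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Not authenticated")
    try:
        claims = jwt.decode(
            auth_header.removeprefix("Bearer "),
            os.environ["SUPABASE_JWT_SECRET"],
            algorithms=["HS256"],
            audience="authenticated",  # Supabase's default JWT audience
        )
    except jwt.PyJWTError:
        raise HTTPException(status_code=401, detail="Not authenticated")
    tenant_id = claims.get("tenant_id")
    if not tenant_id:
        raise HTTPException(status_code=401, detail="Not authenticated")
    return {"tenant_id": tenant_id}
```
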
## Supabase headers

```python
import os

def supabase_headers() -> dict:
    return {
        "apikey": os.environ["SUPABASE_SERVICE_KEY"],
        "Authorization": f"Bearer {os.environ['SUPABASE_SERVICE_KEY']}",
        "Content-Type": "application/json",
        "Prefer": "return=representation",
    }
```
## Error responses

```python
# 404
raise HTTPException(status_code=404, detail="Thing not found")

# 409 conflict
raise HTTPException(status_code=409, detail="Vehicle already booked for these dates")

# 502 upstream failure
raise HTTPException(status_code=502, detail="Failed to fetch from database")

# 422 validation
raise HTTPException(status_code=422, detail="No fields to update")
```
NEVER return raw Supabase error messages. Generic messages only.
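
One way to follow that rule inside a route handler: keep the raw PostgREST body in server logs and surface only a generic detail. This is a sketch; `resp` is the httpx response from the route pattern above and the logger name is illustrative.

```python
import logging

logger = logging.getLogger("fleetpilot")

if resp.status_code >= 400:
    # Raw Supabase/PostgREST error goes to server logs only
    logger.error("Supabase error %s: %s", resp.status_code, resp.text)
    raise HTTPException(status_code=502, detail="Failed to fetch from database")
```
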
## Soft deletes

Archive via PATCH to set `archived_at`:

```python
async with httpx.AsyncClient() as client:
    resp = await client.patch(
        f"{SUPABASE_URL}/rest/v1/things",
        headers=supabase_headers(),
        params={"id": f"eq.{thing_id}", "tenant_id": f"eq.{tenant_id}"},
        json={"archived_at": datetime.utcnow().isoformat()},
    )
```

All list queries filter `"archived_at": "is.null"`.
## Internal endpoints (Cloud Tasks / Cloud Scheduler)

Internal endpoints use an `X-Internal-Secret` header instead of a JWT:

```python
@router.post("/internal/jobs/process-pcn")
async def process_pcn(request: Request):
    secret = request.headers.get("X-Internal-Secret")
    if secret != os.environ["INTERNAL_SECRET"]:
        raise HTTPException(status_code=403)
    # ...
```
Pattern for job processing (see the sketch below):
1. Check status is 'queued' (idempotency guard)
2. Set status to 'processing'
3. Do the work
4. Set status to 'done' or 'needs_attention'
5. If crash mid-flight: a Cloud Scheduler reset cron recovers stuck 'processing' jobs after 10 min
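
A sketch of that flow, assuming a `violation_jobs` table with a `status` column and a `process_pcn_document()` worker function. Those names are illustrative, not the real schema.

```python
async def run_pcn_job(job_id: str, tenant_id: str) -> None:
    async with httpx.AsyncClient() as client:
        # Steps 1 and 2: claim the job only if it is still 'queued' (idempotency guard)
        resp = await client.patch(
            f"{os.environ['SUPABASE_URL']}/rest/v1/violation_jobs",
            headers=supabase_headers(),
            params={"id": f"eq.{job_id}", "status": "eq.queued"},
            json={"status": "processing"},
        )
        if resp.status_code != 200 or not resp.json():
            return  # Already claimed or finished by another run; do nothing

        # Step 3: do the work
        try:
            await process_pcn_document(job_id, tenant_id)
            final_status = "done"
        except Exception:
            final_status = "needs_attention"

        # Step 4: record the outcome
        await client.patch(
            f"{os.environ['SUPABASE_URL']}/rest/v1/violation_jobs",
            headers=supabase_headers(),
            params={"id": f"eq.{job_id}"},
            json={"status": final_status},
        )
```

Combining the status check and the flip to 'processing' into one conditional PATCH keeps the idempotency guard a single round trip. Step 5 needs no code here because the reset cron runs in Cloud Scheduler.
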
## Cloud Tasks enqueueing

```python
import os
from typing import Optional

from google.cloud import tasks_v2

def enqueue_process_pcn_task(job_id: str, tenant_id: str) -> Optional[str]:
    if not os.environ.get("GCP_PROJECT_ID"):
        print(f"GCP_PROJECT_ID not set — Cloud Tasks task not enqueued for job {job_id}")
        return None  # Dev mode — silent skip
    client = tasks_v2.CloudTasksClient()
    # ... create task pointing to /internal/jobs/process-pcn
    # Raises on API failure (caller should catch and set job to needs_attention)
```
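
The elided task creation might look roughly like this sketch. The helper name, queue name, region default, and the `SERVICE_BASE_URL` / `GCP_LOCATION` / `TASKS_QUEUE` env vars are assumptions, not confirmed config.

```python
import json
import os

from google.cloud import tasks_v2

def create_pcn_task(client: tasks_v2.CloudTasksClient, job_id: str, tenant_id: str) -> str:
    """Build and enqueue the HTTP task that hits /internal/jobs/process-pcn."""
    parent = client.queue_path(
        os.environ["GCP_PROJECT_ID"],
        os.environ.get("GCP_LOCATION", "europe-west2"),
        os.environ.get("TASKS_QUEUE", "pcn-jobs"),
    )
    task = tasks_v2.Task(
        http_request=tasks_v2.HttpRequest(
            http_method=tasks_v2.HttpMethod.POST,
            url=f"{os.environ['SERVICE_BASE_URL']}/internal/jobs/process-pcn",
            headers={
                "Content-Type": "application/json",
                "X-Internal-Secret": os.environ["INTERNAL_SECRET"],
            },
            body=json.dumps({"job_id": job_id, "tenant_id": tenant_id}).encode(),
        )
    )
    created = client.create_task(parent=parent, task=task)
    return created.name  # the client raises on API failure
```
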
## GCS operations

```python
import json
import os
from datetime import timedelta

from google.cloud import storage
from google.oauth2 import service_account

def get_gcs_client() -> storage.Client:
    sa_json = os.environ["GCS_SERVICE_ACCOUNT_JSON"]
    credentials = service_account.Credentials.from_service_account_info(
        json.loads(sa_json)
    )
    return storage.Client(credentials=credentials)

def generate_signed_upload_url(
    bucket: str, object_path: str, content_type: str, expires_seconds: int = 3600
) -> str:
    client = get_gcs_client()
    blob = client.bucket(bucket).blob(object_path)
    return blob.generate_signed_url(
        version="v4",
        expiration=timedelta(seconds=expires_seconds),
        method="PUT",  # Must be PUT for direct browser upload
        content_type=content_type,
    )
```
GCS bucket: `saas-rental-492618-documents`
Object path pattern: `{tenant_id}/{entity_type}/{entity_id}/{doc_type}/{uuid}_{filename}`
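
For example, issuing an upload URL for a PCN scan attached to a violation (the `violations` / `pcn` path segments and the PDF content type here are illustrative):

```python
import uuid

object_path = f"{tenant_id}/violations/{violation_id}/pcn/{uuid.uuid4()}_{filename}"
upload_url = generate_signed_upload_url(
    bucket="saas-rental-492618-documents",
    object_path=object_path,
    content_type="application/pdf",
)
# The browser then PUTs the file directly to upload_url with the same Content-Type header
```
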
## Migration conventions

- Migration files: `{timestamp}_{description}.sql`
- Next migration: check the migrations/ folder for the latest timestamp
- Apply: run `supabase db push` after every backend PR that includes migrations
Always use:
- `public.current_tenant_id()` for RLS policies
- `set_updated_at()` trigger for `updated_at` columns
- `SECURITY DEFINER SET search_path = public` on trigger functions
## Testing

```python
# tests/test_things.py — follows conftest.py fixture pattern
# Uses respx to mock Supabase HTTP calls
# Run: pytest tests/ -v
```
Test files: `test_documents.py`, `test_violations.py`, `test_incidents.py`
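
A minimal sketch of a respx-backed test, assuming a conftest.py `client` fixture that wraps the app in a FastAPI TestClient with a valid bearer token, and a test-config SUPABASE_URL pointing at a dummy host. Both of those, and the response fields, are assumptions.

```python
import httpx
import respx

@respx.mock
def test_list_things_returns_rows(client):
    # Intercept the PostgREST call made by the route
    respx.get("https://test.supabase.local/rest/v1/things").mock(
        return_value=httpx.Response(
            200, json=[{"id": "thing-1", "status": "active"}]
        )
    )
    resp = client.get("/things")
    assert resp.status_code == 200
    assert resp.json()[0]["id"] == "thing-1"
```
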