Async AI Job Processing Platform
Production-style async job platform: FastAPI enqueues AI jobs via POST /jobs (returns 202 in <100ms), Celery 5 workers process them against PostgreSQL 16 + Redis 7 (broker + result backend), React 18 dashboard polls live status. Workers are stateless and horizontally scalable (make scale-workers N=5).
Role
Backend Engineer & Systems Designer
Team
Solo
Company/Organization
Personal Project
The Problem
Long-running AI inference tasks (document processing, embeddings, LLM calls) cannot execute inside synchronous HTTP request handlers — clients time out long before the work finishes.
Running AI work directly in FastAPI route handlers couples API latency to job duration — a 2-minute document analysis job holds a request worker for the full 2 minutes, starving every other request.
No fault tolerance for transient failures: if an OpenAI API call fails mid-job, the entire request fails with no retry and no way to resume from where it left off.
No horizontal scaling path — synchronous handlers can't be replicated independently from the API layer, so scaling the system means over-provisioning API servers just to absorb background work.
No live visibility into job state for clients — polling a status endpoint requires the job state to be persisted somewhere durable, not just in the memory of the process that accepted the request.
The Solution
Built a decoupled producer-consumer architecture separating the API (job submission) from the workers (AI processing).
API Layer (FastAPI + Uvicorn)
app/routes/job_routes.py — 3 endpoints:
`POST /jobs` — Validates the request (Pydantic), inserts a pending row into PostgreSQL via job_service.py, enqueues the `process_ai_job.delay(job_id)` Celery task, and returns 202 Accepted with the new job ID.
`GET /jobs` — Lists all jobs with current status and timestamps. Paginated.
`GET /jobs/{id}` — Returns single job: id, status, result JSON, error_message, created_at, updated_at.
app/routes/admin_routes.py — `GET /admin/stats` returns aggregated counts by status (pending, processing, completed, failed) for the dashboard stats bar.
app/services/job_service.py — Business logic: create job row, fetch job, list jobs, update status. All DB operations use async SQLAlchemy sessions.
app/core/database.py — Async SQLAlchemy 2 engine with asyncpg driver. `get_db()` async context manager for dependency injection into route handlers.
app/models/job.py — SQLAlchemy ORM model: UUID primary key, input_file_path (str), status (Enum: pending/processing/completed/failed), result (JSON), error_message (str), created_at/updated_at timestamps.
Worker Layer (Celery 5 + Redis 7)
app/worker/celery_app.py — Celery instance configured with CELERY_BROKER_URL (Redis db 0) and CELERY_RESULT_BACKEND (Redis db 1). Separate Redis databases keep broker messages and task results isolated.
app/worker/tasks.py — `process_ai_job` task:
1. Idempotency guard — Fetch job from PostgreSQL; if status is already `completed` or `failed`, return immediately (no-op). Safe to re-deliver.
2. Status update — SET status = processing, updated_at = now.
3. AI processing — Runs actual AI work (OpenAI call or simulated sleep if OPENAI_API_KEY not set). Accepts input_file_path as context.
4. Success path — SET status = completed, result = {output: '...'}, updated_at = now.
5. Failure path — On exception, SET status = failed, error_message = str(exc), updated_at = now. Re-raise for Celery retry.
6. Retry policy — `autoretry_for=(Exception,)`, `max_retries=3`, `retry_backoff=True`, `retry_backoff_max=60`, `retry_jitter=True`. Exponential backoff capped at 60s, with jitter, across up to 3 retries.
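Stripped of Celery, the control flow of steps 1-6 reduces to plain Python. In this sketch `jobs` is an in-memory stand-in for the PostgreSQL table and `run_inference` for the OpenAI call (both hypothetical names); in the real task, Celery itself drives the retries via `autoretry_for`:

```python
import random
import time

jobs = {}  # in-memory stand-in for the PostgreSQL jobs table

def process_ai_job(job_id, run_inference, max_retries=3, backoff_cap=60):
    job = jobs[job_id]
    # 1. Idempotency guard: a re-delivered task for a finished job is a no-op.
    if job["status"] in ("completed", "failed"):
        return job["status"]
    # 2. Mark the job as picked up.
    job["status"] = "processing"
    for attempt in range(max_retries + 1):
        try:
            # 3./4. Run the AI work and record the result.
            job["result"] = {"output": run_inference(job["input_file_path"])}
            job["status"] = "completed"
            return "completed"
        except Exception as exc:
            if attempt == max_retries:
                # 5. Out of retries: record the failure durably.
                job["status"] = "failed"
                job["error_message"] = str(exc)
                return "failed"
            # 6. Exponential backoff with full jitter, capped at backoff_cap.
            time.sleep(random.uniform(0, min(backoff_cap, 2 ** attempt)))
```

Because every status transition is written to the (stand-in) store before and after the work, a crashed or duplicated delivery can always be reconciled from job state alone.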
Frontend (React 18 + Vite 7 + Axios)
src/api.js — Axios instance with baseURL pointing to `/api` proxy. Functions: `submitJob(inputFilePath)`, `getJob(id)`, `listJobs()`, `getStats()`.
src/App.jsx — Layout with stats bar (total/pending/processing/completed/failed counts, polls every 5s via getStats()). Renders JobForm + JobList.
src/components/JobForm.jsx — Controlled form with input_file_path field. On submit calls submitJob(), adds the returned job to local state, and clears the input.
src/components/JobList.jsx — Renders per-job cards. Each card with an active status (pending/processing) starts a 3-second polling interval via getJob(id); polling stops once the job reaches a terminal state.
vite.config.js — host 0.0.0.0 (Docker binding) + `/api` proxy to `http://api:8000` — eliminates CORS in development and Docker environments.
Horizontal Scaling
Workers are completely stateless — shared state is Redis broker + PostgreSQL. Run N replicas: `make scale-workers N=5`.
Docker Compose `scale` command starts N worker containers, all consuming from the same Celery queue.
Throughput scales roughly linearly — 5 workers process ~5× as many jobs per minute as 1, until PostgreSQL or the AI provider becomes the bottleneck.
API and worker images are built from the same multi-stage Dockerfile, different CMD (uvicorn vs celery worker).
Monitoring (Flower)
Flower connects to Redis broker, provides real-time Celery UI at :5555.
Shows active tasks, queue depth, worker count, task success/failure rates, execution time histograms.
No code changes needed — reads directly from Celery's Redis metadata.
Load Testing (Locust)
load_test/locustfile.py — Simulates users submitting jobs and polling for results.
Locust UI at :8089; configure 100 users, 10/s spawn rate.
Validates sub-100ms submission latency and end-to-end job throughput under concurrent load.
Docker Compose Stack
`db` — PostgreSQL 16 (internal only, port 5432 not published to host)
`redis` — Redis 7 (internal only, port 6379 not published)
`api` — FastAPI (port 8000 published)
`worker` — Celery worker (no port, internal only)
`flower` — Flower UI (port 5555 published)
`frontend` — Vite dev server (port 5173 published)
`locust` (optional) — Locust load tester (port 8089)
All services on a shared internal Docker network.
DB and Redis are unreachable from the host — only other services on the internal network (API, workers, Flower) can connect to them.
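The port exposure above might look like this in docker-compose.yml — a trimmed, illustrative fragment (service names from the list; image tags, commands, and module paths are assumptions):

```yaml
services:
  db:
    image: postgres:16
    # no `ports:` mapping -- reachable only on the internal network
  redis:
    image: redis:7
    # likewise internal-only: no ports published to the host
  api:
    build: .
    command: uvicorn app.main:app --host 0.0.0.0 --port 8000
    ports:
      - "8000:8000"
  worker:
    build: .
    command: celery -A app.worker.celery_app worker --loglevel=info
    # no ports: workers only talk to redis and db
  flower:
    ports:
      - "5555:5555"
  frontend:
    ports:
      - "5173:5173"
```

Omitting `ports:` on db and redis is what keeps them off the host while leaving them fully reachable by the other services on the shared network.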
Makefile Automation
make up / down / restart / rebuild, make logs / logs-api / logs-worker / logs-frontend, make status / health / stats, make scale-workers N=3, make load-test, plus cleanup targets.
Design Decisions
API returns 202 Accepted immediately after enqueuing — never blocks on AI work. Keeps p99 API latency under 100ms regardless of job duration. Clients poll `GET /jobs/{id}` for progress.
Celery + Redis over FastAPI BackgroundTasks — BackgroundTasks run in the same process as the API (can't scale independently, die if the API crashes). Celery workers run in separate containers, scale independently, and survive API restarts.
Idempotency guard in tasks.py (check status before processing) — makes task re-delivery safe. Celery's at-least-once delivery means a task could be delivered more than once; the guard turns a duplicate delivery into a no-op.
Exponential backoff with jitter (retry_backoff=True, retry_jitter=True) — avoids thundering herd when multiple workers retry simultaneously after a shared transient failure.
Async SQLAlchemy 2 + asyncpg throughout API layer — non-blocking DB operations match FastAPI's async event loop. Avoids thread pool exhaustion from blocking DB calls under concurrent load.
Separate Redis databases for broker (db 0) and result backend (db 1) — prevents Celery metadata from interfering with application cache keys. Easy to inspect or flush either independently.
UUID primary key for jobs — globally unique, safe to generate client-side if needed, no sequential ID enumeration attack surface.
Stateless workers with all state in PostgreSQL — workers can be killed and restarted without data loss. Job state always recoverable from DB. Enables horizontal scaling and safe worker replacement.
Flower for Celery monitoring over a custom solution — zero additional code, connects directly to the Redis broker, provides production-grade queue visibility out of the box.
Locust load testing included in the repo — load_test/locustfile.py ships with the project so performance characteristics are testable by anyone who clones it.
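The retry-delay policy from the decisions above (exponential backoff capped at 60s, with full jitter) reduces to a few lines — an illustrative formula, not Celery's exact internals:

```python
import random

def retry_delay(retries, cap=60, jitter=True, rng=random):
    # Exponential backoff: 1s, 2s, 4s, ... capped at `cap`. Full jitter picks
    # uniformly in [0, delay] so that N workers retrying after a shared outage
    # don't all hit the AI provider again at the same instant.
    delay = min(cap, 2 ** retries)
    return rng.uniform(0, delay) if jitter else delay
```

Without jitter, every worker that failed at the same moment retries at the same moment; the uniform draw spreads those retries across the whole backoff window.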
Tradeoffs & Constraints
3-second polling interval for status updates — simple and reliable but not real-time. WebSocket or SSE would reduce latency to near-zero but adds connection-management complexity.
Celery at-least-once delivery — tasks may execute more than once on broker failure. Idempotency guard handles this for completed/failed jobs but adds a DB read at the start of every task run.
No job cancellation — once a task is enqueued, there is no cancel endpoint. Would require Celery task revocation (celery.control.revoke) + status reconciliation in the DB.
Flower and /admin/stats are unauthenticated — acceptable for local/private environments, must be placed behind auth middleware or an IP allowlist before any public deployment.
PostgreSQL for job state over Redis — adds a persistent store dependency but provides durable job history, queryable by status/date, and ACID guarantees on status transitions.
Simulated AI mode (no OPENAI_API_KEY) — workers sleep to simulate processing. Useful for local development and CI but doesn't test actual LLM latency or failure modes.
Would improve: add WebSocket/SSE for real-time status push, a job cancellation endpoint, per-job priority queues (separate Celery queues for high/low priority), and authentication for the admin and Flower endpoints.
Outcome & Impact
Sub-100ms API response times for job submission (POST /jobs returns 202 Accepted immediately after DB insert + Celery enqueue, before any AI work begins).
Live status transitions (pending → processing → completed/failed) visible in React dashboard via 3-second polling — no page refresh; per-job polling stops once a job reaches a terminal state.
Horizontal worker scaling validated: make scale-workers N=5 starts 5 independent Celery workers sharing Redis broker and PostgreSQL state, processing jobs concurrently from the shared queue.
Idempotent task processing — re-delivering a completed or failed job ID is a no-op, safe under Celery at-least-once delivery semantics.
Exponential backoff with jitter retries (up to 3 attempts, 2-60s delay with jitter) handles transient AI API failures without thundering herd.
Flower real-time Celery dashboard at :5555 — active tasks, queue depth, worker count, task success/failure rates, execution time histograms, zero additional code.
Locust load tests (100 users, 10/s spawn) validate throughput and p99 latency under concurrent submissions. load_test/locustfile.py ships with the repo.
One-command stack startup (make up) and full Makefile automation for logs, health checks, scaling, load testing, and cleanup.
Tech Stack
API: Python 3.11, FastAPI, Uvicorn (ASGI server)
Task Queue: Celery 5 (distributed task processing, exponential backoff retries, idempotency)
Message Broker: Redis 7 (Celery broker on db 0, result backend on db 1)
Database: PostgreSQL 16, SQLAlchemy 2 async ORM, asyncpg driver
Frontend: React 18, Vite 7 (dev server + /api proxy), Axios (HTTP client)
Monitoring: Flower (real-time Celery queue + worker UI)
Load Testing: Locust (concurrent job submission + polling simulation)
Orchestration: Docker Compose (6-service stack — api, worker, db, redis, flower, frontend)
Automation: Makefile (up/down/scale/logs/health/load-test commands)