
Async AI Job Processing Platform

Production-style async job platform: FastAPI enqueues AI jobs via POST /jobs (returns 202 in <100ms), Celery 5 workers process them against PostgreSQL 16 + Redis 7 (broker + result backend), React 18 dashboard polls live status. Workers are stateless and horizontally scalable (make scale-workers N=5).

Python 3.11 · FastAPI · Uvicorn · Celery 5 · Redis 7 · PostgreSQL 16 · SQLAlchemy 2 (async) · asyncpg · React 18 · Vite 7 · Axios · Flower · Locust · Docker · Docker Compose · Makefile

Role

Backend Engineer & Systems Designer

Team

Solo

Company/Organization

Personal Project

The Problem

Long-running AI inference tasks (document processing, embeddings, LLM calls) cannot execute inside synchronous HTTP request handlers — clients time out while the work is still running.

Running AI work directly in FastAPI route handlers couples API latency to job duration — a 2-minute document analysis job holds a worker thread for 2 minutes.

No fault tolerance for transient failures: if an OpenAI API call fails mid-job, the entire request fails with no retry and no way to resume where it left off.

No horizontal scaling path — synchronous handlers can't be replicated independently from the API layer, so scaling the system means over-provisioning the entire API.

No live visibility into job state for clients — polling a status endpoint requires job state to be persisted somewhere durable, not just in process memory.

The Solution

Built a decoupled producer-consumer architecture separating the API (job submission) from the workers (AI processing).

API Layer (FastAPI + Uvicorn)

app/routes/job_routes.py — 3 endpoints:

`POST /jobs` — Validates the request (Pydantic), inserts a pending row into PostgreSQL via job_service.py, enqueues the `process_ai_job.delay(job_id)` Celery task, and returns 202 Accepted.

`GET /jobs` — Lists all jobs with current status and timestamps. Paginated.

`GET /jobs/{id}` — Returns a single job: id, status, result JSON, error_message, created_at, updated_at.

app/routes/admin_routes.py — `GET /admin/stats` returns aggregated counts by status (pending, processing, completed, failed) for the dashboard stats bar.

app/services/job_service.py — Business logic: create job row, fetch job, list jobs, update status. All DB operations use async SQLAlchemy sessions.

app/core/database.py — Async SQLAlchemy 2 engine with the asyncpg driver. `get_db()` async context manager for dependency injection into route handlers.

app/models/job.py — SQLAlchemy ORM model: UUID primary key, input_file_path (str), status (Enum: pending/processing/completed/failed), result (JSON), error_message, created_at/updated_at timestamps.

Worker Layer (Celery 5 + Redis 7)

app/worker/celery_app.py — Celery instance configured with CELERY_BROKER_URL (Redis db 0) and CELERY_RESULT_BACKEND (Redis db 1). Separate Redis databases keep broker messages and result metadata isolated.

app/worker/tasks.py — `process_ai_job` task:

1. Idempotency guard — Fetch job from PostgreSQL; if status is already `completed` or `failed`, return immediately (no-op). Safe to re-deliver.

2. Status update — SET status = processing, updated_at = now.

3. AI processing — Runs actual AI work (OpenAI call or simulated sleep if OPENAI_API_KEY not set). Accepts input_file_path as context.

4. Success path — SET status = completed, result = {output: '...'}, updated_at = now.

5. Failure path — On exception, SET status = failed, error_message = str(exc), updated_at = now. Re-raise for Celery retry.

6. Retry policy — `autoretry_for=(Exception,)`, `max_retries=3`, `retry_backoff=True`, `retry_backoff_max=60`, `retry_jitter=True`. Exponential backoff capped at 60 seconds, with jitter.
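Stripped of the Celery decorator and database session, steps 1-5 can be sketched against an in-memory job store (a stand-in for the PostgreSQL table; `run_ai_work` stands in for the OpenAI call or simulated sleep):

```python
# Framework-free sketch of the process_ai_job body.
# JOBS stands in for the jobs table; run_ai_work for the AI call.
from datetime import datetime, timezone

JOBS = {}  # job_id -> {"status": ..., "result": ..., "error_message": ...}

def run_ai_work(input_file_path):
    return {"output": f"processed {input_file_path}"}

def process_ai_job(job_id):
    job = JOBS[job_id]
    # 1. Idempotency guard: re-delivered terminal jobs are a no-op.
    if job["status"] in ("completed", "failed"):
        return job["status"]
    # 2. Mark as processing.
    job["status"] = "processing"
    job["updated_at"] = datetime.now(timezone.utc)
    try:
        # 3. Do the AI work.
        result = run_ai_work(job["input_file_path"])
    except Exception as exc:
        # 5. Failure path: record the error, re-raise so Celery can retry.
        job["status"] = "failed"
        job["error_message"] = str(exc)
        job["updated_at"] = datetime.now(timezone.utc)
        raise
    # 4. Success path.
    job["status"] = "completed"
    job["result"] = result
    job["updated_at"] = datetime.now(timezone.utc)
    return job["status"]
```

Because the guard runs first, delivering the same job ID twice changes nothing after the first terminal transition.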

Frontend (React 18 + Vite 7 + Axios)

src/api.js — Axios instance with baseURL pointing to the `/api` proxy. Functions: `submitJob(inputFilePath)`, `getJob(id)`, `listJobs()`, `getStats()`.

src/App.jsx — Layout with stats bar (total/pending/processing/completed/failed counts, polls every 5s via getStats()). Renders JobForm + JobList.

src/components/JobForm.jsx — Controlled form with an input_file_path field. On submit it calls submitJob(), adds the returned job to local state, and clears the field.

src/components/JobList.jsx — Renders per-job cards. Each card with an active status (pending/processing) starts a 3-second polling interval via `getJob(id)` and stops once the job reaches a terminal state.

vite.config.js — host 0.0.0.0 (Docker binding) + `/api` proxy to `http://api:8000` — eliminates CORS in development and Docker environments.

Horizontal Scaling

Workers are completely stateless — all shared state lives in the Redis broker and PostgreSQL. Run N replicas: `make scale-workers N=5`.

Docker Compose `scale` command starts N worker containers, all consuming from the same Celery queue.

Throughput scales near-linearly — 5 workers process roughly 5× as many jobs per minute as 1.

API and worker images are built from the same multi-stage Dockerfile, different CMD (uvicorn vs celery worker).

Monitoring (Flower)

Flower connects to Redis broker, provides real-time Celery UI at :5555.

Shows active tasks, queue depth, worker count, task success/failure rates, execution time histograms.

No code changes needed — Flower reads directly from Celery's Redis metadata.

Load Testing (Locust)

load_test/locustfile.py — Simulates users submitting jobs and polling for results.

Locust UI at :8089; configure 100 users, 10/s spawn rate.

Validates sub-100ms submission latency and end-to-end job throughput under concurrent load.

Docker Compose Stack

`db` — PostgreSQL 16 (internal only, port 5432 not published to host)

`redis` — Redis 7 (internal only, port 6379 not published)

`api` — FastAPI (port 8000 published)

`worker` — Celery worker (no port, internal only)

`flower` — Flower UI (port 5555 published)

`frontend` — Vite dev server (port 5173 published)

`locust` (optional) — Locust load tester (port 8089)

All services on a shared internal Docker network.

DB and Redis are unreachable from the host — only accessible via the API.

Makefile Automation

make up / down / restart / rebuild; make logs / logs-api / logs-worker / logs-frontend; make status / health / stats; make scale-workers N=3; make load-test.

Design Decisions

API returns 202 Accepted immediately after enqueuing — it never blocks on AI work, keeping p99 API latency under 100ms regardless of job duration. Clients poll GET /jobs/{id} for results.

Celery + Redis over FastAPI BackgroundTasks — BackgroundTasks run in the same process as the API (they can't scale independently and die if the API crashes); Celery workers are separate processes that scale and fail independently.

Idempotency guard in tasks.py (check status before processing) — makes task re-delivery safe. Celery's at-least-once delivery means a task can be delivered more than once.

Exponential backoff with jitter (retry_backoff=True, retry_jitter=True) — avoids a thundering herd when multiple workers retry simultaneously after a shared upstream failure.
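An approximation of how those settings shape retry delays — this sketches exponential backoff with full jitter, not Celery's exact internal formula:

```python
# Approximate shape of the retry delay under retry_backoff=True,
# retry_backoff_max=60, retry_jitter=True (not Celery's exact implementation).
import random

def retry_delay(retries: int, backoff_max: int = 60) -> float:
    base = min(2 ** retries, backoff_max)  # 1s, 2s, 4s, ... capped at 60s
    return random.uniform(0, base)         # jitter de-synchronizes workers
```

Without the jitter term, every worker that failed at the same moment would retry at the same moment too.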

Async SQLAlchemy 2 + asyncpg throughout the API layer — non-blocking DB operations match FastAPI's async event loop and avoid thread-pool exhaustion from blocking database drivers.

Separate Redis databases for broker (db 0) and result backend (db 1) — prevents Celery metadata from interfering with application cache keys. Easy to flush one without affecting the other.

UUID primary key for jobs — globally unique, safe to generate client-side if needed, and no sequential-ID enumeration attack surface.

Stateless workers with all state in PostgreSQL — workers can be killed and restarted without data loss; job state is always recoverable from the DB. Enables the make scale-workers pattern.

Flower for Celery monitoring over a custom solution — zero additional code; it connects directly to the Redis broker and provides production-grade queue visibility.

Locust load testing included in the repo — load_test/locustfile.py ships with the project so performance characteristics are testable by anyone who clones it.

Tradeoffs & Constraints

3-second polling interval for status updates — simple and reliable but not real-time. WebSocket or SSE would reduce latency to near-zero but adds connection-management complexity.
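The polling pattern the dashboard uses per job can be sketched as a small client-side helper; `fetch_status` is an injectable stand-in for whatever wraps GET /jobs/{id}, and the timeout default is an assumption:

```python
# Poll a job until it reaches a terminal state or the timeout expires.
# fetch_status(job_id) -> status string; injectable for testing.
import time

def wait_for_job(job_id, fetch_status, interval=3.0, timeout=300.0,
                 sleep=time.sleep):
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = fetch_status(job_id)
        if status in ("completed", "failed"):
            return status  # terminal: stop polling
        sleep(interval)
    raise TimeoutError(f"job {job_id} still running after {timeout}s")
```

Injecting `sleep` and `fetch_status` keeps the loop testable without a live API, which is the same reason the dashboard stops each card's interval once the status is terminal.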

Celery at-least-once delivery — tasks may execute more than once on broker failure. The idempotency guard handles this for completed/failed jobs but adds a status read at the start of every task.

No job cancellation — once a task is enqueued, there is no cancel endpoint. Would require Celery task revocation (celery.control.revoke) plus a corresponding status update in the DB.

Flower and /admin/stats are unauthenticated — acceptable for local/private environments, but they must be placed behind auth middleware or an IP allowlist before any public deployment.

PostgreSQL for job state over Redis — adds a persistent store dependency but provides durable job history, queryable by status/date, and ACID status updates.

Simulated AI mode (no OPENAI_API_KEY) — workers sleep to simulate processing. Useful for local development and CI but doesn't test actual LLM latency or error behavior.

Would improve: add WebSocket/SSE for real-time status push, a job cancellation endpoint, and per-job priority queues (separate Celery queues for high/low priority).

Outcome & Impact

Sub-100ms API response times for job submission (POST /jobs returns 202 Accepted immediately after the DB insert + Celery enqueue, before any AI work begins).

Live status transitions (pending → processing → completed/failed) visible in the React dashboard via 3-second polling — no page refresh, with per-job polling that stops at terminal states.

Horizontal worker scaling validated: make scale-workers N=5 starts 5 independent Celery workers sharing the Redis broker and PostgreSQL state, processing jobs from the same queue in parallel.

Idempotent task processing — re-delivering a completed or failed job ID is a no-op, safe under Celery's at-least-once delivery semantics.

Exponential backoff with jitter retries (up to 3 attempts, 2-60s delay with jitter) handles transient AI API failures without thundering herd.

Flower real-time Celery dashboard at :5555 — active tasks, queue depth, worker count, task success/failure rates, execution time histograms, zero additional code.

Locust load tests (100 users, 10/s spawn) validate throughput and p99 latency under concurrent submissions. load_test/locustfile.py ships with the repo.

One-command stack startup (make up) and full Makefile automation for logs, health checks, scaling, load testing, and cleanup.

Tech Stack

API: Python 3.11, FastAPI, Uvicorn (ASGI server)

Task Queue: Celery 5 (distributed task processing, exponential backoff retries, idempotency)

Message Broker: Redis 7 (Celery broker on db 0, result backend on db 1)

Database: PostgreSQL 16, SQLAlchemy 2 async ORM, asyncpg driver

Frontend: React 18, Vite 7 (dev server + /api proxy), Axios (HTTP client)

Monitoring: Flower (real-time Celery queue + worker UI)

Load Testing: Locust (concurrent job submission + polling simulation)

Orchestration: Docker Compose (6-service stack: api, worker, db, redis, flower, frontend)

Automation: Makefile (up/down/scale/logs/health/load-test commands)
