Calafai Docs

Engine Architecture

The Groundtruth engine is a Python 3.12 application that orchestrates AI consulting crews using CrewAI. It runs as a persistent FastAPI service on Railway, called by the Next.js web app via HTTP.

Overview

  • 30 core modules + 3 storage modules + 6 route modules + 9 tool modules, totaling more than 10,000 lines of code
  • Key design principle: everything routes through StorageAdapter (no direct filesystem access) and LLMRouter (no module-level LLM calls)
  • Location: packages/engine/

Module Organization

| Category | Modules | Purpose |
| --- | --- | --- |
| Core Execution | api, runner, dag, router, model_selector, costs, queue_poller | Run lifecycle, task orchestration, LLM dispatch, queue processing |
| Quality & Review | observer, red_team, citation_verifier, routing_judge, library | Scoring, adversarial review, source verification, tier classification |
| Context & Knowledge | context, insights, state | Context injection, cross-task knowledge, shared state |
| Session Workflows | brief_researcher, task_planner, podcast_generator, agent_researcher, session_manager | Interactive pre/post-execution workflows |
| Utilities | encryption, attachment_parser, rss_ingester, metadata_extractor, cost_tracking, errors, models, model_resolver, temporal, simulation | Shared infrastructure, model validation, temporal context, dry-run |
| Agent Tools | tools/web_search, tools/academic_search, tools/excel_builder, tools/ideogram, tools/gpt_image, tools/github_repo, tools/vercel_deploy, tools/playwright_eval, tools/vision_evaluate | CrewAI tool implementations |
| API Routes | routes/runs, routes/sessions, routes/analysis, routes/attachments, routes/admin, routes/queue | FastAPI endpoint modules |
| Storage Layer | storage/adapter, storage/database, storage/filesystem | Abstract storage interface + implementations |

Core Execution

api.py — FastAPI Server

The HTTP interface for the engine. Exposes all endpoints that the Next.js web app calls. In production, the route definitions are decomposed into the API Routes modules — api.py mounts these routers and handles middleware, CORS, and Sentry integration.

Endpoints:

| Method | Path | Purpose | Route Module |
| --- | --- | --- | --- |
| GET | /health | Service health check | admin |
| POST | /runs/start | Start an engagement run | runs |
| GET | /runs/{slug}/status | Get current run status | runs |
| POST | /runs/{slug}/stop | Stop a running engagement | runs |
| POST | /runs/{slug}/pause | Pause a running engagement | runs |
| POST | /runs/{slug}/resume | Resume a paused engagement | runs |
| GET | /runs/{slug}/logs | Get run log output | runs |
| GET | /runs/{slug}/stream | SSE stream of real-time updates | runs |
| GET | /runs/{slug}/deliverables | List deliverables produced by a run | runs |
| POST | /pattern-analysis | Trigger cross-engagement pattern analysis | analysis |
| GET | /model-performance | Model performance dashboard | analysis |
| GET | /cost-breakdown | Per-activity cost breakdown | analysis |
| POST | /sessions/brief-research/* | Brief research workflow | sessions |
| POST | /sessions/task-planner/* | Task planning workflow | sessions |
| POST | /sessions/podcast/* | Podcast generation | sessions |
| POST | /sessions/agent-research/* | Agent research workflow | sessions |
| POST | /attachments/parse-and-summarize | Attachment parsing | attachments |
| POST | /admin/rss-ingest | RSS feed ingestion | admin |
| POST | /admin/import-insights | Insights import | admin |

All endpoints require an X-Tenant-ID header for tenant scoping. The middleware tags Sentry events with the tenant ID for debugging.

Run Registry: Active runs are tracked in an in-memory dictionary keyed by {tenant_id}:{engagement_slug}. Because the registry is process-local, a single Railway instance manages all concurrent runs. The engine keeps no run state on the local filesystem, so nothing on disk needs to survive a deploy.

runner.py — Engagement Runner

Manages the full lifecycle of an engagement run:

  1. Load engagement configuration from the database (via StorageAdapter)
  2. Build the CrewAI crew with the configured agents and tasks
  3. Execute the crew (delegating to dag.py for parallel execution)
  4. Persist results (costs, token usage, deliverables) back to the database
  5. Run post-execution quality scoring (via observer.py)

The runner maintains run state (status, timing, costs) in memory during execution and persists snapshots to the database for recovery and reporting. It also handles BYOK key injection, alert-based task insertion, and agent activity event emission for the Mission Control UI.

dag.py — DAG Engine

Parallel task execution based on a dependency graph. Tasks without dependencies run concurrently; tasks with dependencies wait for their upstream tasks to complete.

Key capabilities:

  • Parallel execution: Independent tasks run simultaneously, reducing total run time
  • Re-runs: Individual tasks can be re-run without re-executing the entire engagement
  • Dependency resolution: Automatically determines execution order from the task graph
  • Red-team integration: Final deliverables pass through adversarial review before completion
  • DAG state events: Pushes dag.updated SSE events on every state transition for real-time UI updates

The DAG engine reads task output text directly (not file paths) and passes it to the observer for quality scoring.
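The scheduling idea described above can be sketched with the standard library. This is a minimal illustration, not the engine's implementation: run_dag, demo_task, and the sample graph are hypothetical names, and re-runs, red-team review, and SSE events are left out.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from graphlib import TopologicalSorter


def run_dag(dependencies, run_task, max_workers=4):
    """Run tasks in dependency order, executing independent tasks concurrently.

    dependencies maps each task name to the set of tasks it depends on.
    run_task is called with (name, upstream_outputs) and returns output text.
    """
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    outputs = {}
    in_flight = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while sorter.is_active():
            # Submit every task whose upstream tasks have all finished.
            for name in sorter.get_ready():
                upstream = {dep: outputs[dep] for dep in dependencies.get(name, ())}
                in_flight[pool.submit(run_task, name, upstream)] = name
            done, _ = wait(in_flight, return_when=FIRST_COMPLETED)
            for future in done:
                name = in_flight.pop(future)
                outputs[name] = future.result()  # output text, not a file path
                sorter.done(name)  # unblocks downstream tasks
    return outputs


def demo_task(name, upstream):
    # Hypothetical task body: report which upstream outputs were received.
    return f"{name}({', '.join(sorted(upstream))})"


graph = {"research": set(), "market": set(), "strategy": {"research", "market"}}
results = run_dag(graph, demo_task)
```

Here research and market have no dependencies and can run at the same time, while strategy only starts once both have produced output.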

router.py — LLM Router

Class-based multi-provider LLM dispatch. Routes each request to the appropriate model based on the agent's assigned tier.

Model Tiers:

| Tier | Model | Provider | Use Case |
| --- | --- | --- | --- |
| strategy | grok-4-0709 | xAI | C-suite strategic analysis |
| writing | claude-sonnet-4-6 | Anthropic | Communications, storytelling |
| fullstack | claude-sonnet-4-6 | Anthropic | Product, engineering, research |
| analytical | gpt-4.1-nano | OpenAI | Data analysis, scoring, reviews |
| code | qwen3-coder | Ollama (local) | Code generation |
| simple | qwen3:8b | Ollama (local) | Basic support tasks |
| multimodal | gemini-2.5-flash-lite | Google | Image/document analysis |
| manager | claude-sonnet-4-6 | Anthropic | CrewAI manager agent |
| deepseek | deepseek-chat | DeepSeek | DeepSeek-specific tasks |
| mistral | mistral-small-latest | Mistral | Mistral-specific tasks |

The router is instantiated as a class (not module-level functions), making it independently testable. Each test can create its own router instance with mock providers. Supports BYOK key overrides via api_key_overrides parameter. Includes retry-once with 5s backoff on transient API errors in call_llm().

Production note: code and simple tiers fall back to gpt-4.1-nano on Railway (no Ollama available).
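A retry-once policy like the one described for call_llm() might look roughly like the following. The helper name, the TransientLLMError type, and the injectable sleep parameter are assumptions made for illustration; the real call_llm() signature is not shown in this document.

```python
import time


class TransientLLMError(Exception):
    """Hypothetical marker for retryable provider failures."""


def call_with_retry_once(call, backoff_secs=5.0, sleep=time.sleep):
    """Invoke call(); on a transient error, wait and try exactly one more time."""
    try:
        return call()
    except TransientLLMError:
        sleep(backoff_secs)
        return call()  # a second failure propagates to the caller


# Usage: the first attempt fails, the second succeeds.
attempts = []


def flaky():
    attempts.append(1)
    if len(attempts) == 1:
        raise TransientLLMError("temporary provider error")
    return "ok"


waits = []
result = call_with_retry_once(flaky, sleep=waits.append)
```

Passing sleep as a parameter keeps the backoff testable without actually waiting five seconds.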

model_selector.py — Adaptive Model Selection

Implements a 4-layer decision process for choosing the optimal model for each task:

  1. Static defaults: Tier-based model assignment from the router configuration
  2. Rubric mapping: Task-specific rubrics influence model choice (e.g., creative tasks prefer writing-tier models)
  3. Historical performance: Past TaskScore data informs which models perform best for similar tasks
  4. Epsilon-greedy exploration: Occasionally selects a non-optimal model to discover better options (configurable exploration rate)

Decisions and outcomes are persisted to the ModelRoutingDecision table, creating a feedback loop that improves model selection over time.
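Layer 4 can be illustrated with a small epsilon-greedy sketch. The function name, the shape of the score history, and the sample scores are hypothetical; the real selector also applies rubric mapping and persists its decisions.

```python
import random


def choose_model(avg_scores, default_model, epsilon=0.1, rng=random):
    """Pick a model: explore with probability epsilon, otherwise exploit.

    avg_scores maps model name to its historical mean score for similar tasks.
    """
    if not avg_scores:
        return default_model  # no history yet: use the static tier default
    best = max(avg_scores, key=avg_scores.get)
    others = [model for model in avg_scores if model != best]
    if others and rng.random() < epsilon:
        return rng.choice(others)  # exploration: try a non-optimal model
    return best  # exploitation: best historical performer


# Illustrative scores only, not measured data.
history = {"claude-sonnet-4-6": 8.1, "gpt-4.1-nano": 6.4, "grok-4-0709": 7.7}
greedy_pick = choose_model(history, "gpt-4.1-nano", epsilon=0.0)
```

With epsilon set to 0 the choice is purely greedy; raising it trades some short-term quality for information about the other models.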

costs.py — Cost Tracker

Pure in-memory cost tracking per run. Monitors token usage and estimated costs in real time during execution.

Key features:

  • Budget circuit breaker: Halts execution immediately when the estimated cost exceeds the engagement's budget limit. Prevents runaway spend.
  • Per-model pricing: Tracks costs based on each model's per-token pricing (input and output separately)
  • No persistence: The cost tracker itself holds no state between runs. The runner persists final cost data to the database via StorageAdapter.
  • Transient error classification: Classifies xAI finish_reason errors as transient (retryable) via _TRANSIENT_ERROR_PATTERNS.
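The budget circuit breaker can be sketched as an in-memory accumulator that raises once the running total passes the limit. BudgetTracker, its method names, and the pricing figures are placeholders for illustration, not the actual CostTracker API or real model prices.

```python
class BudgetExceeded(Exception):
    """Raised when the estimated run cost passes the budget limit."""


class BudgetTracker:
    def __init__(self, budget_usd, pricing):
        self.budget_usd = budget_usd
        # pricing: model -> (input USD per 1M tokens, output USD per 1M tokens)
        self.pricing = pricing
        self.total_usd = 0.0

    def record(self, model, input_tokens, output_tokens):
        """Add one call's estimated cost and trip the breaker if over budget."""
        in_price, out_price = self.pricing[model]
        cost = (input_tokens * in_price + output_tokens * out_price) / 1_000_000
        self.total_usd += cost
        if self.total_usd > self.budget_usd:
            raise BudgetExceeded(
                f"estimated ${self.total_usd:.4f} exceeds budget ${self.budget_usd:.4f}"
            )
        return self.total_usd


# Placeholder pricing: $1 per 1M input tokens, $2 per 1M output tokens.
tracker = BudgetTracker(budget_usd=0.005, pricing={"example-model": (1.0, 2.0)})
first_total = tracker.record("example-model", 1000, 1000)
```

A second identical call would raise BudgetExceeded, because the running total would reach about $0.006 against a $0.005 budget.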

queue_poller.py — Run Queue Poller

Background daemon thread that claims queued runs and starts them. Started automatically on application boot in api.py. Polls the RunQueue table every 5 seconds (configurable via QUEUE_POLL_INTERVAL_SECS) for entries ready to process.

Key capabilities:

  • Priority-based scheduling: Claims the highest-priority queued entry first, using SQL ORDER BY priority DESC
  • Anti-starvation: Lower-priority entries gain +1 effective priority per 10 minutes in queue, preventing Enterprise runs from permanently starving Starter runs
  • Tenant concurrency limits: Respects per-plan limits (Starter: 1, Professional: 3, Enterprise: 5 concurrent runs)
  • Platform capacity gating: Checks MAX_CONCURRENT_RUNS before claiming any entry
  • Stuck run detection: Entries in processing state with heartbeats older than 5 minutes are reset to queued with a retry counter. After 3 retries, marked as failed
  • Connection pooling: Borrows connections from the shared psycopg2 pool rather than opening new connections each cycle

Key functions: start_queue_poller() (daemon thread launcher), _poll_once() (single poll cycle), _handle_stuck_runs() (stale heartbeat detection).
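The anti-starvation rule (+1 effective priority per 10 minutes in queue) can be expressed as a small pure function. The entry fields and base priority values below are assumptions; the poller itself does this selection in SQL.

```python
from datetime import datetime, timedelta, timezone


def effective_priority(base_priority, queued_at, now):
    """Base priority plus one point for every full 10 minutes spent waiting."""
    waited_minutes = (now - queued_at).total_seconds() / 60
    return base_priority + int(waited_minutes // 10)


def pick_next(entries, now):
    """Return the entry with the highest effective priority (oldest wins ties)."""
    return max(
        entries,
        key=lambda e: (
            effective_priority(e["priority"], e["queued_at"], now),
            -e["queued_at"].timestamp(),
        ),
    )


now = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
queue = [
    # Hypothetical base priorities: Enterprise = 3, Starter = 1.
    {"tenant": "enterprise", "priority": 3, "queued_at": now - timedelta(minutes=2)},
    {"tenant": "starter", "priority": 1, "queued_at": now - timedelta(minutes=45)},
]
chosen = pick_next(queue, now)
```

After 45 minutes the Starter entry has gained four points, so it is claimed ahead of the freshly queued Enterprise entry.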


Quality & Review

observer.py — Task Observer

Post-task quality scoring using GPT-4o-mini. After each task completes, the observer evaluates the output on rubric-specific criteria.

Returns:

  • Score (1-10): Overall quality rating
  • Insights: What the task output reveals or does well
  • Issues: Identified problems or weaknesses
  • Rerun recommendation: Whether the task should be re-executed for better results

The observer takes output_text directly as a parameter (not a file path), so it works identically with both storage adapter implementations. On API failure it defaults to score=5 with a rerun recommendation, rather than returning a phantom passing score of 7/10.

red_team.py — Red Team Reviewer

Adversarial review of final deliverables. A separate LLM call reviews each deliverable from a critical perspective, looking for:

  • Logical inconsistencies
  • Unsupported claims
  • Missing perspectives
  • Potential risks in the recommendations

Red team feedback is attached to the deliverable for the client to review alongside the primary output.

citation_verifier.py — Citation Verification Pipeline

Programmatic citation verification (Tier 3 of the source verification system). Runs automated extraction, existence checking, and claim-source alignment independently of agent behavior.

4-stage pipeline:

  1. Extraction: Regex-based fast extraction + optional LLM-based thorough extraction of citations from deliverable text
  2. Verification: Cascading academic search (Semantic Scholar → OpenAlex → CrossRef → DuckDuckGo) to confirm source existence
  3. Alignment: LLM-based claim-source alignment checking — does the cited source actually support the claim being made?
  4. Reporting: Structured markdown report with statistics, critical findings, and tier classification

Key classes: CitationExtractor, CitationVerifier, ClaimAlignmentChecker, BibliographyVerifier (orchestrator).

Critical findings are flagged when T4 sources (unverifiable) drive major recommendations.

routing_judge.py — LLM-as-Judge Tier Classifier

Uses gpt-4.1-nano as a routing judge to classify which LLM tier best fits a task. Replaces fragile keyword matching with an LLM call that costs ~$0.00003/classification (~200 input tokens, ~30 output tokens).

Two modes:

| Mode | Function | When Used |
| --- | --- | --- |
| Tier classification | classify_task_tier() | Bootstrap mode (<50 scored routing decisions), then demotes to re-run-only |
| Incumbent challenge | should_challenge_incumbent() | Historical mode ~8% of the time when fingerprint data is available |

The tier classifier receives the task description, agent role, and available tiers (with model names and pricing), then returns a (tier, reasoning) tuple. The challenge mode examines an incumbent model's known strengths and weaknesses against task requirements to decide if a challenger model might perform better.

JSON response parsing includes fallback regex extraction. Invalid tiers are rejected — the judge can only select from the agent's configured options. Cost is tracked via CostTracker and ActivityCost records.
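The parsing and validation step might be sketched as follows. The JSON field names ("tier", "reasoning") and the function name are assumptions; only the behavior (regex fallback, rejection of unconfigured tiers) comes from the description above.

```python
import json
import re


def parse_tier_response(raw, allowed_tiers):
    """Return (tier, reasoning), or None if no valid tier can be recovered."""
    data = None
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        # Fallback: pull a {...} object out of surrounding prose.
        match = re.search(r"\{.*\}", raw, re.DOTALL)
        if match:
            try:
                data = json.loads(match.group(0))
            except json.JSONDecodeError:
                data = None
    if not isinstance(data, dict):
        return None
    tier = data.get("tier")
    if not isinstance(tier, str) or tier not in allowed_tiers:
        return None  # reject tiers outside the agent's configured options
    return tier, str(data.get("reasoning", ""))


wrapped = 'Sure! {"tier": "writing", "reasoning": "narrative task"} Hope that helps.'
parsed = parse_tier_response(wrapped, {"writing", "analytical"})
```

Returning None lets the caller fall back to the agent's default tier instead of trusting an unparseable or out-of-range answer.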

library.py — Library Curator

Post-run analysis and pattern extraction. After an engagement completes, the library curator:

  1. Generates a run report summarizing execution (timing, costs, quality scores)
  2. Produces an operations review with improvement recommendations
  3. Creates a library entry — a structured record of what was learned
  4. Updates the organization-wide insights index

The PatternAnalyzer class performs cross-engagement analysis (within a tenant's data) to identify recurring patterns, common issues, and improvement opportunities.


Context & Knowledge

context.py — Context Windowing

Builds the context injection pipeline for each task. Assembles context from multiple sources:

  1. Engagement brief: The client's initial requirements and background
  2. Formatting directive: Standard markdown structure rules (heading hierarchy, data presentation, readability) injected into every task to ensure consistent deliverable formatting
  3. Design adherence: Slider-controlled guidance on how closely to follow reference materials
  4. Attachment context index: Auto-generated summaries of uploaded documents
  5. Cross-engagement insights: Anonymized patterns from previous runs (platform-level, not cross-tenant)
  6. Prior task outputs: Results from upstream tasks in the DAG

The formatting directive (FORMATTING_DIRECTIVE constant) adds 350 tokens per task ($0.001 cost) and instructs agents to use H1 for document titles only, H2 for major sections, tables for comparative data, and to maintain a clear executive-to-detail hierarchy.

The context module takes manifest as a list[dict] parameter (not a filesystem path), keeping it independent of the storage backend.

insights.py — Insight Store

Thread-safe shared knowledge bus per engagement. During execution, agents can deposit insights that other agents pick up in subsequent tasks.

This enables emergent collaboration: an early research task might surface a finding that a later strategy task incorporates, even though the two agents were not explicitly configured to share that information.

state.py — Shared Engine State

Process-local state registry and factory helpers. Holds the active runs dictionary, shared LLM router singleton, and storage adapter factory. Used by route modules to access shared state without circular imports.

Key exports:

  • active_runs: Dict[str, EngagementRun] keyed by {tenant_id}:{engagement_slug}, protected by runs_lock
  • get_storage(tenant_id): Factory that creates a DatabaseStorageAdapter scoped to the given tenant
  • get_router(): Lazy-loaded singleton LLMRouter instance

Session Workflows

These modules manage interactive, multi-turn workflows that run before or after crew execution. Each creates a "session" via the shared SessionRegistry that maintains state across multiple API calls.

session_manager.py — Session Registry

Generic thread-safe session registry with TTL-based cleanup. Eliminates boilerplate across all session modules.

  • SessionRegistry[T]: Generic class parameterized by session type
  • Tenant-scoped keying: {tenant_id}:{identifier}
  • Thread-safe get(), put(), remove() operations
  • cleanup_stale() removes sessions older than a configurable TTL

Used by: brief_researcher, task_planner, podcast_generator, agent_researcher.
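A registry with these properties could be sketched as below. The method names match those listed above, but the signatures, the default TTL, and the injectable clock are assumptions for illustration.

```python
import threading
import time
from typing import Callable, Dict, Generic, Optional, Tuple, TypeVar

T = TypeVar("T")


class SessionRegistry(Generic[T]):
    """Thread-safe, tenant-scoped session store with TTL cleanup (sketch)."""

    def __init__(self, ttl_secs: float = 3600.0,
                 clock: Callable[[], float] = time.monotonic):
        self._ttl_secs = ttl_secs
        self._clock = clock
        self._lock = threading.Lock()
        self._sessions: Dict[str, Tuple[float, T]] = {}

    @staticmethod
    def _key(tenant_id: str, identifier: str) -> str:
        return f"{tenant_id}:{identifier}"

    def put(self, tenant_id: str, identifier: str, session: T) -> None:
        with self._lock:
            self._sessions[self._key(tenant_id, identifier)] = (self._clock(), session)

    def get(self, tenant_id: str, identifier: str) -> Optional[T]:
        with self._lock:
            entry = self._sessions.get(self._key(tenant_id, identifier))
        return entry[1] if entry else None

    def remove(self, tenant_id: str, identifier: str) -> None:
        with self._lock:
            self._sessions.pop(self._key(tenant_id, identifier), None)

    def cleanup_stale(self) -> int:
        """Drop sessions older than the TTL and return how many were removed."""
        cutoff = self._clock() - self._ttl_secs
        with self._lock:
            stale = [k for k, (created, _) in self._sessions.items() if created < cutoff]
            for key in stale:
                del self._sessions[key]
        return len(stale)
```

Because the tenant ID is part of the key, a lookup with the right identifier but the wrong tenant returns None.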

brief_researcher.py — Brief Research

LLM-powered pre-kickoff brief refinement through multi-turn conversation. Analyzes engagement briefs, asks clarifying questions via a chat interface, and reformats the refined brief into a comprehensive, token-efficient prompt before crew execution.

Workflow: Start session → Multi-turn chat → Finalize → Save refined brief as deliverable

Key class: BriefResearchSession — manages conversation state, messages, and transcript saving. Uses "analytical" tier LLM calls and tracks cost per interaction.

task_planner.py — Task Pipeline Planner

LLM-powered engagement task pipeline generation. Analyzes the engagement brief and available agent roster, then generates a tailored multi-phase task pipeline for user approval before crew execution.

Workflow: Start session → Generate tasks (with optional feedback) → Approve → Save to engagement

Key class: TaskPlannerSession — generates cost-aware task arrays (tasks cost ~$0.40-0.80 each), scales phases by budget (1-4 phases), supports regeneration with user feedback, and validates JSON output. Supports template-based planning for engagement templates.

podcast_generator.py — Podcast Generator

SCIPA brief synthesis + optional two-speaker podcast audio. Synthesizes all engagement deliverables into a SCIPA-structured document (Situation, Complication, Implication, Proposal, Ask, Benefit) suitable for Google NotebookLM or OpenAI TTS.

Workflow: Start generation → Background thread produces SCIPA brief → Generate dialogue script → Render audio via OpenAI TTS → Save as binary deliverable

Key class: PodcastSession — manages async generation with phase tracking (generating_brief → generating_audio → complete). Uses "writing" tier for SCIPA and dialogue generation. Supports BYOK OpenAI key for TTS.

agent_researcher.py — Agent Research & Creation

4-phase agent creation pipeline: discovery through conversation, job board research, profile synthesis, and agent creation with overlap detection.

Workflow: Discovery (chat) → Research (DuckDuckGo job boards) → Synthesis (profile generation) → Create (with overlap check against existing roster)

Key class: AgentResearchSession — manages the full 4-phase workflow. Uses DuckDuckGo-based web search for job postings and LLM-driven JSON profile extraction. Checks for overlap with existing agents before creating a new DynamicAgent.


Utilities

encryption.py — BYOK Key Encryption

Fernet (AES-128-CBC + HMAC) encryption for tenant BYOK API keys. Provides encrypt_api_key() and decrypt_api_key() helpers.

Requires the LLM_KEY_ENCRYPTION_KEY environment variable. Validates key format at startup via validate_encryption_key(). Security-critical — handles plaintext API keys for 6 providers (OpenAI, Anthropic, xAI, Google, DeepSeek, Mistral).

attachment_parser.py — Multi-Format File Parser

Parses uploaded files and generates LLM summaries. Supports PDF (pdfplumber), XLSX (openpyxl), DOCX (python-docx), PPTX (python-pptx), CSV, images (PIL), JSON, and plain text.

Main entry point: parse_and_summarize() — returns {parsed, summary, meta, error}. Full text extraction is capped at 100KB. Uses "analytical" tier LLM for summarization (~$0.00025/doc). Extracts format-specific metadata (page count, sheet names, column count, image dimensions, EXIF data).

rss_ingester.py — RSS Feed Ingestion

RSS 2.0 and Atom feed parser. Fetches feeds, extracts articles, and stores as library entries via StorageAdapter.

Key safety feature: SSRF validation — blocks private IP ranges before fetching. Falls back to Wayback Machine for dead links. Caps at 20 entries per feed. Uses stdlib urllib.request (no external HTTP library).
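An SSRF check of this kind can be sketched with the standard library. The function name and the injectable resolver are assumptions; this version is stricter than "private ranges only" because it rejects every address that is not globally routable (loopback, link-local, private, reserved).

```python
import ipaddress
import socket
from urllib.parse import urlparse


def is_safe_feed_url(url, resolve=socket.getaddrinfo):
    """Return True only if the URL is http(s) and resolves to public addresses."""
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https") or not parsed.hostname:
        return False
    port = parsed.port or (443 if parsed.scheme == "https" else 80)
    try:
        infos = resolve(parsed.hostname, port)
    except OSError:  # includes socket.gaierror for unresolvable hosts
        return False
    for info in infos:
        address = ipaddress.ip_address(info[4][0])
        if not address.is_global:
            return False  # private, loopback, link-local, or reserved
    return bool(infos)


def literal_resolver(host, port):
    # Test helper: treats the host as an IP literal, so no DNS lookup happens.
    return [(None, None, None, "", (host, port))]
```

Checking every resolved address matters because a hostname can resolve to several addresses, and one private address is enough to make the fetch unsafe.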

metadata_extractor.py — Deliverable Metadata Extractor

Deterministic post-processor for extracting structured metadata from deliverable content. Zero LLM calls — pure regex.

Extracts:

  • BLUF (Bottom Line Up Front) from <!-- BLUF: ... --> HTML comment markers (with fallback to first paragraph after H1)
  • Confidence levels from <!-- CONFIDENCE: high|medium|low|verify --> markers
  • Heading structure with line ranges and nesting
  • Sources section and next steps section detection

Main entry point: extract_metadata() — returns (cleaned_content, metadata_dict) with markers stripped from the clean content.
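The marker extraction can be illustrated with two regular expressions. This sketch covers only the BLUF and CONFIDENCE markers; the function name and the metadata keys are assumptions, and the first-paragraph fallback, heading structure, and section detection are omitted.

```python
import re

BLUF_RE = re.compile(r"<!--\s*BLUF:\s*(.*?)\s*-->", re.DOTALL)
CONFIDENCE_RE = re.compile(
    r"<!--\s*CONFIDENCE:\s*(high|medium|low|verify)\s*-->", re.IGNORECASE
)


def extract_markers(content):
    """Return (cleaned_content, metadata) with both marker types stripped."""
    bluf = BLUF_RE.search(content)
    confidence = CONFIDENCE_RE.search(content)
    metadata = {
        "bluf": bluf.group(1) if bluf else None,
        "confidence": confidence.group(1).lower() if confidence else None,
    }
    cleaned = CONFIDENCE_RE.sub("", BLUF_RE.sub("", content)).strip()
    return cleaned, metadata


sample = (
    "# Market Entry\n"
    "<!-- BLUF: Expand into EMEA first. -->\n"
    "<!-- CONFIDENCE: medium -->\n"
    "Body text."
)
cleaned, metadata = extract_markers(sample)
```

Since no LLM is involved, the same input always yields the same metadata.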

cost_tracking.py — Activity Cost Tracking

Utility for extracting token counts from LLM responses and persisting ActivityCost records asynchronously.

  • extract_llm_usage(): Extracts {model, input_tokens, output_tokens, cost} from LangChain response metadata
  • track_activity_cost(): Extract and persist cost in a fire-and-forget daemon thread

Called by every module that makes LLM calls — the centralized cost attribution point for the platform.

errors.py — Error Hierarchy

Structured exception types so API routes can map failures to appropriate HTTP status codes.

| Exception | HTTP Status | When |
| --- | --- | --- |
| EngineError | 500 | Base class for all engine errors |
| LLMError | 502 | Provider returned error or unusable output |
| StorageError | 500 | Database/storage operation failed |
| BudgetExceededError | 402 | Engagement budget limit reached |
| SessionNotFoundError | 404 | Requested session not found |
| EngagementNotFoundError | 404 | Engagement not found |
| ConfigurationError | 500 | Engine config is invalid |
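One way to encode this mapping is to attach the status code to each exception class. The status_code attribute and the http_status_for() helper are assumptions for illustration; the document does not say how the routes perform the mapping.

```python
class EngineError(Exception):
    """Base class for all engine errors."""
    status_code = 500


class LLMError(EngineError):
    status_code = 502


class StorageError(EngineError):
    status_code = 500


class BudgetExceededError(EngineError):
    status_code = 402


class SessionNotFoundError(EngineError):
    status_code = 404


class EngagementNotFoundError(EngineError):
    status_code = 404


class ConfigurationError(EngineError):
    status_code = 500


def http_status_for(exc):
    """Map any exception to an HTTP status; unknown errors become 500."""
    return exc.status_code if isinstance(exc, EngineError) else 500
```

A route handler could then catch EngineError once and respond with http_status_for(exc) instead of branching on each type.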

model_resolver.py — Self-Healing Model Resolution

Validates model names at startup by probing provider APIs and resolves invalid models at runtime via a cascading fallback strategy. Zero impact on the happy path — resolution only fires when a provider returns a model-not-found error.

Resolution cascade:

  1. Check cache: previously resolved models are returned immediately (1-hour TTL)
  2. Strip date suffix: e.g., claude-sonnet-4-6-20250217 → claude-sonnet-4-6
  3. Tier fallback chain: ordered list of (provider, model) alternatives per tier
  4. Terminal fallback: gpt-4.1-nano as the last resort

Key class: ModelResolver — instantiated with the router's model_names dict. validate_at_startup() probes OpenAI, Anthropic, and xAI model list endpoints (skips Ollama, Gemini, DeepSeek, Mistral). Returns a validation summary dict per tier showing model, provider, validity, and any auto-resolution applied.

Providers are guessed from model name prefixes (claude- → Anthropic, gpt- → OpenAI, grok- → xAI). Non-blocking — all exceptions are caught, boot is never prevented.
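The suffix stripping, prefix-based provider guess, and fallback order can be sketched as pure functions. All names here are hypothetical, the cache step is omitted, and the set of available models is passed in rather than probed from provider APIs.

```python
import re

DATE_SUFFIX_RE = re.compile(r"-\d{8}$")  # e.g. a trailing -20250217
PROVIDER_PREFIXES = (("claude-", "anthropic"), ("gpt-", "openai"), ("grok-", "xai"))


def strip_date_suffix(model):
    """Remove a trailing 8-digit date stamp, if present."""
    return DATE_SUFFIX_RE.sub("", model)


def guess_provider(model):
    """Guess the provider from the model name prefix; None if unknown."""
    for prefix, provider in PROVIDER_PREFIXES:
        if model.startswith(prefix):
            return provider
    return None


def resolve_model(model, available, tier_fallbacks, terminal="gpt-4.1-nano"):
    """Apply the cascade: exact match, stripped name, tier chain, terminal."""
    if model in available:
        return model
    stripped = strip_date_suffix(model)
    if stripped in available:
        return stripped
    for candidate in tier_fallbacks:
        if candidate in available:
            return candidate
    return terminal


resolved = resolve_model("claude-sonnet-4-6-20250217", {"claude-sonnet-4-6"}, [])
```

Note that the pattern only matches an 8-digit suffix, so a name such as grok-4-0709 is left unchanged.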

temporal.py — Temporal Context Injection

Provides current-date awareness and AI-era timeline calibration directives injected into all LLM prompts. Without this, LLMs default to their training cutoff and produce outdated references.

Two directives:

| Function | Purpose | Token Cost |
| --- | --- | --- |
| temporal_context() | Injects today's date (UTC) and instructs the LLM to prefer recent data and not cite pre-previous-year sources as "current" | ~50 tokens |
| timeline_calibration() | Instructs agents to present dual timelines (traditional pre-AI vs AI-enabled) for any roadmaps or effort estimates | ~200 tokens |

The timeline calibration directive ensures agents present both a traditional human-team estimate and an AI-accelerated estimate (typically 5-20% of traditional), helping clients see the acceleration AI provides. Injected via the context windowing pipeline in context.py.

simulation.py — Dry-Run Simulation

Zero-cost pipeline verification mode. Generates template content and cost estimates so dry-run executions exercise the full persistence pipeline (deliverables, metadata extraction, observer scoring) without making any LLM API calls.

Two functions:

| Function | Purpose |
| --- | --- |
| generate_simulation_content() | Produces 300-500 word markdown tagged with [SIMULATION] banner. Includes structural markers (BLUF, CONFIDENCE, Sources, Next Steps) that exercise the metadata_extractor |
| estimate_run_cost() | Projects per-task costs using MODEL_PRICING and AGENT_LLM_OPTIONS, plus overhead for observer, red-team, bibliography, and ops review operations |

The cost estimator uses fixed token assumptions per task (2,000 input / 1,500 output) and per overhead operation. Returns a detailed breakdown with per-task costs, overhead subtotals, total estimated cost, required agents, and required providers. Filters out disabled tasks automatically.
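The per-task arithmetic can be sketched as follows. Only the 2,000 / 1,500 token assumptions come from the text above; the function names, the task dictionary shape, and the pricing figures are placeholders, and overhead operations are left out.

```python
INPUT_TOKENS_PER_TASK = 2_000
OUTPUT_TOKENS_PER_TASK = 1_500


def estimate_task_cost(input_price_per_m, output_price_per_m):
    """Cost of one task in USD, given prices per 1M tokens."""
    return (
        INPUT_TOKENS_PER_TASK * input_price_per_m
        + OUTPUT_TOKENS_PER_TASK * output_price_per_m
    ) / 1_000_000


def estimate_run(tasks, pricing):
    """Sum per-task estimates, skipping tasks marked as disabled."""
    enabled = [task for task in tasks if task.get("enabled", True)]
    per_task = {
        task["name"]: estimate_task_cost(*pricing[task["model"]]) for task in enabled
    }
    return {"per_task": per_task, "total": sum(per_task.values())}


# Placeholder pricing: $3 per 1M input tokens, $15 per 1M output tokens.
pricing = {"model-a": (3.0, 15.0)}
tasks = [
    {"name": "research", "model": "model-a"},
    {"name": "draft", "model": "model-a", "enabled": False},
]
estimate = estimate_run(tasks, pricing)
```

With these placeholder prices one task costs (2,000 × 3 + 1,500 × 15) / 1,000,000 = $0.0285, and the disabled task contributes nothing.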

models.py — Model Name Definitions

Single source of truth for tier-to-model mappings. Imported by router.py (LLM dispatch) and costs.py (pricing lookups).

Key constants:

  • DEFAULT_MODEL_NAMES: Dict mapping tiers to default models
  • FALLBACK_CAPABLE: Fallback for premium tiers (gpt-4.1)
  • FALLBACK_MIDTIER: Fallback for utility tiers (gpt-4.1-mini)
  • FALLBACK_ANALYTICAL: Non-OpenAI escape hatch (claude-haiku-4-5-20251001)

Agent Tools

CrewAI tool implementations that agents can invoke during execution. Each is a subclass of CrewAI's BaseTool with rate limiting per engagement.

tools/web_search.py — Web Search & Fetch

DuckDuckGo-based web search and page text extraction for source verification.

  • WebSearchTool: Rate-limited search (max 100 per engagement). Returns results with source tier labels.
  • WebFetchTool: Rate-limited page fetch (max 30 per engagement). Strips scripts, styles, and HTML tags.
  • classify_source_tier(): Domain-to-tier classification (T1-T4) using allowlist from source_tiers.json.
  • _check_url_status(): HEAD request for URL existence verification.

tools/academic_search.py — Academic Search APIs

Free academic search APIs for citation verification. Three providers, no API keys required.

  • SemanticScholarTool: 200M+ papers. Returns title, authors, year, citation count, abstract, DOI.
  • CrossRefTool: 150M+ DOIs. Uses polite pool header for higher rate limits.
  • OpenAlexTool: 250M+ works. Broadest coverage, includes open access URLs.

Rate limits: max 30 queries, max 20 lookups per engagement.

tools/excel_builder.py — Excel Model Builder

Generates .xlsx financial models from JSON specifications. Agents output a JSON spec; the tool converts it to a workbook with openpyxl.

Key feature: Assumptions tab — highlighted in yellow with 5 columns (Variable, Estimated Value, Source, Your Actual, Effective Value). Generates formulas that reference the Effective Value column, allowing clients to override assumptions and see model results update automatically.

tools/ideogram.py — Ideogram Image Generator

Generates logos, brand imagery, and design assets via the Ideogram 3.0 API. Used by design department agents (brand_designer, design_system_architect).

  • Model: Ideogram V_2A
  • Cost: ~$0.06 per image
  • Rate limit: 20 generations per engagement
  • Styles: AUTO, DESIGN, REALISTIC, RENDER_3D, ANIME
  • Aspect ratios: 1:1, 16:9, 9:16, 4:3, 3:4
  • Output: 1-8 images per generation, URLs saved as external resources via StorageAdapter
  • Env var: IDEOGRAM_API_KEY

Costs are tracked via CostTracker.record_api_cost(). Uses stdlib urllib.request for HTTP.

tools/gpt_image.py — GPT Image Generator

Generates images via OpenAI's DALL-E 3 API. Complements the Ideogram tool — better for illustrations and conceptual imagery.

  • Cost: $0.04 (standard 1024x1024) to $0.12 (HD 1792x1024)
  • Rate limit: 20 generations per engagement
  • Sizes: 1024x1024, 1792x1024, 1024x1792
  • Quality: standard or HD
  • Note: DALL-E 3 generates 1 image per API request, so num_images=4 results in 4 sequential requests
  • Env var: Uses existing OPENAI_API_KEY

tools/github_repo.py — GitHub Repository Manager

Creates GitHub repositories and pushes multi-file codebases atomically. Used by prototype_engineer and ui_engineer agents to deploy working code.

Two actions:

| Action | Purpose |
| --- | --- |
| create_repo | Creates a public repo under the configured org with auto-init (README) |
| push_files | Atomic multi-file commit via Git tree API (create blobs → assemble tree → create commit → update ref) |

  • Rate limit: 10 operations per engagement
  • Env vars: GITHUB_PAT (personal access token with repo scope), GITHUB_ORG
  • Output: Repository URL, commit SHA, saved as external resource

tools/vercel_deploy.py — Vercel Deployment Manager

Creates Vercel projects linked to GitHub repositories and triggers deployments with status polling.

Three actions:

| Action | Purpose |
| --- | --- |
| create_project | Links a GitHub repo to a new Vercel project (auto-deploy on push) |
| deploy | Triggers a production deployment, polls every 10s for up to 120s until READY or ERROR |
| get_status | Checks a deployment's current state |

  • Rate limit: 10 operations per engagement
  • Env vars: VERCEL_TOKEN, VERCEL_TEAM_ID
  • Output: Live deployment URL saved as external resource when status = READY
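The status polling in the deploy action (every 10s for up to 120s) can be sketched as a generic loop. The function name, the "TIMEOUT" return value, and the injectable get_state and sleep callables are assumptions; no Vercel API call is made here.

```python
import time


def poll_until_terminal(get_state, interval_secs=10, timeout_secs=120, sleep=time.sleep):
    """Poll get_state() until READY or ERROR, or give up after timeout_secs."""
    waited = 0
    while True:
        state = get_state()
        if state in ("READY", "ERROR"):
            return state
        if waited >= timeout_secs:
            return "TIMEOUT"  # hypothetical sentinel for "still building"
        sleep(interval_secs)
        waited += interval_secs


# Usage with canned states instead of a real deployment.
states = iter(["BUILDING", "BUILDING", "READY"])
sleeps = []
final_state = poll_until_terminal(lambda: next(states), sleep=sleeps.append)
```

Injecting sleep makes the loop testable without real delays; in production the default time.sleep would be used.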

tools/playwright_eval.py — Screenshot & Page Evaluator

Captures screenshots of deployed prototypes at multiple viewports using headless Chromium via Playwright. Used by ux_evaluator and design_researcher agents.

  • Viewports: desktop (1440x900), tablet (768x1024), mobile (375x812) — all at 2x (Retina) scale
  • Rate limit: 50 screenshots per engagement
  • Metrics collected: HTTP status, page load time (ms), console errors/warnings
  • Output: PNG screenshots saved as binary deliverables via write_binary_deliverable()
  • Dependency: playwright + Chromium browser

Supports full-page scrolling captures or viewport-only. Console errors are collected and reported (up to 10 shown).

tools/vision_evaluate.py — Vision UX Evaluator

Sends screenshots to a multimodal LLM (Gemini Flash) for structured UX quality scoring across 5 weighted dimensions.

Scoring dimensions:

| Dimension | Weight | Evaluates |
| --- | --- | --- |
| Visual hierarchy | 30% | CTA prominence, reading flow, whitespace, visual rhythm |
| Responsive behavior | 20% | Layout adaptation, stacking, touch targets (≥44px) |
| Accessibility | 25% | Color contrast (WCAG AA 4.5:1), focus indicators, ARIA |
| Brand consistency | 15% | Color palette adherence, typography, spacing system |
| Load performance | 10% | Image optimization, render-blocking, lazy loading |

  • Rate limit: 20 evaluations per engagement
  • Cost: ~$0.15 per evaluation
  • Output: Structured JSON with per-dimension scores (0-100), weighted overall score, and actionable recommendations
  • Used by: Loop exit conditions in the design prototyping pipeline
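The weighted overall score follows directly from the table's weights. The dictionary keys and function name below are hypothetical, and the sample scores are illustrative only.

```python
WEIGHTS = {
    "visual_hierarchy": 0.30,
    "responsive_behavior": 0.20,
    "accessibility": 0.25,
    "brand_consistency": 0.15,
    "load_performance": 0.10,
}


def weighted_overall(scores):
    """Combine per-dimension scores (0-100) into one weighted score."""
    missing = WEIGHTS.keys() - scores.keys()
    if missing:
        raise ValueError(f"missing dimensions: {sorted(missing)}")
    return round(sum(scores[dim] * weight for dim, weight in WEIGHTS.items()), 1)


example_scores = {
    "visual_hierarchy": 80,
    "responsive_behavior": 70,
    "accessibility": 60,
    "brand_consistency": 90,
    "load_performance": 50,
}
overall = weighted_overall(example_scores)
```

For the sample scores the sum is 24 + 14 + 15 + 13.5 + 5 = 71.5.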

API Routes

FastAPI router modules that decompose api.py into logical groupings. All require the X-Tenant-ID header.

routes/runs.py — Run Management

The largest route module. Handles the full run lifecycle:

  • POST /runs/start: Load BYOK keys, create EngagementRun, inject alert-based tasks, launch execution thread
  • POST /runs/{slug}/pause: Pause with optional state snapshot
  • POST /runs/{slug}/resume: Restore from snapshot and resume
  • POST /runs/{slug}/stop: Cancel execution and clean up
  • GET /runs/{slug}/status: Poll current run state (status, cost, steps, tasks)
  • GET /runs/{slug}/stream: SSE endpoint for real-time updates
  • GET /runs/{slug}/deliverables: List deliverables for the run
  • GET /runs/{slug}/deliverables/{filename}: Read specific deliverable content

routes/sessions.py — Interactive Workflows

Endpoints for all pre/post-execution session workflows:

  • Brief research: start → respond (multi-turn) → finalize → save refined brief
  • Task planner: start → generate (with feedback loop) → approve
  • Podcast: start async generation → poll status until complete
  • Agent researcher: discovery → research → synthesis → create agent

Handles BYOK key loading and surfaces decryption errors as warnings rather than blocking.

routes/analysis.py — Analytics & Insights

Aggregation and analysis endpoints:

  • GET /model-performance: Per-rubric scoring, cost efficiency, score-per-dollar metrics from ModelRoutingDecision table
  • POST /pattern-analysis: Cross-engagement pattern extraction via PatternAnalyzer
  • GET /cost-breakdown: Per-activity, per-tier cost aggregation from ActivityCost table

routes/attachments.py — File Processing

Attachment parsing and context regeneration:

  • POST /attachments/parse-and-summarize: Download file from signed URL → parse → generate LLM summary → update attachment record → regenerate context files (_manifest.json, _context_index.md)

Uses httpx.AsyncClient for file download and delegates to attachment_parser.parse_and_summarize() for format-specific processing.

routes/admin.py — Administration

Health and admin utility endpoints:

  • GET /health: Returns status, version, active run count, and engine mode
  • POST /admin/rss-ingest: Ingest RSS/Atom feed URL → create library entries
  • POST /admin/import-insights: Import markdown insights to PlatformInsight table

routes/queue.py — Queue Management

Queue status, cancellation, and platform capacity endpoints.

| Method | Path | Purpose |
| --- | --- | --- |
| GET | /queue/status | List queued and processing entries for the tenant (with queue position) |
| POST | /queue/{queue_id}/cancel | Cancel a queued entry (rejects if status ≠ queued) |
| GET | /queue/platform-status | Admin endpoint: platform-wide running count, max concurrent, available slots |

The status endpoint returns entries ordered by priority (descending) with queue position numbers for entries still waiting. Cancellation is tenant-scoped — entries are validated against the requesting tenant's ID.


Storage Layer

adapter.py — StorageAdapter (Abstract Base Class)

Defines the interface for all storage operations. Methods cover:

  • Agent and engagement configuration loading
  • Run state persistence
  • Deliverable read/write/list/version (text and binary)
  • Context source loading (attachments, manifests, org knowledge)
  • Library and insight persistence
  • Pause/resume snapshots
  • Log file paths
  • Activity cost persistence
  • Tenant LLM key loading
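As a rough illustration of the abstract-interface idea, the sketch below declares a small abstract base class and one in-memory implementation. The class and method names (StorageAdapterSketch, load_run_state, write_deliverable, and so on) are hypothetical and cover only a fraction of the areas listed above; the real StorageAdapter signatures may differ.

```python
from abc import ABC, abstractmethod


class StorageAdapterSketch(ABC):
    """Hypothetical subset of a storage interface; not the real method list."""

    @abstractmethod
    def load_run_state(self, slug: str) -> dict: ...

    @abstractmethod
    def save_run_state(self, slug: str, state: dict) -> None: ...

    @abstractmethod
    def write_deliverable(self, slug: str, name: str, content: str) -> None: ...

    @abstractmethod
    def read_deliverable(self, slug: str, name: str) -> str: ...


class InMemoryAdapter(StorageAdapterSketch):
    """Toy implementation that keeps everything in dictionaries."""

    def __init__(self):
        self._states = {}
        self._deliverables = {}

    def load_run_state(self, slug):
        return self._states.get(slug, {})

    def save_run_state(self, slug, state):
        # Copy so later mutations by the caller do not leak into storage.
        self._states[slug] = dict(state)

    def write_deliverable(self, slug, name, content):
        self._deliverables[(slug, name)] = content

    def read_deliverable(self, slug, name):
        return self._deliverables[(slug, name)]
```

Because callers depend only on the abstract class, swapping the database implementation for a filesystem or in-memory one does not require changes in the calling code.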

database.py — DatabaseStorageAdapter

Production implementation connecting to PostgreSQL via psycopg2.

CrewAI Temp File Handling: CrewAI's Task.output_file parameter expects a filesystem path. The DatabaseStorageAdapter solves this by:

  1. Providing a temporary filesystem path for CrewAI to write to
  2. After task completion, reading the temp file content
  3. Persisting the content to PostgreSQL (the Deliverable table)
  4. Cleaning up the temp file
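The four steps above can be sketched with the standard library. Here run_task stands in for CrewAI writing to its output_file path and save_deliverable stands in for the PostgreSQL write; both are hypothetical callables, and the function name is an assumption rather than the adapter's actual API.

```python
import os
import tempfile


def persist_task_output(run_task, save_deliverable):
    """Give the task a temp path, then persist and clean up what it wrote."""
    # Step 1: create a temporary file path for the task to write to.
    fd, path = tempfile.mkstemp(suffix=".md")
    os.close(fd)
    try:
        run_task(path)  # hypothetical: the task writes its output to `path`
        # Step 2: read the content back once the task has finished.
        with open(path, encoding="utf-8") as fh:
            content = fh.read()
        # Step 3: hand the content to the persistence layer.
        save_deliverable(content)
        return content
    finally:
        # Step 4: remove the temp file even if the task or the save failed.
        if os.path.exists(path):
            os.remove(path)
```

Putting the cleanup in a finally block means a failed task or a failed write does not leave stray files behind on the service's disk.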

All queries are scoped by tenant_id, maintaining strict tenant isolation.

filesystem.py — FileSystemStorageAdapter

Local development and testing implementation. Reads and writes to the local filesystem, mirroring the directory structure of the original single-user Groundtruth system.

Directory layout:

| Path | Content |
|---|---|
| config/agents/*.yaml | Agent configuration files |
| config/engagements/*.yaml | Engagement configuration files |
| data/run_state.json | All run states (single JSON file) |
| data/run_reports/{slug}.json | Per-engagement run reports |
| data/pause_snapshots/{slug}.json | Pause state snapshots |
| deliverables/{slug}/*.md | Text deliverables |
| attachments/{slug}/_context_index.md | Attachment context index |
| attachments/{slug}/_manifest.json | Attachment manifest |
| consulting_library/*.json + *.md | Library entries (JSON + markdown pairs) |
| consulting_library/_insights_index.md | Org-wide insights index |
| logs/{slug}/*.log | CrewAI log files |

Supports text and binary deliverables. Versioning uses incrementing .v1, .v2 suffixes via shutil.copy2(). External resource operations (save_external_resource, list_external_resources) are no-ops since external resources are a database-only concept.
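A minimal sketch of the incrementing-suffix versioning might look like the following. The function name save_version is hypothetical, and the sketch assumes the suffix is appended to the full filename (for example report.md.v1); the actual adapter may place it differently.

```python
import shutil
from pathlib import Path


def save_version(path: Path) -> Path:
    """Copy `path` to the next free .vN sibling and return the copy's path."""
    n = 1
    # Find the first version number that is not taken yet.
    while path.with_name(f"{path.name}.v{n}").exists():
        n += 1
    target = path.with_name(f"{path.name}.v{n}")
    # copy2 preserves file metadata such as modification time where possible.
    shutil.copy2(path, target)
    return target
```

Because shutil.copy2 works on bytes, the same approach covers both text and binary deliverables.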


SSE (Server-Sent Events)

During active runs, the engine streams real-time updates to connected clients via the /runs/{slug}/stream endpoint. Event types include:

| Event | Data | Purpose |
|---|---|---|
| log.line | { message } | Real-time log streaming |
| cost.updated | { estimated_cost, steps, budget } | Real-time cost ticker |
| run.started | { slug, status } | Run began |
| run.completed | { slug, status, estimated_cost, error } | Run finished |
| dag.updated | { dag_state: { nodes, edges, levels, total_levels } } | DAG state change (task status transitions, level progress) |
| agent.activity | { agent_slug, agent_role, department, action, tool, coworker, message } | Per-agent structured activity (step, tool_use, delegation, finish) |
| task.started | Task ID and name | Task began execution |
| task.completed | Task ID, output summary, score | Task finished |

The dag.updated event is pushed every time _update_dag_state() is called in the DAG engine — after building the DAG, when tasks change status, and after observer scoring. The agent.activity event is pushed from _record_agent_step() in the runner, providing real-time visibility into which agents are working, what tools they are using, and when they delegate to coworkers.
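For reference, a named event on the wire follows the standard Server-Sent Events framing: an event: line, a data: line, and a blank line. The helper below is a sketch, not the engine's actual serializer; format_sse is a hypothetical name, and it assumes payloads are sent as single-line JSON under named events.

```python
import json


def format_sse(event: str, data: dict) -> str:
    """Serialize one named SSE frame with a JSON payload."""
    # json.dumps emits no raw newlines by default, so one data: line suffices.
    return f"event: {event}\ndata: {json.dumps(data)}\n\n"


# Example frames using event names and payload keys from the table above.
log_frame = format_sse("log.line", {"message": "Task 1 started"})
run_frame = format_sse("run.started", {"slug": "acme", "status": "running"})
```

The trailing blank line is what tells the browser's EventSource that the frame is complete and can be dispatched to listeners for that event name.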

The Next.js web app proxies these SSE connections through its own API (/api/engagements/[id]/stream), adding auth and CORS handling. On the frontend, three hooks manage the connection:

  • useEventSource — Low-level SSE connection management with auto-reconnect. Listens for all known event types including agent.activity.
  • useRunStream — State management for SSE data. Maintains agents (Map of agent states), feedEntries (structured activity feed), dagState (full DAG with nodes, edges, levels), plus cost, logs, and status.
  • useMissionControl — Orchestration hook that derives higher-level state from useRunStream: active/idle agents (15s timeout), tasks grouped by level, phase label, elapsed time, and action handlers (pause/stop/resume).

Testing the Engine

Engine tests are in packages/engine/tests/. Because the LLMRouter is class-based and the StorageAdapter is an abstract interface, both can be mocked or stubbed per-test without global state pollution.

Key testing patterns:

  • Instantiate LLMRouter with mock providers
  • Use FileSystemStorageAdapter with a temp directory
  • Test the CostTracker purely in-memory (no database needed)
  • Feed observer.py with output_text strings directly
  • Use FastAPI's TestClient (or httpx.AsyncClient for async tests) for route testing
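The first pattern, injecting mock providers into a class-based router, can be sketched as below. StubProvider and RouterSketch are hypothetical stand-ins written for this example; the real LLMRouter constructor and call signature may look different.

```python
class StubProvider:
    """Hypothetical provider that returns a canned reply and records prompts."""

    def __init__(self, reply):
        self.reply = reply
        self.prompts = []

    def complete(self, prompt):
        self.prompts.append(prompt)
        return self.reply


class RouterSketch:
    """Stand-in for a class-based router: providers are injected, not global."""

    def __init__(self, providers):
        self.providers = providers

    def call(self, tier, prompt):
        # Dispatch to whichever provider was injected for this tier.
        return self.providers[tier].complete(prompt)


def test_router_uses_injected_provider():
    stub = StubProvider("ok")
    router = RouterSketch({"fast": stub})
    assert router.call("fast", "hello") == "ok"
    assert stub.prompts == ["hello"]
```

Each test builds its own router instance, so nothing needs to be patched at module level and tests cannot leak state into one another.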
