Engine Architecture
The Groundtruth engine is a Python 3.12 application that orchestrates AI consulting crews using CrewAI. It runs as a persistent FastAPI service on Railway, called by the Next.js web app via HTTP.
Overview
- 30 core modules + 3 storage modules + 6 route modules + 9 tool modules — roughly 10,000 lines of code
- Key design principle: everything routes through StorageAdapter (no direct filesystem access) and LLMRouter (no module-level LLM calls)
- Location: packages/engine/
Module Organization
| Category | Modules | Purpose |
|---|---|---|
| Core Execution | api, runner, dag, router, model_selector, costs, queue_poller | Run lifecycle, task orchestration, LLM dispatch, queue processing |
| Quality & Review | observer, red_team, citation_verifier, routing_judge, library | Scoring, adversarial review, source verification, tier classification |
| Context & Knowledge | context, insights, state | Context injection, cross-task knowledge, shared state |
| Session Workflows | brief_researcher, task_planner, podcast_generator, agent_researcher, session_manager | Interactive pre/post-execution workflows |
| Utilities | encryption, attachment_parser, rss_ingester, metadata_extractor, cost_tracking, errors, models, model_resolver, temporal, simulation | Shared infrastructure, model validation, temporal context, dry-run |
| Agent Tools | tools/web_search, tools/academic_search, tools/excel_builder, tools/ideogram, tools/gpt_image, tools/github_repo, tools/vercel_deploy, tools/playwright_eval, tools/vision_evaluate | CrewAI tool implementations |
| API Routes | routes/runs, routes/sessions, routes/analysis, routes/attachments, routes/admin, routes/queue | FastAPI endpoint modules |
| Storage Layer | storage/adapter, storage/database, storage/filesystem | Abstract storage interface + implementations |
Core Execution
api.py — FastAPI Server
The HTTP interface for the engine. Exposes all endpoints that the Next.js web app calls. In production, the route definitions are decomposed into the API Routes modules — api.py mounts these routers and handles middleware, CORS, and Sentry integration.
Endpoints:
| Method | Path | Purpose | Route Module |
|---|---|---|---|
| GET | /health | Service health check | admin |
| POST | /runs/start | Start an engagement run | runs |
| GET | /runs/{slug}/status | Get current run status | runs |
| POST | /runs/{slug}/stop | Stop a running engagement | runs |
| POST | /runs/{slug}/pause | Pause a running engagement | runs |
| POST | /runs/{slug}/resume | Resume a paused engagement | runs |
| GET | /runs/{slug}/logs | Get run log output | runs |
| GET | /runs/{slug}/stream | SSE stream of real-time updates | runs |
| GET | /runs/{slug}/deliverables | List deliverables produced by a run | runs |
| POST | /pattern-analysis | Trigger cross-engagement pattern analysis | analysis |
| GET | /model-performance | Model performance dashboard | analysis |
| GET | /cost-breakdown | Per-activity cost breakdown | analysis |
| POST | /sessions/brief-research/* | Brief research workflow | sessions |
| POST | /sessions/task-planner/* | Task planning workflow | sessions |
| POST | /sessions/podcast/* | Podcast generation | sessions |
| POST | /sessions/agent-research/* | Agent research workflow | sessions |
| POST | /attachments/parse-and-summarize | Attachment parsing | attachments |
| POST | /admin/rss-ingest | RSS feed ingestion | admin |
| POST | /admin/import-insights | Insights import | admin |
All endpoints require an X-Tenant-ID header for tenant scoping. The middleware tags Sentry events with the tenant ID for debugging.
Run Registry: Active runs are tracked in an in-memory dictionary keyed by {tenant_id}:{engagement_slug}, so a single Railway instance manages all concurrent runs. No local filesystem state persists between deploys; durable run state lives in the database.
runner.py — Engagement Runner
Manages the full lifecycle of an engagement run:
- Load engagement configuration from the database (via StorageAdapter)
- Build the CrewAI crew with the configured agents and tasks
- Execute the crew (delegating to dag.py for parallel execution)
- Persist results (costs, token usage, deliverables) back to the database
- Run post-execution quality scoring (via observer.py)
The runner maintains run state (status, timing, costs) in memory during execution and persists snapshots to the database for recovery and reporting. It also handles BYOK key injection, alert-based task insertion, and agent activity event emission for the Mission Control UI.
dag.py — DAG Engine
Parallel task execution based on a dependency graph. Tasks without dependencies run concurrently; tasks with dependencies wait for their upstream tasks to complete.
Key capabilities:
- Parallel execution: Independent tasks run simultaneously, reducing total run time
- Re-runs: Individual tasks can be re-run without re-executing the entire engagement
- Dependency resolution: Automatically determines execution order from the task graph
- Red-team integration: Final deliverables pass through adversarial review before completion
- DAG state events: Pushes dag.updated SSE events on every state transition for real-time UI updates
The DAG engine reads task output text directly (not file paths) and passes it to the observer for quality scoring.
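The core scheduling idea can be sketched as a ready-set scan over the dependency graph. The names run_dag and execute below are illustrative, not the real dag.py API, and the real engine additionally emits dag.updated events and routes outputs through the observer:

```python
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

def run_dag(tasks, deps, execute):
    """Run tasks respecting deps; independent tasks run in parallel.

    tasks: list of task ids; deps: {task: set of upstream task ids};
    execute: callable(task, upstream_outputs) -> output text.
    """
    done, outputs = set(), {}
    with ThreadPoolExecutor(max_workers=4) as pool:
        pending = {}
        while len(done) < len(tasks):
            # Schedule every task whose upstream dependencies are all satisfied
            for t in tasks:
                if t not in done and t not in pending and deps.get(t, set()) <= done:
                    upstream = {d: outputs[d] for d in deps.get(t, set())}
                    pending[t] = pool.submit(execute, t, upstream)
            # Block until at least one in-flight task finishes, then collect it
            finished, _ = wait(pending.values(), return_when=FIRST_COMPLETED)
            for t, fut in list(pending.items()):
                if fut in finished:
                    outputs[t] = fut.result()
                    done.add(t)
                    del pending[t]
    return outputs
```

Tasks with no dependencies are submitted together on the first pass; a downstream task is only submitted once every upstream output is in hand, which is also when its context injection happens.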
router.py — LLM Router
Class-based multi-provider LLM dispatch. Routes each request to the appropriate model based on the agent's assigned tier.
Model Tiers:
| Tier | Model | Provider | Use Case |
|---|---|---|---|
| strategy | grok-4-0709 | xAI | C-suite strategic analysis |
| writing | claude-sonnet-4-6 | Anthropic | Communications, storytelling |
| fullstack | claude-sonnet-4-6 | Anthropic | Product, engineering, research |
| analytical | gpt-4.1-nano | OpenAI | Data analysis, scoring, reviews |
| code | qwen3-coder | Ollama (local) | Code generation |
| simple | qwen3:8b | Ollama (local) | Basic support tasks |
| multimodal | gemini-2.5-flash-lite | Google | Image/document analysis |
| manager | claude-sonnet-4-6 | Anthropic | CrewAI manager agent |
| deepseek | deepseek-chat | DeepSeek | DeepSeek-specific tasks |
| mistral | mistral-small-latest | Mistral | Mistral-specific tasks |
The router is instantiated as a class (not module-level functions), making it independently testable. Each test can create its own router instance with mock providers. Supports BYOK key overrides via api_key_overrides parameter. Includes retry-once with 5s backoff on transient API errors in call_llm().
Production note: code and simple tiers fall back to gpt-4.1-nano on Railway (no Ollama available).
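The retry-once behavior can be sketched as follows; call_with_retry and TRANSIENT_MARKERS are hypothetical names for illustration, not the router's actual pattern list:

```python
import time

# Illustrative substrings that mark an error as transient (assumption)
TRANSIENT_MARKERS = ("rate limit", "overloaded", "timeout")

def call_with_retry(call, retry_delay=5.0):
    """Invoke an LLM call; retry exactly once after a backoff on transient errors."""
    try:
        return call()
    except Exception as exc:
        if any(m in str(exc).lower() for m in TRANSIENT_MARKERS):
            time.sleep(retry_delay)
            return call()  # a second failure propagates to the caller
        raise
```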
model_selector.py — Adaptive Model Selection
Implements a 4-layer decision process for choosing the optimal model for each task:
- Static defaults: Tier-based model assignment from the router configuration
- Rubric mapping: Task-specific rubrics influence model choice (e.g., creative tasks prefer writing-tier models)
- Historical performance: Past TaskScore data informs which models perform best for similar tasks
- Epsilon-greedy exploration: Occasionally selects a non-optimal model to discover better options (configurable exploration rate)
Decisions and outcomes are persisted to the ModelRoutingDecision table, creating a feedback loop that improves model selection over time.
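Layer 4 can be sketched as a standard epsilon-greedy choice; select_model and its arguments are illustrative, not the module's real signature:

```python
import random

def select_model(candidates, avg_scores, epsilon=0.1, rng=random):
    """Epsilon-greedy: usually exploit the best-scoring model, sometimes explore.

    candidates: list of model names; avg_scores: {model: historical mean score}.
    """
    if rng.random() < epsilon:
        return rng.choice(candidates)  # explore: try a possibly non-optimal model
    # exploit: pick the model with the best historical score (0.0 if unseen)
    return max(candidates, key=lambda m: avg_scores.get(m, 0.0))
```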
costs.py — Cost Tracker
Pure in-memory cost tracking per run. Monitors token usage and estimated costs in real time during execution.
Key features:
- Budget circuit breaker: Halts execution immediately when the estimated cost exceeds the engagement's budget limit. Prevents runaway spend.
- Per-model pricing: Tracks costs based on each model's per-token pricing (input and output separately)
- No persistence: The cost tracker itself holds no state between runs. The runner persists final cost data to the database via StorageAdapter.
- Transient error classification: Classifies xAI finish_reason errors as transient (retryable) via _TRANSIENT_ERROR_PATTERNS.
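A minimal sketch of the budget circuit breaker, using hypothetical names (the real tracker also records per-activity detail and raises the engine's BudgetExceededError from errors.py):

```python
class BudgetExceededError(RuntimeError):
    """Raised when estimated spend crosses the engagement budget."""

class CostTracker:
    """In-memory per-run cost tracking with a budget circuit breaker (sketch)."""

    def __init__(self, budget_usd, pricing):
        self.budget = budget_usd
        self.pricing = pricing  # {model: (usd_per_input_token, usd_per_output_token)}
        self.total = 0.0

    def record(self, model, input_tokens, output_tokens):
        pin, pout = self.pricing[model]
        self.total += input_tokens * pin + output_tokens * pout
        if self.total > self.budget:  # halt before the next call is dispatched
            raise BudgetExceededError(
                f"estimated ${self.total:.4f} exceeds budget ${self.budget:.2f}"
            )
        return self.total
```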
queue_poller.py — Run Queue Poller
Background daemon thread that claims queued runs and starts them. Started automatically on application boot in api.py. Polls the RunQueue table every 5 seconds (configurable via QUEUE_POLL_INTERVAL_SECS) for entries ready to process.
Key capabilities:
- Priority-based scheduling: Claims the highest-priority queued entry first, using SQL ORDER BY priority DESC
- Anti-starvation: Lower-priority entries gain +1 effective priority per 10 minutes in queue, preventing Enterprise runs from permanently starving Starter runs
- Tenant concurrency limits: Respects per-plan limits (Starter: 1, Professional: 3, Enterprise: 5 concurrent runs)
- Platform capacity gating: Checks MAX_CONCURRENT_RUNS before claiming any entry
- Stuck run detection: Entries in processing state with heartbeats older than 5 minutes are reset to queued with a retry counter; after 3 retries, marked as failed
- Connection pooling: Borrows connections from the shared psycopg2 pool rather than opening new connections each cycle
Key functions: start_queue_poller() (daemon thread launcher), _poll_once() (single poll cycle), _handle_stuck_runs() (stale heartbeat detection).
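The anti-starvation rule amounts to an effective-priority formula. This sketch uses hypothetical names and does the comparison in Python rather than in the poller's SQL ORDER BY:

```python
def effective_priority(base_priority, minutes_queued):
    """Anti-starvation: +1 effective priority per 10 minutes waiting."""
    return base_priority + minutes_queued // 10

def next_entry(entries, now_minutes):
    """Pick the entry with the highest effective priority.

    entries: list of (id, base_priority, enqueued_minute) tuples (illustrative shape).
    """
    return max(entries, key=lambda e: effective_priority(e[1], now_minutes - e[2]))
```

After 30 minutes in queue, a Starter entry (base priority 1) reaches effective priority 4 and overtakes a freshly queued Enterprise entry at priority 3.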
Quality & Review
observer.py — Task Observer
Post-task quality scoring using GPT-4o-mini. After each task completes, the observer evaluates the output on rubric-specific criteria.
Returns:
- Score (1-10): Overall quality rating
- Insights: What the task output reveals or does well
- Issues: Identified problems or weaknesses
- Rerun recommendation: Whether the task should be re-executed for better results
The observer takes output_text directly as a parameter (not a file path), so it works identically with both storage adapter implementations. Defaults to score=5 (rerun recommended) on API failure instead of a phantom 7/10 pass.
red_team.py — Red Team Reviewer
Adversarial review of final deliverables. A separate LLM call reviews each deliverable from a critical perspective, looking for:
- Logical inconsistencies
- Unsupported claims
- Missing perspectives
- Potential risks in the recommendations
Red team feedback is attached to the deliverable for the client to review alongside the primary output.
citation_verifier.py — Citation Verification Pipeline
Programmatic citation verification (Tier 3 of the source verification system). Runs automated extraction, existence checking, and claim-source alignment independently of agent behavior.
4-stage pipeline:
- Extraction: Regex-based fast extraction + optional LLM-based thorough extraction of citations from deliverable text
- Verification: Cascading academic search (Semantic Scholar → OpenAlex → CrossRef → DuckDuckGo) to confirm source existence
- Alignment: LLM-based claim-source alignment checking — does the cited source actually support the claim being made?
- Reporting: Structured markdown report with statistics, critical findings, and tier classification
Key classes: CitationExtractor, CitationVerifier, ClaimAlignmentChecker, BibliographyVerifier (orchestrator).
Critical findings are flagged when T4 sources (unverifiable) drive major recommendations.
routing_judge.py — LLM-as-Judge Tier Classifier
Uses gpt-4.1-nano as a routing judge to classify which LLM tier best fits a task. Replaces fragile keyword matching with an LLM call that costs ~$0.00003/classification (~200 input tokens, ~30 output tokens).
Two modes:
| Mode | Function | When Used |
|---|---|---|
| Tier classification | classify_task_tier() | Bootstrap mode (<50 scored routing decisions), then demotes to re-run-only |
| Incumbent challenge | should_challenge_incumbent() | Historical mode ~8% of the time when fingerprint data is available |
The tier classifier receives the task description, agent role, and available tiers (with model names and pricing), then returns a (tier, reasoning) tuple. The challenge mode examines an incumbent model's known strengths and weaknesses against task requirements to decide if a challenger model might perform better.
JSON response parsing includes fallback regex extraction. Invalid tiers are rejected — the judge can only select from the agent's configured options. Cost is tracked via CostTracker and ActivityCost records.
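The parse-with-fallback step can be sketched like this (hypothetical function name; simplified relative to the real module):

```python
import json
import re

def parse_judge_response(text, allowed_tiers):
    """Parse a (tier, reasoning) judgment; fall back to regex extraction if
    the response is not clean JSON. Invalid tiers are rejected (returns None)."""
    try:
        data = json.loads(text)
    except (ValueError, TypeError):
        # Salvage an embedded JSON object from chatty output
        m = re.search(r'\{.*\}', text, re.DOTALL)
        if not m:
            return None
        try:
            data = json.loads(m.group(0))
        except ValueError:
            return None
    tier = data.get("tier")
    if tier not in allowed_tiers:  # judge may only pick configured options
        return None
    return tier, data.get("reasoning", "")
```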
library.py — Library Curator
Post-run analysis and pattern extraction. After an engagement completes, the library curator:
- Generates a run report summarizing execution (timing, costs, quality scores)
- Produces an operations review with improvement recommendations
- Creates a library entry — a structured record of what was learned
- Updates the organization-wide insights index
The PatternAnalyzer class performs cross-engagement analysis (within a tenant's data) to identify recurring patterns, common issues, and improvement opportunities.
Context & Knowledge
context.py — Context Windowing
Builds the context injection pipeline for each task. Assembles context from multiple sources:
- Engagement brief: The client's initial requirements and background
- Formatting directive: Standard markdown structure rules (heading hierarchy, data presentation, readability) injected into every task to ensure consistent deliverable formatting
- Design adherence: Slider-controlled guidance on how closely to follow reference materials
- Attachment context index: Auto-generated summaries of uploaded documents
- Cross-engagement insights: Anonymized patterns from previous runs (platform-level, not cross-tenant)
- Prior task outputs: Results from upstream tasks in the DAG
The formatting directive (FORMATTING_DIRECTIVE constant) adds 350 tokens per task ($0.001 cost) and instructs agents to use H1 for document titles only, H2 for major sections, tables for comparative data, and to maintain a clear executive-to-detail hierarchy.
The context module takes manifest as a list[dict] parameter (not a filesystem path), keeping it independent of the storage backend.
insights.py — Insight Store
Thread-safe shared knowledge bus per engagement. During execution, agents can deposit insights that other agents pick up in subsequent tasks.
This enables emergent collaboration: an early research task might surface a finding that a later strategy task incorporates, even though the two agents were not explicitly configured to share that information.
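A minimal sketch of such a thread-safe insight bus (hypothetical class and method names):

```python
import threading

class InsightStore:
    """Thread-safe per-engagement knowledge bus (sketch)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._insights = []

    def deposit(self, agent, text):
        """Called by an agent during its task to share a finding."""
        with self._lock:
            self._insights.append({"agent": agent, "text": text})

    def collect(self, exclude_agent=None):
        """Insights from other agents, for injection into a later task's context."""
        with self._lock:
            return [i for i in self._insights if i["agent"] != exclude_agent]
```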
state.py — Shared Engine State
Process-local state registry and factory helpers. Holds the active runs dictionary, shared LLM router singleton, and storage adapter factory. Used by route modules to access shared state without circular imports.
Key exports:
- active_runs: Dict[str, EngagementRun] keyed by {tenant_id}:{engagement_slug}, protected by runs_lock
- get_storage(tenant_id): Factory that creates a DatabaseStorageAdapter scoped to the given tenant
- get_router(): Lazy-loaded singleton LLMRouter instance
Session Workflows
These modules manage interactive, multi-turn workflows that run before or after crew execution. Each creates a "session" via the shared SessionRegistry that maintains state across multiple API calls.
session_manager.py — Session Registry
Generic thread-safe session registry with TTL-based cleanup. Eliminates boilerplate across all session modules.
- SessionRegistry[T]: Generic class parameterized by session type
- Tenant-scoped keying: {tenant_id}:{identifier}
- Thread-safe get(), put(), remove() operations
- cleanup_stale() removes sessions older than a configurable TTL
Used by: brief_researcher, task_planner, podcast_generator, agent_researcher.
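A sketch of the registry under the stated design (generic type, tenant-scoped keys, TTL cleanup); the names mirror the description above but the implementation details are assumed:

```python
import threading
import time
from typing import Dict, Generic, Optional, Tuple, TypeVar

T = TypeVar("T")

class SessionRegistry(Generic[T]):
    """Generic thread-safe session registry with TTL-based cleanup (sketch)."""

    def __init__(self, ttl_seconds: float = 3600.0):
        self._ttl = ttl_seconds
        self._lock = threading.Lock()
        self._sessions: Dict[str, Tuple[float, T]] = {}

    @staticmethod
    def _key(tenant_id: str, identifier: str) -> str:
        return f"{tenant_id}:{identifier}"  # tenant-scoped keying

    def put(self, tenant_id: str, identifier: str, session: T) -> None:
        with self._lock:
            self._sessions[self._key(tenant_id, identifier)] = (time.time(), session)

    def get(self, tenant_id: str, identifier: str) -> Optional[T]:
        with self._lock:
            entry = self._sessions.get(self._key(tenant_id, identifier))
            return entry[1] if entry else None

    def cleanup_stale(self) -> int:
        """Drop sessions older than the TTL; returns the number removed."""
        cutoff = time.time() - self._ttl
        with self._lock:
            stale = [k for k, (ts, _) in self._sessions.items() if ts < cutoff]
            for k in stale:
                del self._sessions[k]
            return len(stale)
```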
brief_researcher.py — Brief Research
LLM-powered pre-kickoff brief refinement through multi-turn conversation. Analyzes engagement briefs, asks clarifying questions via a chat interface, and reformats the refined brief into a comprehensive, token-efficient prompt before crew execution.
Workflow: Start session → Multi-turn chat → Finalize → Save refined brief as deliverable
Key class: BriefResearchSession — manages conversation state, messages, and transcript saving. Uses "analytical" tier LLM calls and tracks cost per interaction.
task_planner.py — Task Pipeline Planner
LLM-powered engagement task pipeline generation. Analyzes the engagement brief and available agent roster, then generates a tailored multi-phase task pipeline for user approval before crew execution.
Workflow: Start session → Generate tasks (with optional feedback) → Approve → Save to engagement
Key class: TaskPlannerSession — generates cost-aware task arrays (tasks cost ~$0.40-0.80 each), scales phases by budget (1-4 phases), supports regeneration with user feedback, and validates JSON output. Supports template-based planning for engagement templates.
podcast_generator.py — Podcast Generator
SCIPA brief synthesis + optional two-speaker podcast audio. Synthesizes all engagement deliverables into a SCIPA-structured document (Situation, Complication, Implication, Proposal, Ask, Benefit) suitable for Google NotebookLM or OpenAI TTS.
Workflow: Start generation → Background thread produces SCIPA brief → Generate dialogue script → Render audio via OpenAI TTS → Save as binary deliverable
Key class: PodcastSession — manages async generation with phase tracking (generating_brief → generating_audio → complete). Uses "writing" tier for SCIPA and dialogue generation. Supports BYOK OpenAI key for TTS.
agent_researcher.py — Agent Research & Creation
4-phase agent creation pipeline: discovery through conversation, job board research, profile synthesis, and agent creation with overlap detection.
Workflow: Discovery (chat) → Research (DuckDuckGo job boards) → Synthesis (profile generation) → Create (with overlap check against existing roster)
Key class: AgentResearchSession — manages the full 4-phase workflow. Uses DuckDuckGo-based web search for job postings and LLM-driven JSON profile extraction. Checks for overlap with existing agents before creating a new DynamicAgent.
Utilities
encryption.py — BYOK Key Encryption
Fernet (AES-128-CBC + HMAC) encryption for tenant BYOK API keys. Provides encrypt_api_key() and decrypt_api_key() helpers.
Requires the LLM_KEY_ENCRYPTION_KEY environment variable. Validates key format at startup via validate_encryption_key(). Security-critical — handles plaintext API keys for 6 providers (OpenAI, Anthropic, xAI, Google, DeepSeek, Mistral).
attachment_parser.py — Multi-Format File Parser
Parses uploaded files and generates LLM summaries. Supports PDF (pdfplumber), XLSX (openpyxl), DOCX (python-docx), PPTX (python-pptx), CSV, images (PIL), JSON, and plain text.
Main entry point: parse_and_summarize() — returns {parsed, summary, meta, error}. Full text extraction is capped at 100KB. Uses "analytical" tier LLM for summarization (~$0.00025/doc). Extracts format-specific metadata (page count, sheet names, column count, image dimensions, EXIF data).
rss_ingester.py — RSS Feed Ingestion
RSS 2.0 and Atom feed parser. Fetches feeds, extracts articles, and stores as library entries via StorageAdapter.
Key safety feature: SSRF validation — blocks private IP ranges before fetching. Falls back to Wayback Machine for dead links. Caps at 20 entries per feed. Uses stdlib urllib.request (no external HTTP library).
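The SSRF check can be sketched with the stdlib ipaddress module; is_safe_feed_url is a hypothetical name, and the real module may block additional ranges:

```python
import ipaddress
import socket
from urllib.parse import urlparse

def is_safe_feed_url(url):
    """Block SSRF: reject URLs whose host resolves to a private/loopback range."""
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https") or not parsed.hostname:
        return False
    try:
        # Resolve the hostname (IP literals resolve to themselves)
        addr = ipaddress.ip_address(socket.gethostbyname(parsed.hostname))
    except (socket.gaierror, ValueError):
        return False
    return not (addr.is_private or addr.is_loopback
                or addr.is_link_local or addr.is_reserved)
```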
metadata_extractor.py — Deliverable Metadata Extractor
Deterministic post-processor for extracting structured metadata from deliverable content. Zero LLM calls — pure regex.
Extracts:
- BLUF (Bottom Line Up Front) from <!-- BLUF: ... --> HTML comment markers (with fallback to the first paragraph after the H1)
- Confidence levels from <!-- CONFIDENCE: high|medium|low|verify --> markers
- Heading structure with line ranges and nesting
- Sources section and next-steps section detection
Main entry point: extract_metadata() — returns (cleaned_content, metadata_dict) with markers stripped from the clean content.
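A simplified sketch of the marker extraction (the real module also parses heading structure and detects the sources and next-steps sections):

```python
import re

BLUF_RE = re.compile(r'<!--\s*BLUF:\s*(.*?)\s*-->', re.DOTALL)
CONF_RE = re.compile(r'<!--\s*CONFIDENCE:\s*(high|medium|low|verify)\s*-->')

def extract_metadata(content):
    """Pull BLUF and confidence markers out of a deliverable and strip them
    from the returned content. Pure regex: zero LLM calls."""
    meta = {}
    bluf = BLUF_RE.search(content)
    if bluf:
        meta["bluf"] = bluf.group(1)
    conf = CONF_RE.search(content)
    if conf:
        meta["confidence"] = conf.group(1)
    cleaned = CONF_RE.sub("", BLUF_RE.sub("", content)).strip()
    return cleaned, meta
```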
cost_tracking.py — Activity Cost Tracking
Utility for extracting token counts from LLM responses and persisting ActivityCost records asynchronously.
- extract_llm_usage(): Extracts {model, input_tokens, output_tokens, cost} from LangChain response metadata
- track_activity_cost(): Extracts and persists cost in a fire-and-forget daemon thread
Called by every module that makes LLM calls — the centralized cost attribution point for the platform.
errors.py — Error Hierarchy
Structured exception types so API routes can map failures to appropriate HTTP status codes.
| Exception | HTTP Status | When |
|---|---|---|
| EngineError | 500 | Base class for all engine errors |
| LLMError | 502 | Provider returned an error or unusable output |
| StorageError | 500 | Database/storage operation failed |
| BudgetExceededError | 402 | Engagement budget limit reached |
| SessionNotFoundError | 404 | Requested session not found |
| EngagementNotFoundError | 404 | Engagement not found |
| ConfigurationError | 500 | Engine config is invalid |
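The mapping can be sketched as exception classes carrying their status code (a simplified subset; the http_status attribute name is an assumption):

```python
class EngineError(Exception):
    """Base class for all engine errors."""
    http_status = 500

class LLMError(EngineError):
    http_status = 502  # provider returned an error or unusable output

class BudgetExceededError(EngineError):
    http_status = 402  # engagement budget limit reached

class SessionNotFoundError(EngineError):
    http_status = 404

def to_http_status(exc):
    """Map any raised exception to an HTTP status for the route layer."""
    return exc.http_status if isinstance(exc, EngineError) else 500
```

Route handlers can then catch EngineError once and return the right status, instead of special-casing each failure mode per endpoint.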
model_resolver.py — Self-Healing Model Resolution
Validates model names at startup by probing provider APIs and resolves invalid models at runtime via a cascading fallback strategy. Zero impact on the happy path — resolution only fires when a provider returns a model-not-found error.
Resolution cascade:
- Check cache — previously resolved models are returned immediately (1-hour TTL)
- Strip date suffix — e.g., claude-sonnet-4-6-20250217 → claude-sonnet-4-6
- Tier fallback chain — ordered list of (provider, model) alternatives per tier
- Terminal fallback — gpt-4.1-nano as the last resort
Key class: ModelResolver — instantiated with the router's model_names dict. validate_at_startup() probes OpenAI, Anthropic, and xAI model list endpoints (skips Ollama, Gemini, DeepSeek, Mistral). Returns a validation summary dict per tier showing model, provider, validity, and any auto-resolution applied.
Providers are guessed from model name prefixes (claude- → Anthropic, gpt- → OpenAI, grok- → xAI). Non-blocking — all exceptions are caught, boot is never prevented.
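Stage 2 of the cascade and the prefix-based provider guess are simple string operations; the function names here are illustrative:

```python
import re

# An 8-digit trailing date stamp, e.g. "-20250217" (assumption about the format)
DATE_SUFFIX = re.compile(r'-\d{8}$')

def strip_date_suffix(model_name):
    """Stage 2 of the resolution cascade: drop a trailing date stamp."""
    return DATE_SUFFIX.sub("", model_name)

def guess_provider(model_name):
    """Guess provider from the model-name prefix, as described above."""
    for prefix, provider in (("claude-", "anthropic"),
                             ("gpt-", "openai"),
                             ("grok-", "xai")):
        if model_name.startswith(prefix):
            return provider
    return None
```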
temporal.py — Temporal Context Injection
Provides current-date awareness and AI-era timeline calibration directives injected into all LLM prompts. Without this, LLMs default to their training cutoff and produce outdated references.
Two directives:
| Function | Purpose | Token Cost |
|---|---|---|
| temporal_context() | Injects today's date (UTC) and instructs the LLM to prefer recent data and not cite pre-previous-year sources as "current" | ~50 tokens |
| timeline_calibration() | Instructs agents to present dual timelines (traditional pre-AI vs AI-enabled) for any roadmaps or effort estimates | ~200 tokens |
The timeline calibration directive ensures agents present both a traditional human-team estimate and an AI-accelerated estimate (typically 5-20% of traditional), helping clients see the acceleration AI provides. Injected via the context windowing pipeline in context.py.
simulation.py — Dry-Run Simulation
Zero-cost pipeline verification mode. Generates template content and cost estimates so dry-run executions exercise the full persistence pipeline (deliverables, metadata extraction, observer scoring) without making any LLM API calls.
Two functions:
| Function | Purpose |
|---|---|
| generate_simulation_content() | Produces 300-500 words of markdown tagged with a [SIMULATION] banner. Includes structural markers (BLUF, CONFIDENCE, Sources, Next Steps) that exercise the metadata_extractor |
| estimate_run_cost() | Projects per-task costs using MODEL_PRICING and AGENT_LLM_OPTIONS, plus overhead for observer, red-team, bibliography, and ops-review operations |
The cost estimator uses fixed token assumptions per task (2,000 input / 1,500 output) and per overhead operation. Returns a detailed breakdown with per-task costs, overhead subtotals, total estimated cost, required agents, and required providers. Filters out disabled tasks automatically.
models.py — Model Name Definitions
Single source of truth for tier-to-model mappings. Imported by router.py (LLM dispatch) and costs.py (pricing lookups).
Key constants:
- DEFAULT_MODEL_NAMES: Dict mapping tiers to default models
- FALLBACK_CAPABLE: Fallback for premium tiers (gpt-4.1)
- FALLBACK_MIDTIER: Fallback for utility tiers (gpt-4.1-mini)
- FALLBACK_ANALYTICAL: Non-OpenAI escape hatch (claude-haiku-4-5-20251001)
Agent Tools
CrewAI tool implementations that agents can invoke during execution. Each is a subclass of CrewAI's BaseTool with rate limiting per engagement.
tools/web_search.py — Web Search & Fetch
DuckDuckGo-based web search and page text extraction for source verification.
- WebSearchTool: Rate-limited search (max 100 per engagement). Returns results with source tier labels.
- WebFetchTool: Rate-limited page fetch (max 30 per engagement). Strips scripts, styles, and HTML tags.
- classify_source_tier(): Domain-to-tier classification (T1-T4) using the allowlist from source_tiers.json.
- _check_url_status(): HEAD request for URL existence verification.
tools/academic_search.py — Academic Search APIs
Free academic search APIs for citation verification. Three providers, no API keys required.
- SemanticScholarTool: 200M+ papers. Returns title, authors, year, citation count, abstract, DOI.
- CrossRefTool: 150M+ DOIs. Uses the polite-pool header for higher rate limits.
- OpenAlexTool: 250M+ works. Broadest coverage; includes open-access URLs.
Rate limits: max 30 queries, max 20 lookups per engagement.
tools/excel_builder.py — Excel Model Builder
Generates .xlsx financial models from JSON specifications. Agents output a JSON spec; the tool converts it to a workbook with openpyxl.
Key feature: Assumptions tab — highlighted in yellow with 5 columns (Variable, Estimated Value, Source, Your Actual, Effective Value). Generates formulas that reference the Effective Value column, allowing clients to override assumptions and see model results update automatically.
tools/ideogram.py — Ideogram Image Generator
Generates logos, brand imagery, and design assets via the Ideogram 3.0 API. Used by design department agents (brand_designer, design_system_architect).
- Model: Ideogram V_2A
- Cost: ~$0.06 per image
- Rate limit: 20 generations per engagement
- Styles: AUTO, DESIGN, REALISTIC, RENDER_3D, ANIME
- Aspect ratios: 1:1, 16:9, 9:16, 4:3, 3:4
- Output: 1-8 images per generation, URLs saved as external resources via StorageAdapter
- Env var: IDEOGRAM_API_KEY
Costs are tracked via CostTracker.record_api_cost(). Uses stdlib urllib.request for HTTP.
tools/gpt_image.py — GPT Image Generator
Generates images via OpenAI's DALL-E 3 API. Complements the Ideogram tool — better for illustrations and conceptual imagery.
- Cost: $0.04 (standard 1024x1024) to $0.12 (HD 1792x1024)
- Rate limit: 20 generations per engagement
- Sizes: 1024x1024, 1792x1024, 1024x1792
- Quality: standard or HD
- Note: DALL-E 3 generates 1 image per API request, so num_images=4 results in 4 sequential requests
- Env var: Uses the existing OPENAI_API_KEY
tools/github_repo.py — GitHub Repository Manager
Creates GitHub repositories and pushes multi-file codebases atomically. Used by prototype_engineer and ui_engineer agents to deploy working code.
Two actions:
| Action | Purpose |
|---|---|
| create_repo | Creates a public repo under the configured org with auto-init (README) |
| push_files | Atomic multi-file commit via the Git tree API (create blobs → assemble tree → create commit → update ref) |
- Rate limit: 10 operations per engagement
- Env vars: GITHUB_PAT (personal access token with repo scope), GITHUB_ORG
- Output: Repository URL and commit SHA, saved as an external resource
tools/vercel_deploy.py — Vercel Deployment Manager
Creates Vercel projects linked to GitHub repositories and triggers deployments with status polling.
Three actions:
| Action | Purpose |
|---|---|
| create_project | Links a GitHub repo to a new Vercel project (auto-deploy on push) |
| deploy | Triggers a production deployment, polling every 10s for up to 120s until READY or ERROR |
| get_status | Checks a deployment's current state |
- Rate limit: 10 operations per engagement
- Env vars: VERCEL_TOKEN, VERCEL_TEAM_ID
- Output: Live deployment URL saved as an external resource when status = READY
tools/playwright_eval.py — Screenshot & Page Evaluator
Captures screenshots of deployed prototypes at multiple viewports using headless Chromium via Playwright. Used by ux_evaluator and design_researcher agents.
- Viewports: desktop (1440x900), tablet (768x1024), mobile (375x812) — all at 2x (Retina) scale
- Rate limit: 50 screenshots per engagement
- Metrics collected: HTTP status, page load time (ms), console errors/warnings
- Output: PNG screenshots saved as binary deliverables via write_binary_deliverable()
- Dependency: playwright + Chromium browser
Supports full-page scrolling captures or viewport-only. Console errors are collected and reported (up to 10 shown).
tools/vision_evaluate.py — Vision UX Evaluator
Sends screenshots to a multimodal LLM (Gemini Flash) for structured UX quality scoring across 5 weighted dimensions.
Scoring dimensions:
| Dimension | Weight | Evaluates |
|---|---|---|
| Visual hierarchy | 30% | CTA prominence, reading flow, whitespace, visual rhythm |
| Responsive behavior | 20% | Layout adaptation, stacking, touch targets (≥44px) |
| Accessibility | 25% | Color contrast (WCAG AA 4.5:1), focus indicators, ARIA |
| Brand consistency | 15% | Color palette adherence, typography, spacing system |
| Load performance | 10% | Image optimization, render-blocking, lazy loading |
- Rate limit: 20 evaluations per engagement
- Cost: ~$0.15 per evaluation
- Output: Structured JSON with per-dimension scores (0-100), weighted overall score, and actionable recommendations
- Used by: Loop exit conditions in the design prototyping pipeline
API Routes
FastAPI router modules that decompose api.py into logical groupings. All require the X-Tenant-ID header.
routes/runs.py — Run Management
The largest route module. Handles the full run lifecycle:
- POST /runs/start: Load BYOK keys, create EngagementRun, inject alert-based tasks, launch execution thread
- POST /runs/{slug}/pause: Pause with optional state snapshot
- POST /runs/{slug}/resume: Restore from snapshot and resume
- POST /runs/{slug}/stop: Cancel execution and clean up
- GET /runs/{slug}/status: Poll current run state (status, cost, steps, tasks)
- GET /runs/{slug}/stream: SSE endpoint for real-time updates
- GET /runs/{slug}/deliverables: List deliverables for the run
- GET /runs/{slug}/deliverables/(unknown): Read specific deliverable content
routes/sessions.py — Interactive Workflows
Endpoints for all pre/post-execution session workflows:
- Brief research: start → respond (multi-turn) → finalize → save refined brief
- Task planner: start → generate (with feedback loop) → approve
- Podcast: start async generation → poll status until complete
- Agent researcher: discovery → research → synthesis → create agent
Handles BYOK key loading and surfaces decryption errors as warnings rather than blocking.
routes/analysis.py — Analytics & Insights
Aggregation and analysis endpoints:
- GET /model-performance: Per-rubric scoring, cost efficiency, and score-per-dollar metrics from the ModelRoutingDecision table
- POST /pattern-analysis: Cross-engagement pattern extraction via PatternAnalyzer
- GET /cost-breakdown: Per-activity, per-tier cost aggregation from the ActivityCost table
routes/attachments.py — File Processing
Attachment parsing and context regeneration:
- POST /attachments/parse-and-summarize: Download file from signed URL → parse → generate LLM summary → update attachment record → regenerate context files (_manifest.json, _context_index.md)
Uses httpx.AsyncClient for file download and delegates to attachment_parser.parse_and_summarize() for format-specific processing.
routes/admin.py — Administration
Health and admin utility endpoints:
- GET /health: Returns status, version, active run count, and engine mode
- POST /admin/rss-ingest: Ingest an RSS/Atom feed URL → create library entries
- POST /admin/import-insights: Import markdown insights into the PlatformInsight table
routes/queue.py — Queue Management
Queue status, cancellation, and platform capacity endpoints.
| Method | Path | Purpose |
|---|---|---|
GET | /queue/status | List queued and processing entries for the tenant (with queue position) |
POST | /queue/{queue_id}/cancel | Cancel a queued entry (rejects if status ≠ queued) |
GET | /queue/platform-status | Admin endpoint: platform-wide running count, max concurrent, available slots |
The status endpoint returns entries ordered by priority (descending) with queue position numbers for entries still waiting. Cancellation is tenant-scoped — entries are validated against the requesting tenant's ID.
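The priority ordering and position numbering described above can be sketched like this. The `QueueEntry` shape is a hypothetical stand-in for the real queue row:

```python
from dataclasses import dataclass

@dataclass
class QueueEntry:
    """Hypothetical shape of a queue row."""
    queue_id: str
    status: str      # "queued" or "processing"
    priority: int

def with_positions(entries: list[QueueEntry]) -> list[dict]:
    """Order entries by priority (descending) and assign queue positions
    only to entries that are still waiting."""
    ordered = sorted(entries, key=lambda e: -e.priority)
    rows, position = [], 0
    for entry in ordered:
        row = {"queue_id": entry.queue_id, "status": entry.status, "position": None}
        if entry.status == "queued":
            position += 1
            row["position"] = position
        rows.append(row)
    return rows
```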
Storage Layer
adapter.py — StorageAdapter (Abstract Base Class)
Defines the interface for all storage operations. Methods cover:
- Agent and engagement configuration loading
- Run state persistence
- Deliverable read/write/list/version (text and binary)
- Context source loading (attachments, manifests, org knowledge)
- Library and insight persistence
- Pause/resume snapshots
- Log file paths
- Activity cost persistence
- Tenant LLM key loading
database.py — DatabaseStorageAdapter
Production implementation connecting to PostgreSQL via `psycopg2`.
CrewAI Temp File Handling: CrewAI's `Task.output_file` parameter expects a filesystem path. The `DatabaseStorageAdapter` solves this by:
- Providing a temporary filesystem path for CrewAI to write to
- After task completion, reading the temp file content
- Persisting the content to PostgreSQL (the `Deliverable` table)
- Cleaning up the temp file
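The temp-file dance can be sketched as below. `run_crewai_task` and `persist_deliverable` are hypothetical stand-ins for the CrewAI task call and the database write:

```python
import tempfile
from pathlib import Path

def persist_task_output(run_crewai_task, persist_deliverable, name: str) -> None:
    """Give CrewAI a real filesystem path, then move the written content
    into the database. Both callables are hypothetical stand-ins."""
    with tempfile.TemporaryDirectory() as tmp:
        output_path = Path(tmp) / name
        run_crewai_task(output_file=str(output_path))  # 1. CrewAI writes here
        content = output_path.read_text()              # 2. read temp content
        persist_deliverable(name, content)             # 3. write to Deliverable
    # 4. leaving the context manager deletes the temp directory and file
```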
All queries are scoped by `tenant_id`, maintaining strict tenant isolation.
filesystem.py — FileSystemStorageAdapter
Local development and testing implementation. Reads and writes to the local filesystem, mirroring the directory structure of the original single-user Groundtruth system.
Directory layout:
| Path | Content |
|---|---|
| `config/agents/*.yaml` | Agent configuration files |
| `config/engagements/*.yaml` | Engagement configuration files |
| `data/run_state.json` | All run states (single JSON file) |
| `data/run_reports/{slug}.json` | Per-engagement run reports |
| `data/pause_snapshots/{slug}.json` | Pause state snapshots |
| `deliverables/{slug}/*.md` | Text deliverables |
| `attachments/{slug}/_context_index.md` | Attachment context index |
| `attachments/{slug}/_manifest.json` | Attachment manifest |
| `consulting_library/*.json` + `*.md` | Library entries (JSON + markdown pairs) |
| `consulting_library/_insights_index.md` | Org-wide insights index |
| `logs/{slug}/*.log` | CrewAI log files |
Supports text and binary deliverables. Versioning uses incrementing `.v1`, `.v2` suffixes via `shutil.copy2()`. External resource operations (`save_external_resource`, `list_external_resources`) are no-ops, since external resources are a database-only concept.
SSE (Server-Sent Events)
During active runs, the engine streams real-time updates to connected clients via the /runs/{slug}/stream endpoint. Event types include:
| Event | Data | Purpose |
|---|---|---|
| `log.line` | `{ message }` | Real-time log streaming |
| `cost.updated` | `{ estimated_cost, steps, budget }` | Real-time cost ticker |
| `run.started` | `{ slug, status }` | Run began |
| `run.completed` | `{ slug, status, estimated_cost, error }` | Run finished |
| `dag.updated` | `{ dag_state: { nodes, edges, levels, total_levels } }` | DAG state change (task status transitions, level progress) |
| `agent.activity` | `{ agent_slug, agent_role, department, action, tool, coworker, message }` | Per-agent structured activity (step, tool_use, delegation, finish) |
| `task.started` | Task ID and name | Task began execution |
| `task.completed` | Task ID, output summary, score | Task finished |
The `dag.updated` event is pushed every time `_update_dag_state()` is called in the DAG engine — after building the DAG, when tasks change status, and after observer scoring. The `agent.activity` event is pushed from `_record_agent_step()` in the runner, providing real-time visibility into which agents are working, what tools they are using, and when they delegate to coworkers.
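The event types above map onto the `text/event-stream` wire format. Here is a minimal sketch of frame serialization plus an async generator draining events from an `asyncio.Queue`; the queue-based hand-off is an assumption about the engine's internals, not a description of the real implementation:

```python
import asyncio
import json

def sse_frame(event: str, data: dict) -> str:
    # text/event-stream wire format: a named event plus one JSON data line,
    # terminated by a blank line
    return f"event: {event}\ndata: {json.dumps(data)}\n\n"

async def event_stream(queue: asyncio.Queue):
    """Yield engine events as SSE frames, closing once the run finishes.
    The queue hand-off is a hypothetical sketch of the producer side."""
    while True:
        event, data = await queue.get()
        yield sse_frame(event, data)
        if event == "run.completed":
            break
```

In FastAPI, a generator like this would typically be wrapped in a `StreamingResponse` with `media_type="text/event-stream"`.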
The Next.js web app proxies these SSE connections through its own API (/api/engagements/[id]/stream), adding auth and CORS handling. On the frontend, three hooks manage the connection:
- `useEventSource` — Low-level SSE connection management with auto-reconnect. Listens for all known event types, including `agent.activity`.
- `useRunStream` — State management for SSE data. Maintains `agents` (Map of agent states), `feedEntries` (structured activity feed), `dagState` (full DAG with nodes, edges, levels), plus cost, logs, and status.
- `useMissionControl` — Orchestration hook that derives higher-level state from `useRunStream`: active/idle agents (15s timeout), tasks grouped by level, phase label, elapsed time, and action handlers (pause/stop/resume).
Testing the Engine
Engine tests are in packages/engine/tests/. Because the LLMRouter is class-based and the StorageAdapter is an abstract interface, both can be mocked or stubbed per-test without global state pollution.
Key testing patterns:
- Instantiate `LLMRouter` with mock providers
- Use `FileSystemStorageAdapter` with a temp directory
- Test the `CostTracker` purely in-memory (no database needed)
- Feed `observer.py` with `output_text` strings directly
- Use `httpx.AsyncClient` with FastAPI's `TestClient` for route testing
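The temp-directory pattern for storage tests can be sketched like this. `FakeFilesystemAdapter` is a deliberately minimal stand-in — the real `FileSystemStorageAdapter` lives in `storage/filesystem.py` and its method names here are assumptions:

```python
import tempfile
from pathlib import Path

class FakeFilesystemAdapter:
    """Minimal stand-in mimicking filesystem-backed deliverable I/O.
    Method names are hypothetical, not the real adapter's interface."""
    def __init__(self, root: Path):
        self.root = root

    def save_deliverable(self, slug: str, name: str, content: str) -> None:
        run_dir = self.root / "deliverables" / slug
        run_dir.mkdir(parents=True, exist_ok=True)
        (run_dir / name).write_text(content)

    def read_deliverable(self, slug: str, name: str) -> str:
        return (self.root / "deliverables" / slug / name).read_text()

def test_deliverable_round_trip():
    # A temp directory keeps each test hermetic: no shared state, no cleanup
    with tempfile.TemporaryDirectory() as tmp:
        adapter = FakeFilesystemAdapter(Path(tmp))
        adapter.save_deliverable("demo-run", "summary.md", "# Findings")
        assert adapter.read_deliverable("demo-run", "summary.md") == "# Findings"
```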
Related Documentation
- Architecture Overview — how the engine fits into the overall system
- Agent Roster — all 28 agents with roles, goals, tiers, and departments
- Local Development — running the engine locally
- Testing Guide — test patterns and running the test suite
- Deployment Guide — deploying the engine to Railway