
You can’t improve what you don’t measure, and you can’t measure agents without a runner. The Engine doesn’t ship its own eval harness — most teams write a small one. This page covers what a useful harness looks like.

What a harness does

1. read a list of eval cases
2. for each case (in parallel up to a concurrency budget):
   a. spawn a fresh task_id
   b. POST /execute with the case's input
   c. consume the SSE stream, capturing every event
   d. score the result against the case's expected outcome
3. aggregate results
4. compare against a baseline
5. emit a report
A working harness in Python is ~150 lines.

Skeleton

import asyncio
import json
import time
import uuid
from dataclasses import dataclass
from pathlib import Path

import httpx

ENGINE_URL = "http://localhost:8000"
ENGINE_KEY = "..."
CONCURRENCY = 5

@dataclass
class Case:
    id: str
    category: str
    input: dict
    expected: dict

@dataclass
class Result:
    case_id: str
    passed: bool
    score: dict
    text: str
    tool_calls: list
    usage: dict
    elapsed_ms: int

async def run_case(case: Case, run_id: str) -> Result:
    # Fresh task_id per case so memory never bleeds between cases.
    task_id = f"eval-{run_id}-{case.id}"
    start = time.time()

    text_parts = []
    tool_calls = []
    usage = {"input_tokens": 0, "output_tokens": 0, "cache_hit_tokens": 0}

    # No client timeout: a long agent run can stream for minutes.
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "POST",
            f"{ENGINE_URL}/execute",
            headers={"X-Engine-Key": ENGINE_KEY},
            json={**case.input, "task_id": task_id},
        ) as resp:
            resp.raise_for_status()
            # Minimal SSE parse: an "event:" line names the frame and the
            # "data:" line that follows carries its JSON payload.
            event_type = None
            async for line in resp.aiter_lines():
                if line.startswith("event:"):
                    event_type = line[6:].strip()
                elif line.startswith("data:"):
                    data = json.loads(line[5:].strip())
                    if event_type == "text_delta":
                        text_parts.append(data.get("text", ""))
                    elif event_type == "tool_call":
                        tool_calls.append(data["tool"])
                    elif event_type == "usage":
                        for k in usage:
                            usage[k] += data.get(k, 0)

    text = "".join(text_parts)
    score = score_against(case.expected, text, tool_calls, usage)
    return Result(
        case_id=case.id,
        passed=score["passed"],
        score=score,
        text=text,
        tool_calls=tool_calls,
        usage=usage,
        elapsed_ms=int((time.time() - start) * 1000),
    )

async def run_suite(cases: list[Case]) -> list[Result]:
    run_id = str(uuid.uuid4())[:8]
    # Cap in-flight cases; see "Concurrency" below.
    sem = asyncio.Semaphore(CONCURRENCY)

    async def bounded(case):
        async with sem:
            return await run_case(case, run_id)

    return await asyncio.gather(*(bounded(c) for c in cases))

def score_against(expected, text, tool_calls, usage):
    # Deterministic checks: required/forbidden tools, substrings, token budget.
    passed = True
    notes = []

    for tool in expected.get("must_call_tools", []):
        if tool not in tool_calls:
            passed = False
            notes.append(f"missing required tool call: {tool}")

    for tool in expected.get("must_not_call_tools", []):
        if tool in tool_calls:
            passed = False
            notes.append(f"forbidden tool call: {tool}")

    for needle in expected.get("output_must_contain", []):
        if needle not in text:
            passed = False
            notes.append(f"output missing: {needle!r}")

    if (max_tokens := expected.get("max_tokens")) is not None:
        if usage["output_tokens"] > max_tokens:
            passed = False
            notes.append(f"output_tokens={usage['output_tokens']} > {max_tokens}")

    return {"passed": passed, "notes": notes}
That’s the whole runner. Add LLM-as-judge scoring as a separate function that gets called after the deterministic checks.
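
A minimal sketch of that layering, where the complete callable (your judge client) and the rubric field on expected are conventions of this harness rather than Engine API:
async def judge_against(case: Case, text: str, complete) -> dict:
    # `complete` is any async callable mapping a prompt string to a
    # completion string; wire it to whatever judge model you use.
    rubric = case.expected.get("rubric")
    if rubric is None:
        return {"passed": True, "notes": []}  # no rubric, nothing to judge
    prompt = (
        f"Rubric:\n{rubric}\n\nAgent output:\n{text}\n\n"
        'Answer with JSON only: {"passed": true, "reason": "..."}'
    )
    verdict = json.loads(await complete(prompt))
    return {"passed": bool(verdict["passed"]), "notes": [verdict.get("reason", "")]}
Run the judge only on cases that already pass the deterministic checks, and record its verdict separately so a flaky judge can never flip a deterministic result.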

Loading cases

Cases live in JSON files (one per case) or a single JSONL file. Both work. JSON files are easier to diff in PRs; JSONL is easier to manage at scale.
def load_cases(path: Path) -> list[Case]:
    cases = []
    for f in sorted(path.glob("*.json")):
        d = json.loads(f.read_text())
        cases.append(Case(**d))
    return cases
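
If you choose JSONL instead, the loader is just as small. A sketch, assuming one case object per line:
def load_cases_jsonl(path: Path) -> list[Case]:
    # One JSON object per line; blank lines are skipped.
    return [
        Case(**json.loads(line))
        for line in path.read_text().splitlines()
        if line.strip()
    ]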

Reporting

Pretty output is what makes evals usable:
============================================================
Eval run: 2026-04-27-a1b2c3d4
Cases: 47/50 passed (94%)
============================================================

Categories:
  refactoring          15/15 ✓
  research              9/10 ✗
  classification       12/12 ✓
  multi-step-reasoning 11/13 ✗

Failed cases:
  research/eval-2026-04-001 - missing required tool call: web_search
  multi-step-reasoning/eval-2026-04-014 - output_tokens=12000 > 8000
  multi-step-reasoning/eval-2026-04-019 - output missing: 'rate_limit'

Cost: $4.23
Duration: 4m 12s
P99 latency: 38s
Cache hit ratio: 78%

vs. previous run:
  Pass rate: 94% → 94% (no change)
  Cost: $4.24 → $4.23 (-1¢)
  P99: 35s → 38s (regression)
Track previous runs and compare: even a flat pass rate can hide drift in cost and latency.
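
A sketch of the per-category summary over the Result list; cost, latency percentiles, and the baseline diff are left to you, and the join through case_id assumes case IDs are unique:
def report(cases: list[Case], results: list[Result]) -> None:
    cat_of = {c.id: c.category for c in cases}
    by_cat: dict[str, list[Result]] = {}
    for r in results:
        by_cat.setdefault(cat_of[r.case_id], []).append(r)

    total, passed = len(results), sum(r.passed for r in results)
    print(f"Cases: {passed}/{total} passed ({100 * passed // total}%)\n")
    print("Categories:")
    for cat, rs in sorted(by_cat.items()):
        ok = sum(r.passed for r in rs)
        print(f"  {cat:<20} {ok}/{len(rs)} {'✓' if ok == len(rs) else '✗'}")
    print("\nFailed cases:")
    for r in results:
        if not r.passed:
            print(f"  {cat_of[r.case_id]}/{r.case_id} - " + "; ".join(r.score["notes"]))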

Test isolation

Use a fresh task_id per case (suffix with the run ID and case ID); otherwise memory bleeds between cases. For fully clean isolation, run against a fresh Engine instance with an empty brain: a make eval-clean target that wipes the brain volume and brings the Engine back up gives you reproducible runs.

Concurrency

Run cases in parallel up to a budget. Provider rate limits set the ceiling; 5–10 concurrent cases usually works, and pushing higher means you’ll start hitting LLM_RATE_LIMITED.
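
A sketch of a retry wrapper, assuming LLM_RATE_LIMITED surfaces as an HTTP 429 (adjust the check to however your Engine build reports it):
async def run_case_with_retry(case: Case, run_id: str, retries: int = 3) -> Result:
    for attempt in range(retries + 1):
        try:
            return await run_case(case, run_id)
        except httpx.HTTPStatusError as e:
            if e.response.status_code != 429 or attempt == retries:
                raise
            await asyncio.sleep(2 ** attempt)  # 1s, 2s, 4s backoff
This leans on the raise_for_status() call in run_case; without it, a rate-limited response would be parsed as an empty stream instead of raising.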

Storing results

Treat eval runs as data:
evals/
  cases/                    # the suite, version controlled
    2026-04-001.json
    2026-04-002.json
    ...
  runs/                     # results, gitignored or stored elsewhere
    2026-04-27-a1b2c3d4/
      results.json
      summary.txt
      cost.json
Compare across runs to detect drift. A run that passed 95% last week and 90% this week is news, even if the absolute number is fine.
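
Persisting a run takes a few lines. A sketch that follows the layout above (summary.txt and cost.json would be written the same way):
from dataclasses import asdict

def save_run(run_id: str, results: list[Result]) -> Path:
    # One directory per run, named date + run ID, matching the tree above.
    run_dir = Path("evals/runs") / f"{time.strftime('%Y-%m-%d')}-{run_id}"
    run_dir.mkdir(parents=True, exist_ok=True)
    (run_dir / "results.json").write_text(
        json.dumps([asdict(r) for r in results], indent=2)
    )
    return run_dir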

CI integration

Quick suite (~20 cases) on every PR; full suite (~100 cases) on a schedule (nightly) and before any release.
# .github/workflows/eval.yml
on:
  pull_request:
    paths:
      - 'agents/**'
      - 'catalog/**'
      - 'src/**'
jobs:
  quick-eval:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: |
          docker compose up -d engine
          python -m evals.run_quick
          docker compose down --volumes
Fail the PR if the pass rate drops below the previous run’s.
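
The gate itself can live at the end of evals.run_quick. A sketch, where baseline.json and its pass_rate field are conventions of this harness, not anything the Engine provides:
import sys

def gate(results: list[Result], baseline_path: Path) -> None:
    rate = sum(r.passed for r in results) / len(results)
    baseline = json.loads(baseline_path.read_text())["pass_rate"]
    if rate < baseline:
        # Non-zero exit fails the CI job, which fails the PR.
        sys.exit(f"pass rate {rate:.0%} is below baseline {baseline:.0%}")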

See also