You can’t improve what you don’t measure, and you can’t measure agents
without a runner. The Engine doesn’t ship its own eval harness — most
teams write a small one. This page covers what a useful harness looks
like.
What a harness does
1. read a list of eval cases
2. for each case (in parallel up to a concurrency budget):
   a. spawn a fresh task_id
   b. POST /execute with the case's input
   c. consume the SSE stream, capturing every event
   d. score the result against the case's expected outcome
3. aggregate results
4. compare against a baseline
5. emit a report
A working harness in Python is ~150 lines.
Skeleton
import asyncio
import json
import time
import uuid
from dataclasses import dataclass
from pathlib import Path

import httpx

ENGINE_URL = "http://localhost:8000"
ENGINE_KEY = "..."
CONCURRENCY = 5

@dataclass
class Case:
    id: str
    category: str
    input: dict
    expected: dict

@dataclass
class Result:
    case_id: str
    passed: bool
    score: dict
    text: str
    tool_calls: list
    usage: dict
    elapsed_ms: int

async def run_case(case: Case, run_id: str) -> Result:
    # One fresh task_id per case so memory never bleeds between cases.
    task_id = f"eval-{run_id}-{case.id}"
    start = time.time()
    text_parts = []
    tool_calls = []
    usage = {"input_tokens": 0, "output_tokens": 0, "cache_hit_tokens": 0}
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "POST",
            f"{ENGINE_URL}/execute",
            headers={"X-Engine-Key": ENGINE_KEY},
            json={**case.input, "task_id": task_id},
        ) as resp:
            resp.raise_for_status()  # surface 4xx/5xx (rate limits included) as exceptions
            # Minimal SSE parse: remember the event type, then decode its data line.
            event_type = None
            async for line in resp.aiter_lines():
                if line.startswith("event:"):
                    event_type = line[6:].strip()
                elif line.startswith("data:"):
                    data = json.loads(line[5:].strip())
                    if event_type == "text_delta":
                        text_parts.append(data.get("text", ""))
                    elif event_type == "tool_call":
                        tool_calls.append(data["tool"])
                    elif event_type == "usage":
                        for k in usage:
                            usage[k] += data.get(k, 0)
    text = "".join(text_parts)
    score = score_against(case.expected, text, tool_calls, usage)
    return Result(
        case_id=case.id,
        passed=score["passed"],
        score=score,
        text=text,
        tool_calls=tool_calls,
        usage=usage,
        elapsed_ms=int((time.time() - start) * 1000),
    )

async def run_suite(cases: list[Case]) -> list[Result]:
    run_id = str(uuid.uuid4())[:8]
    sem = asyncio.Semaphore(CONCURRENCY)

    async def bounded(case):
        async with sem:
            return await run_case(case, run_id)

    return await asyncio.gather(*(bounded(c) for c in cases))

def score_against(expected, text, tool_calls, usage):
    passed = True
    notes = []
    for tool in expected.get("must_call_tools", []):
        if tool not in tool_calls:
            passed = False
            notes.append(f"missing required tool call: {tool}")
    for tool in expected.get("must_not_call_tools", []):
        if tool in tool_calls:
            passed = False
            notes.append(f"forbidden tool call: {tool}")
    for needle in expected.get("output_must_contain", []):
        if needle not in text:
            passed = False
            notes.append(f"output missing: {needle!r}")
    if (max_tokens := expected.get("max_tokens")) is not None:
        if usage["output_tokens"] > max_tokens:
            passed = False
            notes.append(f"output_tokens={usage['output_tokens']} > {max_tokens}")
    return {"passed": passed, "notes": notes}
async def run_suite(cases: list[Case]) -> list[Result]:
run_id = str(uuid.uuid4())[:8]
sem = asyncio.Semaphore(CONCURRENCY)
async def bounded(case):
async with sem:
return await run_case(case, run_id)
return await asyncio.gather(*(bounded(c) for c in cases))
def score_against(expected, text, tool_calls, usage):
passed = True
notes = []
for tool in expected.get("must_call_tools", []):
if tool not in tool_calls:
passed = False
notes.append(f"missing required tool call: {tool}")
for tool in expected.get("must_not_call_tools", []):
if tool in tool_calls:
passed = False
notes.append(f"forbidden tool call: {tool}")
for needle in expected.get("output_must_contain", []):
if needle not in text:
passed = False
notes.append(f"output missing: {needle!r}")
if (max_tokens := expected.get("max_tokens")) is not None:
if usage["output_tokens"] > max_tokens:
passed = False
notes.append(f"output_tokens={usage['output_tokens']} > {max_tokens}")
return {"passed": passed, "notes": notes}
That’s the whole runner. Add LLM-as-judge scoring as a separate function
that gets called after the deterministic checks.
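A minimal judge sketch. complete stands in for whatever single-shot completion helper you already have (a provider SDK call or another Engine task), and the judge_rubric and judge_min_grade case fields are hypothetical names, not part of the schema above.

JUDGE_PROMPT = """Rate the answer against the rubric.
Rubric: {rubric}
Answer: {answer}
Reply with a single integer from 1 to 5."""

def judge_score(expected: dict, text: str, complete) -> dict:
    rubric = expected.get("judge_rubric")
    if rubric is None:
        return {"passed": True, "notes": []}  # case has no judge check
    reply = complete(JUDGE_PROMPT.format(rubric=rubric, answer=text))
    grade = int(reply.strip())
    passed = grade >= expected.get("judge_min_grade", 4)
    notes = [] if passed else [f"judge grade {grade} below threshold"]
    return {"passed": passed, "notes": notes}

Only invoke it when the deterministic checks pass; judge calls cost real tokens, and a case that already failed doesn't need a second opinion.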
Loading cases
Cases live in JSON files (one per case) or a single JSONL file. Both
work. JSON files are easier to diff in PRs; JSONL is easier to manage
at scale.
def load_cases(path: Path) -> list[Case]:
    cases = []
    for f in sorted(path.glob("*.json")):
        d = json.loads(f.read_text())
        cases.append(Case(**d))
    return cases
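For reference, a case that exercises every check in score_against might look like this. The field names match the Case dataclass; the shape of input is whatever your POST /execute body expects, and delete_file is an invented tool name for illustration.

{
  "id": "2026-04-001",
  "category": "research",
  "input": {"prompt": "Summarize the current rate limit policy."},
  "expected": {
    "must_call_tools": ["web_search"],
    "must_not_call_tools": ["delete_file"],
    "output_must_contain": ["rate_limit"],
    "max_tokens": 8000
  }
}

If you go the JSONL route instead, the loader shrinks to a sketch like:

def load_cases_jsonl(path: Path) -> list[Case]:
    with path.open() as f:
        return [Case(**json.loads(line)) for line in f if line.strip()]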
Reporting
Pretty output is what makes evals usable:
============================================================
Eval run: 2026-04-27-a1b2c3d4
Cases: 47/50 passed (94%)
============================================================

Categories:
  refactoring           15/15  ✓
  research               9/10  ✗
  classification        12/12  ✓
  multi-step-reasoning  11/13  ✗

Failed cases:
  research/eval-2026-04-001 - missing required tool call: web_search
  multi-step-reasoning/eval-2026-04-014 - output_tokens=12000 > 8000
  multi-step-reasoning/eval-2026-04-019 - output missing: 'rate_limit'

Cost: $4.23
Duration: 4m 12s
P99 latency: 38s
Cache hit ratio: 78%

vs. previous run:
  Pass rate: 94% → 94% (no change)
  Cost: $4.23 → $4.21 (-1¢)
  P99: 38s → 41s (regression)
Track previous runs for comparison: even a flat pass rate can hide drift in cost and latency.
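A sketch of the aggregation behind that report. summarize is a hypothetical helper covering the pass-rate, category, and failure lines; cost and latency math depends on your provider's pricing, so it is left out.

from collections import defaultdict

def summarize(results: list[Result], cases: list[Case]) -> None:
    cat_of = {c.id: c.category for c in cases}
    by_cat = defaultdict(lambda: [0, 0])  # category -> [passed, total]
    for r in results:
        by_cat[cat_of[r.case_id]][0] += r.passed
        by_cat[cat_of[r.case_id]][1] += 1
    passed = sum(r.passed for r in results)
    print(f"Cases: {passed}/{len(results)} passed ({100 * passed // len(results)}%)")
    print("Categories:")
    for cat, (p, t) in sorted(by_cat.items()):
        print(f"  {cat:<22}{p}/{t}  {'✓' if p == t else '✗'}")
    print("Failed cases:")
    for r in results:
        if not r.passed:
            print(f"  {cat_of[r.case_id]}/{r.case_id} - {'; '.join(r.score['notes'])}")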
Test isolation
Use a fresh task_id per case (suffix with the run ID and case ID).
Otherwise memory bleeds between cases.
For really clean isolation, run against a fresh Engine instance with an
empty brain. A make eval-clean that wipes the brain volume and brings
up the Engine gives you reproducible runs.
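A sketch of that target, assuming a Docker Compose setup where the brain lives in a named volume (down --volumes already drops it, as the CI workflow below relies on); the module name is illustrative.

# Makefile
eval-clean:
	docker compose down --volumes    # wipes the brain volume
	docker compose up -d engine
	python -m evals.run_full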
Concurrency
Run cases in parallel up to a budget. Provider rate limits set the
ceiling; 5–10 concurrent cases usually work. Push higher and you'll
start hitting LLM_RATE_LIMITED.
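If you do trip the limit, a bounded retry around run_case keeps the run alive. This sketch assumes LLM_RATE_LIMITED surfaces as an HTTP 429, which the raise_for_status call in the skeleton turns into an exception; check how your deployment actually reports it.

async def run_case_with_retry(case: Case, run_id: str, attempts: int = 3) -> Result:
    for attempt in range(attempts):
        try:
            return await run_case(case, run_id)
        except httpx.HTTPStatusError as e:
            if e.response.status_code != 429 or attempt == attempts - 1:
                raise
            await asyncio.sleep(2 ** attempt)  # simple exponential backoff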
Storing results
Treat eval runs as data:
evals/
  cases/                    # the suite, version controlled
    2026-04-001.json
    2026-04-002.json
    ...
  runs/                     # results, gitignored or stored elsewhere
    2026-04-27-a1b2c3d4/
      results.json
      summary.txt
      cost.json
Compare across runs to detect drift. A run that passed 95% last week
and 90% this week is news, even if the absolute number is fine.
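The write side is a few lines, reusing the Result dataclass. save_run is a hypothetical helper; summary.txt and cost.json are left to your reporter.

import dataclasses

def save_run(results: list[Result], run_id: str, root: Path = Path("evals/runs")) -> None:
    run_dir = root / f"{time.strftime('%Y-%m-%d')}-{run_id}"
    run_dir.mkdir(parents=True, exist_ok=True)
    payload = [dataclasses.asdict(r) for r in results]
    (run_dir / "results.json").write_text(json.dumps(payload, indent=2))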
CI integration
Run the quick suite (~20 cases) on every PR, and the full suite
(~100 cases) on a nightly schedule and before any release.
# .github/workflows/eval.yml
on:
  pull_request:
    paths:
      - 'agents/**'
      - 'catalog/**'
      - 'src/**'

jobs:
  quick-eval:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: |
          docker compose up -d engine
          python -m evals.run_quick
          docker compose down --volumes
Fail the PR if the pass rate drops below the previous run's.
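One way to enforce that, as a sketch: have the eval entrypoint load the last stored results.json and exit non-zero on regression. check_against_baseline is a hypothetical name, and where the baseline lives (artifact store, cache, a results branch) is up to you.

import sys

def check_against_baseline(results: list[Result], baseline_path: Path) -> None:
    rate = sum(r.passed for r in results) / len(results)
    previous = json.loads(baseline_path.read_text())
    prev_rate = sum(r["passed"] for r in previous) / len(previous)
    if rate < prev_rate:
        print(f"pass rate regressed: {prev_rate:.0%} -> {rate:.0%}")
        sys.exit(1)  # non-zero exit fails the CI job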