This page walks through your first Engine call from Python. We use httpx
because it has good streaming support and a nice async API, but anything
that speaks HTTP and can read SSE will work.
Install
Install httpx from PyPI:
pip install httpx
A minimal client
Save this as engine_quickstart.py:
import json
import os

import httpx

ENGINE_URL = os.environ.get("ENGINE_URL", "http://localhost:8000")
ENGINE_KEY = os.environ["ENGINE_KEY"]


def execute(message: str, task_id: str) -> None:
    """Stream a response from the Engine and print text deltas."""
    headers = {
        "X-Engine-Key": ENGINE_KEY,
        "Content-Type": "application/json",
    }
    payload = {"message": message, "task_id": task_id}
    with httpx.Client(timeout=None) as client:
        with client.stream("POST", f"{ENGINE_URL}/execute",
                           json=payload, headers=headers) as resp:
            resp.raise_for_status()
            for line in resp.iter_lines():
                if not line or not line.startswith("data:"):
                    continue
                data = json.loads(line[len("data:"):].strip())
                if "text" in data:
                    print(data["text"], end="", flush=True)
    print()  # final newline


if __name__ == "__main__":
    execute("Say hello in one short sentence.", task_id="demo-001")
Set your environment and run:
export ENGINE_URL=http://localhost:8000
export ENGINE_KEY=dev-engine-key
python engine_quickstart.py
You should see the agent’s reply stream into your terminal, one chunk at a
time.
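To check the parsing logic without a running Engine, the loop body can be pulled out into a function that works on any iterable of lines. The following is a minimal sketch: `collect_text` is a hypothetical helper, and the sample lines are invented, assuming only the `{"text": ...}` payload shape used by the client above.

```python
import json
from typing import Iterable


def collect_text(lines: Iterable[str]) -> str:
    """Join the "text" fields of all data: lines, mirroring the loop in execute()."""
    chunks = []
    for line in lines:
        # Skip blank separators and anything that is not a data: line.
        if not line or not line.startswith("data:"):
            continue
        data = json.loads(line[len("data:"):].strip())
        if "text" in data:
            chunks.append(data["text"])
    return "".join(chunks)


# Invented sample stream (assumption: payloads are JSON objects).
sample = [
    "event: text_delta",
    'data: {"text": "Hello"}',
    "",
    'data: {"text": ", world."}',
    'data: {"other": 1}',
]
print(collect_text(sample))  # prints: Hello, world.
```

In a real client, `resp.iter_lines()` would take the place of `sample`.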
Async version
If your application is async, here’s the same client using httpx.AsyncClient:
import asyncio
import json
import os

import httpx

ENGINE_URL = os.environ.get("ENGINE_URL", "http://localhost:8000")
ENGINE_KEY = os.environ["ENGINE_KEY"]


async def execute(message: str, task_id: str) -> None:
    headers = {
        "X-Engine-Key": ENGINE_KEY,
        "Content-Type": "application/json",
    }
    payload = {"message": message, "task_id": task_id}
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("POST", f"{ENGINE_URL}/execute",
                                 json=payload, headers=headers) as resp:
            resp.raise_for_status()
            async for line in resp.aiter_lines():
                if not line or not line.startswith("data:"):
                    continue
                data = json.loads(line[len("data:"):].strip())
                if "text" in data:
                    print(data["text"], end="", flush=True)
    print()


if __name__ == "__main__":
    asyncio.run(execute("Say hello in one short sentence.", task_id="demo-001"))
Handling other event types
The examples above print only text deltas and ignore everything else. The
Engine emits many more event types: tool calls, thinking blocks, HITL
requests, and lifecycle events. A more complete handler routes on the event type:
def handle_event(event_type: str, data: dict) -> None:
    if event_type == "text_delta":
        print(data.get("text", ""), end="", flush=True)
    elif event_type == "tool_call":
        print(f"\n[tool call: {data['tool']}]", flush=True)
    elif event_type == "tool_result":
        print(f"\n[tool result: {data.get('output', '')[:60]}...]", flush=True)
    elif event_type == "hitl_request":
        print(f"\n[engine asks: {data['question']}]", flush=True)
    elif event_type == "thread_lifecycle":
        print(f"\n[{data['phase']}]", flush=True)
To get the event type from the SSE stream you need to track event: lines
as well as data: lines:
event_type = None
for line in resp.iter_lines():
    if line.startswith("event:"):
        event_type = line[len("event:"):].strip()
    elif line.startswith("data:"):
        data = json.loads(line[len("data:"):].strip())
        handle_event(event_type, data)
        event_type = None
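The loop above dispatches on every data: line. In the general SSE format, an event ends at a blank line, may carry several data: lines, and lines starting with a colon are comments. The sketch below follows those rules more closely. `iter_sse_events` is a hypothetical helper, the sample payloads (including the `"done"` phase) are invented, and it is an assumption that the Engine ever sends multi-line data or comment lines.

```python
import json
from typing import Iterable, Iterator, List, Optional, Tuple


def iter_sse_events(lines: Iterable[str]) -> Iterator[Tuple[Optional[str], dict]]:
    """Yield (event_type, data) pairs from already-decoded SSE lines."""
    event_type: Optional[str] = None
    data_parts: List[str] = []
    for line in lines:
        if line == "":
            # A blank line ends the current event; emit it if it carried data.
            if data_parts:
                yield event_type, json.loads("\n".join(data_parts))
            event_type, data_parts = None, []
        elif line.startswith(":"):
            continue  # SSE comment line, sometimes used as a keep-alive
        elif line.startswith("event:"):
            event_type = line[len("event:"):].strip()
        elif line.startswith("data:"):
            # Strip surrounding whitespace; multiple data: lines are joined.
            data_parts.append(line[len("data:"):].strip())
    if data_parts:
        # The stream ended without a trailing blank line.
        yield event_type, json.loads("\n".join(data_parts))


# Invented sample stream for illustration only.
sample = [
    "event: text_delta",
    'data: {"text": "Hi"}',
    "",
    ": keep-alive",
    "event: thread_lifecycle",
    'data: {"phase": "done"}',
    "",
]
for event_type, data in iter_sse_events(sample):
    print(event_type, data)
```

With a live response, each pair from `iter_sse_events(resp.iter_lines())` could be passed straight to `handle_event`.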
Continuing a task
Reusing the same task_id continues the same conversation thread. The
Engine pulls the relevant memory and history automatically:
execute("Repeat what you just said in French.", task_id="demo-001")
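For multi-turn use, one way to keep the task_id consistent is to generate it once and pass it to every call. This is only a sketch: `new_task_id` and `run_turns` are hypothetical helpers, the identifier format is an assumption (this page only shows the literal "demo-001"), and `record` stands in for `execute` so the example runs without an Engine.

```python
import uuid
from typing import Callable, List, Tuple


def new_task_id(prefix: str = "demo") -> str:
    """Return a fresh identifier: the prefix plus 8 random hex characters."""
    return f"{prefix}-{uuid.uuid4().hex[:8]}"


def run_turns(send: Callable[[str, str], None], task_id: str,
              messages: List[str]) -> None:
    """Send each message in order, always with the same task_id."""
    for message in messages:
        send(message, task_id)


# Stand-in for execute(): records calls instead of contacting the Engine.
calls: List[Tuple[str, str]] = []


def record(message: str, task_id: str) -> None:
    calls.append((message, task_id))


task_id = new_task_id()
run_turns(record, task_id, [
    "Say hello in one short sentence.",
    "Repeat what you just said in French.",
])
print(calls)
```

Passing the synchronous `execute` from the minimal client as `send` would run both turns against the same thread.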
Where to go next