This page walks through your first Engine call from Python. We use httpx because it has good streaming support and a nice async API, but any HTTP client that can read a Server-Sent Events (SSE) stream will work.

Install

pip install httpx

A minimal client

Save this as engine_quickstart.py:
import json
import os
import httpx

ENGINE_URL = os.environ.get("ENGINE_URL", "http://localhost:8000")
ENGINE_KEY = os.environ["ENGINE_KEY"]

def execute(message: str, task_id: str) -> None:
    """Stream a response from the Engine and print text deltas."""
    headers = {
        "X-Engine-Key": ENGINE_KEY,
        "Content-Type": "application/json",
    }
    payload = {"message": message, "task_id": task_id}

    with httpx.Client(timeout=None) as client:
        with client.stream("POST", f"{ENGINE_URL}/execute",
                           json=payload, headers=headers) as resp:
            resp.raise_for_status()
            for line in resp.iter_lines():
                if not line or not line.startswith("data:"):
                    continue
                data = json.loads(line[len("data:"):].strip())
                if "text" in data:
                    print(data["text"], end="", flush=True)
            print()  # final newline

if __name__ == "__main__":
    execute("Say hello in one short sentence.", task_id="demo-001")
Set your environment and run:
export ENGINE_URL=http://localhost:8000
export ENGINE_KEY=dev-engine-key

python engine_quickstart.py
You should see the agent’s reply stream into your terminal, one chunk at a time.
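
If you need the reply as a string rather than printed output, the line parsing can be separated from the HTTP call. The following is a minimal sketch, assuming the same `data:` line format and `text` field used by the client above; `collect_text` is a hypothetical helper name, not part of the Engine API.

```python
import json
from typing import Iterable


def collect_text(lines: Iterable[str]) -> str:
    """Join the "text" field of every data: line into one string.

    Hypothetical helper: assumes each data: line holds one JSON object,
    as in the quickstart client.
    """
    chunks = []
    for line in lines:
        if not line.startswith("data:"):
            continue  # skip blank lines and any non-data fields
        payload = json.loads(line[len("data:"):].strip())
        if isinstance(payload, dict) and "text" in payload:
            chunks.append(payload["text"])
    return "".join(chunks)


# Hand-written lines standing in for resp.iter_lines()
sample = ['data: {"text": "Hello"}', "", 'data: {"text": ", world"}']
print(collect_text(sample))
```

Inside the client, passing `resp.iter_lines()` to this function would replace the printing loop, at the cost of losing the live streaming effect in the terminal.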

Async version

If your application is async, here’s the same client using httpx.AsyncClient:
import asyncio
import json
import os
import httpx

ENGINE_URL = os.environ.get("ENGINE_URL", "http://localhost:8000")
ENGINE_KEY = os.environ["ENGINE_KEY"]

async def execute(message: str, task_id: str) -> None:
    headers = {
        "X-Engine-Key": ENGINE_KEY,
        "Content-Type": "application/json",
    }
    payload = {"message": message, "task_id": task_id}

    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("POST", f"{ENGINE_URL}/execute",
                                 json=payload, headers=headers) as resp:
            resp.raise_for_status()
            async for line in resp.aiter_lines():
                if not line or not line.startswith("data:"):
                    continue
                data = json.loads(line[len("data:"):].strip())
                if "text" in data:
                    print(data["text"], end="", flush=True)
            print()

if __name__ == "__main__":
    asyncio.run(execute("Say hello in one short sentence.", task_id="demo-001"))

Handling other event types

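The examples above print any payload that carries a text field, which is how text_delta events arrive, and ignore everything else. The Engine emits many more event types: tool calls, thinking blocks, human-in-the-loop (HITL) requests, and lifecycle events. A more complete handler would route on event type: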
def handle_event(event_type: str, data: dict) -> None:
    if event_type == "text_delta":
        print(data.get("text", ""), end="", flush=True)
    elif event_type == "tool_call":
        print(f"\n[tool call: {data['tool']}]", flush=True)
    elif event_type == "tool_result":
        print(f"\n[tool result: {str(data.get('output', ''))[:60]}...]", flush=True)
    elif event_type == "hitl_request":
        print(f"\n[engine asks: {data['question']}]", flush=True)
    elif event_type == "thread_lifecycle":
        print(f"\n[{data['phase']}]", flush=True)
To get the event type from the SSE stream you need to track event: lines as well as data: lines:
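# Runs inside the `with client.stream(...)` block, in place of the earlier for loop.
event_type = None
for line in resp.iter_lines():
    if line.startswith("event:"):
        # Remember the event name until its data: line arrives.
        event_type = line[len("event:"):].strip()
    elif line.startswith("data:"):
        data = json.loads(line[len("data:"):].strip())
        handle_event(event_type, data)
        event_type = None  # reset so the next event starts clean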
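
The loop above treats each data: line as a complete event. The SSE format also allows one event to span several data: lines, uses a blank line to end an event, and uses lines starting with a colon as comments (often keep-alives). Whether the Engine relies on any of these is not stated here, so the following is only a sketch of a more tolerant parser; `iter_sse_events` is a hypothetical helper name, and it assumes every payload is JSON.

```python
import json
from typing import Iterable, Iterator, Optional, Tuple


def iter_sse_events(lines: Iterable[str]) -> Iterator[Tuple[Optional[str], dict]]:
    """Yield (event_type, data) pairs from an iterable of SSE lines.

    Hypothetical helper: assumes each event's data is a JSON document.
    """
    event_type = None
    data_lines = []
    for raw in lines:
        line = raw.rstrip("\r\n")  # tolerate clients that keep line endings
        if line == "":
            # A blank line marks the end of one event.
            if data_lines:
                yield event_type, json.loads("\n".join(data_lines))
            event_type, data_lines = None, []
        elif line.startswith(":"):
            continue  # comment line, e.g. a keep-alive
        elif line.startswith("event:"):
            event_type = line[len("event:"):].strip()
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # The format drops one leading space after the colon.
            data_lines.append(value[1:] if value.startswith(" ") else value)
    if data_lines:
        # Lenient choice: flush a final event that lacks a trailing blank line.
        yield event_type, json.loads("\n".join(data_lines))


# Hand-written lines standing in for resp.iter_lines()
sample = [
    "event: text_delta",
    'data: {"text": "hi"}',
    "",
    ": keep-alive",
    "event: tool_call",
    'data: {"tool":',
    'data: "search"}',
    "",
]
for event_type, data in iter_sse_events(sample):
    print(event_type, data)
```

Each yielded pair can be passed straight to `handle_event(event_type, data)`.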

Continuing a task

Reusing the same task_id continues the same conversation thread. The Engine pulls the relevant memory and history automatically:
execute("Repeat what you just said in French.", task_id="demo-001")
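
For a multi-turn exchange, the only requirement described above is that every call uses the same task_id. A small sketch of that pattern follows; `run_conversation` and `fake_send` are hypothetical names, the sender is injected so the example runs without a server, and the generated `demo-` identifier format is an assumption rather than an Engine requirement.

```python
import uuid
from typing import Callable, Iterable, Optional


def run_conversation(
    turns: Iterable[str],
    send: Callable[[str, str], None],
    task_id: Optional[str] = None,
) -> str:
    """Send each message with one shared task_id and return that id.

    Hypothetical helper: `send` has the same (message, task_id)
    signature as the quickstart's execute().
    """
    if task_id is None:
        # Assumed id format; any value that is unique per thread should do.
        task_id = f"demo-{uuid.uuid4().hex[:8]}"
    for message in turns:
        send(message, task_id)
    return task_id


def fake_send(message: str, task_id: str) -> None:
    """Stand-in for execute() so the sketch runs offline."""
    print(f"[{task_id}] {message}")


run_conversation(
    ["Say hello in one short sentence.", "Repeat what you just said in French."],
    fake_send,
    task_id="demo-001",
)
```

With the synchronous client, passing `execute` in place of `fake_send` would send both turns on the same thread.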

Where to go next