

The Engine speaks plain HTTP and Server-Sent Events. There’s no magic in the wire format. You can integrate from any language with an HTTP client and a JSON parser. This page covers what’s available and what’s on the roadmap.

What we ship

There are no official Engine client libraries today. This is deliberate: the API is small enough that a wrapper would be more code than the calls themselves. The curl, Python, and Node quickstarts each show a complete client in around 30 lines.

Using the Engine from your language of choice

Whatever HTTP client you’d reach for is what the Engine wants:
Language             What works
Python               httpx (sync or async), aiohttp, requests (with stream=True)
Node / TypeScript    Built-in fetch (Node 18+), axios, undici
Go                   net/http standard library
Rust                 reqwest, hyper
Ruby                 net/http, httparty
Java / Kotlin        OkHttp, built-in HttpClient (java.net.http)
.NET                 HttpClient
Shell                curl
The same three things matter regardless of language:
  • Set X-Engine-Key on every request except /health.
  • Disable response buffering for streaming (so you see SSE events as they arrive).
  • Set the read timeout high (or None) — turns can take minutes.
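The three requirements above are transport settings; the only code you actually write is the SSE decoder. A minimal stdlib sketch, assuming each event arrives as a JSON object in `data:` lines (fields other than `data:` are ignored here):

```python
import json
from collections.abc import Iterable, Iterator

def parse_sse(lines: Iterable[str]) -> Iterator[dict]:
    """Decode a Server-Sent Events stream: `data:` lines accumulate
    until a blank line terminates the event."""
    data: list[str] = []
    for line in lines:
        if line.startswith("data:"):
            data.append(line[len("data:"):].strip())
        elif not line.strip() and data:
            yield json.loads("\n".join(data))
            data = []
```

With httpx, you would feed this `resp.iter_lines()` from a `client.stream("POST", ...)` call, with `headers={"X-Engine-Key": key}` on the client and `timeout=httpx.Timeout(10.0, read=None)` so the read timeout is unbounded while the connect timeout stays sane.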

Wrapping the API: when it’s worth it

A thin client wrapper makes sense when:
  • You’ll have many call sites in your codebase and want shared retry logic.
  • You want strongly-typed event handling (e.g. TypeScript discriminated unions for SSE event types).
  • You need to mock the Engine in tests.
A wrapper is overkill when:
  • You have one or two call sites.
  • The API is moving and the wrapper would lag.
Most teams write a 100-line wrapper around their three or four most common patterns and let everything else go through plain HTTP.

What a useful wrapper has

from typing import Any
from collections.abc import Iterator

Event = dict[str, Any]  # a decoded SSE event payload

class EngineClient:
    def __init__(self, url: str, key: str):
        self.url = url
        self.headers = {"X-Engine-Key": key}

    def execute(self, message: str, task_id: str) -> Iterator[Event]:
        """Yield events as the agent runs."""
        ...

    def replay(self, task_id: str, after: int = 0) -> Iterator[Event]:
        """Replay buffered events from a given timestamp."""
        ...

    def hitl_respond(self, task_id: str, answer: str) -> dict:
        """Answer a pending HITL request."""
        ...

    def health(self) -> dict:
        """Liveness check."""
        ...

    def memory_search(self, store: str, query: str, limit: int = 20) -> list:
        """Search a memory store."""
        ...

    def feedback(self, task_id: str, feedback_type: str, **kwargs) -> dict:
        """Submit feedback."""
        ...
That’s the surface area worth wrapping. Everything else is infrastructure (auth refresh, asset connections) that doesn’t need a nice abstraction.

Resilient streaming wrapper

A wrapper that handles reconnection cleanly is worth the lines:
from httpx import ReadTimeout  # or your HTTP client's equivalent timeout error

class ResilientStream:
    def __init__(self, client: EngineClient, message: str, task_id: str):
        self.client = client
        self.message = message
        self.task_id = task_id
        self.last_ts = 0  # timestamp of the last event we delivered

    def __iter__(self):
        try:
            for event in self.client.execute(self.message, self.task_id):
                self.last_ts = event["timestamp"]
                yield event
        except (ConnectionError, ReadTimeout):
            # The connection dropped mid-turn: pick up from the last
            # timestamp we saw via the replay buffer.
            yield from self._replay()

    def _replay(self):
        for event in self.client.replay(self.task_id, after=self.last_ts):
            self.last_ts = event["timestamp"]
            yield event
See Durability for the underlying semantics.
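The reconnection pattern is easy to verify against a fake client. This self-contained sketch restates the pattern as a generator function and assumes events are dicts with a numeric "timestamp" and that replay returns events strictly newer than `after`:

```python
class FlakyEngine:
    """Fake client: drops the live stream after two events,
    then serves the full history via replay."""

    def __init__(self, events: list[dict]):
        self.events = events

    def execute(self, message: str, task_id: str):
        yield from self.events[:2]
        raise ConnectionError("stream dropped")

    def replay(self, task_id: str, after: int = 0):
        yield from (e for e in self.events if e["timestamp"] > after)

def resilient(client, message: str, task_id: str):
    """Same shape as ResilientStream above, as a generator function."""
    last_ts = 0
    try:
        for event in client.execute(message, task_id):
            last_ts = event["timestamp"]
            yield event
    except ConnectionError:
        yield from client.replay(task_id, after=last_ts)

events = [{"timestamp": t} for t in (1, 2, 3, 4)]
seen = [e["timestamp"] for e in resilient(FlakyEngine(events), "hi", "t-1")]
assert seen == [1, 2, 3, 4]  # no gaps, no duplicates across the reconnect
```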

Roadmap

Official client libraries are planned for:
  1. Python — engine-client (or similar), pip-installable.
  2. TypeScript — npm package, isomorphic (Node + browser).
When they ship, they’ll cover:
  • Type-safe event handling.
  • Built-in retry / reconnection.
  • Async iterators / generators for streams.
  • Mock support for tests.
Until then, copy the quickstart code and tune to your needs.

See also