.in) carries incoming user messages to your task. The output stream (.out) carries everything the agent produces back to your clients: AI generation parts (text, reasoning, tool calls) and any custom data parts you write.
Sessions also orchestrate the runs that process those streams. A Session is keyed on your stable id (externalId — for chat, the chatId) and owns its current run: when a run suspends, idles out, or hands off to a new version, the Session starts or swaps to a fresh run and the streams carry on. Clients keep sending and reading against the same id; they never know a run changed underneath.
chat.agent is built on Sessions. You can also use them directly for any pattern that needs durable bi-directional streaming across runs: long-lived agent inboxes, multi-step approval flows, server-to-server pipelines that survive worker restarts.
A minimal example
A task that echoes whatever lands on its input stream, and a backend that starts the session, sends a message, and reads the reply:
trigger/inbox.ts
Your backend
send and the read — the streams are durable, so nothing is lost and the client code doesn’t change.
Sessions and runs
One Session spans many runs over its lifetime. The Session row tracks currentRunId; the runs do the work:
- First run: created atomically by sessions.start (no gap where the session exists but nothing is listening).
- Idle suspend: a run blocked on in.wait suspends and frees compute. A new record on .in wakes it.
- Continuation: when a run ends (idle timeout, chat.endRun, a crash, a version upgrade), the next incoming record triggers a fresh run against the same Session. The new run picks up the streams where the old one left off.
externalId) is what your clients address. See How it works for how chat.agent drives this loop.
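The run lifecycle above can be pictured with a small in-memory model. This is only a sketch: `SessionModel`, `RunState`, and the `run_N` ids are hypothetical names invented for illustration and are not part of the SDK. It assumes a suspended run is resumed in place, while an ended run is replaced by a fresh one on the next incoming record.

```typescript
// Illustrative model only; these names do not come from the SDK.
type RunState = "RUNNING" | "SUSPENDED" | "ENDED";

interface Run {
  id: string;
  state: RunState;
}

class SessionModel {
  readonly externalId: string;
  private counter = 0;
  private current: Run;

  constructor(externalId: string) {
    this.externalId = externalId;
    // The first run is created together with the session.
    this.current = this.newRun();
  }

  private newRun(): Run {
    this.counter += 1;
    return { id: `run_${this.counter}`, state: "RUNNING" };
  }

  get currentRunId(): string {
    return this.current.id;
  }

  // A run blocked waiting for input suspends and frees compute.
  suspend(): void {
    if (this.current.state === "RUNNING") this.current.state = "SUSPENDED";
  }

  // A run ends (idle timeout, crash, version upgrade, ...).
  end(): void {
    this.current.state = "ENDED";
  }

  // A new record arrives on the input stream.
  // Returns the id of the run that will process it.
  deliver(): string {
    if (this.current.state === "ENDED") {
      this.current = this.newRun(); // continuation: fresh run, same session
    } else {
      this.current.state = "RUNNING"; // wake a suspended run, or keep going
    }
    return this.current.id;
  }
}
```

Clients in this model only ever hold `externalId`; the run id changes underneath them without any change to how they address the session.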
When to reach for Sessions directly
chat.agent handles 90% of chat-shaped workloads — message accumulation, the turn loop, stop signals, lifecycle hooks. Use the raw sessions API when you need any of:
- Non-chat conversational state: an agent inbox where each “turn” is a webhook event rather than a UI message.
- Server-to-server bi-directional streaming where an external service produces records the task consumes (and vice-versa) over the same durable channel.
- A custom turn loop where the agent abstraction doesn’t fit but you still want session-survival across runs.
chat.agent or chat.createSession.
sessions namespace
sessions.start(body, requestOptions?)
Atomically create a Session row and trigger its first run. Idempotent on (env, externalId) — two concurrent calls with the same externalId converge to one session.
| Field | Type | Notes |
|---|---|---|
| type | string | Free-form discriminator. chat.agent uses "chat.agent". |
| externalId | string? | Your stable identity. Cannot start with session_ (reserved). |
| taskIdentifier | string | Task this session triggers runs against. |
| triggerConfig | SessionTriggerConfig | Trigger options applied to every run: tags, queue, machine, maxAttempts, idleTimeoutInSeconds, basePayload. |
| tags | string[]? | Up to 10 tags on the Session row (separate from triggerConfig.tags). |
| metadata | Record<string, unknown>? | Arbitrary JSON. |
| expiresAt | Date? | Hard retention deadline. |
CreatedSessionResponseBody:
| Field | Type | Notes |
|---|---|---|
| id | string | Server-assigned session_* friendlyId. |
| runId | string | The first run created alongside the session. |
| publicAccessToken | string | Session-scoped PAT (read:sessions:{id} + write:sessions:{id}). |
| isCached | boolean | true if the session already existed (idempotent upsert). |
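To make the idempotency contract concrete, here is a minimal in-memory sketch of an upsert keyed on (env, externalId). `SessionStoreModel` and the generated ids are hypothetical; the real server behavior may differ in details such as how the reserved-prefix error is reported.

```typescript
// Illustrative model only; not the SDK or server implementation.
interface StartBodyModel {
  type: string;
  externalId: string;
  taskIdentifier: string;
}

interface StartResultModel {
  id: string;
  runId: string;
  isCached: boolean;
}

class SessionStoreModel {
  private byKey = new Map<string, StartResultModel>();
  private counter = 0;

  start(env: string, body: StartBodyModel): StartResultModel {
    if (body.externalId.startsWith("session_")) {
      // Assumption: the reserved prefix is rejected with an error.
      throw new Error("externalId cannot start with session_ (reserved)");
    }
    const key = JSON.stringify([env, body.externalId]);
    const existing = this.byKey.get(key);
    if (existing) {
      // Same (env, externalId): converge on the existing session.
      return { ...existing, isCached: true };
    }
    this.counter += 1;
    const created: StartResultModel = {
      id: `session_${this.counter}`,
      runId: `run_${this.counter}`,
      isCached: false,
    };
    this.byKey.set(key, created);
    return created;
  }
}
```

In this model a second call with the same externalId returns the same id and runId with isCached set to true, so a caller can safely retry a start without creating a duplicate session.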
sessions.retrieve(idOrExternalId, requestOptions?)
Retrieve a Session by either its server-assigned session_* id or your user-supplied externalId. The server disambiguates via the session_ prefix.
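The prefix rule is what makes a single idOrExternalId parameter unambiguous, and it is also why externalId cannot start with session_. A sketch of that check, using a hypothetical `classifySessionIdentifier` helper that is not part of the SDK:

```typescript
// Illustrative helper only; the real disambiguation happens on the server.
type SessionLookup =
  | { kind: "id"; id: string }
  | { kind: "externalId"; externalId: string };

function classifySessionIdentifier(value: string): SessionLookup {
  return value.startsWith("session_")
    ? { kind: "id", id: value }
    : { kind: "externalId", externalId: value };
}
```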
sessions.update(idOrExternalId, body, requestOptions?)
Mutate tags, metadata, or externalId on an existing Session. Pass externalId: null to explicitly clear it.
sessions.close(idOrExternalId, body?, requestOptions?)
Mark a Session as closed. Terminal and idempotent. The optional reason is stored on the row.
sessions.list(options?, requestOptions?)
Cursor-paginated list of Sessions in the current environment. Returns a CursorPagePromise you can iterate with for await.
| Filter | Type | Notes |
|---|---|---|
| type | string \| string[] | e.g. "chat.agent" |
| tag | string \| string[] | Matches triggerConfig.tags |
| taskIdentifier | string \| string[] | Filter by task |
| externalId | string | Exact match |
| status | "ACTIVE" \| "CLOSED" \| "EXPIRED" | Lifecycle state |
| period / from / to | window | Time-range filter |
| limit / after / before | cursor | Pagination (1–100 per page; default 20) |
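Cursor pagination with these limits can be sketched over a plain array. `listPage` and `iterateAll` are hypothetical stand-ins, not SDK functions, and the model makes two assumptions: the cursor is the id of the last row on the previous page, and an out-of-range limit is rejected rather than clamped.

```typescript
// Illustrative model only; not the SDK's pagination implementation.
interface SessionRow {
  id: string;
}

interface PageModel {
  data: SessionRow[];
  after: string | null; // cursor for the next page, null when exhausted
}

function listPage(
  rows: SessionRow[],
  limit: number = 20,
  after: string | null = null
): PageModel {
  if (!Number.isInteger(limit) || limit < 1 || limit > 100) {
    throw new RangeError("limit must be an integer between 1 and 100");
  }
  const start = after === null ? 0 : rows.findIndex((r) => r.id === after) + 1;
  if (after !== null && start === 0) {
    throw new Error(`unknown cursor: ${after}`);
  }
  const data = rows.slice(start, start + limit);
  const last = data[data.length - 1];
  const hasMore = start + limit < rows.length;
  return { data, after: hasMore && last !== undefined ? last.id : null };
}

// Walks every page in order, the way iterating a paginated result would.
function* iterateAll(rows: SessionRow[], limit: number = 20): Generator<SessionRow> {
  let after: string | null = null;
  do {
    const page: PageModel = listPage(rows, limit, after);
    yield* page.data;
    after = page.after;
  } while (after !== null);
}
```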
sessions.open(idOrExternalId)
Open a lightweight SessionHandle to the realtime channels. Does not hit the network — each handle method calls the corresponding endpoint lazily.
SessionHandle
streams.define (out) and streams.input (in), but are session-scoped rather than run-scoped — they survive across run boundaries.
session.out — task → clients
The output channel. The task writes; external clients (browser, server action, another task) read via SSE.
out.append(value, options?)
Append a single record. Routes through writer internally so SSE consumers see the same parsed-object shape on every event.
out.pipe(stream, options?)
Pipe an AsyncIterable or ReadableStream directly to S2 (the durable backing store). Returns { stream, waitUntilComplete }.
out.writer({ execute, ... })
Imperative writer. execute({ write, merge }) runs against an in-memory queue whose records are piped to S2.
out.read(options?)
Subscribe to SSE records on .out. Returns an async-iterable stream with auto-retry and Last-Event-ID resume.
out.writeControl(subtype, extraHeaders?)
Write a Trigger control record. Carries a trigger-control header valued with subtype (e.g. turn-complete, upgrade-required); the body is empty. The SDK transport filters control records out of the consumer-facing chunk stream — readers route them via onControl instead.
Returns { lastEventId } — useful for trim chains.
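The split between control records and consumer-facing chunks can be sketched as a filter over the record stream. Only the trigger-control header name comes from the description above; `StreamRecord`, `routeRecords`, and the record layout are hypothetical and chosen for illustration.

```typescript
// Illustrative model only; the SDK transport does this internally.
interface StreamRecord {
  seqNum: number;
  headers: { [name: string]: string };
  body: string;
}

function routeRecords(
  records: StreamRecord[],
  onControl: (subtype: string, seqNum: number) => void
): string[] {
  const chunks: string[] = [];
  for (const record of records) {
    const subtype = record.headers["trigger-control"];
    if (subtype !== undefined) {
      // Control records have an empty body and never reach the chunk stream.
      onControl(subtype, record.seqNum);
      continue;
    }
    chunks.push(record.body);
  }
  return chunks;
}
```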
out.trimTo(earliestSeqNum)
Append an S2 trim command. Records with seq_num < earliestSeqNum are eventually deleted. Idempotent and monotonic. chat.agent uses this to keep session.out bounded to roughly one turn at steady state.
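The "idempotent and monotonic" property can be illustrated with a toy log. `TrimmableLogModel` is hypothetical, and it deletes trimmed records immediately, whereas the real store deletes them eventually.

```typescript
// Illustrative model only; not how S2 stores or trims records.
class TrimmableLogModel {
  private records: { seqNum: number; body: string }[] = [];
  private nextSeqNum = 0;
  private trimPoint = 0;

  append(body: string): number {
    const seqNum = this.nextSeqNum++;
    this.records.push({ seqNum, body });
    return seqNum;
  }

  trimTo(earliestSeqNum: number): void {
    // Monotonic: the trim point never moves backwards.
    // Idempotent: repeating the same trim changes nothing.
    this.trimPoint = Math.max(this.trimPoint, earliestSeqNum);
    this.records = this.records.filter((r) => r.seqNum >= this.trimPoint);
  }

  seqNums(): number[] {
    return this.records.map((r) => r.seqNum);
  }
}
```

A turn loop built on this model could call trimTo with the sequence number at which the previous turn ended, which keeps the log at roughly one turn of history.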
session.in — clients → task
The input channel. External clients call send; the task consumes via on / once / peek / wait / waitWithIdleTimeout.
in.send(value, requestOptions?)
Append a single record. Called from outside the task (browser, server action, another task).
in.on(handler)
Register a handler that fires for every record landing on .in. Buffered records flush on attach. Returns { off }.
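"Buffered records flush on attach" means records that arrive before a handler is registered are not lost. A sketch of that behavior, using a hypothetical `InboxModel` class that assumes records are buffered only while no handler is attached:

```typescript
// Illustrative model only; not the SDK's input channel.
class InboxModel<T> {
  private buffer: T[] = [];
  private handlers = new Set<(value: T) => void>();

  // Simulates a record landing on the input stream.
  deliver(value: T): void {
    if (this.handlers.size === 0) {
      this.buffer.push(value); // nobody listening yet: hold the record
      return;
    }
    this.handlers.forEach((handler) => handler(value));
  }

  on(handler: (value: T) => void): { off: () => void } {
    this.handlers.add(handler);
    // Flush anything that arrived before the handler attached.
    const pending = this.buffer;
    this.buffer = [];
    pending.forEach((value) => handler(value));
    return {
      off: () => {
        this.handlers.delete(handler);
      },
    };
  }
}
```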
in.once(options?)
Wait for the next record without suspending the run. Resolves to { ok: true, output } when a record arrives, or { ok: false, error } on timeout. Chain .unwrap() to get the data directly.
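The result shape is a discriminated union, so the ok flag narrows which field is available. The sketch below models it with a hypothetical `OnceResultModel` type and a standalone `unwrapResult` function; in the SDK, unwrap is chained on the returned value instead, and the exact error type is an assumption here.

```typescript
// Illustrative model only; the type and function names are not from the SDK.
type OnceResultModel<T> =
  | { ok: true; output: T }
  | { ok: false; error: Error };

// Returns the data on success and throws the error otherwise.
function unwrapResult<T>(result: OnceResultModel<T>): T {
  if (result.ok) return result.output;
  throw result.error;
}

// Handling both branches explicitly instead of unwrapping.
function describeResult(result: OnceResultModel<string>): string {
  return result.ok ? `got: ${result.output}` : `failed: ${result.error.message}`;
}
```

Branching on ok suits a loop that treats a timeout as a normal outcome; unwrapping suits code where a timeout should abort the current step.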
in.peek()
Non-blocking peek at the head of the .in buffer.
in.wait(options?)
Suspend the current run until the next record arrives — frees compute while blocked. Only callable from inside task.run().
in.waitWithIdleTimeout({ idleTimeoutInSeconds, timeout, ... })
Hybrid: stay warm for idleTimeoutInSeconds, then suspend via wait if nothing arrives. chat.agent’s turn loop uses this to balance responsiveness and cost.
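The trade-off can be worked through with simple arithmetic. `modelIdleWait` is a hypothetical helper, not an SDK call: given how long the next record takes to arrive, it reports whether the run was still warm and how many seconds of compute were spent waiting. It ignores the separate timeout option.

```typescript
// Illustrative model only; real scheduling involves more than this.
interface WaitOutcomeModel {
  mode: "warm" | "suspended";
  warmSeconds: number; // compute held while waiting
}

function modelIdleWait(
  arrivalSeconds: number,
  idleTimeoutInSeconds: number
): WaitOutcomeModel {
  if (arrivalSeconds <= idleTimeoutInSeconds) {
    // The record arrived inside the warm window: no suspend, no wake-up cost.
    return { mode: "warm", warmSeconds: arrivalSeconds };
  }
  // The window elapsed first: the run suspended and freed compute.
  return { mode: "suspended", warmSeconds: idleTimeoutInSeconds };
}
```

With a 30-second window, a reply after 5 seconds is handled warm at a cost of 5 seconds of waiting, while a reply after 10 minutes costs only the 30 warm seconds before the run suspends.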
in.lastDispatchedSeqNum()
The highest S2 seq_num this channel has delivered to a consumer. Used by chat.agent to persist a resume cursor on each turn-complete so the next worker boot subscribes past already-processed records.
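The resume-cursor idea can be sketched as a consumer that skips everything at or below a persisted sequence number. `ConsumerModel` and `InRecordModel` are hypothetical names, and using -1 to mean "nothing delivered yet" is an assumption of this model.

```typescript
// Illustrative model only; not the SDK's subscription logic.
interface InRecordModel {
  seqNum: number;
  body: string;
}

class ConsumerModel {
  private last: number;
  readonly seen: string[] = [];

  // resumeAfter is the cursor persisted by a previous worker, if any.
  constructor(resumeAfter: number = -1) {
    this.last = resumeAfter;
  }

  consume(records: InRecordModel[]): void {
    for (const record of records) {
      if (record.seqNum <= this.last) continue; // already processed
      this.seen.push(record.body);
      this.last = record.seqNum;
    }
  }

  lastDispatchedSeqNum(): number {
    return this.last;
  }
}
```

Persisting the cursor after each completed turn means a worker that boots later replays the stream from that point, rather than reprocessing records an earlier run already handled.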
Authorization
Browser and server-side clients use a session-scoped Public Access Token:
/sessions/{externalId}/... and /sessions/session_*/....
For the chat.agent transport, auth.createPublicToken is wrapped by accessToken in useTriggerChatTransport; for direct session access from your server, mint a token per request just like any other realtime resource.
See also
- How it works: how chat.agent builds on Sessions.
- Backend: chat.agent / chat.createSession / raw task() with chat primitives.
- Client Protocol: the wire-level view of .in/append and .out SSE.
- Persistence and replay: how tails are read at boot.

