Skip to main content
The AI Agents and Prompts surface ships as part of the v4.5 release candidate. Install with @trigger.dev/sdk@rc (or pin 4.5.0-rc.0 or later) to use these features — they aren’t yet on the latest stable, and APIs may still change before the 4.5.0 GA. See supported AI SDK versions and the AI chat changelog for details.
A custom agent is a task you register with chat.customAgent() and drive yourself — either with the managed turn iterator from chat.createSession(), or with a fully hand-rolled loop over the raw chat primitives. You give up chat.agent()’s lifecycle hooks and automatic continuation recovery; you gain inline control over every turn, and (at the lowest level) full control over the stream conversion. See the comparison table before dropping down. The frontend is unchanged either way: all levels speak the same wire protocol, so useTriggerChatTransport points at a custom agent exactly like a chat.agent().

chat.customAgent()

chat.customAgent() is a thin wrapper around task() that does two things: it registers the task as an agent (so it appears in the agent dashboard, the playground, and the MCP server’s list_agents), and it binds the run to its backing Session so the chat.* primitives resolve to the right .in/.out channels. There is no managed lifecycle — no turn loop, no hooks, no preload handling. A plain task() works with the same primitives but stays invisible to the agent surfaces, so prefer customAgent unless you specifically don’t want the task listed as an agent. Inside the wrapper, pick one of two loop styles:
  • Managed loopchat.createSession() yields turns; the SDK handles stop signals, accumulation, idle suspend/resume, and turn-complete signaling. You write the turn body.
  • Hand-rolled loop — you write the loop itself with chat.messages, MessageAccumulator, pipeAndCapture, and writeTurnComplete. The right choice when you need complete control over .toUIMessageStream() (e.g. onFinish, originalMessages) beyond what chat.setUIMessageStreamOptions() provides, or you’re implementing a custom protocol.

Managed loop: chat.createSession()

chat.createSession() gives you an async iterator of ChatTurn objects. Each turn arrives with the accumulated history, a combined stop+cancel signal, and helpers to finish the turn:
trigger/my-chat.ts
import { chat, type ChatTaskWirePayload } from "@trigger.dev/sdk/ai";
import { streamText, stepCountIs } from "ai";
import { anthropic } from "@ai-sdk/anthropic";

export const myChat = chat.customAgent({
  id: "my-chat",
  run: async (payload: ChatTaskWirePayload, { signal }) => {
    // One-time initialization — plain code, no hooks. Upsert, not create:
    // continuation runs boot with the row already in place.
    const clientData = payload.metadata as { userId: string };
    await db.chat.upsert({
      where: { id: payload.chatId },
      create: { id: payload.chatId, userId: clientData.userId },
      update: {},
    });

    const session = chat.createSession(payload, {
      signal,
      idleTimeoutInSeconds: 60,
      timeout: "1h",
    });

    for await (const turn of session) {
      // Persist the incoming user message BEFORE streaming — this is your
      // onTurnStart equivalent. Without it, a page reload mid-stream
      // restores the assistant text (replayed from the session) but loses
      // the user message that prompted it.
      await db.chat.update({
        where: { id: turn.chatId },
        data: { messages: turn.uiMessages },
      });

      const result = streamText({
        model: anthropic("claude-sonnet-4-5"),
        messages: turn.messages,
        abortSignal: turn.signal,
        stopWhen: stepCountIs(15),
      });

      // Pipe, capture, accumulate, and signal turn-complete — all in one call
      await turn.complete(result);

      // Persist the full exchange after the turn — your onTurnComplete equivalent
      await db.chat.update({
        where: { id: turn.chatId },
        data: { messages: turn.uiMessages },
      });
    }
  },
});
If you pass compaction or pendingMessages to chat.createSession(), you must also pass prepareStep: turn.prepareStep() to streamText (or spread chat.toStreamTextOptions(), which wires it automatically). Without it, both features silently no-op.

ChatSessionOptions

OptionTypeDefaultDescription
signalAbortSignalrequiredRun-level cancel signal (from task context)
idleTimeoutInSecondsnumber30Seconds to stay idle between turns before suspending
timeoutstring"1h"Duration string for suspend timeout
maxTurnsnumber100Max turns before ending
compactionChatAgentCompactionOptionsundefinedAutomatic context compaction — same options as on chat.agent()
pendingMessagesPendingMessagesOptionsundefinedMid-execution message injection — same options as on chat.agent()
Between turns the run idles on waitWithIdleTimeout: after idleTimeoutInSeconds with no message it suspends (compute is freed), and the next message restores it on the same run — the same warm/suspended pipeline chat.agent() uses.

ChatTurn

Each turn yielded by the iterator provides:
FieldTypeDescription
numbernumberTurn number (0-indexed)
chatIdstringChat session ID
triggerstringWhat triggered this turn
clientDataunknownClient data from the transport
messagesModelMessage[]Full accumulated model messages — pass to streamText
uiMessagesUIMessage[]Full accumulated UI messages — use for persistence
signalAbortSignalCombined stop+cancel signal (fresh each turn)
stoppedbooleanWhether the user stopped generation this turn
continuationbooleanWhether this is a continuation run
previousTurnUsageLanguageModelUsage | undefinedToken usage from the previous turn (undefined on turn 0)
totalUsageLanguageModelUsageCumulative token usage across all completed turns
MethodDescription
turn.complete(source)Pipe stream, capture response, accumulate, and signal turn-complete
turn.done()Signal turn-complete only (when you have piped manually)
turn.addResponse(response)Add a response to the accumulator manually
turn.setMessages(uiMessages)Replace the accumulated messages — continuation seeding and on-demand compaction
turn.prepareStep()prepareStep callback wiring compaction + injection — pass to streamText when not spreading chat.toStreamTextOptions()

Continuation runs and history seeding

chat.agent() rebuilds conversation history automatically when a chat continues on a fresh run (after a cancel, crash, version upgrade, or TTL expiry) — via its snapshot/replay boot or your hydrateMessages hook. Custom agents do none of that: a continuation run starts with an empty accumulator, and history restoration is your job. With createSession, check turn.continuation on the first turn and seed from your store with turn.setMessages():
for await (const turn of session) {
  if (turn.continuation && turn.number === 0) {
    const row = await db.chat.findUnique({ where: { id: turn.chatId } });
    const stored = (row?.messages ?? []) as UIMessage[];
    if (stored.length > 0) {
      // Keep any incoming message that isn't already persisted
      const incoming = turn.uiMessages.filter((m) => !stored.some((s) => s.id === m.id));
      await turn.setMessages([...stored, ...incoming]);
    }
  }

  // ... streamText + turn.complete as usual
}
Without this, a resumed chat silently loses its history: the model sees only the message that triggered the continuation. In a hand-rolled loop, seed by passing the stored history into the turn-0 addIncoming call — shown in the example below.

turn.complete() vs manual control

turn.complete(result) is the one-call path — it handles piping, capturing the response, accumulating messages, cleaning up aborted parts on a stop, and writing the turn-complete chunk. For more control, you can do each step manually:
for await (const turn of session) {
  const result = streamText({
    model: anthropic("claude-sonnet-4-5"),
    messages: turn.messages,
    abortSignal: turn.signal,
    stopWhen: stepCountIs(15),
  });

  // Manual: pipe and capture separately
  const response = await chat.pipeAndCapture(result, { signal: turn.signal });

  if (response) {
    // Custom processing before accumulating
    await turn.addResponse(response);
  }

  // Custom persistence, analytics, etc.
  await db.chat.update({ ... });

  // Must call done() when not using complete()
  await turn.done();
}

Hand-rolled loop with primitives

For full control, skip createSession and compose the primitives directly:
PrimitiveDescription
chat.messagesInput stream for incoming messages — use .waitWithIdleTimeout() to wait for the next turn
chat.createStopSignal()Create a managed stop signal wired to the stop input stream
chat.pipeAndCapture(result)Pipe a StreamTextResult to the chat stream and capture the response
chat.writeTurnComplete()Signal the frontend that the current turn is complete
chat.MessageAccumulatorAccumulates conversation messages across turns
chat.pipe(stream)Pipe a stream to the frontend (no response capture)
chat.cleanupAbortedParts(msg)Clean up incomplete parts from a stopped response
A complete loop:
trigger/my-chat-raw.ts
import { chat, type ChatTaskWirePayload } from "@trigger.dev/sdk/ai";
import { streamText, stepCountIs } from "ai";
import { anthropic } from "@ai-sdk/anthropic";

export const myChat = chat.customAgent({
  id: "my-chat-raw",
  run: async (payload: ChatTaskWirePayload, { signal: runSignal }) => {
    let currentPayload = payload;

    // Handle preload — wait for the first real message
    if (currentPayload.trigger === "preload") {
      const result = await chat.messages.waitWithIdleTimeout({
        idleTimeoutInSeconds: 60,
        timeout: "1h",
        spanName: "waiting for first message",
      });
      if (!result.ok) return;
      currentPayload = result.output;
    }

    const stop = chat.createStopSignal();
    const conversation = new chat.MessageAccumulator();

    // Continuation runs (cancel, crash, upgrade) start with an empty
    // accumulator — fetch stored history so turn 0 can seed it.
    let continuationSeed: UIMessage[] = [];
    if (currentPayload.continuation) {
      const row = await db.chat.findUnique({ where: { id: currentPayload.chatId } });
      continuationSeed = (row?.messages ?? []) as UIMessage[];
    }

    for (let turn = 0; turn < 100; turn++) {
      stop.reset();

      // The wire payload carries at most one new message per turn. Turn 0
      // REPLACES the accumulator, so seed stored history through
      // addIncoming together with the incoming message — a setMessages
      // call before the loop would be wiped here.
      const incoming = currentPayload.message ? [currentPayload.message] : [];
      const turnInput =
        turn === 0 && continuationSeed.length > 0
          ? [...continuationSeed.filter((s) => !incoming.some((m) => m.id === s.id)), ...incoming]
          : incoming;
      const messages = await conversation.addIncoming(turnInput, currentPayload.trigger, turn);

      // Persist the incoming user message before streaming so a
      // mid-stream reload doesn't lose it.
      await db.chat.update({
        where: { id: currentPayload.chatId },
        data: { messages: conversation.uiMessages },
      });

      const combinedSignal = AbortSignal.any([runSignal, stop.signal]);

      const result = streamText({
        model: anthropic("claude-sonnet-4-5"),
        messages,
        abortSignal: combinedSignal,
        stopWhen: stepCountIs(15),
      });

      let response;
      try {
        response = await chat.pipeAndCapture(result, { signal: combinedSignal });
      } catch (error) {
        if (error instanceof Error && error.name === "AbortError") {
          if (runSignal.aborted) break;
          // Stop — fall through to accumulate partial
        } else {
          throw error;
        }
      }

      if (response) {
        const cleaned =
          stop.signal.aborted && !runSignal.aborted ? chat.cleanupAbortedParts(response) : response;
        await conversation.addResponse(cleaned);
      }

      if (runSignal.aborted) break;

      // Persist, analytics, etc.
      await db.chat.update({
        where: { id: currentPayload.chatId },
        data: { messages: conversation.uiMessages },
      });

      await chat.writeTurnComplete();

      // Wait for the next message
      const next = await chat.messages.waitWithIdleTimeout({
        idleTimeoutInSeconds: 60,
        timeout: "1h",
        spanName: "waiting for next message",
      });
      if (!next.ok) break;
      currentPayload = next.output;
    }

    stop.cleanup();
  },
});

MessageAccumulator

addIncoming(messages, trigger, turn) has two modes:
  • Turn 0 or trigger === "regenerate-message": replaces the accumulator with exactly what you pass. This is why continuation seeding goes through addIncoming (above), and why a regenerate needs you to slice your own history — the wire omits the message on regenerate, so pass the stored history minus the last assistant message.
  • Every other turn: appends what you pass (the wire carries at most the one new user message).
const conversation = new chat.MessageAccumulator();

// Returns full accumulated ModelMessage[] for streamText
const messages = await conversation.addIncoming(
  payload.message ? [payload.message] : [],
  payload.trigger,
  turn
);

// After piping, add the response
const response = await chat.pipeAndCapture(result);
if (response) await conversation.addResponse(response);

// Access accumulated messages for persistence
conversation.uiMessages; // UIMessage[]
conversation.modelMessages; // ModelMessage[]
The constructor also accepts compaction and pendingMessages options (same shapes as on chat.agent()); pass prepareStep: conversation.prepareStep() to streamText to activate them. See pending messages for the manual steering wiring.

Hand-rolled loop checklist

Things the managed levels do for you that a raw loop has to get right:
  • Don’t bare-await result.totalUsage. On a stop-abort the AI SDK’s totalUsage promise never settles, which wedges the loop forever. Race it with a timeout:
    const turnUsage = await Promise.race([
      result.totalUsage,
      new Promise((resolve) => setTimeout(() => resolve(undefined), 2000)),
    ]);
    
  • Persist the user message before streaming (shown in the example above). The session replay restores the assistant’s streamed text after a page reload, but nothing restores a user message you haven’t written down.
  • Seed history on continuation runs through the turn-0 addIncoming (shown above). payload.continuation is true when this run picked up an existing chat; the accumulator starts empty — and because turn 0 replaces the accumulator, a setMessages call before the loop gets wiped.
  • Clean up aborted parts on a stop with chat.cleanupAbortedParts() before accumulating, or the partial response carries half-open tool calls into the next turn’s prompt.
  • Read payload.message (singular). The wire payload carries at most one new message per turn; there is no messages array on the payload.

Next steps

Backend overview

The three abstraction levels compared, and everything chat.agent() adds on top.

Sessions

The durable stream pair every agent — managed or custom — is built on.

Compaction

Automatic context compression — works with createSession and MessageAccumulator.

Client protocol

The wire format your loop is speaking, chunk by chunk.