Documentation Index

Fetch the complete documentation index at: https://s2.dev/docs/llms.txt

Use this file to discover all available pages before exploring further.

S2 integrates with the Anthropic Messages API through the @s2-dev/resumable-stream package. The integration persists Anthropic stream events to S2 and replays them as Anthropic-style SSE.

Prerequisites

npm install @s2-dev/resumable-stream @anthropic-ai/sdk
Create an S2 access token and basin first:
  • Sign up for S2, generate an access token, and set it as S2_ACCESS_TOKEN.
  • Create a basin with Create Stream on Append and Create Stream on Read enabled, and set it as S2_BASIN.
  • Set ANTHROPIC_API_KEY for the server route that calls Anthropic.
The helper does not create streams explicitly. With Create Stream on Append and Create Stream on Read enabled, the basin creates streams automatically on first use.
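For local development, the three variables can be exported in the shell before starting the server. The values below are placeholders, not real credential formats:

```shell
# Placeholder values: substitute your real credentials.
export S2_ACCESS_TOKEN="s2-example-token"   # from the S2 dashboard
export S2_BASIN="my-chat-basin"             # basin with create-on-append/read enabled
export ANTHROPIC_API_KEY="sk-ant-example"   # used only by the server POST route
```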

Import paths

The integration has separate server and client entrypoints.
// Server routes only
import { createResumableChat } from '@s2-dev/resumable-stream/anthropic';

// Browser/client code only
import { createChatClient } from '@s2-dev/resumable-stream/anthropic/client';
The server entrypoint uses S2 credentials. The client entrypoint is browser-safe and only talks to your HTTP routes.

Setup

Create one chat helper and share it across routes:
lib/s2.ts
import { createResumableChat } from '@s2-dev/resumable-stream/anthropic';

export const chat = createResumableChat({
  accessToken: process.env.S2_ACCESS_TOKEN!,
  basin: process.env.S2_BASIN!,
  mode: 'session',
});
In the route snippets below, chat is this S2 helper. anthropic is the Anthropic SDK client created in the POST route. Use one stable S2 stream name per chat:
lib/stream-name.ts
const CHAT_ID_PATTERN = /^[a-zA-Z0-9_-]{1,64}$/;

export function isValidChatId(value: unknown): value is string {
  return typeof value === 'string' && CHAT_ID_PATTERN.test(value);
}

export function streamName(chatId: string): string {
  return `anthropic-chat-${chatId}`;
}
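As a quick sanity check, the helpers above behave like this (restated inline so the snippet stands alone):

```typescript
// Same helpers as lib/stream-name.ts, redefined so this snippet is self-contained.
const CHAT_ID_PATTERN = /^[a-zA-Z0-9_-]{1,64}$/;

function isValidChatId(value: unknown): value is string {
  return typeof value === 'string' && CHAT_ID_PATTERN.test(value);
}

function streamName(chatId: string): string {
  return `anthropic-chat-${chatId}`;
}

console.log(isValidChatId('room-42'));    // true: letters, digits, hyphens
console.log(isValidChatId('has spaces')); // false: spaces are not allowed
console.log(streamName('room-42'));       // "anthropic-chat-room-42"
```

Because the chat id is embedded in the S2 stream name, validating it on every route keeps untrusted input from shaping stream names.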

Server: POST route

makeResumable persists each yielded event. In session mode, yield a user_message event before the Anthropic stream when S2 should store both sides of the chat.
app/api/chat/route.ts
import Anthropic from '@anthropic-ai/sdk';
import type { MessageParam } from '@anthropic-ai/sdk/resources/messages';
import {
  type HistorySnapshot,
  type HistoryTurn,
} from '@s2-dev/resumable-stream/anthropic';
import { chat } from '@/lib/s2';
import { isValidChatId, streamName } from '@/lib/stream-name';

const anthropic = new Anthropic({
  apiKey: process.env.ANTHROPIC_API_KEY!,
});

function turnsForModel(turns: HistoryTurn[]): MessageParam[] {
  const messages: MessageParam[] = [];
  for (const { user, assistant } of turns) {
    if (!assistant) continue;
    messages.push(
      { role: 'user', content: user },
      { role: 'assistant', content: assistant.content },
    );
  }
  return messages;
}

export async function POST(req: Request) {
  const body = await req.json();
  if (
    !isValidChatId(body.id) ||
    typeof body.message !== 'string' ||
    body.message.length === 0
  ) {
    return new Response('Missing or invalid id/message', { status: 400 });
  }

  const historyResponse = await chat.history(streamName(body.id));
  const history = (await historyResponse.json()) as HistorySnapshot;
  const messages: MessageParam[] = [
    ...turnsForModel(history.turns),
    { role: 'user', content: body.message },
  ];

  const source = (async function* () {
    yield { type: 'user_message', message: body.message } as const;
    yield* anthropic.messages.stream({
      model: process.env.ANTHROPIC_MODEL ?? 'claude-haiku-4-5-20251001',
      max_tokens: 1024,
      messages,
    });
  })();

  return chat.makeResumable(streamName(body.id), source, {
    delivery: 'replay',
  });
}
delivery: 'replay' makes the POST return 202. Clients read chunks from the replay route, so every tab can consume the same S2 records. If your runtime can terminate work after a response returns, pass its background-task hook as waitUntil.

Server: replay

The replay route reads from S2 and returns Anthropic-style SSE frames.
app/api/chat/stream/route.ts
import { chat } from '@/lib/s2';
import { isValidChatId, streamName } from '@/lib/stream-name';

function parseFromSeqNum(value: string | null): number | undefined {
  if (value === null) return undefined;
  const parsed = Number.parseInt(value, 10);
  return Number.isSafeInteger(parsed) && parsed >= 0 ? parsed : undefined;
}

export async function GET(req: Request) {
  const url = new URL(req.url);
  const id = url.searchParams.get('id');
  if (!isValidChatId(id)) {
    return new Response('Missing or invalid id query parameter', { status: 400 });
  }

  return chat.replay(streamName(id), {
    fromSeqNum: parseFromSeqNum(url.searchParams.get('from')),
    live: url.searchParams.get('live') === '1',
  });
}
Use live: true in session mode when a tab should stay subscribed after the current assistant turn ends.
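The cursor parsing in the replay route rejects anything that is not a non-negative safe integer:

```typescript
// Same guard as parseFromSeqNum in the replay route, restated inline.
function parseFromSeqNum(value: string | null): number | undefined {
  if (value === null) return undefined;
  const parsed = Number.parseInt(value, 10);
  return Number.isSafeInteger(parsed) && parsed >= 0 ? parsed : undefined;
}

console.log(parseFromSeqNum('12'));  // 12
console.log(parseFromSeqNum('-1'));  // undefined: negative cursors are rejected
console.log(parseFromSeqNum('abc')); // undefined: not a number
console.log(parseFromSeqNum(null));  // undefined: query parameter absent
```

Returning undefined rather than throwing lets replay fall back to its default starting position.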

Server: history

The history route returns a JSON snapshot derived from S2.
app/api/chat/history/route.ts
import { chat } from '@/lib/s2';
import { isValidChatId, streamName } from '@/lib/stream-name';

export async function GET(req: Request) {
  const url = new URL(req.url);
  const id = url.searchParams.get('id');
  if (!isValidChatId(id)) {
    return new Response('Missing or invalid id query parameter', { status: 400 });
  }

  return chat.history(streamName(id));
}
The response shape is:
{
  turns: Array<{
    user: string;
    assistant?: Message;
    error?: { type: string; message: string };
  }>;
  messages: Message[];
  nextSeqNum: number;
}
Render turns for ordered chat history, then replay from nextSeqNum to read anything still in progress.
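A client can turn that snapshot into a replay request. This small sketch assumes the query-parameter scheme used by the routes above; the helper name itself is illustrative:

```typescript
// Sketch: build the replay URL from a history snapshot's nextSeqNum.
// The route shape (/api/chat/stream?id=...&from=...&live=1) matches the
// replay route defined earlier.
interface HistoryCursor {
  nextSeqNum: number;
}

function replayUrlFor(chatId: string, history: HistoryCursor, live = true): string {
  const params = new URLSearchParams({
    id: chatId,
    from: String(history.nextSeqNum),
  });
  if (live) params.set('live', '1');
  return `/api/chat/stream?${params}`;
}

console.log(replayUrlFor('room-42', { nextSeqNum: 7 }));
// "/api/chat/stream?id=room-42&from=7&live=1"
```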

Client

Pass the send, history, and replay endpoints to the Anthropic client helper.
app/chat-client.ts
import { createChatClient } from '@s2-dev/resumable-stream/anthropic/client';

export function createAnthropicClient(chatId: string) {
  const id = encodeURIComponent(chatId);
  return createChatClient({
    sendUrl: '/api/chat',
    historyUrl: `/api/chat/history?id=${id}`,
    subscribeUrl: (cursor) =>
      `/api/chat/stream?id=${id}&from=${cursor ?? 0}&live=1`,
  });
}
For a live session listener, pass stopOnTerminal: false so message_stop ends the current assistant turn without closing the subscription.
const client = createAnthropicClient(chatId);

const history = await client.loadHistory();
renderTurns(history.turns);

const subscription = await client.subscribe({ stopOnTerminal: false });

for await (const event of subscription.events) {
  renderEvent(event);
}
While a live subscription is open, start new turns with a normal POST; the subscription receives the stored S2 records as they are appended.
await fetch('/api/chat', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({ id: chatId, message }),
});
If you are not keeping a live subscription open, client.send(...) can start a turn and return a per-send subscription.
const sent = await client.send({ id: chatId, message });
for await (const event of sent.events) {
  renderEvent(event);
}

User messages

The Anthropic helper does not infer user messages. Store them by yielding a user_message event before the Anthropic stream if S2 should be the chat log.
yield { type: 'user_message', message } as const;
yield* anthropic.messages.stream(...);
Omit user_message only when another store owns user messages, or when you do not want user text stored in S2. In that setup, the app must read user messages from the other store when rendering history and building the next Anthropic messages array.

Flow

  1. The browser loads chat.history(...) and sets its cursor to nextSeqNum.
  2. The browser opens chat.replay(..., { live: true }).
  3. The browser sends POST /api/chat.
  4. The server yields user_message, then Anthropic stream events.
  5. makeResumable appends those records to S2.
  6. Replay delivers the same records to subscribed tabs.
  7. Reconnects continue from the last SSE id.

Modes

mode controls how generations map to S2 streams.
  • single-use: use when each assistant turn has its own stream name. Refresh replays only the active turn.
  • shared: use when one stream name should hold the active generation only. Refresh replays the active generation after lease/fence coordination.
  • session: use when you are building a full chat session. Refresh replays history and tails later turns from one durable log.
delivery controls how the request that starts generation returns chunks.
  • response: use when you want the simplest one-tab flow. The POST response streams the assistant output directly.
  • replay: use when you want refresh recovery, session history, or multi-tab sync. The POST returns 202; clients read from replay.
For chat sessions, the usual setup is mode: 'session', delivery: 'replay', and replay(..., { live: true }).
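Pulling those recommended settings together, the session setup touches three places. This is a fragment restated from the sections above, not a complete route:

```typescript
// 1. Helper (lib/s2.ts): one durable session log per chat.
const chat = createResumableChat({
  accessToken: process.env.S2_ACCESS_TOKEN!,
  basin: process.env.S2_BASIN!,
  mode: 'session',
});

// 2. POST route: persist events and return 202.
return chat.makeResumable(streamName(id), source, { delivery: 'replay' });

// 3. Replay route: keep tabs subscribed across turns.
return chat.replay(streamName(id), { live: true });
```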

Configuration

Relevant options on createResumableChat:
  • mode (default "single-use"): "single-use" uses one stream per generation; "shared" reuses one active-generation stream; "session" stores a durable chat session log.
  • endpoints (default: S2 defaults): optional endpoint overrides, for example when using S2 Lite.
  • leaseDurationMs (default 5000): shared mode only. The maximum pause within an active generation before a new claim can take it over.
  • onError (default: generic message): maps upstream errors to the stored error event shown to clients.
  • batchSize / lingerDuration (defaults 10 / 50 ms): S2 append batching knobs.
Relevant options on makeResumable:
  • delivery (default "response"): "response" streams on the POST response; "replay" returns 202 and expects clients to read from replay.
  • waitUntil (default: none): keeps persistence running after the response returns in runtimes that support background tasks.
Relevant options on replay:
  • fromSeqNum: S2 cursor to resume from. Use history.nextSeqNum after loading history.
  • live: session mode only. Keeps the SSE open at the tail for future turns.
Relevant options on createChatClient:
  • sendUrl: POST endpoint that starts generation.
  • subscribeUrl: GET endpoint that returns replay SSE. String URLs get ?from= added automatically; function URLs receive the cursor.
  • historyUrl: optional URL returning { turns, messages, nextSeqNum }.
  • stopUrl: optional URL called by stop() with DELETE.
  • headers / credentials: auth and fetch options for send, stop, history, and subscribe requests.
  • reconnectBackoffMs: millisecond backoff schedule for reconnects. Pass [] to disable reconnect.
  • fetch: custom fetch implementation for tests or framework integrations.
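To illustrate how a schedule like reconnectBackoffMs is typically consumed, here is a sketch. The clamp-to-last-entry behavior is an assumption for illustration, not a documented guarantee of the package:

```typescript
// Hypothetical consumer of a reconnect backoff schedule.
// Assumptions: attempts beyond the schedule reuse its last entry,
// and an empty schedule means "do not reconnect".
function backoffFor(attempt: number, scheduleMs: number[]): number | undefined {
  if (scheduleMs.length === 0) return undefined; // [] disables reconnect
  return scheduleMs[Math.min(attempt, scheduleMs.length - 1)];
}

console.log(backoffFor(0, [250, 500, 1000])); // 250
console.log(backoffFor(9, [250, 500, 1000])); // 1000 (clamped to last entry)
console.log(backoffFor(0, []));               // undefined
```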
Relevant options on send and subscribe:
  • signal (default: none): abort signal for the request or subscription.
  • stopOnTerminal (default true): set false for live session streams so message_stop does not close the listener.

Example

A runnable Bun server + vanilla-JS client with session replay and live sync is available in the SDK repo: examples/anthropic-resumable-chat.