Stream operations comprise the core data plane of S2, and are centered around the three stream verbs: append, read, and check tail.
The SDKs provide several layers of abstraction depending on your streaming needs.
Methods
SDKs expose stream operations in two forms:
Unary methods make a single HTTP request and return a response. append, read, and checkTail are unary. They’re straightforward but incur per-request overhead.
Streaming methods open a persistent connection for continuous data flow. appendSession sends batches and receives acks over a bidirectional stream. readSession receives records as a server-to-client stream. Sessions enable pipelining and reduce per-record latency.
Under the hood, sessions use the S2S protocol, a minimal binary framing layer over HTTP/2.
In environments without HTTP/2 (notably browsers), the TypeScript SDK falls back to HTTP/1.1. You get the same session APIs, just without protocol-level multiplexing.
Append
Batches are the atomic unit of appending. A batch can contain up to 1000 records or 1 MiB of data. Each append—whether unary or via a session—writes exactly one batch.
For anything beyond simple one-off writes, use an append session. Sessions enable pipelining (multiple batches in flight), maintain strict ordering, and gracefully handle throttling. The Producer API builds on sessions to provide per-record semantics with automatic batching.
For concurrency control, appends support strongly-consistent matchSeqNum (optimistic locking) and fencing tokens. See concurrency control in the API docs.
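For example, a conditional append might look like the following sketch, assuming AppendInput.create accepts matchSeqNum and fencingToken options alongside the records (check your SDK version for the exact field names):
const stream = basin.stream(streamName);
const { tail } = await stream.checkTail();
const ack = await stream.append(
  AppendInput.create(
    [AppendRecord.string({ body: "guarded event" })],
    {
      // Hypothetical options: the append succeeds only if the stream's next
      // sequence number still matches, and only if the fencing token is current.
      matchSeqNum: tail.seqNum,
      fencingToken: "my-token",
    },
  ),
);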
A single batch of records can be appended by calling append on a stream client:
const stream = basin.stream(streamName);
const ack = await stream.append(
AppendInput.create([
AppendRecord.string({ body: "first event" }),
AppendRecord.string({ body: "second event" }),
]),
);
// ack tells us where the records landed
console.log(`Wrote records ${ack.start.seqNum} through ${ack.end.seqNum - 1}`);
ack, _ := stream.Append(ctx, &s2.AppendInput{
Records: []s2.AppendRecord{
{Body: []byte("first event")},
{Body: []byte("second event")},
},
})
// ack tells us where the records landed
fmt.Printf("Wrote records %d through %d\n", ack.Start.SeqNum, ack.End.SeqNum-1)
let stream = basin.stream(stream_name.clone());
let records = AppendRecordBatch::try_from_iter([
AppendRecord::new("first event")?,
AppendRecord::new("second event")?,
])?;
let ack = stream.append(AppendInput::new(records)).await?;
// ack tells us where the records landed
println!(
"Wrote records {} through {}",
ack.start.seq_num,
ack.end.seq_num - 1
);
This works well for simple cases, but each append is a separate HTTP request.
For higher throughput, and to guarantee ordering across batches, use an append session or the Producer API.
Append Session
An append session maintains a bidirectional stream with S2: you send batches of records, and S2 sends back acknowledgements.
This enables pipelining: multiple batches can be in flight simultaneously, dramatically improving throughput compared to waiting for each append to complete before sending the next, while still preserving the ordering of records across batches.
Contrast this with issuing multiple concurrent unary append() requests: that also allows high throughput, but each append is independent, and the order in which concurrent batches become durable is not guaranteed.
A session provides a stateful handle on a stream that is being appended to. Batches can be appended to that session by calling submit().
submit() is asynchronous: it resolves to a BatchSubmitTicket once the batch has been accepted by the session, and it can exert backpressure to prevent the session from being overwhelmed.
A ticket tracks an append while it is pending. The batch is only durable once it has been acknowledged by S2, which can be awaited via the ticket's ack() method; ack() resolves only when the written batch is fully durable on object storage. The resulting AppendAck describes the batch's position in the stream.
const session = await stream.appendSession();
// Submit a batch - this enqueues it and returns a ticket
const ticket = await session.submit(
AppendInput.create([
AppendRecord.string({ body: "event-1" }),
AppendRecord.string({ body: "event-2" }),
]),
);
// The ticket resolves when the batch is durable
const ack = await ticket.ack();
console.log(`Durable at seqNum ${ack.start.seqNum}`);
await session.close();
session, _ := stream.AppendSession(ctx, nil)
defer session.Close()
// Submit a batch - this enqueues it and returns a ticket
fut, _ := session.Submit(&s2.AppendInput{
Records: []s2.AppendRecord{
{Body: []byte("event-1")},
{Body: []byte("event-2")},
},
})
// Wait for enqueue (this is where backpressure happens)
ticket, _ := fut.Wait(ctx)
// Wait for durability
ack, _ := ticket.Ack(ctx)
fmt.Printf("Durable at seqNum %d\n", ack.Start.SeqNum)
let session = stream.append_session(AppendSessionConfig::new());
// Submit a batch - this enqueues it and returns a ticket
let records = AppendRecordBatch::try_from_iter([
AppendRecord::new("event-1")?,
AppendRecord::new("event-2")?,
])?;
let ticket = session.submit(AppendInput::new(records)).await?;
// Wait for durability
let ack = ticket.await?;
println!("Durable at seqNum {}", ack.start.seq_num);
session.close().await?;
The ticket allows the SDK user (that’s you!) to decide whether to await confirmation of a batch before moving on.
For some use cases, “fire-and-forget” may be fine; for others, you need to confirm that either a specific append has finished, or that all writes up to a certain point have been persisted.
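For example, continuing with the session opened above, you can keep several batches in flight and await only the final acknowledgement; because the session preserves ordering across batches, once the last batch is durable, every earlier batch is durable as well:
// Pipeline several batches without waiting in between
const tickets = [];
for (const body of ["event-a", "event-b", "event-c"]) {
  tickets.push(
    await session.submit(
      AppendInput.create([AppendRecord.string({ body })]),
    ),
  );
}
// Await only the last ticket; earlier batches are durable by then
const lastAck = await tickets[tickets.length - 1].ack();
console.log(`Durable through seqNum ${lastAck.end.seqNum - 1}`);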
If an acknowledgement is not received for a batch within the configured requestTimeout, the SDK marks it failed and either retries it (if retries are configured) or surfaces an error.
Backpressure
The session tracks how much data is “in flight” (submitted but not yet acknowledged). When you hit the limits, submit() blocks until capacity frees up.
This is intentional: it prevents unbounded memory growth and naturally throttles your application to match what S2 can handle.
| Option | Default | Description |
|---|---|---|
| maxInflightBytes | 5 MiB | Maximum unacknowledged bytes before submit() blocks |
| maxInflightBatches | None | Maximum unacknowledged batches (optional additional limit) |
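A minimal sketch of tightening these limits when opening a session; this assumes appendSession accepts them as options, and the exact shape may differ by SDK:
// Hypothetical configuration: apply backpressure sooner than the defaults
const session = await stream.appendSession({
  maxInflightBytes: 1024 * 1024, // block submit() beyond 1 MiB unacknowledged
  maxInflightBatches: 10,        // and beyond 10 unacknowledged batches
});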
Batches and throughput
Batches are the atomic unit. While an individual stream can support tens of MiB/second of appends, it is limited to 200 append batches per second; callers who try to submit more will be automatically throttled (unary callers may instead receive a 429 with Retry-After info).
So to achieve the highest throughput, send batches approaching the single-batch limit of 1000 records or 1 MiB of data.
The SDKs provide convenience utilities for auto-batching, which may be used directly with append sessions, or as part of the Producer API.
Producer
The Producer API provides a record-oriented interface over append sessions. Instead of thinking in batches, you submit individual records to a producer and get back a ticket for each one. The producer handles batching transparently based on configurable thresholds.
This is particularly useful when:
- You’re receiving records one at a time (from a message queue, HTTP requests, etc.)
- You want confirmation that specific records are durable
- You want the SDK to handle batch lingering, backpressure, and ordering for you, while still pipelining writes
const producer = new Producer(
new BatchTransform({ lingerDurationMillis: 5 }),
await stream.appendSession(),
);
// Submit individual records
const ticket = await producer.submit(
AppendRecord.string({ body: "my event" }),
);
// Get the exact sequence number for this record
const ack = await ticket.ack();
console.log(`Record durable at seqNum ${ack.seqNum()}`);
await producer.close();
session, _ := stream.AppendSession(ctx, nil)
batcher := s2.NewBatcher(ctx, &s2.BatchingOptions{
Linger: 5 * time.Millisecond,
})
producer := s2.NewProducer(ctx, batcher, session)
// Submit individual records
fut, _ := producer.Submit(s2.AppendRecord{Body: []byte("my event")})
ticket, _ := fut.Wait(ctx)
ack, _ := ticket.Ack(ctx)
fmt.Printf("Record durable at seqNum %d\n", ack.SeqNum())
producer.Close()
let producer = stream.producer(
ProducerConfig::new()
.with_batching(BatchingConfig::new().with_linger(Duration::from_millis(5))),
);
// Submit individual records
let ticket = producer.submit(AppendRecord::new("my event")?).await?;
// Get the exact sequence number
let ack = ticket.await?;
println!("Record durable at seqNum {}", ack.seq_num);
producer.close().await?;
The producer maintains the same ordering guarantee as the underlying append session: records are durable in exactly the order you submitted them. Even though they’re batched, the producer tracks which record is which, so your per-record ticket resolves with the correct sequence number.
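For example, continuing with the producer above, submitting several records yields one ticket each, and the acknowledgements resolve in submission order with increasing sequence numbers:
// Each record gets its own ticket; acks reflect submission order
const tickets = [];
for (const body of ["first", "second", "third"]) {
  tickets.push(await producer.submit(AppendRecord.string({ body })));
}
for (const ticket of tickets) {
  const ack = await ticket.ack();
  console.log(`Record durable at seqNum ${ack.seqNum()}`);
}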
Batching Configuration
| Option | Default | Description |
|---|---|---|
| linger | 5 ms | How long to wait for more records before flushing a partial batch |
| maxBatchRecords | 1000 | Flush when the batch reaches this many records |
| maxBatchBytes | 1 MiB | Flush when the batch reaches this size |
The producer flushes whenever any threshold is hit. For latency-sensitive applications, a shorter linger time means records are written sooner. For throughput-sensitive applications, a longer linger time means more efficient batching.
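For instance, a throughput-oriented configuration might raise the linger time while keeping the size thresholds at their maximums. This is a sketch assuming BatchTransform accepts record-count and byte-count options alongside lingerDurationMillis; option names may differ in your SDK version:
const producer = new Producer(
  new BatchTransform({
    lingerDurationMillis: 50,   // wait longer to fill batches
    maxBatchRecords: 1000,      // hypothetical option: flush at the record limit
    maxBatchBytes: 1024 * 1024, // hypothetical option: flush at 1 MiB
  }),
  await stream.appendSession(),
);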
Read
At the simplest level, you can read a batch of records with a single call:
const batch = await stream.read({
start: { from: { seqNum: 0 } },
stop: { limits: { count: 100 } },
});
for (const record of batch.records) {
console.log(`[${record.seqNum}] ${record.body}`);
}
batch, _ := stream.Read(ctx, &s2.ReadOptions{
SeqNum: s2.Uint64(0),
Count: s2.Uint64(100),
})
for _, record := range batch.Records {
fmt.Printf("[%d] %s\n", record.SeqNum, string(record.Body))
}
let batch = stream
.read(
ReadInput::new()
.with_start(ReadStart::new().with_from(ReadFrom::SeqNum(0)))
.with_stop(ReadStop::new().with_limits(ReadLimits::new().with_count(100))),
)
.await?;
for record in batch.records {
println!("[{}] {:?}", record.seq_num, record.body);
}
Unary reads return at most one batch: up to 1000 records or 1 MiB of data. For larger reads or to follow the stream in real-time, use a read session.
Navigating a stream
Every record in a stream is accessible by its sequence number, timestamp, or offset from the current tail.
This is the case for both unary and streaming reads.
Streaming reads that do not specify a stop condition will continue to follow updates in real-time.
Starting from a sequence number
const session = await stream.readSession({
start: { from: { seqNum: 0 } },
});
for await (const record of session) {
console.log(`[${record.seqNum}] ${record.body}`);
}
readSession, _ := stream.ReadSession(ctx, &s2.ReadOptions{
SeqNum: s2.Uint64(0),
})
defer readSession.Close()
for readSession.Next() {
record := readSession.Record()
fmt.Printf("[%d] %s\n", record.SeqNum, string(record.Body))
}
if err := readSession.Err(); err != nil {
log.Fatal(err)
}
let mut session = stream
.read_session(ReadInput::new().with_start(ReadStart::new().with_from(ReadFrom::SeqNum(0))))
.await?;
while let Some(batch) = session.next().await {
let batch = batch?;
for record in batch.records {
println!("[{}] {:?}", record.seq_num, record.body);
}
}
Starting from a tail offset
Read the last N records in the stream, then follow for new ones:
// Start reading from 10 records before the current tail
const session = await stream.readSession({
start: { from: { tailOffset: 10 } },
});
for await (const record of session) {
console.log(`[${record.seqNum}] ${record.body}`);
}
// Start reading from 10 records before the current tail
tailOffsetSession, _ := stream.ReadSession(ctx, &s2.ReadOptions{
TailOffset: s2.Int64(10),
})
defer tailOffsetSession.Close()
for tailOffsetSession.Next() {
record := tailOffsetSession.Record()
fmt.Printf("[%d] %s\n", record.SeqNum, string(record.Body))
}
// Start reading from 10 records before the current tail
let mut session = stream
.read_session(
ReadInput::new().with_start(ReadStart::new().with_from(ReadFrom::TailOffset(10))),
)
.await?;
while let Some(batch) = session.next().await {
let batch = batch?;
for record in batch.records {
println!("[{}] {:?}", record.seq_num, record.body);
}
}
Starting from a timestamp
Read records starting from a point in time:
// Start reading from a specific timestamp
const oneHourAgo = new Date(Date.now() - 3600 * 1000);
const session = await stream.readSession({
start: { from: { timestamp: oneHourAgo } },
});
for await (const record of session) {
console.log(`[${record.seqNum}] ${record.body}`);
}
// Start reading from a specific timestamp
oneHourAgo := uint64(time.Now().Add(-time.Hour).UnixMilli())
timestampSession, _ := stream.ReadSession(ctx, &s2.ReadOptions{
Timestamp: &oneHourAgo,
})
defer timestampSession.Close()
for timestampSession.Next() {
record := timestampSession.Record()
fmt.Printf("[%d] %s\n", record.SeqNum, string(record.Body))
}
// Start reading from a specific timestamp
let one_hour_ago = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)?
.as_millis() as u64
- 3600 * 1000;
let mut session = stream
.read_session(
ReadInput::new()
.with_start(ReadStart::new().with_from(ReadFrom::Timestamp(one_hour_ago))),
)
.await?;
while let Some(batch) = session.next().await {
let batch = batch?;
for record in batch.records {
println!("[{}] {:?}", record.seq_num, record.body);
}
}
Reading until a timestamp
Read records up to a point in time, then stop:
// Read records until a specific timestamp
const oneHourAgo = new Date(Date.now() - 3600 * 1000);
const session = await stream.readSession({
start: { from: { seqNum: 0 } },
stop: { untilTimestamp: oneHourAgo },
});
for await (const record of session) {
console.log(`[${record.seqNum}] ${record.body}`);
}
// Read records until a specific timestamp
oneHourAgo = uint64(time.Now().Add(-time.Hour).UnixMilli())
untilSession, _ := stream.ReadSession(ctx, &s2.ReadOptions{
SeqNum: s2.Uint64(0),
Until: &oneHourAgo,
})
defer untilSession.Close()
for untilSession.Next() {
record := untilSession.Record()
fmt.Printf("[%d] %s\n", record.SeqNum, string(record.Body))
}
// Read records until a specific timestamp
let one_hour_ago = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)?
.as_millis() as u64
- 3600 * 1000;
let mut session = stream
.read_session(
ReadInput::new()
.with_start(ReadStart::new().with_from(ReadFrom::SeqNum(0)))
.with_stop(ReadStop::new().with_until(..one_hour_ago)),
)
.await?;
while let Some(batch) = session.next().await {
let batch = batch?;
for record in batch.records {
println!("[{}] {:?}", record.seq_num, record.body);
}
}
Read Session
A read session streams records from a starting position. It handles reconnection on transient failures and provides a simple iterator interface.
Following live updates
By default, a read session without stop conditions will follow the stream indefinitely, waiting for new records as they arrive. When you provide a stop condition (count, bytes, or until), the read stops when either the condition is met or it reaches the current tail—whichever comes first.
See follow live updates in the API docs for the full semantics.
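For instance, a bounded read might cap the number of records, stopping at the limit or at the current tail, whichever comes first. This sketch assumes a read session accepts the same limits shape as a unary read:
const session = await stream.readSession({
  start: { from: { seqNum: 0 } },
  stop: { limits: { count: 500 } },
});
for await (const record of session) {
  console.log(`[${record.seqNum}] ${record.body}`);
}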
Long polling
The wait parameter controls how long to wait for new records when caught up to the tail. This works for both unary reads (long polling) and sessions.
// Read all available records, and once reaching the current tail, wait an additional 30 seconds for new ones
const session = await stream.readSession({
start: { from: { seqNum: 0 } },
stop: { waitSecs: 30 },
});
for await (const record of session) {
console.log(`[${record.seqNum}] ${record.body}`);
}
// Read all available records, and once reaching the current tail, wait an additional 30 seconds for new ones
waitSession, _ := stream.ReadSession(ctx, &s2.ReadOptions{
SeqNum: s2.Uint64(0),
Wait: s2.Int32(30),
})
defer waitSession.Close()
for waitSession.Next() {
record := waitSession.Record()
fmt.Printf("[%d] %s\n", record.SeqNum, string(record.Body))
}
// Read all available records, and once reaching the current tail, wait an additional 30 seconds
// for new ones
let mut session = stream
.read_session(
ReadInput::new()
.with_start(ReadStart::new().with_from(ReadFrom::SeqNum(0)))
.with_stop(ReadStop::new().with_wait(30)),
)
.await?;
while let Some(batch) = session.next().await {
let batch = batch?;
for record in batch.records {
println!("[{}] {:?}", record.seq_num, record.body);
}
}
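The same idea applies to a unary read, which then waits rather than returning immediately once it has caught up. A sketch, assuming the unary read accepts waitSecs alongside its other stop options:
// Long-poll at the tail: wait up to 30 seconds for records appended
// after the current end of the stream
const { tail } = await stream.checkTail();
const batch = await stream.read({
  start: { from: { seqNum: tail.seqNum } },
  stop: { waitSecs: 30 },
});
for (const record of batch.records) {
  console.log(`[${record.seqNum}] ${record.body}`);
}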
Check Tail
To find the current end of the stream without reading any records:
const { tail } = await stream.checkTail();
console.log(`Stream has ${tail.seqNum} records`);
tail, _ := stream.CheckTail(ctx)
fmt.Printf("Stream has %d records\n", tail.Tail.SeqNum)
let tail = stream.check_tail().await?;
println!("Stream has {} records", tail.seq_num);
The tail position tells you the sequence number that will be assigned to the next record appended to the stream.
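A common pattern is to check the tail once and then read from that position, so a session only sees records appended after that point:
// Follow only new records: start a read session at the current tail
const { tail } = await stream.checkTail();
const session = await stream.readSession({
  start: { from: { seqNum: tail.seqNum } },
});
for await (const record of session) {
  console.log(`[${record.seqNum}] ${record.body}`);
}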