Read

At the simplest level, you can read a batch of records with a single call:
const batch = await stream.read({
	start: { from: { seqNum: 0 } },
	stop: { limits: { count: 100 } },
});

for (const record of batch.records) {
	console.log(`[${record.seqNum}] ${record.body}`);
}
Unary reads return at most one batch: up to 1000 records or 1 MiB of data. For larger reads, or to follow the stream in real time, use a read session. Every record in a stream is addressable by its sequence number, its timestamp, or its offset from the current tail, for both unary and streaming reads. Streaming reads that do not specify a stop condition keep following the stream, delivering new records in real time.
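When a long-lived session is not an option, a larger range can also be covered by issuing unary reads in a loop and advancing the start position past each returned batch. A minimal sketch, assuming the read call shape from the example above (the UnaryReadable interface and readAll helper are illustrative, not part of the SDK):

```typescript
// Hypothetical minimal shape of the read API, inferred from the example above.
interface ReadArgs {
	start: { from: { seqNum: number } };
	stop: { limits: { count: number } };
}
interface Batch {
	records: { seqNum: number; body: string }[];
}
interface UnaryReadable {
	read(args: ReadArgs): Promise<Batch>;
}

// Drain a range with repeated unary reads, advancing past each returned batch.
async function readAll(stream: UnaryReadable): Promise<string[]> {
	const bodies: string[] = [];
	let from = 0;
	for (;;) {
		const batch = await stream.read({
			start: { from: { seqNum: from } },
			stop: { limits: { count: 1000 } },
		});
		if (batch.records.length === 0) break; // caught up to the current tail
		for (const record of batch.records) bodies.push(record.body);
		from = batch.records[batch.records.length - 1].seqNum + 1;
	}
	return bodies;
}
```

An empty batch signals that the loop has caught up to the tail; a real consumer might instead sleep and retry, or switch to a session at that point.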
Starting from a sequence number
const session = await stream.readSession({
	start: { from: { seqNum: 0 } },
});

for await (const record of session) {
	console.log(`[${record.seqNum}] ${record.body}`);
}
Starting from a tail offset
Read the last N records in the stream, then keep following as new records arrive:
// Start reading from 10 records before the current tail
const session = await stream.readSession({
	start: { from: { tailOffset: 10 } },
});

for await (const record of session) {
	console.log(`[${record.seqNum}] ${record.body}`);
}
Starting from a timestamp
Read records starting from a point in time:
// Start reading from a specific timestamp
const oneHourAgo = new Date(Date.now() - 3600 * 1000);
const session = await stream.readSession({
	start: { from: { timestamp: oneHourAgo } },
});

for await (const record of session) {
	console.log(`[${record.seqNum}] ${record.body}`);
}
Reading until a timestamp
Read records up to a point in time, then stop:
// Read records until a specific timestamp
const oneHourAgo = new Date(Date.now() - 3600 * 1000);
const session = await stream.readSession({
	start: { from: { seqNum: 0 } },
	stop: { untilTimestamp: oneHourAgo },
});

for await (const record of session) {
	console.log(`[${record.seqNum}] ${record.body}`);
}

Read Session

A read session streams records from a starting position. It handles reconnection on transient failures and provides a simple iterator interface.
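Because a session is exposed as an async iterable, the consumer can also stop early on its own terms: breaking out of a for await loop closes the underlying iterator. A sketch of that pattern, with a mock generator standing in for a real session (mockSession and firstN are illustrative, not part of the SDK):

```typescript
// Mock session standing in for a real read session: an async iterator of records.
async function* mockSession(): AsyncGenerator<{ seqNum: number; body: string }> {
	for (let seqNum = 0; ; seqNum++) {
		yield { seqNum, body: `record-${seqNum}` };
	}
}

// Collect the first n sequence numbers, then stop consuming.
async function firstN(n: number): Promise<number[]> {
	const seqNums: number[] = [];
	for await (const record of mockSession()) {
		seqNums.push(record.seqNum);
		if (seqNums.length >= n) break; // break closes the iterator
	}
	return seqNums;
}
```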

Following live updates

By default, a read session without stop conditions will follow the stream indefinitely, waiting for new records as they arrive. When you provide a stop condition (count, bytes, or until), the read stops when either the condition is met or it reaches the current tail—whichever comes first. See follow live updates in the API docs for the full semantics.
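The "whichever comes first" rule can be modeled in a few lines. This sketch is purely illustrative of the stop semantics for a count condition, not SDK code:

```typescript
// Illustrative model of the stop rule for a count condition: a read delivers
// records from startSeqNum until either the count is exhausted or the current
// tail is reached, whichever comes first.
function stopAt(tailSeqNum: number, startSeqNum: number, count: number): number {
	// Exclusive end of the delivered range.
	return Math.min(startSeqNum + count, tailSeqNum);
}
```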

Long polling

The waitSecs parameter controls how long to wait for new records once the read has caught up to the tail. It applies to both unary reads (long polling) and read sessions.
// Read all available records, then wait up to an additional 30 seconds at the tail for new ones
const session = await stream.readSession({
	start: { from: { seqNum: 0 } },
	stop: { waitSecs: 30 },
});

for await (const record of session) {
	console.log(`[${record.seqNum}] ${record.body}`);
}

Check Tail

To find the current end of the stream without reading any records:
const { tail } = await stream.checkTail();
console.log(`Next record will be assigned seqNum ${tail.seqNum}`);
The tail position tells you the sequence number that will be assigned to the next record appended to the stream.
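One common use is comparing the tail against a consumer's own position to estimate backlog. A minimal sketch (the unreadCount helper is illustrative, assuming the consumer tracks the next seqNum it has yet to process):

```typescript
// Hypothetical helper: given the tail seqNum (the seqNum the next appended
// record will receive) and the next seqNum a consumer has yet to process,
// compute how many records remain unread.
function unreadCount(tailSeqNum: number, nextSeqNum: number): number {
	return Math.max(0, tailSeqNum - nextSeqNum);
}
```

A consumer whose next unprocessed seqNum is 90, against a tail of 100, has 10 records to catch up on.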