Reference

Documentation for the S2 components for Bento is available at:

Getting started

In this guide, we read records from several S2 streams, process them with Bento, and merge the results into another S2 stream.

Prerequisites

1. Generate an S2 authentication token

Generate an authentication token by logging in to the web console at s2.dev, then set the S2_AUTH_TOKEN environment variable:

export S2_AUTH_TOKEN="<YOUR-AUTH-TOKEN-HERE>"

2. Install the S2 CLI

Install the S2 CLI and set the authentication token:

s2 config set --auth-token "${S2_AUTH_TOKEN}"

3. Install the Bento CLI

Install the latest version of the Bento CLI:

curl -Lsf https://warpstreamlabs.github.io/bento/sh/install | bash

Ensure that the CLI supports the S2 plugin:

bento list | grep 's2$'

Setup

1. Create a new basin

Basin names are globally unique. They must be 8 to 48 characters long, consist only of lowercase letters, numbers, and hyphens, and cannot begin or end with a hyphen.
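
To catch a bad name before calling create-basin, the rules above can be encoded as a local check. This is a minimal sketch: `is_valid_basin_name` is a hypothetical helper, not part of the S2 CLI, and the service itself remains the final authority on what it accepts.

```shell
# Hypothetical helper (not part of the S2 CLI): succeeds if $1 follows the
# basin naming rules described above.
is_valid_basin_name() {
  # Spell out the allowed characters so the check does not depend on locale.
  local chars='0123456789abcdefghijklmnopqrstuvwxyz'
  # First and last characters: lowercase letter or digit. Middle: those plus
  # hyphens, 6 to 46 of them, for a total length of 8 to 48.
  local re="^[${chars}][${chars}-]{6,46}[${chars}]\$"
  [[ $1 =~ $re ]]
}
```

Once MY_BASIN_NAME is exported, `is_valid_basin_name "${MY_BASIN_NAME}"` returns a non-zero status for a name that breaks these rules.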

export MY_BASIN_NAME="<YOUR-BASIN-NAME-HERE>"
s2 create-basin "${MY_BASIN_NAME}"

2. Create source streams

Create three source streams with the prefix pup/ and append some “woofs” (random numbers) to each:

PUP_NAMES=("buddy" "yoyo" "scooby")

for PUP in "${PUP_NAMES[@]}"; do
    s2 create-stream "s2://${MY_BASIN_NAME}/pup/${PUP}"

    # Generate and append 10000 random woofs.
    for _ in {1..10000}; do
        echo "${RANDOM}"
    done | s2 append "s2://${MY_BASIN_NAME}/pup/${PUP}"
done

To verify that the random numbers were appended, read 10 records from one of the streams:

s2 read "s2://${MY_BASIN_NAME}/pup/yoyo" -n 10

The command above should print 10 random numbers.
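
The check can also be scripted. Bash's $RANDOM yields integers from 0 to 32767, so every record should be a plain integer in that range. A minimal sketch, assuming `s2 read` writes each record body on its own line to stdout; `check_woofs` is a hypothetical helper, not part of the S2 CLI.

```shell
# Hypothetical helper (not part of the S2 CLI): succeeds if stdin contains
# exactly $1 lines and each line is an integer in the 0-32767 range that
# Bash's $RANDOM produces.
check_woofs() {
  local expected=$1 count=0 line
  while IFS= read -r line; do
    # Reject anything that is not a plain non-negative integer.
    [[ $line =~ ^[0-9]+$ ]] || return 1
    # Force base 10 so values with leading zeros are not read as octal.
    (( 10#$line <= 32767 )) || return 1
    count=$((count + 1))
  done
  (( count == expected ))
}
```

Usage: `s2 read "s2://${MY_BASIN_NAME}/pup/yoyo" -n 10 | check_woofs 10 && echo "looks good"`.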

3. Create a new stream called woofs to store the processed records:

s2 create-stream "s2://${MY_BASIN_NAME}/woofs"

The Pipeline

1. Configuration

Create a file called woof-pipeline.yml:

cache_resources:
  - label: seq_num_cache
    noop: {}

input:
  label: woof_input
  s2:
    basin: ${MY_BASIN_NAME}
    # All streams with the prefix `pup/`
    streams: pup/
    auth_token: ${S2_AUTH_TOKEN}
    cache: seq_num_cache

pipeline:
  processors:
    - mapping: |
        root = "%v woofs from %s".format(this, meta("s2_stream").re_replace("^pup/", ""))

output:
  label: woof_output
  s2:
    basin: ${MY_BASIN_NAME}
    stream: woofs
    auth_token: ${S2_AUTH_TOKEN}
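
The mapping combines the record body (`this`) with the `s2_stream` metadata after stripping its `pup/` prefix, so a record `12345` read from `pup/buddy` should become `12345 woofs from buddy`. The rough Bash analogue below shows the same transformation for a single record. It is an illustration only, not how Bento evaluates Bloblang; the hypothetical `format_woof` takes the body and stream name as arguments in place of `this` and the metadata.

```shell
# Hypothetical illustration of the Bloblang mapping for one record:
#   root = "%v woofs from %s".format(this, meta("s2_stream").re_replace("^pup/", ""))
format_woof() {
  local body=$1 stream=$2
  # ${stream#pup/} drops a leading "pup/", mirroring re_replace("^pup/", "").
  printf '%s woofs from %s\n' "$body" "${stream#pup/}"
}

format_woof 12345 pup/buddy
# prints: 12345 woofs from buddy
```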

2. Running the pipeline

Start the pipeline using:

bento -c woof-pipeline.yml
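
The config refers to ${MY_BASIN_NAME} and ${S2_AUTH_TOKEN}, which Bento resolves from its environment, so both must be exported in the shell that starts bento. A small pre-flight check can catch a missing one before the pipeline starts. This is a sketch; `require_env` is a hypothetical helper, not part of Bento or the S2 CLI.

```shell
# Hypothetical helper (not part of Bento or the S2 CLI): returns non-zero and
# names the first variable that is missing from the exported environment.
require_env() {
  local name
  for name in "$@"; do
    # printenv only sees exported variables, i.e. what a child process such
    # as bento would inherit.
    if [[ -z $(printenv "$name") ]]; then
      echo "error: ${name} is not exported" >&2
      return 1
    fi
  done
}
```

Usage: `require_env MY_BASIN_NAME S2_AUTH_TOKEN && bento -c woof-pipeline.yml`.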

In another terminal, read the woofs stream to watch the processed records as they are appended:

s2 read "s2://${MY_BASIN_NAME}/woofs"
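
To see how the merged records are spread across the source streams, the output can be tallied per pup. A minimal sketch, assuming `s2 read` prints each record body on its own line in the `<number> woofs from <pup>` form produced by the mapping; `tally_woofs` is a hypothetical helper. Because awk only prints its totals once input ends, bound the read with -n.

```shell
# Hypothetical helper: counts processed records per pup, assuming each line on
# stdin looks like "<number> woofs from <pup>". Output is sorted by pup name.
tally_woofs() {
  awk '$2 == "woofs" && $3 == "from" { counts[$4]++ }
       END { for (pup in counts) print pup, counts[pup] }' | sort
}
```

Usage: `s2 read "s2://${MY_BASIN_NAME}/woofs" -n 100 | tally_woofs`.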