The Bytewax connector for S2 lets you use S2 streams as a source or sink in your Bytewax pipelines.

Real-time insights from Bluesky firehose data

To help showcase the connector, let’s build two Bytewax pipelines:

  1. Process events from the Bluesky firehose and store them with S2Sink.
  2. Use an S2Source to generate insights.

Bluesky users post about all kinds of things. Let’s say we are particularly interested in the frequency of English-language posts that link to a select set of external sites.

Prerequisites

  • Ensure the S2 CLI is installed.
  • Generate an authentication token from the dashboard if you don’t have one already.
  • Export the authentication token in every shell session in which you want to run S2 CLI commands or Bytewax pipelines:
    export S2_AUTH_TOKEN="<YOUR_AUTH_TOKEN>"
    
  • If you would like to run the example pipelines locally, ensure uv is installed and clone bytewax-s2. Then install the dependencies in a virtual environment and activate it:
    cd bytewax-s2
    uv sync
    source .venv/bin/activate
    

Create basin and streams

One of the key features of S2 is that you can create as many streams as you like, which makes it natural to model any domain. Let’s say we are interested in a few ecommerce, social, and news sites. We can create a stream for each of those sites and organize them hierarchically using a naming convention like <OFFSITE_KIND>/<OFFSITE>.

  • Create a new basin if you don’t have one already:
    s2 create-basin s2://<your_basin_name>
    
  • Create a set of streams:
    streams=(
        "ecommerce/amazon"
        "ecommerce/ebay"
        "ecommerce/etsy"
        "social/facebook"
        "social/reddit"
        "social/youtube"
        "news/bbc"
        "news/cbc"
        "news/cnn"
    )
    for stream in "${streams[@]}"; do
        s2 create-stream "s2://<your_basin_name>/$stream"
    done
    

Pipeline design

First, let’s look at the pipeline that sinks to S2. It should contain the following steps:

  • Read events from the Bluesky firehose.
  • Filter events down to post creations that link to the external sites of interest. We can also fold any required data transformations into this step.
  • Write processed events to S2.

With those steps, the core pipeline definition could look like the following:

flow = Dataflow("s2_sink_example")
bsky_events = op.input(
    "bluesky_source",
    flow,
    JetstreamSource(wss_uri="wss://jetstream2.us-west.bsky.network/subscribe"),
)
offsite_en_posts = op.filter_map(
    "filter_offsite_english_posts", bsky_events, offsite_english_posts
)
op.output(
    "s2_sink",
    offsite_en_posts,
    S2Sink(
        config=S2Config(auth_token=AUTH_TOKEN),
        basin=BASIN,
        stream_prefix="",
        partition_fn=S2SinkPartitionFn.DIRECT,
    ),
)

Let’s take a closer look at the S2Sink creation. stream_prefix="" means that all streams in the basin are considered as partitions to which items can be sunk. Each item passed to S2Sink must be a (key, value) 2-tuple, where the key routes the value to the appropriate stream. partition_fn=S2SinkPartitionFn.DIRECT means the key is used directly as the name of the destination stream. Here is the code for the entire pipeline.
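
To make the (key, value) contract concrete, here is a minimal sketch of what the offsite_english_posts helper passed to filter_map might look like. The Jetstream event layout and field names below are assumptions, as is the way AUTH_TOKEN and BASIN are read from the environment; the actual implementation in examples/sink.py may differ.

import json
import os

# Assumed configuration: S2_AUTH_TOKEN is exported in the prerequisites and
# S2_BASIN is set in the run commands, so the example presumably reads them
# at startup.
AUTH_TOKEN = os.environ["S2_AUTH_TOKEN"]
BASIN = os.environ["S2_BASIN"]

# Map offsite domains to the streams created earlier (subset shown).
OFFSITES = {
    "amazon.com": "ecommerce/amazon",
    "reddit.com": "social/reddit",
    "bbc.com": "news/bbc",
}


def offsite_english_posts(event: dict):
    # Keep only newly created English-language posts; the event structure
    # assumed here follows the Bluesky Jetstream commit format.
    if event.get("kind") != "commit":
        return None
    commit = event.get("commit", {})
    if commit.get("operation") != "create" or commit.get("collection") != "app.bsky.feed.post":
        return None
    record = commit.get("record", {})
    if "en" not in record.get("langs", []):
        return None
    # Look for a link to one of the offsites of interest.
    for facet in record.get("facets", []):
        for feature in facet.get("features", []):
            uri = feature.get("uri", "")
            for domain, stream in OFFSITES.items():
                if domain in uri:
                    payload = json.dumps({"offsite": stream.split("/", 1)[1], "uri": uri})
                    # Returning (stream_name, payload) lets S2SinkPartitionFn.DIRECT
                    # route the record straight to that stream.
                    return (stream, payload)
    # Returning None drops the event (filter_map semantics).
    return None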

Now, let’s look at the pipeline that reads from S2 and sinks to a streaming bar chart that can be viewed in the terminal. Let’s say we are interested in charts that show the counts of social sites linked in Bluesky posts. We can have two charts: one that shows the counts from the most recent time window and one that shows cumulative counts since the pipeline started running. The pipeline should contain the following steps:

  • Read records from S2.
  • Parse the records.
  • Use the count_window operator, which emits per-window counts.
  • Adapt the output of the windowing operator so it is suitable for sinking to StreamingBarChartSink.

With those steps, the core pipeline definition could look like the following:

flow = Dataflow("s2_source_example")
offsite_posts_raw = op.input(
    "s2_source",
    flow,
    S2Source(
        config=S2Config(auth_token=AUTH_TOKEN),
        basin=BASIN,
        stream_prefix=OFFSITE_KIND,
    ),
)
offsite_posts = op.map(
    "parse_offsite_posts", offsite_posts_raw, lambda sr: json.loads(sr.body)
)
windowed_offsite_mentions = count_window(
    "count_offsite_mentions",
    offsite_posts,
    clock,
    windower,
    lambda event: event["offsite"],
)
formatted_offsite_mentions = op.map(
    "format_offsite_mentions",
    windowed_offsite_mentions.down,
    lambda x: {"window_id": x[1][0], "label": x[0], "value": x[1][1]},
)
op.output("chart_sink", formatted_offsite_mentions, chart)

Let’s take a closer look at the S2Source creation. OFFSITE_KIND can be any of "ecommerce", "social", or "news", but since we are interested only in social sites here, it should be "social". stream_prefix="social" means that records will be read from all matching streams, i.e. "social/facebook", "social/reddit", and "social/youtube". Here is the code for the entire pipeline.
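
The clock and windower passed to count_window above are not shown in this excerpt. Here is a minimal sketch of one possible configuration, assuming Bytewax’s windowing module (bytewax.operators.windowing) and an arbitrary one-minute tumbling window; the actual window settings in examples/source.py may differ. The chart sink is the StreamingBarChartSink mentioned earlier, defined in the example code.

from datetime import datetime, timedelta, timezone

from bytewax.operators.windowing import SystemClock, TumblingWindower, count_window

# Assign records to windows using wall-clock time.
clock = SystemClock()
# Non-overlapping one-minute windows, aligned to the start of the current day (UTC).
windower = TumblingWindower(
    length=timedelta(minutes=1),
    align_to=datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0),
)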

Pipelines in action

If you would like to run the pipelines locally, ensure you have followed all the prerequisites.

  • In a terminal, run the S2 sink example pipeline:

    S2_BASIN="<YOUR_BASIN_NAME>" python -m bytewax.run examples/sink.py
    
  • In another terminal, run the S2 source example pipeline:

    S2_BASIN="<YOUR_BASIN_NAME>" OFFSITE_KIND="social" python -m bytewax.run examples/source.py
    

    and then you should see something like the following: