Tumbling Windows
Overview
This tutorial covers creating tumbling windows of data. Tumbling windows divide time into fixed-size, non-overlapping intervals, and every subject (key) shares the same interval boundaries. For instance, you may want to aggregate statistics over each minute and emit an event when each minute ends.

As a motivating use case, let's say we have several video channels on our website that viewers watch. We'd like to know how many viewers are watching each channel so that we can show a view count on the channel. Our job will aggregate events by minute and emit a sum of view events for each channel and each minute.
The data we'll be working with is an event stream of JSON objects that look like this:
```json
{
  "channel_id": "channel-a",
  "timestamp": "2025-01-30T12:45:10Z"
}
```
Our goal is to emit a new event with each channel's sum for every minute:
```json
{
  "channel_id": "channel-a",
  "timestamp": "2025-01-30T12:45:00Z",
  "sum": 5
}
```
Keying Events
We'll begin by making a handler object that Reduction will call to process the event stream. The first thing a handler does is extract the timestamp and subject key from an event and return them as a list of `KeyedEvent` values. Reduction uses the `KeyedEvent` timestamp to derive the relative time in a stream of events, and it uses the key to partition events between the operators that it manages internally.
Go:

```go
// ViewEvent represents a user viewing a channel
type ViewEvent struct {
	ChannelID string    `json:"channel_id"`
	Timestamp time.Time `json:"timestamp"`
}

// KeyEvent takes the raw data from our source and returns events with timestamps and keys
func KeyEvent(ctx context.Context, eventData []byte) ([]rxn.KeyedEvent, error) {
	var event ViewEvent
	if err := json.Unmarshal(eventData, &event); err != nil {
		return nil, err
	}
	return []rxn.KeyedEvent{{
		Key:       []byte(event.ChannelID),
		Timestamp: event.Timestamp,
	}}, nil
}
```

TypeScript:

```typescript
// ViewEvent is the JSON shape of a raw view event (note the snake_case field)
interface ViewEvent {
  channel_id: string;
  timestamp: string;
}

// keyEvent takes the raw data from our source and returns events with timestamps and keys
export function keyEvent(eventData: Uint8Array): KeyedEvent[] {
  const event: ViewEvent = JSON.parse(Buffer.from(eventData).toString("utf8"));
  return [
    {
      key: Buffer.from(event.channel_id),
      timestamp: Temporal.Instant.from(event.timestamp),
      value: Buffer.from([]),
    },
  ];
}
```
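As described above, Reduction uses the key to route events to the operators it manages. To build intuition for why a stable key matters, here is a hypothetical hash-partitioning function: hash the key and take the result modulo the number of partitions. Reduction's actual partitioning scheme is not described in this tutorial, so treat `partitionFor` and the FNV-1a hash choice as assumptions for illustration only.

```go
package main

import "hash/fnv"

// partitionFor maps a key to one of n partitions (n must be positive) by
// hashing it with FNV-1a. Every event with the same key lands on the same
// partition, so one operator can own all of a channel's state.
func partitionFor(key []byte, n int) int {
	h := fnv.New32a()
	h.Write(key) // writes to a hash.Hash never return an error
	return int(h.Sum32() % uint32(n))
}
```

Whatever the real scheme is, the property this sketch illustrates is the one the handler relies on: all view events for `channel-a` reach the same operator, so its per-minute counts live in one place.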
Creating the Operator Handler
We define State Specs and Sinks when configuring the job and then pass them to the Operator Handler. The handler uses these predefined State Specs and Sinks at runtime.
To store the state of our tumbling window, we'll give our handler a Map Spec whose keys are timestamps rounded down to the nearest minute and whose values are the sum of view events for that minute.
You may wonder why we need to store events by minute rather than a single value representing the sum of events for the current minute. In Reduction, timers are asynchronous: the time you set is the earliest the timer may fire, not a promise that it fires at exactly that moment. This is similar to timers in programming languages like Go (`time.After`) or JavaScript (`setTimeout`). As a result, events for the next minute can arrive before the timer for the previous minute fires, so each minute needs its own count.
We'll also define a `SumEvent` type that we'll use to collect the results of each tumbling window.
Go:

```go
// SumEvent is the total number of views for a channel over a time interval
type SumEvent struct {
	ChannelID string    `json:"channel_id"`
	Timestamp time.Time `json:"timestamp"`
	Sum       int       `json:"sum"`
}

// Handler processes view events and maintains counts per minute for each channel.
// It emits sum events when a minute window closes.
type Handler struct {
	// Sink sends aggregated view counts to the configured destination
	Sink rxn.Sink[SumEvent]
	// CountsByMinute stores the running count of views per minute
	CountsByMinute rxn.MapSpec[time.Time, int]
}
```

TypeScript:

```typescript
// SumEvent is the total number of views for a channel over a time interval
export interface SumEvent {
  channelId: string;
  timestamp: Temporal.Instant;
  sum: number;
}

/**
 * Handler processes view events and maintains counts per minute for each channel.
 * Its onEvent and onTimerExpired methods are added in the following sections.
 */
export class Handler implements OperatorHandler {
  private sink: topology.Sink<SumEvent>;
  private countsByMinuteSpec: topology.MapSpec<Temporal.Instant, number>;

  constructor(
    countSpec: topology.MapSpec<Temporal.Instant, number>,
    sink: topology.Sink<SumEvent>
  ) {
    this.sink = sink;
    this.countsByMinuteSpec = countSpec;
  }
}
```
Processing Events
As Reduction processes the view events, it calls the handler's `OnEvent` method for each event, giving us an opportunity to store state, set timers, or collect new events.
In our handler's `OnEvent` method, we'll load the state, increment the sum for the event's minute (rounding down), and set a timer for the start of the next minute.
Go:

```go
func (h *Handler) OnEvent(ctx context.Context, subject rxn.Subject, event rxn.KeyedEvent) error {
	// Load the map state for counts by minute
	state := h.CountsByMinute.StateFor(subject)

	// Increment the count for the event's minute
	minute := subject.Timestamp().Truncate(time.Minute)
	sum, _ := state.Get(minute)
	state.Set(minute, sum+1)

	// Set a timer to flush the minute's count once we reach the next minute
	subject.SetTimer(minute.Add(time.Minute))
	return nil
}
```

TypeScript:

```typescript
onEvent(subject: Subject, event: KeyedEvent): void {
  // Load the map state for counts by minute
  const state = this.countsByMinuteSpec.stateFor(subject);

  // Truncate to the minute (zero out seconds and all smaller units)
  const minute = event.timestamp.round({
    smallestUnit: "minute",
    roundingMode: "trunc",
  });

  // Increment the count
  state.set(minute, (state.get(minute) ?? 0) + 1);

  // Set a timer to flush the minute's count once we reach the next minute
  subject.setTimer(minute.add({ minutes: 1 }));
}
```
Processing Timers
When a timer expires, Reduction calls the handler's `OnTimerExpired` method. There, we'll send the sum for every minute bucket that starts before the timer's timestamp to our sink and remove those obsolete buckets from our map.
Go:

```go
func (h *Handler) OnTimerExpired(ctx context.Context, subject rxn.Subject, timestamp time.Time) error {
	// Load the map state for counts by minute
	state := h.CountsByMinute.StateFor(subject)

	// Emit the sums for every earlier minute bucket
	for minute, sum := range state.All() {
		if minute.Before(timestamp) {
			// Emit the count for the minute
			h.Sink.Collect(ctx, SumEvent{
				ChannelID: string(subject.Key()),
				Timestamp: minute,
				Sum:       sum,
			})

			// Clean up the emitted minute entry
			state.Delete(minute)
		}
	}
	return nil
}
```

TypeScript:

```typescript
onTimerExpired(subject: Subject, timer: Temporal.Instant): void {
  // Load the map state for counts by minute
  const state = this.countsByMinuteSpec.stateFor(subject);

  // Emit the sums for every earlier minute bucket
  for (const [minute, sum] of state.entries()) {
    if (Temporal.Instant.compare(minute, timer) < 0) {
      // Emit the count for the minute
      this.sink.collect(subject, {
        channelId: Buffer.from(subject.key).toString("utf8"),
        timestamp: minute,
        sum: sum,
      });

      // Clean up the emitted minute entry
      state.delete(minute);
    }
  }
}
```
Testing
You may have spotted opportunities for unit tests in our handler implementation, but the most interesting parts of a Reduction job are how the callbacks and state mutations work together to produce the final output. We can test how the handler will behave on a production Reduction cluster by using the `topology.TestRun` utility. This utility invokes the `reduction testrun` command with a list of test events to process with the handler. When all the events have been processed, we can inspect an in-memory sink to see which events would have been sent.
Our `Handler`'s `Sink` field is an interface (`rxn.Sink[SumEvent]`) that allows us to collect `SumEvent` events. When testing, we can use Reduction's memory sink type to record sink events rather than having the cluster handle them.
Let's start the test by setting up our job:
Go:

```go
job := &topology.Job{}
source := embedded.NewSource(job, "Source", &embedded.SourceParams{
	KeyEvent: tumblingwindow.KeyEvent,
})
memorySink := memory.NewSink[tumblingwindow.SumEvent](job, "Sink")
operator := topology.NewOperator(job, "Operator", &topology.OperatorParams{
	Handler: func(op *topology.Operator) rxn.OperatorHandler {
		return &tumblingwindow.Handler{
			Sink:           memorySink,
			CountsByMinute: topology.NewMapSpec(op, "CountsByMinute", rxn.ScalarMapCodec[time.Time, int]{}),
		}
	},
})
source.Connect(operator)
operator.Connect(memorySink)
```

TypeScript:

```typescript
// Set up the job
const job = new topology.Job({
  workerCount: 1,
  workingStorageLocation: "storage",
});
const source = new embedded.Source(job, "Source", {
  keyEvent,
});
const memorySink = new memory.Sink<SumEvent>(job, "Sink");
const operator = new topology.Operator(job, "Operator", {
  parallelism: 1,
  handler: (op) => {
    const state = new topology.MapSpec<Temporal.Instant, number>(
      op,
      "countsByMinute",
      new MapCodec({
        keyCodec: timestampValueCodec,
        valueCodec: uint64ValueCodec,
      })
    );
    return new Handler(state, memorySink);
  },
});
source.connect(operator);
operator.connect(memorySink);
```
In our test run, we'll add some view events for a channel and then advance the watermark.
Go:

```go
// Set up the test run
tr := job.NewTestRun()

// Add view events (addViewEvent is a test helper that isn't shown here)
addViewEvent(tr, "channel", "2025-01-01T00:01:00Z")
addViewEvent(tr, "channel", "2025-01-01T00:01:30Z")
addViewEvent(tr, "channel", "2025-01-01T00:01:59Z")
addViewEvent(tr, "channel", "2025-01-01T00:02:10Z")
addViewEvent(tr, "channel", "2025-01-01T00:03:01Z")

// Add a watermark to let event time advance
tr.AddWatermark()

// Run the test
tr.Run()
```

TypeScript:

```typescript
// Set up the test run
const testRun = job.createTestRun();

// Add view events (addViewEvent is a test helper that isn't shown here)
addViewEvent(testRun, "channel", "2025-01-01T00:01:00Z");
addViewEvent(testRun, "channel", "2025-01-01T00:01:30Z");
addViewEvent(testRun, "channel", "2025-01-01T00:01:59Z");
addViewEvent(testRun, "channel", "2025-01-01T00:02:10Z");
addViewEvent(testRun, "channel", "2025-01-01T00:03:01Z");

// Add a watermark to let event time advance
testRun.addWatermark();

// Run the test
await testRun.run();
```
In stream processing, a watermark is an assertion that no more events older than a certain timestamp are expected. When we advance the watermark in testing, we're simulating the passage of event time, which allows timers whose deadlines have been reached to fire.
Then we check the memory sink for the events we expect to have been emitted.
Go:

```go
assert.Equal(t, []tumblingwindow.SumEvent{
	{ChannelID: "channel", Timestamp: mustParseTime("2025-01-01T00:01:00Z"), Sum: 3},
	{ChannelID: "channel", Timestamp: mustParseTime("2025-01-01T00:02:00Z"), Sum: 1},
}, memorySink.Records)
```

TypeScript:

```typescript
// Assert the results
expect(memorySink.records).toEqual([
  {
    channelId: "channel",
    timestamp: Temporal.Instant.from("2025-01-01T00:01:00Z"),
    sum: 3,
  },
  {
    channelId: "channel",
    timestamp: Temporal.Instant.from("2025-01-01T00:02:00Z"),
    sum: 1,
  },
]);
```
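You might wonder why we get a closed window for minute 2 but not minute 3 in this test. The timer for minute 3 is set for 00:04:00, and the latest event in the test is at 00:03:01, so event time never reaches that deadline. According to event time, the window for minute 3 is still open because more events may arrive to fill it.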
Wrapping Up
Tumbling windows are a good introduction to stateful processing with timers. They're simple and can be thought of as batching aggregations by key. However, they can also accommodate complex aggregation and filtering logic within each window.