Tumbling Windows
Overview
This tutorial covers creating tumbling windows of data. Tumbling windows divide time into fixed, non-overlapping increments across all subjects. For instance, you may want to aggregate statistics over each minute and emit an event when the minute passes.

As a motivating use case, let's say we have several video channels on our website that viewers watch. We'd like to know how many viewers are watching each channel so that we can show a view count on the channel. Our job will aggregate events by minute and emit a sum of view events for every channel each minute.
The data we'll be working with is an event stream of JSON objects that look like this:
{
  "channel_id": "channel-a",
  "timestamp": "2025-01-30T12:45:10Z"
}
And our goal is to emit a new event with a sum each minute:
{
  "channel_id": "channel-a",
  "timestamp": "2025-01-30T12:45:00Z",
  "sum": 5
}
Keying Events
We'll begin by making a handler object that Reduction will call to process the
event stream. The first thing a handler does is extract the timestamp and
subject key from an event and return a KeyedEvent. Reduction uses the KeyedEvent
timestamp to derive the relative time in a stream of events, and it uses the key
to partition events between the operators it manages internally.
// The ViewEvent represents a user viewing a channel
type ViewEvent struct {
    ChannelID string    `json:"channel_id"`
    Timestamp time.Time `json:"timestamp"`
}

// KeyEvent takes the raw data from our source and returns events with timestamps and keys
func KeyEvent(ctx context.Context, eventData []byte) ([]rxn.KeyedEvent, error) {
    var event ViewEvent
    if err := json.Unmarshal(eventData, &event); err != nil {
        return nil, err
    }

    return []rxn.KeyedEvent{{
        Key:       []byte(event.ChannelID),
        Timestamp: event.Timestamp,
    }}, nil
}
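Every view of channel-a produces the key channel-a, so all of that channel's events are routed to the same partition. As a quick illustrative check of KeyEvent against the sample event from earlier (just a sketch, assuming the usual context and fmt imports):

events, err := KeyEvent(context.Background(), []byte(`{"channel_id": "channel-a", "timestamp": "2025-01-30T12:45:10Z"}`))
if err != nil {
    panic(err)
}
// Prints: key=channel-a timestamp=2025-01-30 12:45:10 +0000 UTC
fmt.Printf("key=%s timestamp=%s\n", events[0].Key, events[0].Timestamp)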
Processing Each Event
As Reduction processes the view events, it will call the handler's OnEvent
method for each event to give us an opportunity to store state and trigger
timers. In this case we'll want to store the sum of events by minute in a map
and set a timer to trigger when a minute is over.
You may wonder why we need to store events by minute rather than just storing a
single value representing a sum of events for the current minute. In Reduction,
timers are asynchronous, and setting a timer represents the earliest the timer
may fire. This is similar to timers in programming languages like Go (time.After)
or JavaScript (setTimeout). Events for the next minute will arrive before the
timer fires for the previous minute, so we keep a separate count for each minute
that hasn't been flushed yet.
Each supported language has state specs to handle converting our types to a
binary format and back according to a provided codec (coder/decoder). For a map
of timestamps to integers, we can use the MapSpec type.
// Handler processes view events and maintains counts per minute for each channel.
// It emits sum events when a minute window closes.
type Handler struct {
    // Sink sends aggregated view counts to the configured destination
    Sink rxn.Sink[SumEvent]

    // CountsByMinute stores the running count of views per minute
    CountsByMinute rxn.MapSpec[time.Time, int]
}
In our handler's OnEvent method we'll load the state, increment the sum for the
event's minute (rounding down), and set a timer to fire on the next minute.
func (h *Handler) OnEvent(ctx context.Context, subject rxn.Subject, event rxn.KeyedEvent) error {
    // Load the map state for counts by minute
    state := h.CountsByMinute.StateFor(subject)

    // Increment the count for the event's minute
    minute := subject.Timestamp().Truncate(time.Minute)
    sum, _ := state.Get(minute)
    state.Set(minute, sum+1)

    // Set a timer to flush the minute's count once we reach the next minute
    subject.SetTimer(minute.Add(time.Minute))
    return nil
}
Processing Timers
When a timer fires, we'll send the sum value to our sink and remove the obsolete minute bucket from our map.
Let's create an object to handle the JSON serialization.
// The SumEvent is the total number of views for a channel over a time interval
type SumEvent struct {
    ChannelID string    `json:"channel_id"`
    Timestamp time.Time `json:"timestamp"`
    Sum       int       `json:"sum"`
}
And then write the OnTimerExpired method to send these events to the sink.
func (h *Handler) OnTimerExpired(ctx context.Context, subject rxn.Subject, timestamp time.Time) error {
    // Load the map state for counts by minute
    state := h.CountsByMinute.StateFor(subject)

    // Emit the sums for every earlier minute bucket
    for minute, sum := range state.All() {
        if minute.Before(timestamp) {
            // Emit the count for the minute
            h.Sink.Collect(ctx, SumEvent{
                ChannelID: string(subject.Key()),
                Timestamp: minute,
                Sum:       sum,
            })

            // Clean up the emitted minute entry
            state.Delete(minute)
        }
    }
    return nil
}
Testing
You may have spotted opportunities for unit tests in our handler implementation,
but the most interesting parts of a Reduction job are how the callbacks and
state mutations work together to produce our final output. We can test how the
handler will work with a production Reduction cluster by using the
topology.TestRun utility. This utility invokes the reduction test command with a
list of test events to process with the handler. When all the events have been
processed, we can inspect an in-memory sink to see what events we would have
sent.
Our Handler's Sink member is an interface that allows us to collect our
SumEvent events (connectors.SinkRuntime[SumEvent]). When testing, we can use
Reduction's memory sink type to record sink events rather than having the
cluster handle them. Let's start the test by setting up our job.
job := &topology.Job{}
source := embedded.NewSource(job, "Source", &embedded.SourceParams{
    KeyEvent: tumblingwindow.KeyEvent,
})
memorySink := memory.NewSink[tumblingwindow.SumEvent](job, "Sink")
operator := topology.NewOperator(job, "Operator", &topology.OperatorParams{
    Handler: func(op *topology.Operator) rxn.OperatorHandler {
        return &tumblingwindow.Handler{
            Sink:           memorySink,
            CountsByMinute: topology.NewMapSpec(op, "CountsByMinute", rxn.ScalarMapCodec[time.Time, int]{}),
        }
    },
})
source.Connect(operator)
operator.Connect(memorySink)
In our "test run" we'll add some view events for a channel and advance the watermark.
tr := job.NewTestRun()
addViewEvent(tr, "channel", "2025-01-01T00:01:00Z")
addViewEvent(tr, "channel", "2025-01-01T00:01:30Z")
addViewEvent(tr, "channel", "2025-01-01T00:01:59Z")
addViewEvent(tr, "channel", "2025-01-01T00:02:10Z")
addViewEvent(tr, "channel", "2025-01-01T00:03:01Z")
tr.AddWatermark()
tr.Run()
In stream processing, a watermark indicates that all events up to a certain timestamp have been processed. When we advance the watermark in testing, we're simulating the passage of time and allowing timers to fire.
And then we check the memory sink for the events we expect to have been emitted.
assert.Equal(t, []tumblingwindow.SumEvent{
    {ChannelID: "channel", Timestamp: mustParseTime("2025-01-01T00:01:00Z"), Sum: 3},
    {ChannelID: "channel", Timestamp: mustParseTime("2025-01-01T00:02:00Z"), Sum: 1},
}, memorySink.Records)
You might wonder why we get a closed window for minute 2 but not minute 3 in this test. The window for minute 3 is still open because, according to event time, more events may arrive to fill that window.
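For completeness, the addViewEvent and mustParseTime helpers used in the test are not part of the framework. A minimal sketch could look like the following; note that the *topology.TestRun type name and the tr.AddRecord method are assumptions here and may differ in your version of Reduction:

// mustParseTime parses an RFC 3339 timestamp, panicking on bad input (test-only helper).
func mustParseTime(value string) time.Time {
    ts, err := time.Parse(time.RFC3339, value)
    if err != nil {
        panic(err)
    }
    return ts
}

// addViewEvent marshals a ViewEvent and feeds it to the test run as a raw source record.
// Assumption: the test run exposes an AddRecord method for raw source data.
func addViewEvent(tr *topology.TestRun, channelID, timestamp string) {
    data, err := json.Marshal(tumblingwindow.ViewEvent{
        ChannelID: channelID,
        Timestamp: mustParseTime(timestamp),
    })
    if err != nil {
        panic(err)
    }
    tr.AddRecord(data)
}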
Wrapping Up
Tumbling windows are a good introduction to stateful processing with timers. They're pretty simple and can be thought of as batching aggregations by key. However, they can also accommodate complex aggregation and filtering logic within each window.