Sliding Windows
Overview
This tutorial covers sliding windows of data. Like tumbling windows, sliding windows are uniform across subjects, but unlike tumbling windows they overlap with each other. If you want a user's total page views over the last seven days, updated every minute, you want a sliding window.

[Figure: Three sliding windows closing for a stream of events]
If you've used other stream processing engines, you may have been burned by sliding windows. Suppose each window holds about 100 MB of data, spans a week, and slides by a minute. A week contains 10,080 minutes, so a naive implementation that stores every overlapping window separately would keep roughly 10,080 × 100 MB ≈ 1 TB of data in state. The trick is to aggregate events into buckets (here, one per minute) as they arrive, then progressively drop buckets from the tail of the sliding window while adding new ones at its head.

Closing a sliding window
Sliding windows have more nuance than tumbling windows. For instance, if just one event arrives for a user, is it critical to emit all 10,080 windows of a minute-sliding week, or do we only care when the value changes?
As we implement this "total page view count by user over the last seven days" feature, we'll focus on the common use case of keeping a database fresh. This means that we only need to emit data for a window when the value changes.
For this job, the incoming events represent page view events by a user:
{
  "user_id": "user-a",
  "timestamp": "2025-01-30T12:45:10Z"
}
Our goal is to emit a new event with the user's seven-day total, recomputed every minute and emitted whenever it changes:
{
  "user_id": "user-a",
  "total_views": 5
}
Keying Events
First we'll make a function to transform our JSON page view events into Reduction's KeyedEvent type.
Go:

// ViewEvent represents a user viewing a page
type ViewEvent struct {
	UserID    string    `json:"user_id"`
	Timestamp time.Time `json:"timestamp"`
}

// KeyEvent takes the raw data from our source and returns events with timestamps and keys
func KeyEvent(ctx context.Context, eventData []byte) ([]rxn.KeyedEvent, error) {
	var event ViewEvent
	if err := json.Unmarshal(eventData, &event); err != nil {
		return nil, err
	}
	return []rxn.KeyedEvent{{
		Key:       []byte(event.UserID),
		Timestamp: event.Timestamp,
	}}, nil
}

TypeScript:

// ViewEvent matches the JSON page view event, including its snake_case user_id field
export interface ViewEvent {
  user_id: string;
  timestamp: string;
}

// keyEvent takes the raw data from our source and returns events with timestamps and keys
export function keyEvent(eventData: Uint8Array): KeyedEvent[] {
  // Decode the bytes as UTF-8 text; Uint8Array.prototype.toString() would join the byte values instead
  const event: ViewEvent = JSON.parse(new TextDecoder().decode(eventData));
  return [
    {
      key: Buffer.from(event.user_id),
      timestamp: Temporal.Instant.from(event.timestamp),
      value: Buffer.from([]),
    },
  ];
}
Creating the Operator Handler
For this handler's state, we'll store the count of page views for each minute in a map. We'll also store the previous window's sum so that we can avoid emitting an event when the sum hasn't changed between windows.
Our sink will take a SumEvent type to collect the results of each window.
Go:

// SumEvent represents the sum of views for a user over a time interval
type SumEvent struct {
	UserID     string `json:"user_id"`
	Interval   string `json:"interval"`
	TotalViews int    `json:"total_views"`
}

// Handler keeps per-minute view counts and the last emitted window sum for each user
type Handler struct {
	Sink                  rxn.Sink[SumEvent]
	CountsByMinuteSpec    rxn.MapSpec[time.Time, int]
	PreviousWindowSumSpec rxn.ValueSpec[int]
}

TypeScript:

// SumEvent represents the sum of views for a user over a time interval
export interface SumEvent {
  userID: string;
  interval: string;
  totalViews: number;
}

/**
 * Handler processes view events and maintains a sliding window of view counts.
 */
export class Handler implements OperatorHandler {
  private sink: topology.Sink<SumEvent>;
  private countsByMinuteSpec: topology.MapSpec<Temporal.Instant, number>;
  private previousWindowSumSpec: topology.ValueSpec<number>;

  constructor(
    countsByMinuteSpec: topology.MapSpec<Temporal.Instant, number>,
    previousWindowSumSpec: topology.ValueSpec<number>,
    sink: topology.Sink<SumEvent>
  ) {
    this.countsByMinuteSpec = countsByMinuteSpec;
    this.previousWindowSumSpec = previousWindowSumSpec;
    this.sink = sink;
  }

  // The onEvent and onTimerExpired methods in the following sections also belong to this class.
}
Processing Each Event
In OnEvent we'll load the map state, increment the count for the event's minute, and set a timer to fire at the start of the next minute.
Go:

func (h *Handler) OnEvent(ctx context.Context, subject rxn.Subject, event rxn.KeyedEvent) error {
	// Load the map state for counts by minute
	counts := h.CountsByMinuteSpec.StateFor(subject)

	// Increment the count for the event's minute
	minute := subject.Timestamp().Truncate(time.Minute)
	sum, _ := counts.Get(minute)
	counts.Set(minute, sum+1)

	// Set a timer to flush the minute's count once we reach the next minute
	subject.SetTimer(minute.Add(time.Minute))
	return nil
}

TypeScript:

onEvent(subject: Subject, event: KeyedEvent): void {
  // Load the map state for counts by minute
  const counts = this.countsByMinuteSpec.stateFor(subject);

  // Increment the count for the event's minute
  const minute = event.timestamp.round({
    smallestUnit: "minute",
    roundingMode: "trunc",
  });
  counts.set(minute, (counts.get(minute) ?? 0) + 1);

  // Set a timer to flush the minute's count once we reach the next minute
  subject.setTimer(minute.add({ minutes: 1 }));
}
So whenever an event comes in, we set a timer for the next minute. This works as long as a steady stream of events keeps invoking the OnEvent method for this key, but we also need to consider what happens when the events stop for this user. As the window keeps sliding, older minutes fall out of it and the total decreases, so we still need to emit those new values. We could set a timer for every future sliding window this event will be part of, but for our use case we can minimize the number of timers by having the OnTimerExpired method set the next timer itself.
Processing Timers
When a timer fires, we'll sum all buckets relevant for the current window, remove any obsolete minute buckets from our map, and only emit when the total changes.
Let's write the OnTimerExpired method to send these events:
Go:

func (h *Handler) OnTimerExpired(ctx context.Context, subject rxn.Subject, timestamp time.Time) error {
	// Load the map state for counts by minute
	counts := h.CountsByMinuteSpec.StateFor(subject)

	// Our window starts 7 days ago and ends now
	windowStart := timestamp.Add(-7 * 24 * time.Hour)
	windowEnd := timestamp

	// Add to the window sum, delete the minute if it's outside the window, or
	// retain the minute sum for a future window
	windowSum := 0
	for minute, sum := range counts.All() {
		if !minute.Before(windowStart) && minute.Before(windowEnd) {
			windowSum += sum
		} else if minute.Before(windowStart) {
			counts.Delete(minute)
		}
	}

	// Only collect a window sum if it changed
	prevWindowSum := h.PreviousWindowSumSpec.StateFor(subject)
	if prevWindowSum.Value() != windowSum {
		h.Sink.Collect(ctx, SumEvent{
			UserID:     string(subject.Key()),
			Interval:   windowStart.Format(time.RFC3339) + "/" + windowEnd.Format(time.RFC3339),
			TotalViews: windowSum,
		})
		prevWindowSum.Set(windowSum)
	}

	// Set a timer to emit future windows in case the user gets no more view events
	if counts.Size() > 0 {
		subject.SetTimer(subject.Watermark().Truncate(time.Minute).Add(time.Minute))
	}
	return nil
}

TypeScript:

onTimerExpired(subject: Subject, windowEnd: Temporal.Instant): void {
  // Load the map state for counts by minute
  const counts = this.countsByMinuteSpec.stateFor(subject);

  // Our window starts 7 days ago and ends now
  const windowStart = windowEnd.subtract({ hours: 7 * 24 });

  let windowSum = 0;
  for (const [minute, sum] of counts.entries()) {
    if (
      Temporal.Instant.compare(minute, windowStart) >= 0 &&
      Temporal.Instant.compare(minute, windowEnd) < 0
    ) {
      // Add to window sum when the minute is within the window
      windowSum += sum;
    } else if (Temporal.Instant.compare(minute, windowStart) < 0) {
      // Delete the minute if the window has passed
      counts.delete(minute);
    } else {
      // Retain the minute sum for a future window
    }
  }

  // Only collect a window sum if it changed
  const prevWindowSum = this.previousWindowSumSpec.stateFor(subject);
  if (prevWindowSum.value !== windowSum) {
    this.sink.collect(subject, {
      userID: Buffer.from(subject.key).toString("utf8"),
      interval: [
        windowStart.toString({ smallestUnit: "minute" }),
        windowEnd.toString({ smallestUnit: "minute" }),
      ].join("/"),
      totalViews: windowSum,
    });
    prevWindowSum.setValue(windowSum);
  }

  // Set a timer to emit future windows in case the user gets no more view events
  if (counts.size > 0) {
    const nextMinute = subject.watermark
      .round({ smallestUnit: "minute", roundingMode: "trunc" })
      .add({ minutes: 1 });
    subject.setTimer(nextMinute);
  }
}
One optimization here is that we only set another timer while the map still holds minute counts. When the map is empty, we can rely on the next event for this user to set a new timer.
Also notice that we base the next timer on the current watermark rather than on the expired timer's time. To be completely correct, we should assume that timers can fire arbitrarily late. If a timer fires more than a minute late and we set the next timer one minute after the expired timer's time, that next timer would already be behind the watermark and would be dropped.
Watermark is short for "low watermark" and is a common concept in stream processing. Incoming events don't have to be in strict order, but at some point the job has to decide when it can safely fire a timer and expect to have seen all the earlier events. That moving threshold is the watermark.
Testing
For our testing we can use the TestRun utility to get the results of running our handler against a set of ViewEvents.
We'll set up our job with a source, sink, and operator with our handler. This is where we configure the two State Specs:
- countsByMinute: a Map Spec of timestamps to counts
- previousWindowSum: a Value Spec for a number
Go:

job := &topology.Job{}
source := embedded.NewSource(job, "Source", &embedded.SourceParams{
	KeyEvent: slidingwindow.KeyEvent,
})
memorySink := memory.NewSink[slidingwindow.SumEvent](job, "Sink")
operator := topology.NewOperator(job, "Operator", &topology.OperatorParams{
	Handler: func(op *topology.Operator) rxn.OperatorHandler {
		return &slidingwindow.Handler{
			Sink:                  memorySink,
			CountsByMinuteSpec:    topology.NewMapSpec(op, "CountsByMinute", rxn.ScalarMapCodec[time.Time, int]{}),
			PreviousWindowSumSpec: topology.NewValueSpec(op, "PreviousWindowSum", rxn.ScalarValueCodec[int]{}),
		}
	},
})
source.Connect(operator)
operator.Connect(memorySink)

TypeScript:

// Setup job
const job = new topology.Job({
  workerCount: 1,
  workingStorageLocation: "storage",
});
const source = new embedded.Source(job, "Source", {
  keyEvent,
});
const memorySink = new memory.Sink<SumEvent>(job, "Sink");
const operator = new topology.Operator(job, "Operator", {
  parallelism: 1,
  handler: (op) => {
    const countsByMinute = new topology.MapSpec<Temporal.Instant, number>(
      op,
      "CountsByMinute",
      new MapCodec({
        keyCodec: timestampValueCodec,
        valueCodec: uint64ValueCodec,
      })
    );
    const previousWindowSum = new topology.ValueSpec<number>(
      op,
      "PreviousWindowSum",
      uint64ValueCodec,
      0
    );
    return new Handler(countsByMinute, previousWindowSum, memorySink);
  },
});
source.connect(operator);
operator.connect(memorySink);
Then we set up our test run with a series of ViewEvents to process.
Go:

tr := job.NewTestRun()

// addViewEvent is a small test helper (not shown) that adds a view event for a user at a timestamp

/* Events for user accumulate */
// Two events in one minute
addViewEvent(tr, "user", "2025-01-08T00:01:00Z")
addViewEvent(tr, "user", "2025-01-08T00:01:10Z")
// Two events in next minute
addViewEvent(tr, "user", "2025-01-08T00:02:10Z")
addViewEvent(tr, "user", "2025-01-08T00:02:59Z")
// One event and then no more
addViewEvent(tr, "user", "2025-01-08T00:03:00Z")

/* Events from other users advance event time */
// Advance the watermark near the middle of user's window
addViewEvent(tr, "other-user", "2025-01-11T00:00:00Z")
tr.AddWatermark()
// Advance the watermark near the end of user's window
addViewEvent(tr, "other-user", "2025-01-15T00:01:00Z")
tr.AddWatermark()
addViewEvent(tr, "other-user", "2025-01-15T00:02:00Z")
tr.AddWatermark()
addViewEvent(tr, "other-user", "2025-01-15T00:03:00Z")
tr.AddWatermark()
addViewEvent(tr, "other-user", "2025-01-15T00:04:00Z")
tr.AddWatermark()
addViewEvent(tr, "other-user", "2025-01-15T00:05:00Z")
tr.AddWatermark()

if err := tr.Run(); err != nil {
	t.Fatalf("failed to run handler: %v", err)
}

TypeScript:

// Setup test run
const testRun = job.createTestRun();

// addViewEvent is a small test helper (not shown) that adds a view event for a user at a timestamp

/* Events for user accumulate */
// Two events in one minute
addViewEvent(testRun, "user", "2025-01-08T00:01:00Z");
addViewEvent(testRun, "user", "2025-01-08T00:01:10Z");
// Two events in next minute
addViewEvent(testRun, "user", "2025-01-08T00:02:10Z");
addViewEvent(testRun, "user", "2025-01-08T00:02:59Z");
// One event and then no more
addViewEvent(testRun, "user", "2025-01-08T00:03:00Z");

/* Events from other users advance event time */
// Advance the watermark near the middle of user's window
addViewEvent(testRun, "other-user", "2025-01-11T00:00:00Z");
testRun.addWatermark();
// Advance the watermark near the end of user's window
addViewEvent(testRun, "other-user", "2025-01-15T00:01:00Z");
testRun.addWatermark();
addViewEvent(testRun, "other-user", "2025-01-15T00:02:00Z");
testRun.addWatermark();
addViewEvent(testRun, "other-user", "2025-01-15T00:03:00Z");
testRun.addWatermark();
addViewEvent(testRun, "other-user", "2025-01-15T00:04:00Z");
testRun.addWatermark();
addViewEvent(testRun, "other-user", "2025-01-15T00:05:00Z");
testRun.addWatermark();

// Run the test
await testRun.run();
The events with another key ("other-user") and the watermarks demonstrate how event time progresses in the absence of events for "user". Other timestamped events and periodic watermarks continue to mark the passage of time and allow the relevant timers to fire.
Finally, we can assert on the events in the memorySink.
Go:

// Filter events to just focus on "user"
userEvents := []slidingwindow.SumEvent{}
for _, event := range memorySink.Records {
	if event.UserID == "user" {
		userEvents = append(userEvents, event)
	}
}
assert.Equal(t, []slidingwindow.SumEvent{
	// TotalViews accumulate for the first 3 minutes
	{UserID: "user", Interval: "2025-01-01T00:02:00Z/2025-01-08T00:02:00Z", TotalViews: 2},
	{UserID: "user", Interval: "2025-01-01T00:03:00Z/2025-01-08T00:03:00Z", TotalViews: 4},
	{UserID: "user", Interval: "2025-01-01T00:04:00Z/2025-01-08T00:04:00Z", TotalViews: 5},
	// TotalViews decrease as windows at the end of the week close
	{UserID: "user", Interval: "2025-01-08T00:02:00Z/2025-01-15T00:02:00Z", TotalViews: 3},
	{UserID: "user", Interval: "2025-01-08T00:03:00Z/2025-01-15T00:03:00Z", TotalViews: 1},
	{UserID: "user", Interval: "2025-01-08T00:04:00Z/2025-01-15T00:04:00Z", TotalViews: 0},
}, userEvents, "events should match expected sequence")

TypeScript:

// Filter events to just focus on "user"
const userEvents = memorySink.records.filter((event) => event.userID === "user");
expect(userEvents).toEqual([
  // TotalViews accumulate for the first 3 minutes
  {
    userID: "user",
    interval: "2025-01-01T00:02Z/2025-01-08T00:02Z",
    totalViews: 2,
  },
  {
    userID: "user",
    interval: "2025-01-01T00:03Z/2025-01-08T00:03Z",
    totalViews: 4,
  },
  {
    userID: "user",
    interval: "2025-01-01T00:04Z/2025-01-08T00:04Z",
    totalViews: 5,
  },
  // TotalViews decrease as windows at the end of the week close
  {
    userID: "user",
    interval: "2025-01-08T00:02Z/2025-01-15T00:02Z",
    totalViews: 3,
  },
  {
    userID: "user",
    interval: "2025-01-08T00:03Z/2025-01-15T00:03Z",
    totalViews: 1,
  },
  {
    userID: "user",
    interval: "2025-01-08T00:04Z/2025-01-15T00:04Z",
    totalViews: 0,
  },
]);
Wrapping Up
Often teams start with a daily batch SQL job and find that running it more frequently would be valuable. Maybe twice a day? Every hour? Stream processing with sliding windows can dramatically increase the freshness of data.
Although a high-level API for sliding windows could be built on top of the OnEvent and OnTimerExpired methods, I hope you can see how the specifics of a use case lead to optimizations or custom business rules that would be difficult to express in a format like SQL.