Session Windows
Overview
This tutorial covers building a Reduction job to emit session windows. A session window represents a period of continuous activity that ends after a period of inactivity. If you want to capture a logical grouping of a user's events, like a single online shopping experience, you'll want to use session windows.

Session windows split by gaps of inactivity
Session windows were my first introduction to stateful stream processing. My team needed to capture user sessions that ended 15 minutes after the last session event. I was surprised when a developer's implementation was more complicated than the high-level session window API I expected. Just a couple of simple-sounding requirements like "sessions can't be more than 24 hours long" had fundamentally changed the approach. A goal of Reduction is to accommodate new requirements without making developers learn an entirely new set of concepts.
For this example we'll work with view events that represent a user viewing a website:
{
"user_id": "user-a",
"timestamp": "2025-01-30T12:45:10Z"
}
And we'll emit events representing the time intervals during which a user was active on the site:
{
"user_id": "user-a",
"interval": "2025-01-30T12:45:00Z/2025-01-30T13:00:00Z"
}
We'll convert these JSON objects into the following types:
- Go
- TypeScript
// ViewEvent represents a user viewing a page. KeyEvent decodes it from the
// source's JSON payload; with encoding/json, Timestamp must be an RFC 3339
// string.
type ViewEvent struct {
	UserID    string    `json:"user_id"`
	Timestamp time.Time `json:"timestamp"`
}
// SessionEvent represents a user's continuous session on the site. The handler
// emits one to the sink each time a session closes. Interval holds the session
// as "<start>/<end>" (see Session.Interval).
type SessionEvent struct {
	UserID   string `json:"user_id"`
	Interval string `json:"interval"`
}
// The ViewEvent represents a user viewing a page.
// NOTE(review): the documented JSON payload uses a `user_id` key while this
// interface uses `userID`; JSON.parse does not rename keys, so confirm which
// form producers actually send.
export interface ViewEvent {
  userID: string;
  // ISO 8601 instant string; keyEvent parses it with Temporal.Instant.from.
  timestamp: string;
}
// SessionEvent represents a user's continuous session on the site. One is
// written to the sink each time a session closes.
export interface SessionEvent {
  userID: string;
  // "<start>/<end>" string produced by sessionInterval.
  interval: string;
}
Keying Events
First, we'll write a function that transforms our JSON page view events into Reduction's KeyedEvent type.
- Go
- TypeScript
// KeyEvent decodes one raw JSON view event and returns it keyed by the user's
// ID, using the view's own timestamp as the event time. No value payload is
// attached. A JSON decoding error is returned unchanged.
func KeyEvent(ctx context.Context, eventData []byte) ([]rxn.KeyedEvent, error) {
	var view ViewEvent
	err := json.Unmarshal(eventData, &view)
	if err != nil {
		return nil, err
	}

	keyed := rxn.KeyedEvent{
		Key:       []byte(view.UserID),
		Timestamp: view.Timestamp,
	}
	return []rxn.KeyedEvent{keyed}, nil
}
/**
 * Decodes one raw view event from the source and keys it by user ID, using the
 * event's own timestamp as the event time.
 *
 * The payload is decoded explicitly as UTF-8 JSON. Calling `toString()` on a
 * plain `Uint8Array` joins the byte values with commas; it only decodes text
 * when the argument happens to be a Node `Buffer`.
 *
 * The user ID is read from the `user_id` field used by the documented JSON
 * format, falling back to `userID` for producers that serialize a `ViewEvent`
 * object directly.
 *
 * @param eventData - raw bytes of a single JSON view event
 * @returns a single keyed event with an empty value
 * @throws Error if the payload lacks a string user ID or timestamp
 */
export function keyEvent(eventData: Uint8Array): KeyedEvent[] {
  const text = Buffer.from(eventData).toString("utf8");
  const parsed: unknown = JSON.parse(text);
  const fields: Record<string, unknown> =
    typeof parsed === "object" && parsed !== null ? (parsed as Record<string, unknown>) : {};

  const userID = fields["user_id"] ?? fields["userID"];
  const timestamp = fields["timestamp"];
  if (typeof userID !== "string" || typeof timestamp !== "string") {
    throw new Error(`invalid view event: ${text}`);
  }
  const event: ViewEvent = { userID, timestamp };

  return [
    {
      key: Buffer.from(event.userID),
      timestamp: Temporal.Instant.from(event.timestamp),
      value: Buffer.from([]),
    },
  ];
}
Representing Session State
For session windows, we need to track the start and end time of each active session. We could use two different state values for that, but it's more convenient to use a single value to represent the session state.
We'll create a Session type to represent this state and a codec to handle encoding and decoding of the session data.
- Go
- TypeScript
// Session represents the internal state of an active session: the start and
// end times of a user's currently open session. The zero value (see IsZero)
// means the user has no open session.
type Session struct {
	Start time.Time
	End   time.Time
}
// IsZero reports whether s is the zero Session, meaning neither a start nor an
// end time has been recorded. The handler uses this to detect a user with no
// open session.
func (s Session) IsZero() bool {
	if !s.Start.IsZero() {
		return false
	}
	return s.End.IsZero()
}
// Interval renders the session as "<start>/<end>", formatting both ends with
// the time.RFC3339 layout, which has second precision and no fractional seconds.
// This is the string placed in SessionEvent.Interval.
func (s Session) Interval() string {
	start := s.Start.Format(time.RFC3339)
	end := s.End.Format(time.RFC3339)
	return start + "/" + end
}
// SessionCodec encodes and decodes Session values as "<start>/<end>" RFC 3339
// text. It is passed to topology.NewValueSpec to persist the session state.
type SessionCodec struct{}
// Decode parses a Session from its "<start>/<end>" text form, where both parts
// are RFC 3339 timestamps. Fractional seconds are accepted, because time.Parse
// allows them even when the layout omits them.
//
// It returns an error if the input does not split into exactly two parts on
// "/", or if either part is not a valid RFC 3339 timestamp.
func (c SessionCodec) Decode(b []byte) (Session, error) {
	parts := strings.Split(string(b), "/")
	if len(parts) != 2 {
		return Session{}, fmt.Errorf("invalid session format: %s", b)
	}
	start, err := time.Parse(time.RFC3339, parts[0])
	if err != nil {
		return Session{}, fmt.Errorf("invalid start time format: %w", err)
	}
	end, err := time.Parse(time.RFC3339, parts[1])
	if err != nil {
		return Session{}, fmt.Errorf("invalid end time format: %w", err)
	}
	// Keyed fields stay correct if Session's fields are ever reordered or extended.
	return Session{Start: start, End: end}, nil
}
// Encode returns the "<start>/<end>" text form of a Session for state storage.
// Both timestamps use time.RFC3339Nano rather than Session.Interval's
// second-precision RFC 3339.
//
// The full precision matters: OnTimerExpired compares the firing timer against
// End+InactivityThreshold with Equal. If End lost its sub-second part in a state
// round-trip, that comparison could never match and the session would never be
// emitted. Decode accepts both forms. Encode never returns an error.
func (c SessionCodec) Encode(value Session) ([]byte, error) {
	start := value.Start.Format(time.RFC3339Nano)
	end := value.End.Format(time.RFC3339Nano)
	return []byte(start + "/" + end), nil
}
// Session represents the internal state of an active session: the start and
// end instants of a user's currently open session. The handler stores
// `undefined` in its place when the user has no open session.
export interface Session {
  start: Temporal.Instant;
  end: Temporal.Instant;
}
/**
 * Builds the sink record for a finished session.
 *
 * @param userID - the subject key bytes, decoded here as a UTF-8 string
 * @param session - the session to report; rendered with sessionInterval
 */
function createSessionEvent(userID: Uint8Array, session: Session): SessionEvent {
  const decodedUserID = Buffer.from(userID).toString();
  const interval = sessionInterval(session);
  return { userID: decodedUserID, interval };
}
/**
 * Formats a session as an interval string like
 * "2025-01-01T00:00Z/2025-01-01T12:50Z".
 *
 * Both instants are printed with `smallestUnit: "minute"`. Temporal's default
 * rounding mode for toString is truncation, so seconds and fractions are
 * dropped and a value round-tripped through this format loses them.
 */
function sessionInterval(session: Session): string {
  const options = { smallestUnit: "minute" } as const;
  return `${session.start.toString(options)}/${session.end.toString(options)}`;
}
/**
 * Custom codec that persists the session state as "<start>/<end>", using the
 * full-precision ISO 8601 form of each Temporal.Instant.
 *
 * The state is deliberately not encoded with `sessionInterval`, which truncates
 * to the minute. onTimerExpired compares the firing timer against
 * `session.end + timeout`, and a truncated `end` read back from storage would
 * never equal a timer set from the untruncated event time, so the session would
 * never close. Values previously written at minute precision still decode,
 * because Temporal.Instant.from accepts them.
 *
 * decode throws on data that does not contain exactly two "/"-separated parts,
 * instead of producing a Session with an undefined bound.
 */
export const sessionCodec = new ValueCodec<Session | undefined>({
  encode(value) {
    assert(value, "will only persist defined values");
    return Buffer.from(`${value.start.toString()}/${value.end.toString()}`);
  },
  decode(data) {
    const text = Buffer.from(data).toString("utf8");
    const [startText, endText, ...extra] = text.split("/");
    if (startText === undefined || endText === undefined || extra.length > 0) {
      throw new Error(`invalid session format: ${text}`);
    }
    return {
      start: Temporal.Instant.from(startText),
      end: Temporal.Instant.from(endText),
    };
  },
});
Creating the Handler
Our handler uses:
- A Sink type that accepts SessionEvent
- A ValueSpec to manage session state
- An inactivity threshold that sets how long a gap between events closes a session
- Go
- TypeScript
// Handler is the session window operator handler. It tracks one Session per
// subject (keyed by user ID in KeyEvent) and emits a SessionEvent when that
// session closes.
type Handler struct {
	// Sink receives a SessionEvent for each closed session.
	Sink rxn.Sink[SessionEvent]
	// SessionSpec holds each subject's open session; a zero Session means none.
	SessionSpec rxn.ValueSpec[Session]
	// InactivityThreshold is the gap between events after which a session closes.
	InactivityThreshold time.Duration
}
/**
* Handler processes view events and maintains session windows.
*/
export class Handler implements OperatorHandler {
/** Gap of inactivity after which a user's session is closed. */
private sessionTimeout: Temporal.Duration;

/**
 * @param sessionSpec - per-user session state; `undefined` means no open session
 * @param sink - receives a SessionEvent each time a session closes
 * @param inactivityThreshold - how long a user may go without events before
 *   their session is closed
 */
constructor(
  private sessionSpec: topology.ValueSpec<Session | undefined>,
  private sink: topology.Sink<SessionEvent>,
  inactivityThreshold: Temporal.Duration
) {
  this.sessionTimeout = inactivityThreshold;
}
Processing Each Event
In the OnEvent method, we'll either:
- Start a new session if we have no existing session for the user
- Extend the current session if the event falls within its inactivity threshold
- Otherwise, close the current session and start a new one
- Go
- TypeScript
// OnEvent folds a single view event into the user's session.
//
//   - No open session: start one that begins and ends at this event.
//   - The gap since the session's end exceeds InactivityThreshold: emit the
//     finished session to the sink and start a new one at this event.
//   - Otherwise: move the session's end forward to this event.
//
// Every call sets a timer for End+InactivityThreshold so OnTimerExpired can
// close the session if no further events arrive.
func (h *Handler) OnEvent(ctx context.Context, subject rxn.Subject, event rxn.KeyedEvent) error {
	// cut-start: on-event-24h
	state := h.SessionSpec.StateFor(subject)
	current := state.Value()
	viewTime := subject.Timestamp()

	var next Session
	switch {
	case current.IsZero():
		next = Session{Start: viewTime, End: viewTime}
	case viewTime.After(current.End.Add(h.InactivityThreshold)):
		h.Sink.Collect(ctx, SessionEvent{string(subject.Key()), current.Interval()})
		next = Session{Start: viewTime, End: viewTime}
	default:
		next = Session{Start: current.Start, End: viewTime}
	}

	state.Set(next)
	subject.SetTimer(next.End.Add(h.InactivityThreshold))
	return nil
}
/**
 * Folds one view event into the user's open session.
 *
 * - No open session: start one that begins and ends at this event.
 * - Gap since the session's end longer than the inactivity threshold: emit the
 *   finished session to the sink and start a new one at this event.
 * - Otherwise: move the session's end forward to this event.
 *
 * Each call sets a timer for `end + threshold` so onTimerExpired can close the
 * session if no further events arrive.
 */
onEvent(subject: Subject, event: KeyedEvent): void {
  // cut-start: on-event-24h
  const state = this.sessionSpec.stateFor(subject);
  const open = state.value;
  const viewedAt = event.timestamp;

  let next: Session;
  if (open === undefined) {
    next = { start: viewedAt, end: viewedAt };
  } else {
    const gap = open.end.until(viewedAt);
    if (Temporal.Duration.compare(gap, this.sessionTimeout) > 0) {
      // The user went quiet for too long: flush the old session, start fresh.
      this.sink.collect(subject, createSessionEvent(subject.key, open));
      next = { start: viewedAt, end: viewedAt };
    } else {
      // Still active: stretch the session to cover this event.
      next = { ...open, end: viewedAt };
    }
  }

  state.setValue(next);
  subject.setTimer(next.end.add(this.sessionTimeout));
}
By observing a user's events, we can tell when there's a 15m gap, but what happens if we never get another event from the user? By using subject.SetTimer to set a timer that fires 15m after the session's latest event, we can close the session in the OnTimerExpired method without waiting for another event from the user.
Processing Timers
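When a timer expires, we check whether it is the latest timer we set for the session (its end time plus the inactivity threshold), which means the user has now been inactive for the full threshold. If so, we emit the session event and clean up the session state: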
- Go
- TypeScript
// OnTimerExpired closes the user's session once it has been idle for the full
// InactivityThreshold. OnEvent sets a timer on every event, so earlier timers
// may fire while the session is still active. Only a timer at exactly
// End+InactivityThreshold is acted on: the session is emitted to the sink and
// its state dropped.
func (h *Handler) OnTimerExpired(ctx context.Context, subject rxn.Subject, timestamp time.Time) error {
	state := h.SessionSpec.StateFor(subject)
	current := state.Value()

	expected := current.End.Add(h.InactivityThreshold)
	if !timestamp.Equal(expected) {
		// A stale timer from an earlier event: the session is still active.
		return nil
	}

	h.Sink.Collect(ctx, SessionEvent{string(subject.Key()), current.Interval()})
	state.Drop()
	return nil
}
/**
 * Closes the user's session once it has been idle for the full inactivity
 * threshold.
 *
 * onEvent sets a timer on every event, so earlier timers may fire while the
 * session is still active. Only the timer at exactly `end + sessionTimeout` is
 * acted on: the session is emitted to the sink and its state dropped.
 *
 * A timer may also fire after its session has already been closed and dropped.
 * For example, two events with the same timestamp may each set a timer,
 * depending on whether the runtime deduplicates timers. Like the Go handler,
 * that case is a no-op instead of an assertion failure that would crash the job.
 */
onTimerExpired(subject: Subject, timestamp: Temporal.Instant): void {
  const sessionState = this.sessionSpec.stateFor(subject);
  const session = sessionState.value;
  if (session === undefined) {
    // Nothing open for this user: the session was already closed.
    return;
  }

  // Determine if this is the latest timer we set for this subject
  const isLatestTimer = timestamp.equals(session.end.add(this.sessionTimeout));
  if (!isLatestTimer) {
    return;
  }

  this.sink.collect(subject, createSessionEvent(subject.key, session));
  sessionState.drop();
}
Testing
Let's test our session window implementation with a series of events that should create two distinct sessions.
First we set up our job for the test. This uses the embedded source and memory sink for testing.
- Go
- TypeScript
// Build the test job: an embedded source (keyed by sessionwindow.KeyEvent)
// feeds the session window operator, whose output lands in an in-memory sink
// that the assertions read.
job := &topology.Job{}
source := embedded.NewSource(job, "Source", &embedded.SourceParams{
	KeyEvent: sessionwindow.KeyEvent,
})
memorySink := memory.NewSink[sessionwindow.SessionEvent](job, "Sink")
operator := topology.NewOperator(job, "Operator", &topology.OperatorParams{
	// The handler factory receives the operator; the session ValueSpec is
	// created against it using the custom SessionCodec.
	Handler: func(op *topology.Operator) rxn.OperatorHandler {
		return &sessionwindow.Handler{
			Sink:                memorySink,
			SessionSpec:         topology.NewValueSpec(op, "Session", sessionwindow.SessionCodec{}),
			InactivityThreshold: 15 * time.Minute,
		}
	},
})
// Wire source -> operator -> sink.
source.Connect(operator)
operator.Connect(memorySink)
// Setup job
const job = new topology.Job({
  workerCount: 1,
  workingStorageLocation: "storage",
});
// Embedded source; keyEvent is its keying function.
const source = new embedded.Source(job, "Source", {
  keyEvent,
});
// In-memory sink; the assertions read its `records`.
const memorySink = new memory.Sink<SessionEvent>(job, "Sink");
const operator = new topology.Operator(job, "Operator", {
  parallelism: 1,
  handler: (op) => {
    // Setup our session spec with our custom codec. When there is no data for
    // a user, the session will be undefined.
    const sessionSpec = new topology.ValueSpec(op, "Session", sessionCodec, undefined);
    // Close a session after 15 minutes without events (a Temporal.Duration).
    const inactivityThreshold = Temporal.Duration.from({ minutes: 15 });
    return new Handler(sessionSpec, memorySink, inactivityThreshold);
  },
});
// Wire source -> operator -> sink.
source.connect(operator);
operator.connect(memorySink);
Then we create a test run with events to test our handler:
- Go
- TypeScript
tr := job.NewTestRun()
// First session with events close together
addViewEvent(tr, "user", "2025-01-01T00:01:00Z")
addViewEvent(tr, "user", "2025-01-01T00:05:00Z")
addViewEvent(tr, "user", "2025-01-01T00:10:00Z")
tr.AddWatermark()
// Gap in activity (>15 minutes: 00:10 -> 00:30). The 00:30 event closes the
// first session in OnEvent and starts the second session.
addViewEvent(tr, "user", "2025-01-01T00:30:00Z")
addViewEvent(tr, "user", "2025-01-01T00:35:00Z")
tr.AddWatermark()
// An event from another user advances event time past 00:50 (the second
// session's last event plus 15m), so the second session's timer can expire.
addViewEvent(tr, "other-user", "2025-01-01T01:00:00Z")
tr.AddWatermark()
require.NoError(t, tr.Run())
// Setup test run
const testRun = job.createTestRun();
// First session with events close together
addViewEvent(testRun, "user", "2025-01-01T00:01:00Z");
addViewEvent(testRun, "user", "2025-01-01T00:05:00Z");
addViewEvent(testRun, "user", "2025-01-01T00:10:00Z");
testRun.addWatermark();
// Gap in activity (>15 minutes: 00:10 -> 00:30). The 00:30 event closes the
// first session in onEvent and starts the second session.
addViewEvent(testRun, "user", "2025-01-01T00:30:00Z");
addViewEvent(testRun, "user", "2025-01-01T00:35:00Z");
testRun.addWatermark();
// An event from another user advances event time past 00:50 (the second
// session's last event plus 15m), so the second session's timer can expire.
addViewEvent(testRun, "other-user", "2025-01-01T01:00:00Z");
testRun.addWatermark();
// Run the test
await testRun.run();
And finally we assert that we get the sessions we expect:
- Go
- TypeScript
// Keep only the sessions emitted for "user"; the "other-user" event is there
// to advance event time.
userEvents := []sessionwindow.SessionEvent{}
for i := range memorySink.Records {
	if record := memorySink.Records[i]; record.UserID == "user" {
		userEvents = append(userEvents, record)
	}
}
want := []sessionwindow.SessionEvent{
	{UserID: "user", Interval: "2025-01-01T00:01:00Z/2025-01-01T00:10:00Z"},
	{UserID: "user", Interval: "2025-01-01T00:30:00Z/2025-01-01T00:35:00Z"},
}
assert.Equal(t, want, userEvents)
// Keep only the sessions emitted for "user"; the "other-user" event is there
// to advance event time.
const userEvents = memorySink.records.filter(({ userID }) => userID === "user");
const expectedSessions = [
  { userID: "user", interval: "2025-01-01T00:01Z/2025-01-01T00:10Z" },
  { userID: "user", interval: "2025-01-01T00:30Z/2025-01-01T00:35Z" },
];
expect(userEvents).toEqual(expectedSessions);
Wrapping Up
We've implemented session windows that close after 15 minutes of inactivity. Now, how would we handle the new requirement I mentioned earlier: "sessions cannot exceed 24h"?
We can add a new condition to our OnEvent handler for this requirement:
- Go
- TypeScript
// OnEvent folds a single view event into the user's session, capping sessions
// at 24 hours.
//
//   - No open session: start one at this event.
//   - Gap since the session's end exceeds InactivityThreshold: emit the
//     finished session and start a new one at this event.
//   - This event is 24h or more after the session's start: emit the session
//     with its end set to exactly Start+24h, then start a new one at this event.
//   - Otherwise: move the session's end forward to this event.
//
// Every call sets a timer for End+InactivityThreshold for OnTimerExpired.
func (h *Handler) OnEvent(ctx context.Context, subject rxn.Subject, event rxn.KeyedEvent) error {
	const maxSessionLength = 24 * time.Hour

	state := h.SessionSpec.StateFor(subject)
	current := state.Value()
	viewTime := subject.Timestamp()

	emit := func(s Session) {
		h.Sink.Collect(ctx, SessionEvent{string(subject.Key()), s.Interval()})
	}
	fresh := Session{Start: viewTime, End: viewTime}

	var next Session
	switch {
	case current.IsZero():
		next = fresh
	case viewTime.After(current.End.Add(h.InactivityThreshold)):
		emit(current)
		next = fresh
	case viewTime.Sub(current.Start) >= maxSessionLength:
		emit(Session{Start: current.Start, End: current.Start.Add(maxSessionLength)})
		next = fresh
	default:
		next = Session{Start: current.Start, End: viewTime}
	}

	state.Set(next)
	subject.SetTimer(next.End.Add(h.InactivityThreshold))
	return nil
}
/**
 * Folds one view event into the user's open session, capping sessions at 24h.
 *
 * - No open session: start one at this event.
 * - Gap since the session's end longer than the inactivity threshold: emit the
 *   finished session and start a new one at this event.
 * - This event is 24h or more after the session's start: emit the session
 *   truncated to exactly 24h, then continue with a new session that starts at
 *   that 24h boundary and ends at this event.
 * - Otherwise: move the session's end forward to this event.
 *
 * Each call sets a timer for `end + threshold` so onTimerExpired can close the
 * session if no further events arrive.
 */
onEvent(subject: Subject, event: KeyedEvent): void {
  const sessionState = this.sessionSpec.stateFor(subject);
  let session = sessionState.value;
  const eventTime = event.timestamp;
  const maxLength = Temporal.Duration.from({ hours: 24 });
  if (session === undefined) {
    // Start a new session for the user
    session = { start: eventTime, end: eventTime };
  } else if (
    Temporal.Duration.compare(session.end.until(eventTime), this.sessionTimeout) > 0
  ) {
    // Emit the session event and start a new session
    this.sink.collect(subject, createSessionEvent(subject.key, session));
    session = { start: eventTime, end: eventTime };
  } else if (
    Temporal.Duration.compare(session.start.until(eventTime), maxLength) >= 0
  ) {
    // The session reached 24 hours: emit a 24h session and start a new one.
    const boundary = session.start.add(maxLength);
    this.sink.collect(
      subject,
      createSessionEvent(subject.key, { ...session, end: boundary })
    );
    // The new session must include the current event. Ending it at the
    // boundary would drop this event and arm the inactivity timer from the
    // boundary instead of the user's actual last activity.
    session = { start: boundary, end: eventTime };
  } else {
    // Extend the current session
    session = { ...session, end: eventTime };
  }
  sessionState.setValue(session);
  subject.setTimer(session.end.add(this.sessionTimeout));
}