Session Windows

Overview

This tutorial covers building a Reduction job that emits session windows. A session window represents a period of continuous activity that ends after a period of inactivity. If you want to capture a logical grouping of a user's events, such as a single online shopping experience, you'll want to use session windows.

[Figure: session windows split by gaps of inactivity]

My Introduction to Stream Processing

Session windows were my first introduction to stateful stream processing. My team needed to capture user sessions that ended 15 minutes after the last session event. I was surprised when a developer's implementation was more complicated than the high-level session window API I expected. Just a couple of simple-sounding requirements like "sessions can't be more than 24 hours long" had fundamentally changed the approach. A goal of Reduction is to accommodate new requirements without making developers learn an entirely new set of concepts.

For this example we'll work with view events that represent a user viewing a website:

{
"user_id": "user-a",
"timestamp": "2025-01-30T12:45:10Z"
}

And we'll emit events representing the time intervals that a user was actively on the site:

{
"user_id": "user-a",
"interval": "2025-01-30T12:45:00Z/2025-01-30T13:00:00Z"
}

We'll convert these JSON objects into the following types:

// ViewEvent represents a user viewing a page
type ViewEvent struct {
UserID string `json:"user_id"`
Timestamp time.Time `json:"timestamp"`
}

// SessionEvent represents a user's continuous session on the site
type SessionEvent struct {
UserID string `json:"user_id"`
Interval string `json:"interval"`
}

Keying Events

First we'll make a function to transform our JSON page view events into Reduction's KeyedEvent type.

func KeyEvent(ctx context.Context, eventData []byte) ([]rxn.KeyedEvent, error) {
var event ViewEvent
if err := json.Unmarshal(eventData, &event); err != nil {
return nil, err
}

return []rxn.KeyedEvent{{
Key: []byte(event.UserID),
Timestamp: event.Timestamp,
}}, nil
}

Representing Session State

For session windows, we need to track the start and end time of each active session. We could use two different state values for that, but it's more convenient to use a single value to represent the session state.

We'll create a Session type to represent this state and a codec to handle encoding and decoding of the session data.

// Session represents the internal state of an active session
type Session struct {
Start time.Time
End time.Time
}

func (s Session) IsZero() bool {
return s.Start.IsZero() && s.End.IsZero()
}

func (s Session) Interval() string {
return fmt.Sprintf("%s/%s", s.Start.Format(time.RFC3339), s.End.Format(time.RFC3339))
}

// SessionCodec encodes and decodes Session values
type SessionCodec struct{}

// Decode returns a Session from its string representation: two RFC 3339 timestamps separated by a slash
func (c SessionCodec) Decode(b []byte) (Session, error) {
parts := strings.Split(string(b), "/")
if len(parts) != 2 {
return Session{}, fmt.Errorf("invalid session format: %s", b)
}

start, err := time.Parse(time.RFC3339, parts[0])
if err != nil {
return Session{}, fmt.Errorf("invalid start time format: %w", err)
}

end, err := time.Parse(time.RFC3339, parts[1])
if err != nil {
return Session{}, fmt.Errorf("invalid end time format: %w", err)
}

return Session{start, end}, nil
}

// Encode returns the string representation of a Session: two RFC 3339 timestamps separated by a slash
func (c SessionCodec) Encode(value Session) ([]byte, error) {
return []byte(value.Interval()), nil
}

Creating the Handler

Our handler uses:

  • A Sink type that accepts SessionEvent
  • A ValueSpec to manage session state
  • An inactivity threshold to parameterize the session window size

// Handler is the session window operator handler
type Handler struct {
Sink rxn.Sink[SessionEvent]
SessionSpec rxn.ValueSpec[Session]
InactivityThreshold time.Duration
}

Processing Each Event

In the OnEvent method, we'll either:

  • Start a new session if we have no existing session for the user
  • Extend the current session if the event falls within its inactivity threshold
  • Close the current session and start a new one if the event arrives after the inactivity threshold has passed

func (h *Handler) OnEvent(ctx context.Context, subject rxn.Subject, event rxn.KeyedEvent) error {
sessionState := h.SessionSpec.StateFor(subject)
session := sessionState.Value()
eventTime := subject.Timestamp()

if session.IsZero() {
// Start a new session for the user
session = Session{Start: eventTime, End: eventTime}
} else if eventTime.After(session.End.Add(h.InactivityThreshold)) {
// Emit the session event and start a new session
h.Sink.Collect(ctx, SessionEvent{string(subject.Key()), session.Interval()})
session = Session{Start: eventTime, End: eventTime}
} else {
// Extend the current session
session = Session{Start: session.Start, End: eventTime}
}

sessionState.Set(session)
subject.SetTimer(session.End.Add(h.InactivityThreshold))
return nil
}

By observing a user's events we can tell when there has been a 15-minute gap, but what happens if we never get another event from that user? By setting a timer with subject.SetTimer to fire once the inactivity threshold has passed, we can close the session in the OnTimerExpired method without waiting for another event from the user.

Processing Timers

When a timer expires, we check if the session should be closed based on the inactivity threshold. If so, we emit the session event and clean up the session state:

func (h *Handler) OnTimerExpired(ctx context.Context, subject rxn.Subject, timestamp time.Time) error {
sessionState := h.SessionSpec.StateFor(subject)
session := sessionState.Value()

// Check whether this is the latest timer we set for this subject
if timestamp.Equal(session.End.Add(h.InactivityThreshold)) {
h.Sink.Collect(ctx, SessionEvent{string(subject.Key()), session.Interval()})
sessionState.Drop()
}
return nil
}

Testing

Let's test our session window implementation with a series of events that should create two distinct sessions.

First we set up our job for the test. This uses the embedded source and memory sink for testing.

job := &topology.Job{}
source := embedded.NewSource(job, "Source", &embedded.SourceParams{
KeyEvent: sessionwindow.KeyEvent,
})
memorySink := memory.NewSink[sessionwindow.SessionEvent](job, "Sink")
operator := topology.NewOperator(job, "Operator", &topology.OperatorParams{
Handler: func(op *topology.Operator) rxn.OperatorHandler {
return &sessionwindow.Handler{
Sink: memorySink,
SessionSpec: topology.NewValueSpec(op, "Session", sessionwindow.SessionCodec{}),
InactivityThreshold: 15 * time.Minute,
}
},
})
source.Connect(operator)
operator.Connect(memorySink)

Then we create a test run with events to test our handler:

tr := job.NewTestRun()

// First session with events close together
addViewEvent(tr, "user", "2025-01-01T00:01:00Z")
addViewEvent(tr, "user", "2025-01-01T00:05:00Z")
addViewEvent(tr, "user", "2025-01-01T00:10:00Z")
tr.AddWatermark()

// Gap in activity (>15 minutes)

// Second session
addViewEvent(tr, "user", "2025-01-01T00:30:00Z")
addViewEvent(tr, "user", "2025-01-01T00:35:00Z")
tr.AddWatermark()

// An event from another user advances event time
addViewEvent(tr, "other-user", "2025-01-01T01:00:00Z")
tr.AddWatermark()

require.NoError(t, tr.Run())

And finally we assert that we get the sessions we expect:

// Filter events to just focus on "user"
userEvents := []sessionwindow.SessionEvent{}
for _, event := range memorySink.Records {
if event.UserID == "user" {
userEvents = append(userEvents, event)
}
}

assert.Equal(t, []sessionwindow.SessionEvent{
{UserID: "user", Interval: "2025-01-01T00:01:00Z/2025-01-01T00:10:00Z"},
{UserID: "user", Interval: "2025-01-01T00:30:00Z/2025-01-01T00:35:00Z"},
}, userEvents)

Wrapping Up

We've implemented session windows that close after 15 minutes of inactivity. Now how would we manage that new requirement I mentioned earlier: "sessions cannot exceed 24h"?

We can add a new condition to our OnEvent handler for this requirement:

func (h *Handler) OnEvent(ctx context.Context, subject rxn.Subject, event rxn.KeyedEvent) error {
sessionState := h.SessionSpec.StateFor(subject)
session := sessionState.Value()
eventTime := subject.Timestamp()

if session.IsZero() {
// Start a new session for the user
session = Session{Start: eventTime, End: eventTime}
} else if eventTime.After(session.End.Add(h.InactivityThreshold)) {
// If inactive, emit the session event and start a new session
h.Sink.Collect(ctx, SessionEvent{string(subject.Key()), session.Interval()})
session = Session{Start: eventTime, End: eventTime}
} else if eventTime.Sub(session.Start) >= 24*time.Hour {
// If session reaches 24 hours, emit it and start a new one
session.End = session.Start.Add(24 * time.Hour)
h.Sink.Collect(ctx, SessionEvent{string(subject.Key()), session.Interval()})
session = Session{Start: eventTime, End: eventTime}
} else {
// Just extend the current session
session = Session{Start: session.Start, End: eventTime}
}

sessionState.Set(session)
subject.SetTimer(session.End.Add(h.InactivityThreshold))
return nil
}