First Reduction Job: High Scores
Let's create a simple Reduction job that tracks high scores for players in a game. When a player achieves a new personal best score, our job will emit an event to celebrate their achievement.
Overview
This job will:
- Process a stream of score events containing user IDs and numeric scores
- Remember the high score for each user
- Emit events whenever a user beats their previous best score
The input events will look like:
{
"user_id": "player123",
"score": 100,
"timestamp": "2024-01-30T12:45:10Z"
}
And we'll print messages when users achieve new high scores:
🏆 New high score for player123: 100 (previous: 0)
Complete Code
Here's the complete code for our high scores job:
package main
import (
"context"
"encoding/json"
"fmt"
"time"
"reduction.dev/reduction-go/connectors/stdio"
"reduction.dev/reduction-go/rxn"
"reduction.dev/reduction-go/topology"
)
// ScoreEvent represents a user scoring points in a game
type ScoreEvent struct {
UserID string `json:"user_id"`
Score int `json:"score"`
Timestamp time.Time `json:"timestamp"`
}
// Handler tracks high scores for each user
type Handler struct {
Sink rxn.Sink[stdio.Event]
HighScoreSpec rxn.ValueSpec[int]
}
// KeyEvent extracts the user ID as the key for event routing and a timestamp
func KeyEvent(ctx context.Context, eventData []byte) ([]rxn.KeyedEvent, error) {
var event ScoreEvent
if err := json.Unmarshal(eventData, &event); err != nil {
return nil, err
}
return []rxn.KeyedEvent{{
Key: []byte(event.UserID),
Timestamp: event.Timestamp,
Value: eventData,
}}, nil
}
// OnEvent processes each score event and emits when there's a new high score
func (h *Handler) OnEvent(ctx context.Context, subject rxn.Subject, keyedEvent rxn.KeyedEvent) error {
var event ScoreEvent
if err := json.Unmarshal(keyedEvent.Value, &event); err != nil {
return err
}
// Get current high score state for this user
highScore := h.HighScoreSpec.StateFor(subject)
// Check if this is a new high score
if event.Score > highScore.Value() {
// Format and emit the high score message
message := fmt.Sprintf("🏆 New high score for %s: %d (previous: %d)\n",
event.UserID, event.Score, highScore.Value())
h.Sink.Collect(ctx, []byte(message))
// Update the stored high score
highScore.Set(event.Score)
}
return nil
}
// OnTimerExpired is not used in this handler
func (h *Handler) OnTimerExpired(ctx context.Context, subject rxn.Subject, timestamp time.Time) error {
return nil
}
func main() {
// Configure the job
job := &topology.Job{WorkerCount: 1, WorkingStorageLocation: "storage"}
// Create a source that reads from stdin
source := stdio.NewSource(job, "Source", &stdio.SourceParams{
KeyEvent: KeyEvent,
Framing: stdio.Framing{Delimiter: []byte{'\n'}},
})
// Create a sink that writes to stdout
sink := stdio.NewSink(job, "Sink")
operator := topology.NewOperator(job, "Operator", &topology.OperatorParams{
Handler: func(op *topology.Operator) rxn.OperatorHandler {
return &Handler{
Sink: sink,
HighScoreSpec: topology.NewValueSpec(op, "highscore", rxn.ScalarValueCodec[int]{}),
}
},
})
source.Connect(operator)
operator.Connect(sink)
job.Run()
}
Code Walkthrough
Let's step through the key parts of our job.
Create Go Project
First you'll need to create a Go project for your handler. We'll call it "highscores".
mkdir highscores && cd highscores # create a directory for your module
go mod init highscores # initialize the module
go get reduction.dev/reduction-go # install the Go SDK
For ths small example you can put all the code in a main.go
in the root
of your module directory.
Event Type
We define a single type for parsing the input score events:
// ScoreEvent represents a user scoring points in a game
type ScoreEvent struct {
UserID string `json:"user_id"`
Score int `json:"score"`
Timestamp time.Time `json:"timestamp"`
}
State Management
Our handler maintains a single piece of state per user: their current high score.
// Handler tracks high scores for each user
type Handler struct {
Sink rxn.Sink[stdio.Event]
HighScoreSpec rxn.ValueSpec[int]
}
The ValueSpec[int]
is a type that tells Reduction how to store integers (the
high scores) and lets us retrieve a state value on each OnEvent
call.
Event Processing
KeyEvent
is a stateless function that accepts the raw JSON input and specifies
a key and a timestamp with its return value. The key allows Reduction to
partition our data stream and the timestamp allows it track the time relative to
the events ("event time").
// KeyEvent extracts the user ID as the key for event routing and a timestamp
func KeyEvent(ctx context.Context, eventData []byte) ([]rxn.KeyedEvent, error) {
var event ScoreEvent
if err := json.Unmarshal(eventData, &event); err != nil {
return nil, err
}
return []rxn.KeyedEvent{{
Key: []byte(event.UserID),
Timestamp: event.Timestamp,
Value: eventData,
}}, nil
}
Once events are keyed and distributed in our Reduction cluster, they'll be
handled by OnEvent
. In this method we:
- Decode the value of our KeyedEvent
- Load the current high score from state
- Update the current high score and send a new high score event if the user beat their previous high score.
// OnEvent processes each score event and emits when there's a new high score
func (h *Handler) OnEvent(ctx context.Context, subject rxn.Subject, keyedEvent rxn.KeyedEvent) error {
var event ScoreEvent
if err := json.Unmarshal(keyedEvent.Value, &event); err != nil {
return err
}
// Get current high score state for this user
highScore := h.HighScoreSpec.StateFor(subject)
// Check if this is a new high score
if event.Score > highScore.Value() {
// Format and emit the high score message
message := fmt.Sprintf("🏆 New high score for %s: %d (previous: %d)\n",
event.UserID, event.Score, highScore.Value())
h.Sink.Collect(ctx, []byte(message))
// Update the stored high score
highScore.Set(event.Score)
}
return nil
}
Job Configuration
Finally, we configure and run our job.
func main() {
// Configure the job
job := &topology.Job{WorkerCount: 1, WorkingStorageLocation: "storage"}
// Create a source that reads from stdin
source := stdio.NewSource(job, "Source", &stdio.SourceParams{
KeyEvent: KeyEvent,
Framing: stdio.Framing{Delimiter: []byte{'\n'}},
})
// Create a sink that writes to stdout
sink := stdio.NewSink(job, "Sink")
operator := topology.NewOperator(job, "Operator", &topology.OperatorParams{
Handler: func(op *topology.Operator) rxn.OperatorHandler {
return &Handler{
Sink: sink,
HighScoreSpec: topology.NewValueSpec(op, "highscore", rxn.ScalarValueCodec[int]{}),
}
},
})
source.Connect(operator)
operator.Connect(sink)
job.Run()
}
Running the Job
To run the job locally with our stdin
source and sink, we'll first create a
named pipe that we can write to.
mkfifo events
Build your Reduction handler:
go build # creates highscore file
In one terminal, start the job reading from the pipe:
reduction dev ./highscore < events
Then in another terminal, you can send events by echoing JSON to the pipe:
# First score for alice - new high score!
echo '{"user_id":"alice","score":100,"timestamp":"2024-01-30T12:00:00Z"}' > events
# Lower score for alice - no event emitted
echo '{"user_id":"alice","score":50,"timestamp":"2024-01-30T12:01:00Z"}' > events
# Bob's first score
echo '{"user_id":"bob","score":75,"timestamp":"2024-01-30T12:02:00Z"}' > events
# Alice beats her high score!
echo '{"user_id":"alice","score":150,"timestamp":"2024-01-30T12:03:00Z"}' > events
You should see output like:
🏆 New high score for alice: 100 (previous: 0)
🏆 New high score for bob: 75 (previous: 0)
🏆 New high score for alice: 150 (previous: 100)
The job will keep running and processing new events as you send them. When you're done testing, press Ctrl+C to stop the job and remove the named pipe:
rm events
Next Steps
This high scores example demonstrates the basics of building a stateful streaming application with Reduction. From here, you can start learning about windows in the Tumbling Windows tutorial.