Skip to main content

First Reduction Job: High Scores

Let's create a simple Reduction job that tracks high scores for players in a game. When a player achieves a new personal best score, our job will send an event to celebrate their achievement.

Overview

This job will:

  1. Process a stream of score events containing user IDs and numeric scores
  2. Remember the high score for each user
  3. Emit events whenever a user beats their previous best score

The input events represent the score that the player has at the end of a game:

{
"user_id": "player123",
"score": 100,
"timestamp": "2024-01-30T12:45:10Z"
}

And we'll print messages when a user achieves a new high score:

🏆 New high score for player123: 100 (previous: 0)

Complete Code

Here's the complete code for our high scores job:

main.go
package main

import (
"context"
"encoding/json"
"fmt"
"time"

"reduction.dev/reduction-go/connectors/stdio"
"reduction.dev/reduction-go/rxn"
"reduction.dev/reduction-go/topology"
)

// ScoreEvent represents a user scoring points in a game
type ScoreEvent struct {
UserID string `json:"user_id"`
Score int `json:"score"`
Timestamp time.Time `json:"timestamp"`
}


// Handler tracks high scores for each user
type Handler struct {
// The sink collects the high score messages
Sink rxn.Sink[stdio.Event]

// ValueSpec tells reduction how to store and retrieve high scores for each user
HighScoreSpec rxn.ValueSpec[int]
}


// KeyEvent extracts the user ID as the key for event routing and a timestamp
func KeyEvent(ctx context.Context, eventData []byte) ([]rxn.KeyedEvent, error) {
var event ScoreEvent
if err := json.Unmarshal(eventData, &event); err != nil {
return nil, err
}

return []rxn.KeyedEvent{{
Key: []byte(event.UserID),
Timestamp: event.Timestamp,
Value: eventData,
}}, nil
}


// OnEvent processes each score event and emits when there's a new high score
func (h *Handler) OnEvent(ctx context.Context, subject rxn.Subject, keyedEvent rxn.KeyedEvent) error {
var event ScoreEvent
if err := json.Unmarshal(keyedEvent.Value, &event); err != nil {
return err
}

// Get current high score state for this user
highScore := h.HighScoreSpec.StateFor(subject)

// Check if this is a new high score
if event.Score > highScore.Value() {
// Format and send the high score message
message := fmt.Sprintf("🏆 New high score for %s: %d (previous: %d)\n",
event.UserID, event.Score, highScore.Value())
h.Sink.Collect(ctx, []byte(message))

// Update the stored high score
highScore.Set(event.Score)
}

return nil
}


// OnTimerExpired is not used in this handler
func (h *Handler) OnTimerExpired(ctx context.Context, subject rxn.Subject, timestamp time.Time) error {
return nil
}

func main() {
// Configure the job
job := &topology.Job{
WorkerCount: topology.IntValue(1),
WorkingStorageLocation: topology.StringValue("storage"),
}

// Create a source that reads from stdin
source := stdio.NewSource(job, "Source", &stdio.SourceParams{
KeyEvent: KeyEvent,
Framing: stdio.Framing{Delimiter: []byte{'\n'}},
})

// Create a sink that writes to stdout
sink := stdio.NewSink(job, "Sink")

operator := topology.NewOperator(job, "Operator", &topology.OperatorParams{
// This is where we configure the operator handler. We define the value
// spec in the context of the operator, making the state spec available
// as static configuration.
Handler: func(op *topology.Operator) rxn.OperatorHandler {
highScoreSpec := topology.NewValueSpec(op, "highscore", rxn.ScalarValueCodec[int]{})
return &Handler{
Sink: sink,
HighScoreSpec: highScoreSpec,
}
},
})

source.Connect(operator)
operator.Connect(sink)

job.Run()
}

Code Walkthrough

Let's step through creating this example project and the key parts of the job code.

Create The Project

First you'll need to create a project with your preferred language. We'll call this project "highscores".

mkdir highscores && cd highscores # create a directory for your module
go mod init highscores # initialize the module
go get reduction.dev/reduction-go # install the Go SDK

For this small example you can put all the code in a main.go in the root of your module directory.

Event Type

We define a ScoreEvent type for parsing the JSON data of incoming score events:

// ScoreEvent represents a user scoring points in a game
type ScoreEvent struct {
UserID string `json:"user_id"`
Score int `json:"score"`
Timestamp time.Time `json:"timestamp"`
}

State Management

Our handler maintains a single piece of state per user: their current high score.

// Handler tracks high scores for each user
type Handler struct {
// The sink collects the high score messages
Sink rxn.Sink[stdio.Event]

// ValueSpec tells reduction how to store and retrieve high scores for each user
HighScoreSpec rxn.ValueSpec[int]
}

Event Processing

KeyEvent is a stateless function that accepts the raw JSON input and specifies a key and a timestamp with its return value. The key allows Reduction to partition our data stream and the timestamp allows it track the time relative to the events ("event time").

// KeyEvent extracts the user ID as the key for event routing and a timestamp
func KeyEvent(ctx context.Context, eventData []byte) ([]rxn.KeyedEvent, error) {
var event ScoreEvent
if err := json.Unmarshal(eventData, &event); err != nil {
return nil, err
}

return []rxn.KeyedEvent{{
Key: []byte(event.UserID),
Timestamp: event.Timestamp,
Value: eventData,
}}, nil
}

Once events are keyed and distributed in our Reduction cluster, they'll be handled by OnEvent. In this method we:

  • Decode the value of our KeyedEvent
  • Load the current high score from state
  • Update the current high score and send a new high score event if the user beat their previous high score.
// OnEvent processes each score event and emits when there's a new high score
func (h *Handler) OnEvent(ctx context.Context, subject rxn.Subject, keyedEvent rxn.KeyedEvent) error {
var event ScoreEvent
if err := json.Unmarshal(keyedEvent.Value, &event); err != nil {
return err
}

// Get current high score state for this user
highScore := h.HighScoreSpec.StateFor(subject)

// Check if this is a new high score
if event.Score > highScore.Value() {
// Format and send the high score message
message := fmt.Sprintf("🏆 New high score for %s: %d (previous: %d)\n",
event.UserID, event.Score, highScore.Value())
h.Sink.Collect(ctx, []byte(message))

// Update the stored high score
highScore.Set(event.Score)
}

return nil
}

Job Configuration

Finally, we configure and run our job.

func main() {
// Configure the job
job := &topology.Job{
WorkerCount: topology.IntValue(1),
WorkingStorageLocation: topology.StringValue("storage"),
}

// Create a source that reads from stdin
source := stdio.NewSource(job, "Source", &stdio.SourceParams{
KeyEvent: KeyEvent,
Framing: stdio.Framing{Delimiter: []byte{'\n'}},
})

// Create a sink that writes to stdout
sink := stdio.NewSink(job, "Sink")

operator := topology.NewOperator(job, "Operator", &topology.OperatorParams{
// This is where we configure the operator handler. We define the value
// spec in the context of the operator, making the state spec available
// as static configuration.
Handler: func(op *topology.Operator) rxn.OperatorHandler {
highScoreSpec := topology.NewValueSpec(op, "highscore", rxn.ScalarValueCodec[int]{})
return &Handler{
Sink: sink,
HighScoreSpec: highScoreSpec,
}
},
})

source.Connect(operator)
operator.Connect(sink)

job.Run()
}

Running the Job

To run the job locally with our stdin source and sink, we'll first create a named pipe that we can write to.

mkfifo events

Build your Reduction handler:

go build # creates highscore file

In one terminal, start the job reading from the pipe:

reduction dev ./highscore < events

Then in another terminal, you can send events by echoing JSON to the pipe:

# First score for alice - new high score!
echo '{"user_id":"alice","score":100,"timestamp":"2024-01-30T12:00:00Z"}' > events

# Lower score for alice - no event emitted
echo '{"user_id":"alice","score":50,"timestamp":"2024-01-30T12:01:00Z"}' > events

# Bob's first score
echo '{"user_id":"bob","score":75,"timestamp":"2024-01-30T12:02:00Z"}' > events

# Alice beats her high score!
echo '{"user_id":"alice","score":150,"timestamp":"2024-01-30T12:03:00Z"}' > events

You should see output like:

🏆 New high score for alice: 100 (previous: 0)
🏆 New high score for bob: 75 (previous: 0)
🏆 New high score for alice: 150 (previous: 100)

The job will keep running and processing new events as you send them. When you're done testing, press Ctrl+C to stop the job and remove the named pipe:

rm events