First Reduction Job: High Scores
Let's create a simple Reduction job that tracks high scores for players in a game. When a player achieves a new personal best score, our job will send an event to celebrate their achievement.
Overview
This job will:
- Process a stream of score events containing user IDs and numeric scores
- Remember the high score for each user
- Emit events whenever a user beats their previous best score
Each input event represents a player's score at the end of a game:
{
"user_id": "player123",
"score": 100,
"timestamp": "2024-01-30T12:45:10Z"
}
And we'll print messages when a user achieves a new high score:
🏆 New high score for player123: 100 (previous: 0)
Complete Code
Here's the complete code for our high scores job:
- Go
- TypeScript
package main
import (
"context"
"encoding/json"
"fmt"
"time"
"reduction.dev/reduction-go/connectors/stdio"
"reduction.dev/reduction-go/rxn"
"reduction.dev/reduction-go/topology"
)
// ScoreEvent represents a user scoring points in a game
type ScoreEvent struct {
UserID string `json:"user_id"`
Score int `json:"score"`
Timestamp time.Time `json:"timestamp"`
}
// Handler tracks high scores for each user
type Handler struct {
// The sink collects the high score messages
Sink rxn.Sink[stdio.Event]
// ValueSpec tells reduction how to store and retrieve high scores for each user
HighScoreSpec rxn.ValueSpec[int]
}
// KeyEvent extracts the user ID as the key for event routing and a timestamp
func KeyEvent(ctx context.Context, eventData []byte) ([]rxn.KeyedEvent, error) {
var event ScoreEvent
if err := json.Unmarshal(eventData, &event); err != nil {
return nil, err
}
return []rxn.KeyedEvent{{
Key: []byte(event.UserID),
Timestamp: event.Timestamp,
Value: eventData,
}}, nil
}
// OnEvent processes each score event and emits when there's a new high score
func (h *Handler) OnEvent(ctx context.Context, subject rxn.Subject, keyedEvent rxn.KeyedEvent) error {
var event ScoreEvent
if err := json.Unmarshal(keyedEvent.Value, &event); err != nil {
return err
}
// Get current high score state for this user
highScore := h.HighScoreSpec.StateFor(subject)
// Check if this is a new high score
if event.Score > highScore.Value() {
// Format and send the high score message
message := fmt.Sprintf("🏆 New high score for %s: %d (previous: %d)\n",
event.UserID, event.Score, highScore.Value())
h.Sink.Collect(ctx, []byte(message))
// Update the stored high score
highScore.Set(event.Score)
}
return nil
}
// OnTimerExpired is not used in this handler
func (h *Handler) OnTimerExpired(ctx context.Context, subject rxn.Subject, timestamp time.Time) error {
return nil
}
func main() {
// Configure the job
job := &topology.Job{
WorkerCount: topology.IntValue(1),
WorkingStorageLocation: topology.StringValue("storage"),
}
// Create a source that reads from stdin
source := stdio.NewSource(job, "Source", &stdio.SourceParams{
KeyEvent: KeyEvent,
Framing: stdio.Framing{Delimiter: []byte{'\n'}},
})
// Create a sink that writes to stdout
sink := stdio.NewSink(job, "Sink")
operator := topology.NewOperator(job, "Operator", &topology.OperatorParams{
// This is where we configure the operator handler. We define the value
// spec in the context of the operator, making the state spec available
// as static configuration.
Handler: func(op *topology.Operator) rxn.OperatorHandler {
highScoreSpec := topology.NewValueSpec(op, "highscore", rxn.ScalarValueCodec[int]{})
return &Handler{
Sink: sink,
HighScoreSpec: highScoreSpec,
}
},
})
source.Connect(operator)
operator.Connect(sink)
job.Run()
}
import type { KeyedEvent, OperatorHandler, Subject } from "reduction-ts";
import * as stdio from "reduction-ts/connectors/stdio";
import { uint64ValueCodec } from "reduction-ts/state";
import { Temporal } from "reduction-ts/temporal";
import * as topology from "reduction-ts/topology";
// ScoreEvent represents a user scoring points in a game
export interface ScoreEvent {
user_id: string;
score: number;
timestamp: string;
}
// Handler tracks high scores for each user
export class Handler implements OperatorHandler {
// The sink collects the high score messages
private sink: topology.Sink<Uint8Array>;
// ValueSpec tells reduction how to store and retrieve high scores for each user
private highScoreSpec: topology.ValueSpec<number>;
constructor(
highScoreSpec: topology.ValueSpec<number>,
sink: topology.Sink<Uint8Array>
) {
this.highScoreSpec = highScoreSpec;
this.sink = sink;
}
// onEvent processes each score event and emits when there's a new high score
onEvent(subject: Subject, keyedEvent: KeyedEvent): void {
const event: ScoreEvent = JSON.parse(Buffer.from(keyedEvent.value).toString());
// Get current high score state for this user
const highScore = this.highScoreSpec.stateFor(subject);
// Check if this is a new high score
if (event.score > highScore.value) {
// Format and send the high score message
const message = `🏆 New high score for ${event.user_id}: ${event.score} (previous: ${highScore.value})\n`;
this.sink.collect(subject, Buffer.from(message));
// Update the stored high score
highScore.setValue(event.score);
}
}
// Timers are not used in this example
onTimerExpired(subject: Subject, timestamp: Temporal.Instant) {}
}
// KeyEvent extracts the user ID as the key for event routing and a timestamp
export function keyEvent(eventData: Uint8Array): KeyedEvent[] {
const event: ScoreEvent = JSON.parse(Buffer.from(eventData).toString());
return [
{
key: Buffer.from(event.user_id),
timestamp: Temporal.Instant.from(event.timestamp),
value: eventData,
},
];
}
// Main function to run the job
if (require.main === module) {
// Configure the job
const job = new topology.Job({
workerCount: 1,
workingStorageLocation: "storage",
});
// Create a source that reads from stdin
const source = new stdio.Source(job, "Source", {
keyEvent,
framing: stdio.Framing.delimited({ delimiter: Buffer.from("\n") }),
});
// Create a sink that writes to stdout
const sink = new stdio.Sink(job, "Sink");
// Create the operator with our handler
const operator = new topology.Operator(job, "Operator", {
parallelism: 1,
// This is where we configure the operator handler. We define the value spec
// in the context of the operator, making the state spec available as static
// configuration.
handler: (op) => {
const highScoreSpec = new topology.ValueSpec<number>(
op,
"highscore",
uint64ValueCodec,
0
);
return new Handler(highScoreSpec, sink);
},
});
source.connect(operator);
operator.connect(sink);
job.run();
}
Code Walkthrough
Let's step through creating this example project and the key parts of the job code.
Create The Project
First, you'll need to create a project in your preferred language. We'll call this project "highscores".
- Go
- TypeScript
mkdir highscores && cd highscores # create a directory for your module
go mod init highscores # initialize the module
go get reduction.dev/reduction-go # install the Go SDK
For this small example you can put all the code in a main.go file in the root of your module directory.
For TypeScript, I recommend Bun if you can use it: it's fast and can create single-file executables.
mkdir highscores && cd highscores
bun init # choose "blank" project
bun add reduction-ts
We'll put all of our code for this example in the index.ts file.
Event Type
We define a ScoreEvent type for parsing the JSON data of incoming score events:
- Go
- TypeScript
// ScoreEvent represents a user scoring points in a game
type ScoreEvent struct {
UserID string `json:"user_id"`
Score int `json:"score"`
Timestamp time.Time `json:"timestamp"`
}
// ScoreEvent represents a user scoring points in a game
export interface ScoreEvent {
user_id: string;
score: number;
timestamp: string;
}
State Management
Our handler maintains a single piece of state per user: their current high score.
- Go
- TypeScript
// Handler tracks high scores for each user
type Handler struct {
// The sink collects the high score messages
Sink rxn.Sink[stdio.Event]
// ValueSpec tells reduction how to store and retrieve high scores for each user
HighScoreSpec rxn.ValueSpec[int]
}
// Handler tracks high scores for each user
export class Handler implements OperatorHandler {
// The sink collects the high score messages
private sink: topology.Sink<Uint8Array>;
// ValueSpec tells reduction how to store and retrieve high scores for each user
private highScoreSpec: topology.ValueSpec<number>;
constructor(
highScoreSpec: topology.ValueSpec<number>,
sink: topology.Sink<Uint8Array>
) {
this.highScoreSpec = highScoreSpec;
this.sink = sink;
}
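To preview how this state behaves at runtime, here's a small Go sketch (illustrative only; these are the same calls used in OnEvent below, and the zero default is what produces "previous: 0" in the example output):
// Inside OnEvent, where Reduction supplies the subject for the current key:
highScore := h.HighScoreSpec.StateFor(subject) // look up state for this user's key
fmt.Println(highScore.Value())                 // 0 for a user we haven't seen before
highScore.Set(150)                             // stored under this user's key for later events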
Event Processing
KeyEvent is a stateless function that accepts the raw JSON input and returns KeyedEvents, each carrying a key and a timestamp. The key allows Reduction to partition our data stream, and the timestamp allows it to track time relative to the events ("event time").
- Go
- TypeScript
// KeyEvent extracts the user ID as the key for event routing and a timestamp
func KeyEvent(ctx context.Context, eventData []byte) ([]rxn.KeyedEvent, error) {
var event ScoreEvent
if err := json.Unmarshal(eventData, &event); err != nil {
return nil, err
}
return []rxn.KeyedEvent{{
Key: []byte(event.UserID),
Timestamp: event.Timestamp,
Value: eventData,
}}, nil
}
// KeyEvent extracts the user ID as the key for event routing and a timestamp
export function keyEvent(eventData: Uint8Array): KeyedEvent[] {
const event: ScoreEvent = JSON.parse(Buffer.from(eventData).toString());
return [
{
key: Buffer.from(event.user_id),
timestamp: Temporal.Instant.from(event.timestamp),
value: eventData,
},
];
}
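To make the keying concrete, here's a quick check (a sketch only, assuming it runs in the same package as the handler code above) of what KeyEvent returns for the sample event from the overview:
events, err := KeyEvent(context.Background(),
	[]byte(`{"user_id":"player123","score":100,"timestamp":"2024-01-30T12:45:10Z"}`))
if err != nil {
	panic(err)
}
fmt.Println(string(events[0].Key)) // player123
fmt.Println(events[0].Timestamp)   // 2024-01-30 12:45:10 +0000 UTC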
Once events are keyed and distributed in our Reduction cluster, they'll be handled by OnEvent. In this method we:
- Decode the value of our KeyedEvent
- Load the current high score from state
- Update the stored high score and emit a new high score message if the user beat their previous best
Because the stored high score defaults to zero for a user we haven't seen before, a player's first score is always reported as a new high score.
- Go
- TypeScript
// OnEvent processes each score event and emits when there's a new high score
func (h *Handler) OnEvent(ctx context.Context, subject rxn.Subject, keyedEvent rxn.KeyedEvent) error {
var event ScoreEvent
if err := json.Unmarshal(keyedEvent.Value, &event); err != nil {
return err
}
// Get current high score state for this user
highScore := h.HighScoreSpec.StateFor(subject)
// Check if this is a new high score
if event.Score > highScore.Value() {
// Format and send the high score message
message := fmt.Sprintf("🏆 New high score for %s: %d (previous: %d)\n",
event.UserID, event.Score, highScore.Value())
h.Sink.Collect(ctx, []byte(message))
// Update the stored high score
highScore.Set(event.Score)
}
return nil
}
// onEvent processes each score event and emits when there's a new high score
onEvent(subject: Subject, keyedEvent: KeyedEvent): void {
const event: ScoreEvent = JSON.parse(Buffer.from(keyedEvent.value).toString());
// Get current high score state for this user
const highScore = this.highScoreSpec.stateFor(subject);
// Check if this is a new high score
if (event.score > highScore.value) {
// Format and send the high score message
const message = `🏆 New high score for ${event.user_id}: ${event.score} (previous: ${highScore.value})\n`;
this.sink.collect(subject, Buffer.from(message));
// Update the stored high score
highScore.setValue(event.score);
}
}
Job Configuration
Finally, we configure and run our job.
- Go
- TypeScript
func main() {
// Configure the job
job := &topology.Job{
WorkerCount: topology.IntValue(1),
WorkingStorageLocation: topology.StringValue("storage"),
}
// Create a source that reads from stdin
source := stdio.NewSource(job, "Source", &stdio.SourceParams{
KeyEvent: KeyEvent,
Framing: stdio.Framing{Delimiter: []byte{'\n'}},
})
// Create a sink that writes to stdout
sink := stdio.NewSink(job, "Sink")
operator := topology.NewOperator(job, "Operator", &topology.OperatorParams{
// This is where we configure the operator handler. We define the value
// spec in the context of the operator, making the state spec available
// as static configuration.
Handler: func(op *topology.Operator) rxn.OperatorHandler {
highScoreSpec := topology.NewValueSpec(op, "highscore", rxn.ScalarValueCodec[int]{})
return &Handler{
Sink: sink,
HighScoreSpec: highScoreSpec,
}
},
})
source.Connect(operator)
operator.Connect(sink)
job.Run()
}
// Main function to run the job
if (require.main === module) {
// Configure the job
const job = new topology.Job({
workerCount: 1,
workingStorageLocation: "storage",
});
// Create a source that reads from stdin
const source = new stdio.Source(job, "Source", {
keyEvent,
framing: stdio.Framing.delimited({ delimiter: Buffer.from("\n") }),
});
// Create a sink that writes to stdout
const sink = new stdio.Sink(job, "Sink");
// Create the operator with our handler
const operator = new topology.Operator(job, "Operator", {
parallelism: 1,
// This is where we configure the operator handler. We define the value spec
// in the context of the operator, making the state spec available as static
// configuration.
handler: (op) => {
const highScoreSpec = new topology.ValueSpec<number>(
op,
"highscore",
uint64ValueCodec,
0
);
return new Handler(highScoreSpec, sink);
},
});
source.connect(operator);
operator.connect(sink);
job.run();
}
Running the Job
To run the job locally with our stdio source and sink, we'll first create a named pipe that we can write to.
mkfifo events
Build your Reduction handler:
- Go
- TypeScript
go build -o highscore # creates the highscore executable
bun build --compile --outfile=highscore index.ts # creates highscore file
In one terminal, start the job reading from the pipe:
reduction dev ./highscore < events
Then in another terminal, you can send events by echoing JSON to the pipe:
# First score for alice - new high score!
echo '{"user_id":"alice","score":100,"timestamp":"2024-01-30T12:00:00Z"}' > events
# Lower score for alice - no event emitted
echo '{"user_id":"alice","score":50,"timestamp":"2024-01-30T12:01:00Z"}' > events
# Bob's first score
echo '{"user_id":"bob","score":75,"timestamp":"2024-01-30T12:02:00Z"}' > events
# Alice beats her high score!
echo '{"user_id":"alice","score":150,"timestamp":"2024-01-30T12:03:00Z"}' > events
You should see output like:
🏆 New high score for alice: 100 (previous: 0)
🏆 New high score for bob: 75 (previous: 0)
🏆 New high score for alice: 150 (previous: 100)
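If you already have a file of newline-delimited score events (say, a hypothetical scores.jsonl), you can also send the whole file through the pipe at once:
cat scores.jsonl > events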
The job will keep running and processing new events as you send them. When you're done testing, press Ctrl+C to stop the job, then remove the named pipe:
rm events
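If you want a completely clean slate, you can also remove the working storage location configured for the job (the storage directory, if one was created):
rm -rf storage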