First Reduction Job: High Scores
Let's create a simple Reduction job that tracks high scores for players in a game. When a player achieves a new personal best score, our job will send an event to celebrate their achievement.
Overview
This job will:
- Process a stream of score events containing user IDs and numeric scores
- Remember the high score for each user
- Emit events whenever a user beats their previous best score
The input events represent the score that the player has at the end of a game:
{
  "user_id": "player123",
  "score": 100,
  "timestamp": "2024-01-30T12:45:10Z"
}
And we'll print messages when a user achieves a new high score:
🏆 New high score for player123: 100 (previous: 0)
Complete Code
Here's the complete code for our high scores job:
- Go
- TypeScript
package main
import (
	"context"
	"encoding/json"
	"fmt"
	"time"
	"reduction.dev/reduction-go/connectors/stdio"
	"reduction.dev/reduction-go/rxn"
	"reduction.dev/reduction-go/topology"
)
// ScoreEvent represents a user scoring points in a game
type ScoreEvent struct {
	UserID    string    `json:"user_id"`
	Score     int       `json:"score"`
	Timestamp time.Time `json:"timestamp"`
}
// Handler tracks high scores for each user
type Handler struct {
	// The sink collects the high score messages
	Sink rxn.Sink[stdio.Event]
	// ValueSpec tells reduction how to store and retrieve high scores for each user
	HighScoreSpec rxn.ValueSpec[int]
}
// KeyEvent extracts the user ID as the key for event routing and a timestamp
func KeyEvent(ctx context.Context, eventData []byte) ([]rxn.KeyedEvent, error) {
	var event ScoreEvent
	if err := json.Unmarshal(eventData, &event); err != nil {
		return nil, err
	}
	return []rxn.KeyedEvent{{
		Key:       []byte(event.UserID),
		Timestamp: event.Timestamp,
		Value:     eventData,
	}}, nil
}
// OnEvent processes each score event and emits when there's a new high score
func (h *Handler) OnEvent(ctx context.Context, subject rxn.Subject, keyedEvent rxn.KeyedEvent) error {
	var event ScoreEvent
	if err := json.Unmarshal(keyedEvent.Value, &event); err != nil {
		return err
	}
	// Get current high score state for this user
	highScore := h.HighScoreSpec.StateFor(subject)
	// Check if this is a new high score
	if event.Score > highScore.Value() {
		// Format and send the high score message
		message := fmt.Sprintf("🏆 New high score for %s: %d (previous: %d)\n",
			event.UserID, event.Score, highScore.Value())
		h.Sink.Collect(ctx, []byte(message))
		// Update the stored high score
		highScore.Set(event.Score)
	}
	return nil
}
// OnTimerExpired is not used in this handler
func (h *Handler) OnTimerExpired(ctx context.Context, subject rxn.Subject, timestamp time.Time) error {
	return nil
}
func main() {
	// Configure the job
	job := &topology.Job{
		WorkerCount:            topology.IntValue(1),
		WorkingStorageLocation: topology.StringValue("storage"),
	}
	// Create a source that reads from stdin
	source := stdio.NewSource(job, "Source", &stdio.SourceParams{
		KeyEvent: KeyEvent,
		Framing:  stdio.Framing{Delimiter: []byte{'\n'}},
	})
	// Create a sink that writes to stdout
	sink := stdio.NewSink(job, "Sink")
	operator := topology.NewOperator(job, "Operator", &topology.OperatorParams{
		// This is where we configure the operator handler. We define the value
		// spec in the context of the operator, making the state spec available
		// as static configuration.
		Handler: func(op *topology.Operator) rxn.OperatorHandler {
			highScoreSpec := topology.NewValueSpec(op, "highscore", rxn.ScalarValueCodec[int]{})
			return &Handler{
				Sink:          sink,
				HighScoreSpec: highScoreSpec,
			}
		},
	})
	source.Connect(operator)
	operator.Connect(sink)
	job.Run()
}
import type { KeyedEvent, OperatorHandler, Subject } from "reduction-ts";
import * as stdio from "reduction-ts/connectors/stdio";
import { uint64ValueCodec } from "reduction-ts/state";
import { Temporal } from "reduction-ts/temporal";
import * as topology from "reduction-ts/topology";
// ScoreEvent represents a user scoring points in a game
export interface ScoreEvent {
  user_id: string;
  score: number;
  timestamp: string;
}
// Handler tracks high scores for each user
export class Handler implements OperatorHandler {
  // The sink collects the high score messages
  private sink: topology.Sink<Uint8Array>;
  // ValueSpec tells reduction how to store and retrieve high scores for each user
  private highScoreSpec: topology.ValueSpec<number>;
  constructor(
    highScoreSpec: topology.ValueSpec<number>,
    sink: topology.Sink<Uint8Array>
  ) {
    this.highScoreSpec = highScoreSpec;
    this.sink = sink;
  }
  // onEvent processes each score event and emits when there's a new high score
  onEvent(subject: Subject, keyedEvent: KeyedEvent): void {
    const event: ScoreEvent = JSON.parse(Buffer.from(keyedEvent.value).toString());
    // Get current high score state for this user
    const highScore = this.highScoreSpec.stateFor(subject);
    // Check if this is a new high score
    if (event.score > highScore.value) {
      // Format and send the high score message
      const message = `🏆 New high score for ${event.user_id}: ${event.score} (previous: ${highScore.value})\n`;
      this.sink.collect(subject, Buffer.from(message));
      // Update the stored high score
      highScore.setValue(event.score);
    }
  }
  // Timers are not used in this example
  onTimerExpired(subject: Subject, timestamp: Temporal.Instant) {}
}
// KeyEvent extracts the user ID as the key for event routing and a timestamp
export function keyEvent(eventData: Uint8Array): KeyedEvent[] {
  const event: ScoreEvent = JSON.parse(Buffer.from(eventData).toString());
  return [
    {
      key: Buffer.from(event.user_id),
      timestamp: Temporal.Instant.from(event.timestamp),
      value: eventData,
    },
  ];
}
// Main function to run the job
if (require.main === module) {
  // Configure the job
  const job = new topology.Job({
    workerCount: 1,
    workingStorageLocation: "storage",
  });
  // Create a source that reads from stdin
  const source = new stdio.Source(job, "Source", {
    keyEvent,
    framing: stdio.Framing.delimited({ delimiter: Buffer.from("\n") }),
  });
  // Create a sink that writes to stdout
  const sink = new stdio.Sink(job, "Sink");
  // Create the operator with our handler
  const operator = new topology.Operator(job, "Operator", {
    parallelism: 1,
    // This is where we configure the operator handler. We define the value spec
    // in the context of the operator, making the state spec available as static
    // configuration.
    handler: (op) => {
      const highScoreSpec = new topology.ValueSpec<number>(
        op,
        "highscore",
        uint64ValueCodec,
        0
      );
      return new Handler(highScoreSpec, sink);
    },
  });
  source.connect(operator);
  operator.connect(sink);
  job.run();
}
Code Walkthrough
Let's step through creating this example project and the key parts of the job code.
Create The Project
First you'll need to create a project with your preferred language. We'll call this project "highscores".
- Go
- TypeScript
mkdir highscores && cd highscores # create a directory for your module
go mod init highscores # initialize the module
go get reduction.dev/reduction-go # install the Go SDK
For this small example you can put all the code in a main.go in the root
of your module directory.
For TypeScript I recommend Bun if you can use it because it's fast and can create single file executables.
mkdir highscores && cd highscores
bun init # choose "blank" project
bun add reduction-ts
We'll put all of our code for this example in the index.ts file.
Event Type
We define a ScoreEvent type for parsing the JSON data of incoming score events:
- Go
- TypeScript
// ScoreEvent represents a user scoring points in a game
type ScoreEvent struct {
	UserID    string    `json:"user_id"`
	Score     int       `json:"score"`
	Timestamp time.Time `json:"timestamp"`
}
// ScoreEvent represents a user scoring points in a game
export interface ScoreEvent {
  user_id: string;
  score: number;
  timestamp: string;
}
State Management
Our handler maintains a single piece of state per user: their current high score.
- Go
- TypeScript
// Handler tracks high scores for each user
type Handler struct {
	// The sink collects the high score messages
	Sink rxn.Sink[stdio.Event]
	// ValueSpec tells reduction how to store and retrieve high scores for each user
	HighScoreSpec rxn.ValueSpec[int]
}
// Handler tracks high scores for each user
export class Handler implements OperatorHandler {
  // The sink collects the high score messages
  private sink: topology.Sink<Uint8Array>;
  // ValueSpec tells reduction how to store and retrieve high scores for each user
  private highScoreSpec: topology.ValueSpec<number>;
  constructor(
    highScoreSpec: topology.ValueSpec<number>,
    sink: topology.Sink<Uint8Array>
  ) {
    this.highScoreSpec = highScoreSpec;
    this.sink = sink;
  }
Event Processing
KeyEvent is a stateless function that accepts the raw JSON input and specifies
a key and a timestamp with its return value. The key allows Reduction to
partition our data stream and the timestamp allows it track the time relative to
the events ("event time").
- Go
- TypeScript
// KeyEvent extracts the user ID as the key for event routing and a timestamp
func KeyEvent(ctx context.Context, eventData []byte) ([]rxn.KeyedEvent, error) {
	var event ScoreEvent
	if err := json.Unmarshal(eventData, &event); err != nil {
		return nil, err
	}
	return []rxn.KeyedEvent{{
		Key:       []byte(event.UserID),
		Timestamp: event.Timestamp,
		Value:     eventData,
	}}, nil
}
// KeyEvent extracts the user ID as the key for event routing and a timestamp
export function keyEvent(eventData: Uint8Array): KeyedEvent[] {
  const event: ScoreEvent = JSON.parse(Buffer.from(eventData).toString());
  return [
    {
      key: Buffer.from(event.user_id),
      timestamp: Temporal.Instant.from(event.timestamp),
      value: eventData,
    },
  ];
}
Once events are keyed and distributed in our Reduction cluster, they'll be
handled by OnEvent. In this method we:
- Decode the value of our KeyedEvent
- Load the current high score from state
- Update the current high score and send a new high score event if the user beat their previous high score.
- Go
- TypeScript
// OnEvent processes each score event and emits when there's a new high score
func (h *Handler) OnEvent(ctx context.Context, subject rxn.Subject, keyedEvent rxn.KeyedEvent) error {
	var event ScoreEvent
	if err := json.Unmarshal(keyedEvent.Value, &event); err != nil {
		return err
	}
	// Get current high score state for this user
	highScore := h.HighScoreSpec.StateFor(subject)
	// Check if this is a new high score
	if event.Score > highScore.Value() {
		// Format and send the high score message
		message := fmt.Sprintf("🏆 New high score for %s: %d (previous: %d)\n",
			event.UserID, event.Score, highScore.Value())
		h.Sink.Collect(ctx, []byte(message))
		// Update the stored high score
		highScore.Set(event.Score)
	}
	return nil
}
// onEvent processes each score event and emits when there's a new high score
onEvent(subject: Subject, keyedEvent: KeyedEvent): void {
  const event: ScoreEvent = JSON.parse(Buffer.from(keyedEvent.value).toString());
  // Get current high score state for this user
  const highScore = this.highScoreSpec.stateFor(subject);
  // Check if this is a new high score
  if (event.score > highScore.value) {
    // Format and send the high score message
    const message = `🏆 New high score for ${event.user_id}: ${event.score} (previous: ${highScore.value})\n`;
    this.sink.collect(subject, Buffer.from(message));
    // Update the stored high score
    highScore.setValue(event.score);
  }
}
Job Configuration
Finally, we configure and run our job.
- Go
- TypeScript
func main() {
	// Configure the job
	job := &topology.Job{
		WorkerCount:            topology.IntValue(1),
		WorkingStorageLocation: topology.StringValue("storage"),
	}
	// Create a source that reads from stdin
	source := stdio.NewSource(job, "Source", &stdio.SourceParams{
		KeyEvent: KeyEvent,
		Framing:  stdio.Framing{Delimiter: []byte{'\n'}},
	})
	// Create a sink that writes to stdout
	sink := stdio.NewSink(job, "Sink")
	operator := topology.NewOperator(job, "Operator", &topology.OperatorParams{
		// This is where we configure the operator handler. We define the value
		// spec in the context of the operator, making the state spec available
		// as static configuration.
		Handler: func(op *topology.Operator) rxn.OperatorHandler {
			highScoreSpec := topology.NewValueSpec(op, "highscore", rxn.ScalarValueCodec[int]{})
			return &Handler{
				Sink:          sink,
				HighScoreSpec: highScoreSpec,
			}
		},
	})
	source.Connect(operator)
	operator.Connect(sink)
	job.Run()
}
// Main function to run the job
if (require.main === module) {
  // Configure the job
  const job = new topology.Job({
    workerCount: 1,
    workingStorageLocation: "storage",
  });
  // Create a source that reads from stdin
  const source = new stdio.Source(job, "Source", {
    keyEvent,
    framing: stdio.Framing.delimited({ delimiter: Buffer.from("\n") }),
  });
  // Create a sink that writes to stdout
  const sink = new stdio.Sink(job, "Sink");
  // Create the operator with our handler
  const operator = new topology.Operator(job, "Operator", {
    parallelism: 1,
    // This is where we configure the operator handler. We define the value spec
    // in the context of the operator, making the state spec available as static
    // configuration.
    handler: (op) => {
      const highScoreSpec = new topology.ValueSpec<number>(
        op,
        "highscore",
        uint64ValueCodec,
        0
      );
      return new Handler(highScoreSpec, sink);
    },
  });
  source.connect(operator);
  operator.connect(sink);
  job.run();
}
Running the Job
To run the job locally with our stdin source and sink, we'll first create a
named pipe that we can write to.
mkfifo events
Build your Reduction handler:
- Go
- TypeScript
go build # creates highscore file
bun build --compile --outfile=highscore index.ts # creates highscore file
In one terminal, start the job reading from the pipe:
reduction dev ./highscore < events
Then in another terminal, you can send events by echoing JSON to the pipe:
# First score for alice - new high score!
echo '{"user_id":"alice","score":100,"timestamp":"2024-01-30T12:00:00Z"}' > events
# Lower score for alice - no event emitted
echo '{"user_id":"alice","score":50,"timestamp":"2024-01-30T12:01:00Z"}' > events
# Bob's first score
echo '{"user_id":"bob","score":75,"timestamp":"2024-01-30T12:02:00Z"}' > events
# Alice beats her high score!
echo '{"user_id":"alice","score":150,"timestamp":"2024-01-30T12:03:00Z"}' > events
You should see output like:
🏆 New high score for alice: 100 (previous: 0)
🏆 New high score for bob: 75 (previous: 0)
🏆 New high score for alice: 150 (previous: 100)
The job will keep running and processing new events as you send them. When you're done testing, press Ctrl+C to stop the job and remove the named pipe:
rm events