Getting Started
This tutorial will help you set up your local environment and run your first Reduction job.
Installation
The Reduction CLI can be installed with Homebrew:
brew install reduction-dev/tap/reduction
Or you can download one of the pre-built binaries and run it directly.
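If you go the pre-built binary route, a minimal sketch of putting it on your PATH might look like this (assuming the download unpacks to an executable named reduction in the current directory; adjust the destination to suit your system):

chmod +x ./reduction
sudo mv ./reduction /usr/local/bin/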
Make sure you have reduction in your path:
$ reduction --version
reduction version 0.0.6
Setup
Create a new project to develop your job:
Go:

mkdir myjob
cd myjob
go mod init myjob

TypeScript:

mkdir myjob
cd myjob
bun init # choose "blank"
bun add reduction-ts
Author a Job
As a demo, we'll create a job that counts all the events from an embedded source of incrementing numbers and prints the count to stdout every 100,000 events. Don't worry about the specifics of the job code yet; let's just get something running.
Go:
myjob/main.go
package main

import (
    "context"
    "fmt"
    "time"

    "reduction.dev/reduction-go/connectors/embedded"
    "reduction.dev/reduction-go/connectors/stdio"
    "reduction.dev/reduction-go/rxn"
    "reduction.dev/reduction-go/topology"
)

type Handler struct {
    sink      rxn.Sink[stdio.Event]
    countSpec rxn.ValueSpec[int]
}

// Count each event that arrives and send a message to the sink every 100,000 events.
func (h *Handler) OnEvent(ctx context.Context, subject rxn.Subject, event rxn.KeyedEvent) error {
    count := h.countSpec.StateFor(subject)
    count.Set(count.Value() + 1)
    if count.Value()%100_000 == 0 {
        h.sink.Collect(ctx, []byte(fmt.Sprintf("Count: %d\n", count.Value())))
    }
    return nil
}

func (h *Handler) OnTimerExpired(ctx context.Context, subject rxn.Subject, timestamp time.Time) error {
    panic("timers not used")
}

func main() {
    // Configure the job
    job := &topology.Job{
        WorkerCount:            topology.IntValue(1),
        WorkingStorageLocation: topology.StringValue("storage"),
    }
    sink := stdio.NewSink(job, "Sink")
    source := embedded.NewSource(job, "Source", &embedded.SourceParams{
        Generator: embedded.GeneratorSequence,
        KeyEvent: func(ctx context.Context, record []byte) ([]rxn.KeyedEvent, error) {
            return []rxn.KeyedEvent{{}}, nil
        },
    })
    operator := topology.NewOperator(job, "Operator", &topology.OperatorParams{
        Handler: func(op *topology.Operator) rxn.OperatorHandler {
            count := topology.NewValueSpec(op, "Count", rxn.ScalarValueCodec[int]{})
            return &Handler{sink: sink, countSpec: count}
        },
    })
    source.Connect(operator)
    operator.Connect(sink)
    job.Run()
}
TypeScript:

myjob/index.ts
import * as topology from "reduction-ts/topology";
import * as embedded from "reduction-ts/connectors/embedded";
import * as stdio from "reduction-ts/connectors/stdio";
import { uint64ValueCodec } from "reduction-ts/state";
import { type KeyedEvent, type Subject } from "reduction-ts";
import { Temporal } from "reduction-ts/temporal";

function createHandler(op: topology.Operator, sink: stdio.Sink) {
  const countSpec = new topology.ValueSpec<number>(op, "count", uint64ValueCodec, 0);

  return {
    onEvent(subject: Subject, event: KeyedEvent) {
      const count = countSpec.stateFor(subject);
      count.setValue(count.value + 1);
      if (count.value % 100_000 === 0) {
        sink.collect(subject, Buffer.from(`Count: ${count.value}\n`));
      }
    },
    onTimerExpired(subject: Subject, timer: Temporal.Instant) {
      /* Timers not used */
    },
  };
}

const job = new topology.Job({
  workerCount: 1,
  workingStorageLocation: "storage",
});

const source = new embedded.Source(job, "source", {
  keyEvent: () => [
    {
      key: new Uint8Array(),
      value: new Uint8Array(),
      timestamp: Temporal.Now.instant(),
    },
  ],
  generator: "sequence",
});

const sink = new stdio.Sink(job, "sink");

const operator = new topology.Operator(job, "operator", {
  parallelism: 1,
  handler: (op) => createHandler(op, sink),
});

source.connect(operator);
operator.connect(sink);

job.run();
Running a Job
Let's run your job locally using the reduction dev command.
Build your job as an executable:
Go:

go mod tidy
go build # creates a "myjob" executable

TypeScript:

bun build --compile --outfile=myjob index.ts
Run the job with reduction dev:
reduction dev ./myjob
At this point you should see a line printed to the terminal for every 100,000th event processed.
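Each of those lines comes from the stdout sink, so the output looks roughly like this (a sketch; the exact rate depends on how quickly the embedded source generates events):

Count: 100000
Count: 200000
Count: 300000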