Getting Started
This guide will help you set up your environment and run your first Reduction job.
Installation
The Reduction CLI can be installed with Homebrew:
brew install reduction-dev/tap/reduction
Alternatively, you can download and run one of the pre-built binaries.
Make sure you have reduction in your path:
$ reduction --version
reduction version 0.0.1
Setup
Create a new Go project and add the Reduction Go SDK as a dependency. For example:
mkdir myjob
cd myjob
go mod init myjob
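If you want to add the SDK dependency up front, you can fetch it with go get. This is a sketch that assumes the module path is reduction.dev/reduction-go, matching the imports used in the job code below; the go mod tidy step later in this guide will also pull the dependency in automatically.
go get reduction.dev/reduction-go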
Author a Job
As a demo, we'll create a job that counts all the events from an embedded source of incrementing numbers and prints a message to stdout for every 100,000th event it processes. Don't worry about the specifics of the job code yet; let's just get something running.
myjob/main.go
package main

import (
	"context"
	"fmt"
	"time"

	"reduction.dev/reduction-go/connectors/embedded"
	"reduction.dev/reduction-go/connectors/stdio"
	"reduction.dev/reduction-go/rxn"
	"reduction.dev/reduction-go/topology"
)

type Handler struct {
	sink      rxn.Sink[stdio.Event]
	countSpec rxn.ValueSpec[int]
}

// Count each event that arrives and send a message to the sink every 100,000 events.
func (h *Handler) OnEvent(ctx context.Context, subject rxn.Subject, event rxn.KeyedEvent) error {
	count := h.countSpec.StateFor(subject)
	count.Set(count.Value() + 1)
	if count.Value()%100_000 == 0 {
		h.sink.Collect(ctx, []byte(fmt.Sprintf("Count: %d", count.Value())))
	}
	return nil
}

func (h *Handler) OnTimerExpired(ctx context.Context, subject rxn.Subject, timestamp time.Time) error {
	panic("timers not used")
}

func main() {
	// Configure the job
	job := &topology.Job{
		WorkerCount:            1,
		WorkingStorageLocation: "storage",
	}

	// A sink that prints collected messages to stdout
	sink := stdio.NewSink(job, "Sink")

	// An embedded source that generates incrementing numbers; every event
	// is given the same (empty) key so a single count is maintained.
	source := embedded.NewSource(job, "Source", &embedded.SourceParams{
		Generator: "inc_nums",
		KeyEvent: func(ctx context.Context, record []byte) ([]rxn.KeyedEvent, error) {
			return []rxn.KeyedEvent{{}}, nil
		},
	})

	// The operator owns the "Count" value state and the event handler
	operator := topology.NewOperator(job, "Operator", &topology.OperatorParams{
		Handler: func(op *topology.Operator) rxn.OperatorHandler {
			count := topology.NewValueSpec(op, "Count", rxn.ScalarValueCodec[int]{})
			return &Handler{sink: sink, countSpec: count}
		},
	})

	// Wire the dataflow: source -> operator -> sink
	source.Connect(operator)
	operator.Connect(sink)

	job.Run()
}
Running a Job
Try out the job:
- Install the dependencies and build the job:
  go mod tidy
  go build
- Run the job with reduction dev:
  reduction dev ./myjob
At this point you should see a line printed to the terminal for every 100,000th event processed.
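Based on the Count: %d format string used in the handler, the output should look something like this:
Count: 100000
Count: 200000
Count: 300000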