Getting Started

This guide will help you set up your environment and run your first Reduction job.

Installation

The Reduction CLI can be installed with Homebrew:

brew install reduction-dev/tap/reduction

Alternatively, download and run one of the pre-built binaries.

Make sure the reduction binary is on your PATH:

$ reduction --version 
reduction version 0.0.1

Setup

Create a new Go module for the job. The Reduction Go SDK is pulled in as a dependency when you run go mod tidy in a later step. For example:

mkdir myjob
cd myjob
go mod init myjob

Author a Job

As a demo, we'll create a job that counts the events from an embedded source of incrementing numbers and prints a line to stdout on every 100,000th event it processes. Don't worry about the specifics of the job code yet; let's just get something running.

myjob/main.go
package main

import (
	"context"
	"fmt"
	"time"

	"reduction.dev/reduction-go/connectors/embedded"
	"reduction.dev/reduction-go/connectors/stdio"
	"reduction.dev/reduction-go/rxn"
	"reduction.dev/reduction-go/topology"
)

type Handler struct {
	sink      rxn.Sink[stdio.Event]
	countSpec rxn.ValueSpec[int]
}

// Count each event that arrives and send a message to the sink every 100,000 events.
func (h *Handler) OnEvent(ctx context.Context, subject rxn.Subject, event rxn.KeyedEvent) error {
	count := h.countSpec.StateFor(subject)
	count.Set(count.Value() + 1)
	if count.Value()%100_000 == 0 {
		h.sink.Collect(ctx, []byte(fmt.Sprintf("Count: %d", count.Value())))
	}
	return nil
}

func (h *Handler) OnTimerExpired(ctx context.Context, subject rxn.Subject, timestamp time.Time) error {
	panic("timers not used")
}

func main() {
	// Configure the job
	job := &topology.Job{
		WorkerCount:            1,
		WorkingStorageLocation: "storage",
	}
	sink := stdio.NewSink(job, "Sink")
	source := embedded.NewSource(job, "Source", &embedded.SourceParams{
		Generator: "inc_nums",
		KeyEvent: func(ctx context.Context, record []byte) ([]rxn.KeyedEvent, error) {
			return []rxn.KeyedEvent{{}}, nil
		},
	})
	operator := topology.NewOperator(job, "Operator", &topology.OperatorParams{
		Handler: func(op *topology.Operator) rxn.OperatorHandler {
			count := topology.NewValueSpec(op, "Count", rxn.ScalarValueCodec[int]{})
			return &Handler{sink: sink, countSpec: count}
		},
	})
	source.Connect(operator)
	operator.Connect(sink)

	job.Run()
}

Running a Job

Try out the job:

  1. Install the dependencies and build the job:

    go mod tidy
    go build
  2. Run the job with reduction dev:

    reduction dev ./myjob

At this point you should see a line printed to the terminal for every 100,000th event processed.