Deploying to ECS With CDK
This guide demonstrates how to deploy a Reduction cluster on AWS ECS using CDK.
This example implements a basic "word count" job that reads data from a Kinesis stream and logs the count of each word whenever it changes.
Word Count Reduction Job
package main
import (
"context"
"fmt"
"strings"
"time"
"reduction.dev/reduction-go/connectors/kinesis"
"reduction.dev/reduction-go/connectors/stdio"
"reduction.dev/reduction-go/rxn"
"reduction.dev/reduction-go/topology"
)
// Handler tracks word counts
type Handler struct {
// The sink collects word count results
Sink rxn.Sink[stdio.Event]
// WordCountSpec tells Reduction how to store and retrieve the count for each word
WordCountSpec rxn.ValueSpec[int]
}
// KeyEvent extracts words from the input text and creates events for each
func KeyEvent(ctx context.Context, eventData *kinesis.Record) ([]rxn.KeyedEvent, error) {
words := strings.Fields(string(eventData.Data))
keyedEvents := make([]rxn.KeyedEvent, 0, len(words))
for _, word := range words {
// Normalize word (lowercase, remove punctuation)
word = strings.ToLower(strings.Trim(word, ",.!?;:\"'()"))
if word == "" {
continue
}
keyedEvents = append(keyedEvents, rxn.KeyedEvent{
Key: []byte(word),
Timestamp: eventData.Timestamp,
})
}
return keyedEvents, nil
}
// OnEvent processes each word event and updates its count
func (h *Handler) OnEvent(ctx context.Context, subject rxn.Subject, keyedEvent rxn.KeyedEvent) error {
// Get count for current word
wordCount := h.WordCountSpec.StateFor(subject)
// Increment count
wordCount.Set(wordCount.Value() + 1)
// Collect the result
h.Sink.Collect(ctx, fmt.Appendf(nil, "%s: %d\n", string(subject.Key()), wordCount.Value()))
return nil
}
// OnTimerExpired is not used
func (h *Handler) OnTimerExpired(ctx context.Context, subject rxn.Subject, timestamp time.Time) error {
return nil
}
func main() {
// Configure the job
job := &topology.Job{
WorkingStorageLocation: topology.StringParam("STORAGE_PATH"),
WorkerCount: topology.IntParam("WORKER_COUNT"),
}
// Create a source that reads from kinesis
source := kinesis.NewSource(job, "Source", &kinesis.SourceParams{
StreamARN: topology.StringParam("KINESIS_STREAM_ARN"),
KeyEvent: KeyEvent,
})
// Create a sink that writes to stdout
sink := stdio.NewSink(job, "Sink")
operator := topology.NewOperator(job, "Operator", &topology.OperatorParams{
Handler: func(op *topology.Operator) rxn.OperatorHandler {
wordCountSpec := topology.NewValueSpec(op, "wordcount", rxn.ScalarValueCodec[int]{})
return &Handler{
Sink: sink,
WordCountSpec: wordCountSpec,
}
},
})
source.Connect(operator)
operator.Connect(sink)
job.Run()
}
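To sanity-check what the Worker logs should show, you can replay text through the same steps outside of Reduction. The TypeScript sketch below is an illustrative re-implementation, not part of the Reduction SDK, and countWords and normalizeWord are hypothetical names. It splits on whitespace like strings.Fields, trims the same punctuation set as KeyEvent, lowercases, and emits a "word: count" line each time a count changes, as OnEvent does.

```typescript
// Characters trimmed from both ends of each word, mirroring the Go job's
// strings.Trim(word, ",.!?;:\"'()") call.
const TRIM_CHARS = new Set([",", ".", "!", "?", ";", ":", '"', "'", "(", ")"]);

// Hypothetical helper: trim punctuation from both ends, then lowercase.
function normalizeWord(raw: string): string {
  let start = 0;
  let end = raw.length;
  while (start < end && TRIM_CHARS.has(raw.charAt(start))) start++;
  while (end > start && TRIM_CHARS.has(raw.charAt(end - 1))) end--;
  return raw.slice(start, end).toLowerCase();
}

// Hypothetical helper: replay records through the same keying and counting
// steps as KeyEvent and OnEvent. Returns the lines the sink would print
// (without the trailing newline), in processing order.
function countWords(records: string[]): string[] {
  const counts = new Map<string, number>();
  const output: string[] = [];
  for (const record of records) {
    // Like strings.Fields: split on runs of whitespace; empty pieces are skipped below.
    for (const field of record.split(/\s+/)) {
      const word = normalizeWord(field);
      if (word === "") continue;
      const next = (counts.get(word) ?? 0) + 1;
      counts.set(word, next);
      output.push(`${word}: ${next}`);
    }
  }
  return output;
}

console.log(countWords(["Whose woods these are I think I know."]).join(", "));
// prints: whose: 1, woods: 1, these: 1, are: 1, i: 1, think: 1, i: 2, know: 1
```

In the deployed job, words are keyed and processed across workers in parallel, so the order of lines in the logs can differ from this sketch; only the per-word counts are comparable.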
The demo code makes cost-friendly infrastructure decisions:
- Deploys to public subnets to avoid the cost of the NAT gateways that private subnets need for outbound internet access
- Runs tasks on Fargate Spot capacity with the ARM64 CPU architecture
- Sets the S3 bucket's removal policy to DESTROY (with automatic object deletion) so that the bucket is removed when the stack is deleted
- Uses Service Connect (backed by AWS Cloud Map service discovery) instead of load balancers
Tools Used
- AWS CDK: An infrastructure as code tool that generates CloudFormation templates.
- Bun: A Node-compatible runtime that executes the CDK code.
- Go: The programming language used to write the Reduction job.
- Docker: Container technology used to package both the Reduction engine components and the Handler service.
- ECS: AWS's container orchestration service for deploying Docker containers.
- Fargate: A serverless compute engine for running containers.
- Service Connect: An ECS feature, built on the Envoy proxy, that gives services well-known network addresses.
Building Job Handler and Config
The Reduction Handler code that you write defines the job's configuration and creates the Handler service that the Reduction engine calls. To deploy, you'll first build the handler executable and write the job configuration file. You can do this anywhere in your CI process, but in this example we build these artifacts during CDK synthesis.
Building the handler executable:
// Build the Go executable for Linux on ARM64 to match the Fargate task architecture
const buildProc = Bun.spawnSync(['go', 'build', '-o', 'deploy-go'], {
cwd: '../',
env: { ...process.env, GOOS: 'linux', GOARCH: 'arm64' },
});
if (!buildProc.success) {
throw new Error(`Failed to build handler: ${buildProc.stderr}`);
}
Writing the config file to disk:
import * as path from 'node:path';
// Write the handler's job.json config file
const configProc = Bun.spawnSync(['go', 'run', 'main.go', 'config'], { cwd: '../' });
if (!configProc.success) {
throw new Error(`Failed to generate job.json: ${configProc.stderr}`);
}
const jobConfigPath = path.resolve('../job.json');
await Bun.write(jobConfigPath, configProc.stdout);
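Because job.json is written straight from the config command's stdout, a stray log line in that output would only surface later as a deploy-time failure. A minimal sketch, assuming you want synthesis to fail fast and that the config is a single JSON object, is to decode and parse the output before writing it. parseJobConfig is a hypothetical helper, and it only checks that the output is a JSON object, not that it matches Reduction's config schema.

```typescript
// Hypothetical helper: decode the config command's stdout and confirm it is a
// JSON object before it is written to job.json.
function parseJobConfig(stdout: Uint8Array | string): Record<string, unknown> {
  const text = typeof stdout === "string" ? stdout : new TextDecoder().decode(stdout);
  let parsed: unknown;
  try {
    parsed = JSON.parse(text);
  } catch (err) {
    throw new Error(`Config command did not print valid JSON: ${(err as Error).message}`);
  }
  // Reject arrays, null, and primitives: only a JSON object is accepted.
  if (parsed === null || typeof parsed !== "object" || Array.isArray(parsed)) {
    throw new Error("Config command output is not a JSON object");
  }
  return parsed as Record<string, unknown>;
}
```

You would call parseJobConfig(configProc.stdout) just before Bun.write; Bun's spawnSync returns stdout as a Buffer, which is a Uint8Array subclass.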
The Job Manager ECS Service
The Job Manager service needs:
- The Reduction Docker image: A Docker image containing the Reduction binary. Tasks run the "reduction job" command.
- The job.json config file: The Job Manager reads the job config file and resolves its configuration parameters from values known only at deploy time. In this case, we provide a Kinesis stream ARN, a working storage path, and the number of workers in the cluster to the Job Manager service as environment variables.
- A well-known network address: When the Reduction cluster boots, workers register with the Job Manager. To make the Job Manager reachable on the network, we use Service Connect to give it a well-known endpoint.
- Read and write access to an S3 bucket: The job needs a storage location where it can save and restore checkpoints.
- Network access to communicate with workers: We use a shared security group to allow the Job Manager and Worker nodes to communicate with each other.
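The environment variable names in this example follow a pattern: each parameter declared in the Go job with topology.StringParam or topology.IntParam (for example STORAGE_PATH) is supplied as an environment variable with a REDUCTION_PARAM_ prefix. The sketch below builds that environment map from plain parameter names. reductionParamEnv is a hypothetical helper, and the prefix convention is inferred from this example rather than from a full Reduction reference.

```typescript
// Prefix inferred from this guide's Job Manager container environment.
const PARAM_PREFIX = "REDUCTION_PARAM_";

// Hypothetical helper: turn job parameter values into container environment
// variables, e.g. STORAGE_PATH -> REDUCTION_PARAM_STORAGE_PATH.
function reductionParamEnv(params: Record<string, string | number>): Record<string, string> {
  const env: Record<string, string> = {};
  for (const [name, value] of Object.entries(params)) {
    // ECS environment values must be strings, so numbers are stringified.
    env[PARAM_PREFIX + name] = String(value);
  }
  return env;
}
```

In JobManagerService this could replace the hand-written map, for example environment: reductionParamEnv({ STORAGE_PATH: ..., WORKER_COUNT: props.workerCount, KINESIS_STREAM_ARN: props.sourceStream.streamArn }), which keeps the names in one place next to the parameters declared in main.go.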
Job Manager Service
import assert from 'assert';
import * as ec2 from 'aws-cdk-lib/aws-ec2';
import * as ecr_assets from 'aws-cdk-lib/aws-ecr-assets';
import * as ecs from 'aws-cdk-lib/aws-ecs';
import * as kinesis from 'aws-cdk-lib/aws-kinesis';
import * as s3 from 'aws-cdk-lib/aws-s3';
import * as logs from 'aws-cdk-lib/aws-logs';
import * as s3_assets from 'aws-cdk-lib/aws-s3-assets';
import { Construct } from 'constructs';
export interface JobManagerServiceProps {
/**
* The ECS cluster to deploy to
*/
cluster: ecs.ICluster;
/**
* The Docker image asset for the Reduction engine
*/
reductionImage: ecr_assets.DockerImageAsset;
/**
* S3 bucket for checkpoints
*/
bucket: s3.IBucket;
/**
* The job configuration (job.json) S3 asset
*/
jobConfigAsset: s3_assets.Asset;
/**
* Kinesis stream for job source data
*/
sourceStream: kinesis.IStream;
/**
* The number of workers to run
*/
workerCount: number;
/**
* The shared Reduction cluster security group
*/
securityGroup: ec2.ISecurityGroup;
}
// The port the job manager listens on for worker communication
const jobRpcPort = 8081;
/**
* An ECS Job Manager service
*/
export class JobManagerService extends Construct implements ec2.IConnectable {
public readonly connections: ec2.Connections;
public readonly endpoint: string;
constructor(scope: Construct, id: string, props: JobManagerServiceProps) {
super(scope, id);
const taskDefinition = new ecs.FargateTaskDefinition(this, 'Task', {
runtimePlatform: {
operatingSystemFamily: ecs.OperatingSystemFamily.LINUX,
cpuArchitecture: ecs.CpuArchitecture.ARM64,
},
});
props.bucket.grantReadWrite(taskDefinition.taskRole);
props.jobConfigAsset.grantRead(taskDefinition.taskRole);
props.sourceStream.grantRead(taskDefinition.taskRole);
taskDefinition.addContainer('Container', {
image: ecs.ContainerImage.fromDockerImageAsset(props.reductionImage),
portMappings: [{ containerPort: jobRpcPort, name: 'job-rpc' }],
logging: ecs.LogDriver.awsLogs({
streamPrefix: 'JobManager',
logGroup: new logs.LogGroup(this, 'LogGroup', {
logGroupName: 'JobManager',
retention: logs.RetentionDays.ONE_DAY,
}),
}),
command: ['job', props.jobConfigAsset.s3ObjectUrl],
environment: {
REDUCTION_PARAM_STORAGE_PATH: props.bucket.s3UrlForObject("/working-storage"),
REDUCTION_PARAM_WORKER_COUNT: props.workerCount.toString(),
REDUCTION_PARAM_KINESIS_STREAM_ARN: props.sourceStream.streamArn,
},
});
const service = new ecs.FargateService(this, 'Default', {
cluster: props.cluster,
securityGroups: [props.securityGroup],
taskDefinition,
desiredCount: 1,
minHealthyPercent: 0,
enableExecuteCommand: true,
assignPublicIp: true,
vpcSubnets: { subnetType: ec2.SubnetType.PUBLIC },
capacityProviderStrategies: [{
capacityProvider: 'FARGATE_SPOT',
weight: 1,
}],
serviceConnectConfiguration: {
services: [{ portMappingName: 'job-rpc' }],
},
});
this.connections = service.connections;
// Store the endpoint for other services to use
assert(props.cluster.defaultCloudMapNamespace, "Cluster must have a default Cloud Map namespace");
this.endpoint = `job-rpc.${props.cluster.defaultCloudMapNamespace?.namespaceName}:${jobRpcPort}`;
}
}
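JobManagerService (and HandlerService later in this guide) builds its endpoint by hand from the Service Connect port mapping name, the cluster's Cloud Map namespace, and the container port. That string relies on Service Connect's default client alias, whose DNS name is the port mapping name (the default discovery name) inside the cluster's namespace. A small shared helper keeps the constructs consistent; serviceConnectEndpoint is hypothetical and not part of aws-cdk-lib.

```typescript
// Hypothetical helper: build a Service Connect address of the form
// <portMappingName>.<namespace>:<port>, matching the endpoints built inline
// in JobManagerService and HandlerService.
function serviceConnectEndpoint(portMappingName: string, namespace: string, port: number): string {
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    throw new Error(`Invalid port: ${port}`);
  }
  return `${portMappingName}.${namespace}:${port}`;
}

console.log(serviceConnectEndpoint("job-rpc", "reduction.local", 8081));
// prints: job-rpc.reduction.local:8081
```

Inside the construct you would pass props.cluster.defaultCloudMapNamespace.namespaceName after the existing assert, so the port mapping name and port stay tied to the constants used in the container definition.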
The Worker ECS Service
The WorkerService construct runs instances of Reduction Workers. This service needs:
- The Reduction Docker image: A Docker image containing the Reduction binary. Tasks run the "reduction worker" command.
- The Job Manager endpoint: Workers use the Job Manager endpoint to register when they boot.
- Network access to communicate with the Job Manager and the Handler: The workers share a security group with the Job Manager, and the WorkerService construct implements ec2.IConnectable so that we can grant it access to the Handler. WorkerService is passed a jobManagerEndpoint and a handlerEndpoint to call.
- Read access to the job source: WorkerService implements iam.IGrantable so that we can grant it read access to the Kinesis stream.
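With this guide's values (Service Connect namespace reduction.local, Job Manager port mapping job-rpc on port 8081, Handler port mapping handler on port 8080), both endpoints resolve to plain host:port strings. The sketch below assembles the same argument list that WorkerService passes as the container command, which can help when comparing against what a running task actually received. workerCommand is a hypothetical helper.

```typescript
// Hypothetical helper: assemble the worker container command used by
// WorkerService from the Job Manager and Handler endpoints.
function workerCommand(jobManagerEndpoint: string, handlerEndpoint: string): string[] {
  return ["worker", "--job-addr", jobManagerEndpoint, "--handler-addr", handlerEndpoint];
}

const command = workerCommand("job-rpc.reduction.local:8081", "handler.reduction.local:8080");
console.log(command.join(" "));
// prints: worker --job-addr job-rpc.reduction.local:8081 --handler-addr handler.reduction.local:8080
```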
Worker Service
import * as ec2 from 'aws-cdk-lib/aws-ec2';
import * as ecr_assets from 'aws-cdk-lib/aws-ecr-assets';
import * as ecs from 'aws-cdk-lib/aws-ecs';
import * as iam from 'aws-cdk-lib/aws-iam';
import * as logs from 'aws-cdk-lib/aws-logs';
import { Construct } from 'constructs';
export interface WorkerServiceProps {
/**
* The ECS cluster to deploy to
*/
cluster: ecs.ICluster;
/**
* The Docker image asset for the Reduction engine
*/
reductionImage: ecr_assets.DockerImageAsset;
/**
* The handler service endpoint including the port
*/
handlerEndpoint: string;
/**
* The job manager service endpoint including the port
*/
jobManagerEndpoint: string;
/**
* The number of workers to run
*/
workerCount: number;
/**
* The shared Reduction cluster security group
*/
securityGroup: ec2.ISecurityGroup;
}
/**
* An ECS Worker Service
*/
export class WorkerService extends Construct implements ec2.IConnectable, iam.IGrantable {
public readonly connections: ec2.Connections;
public readonly service: ecs.FargateService;
public readonly grantPrincipal: iam.IPrincipal;
constructor(scope: Construct, id: string, props: WorkerServiceProps) {
super(scope, id);
const taskDefinition = new ecs.FargateTaskDefinition(this, 'Task', {
memoryLimitMiB: 512,
cpu: 256,
runtimePlatform: {
operatingSystemFamily: ecs.OperatingSystemFamily.LINUX,
cpuArchitecture: ecs.CpuArchitecture.ARM64,
},
});
taskDefinition.addContainer('Container', {
image: ecs.ContainerImage.fromDockerImageAsset(props.reductionImage),
logging: ecs.LogDriver.awsLogs({
streamPrefix: 'Worker',
logGroup: new logs.LogGroup(this, 'LogGroup', {
logGroupName: 'Worker',
retention: logs.RetentionDays.ONE_DAY,
}),
}),
command: ['worker',
'--job-addr', props.jobManagerEndpoint,
'--handler-addr', props.handlerEndpoint],
});
this.service = new ecs.FargateService(this, 'Default', {
cluster: props.cluster,
securityGroups: [props.securityGroup],
taskDefinition,
desiredCount: props.workerCount,
capacityProviderStrategies: [{
capacityProvider: 'FARGATE_SPOT',
weight: 1,
}],
enableExecuteCommand: true,
minHealthyPercent: 0,
assignPublicIp: true,
vpcSubnets: { subnetType: ec2.SubnetType.PUBLIC },
serviceConnectConfiguration: {},
});
this.connections = this.service.connections;
this.grantPrincipal = taskDefinition.taskRole;
}
}
The Handler Service
The Handler service runs your job code that the Reduction workers call. This could be an AWS Lambda with a function URL, a service behind a load balancer, or any compute capable of servicing HTTP requests.
This Handler service needs:
- A Docker image with your handler code: Create a Dockerfile that runs your handler executable. The container health check runs curl against the handler's /health endpoint through a shell, so the image needs a shell and curl.
- A well-known network address: The service runs on an ECS cluster configured with a Cloud Map namespace for Service Connect. The HandlerService construct exposes an endpoint address for the Workers to call, and Service Connect routes those calls to HandlerService tasks.
Handler Service
import assert from 'assert';
import * as cdk from 'aws-cdk-lib';
import * as ec2 from 'aws-cdk-lib/aws-ec2';
import * as ecr_assets from 'aws-cdk-lib/aws-ecr-assets';
import * as ecs from 'aws-cdk-lib/aws-ecs';
import * as logs from 'aws-cdk-lib/aws-logs';
import { Construct } from 'constructs';
export interface HandlerServiceProps {
/**
* The ECS cluster to deploy to
*/
cluster: ecs.ICluster;
/**
* The Docker image asset for the Handler
*/
handlerImage: ecr_assets.DockerImageAsset;
/**
* The number of handler instances to run
*/
desiredCount: number;
}
// The port the handler listens on for HTTP requests
const handlerPort = 8080;
/**
* A Handler ECS service.
*
* The handler service is the user-defined service that the Reduction engine
* calls.
*/
export class HandlerService extends Construct implements ec2.IConnectable {
/**
* The endpoint for connecting to the handler
*/
public readonly endpoint: string;
/**
* Implements IConnectable for security group access control
*/
public readonly connections: ec2.Connections;
constructor(scope: Construct, id: string, props: HandlerServiceProps) {
super(scope, id);
const taskDefinition = new ecs.FargateTaskDefinition(this, 'Task', {
memoryLimitMiB: 512,
cpu: 256,
runtimePlatform: {
operatingSystemFamily: ecs.OperatingSystemFamily.LINUX,
cpuArchitecture: ecs.CpuArchitecture.ARM64,
},
});
taskDefinition.addContainer('Container', {
image: ecs.ContainerImage.fromDockerImageAsset(props.handlerImage),
portMappings: [{ containerPort: handlerPort, name: 'handler' }],
logging: ecs.LogDriver.awsLogs({
streamPrefix: 'Handler',
logGroup: new logs.LogGroup(this, 'LogGroup', {
logGroupName: 'Handler',
retention: logs.RetentionDays.ONE_DAY,
}),
}),
command: ['start'],
healthCheck: {
command: ['CMD-SHELL', `curl -f http://[::]:${handlerPort}/health || exit 1`],
interval: cdk.Duration.seconds(10),
timeout: cdk.Duration.seconds(3),
retries: 3,
startPeriod: cdk.Duration.seconds(3),
},
});
const service = new ecs.FargateService(this, 'Default', {
cluster: props.cluster,
taskDefinition,
desiredCount: props.desiredCount,
capacityProviderStrategies: [{ capacityProvider: 'FARGATE_SPOT', weight: 1 }],
enableExecuteCommand: true,
minHealthyPercent: 0,
assignPublicIp: true,
vpcSubnets: { subnetType: ec2.SubnetType.PUBLIC },
serviceConnectConfiguration: {
services: [{ portMappingName: 'handler' }],
},
});
this.connections = service.connections;
assert(props.cluster.defaultCloudMapNamespace, 'Default Cloud Map namespace not set on cluster');
this.endpoint = `handler.${props.cluster.defaultCloudMapNamespace?.namespaceName}:${handlerPort}`;
}
}
Reduction Stack - Connecting the Constructs
The ReductionStack is the deployed CloudFormation stack that composes our three services and creates their data dependencies (an S3 bucket and a Kinesis stream).
This stack creates:
- An S3 bucket that Reduction uses for persistent storage.
- A Kinesis stream used as the source input: a stream of words to count.
- The ECS cluster to deploy the services into.
- The shared security group for the Reduction Job Manager and Worker services.
- A stack output with the Kinesis stream name, so that after deploying we can put words on the stream and watch the job process the records.
Reduction Stack
import * as cdk from 'aws-cdk-lib';
import * as ec2 from 'aws-cdk-lib/aws-ec2';
import * as ecs from 'aws-cdk-lib/aws-ecs';
import * as s3_assets from 'aws-cdk-lib/aws-s3-assets';
import * as s3 from 'aws-cdk-lib/aws-s3';
import * as kinesis from 'aws-cdk-lib/aws-kinesis';
import * as ecr_assets from 'aws-cdk-lib/aws-ecr-assets';
import { Construct } from 'constructs';
import { JobManagerService } from './constructs/job-manager-service';
import { WorkerService } from './constructs/worker-service';
import { HandlerService } from './constructs/handler-service';
import * as servicediscovery from 'aws-cdk-lib/aws-servicediscovery';
interface ReductionStackProps extends cdk.StackProps {
jobConfigPath: string;
handlerDockerDir: string;
reductionDockerDir: string;
workerCount: number;
}
export class ReductionStack extends cdk.Stack {
constructor(scope: Construct, id: string, props: ReductionStackProps) {
super(scope, id, props);
// An s3 asset for the job.json config
const jobConfigAsset = new s3_assets.Asset(this, 'JobConfig', {
path: props.jobConfigPath,
});
// A bucket to use for Reduction's working storage
const bucket = new s3.Bucket(this, 'Storage', {
autoDeleteObjects: true,
removalPolicy: cdk.RemovalPolicy.DESTROY,
});
// A Kinesis stream to use for demo input
const sourceStream = new kinesis.Stream(this, 'WordCountStream', {
streamName: 'WordCountStream',
});
// The ECS cluster
const cluster = new ecs.Cluster(this, 'Cluster', {
vpc: ec2.Vpc.fromLookup(this, 'DefaultVPC', { isDefault: true }),
containerInsightsV2: ecs.ContainerInsights.ENABLED,
defaultCloudMapNamespace: {
name: 'reduction.local',
type: servicediscovery.NamespaceType.DNS_PRIVATE,
useForServiceConnect: true,
}
});
// A shared security group for Reduction services
const rxnSecurityGroup = new ec2.SecurityGroup(this, 'ReductionSecurityGroup', {
vpc: cluster.vpc,
description: 'Security group for Reduction services',
});
rxnSecurityGroup.addIngressRule(rxnSecurityGroup, ec2.Port.allTraffic(), 'Allow all communication between Reduction services');
// The user-defined handler service
const handlerService = new HandlerService(this, 'Handler', {
cluster,
handlerImage: new ecr_assets.DockerImageAsset(this, 'HandlerImage', {
directory: props.handlerDockerDir,
buildArgs: { TARGET_ARCH: 'arm64' },
}),
desiredCount: 1,
});
// Create Reduction image asset to be used by both services
const reductionImage = new ecr_assets.DockerImageAsset(this, 'ReductionImage', {
directory: props.reductionDockerDir,
buildArgs: { TARGET_ARCH: 'arm64' },
});
// The Reduction job manager service
const jobManagerService = new JobManagerService(this, 'JobManager', {
cluster,
bucket,
sourceStream,
jobConfigAsset,
reductionImage,
workerCount: props.workerCount,
securityGroup: rxnSecurityGroup,
});
// The Reduction worker service that calls the handler
const workerService = new WorkerService(this, 'Worker', {
cluster,
securityGroup: rxnSecurityGroup,
reductionImage,
handlerEndpoint: handlerService.endpoint,
jobManagerEndpoint: jobManagerService.endpoint,
workerCount: props.workerCount,
});
sourceStream.grantRead(workerService);
// ServiceConnect requires dependency ordering between services
workerService.node.addDependency(jobManagerService);
// Allow the workers to call the handler service
handlerService.connections.allowFrom(workerService, ec2.Port.allTcp());
// Export the stream name to use in the record sending script
new cdk.CfnOutput(this, 'SourceStreamName', {
value: sourceStream.streamName,
description: 'Name of the Kinesis stream for word count demo',
});
}
}
Deploying the Stack
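Because ReductionStack looks up the default VPC with ec2.Vpc.fromLookup, your CDK app must create it with an explicit env (account and region). To deploy this stack, make sure you are in the cdk directory and run: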
bun cdk deploy
This example has a TypeScript script for sending records to the source Kinesis stream.
scripts/send-records.ts
import { KinesisClient, PutRecordCommand } from "@aws-sdk/client-kinesis";
import {
CloudFormationClient,
DescribeStacksCommand,
} from "@aws-sdk/client-cloudformation";
async function sendRecords(streamName: string, records: string[]) {
console.log(`Sending ${records.length} records to stream: ${streamName}`);
const client = new KinesisClient();
for (const record of records) {
const command = new PutRecordCommand({
StreamName: streamName,
Data: Buffer.from(record),
PartitionKey: "1",
});
await client.send(command);
console.log(`Sent record: ${record}`);
}
}
async function getStackOutputValue(stackName: string, key: string): Promise<string> {
const cfnClient = new CloudFormationClient();
const command = new DescribeStacksCommand({ StackName: stackName });
const response = await cfnClient.send(command);
const output = response.Stacks?.[0]?.Outputs?.find(
(o) => o.OutputKey === key
)?.OutputValue;
if (!output) {
throw new Error(`Output ${key} not found in stack ${stackName}`);
}
return output;
}
// Example records; edit this list to send different text
const records = [
`Whose woods these are I think I know.`,
`His house is in the village though;`,
`He will not see me stopping here`,
`To watch his woods fill up with snow.`,
`My little horse must think it queer`,
`To stop without a farmhouse near`,
`Between the woods and frozen lake`,
`The darkest evening of the year.`,
`He gives his harness bells a shake`,
`To ask if there is some mistake.`,
`The only other sound's the sweep`,
`Of easy wind and downy flake.`,
`The woods are lovely, dark and deep,`,
`But I have promises to keep,`,
`And miles to go before I sleep,`,
`And miles to go before I sleep.`,
];
const streamName = await getStackOutputValue(
"ReductionWordCountDemo",
"SourceStreamName"
);
await sendRecords(streamName, records);
You can also use the AWS CLI to send records:
aws kinesis put-record \
--stream-name WordCountStream \
--partition-key "1" \
--cli-binary-format raw-in-base64-out \
--data "here are some words for our job to process"
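The --cli-binary-format raw-in-base64-out flag tells AWS CLI version 2 to accept --data as raw text; without it, CLI v2 expects blob arguments such as --data to be base64-encoded. If you'd rather drop the flag, you can encode the payload yourself, as in this sketch (toKinesisCliData is a hypothetical helper):

```typescript
// Hypothetical helper: base64-encode a text payload for
// `aws kinesis put-record --data` when running AWS CLI v2 without
// --cli-binary-format raw-in-base64-out.
function toKinesisCliData(text: string): string {
  return Buffer.from(text, "utf8").toString("base64");
}

console.log(toKinesisCliData("hello"));
// prints: aGVsbG8=
```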
Then you can read the log events from the Worker log group to see the results:
> aws logs tail Worker
2025-04-10T17:34:47.563000+00:00 Worker/Container/ede9438b6cd7458baffef42edaea8990 flake: 3
2025-04-10T17:34:47.563000+00:00 Worker/Container/ede9438b6cd7458baffef42edaea8990 keep: 3
2025-04-10T17:34:47.563000+00:00 Worker/Container/ede9438b6cd7458baffef42edaea8990 go: 5
2025-04-10T17:34:47.563000+00:00 Worker/Container/ede9438b6cd7458baffef42edaea8990 village: 3
2025-04-10T17:34:47.563000+00:00 Worker/Container/ede9438b6cd7458baffef42edaea8990 my: 3
2025-04-10T17:34:47.563000+00:00 Worker/Container/ede9438b6cd7458baffef42edaea8990 there: 3
2025-04-10T17:34:47.563000+00:00 Worker/Container/ede9438b6cd7458baffef42edaea8990 only: 3
2025-04-10T17:34:47.563000+00:00 Worker/Container/ede9438b6cd7458baffef42edaea8990 sweep: 3
2025-04-10T17:34:47.563000+00:00 Worker/Container/ede9438b6cd7458baffef42edaea8990 but: 3
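Each log line reports a word's running count at the moment it changed, so the most recent line for a word is its current total. For a compact summary, a sketch like the following keeps only the last count seen per word. latestCounts is hypothetical, and it assumes the line shape shown above (a timestamp, a log stream name, then the "word: count" message) and that lines appear in the order the counts were emitted.

```typescript
// Hypothetical helper: reduce `aws logs tail Worker` output to the latest
// count per word. Assumes lines look like
// "<timestamp> <log stream> <word>: <count>" and arrive in emission order.
function latestCounts(logOutput: string): Map<string, number> {
  const counts = new Map<string, number>();
  const linePattern = /^\S+\s+\S+\s+(\S+):\s+(\d+)$/;
  for (const line of logOutput.split("\n")) {
    const match = linePattern.exec(line.trim());
    if (!match) continue; // skip lines that aren't word-count messages
    // A later line for the same word overwrites the earlier count.
    counts.set(match[1], Number(match[2]));
  }
  return counts;
}
```

You could feed it the captured output of aws logs tail Worker and print the map sorted by count to see the most frequent words.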
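When you're finished with the stack, run destroy to remove its resources. The CloudWatch log groups keep CDK's default RETAIN removal policy, so delete the JobManager, Worker, and Handler log groups yourself if you want them gone; because their names are fixed, leftover log groups would otherwise conflict with a later deploy.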
bun cdk destroy