State (DKV)
DKV (Distributed Key Value) is Reduction's embedded Log Structured Merge Tree (LSM) database. It provides state management for Reduction jobs through a storage engine designed specifically for cloud-native stream processing.
Key Concepts
- DKV stores state records and timers defined by Reduction's State Specs
- Uses object storage (like S3) for persistence
- Optimized for Reduction's distributed checkpointing mechanism
- Provides efficient read/write operations through tiered storage
The Write Ahead Log
LSM databases are write-optimized, which means they sacrifice some read performance for faster writes. This is a good trade-off in many cases because reading is easier to scale than writing. It's not a good fit for publishing blog posts (one write for many reads) but it's great for processing event streams.
Appending to a log is a simple way to achieve write performance and that's how DKV and most LSM databases start.
Let's say we want to count page visits for a user. We can append each event to a `wal.txt` file:
$ echo '{ "name": "Suzy" }' >> wal.txt
$ echo '{ "name": "Oscar" }' >> wal.txt
$ echo '{ "name": "Tim" }' >> wal.txt
$ echo '{ "name": "Suzy" }' >> wal.txt
This is called a Write Ahead Log (WAL) because it records writes that happen before writing to a more permanent database structure, but it can also be helpful to think of the WAL as the foundation of DKV, with the other structures as layers of optimization on top of the log.
Each entry in the DKV WAL contains:
- A sequence number (`uint64`)
- The variable-length key
- A tombstone marker byte (deletes need to be logged too!)
- The variable-length value (excluded for a delete)
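As a sketch of how such an entry might be laid out, here's a hypothetical Go encoding. The field order and length encoding are assumptions for illustration, not DKV's actual wire format:

```go
import (
	"bytes"
	"encoding/binary"
)

// walEntry mirrors the fields listed above; the exact byte layout
// (field order, length widths) is hypothetical, not DKV's wire format.
type walEntry struct {
	seq       uint64 // sequence number
	key       []byte // variable-length key
	tombstone bool   // deletes need to be logged too
	value     []byte // excluded for a delete
}

// encode appends the entry to an in-memory WAL buffer.
func (e walEntry) encode(buf *bytes.Buffer) {
	binary.Write(buf, binary.BigEndian, e.seq)
	binary.Write(buf, binary.BigEndian, uint32(len(e.key)))
	buf.Write(e.key)
	if e.tombstone {
		buf.WriteByte(1) // tombstone marker byte
		return           // no value follows a delete
	}
	buf.WriteByte(0)
	binary.Write(buf, binary.BigEndian, uint32(len(e.value)))
	buf.Write(e.value)
}
```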
DKV uses cloud storage as the primary data store instead of local disks. This approach separates a job's compute from its storage and eliminates the need to scale clusters for data capacity, but it also makes writing each record directly to the WAL infeasible due to per-request latency and cost. Instead, DKV buffers the WAL and flushes it during the asynchronous phase of checkpointing, which is a natural fit for Reduction's distributed checkpointing system.
Although writing to this log is fast, reading from it is O(n), getting slower the larger the log gets. For faster reads we need a different in-memory data structure.
MemTables
The term memTable comes from LevelDB (and probably from somewhere before that) and describes the in-memory representation of recently written data. In most LSM databases, memTables use Skip Lists to store data. DKV uses Zip Trees, which have the same performance characteristics but are easier to understand. Zip Trees are binary search trees organized according to a randomly assigned "rank" value. Writing and reading are both O(log n) operations.
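To make that concrete, here's a minimal sketch of rank-based insertion in a zip tree, following the recursive formulation from the Zip Trees paper. The node layout is hypothetical and not DKV's actual implementation:

```go
import (
	"bytes"
	"math/rand"
)

type node struct {
	key, value  []byte
	rank        int // random rank decides which node sits above the other
	left, right *node
}

// randomRank draws from a geometric distribution: count heads until tails.
func randomRank() int {
	rank := 0
	for rand.Intn(2) == 1 {
		rank++
	}
	return rank
}

// insert adds x below root, rotating x upward while its rank beats the
// ranks along the search path (rank ties are broken by key order).
func insert(root, x *node) *node {
	if root == nil {
		return x
	}
	if bytes.Compare(x.key, root.key) < 0 {
		if insert(root.left, x) == x { // x bubbled up to this subtree's root
			if x.rank < root.rank {
				root.left = x
			} else { // x wins the rank comparison: rotate right
				root.left = x.right
				x.right = root
				return x
			}
		}
	} else {
		if insert(root.right, x) == x {
			if x.rank <= root.rank {
				root.right = x
			} else { // rotate left
				root.right = x.left
				x.left = root
				return x
			}
		}
	}
	return root
}
```

Like a skip list's random levels, the random ranks keep the tree balanced in expectation without any explicit rebalancing bookkeeping.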
Once records are stored in the WAL they are "written through" to a memTable for better querying.
From a querying perspective, memTables are like sorted maps. Records can be read efficiently in ascending order which allows grouping similar keys together. Using this property, Reduction can store state maps as multiple entries in DKV with a shared key prefix.
SSTables
Once the current memTable reaches its size limit, DKV rotates the table, making it immutable (a "sealed" memTable) and then flushing the table to persistent storage.
Rotating memTables has a synchronous and asynchronous phase. As soon as the memTable is full, DKV performs a quick operation to instantiate a blank memTable for writing. Then, in a background task, the sealed memTables are flushed to long-term storage.
Synchronous Phase
- Create a new memTable and seal the previous one
- Create a new write buffer in the WAL
Asynchronous Phase
- Write sealed memTables as SSTables to storage
- Create a new immutable copy of the `LevelList` object with the new SSTables
- Truncate the WAL up to the latest sequence number in the `LevelList`
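A sketch of the two phases, with hypothetical types standing in for DKV's internals (the copy-on-write `LevelList`, lock scope, and helper functions here are assumptions for illustration):

```go
import (
	"sync"
	"sync/atomic"
)

// Hypothetical stand-ins for DKV's internals.
type memTable struct{ /* zip tree elided */ }
type sstable struct{ maxSeq uint64 }
type levelList struct {
	tables []sstable
	maxSeq uint64
}

// withTable returns a new immutable copy that includes the flushed table.
func (l *levelList) withTable(t sstable) *levelList {
	tables := append(append([]sstable{}, l.tables...), t)
	return &levelList{tables: tables, maxSeq: t.maxSeq}
}

type db struct {
	mu     sync.Mutex
	active *memTable
	sealed []*memTable
	levels atomic.Pointer[levelList] // assumed initialized with an empty list
}

func (d *db) rotateMemTable() {
	// Synchronous phase: quick pointer swaps; writers continue immediately.
	d.mu.Lock()
	full := d.active
	d.active = &memTable{} // fresh memTable for new writes
	d.sealed = append(d.sealed, full)
	// (a new WAL write buffer would also be created here)
	d.mu.Unlock()

	// Asynchronous phase: the IO runs in a background task.
	go func() {
		sst := flushToStorage(full) // serialize and upload as an SSTable
		next := d.levels.Load().withTable(sst)
		d.levels.Store(next)     // publish the new immutable LevelList
		truncateWAL(next.maxSeq) // drop WAL entries now durable in SSTs
	}()
}

func flushToStorage(m *memTable) sstable { return sstable{} } // elided
func truncateWAL(upTo uint64)            {}                   // elided
```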
The serialized format of a memTable is called a "Sorted String Table" (SST or SSTable). In DKV each SST file has an entries segment (the key-value records) and a metadata segment mostly populated with indexes (a Bloom filter and a sparse search index). The entries remain lexicographically sorted in ascending order.
Because DKV targets distributed object storage instead of local disk storage, its file format differs from most block-based LSM formats, which pad and align records to 4KB blocks. DKV's format yields smaller decompressed file sizes but requires the addition of a sparse binary search index for efficient searching.
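A hypothetical in-memory view of that layout (the segment names and index shape follow the description above; the concrete encoding is an assumption):

```go
// sstFile sketches the two segments of a DKV SST file.
type sstFile struct {
	// Entries segment: key-value records in ascending lexicographic
	// order, written without 4KB block padding or alignment.
	entries []sstEntry

	// Metadata segment: indexes that let readers skip or narrow work.
	bloom       []byte             // Bloom filter bits: "might this table contain the key?"
	sparseIndex []sparseIndexEntry // every Nth key mapped to a file offset
}

type sstEntry struct {
	key, value []byte
	tombstone  bool
}

type sparseIndexEntry struct {
	firstKey []byte // first key at this position in the entries segment
	offset   uint64 // byte offset to begin a binary search or scan from
}
```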
Reading
So far we've covered the prioritized write path (appending to the in-memory WAL followed by a write to a memTable) and the data's journey to a hard disk in the cloud. Efficient reading is more involved. Once a memTable is sealed, its data is never modified, which means that records for a key like "user-1234" could be in a memTable, a sealed memTable, and several SST files.
DKV implements two read patterns: `Get` and `ScanPrefix`.
Get
`Get` first looks for a single record by key in this order:
- The active memTable
- The sealed memTables
- SSTables starting with newer tables
Because `Get` looks for a key in the most recent data first, it can stop as soon as it finds the key even though there may be older records it hasn't seen.
MemTable `Get` queries are inherently fast (~0.001 ms/op) but searching through SSTables is slower. There are three strategies to make querying SSTables faster:
- Tiered SST organization allows newer tables to be checked before older tables.
- A Bloom filter checks whether a table might contain the key before searching through it.
- A sparse binary search index allows lookup within a file.
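Putting the three strategies together, a `Get` might look roughly like this, continuing the sketch types from above. The method names are hypothetical and tombstone handling is elided:

```go
func (d *db) get(key []byte) ([]byte, bool) {
	// 1. The active memTable holds the newest writes.
	if v, ok := d.active.get(key); ok {
		return v, true
	}
	// 2. Sealed memTables, newest first.
	for i := len(d.sealed) - 1; i >= 0; i-- {
		if v, ok := d.sealed[i].get(key); ok {
			return v, true
		}
	}
	// 3. SSTables, tiered so newer tables are checked before older ones.
	for _, table := range d.levels.Load().tablesNewestFirst() {
		if !table.bloomMightContain(key) {
			continue // definitely not in this table; skip the search
		}
		// The sparse index narrows the binary search to a small file region.
		if v, ok := table.search(key); ok {
			return v, true // newest version wins; stop immediately
		}
	}
	return nil, false
}
```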
ScanPrefix
`ScanPrefix` finds all records whose keys match the given prefix. Unlike `Get` queries, a scan can't stop as soon as it finds a record; it needs to query all possible locations for a key. To do this quickly, `ScanPrefix` performs parallel searches for the key prefix and merges the results together. Again, reading from memTables is fast and keeping SST queries performant is the challenge.
Like `Get` queries, `ScanPrefix` relies on the tiered organization of SSTable levels, where each level (except the first) contains a contiguous ascending list of keys. This structure limits the number of parallel queries needed to find all matching keys. `ScanPrefix` also uses each table's binary search index to find the first matching key quickly. However, `ScanPrefix` cannot use the Bloom filters. Because `ScanPrefix` queries can't stop until they've searched through every level of tables, and because they can't use the tables' Bloom filters, these queries are more sensitive to the total amount of stored data than `Get` queries.
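A sketch of that fan-out-and-merge shape, reusing the `sstEntry` type from earlier. The `source` interface is an assumption, and a real implementation would stream results through a merge iterator rather than collecting everything:

```go
import (
	"bytes"
	"sort"
	"sync"
)

// source is any searchable layer: the active memTable, a sealed
// memTable, or one sorted run of SSTables.
type source interface {
	scan(prefix []byte) []sstEntry // matching entries, ascending by key
}

func scanPrefix(sources []source, prefix []byte) []sstEntry {
	// Fan out: query every possible location for the prefix in parallel.
	results := make([][]sstEntry, len(sources))
	var wg sync.WaitGroup
	for i, s := range sources {
		wg.Add(1)
		go func(i int, s source) {
			defer wg.Done()
			results[i] = s.scan(prefix)
		}(i, s)
	}
	wg.Wait()

	// Merge: sources are ordered newest to oldest, so the first version
	// seen for a key supersedes older versions, and tombstones hide them.
	newest := map[string]sstEntry{}
	for _, entries := range results {
		for _, e := range entries {
			if _, seen := newest[string(e.key)]; !seen {
				newest[string(e.key)] = e
			}
		}
	}
	merged := make([]sstEntry, 0, len(newest))
	for _, e := range newest {
		if !e.tombstone {
			merged = append(merged, e)
		}
	}
	sort.Slice(merged, func(a, b int) bool {
		return bytes.Compare(merged[a].key, merged[b].key) < 0
	})
	return merged
}
```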
In order to reduce the amount of obsolete data and to keep similar keys close together we need a process to continually organize and clean our data.
Compaction
DKV organizes tables into tiers where each tier holds an ascending list of keys split across the tables. The RocksDB project calls this organization in each level a "sorted run". The first tier (level 0) is an exception because it contains newly flushed SSTables that haven't been reprocessed to create the sorted list of keys across tables.
Compaction takes tables from an upper level (starting with level 0) as inputs and creates new tables in a level below it. Compaction in DKV creates uniformly sized tables but allows more tables to accumulate in the lower tiers.
Level 0: [j-w] [a-r] [n-z] (newest tables with overlapping key ranges)
↓ ↓ ↓
Level 1: [a-m] [n-z] (tables sorted by key range)
↓ ↓
Level 2: [a-z]
↓
Level 3: [a-o] [p-z]
↓ ↓
Level 4: [a-f] [g-m] [o-s] [t-z]
↓ ↓ ↓ ↓
Level 5: [a-h] [i-p] [q-z] (oldest tables)
Within these sorted runs, compaction also removes superseded data. As an example, consider this list of sequential operations in a WAL:
key-a PUT value-1
key-b PUT value-1
key-a DELETE
key-b PUT value-2
To query this data we only need to retain:
key-b PUT value-2
Our WAL records contain the full history of put and delete operations and are deleted once DKV persists all of their data to SSTables. The memTables are naturally compacted as their Zip Trees are mutated and flushed to SSTables. And then the compaction process continually organizes data within sorted runs spanning SSTables.
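The core of that cleanup can be sketched as a single pass over a merged stream, assuming entries arrive sorted by key and newest-first within a key. Whether tombstones can be dropped depends on whether older levels remain below the output:

```go
import "bytes"

// compact emits one surviving entry per key from a merged input stream.
func compact(input []sstEntry, isFinalLevel bool) []sstEntry {
	var out []sstEntry
	var prevKey []byte
	for _, e := range input {
		if prevKey != nil && bytes.Equal(e.key, prevKey) {
			continue // superseded: a newer version was already emitted
		}
		prevKey = e.key
		if e.tombstone && isFinalLevel {
			continue // nothing older can exist below; drop the delete marker
		}
		out = append(out, e)
	}
	return out
}
```

Run over the four WAL operations above when writing the final level, this pass would emit only `key-b PUT value-2`, dropping both superseded `PUT`s and `key-a`'s tombstone.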
DKV uses two separate compaction triggers for SSTables:
- Reaching a configured number of recent tier 0 tables begins a process to move data to larger data tiers while maintaining a target data size at each tier.
- Reaching a threshold for size amplification (an estimated ratio of redundant data) pulls tables into the final tier of tables.
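As a rough illustration of the second trigger, size amplification can be estimated as the ratio of data above the final tier to data in it. This formula follows RocksDB's universal compaction convention and is an assumption about DKV's exact definition:

```go
// sizeAmplification estimates the ratio of potentially redundant data.
// The final tier approximates the true data size; everything above it
// may duplicate or shadow keys that also live in the final tier.
func sizeAmplification(tierSizes []int64) float64 {
	last := len(tierSizes) - 1
	if last < 1 || tierSizes[last] == 0 {
		return 0
	}
	var upper int64
	for _, size := range tierSizes[:last] {
		upper += size
	}
	return float64(upper) / float64(tierSizes[last])
}
```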
Checkpointing and Recovery
DKV's recovery approach is focused on the specific use case of distributed checkpointing backed by cloud storage. The Reduction job manager and worker nodes start a checkpoint by requesting a new checkpoint from each DKV instance for a shared ID. DKV instances respond with a URI to a checkpoint manifest once their checkpoint is complete.
Just as there are two phases to memTable rotation, checkpointing has a synchronous and an asynchronous phase. The synchronous phase is effectively instant so that database operations can continue uninterrupted while the IO work happens in the background.
Synchronous Phase
- Rotate the WAL, creating a new WAL object and sealing the old
- Hold a reference to the current immutable `LevelList` object of SSTables
- Create a checkpoint object with the WAL and `LevelList`
Asynchronous Phase
- Save the WAL to storage
- Save the checkpoint manifest file to storage
Probably the most interesting aspect of DKV is the minimal data movement required for checkpoints and recovery. Because files are already in S3, creating a checkpoint at a specific sequence number is a matter of flushing any data not yet in an SST (the WAL) and recording metadata about the active tables at that point in time. This is in contrast to systems where SSTables are copied from local nodes to distributed storage when checkpointing, and then copied from distributed storage to worker machines during startup.
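Since the SST files stay in place, a checkpoint manifest only needs to reference objects that already exist. A hypothetical shape, with illustrative field names rather than DKV's actual schema:

```go
// checkpointManifest records everything needed to restore state at a
// specific sequence number without copying any SST files.
type checkpointManifest struct {
	CheckpointID string   // shared ID requested by the job manager
	MaxSeq       uint64   // latest sequence number covered by this checkpoint
	WALURIs      []string // flushed WAL objects holding data not yet in SSTs
	SSTableURIs  []string // the LevelList's tables, referenced in place
}
```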
DKV's approach of keeping SST files in place for checkpointing and recovery adds complexity when removing outdated SST files that are no longer needed. After scaling in (decreasing the number of DKV instances), each DKV instance may share an SST file with other DKV instances. To allow safe deletion of SST files, DKV instances consult a data ownership policy to determine if they have complete ownership of a table or, if not, whether they need to query other DKV instances to see if those instances are still using the file.
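A sketch of what that deletion check might look like. The ownership policy interface and peer query are assumptions about the mechanism described above, not DKV's documented API:

```go
// canDelete reports whether this DKV instance may safely remove an SST file.
func canDelete(uri string, policy ownershipPolicy) bool {
	if policy.ownsExclusively(uri) {
		return true // no other instance can reference this table
	}
	// Possibly shared after scale-in: ask instances that may still use it.
	for _, p := range policy.possibleSharers(uri) {
		if p.stillUses(uri) {
			return false // keep the file until every sharer moves off it
		}
	}
	return true
}

type ownershipPolicy interface {
	ownsExclusively(uri string) bool
	possibleSharers(uri string) []peer
}

type peer interface {
	stillUses(uri string) bool
}
```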
State Specs and DKV
The State Specs in the Reduction job configurations define how DKV will be updated and queried. Consider this state spec for a Reduction job:
Go:
topology.NewMapSpec(op, "state-id", rxn.ScalarMapCodec[string, string]{})
TypeScript:
new topology.MapSpec<string, string>(op, "state-id", new MapCodec({
  keyCodec: utf8StringValueCodec,
  valueCodec: utf8StringValueCodec,
}));
For an `onEvent` call with a `KeyedEvent` keyed by `"user-123"`, DKV will create a `ScanPrefix` query with a composite key.
All examples here use a pseudo string representation of keys and values but know that DKV stores arbitrarily-encoded binary data. The `dkvDB` operations are also pseudocode for explanation.
dkvDB.scanPrefix("<key-group>|<schema-byte>|user-123|state-id");
// returned entries:
// 256|0|user-123|state-id|map-key-1|map-value-1
// 256|0|user-123|state-id|map-key-2|map-value-2
// ...
Then when you set a value (`new-value`) for a key `map-key-1` in the map, DKV will translate that operation to a `put` call:
dkvDB.put("<key-group>|<schema-byte>|user-123|state-id|map-key-1", "new-value");
You can see that DKV is agnostic about what keys and values it stores. The `<key-group>` and `<schema-byte>` fields are added by the Reduction operators, and after that, the data is encoded and decoded in the SDKs and the job author's code.
Timers and DKV
Timers are also stored in DKV but with a different `schema-byte` than other state keys. Unlike other state values, timers must be read in order across different subject keys. Timers have no values associated with their keys.
A query for the next set of expired timers looks like this:
const expiredTimers = [];
for (const [key, _] of dkvDB.scanPrefix("<key-group>|<schema-byte>")) {
if (timestampOf(key) < currentWatermark) {
expiredTimers.push(key);
} else {
break;
}
}
// expiredTimer keys:
// 256|0|2025-01-01T00:00:00|user-123
// 256|0|2025-01-01T00:00:01|user-456
// 256|0|2025-01-01T00:00:02|user-789
You'll notice that this query only searched within one `<key-group>`. To return timers across all subject keys and key groups, Reduction Operators manage parallel scans for each of their managed key groups. This is one of the reasons that Reduction organizes the unbounded space of possible subject keys into a finite set of key groups.
Setting a timer for a given subject key results in a `put` operation for DKV:
dkvDB.put("<key-group>|<schema-byte>|<timestamp>|user-123", "");