Incremental Updates
Each step updates only the affected aggregates and forwards changes downstream. No global recompute needed.
Build pipelines that group, aggregate, and transform live data feeds with incremental updates and immutable state, tuned for dashboards, leaderboards, and streaming UIs.
Imagine a stream of votes. Each record has a voter, a candidate, and a number of points. Voters can change their mind; only their latest vote should count. You want a leaderboard of candidateId → totalPoints that updates as new votes arrive, without recomputing from scratch.
import { createPipeline } from "aggregator-toy";
interface Vote {
  voterId: string;
  candidateId: string;
  points: number;
  timestamp: number;
}
const builder = createPipeline<Vote>()
  // Each (voterId, candidateId) gets its own group
  .groupBy(["voterId", "candidateId"], "votes")
  // Keep only the latest vote by timestamp
  .in("votes").pickByMax("items", "timestamp", "latestVote")
  // Regroup by candidate
  .groupBy(["candidateId"], "byCandidate")
  // Sum the points from the latest vote for each voter
  .in("byCandidate").sum("items", "points", "totalPoints");
// Build with state management
const typeDescriptor = builder.getTypeDescriptor();
let state = [];
const pipeline = builder.build(s => { state = s(state); }, typeDescriptor);
// Feed votes
pipeline.add("1", { voterId: "A", candidateId: "X", points: 3, timestamp: 1 });
pipeline.add("2", { voterId: "B", candidateId: "X", points: 5, timestamp: 1 });
pipeline.add("3", { voterId: "A", candidateId: "X", points: 8, timestamp: 2 });
pipeline.add("4", { voterId: "A", candidateId: "Y", points: 4, timestamp: 3 });
// state.byCandidate now contains:
// [
//   { key: "X", value: { candidateId: "X", totalPoints: 13 } },
//   { key: "Y", value: { candidateId: "Y", totalPoints: 4 } }
// ]

The pipeline keeps per-group state and only touches the groups affected by each new vote. There is no global recompute, and the state tree is always ready for your UI to render.
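
To make the "only touch the affected groups" idea concrete, here is a minimal hand-rolled sketch of the same leaderboard logic. It does not use aggregator-toy and is not how the library is implemented; `VoteLeaderboard`, `add`, and `total` are hypothetical names introduced for illustration. It also assumes that when two votes for the same (voterId, candidateId) pair share a timestamp, the earlier arrival wins, and it mutates its maps in place for brevity rather than keeping immutable state.

```typescript
interface Vote {
  voterId: string;
  candidateId: string;
  points: number;
  timestamp: number;
}

// Hypothetical illustration class, not part of aggregator-toy.
class VoteLeaderboard {
  // Latest vote per (voterId, candidateId) pair.
  private latest = new Map<string, Vote>();
  // Running total per candidate.
  private totals = new Map<string, number>();

  add(vote: Vote): void {
    // JSON-encode the pair so ids containing separators cannot collide.
    const key = JSON.stringify([vote.voterId, vote.candidateId]);
    const previous = this.latest.get(key);

    // Assumption: ignore votes that are not strictly newer than the stored one.
    if (previous !== undefined && previous.timestamp >= vote.timestamp) {
      return;
    }
    this.latest.set(key, vote);

    // Apply only the difference to the one affected candidate total.
    const delta = vote.points - (previous?.points ?? 0);
    const current = this.totals.get(vote.candidateId) ?? 0;
    this.totals.set(vote.candidateId, current + delta);
  }

  total(candidateId: string): number {
    return this.totals.get(candidateId) ?? 0;
  }
}

const board = new VoteLeaderboard();
board.add({ voterId: "A", candidateId: "X", points: 3, timestamp: 1 });
board.add({ voterId: "B", candidateId: "X", points: 5, timestamp: 1 });
board.add({ voterId: "A", candidateId: "X", points: 8, timestamp: 2 });
board.add({ voterId: "A", candidateId: "Y", points: 4, timestamp: 3 });

console.log(board.total("X")); // 13
console.log(board.total("Y")); // 4
```

Each call to `add` does a constant amount of work: one lookup for the previous vote and one adjustment to a single candidate total. The third vote above replaces voter A's earlier 3 points for X with 8, so X's total moves from 8 to 13 by adding a delta of 5 instead of re-summing every vote.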