Skip to content

aggregator-toyLive aggregations for in-memory data

Build pipelines that group, aggregate, and transform live data feeds with incremental updates and immutable state, tuned for dashboards, leaderboards, and streaming UIs.

Quick Example: Vote Leaderboard

Imagine a stream of votes. Each record has a voter, a candidate, and a number of points. Voters can change their mind; only their latest vote should count. You want a leaderboard of candidateId → totalPoints that updates as new votes arrive, without recomputing from scratch.

typescript
import { createPipeline } from "aggregator-toy";

interface Vote {
  voterId: string;
  candidateId: string;
  points: number;
  timestamp: number;
}

const builder = createPipeline<Vote>()
  // Each (voterId, candidateId) gets its own group
  .groupBy(["voterId", "candidateId"], "votes")
  // Keep only the latest vote by timestamp
  .in("votes").pickByMax("items", "timestamp", "latestVote")
  // Regroup by candidate
  .groupBy(["candidateId"], "byCandidate")
  // Sum the points from the latest vote for each voter
  .in("byCandidate").sum("items", "points", "totalPoints");

// Build with state management
const typeDescriptor = builder.getTypeDescriptor();
let state = [];
const pipeline = builder.build(s => { state = s(state); }, typeDescriptor);

// Feed votes
pipeline.add("1", { voterId: "A", candidateId: "X", points: 3, timestamp: 1 });
pipeline.add("2", { voterId: "B", candidateId: "X", points: 5, timestamp: 1 });
pipeline.add("3", { voterId: "A", candidateId: "X", points: 8, timestamp: 2 });
pipeline.add("4", { voterId: "A", candidateId: "Y", points: 4, timestamp: 3 });

// state.byCandidate now contains:
// [
//   { key: "X", value: { candidateId: "X", totalPoints: 13 } },
//   { key: "Y", value: { candidateId: "Y", totalPoints: 4 } }
// ]

The pipeline keeps per-group state and only touches the groups affected by each new vote. There is no global recompute, and the state tree is always ready for your UI to render.

Released under the MIT License.