aggregator-toy vs RxJS

RxJS is a library for reactive programming using Observables. It provides a powerful set of operators for composing asynchronous and event-based programs.

Key Differences

| Aspect              | aggregator-toy         | RxJS                     |
|---------------------|------------------------|--------------------------|
| Primary focus       | Grouped aggregations   | General reactive streams |
| State management    | Built-in state tree    | Manual via scan/reduce   |
| Grouping            | First-class .groupBy() | groupBy() operator       |
| Incremental updates | Automatic              | Manual implementation    |
| Learning curve      | Lower for aggregations | Higher, more concepts    |
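
To make the "Manual via scan/reduce" and "groupBy() operator" rows concrete, here is a minimal sketch of a running total per candidate in RxJS. The `Score` shape and the `seen` array are assumptions made for this illustration. The sketch only handles additions; removing or replacing a vote would need extra bookkeeping on top of it.

```typescript
import { Subject, groupBy, mergeMap, scan, map } from 'rxjs';

// Hypothetical event shape for this sketch.
interface Score {
  candidateId: string;
  points: number;
}

const scores$ = new Subject<Score>();

// groupBy splits the stream into one inner observable per candidate,
// scan keeps a running total inside each group, and mergeMap flattens
// the per-group totals back into a single stream.
const totals$ = scores$.pipe(
  groupBy(score => score.candidateId),
  mergeMap(group$ =>
    group$.pipe(
      scan((total, score) => total + score.points, 0),
      map(total => ({ candidateId: group$.key, totalPoints: total }))
    )
  )
);

// Collect every update so the sequence can be inspected.
const seen: Array<{ candidateId: string; totalPoints: number }> = [];
totals$.subscribe(update => seen.push(update));

scores$.next({ candidateId: 'X', points: 3 });
scores$.next({ candidateId: 'Y', points: 1 });
scores$.next({ candidateId: 'X', points: 2 });
// seen now holds three updates: X=3, Y=1, X=5
```

Each update carries only the group that changed, so a consumer that wants the whole leaderboard still has to merge these updates into its own state.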

The Vote Leaderboard in RxJS

typescript
// Operators can be imported from 'rxjs' directly as of RxJS 7.2
import { Subject, scan, map } from 'rxjs';

interface Vote {
  id: string;
  voterId: string;
  candidateId: string;
  points: number;
  timestamp: number;
}

const votes$ = new Subject<{ type: 'add' | 'remove'; vote: Vote }>();

const leaderboard$ = votes$.pipe(
  scan((state, action) => {
    if (action.type === 'add') {
      return new Map(state).set(action.vote.id, action.vote);
    } else {
      const newState = new Map(state);
      newState.delete(action.vote.id);
      return newState;
    }
  }, new Map<string, Vote>()),
  map(votes => {
    // Group by voter+candidate, keep latest
    const byVoterCandidate = new Map<string, Vote>();
    for (const vote of votes.values()) {
      const key = `${vote.voterId}|${vote.candidateId}`;
      const existing = byVoterCandidate.get(key);
      if (!existing || vote.timestamp > existing.timestamp) {
        byVoterCandidate.set(key, vote);
      }
    }
    
    // Group by candidate, sum points
    const byCandidate = new Map<string, number>();
    for (const vote of byVoterCandidate.values()) {
      const current = byCandidate.get(vote.candidateId) ?? 0;
      byCandidate.set(vote.candidateId, current + vote.points);
    }
    
    return Array.from(byCandidate.entries()).map(([id, points]) => ({
      candidateId: id,
      totalPoints: points
    }));
  })
);

// Subscribe to updates
leaderboard$.subscribe(console.log);

// Feed votes
votes$.next({ type: 'add', vote: { id: '1', voterId: 'A', candidateId: 'X', points: 3, timestamp: 1 }});
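
To see what the `map` step above computes, the same logic can be pulled out into a plain function and run on a small data set. The name `tally` and the sample votes are introduced for this sketch only. Note that the function re-scans every stored vote on each call, which is what the RxJS version does on every emission.

```typescript
interface Vote {
  id: string;
  voterId: string;
  candidateId: string;
  points: number;
  timestamp: number;
}

// Same logic as the map step, extracted into a pure function.
function tally(votes: Iterable<Vote>): Array<{ candidateId: string; totalPoints: number }> {
  // Keep only the latest vote for each voter+candidate pair.
  const latest = new Map<string, Vote>();
  for (const vote of votes) {
    const key = `${vote.voterId}|${vote.candidateId}`;
    const existing = latest.get(key);
    if (!existing || vote.timestamp > existing.timestamp) {
      latest.set(key, vote);
    }
  }

  // Sum the surviving votes per candidate.
  const totals = new Map<string, number>();
  for (const vote of latest.values()) {
    totals.set(vote.candidateId, (totals.get(vote.candidateId) ?? 0) + vote.points);
  }
  return Array.from(totals, ([candidateId, totalPoints]) => ({ candidateId, totalPoints }));
}

const sampleVotes: Vote[] = [
  { id: '1', voterId: 'A', candidateId: 'X', points: 3, timestamp: 1 },
  { id: '2', voterId: 'A', candidateId: 'X', points: 5, timestamp: 2 }, // A votes for X again
  { id: '3', voterId: 'B', candidateId: 'X', points: 1, timestamp: 3 },
];

// A's later vote replaces the earlier one: 5 + 1 = 6.
const board = tally(sampleVotes);

// Removing vote '2' makes A's earlier vote count again: 3 + 1 = 4.
const afterRemoval = tally(sampleVotes.filter(v => v.id !== '2'));
```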

The Same in aggregator-toy

typescript
import { createPipeline } from "aggregator-toy";

const builder = createPipeline<Vote>()
  .groupBy(["voterId", "candidateId"], "votes")
  .in("votes").pickByMax("items", "timestamp", "latestVote")
  .groupBy(["candidateId"], "byCandidate")
  .in("byCandidate").sum("items", "points", "totalPoints");

let state = [];
const pipeline = builder.build(s => { state = s(state); }, builder.getTypeDescriptor());

pipeline.add("1", { voterId: "A", candidateId: "X", points: 3, timestamp: 1 });

When to Use RxJS

  • Complex async flows - Combining multiple data sources, handling errors, retries
  • Event-based programming - DOM events, WebSocket messages
  • Cancellation and backpressure - Unsubscribing from in-flight operations; throttling, debouncing, or buffering fast producers
  • You're already using RxJS - Consistency with existing codebase
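
As an illustration of the "complex async flows" item above, here is a small sketch that combines `switchMap`, `retry`, and `catchError`. The flaky source `fetchCandidates$` and the `refresh$` trigger are hypothetical stand-ins for a real request and a real event. Everything runs synchronously so the result is easy to follow.

```typescript
import { Subject, defer, of, throwError, switchMap, retry, catchError } from 'rxjs';

// Hypothetical flaky data source: fails on the first two subscriptions,
// then succeeds.
let attempts = 0;
const fetchCandidates$ = defer(() => {
  attempts += 1;
  return attempts < 3
    ? throwError(() => new Error(`attempt ${attempts} failed`))
    : of(['X', 'Y']);
});

const refresh$ = new Subject<void>();
const results: string[][] = [];

refresh$
  .pipe(
    // switchMap unsubscribes from the previous request when a new refresh arrives
    switchMap(() =>
      fetchCandidates$.pipe(
        retry(2), // resubscribe up to two more times after an error
        catchError(() => of([] as string[])) // fall back to an empty list if retries run out
      )
    )
  )
  .subscribe(list => results.push(list));

refresh$.next();
// attempts is now 3 and results holds one entry: ['X', 'Y']
```

Placing `retry` and `catchError` on the inner observable keeps the outer `refresh$` subscription alive even when a request ultimately fails.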

When to Use aggregator-toy

  • Grouped aggregations - The primary use case
  • Real-time leaderboards/dashboards - Optimized for this pattern
  • Simpler mental model - Declare the pipeline, get the state
  • Less boilerplate - Grouping and aggregation built-in

Using Together

You can use both libraries together:

typescript
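import { Observable } from 'rxjs';
import { createPipeline } from 'aggregator-toy';

// Assumed to be defined elsewhere: a stream of incoming votes,
// for example one backed by a WebSocket connection.
declare const websocket$: Observable<Vote>;
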
// Create the aggregation pipeline
const builder = createPipeline<Vote>()
  .groupBy(["candidateId"], "byCandidate")
  .sum("byCandidate", "points", "totalPoints");

let state = [];
const pipeline = builder.build(s => { state = s(state); }, builder.getTypeDescriptor());

// Feed from an RxJS observable
websocket$.subscribe(vote => {
  pipeline.add(vote.id, vote);
});

RxJS handles the stream; aggregator-toy handles the aggregation.
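
The division of labour above can be wrapped in a small helper that also handles teardown. The sketch below is self-contained and makes several assumptions: `Source` is a minimal structural type that an RxJS Observable is expected to satisfy, `Sink` models only the `pipeline.add(id, item)` call shown earlier, and `feed`, `createSource`, and `SimpleVote` are hypothetical names that belong to neither library.

```typescript
// Minimal structural types so the sketch stands alone.
interface Source<T> {
  subscribe(next: (value: T) => void): { unsubscribe(): void };
}
interface Sink<T> {
  add(id: string, item: T): void;
}

// Hypothetical helper: forwards each item to the sink and returns a
// function that stops forwarding.
function feed<T extends { id: string }>(source: Source<T>, sink: Sink<T>): () => void {
  const subscription = source.subscribe(item => sink.add(item.id, item));
  return () => subscription.unsubscribe();
}

// Stand-in event source, for demonstration only.
type Listener<T> = (value: T) => void;
function createSource<T>() {
  const listeners = new Set<Listener<T>>();
  return {
    emit(value: T) {
      listeners.forEach(listener => listener(value));
    },
    subscribe(next: Listener<T>) {
      listeners.add(next);
      return { unsubscribe: () => { listeners.delete(next); } };
    },
  };
}

// Pared-down vote shape for this sketch.
type SimpleVote = { id: string; candidateId: string; points: number };

const received = new Map<string, SimpleVote>();
const source = createSource<SimpleVote>();
const stop = feed(source, { add: (id, item) => { received.set(id, item); } });

source.emit({ id: '1', candidateId: 'X', points: 3 });
stop();
source.emit({ id: '2', candidateId: 'X', points: 2 }); // ignored after stop()
// received now contains only vote '1'
```

Returning the teardown function keeps the subscription lifetime explicit, which matters when the stream outlives the component that displays the aggregated state.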

Released under the MIT License.