
Source, Tap, Transform: How Data Flows Through AimDB's Rust Dataflow Engine

February 9, 2026 · AimDB Team · 8 min read

Every AimDB application is built from the same three Rust dataflow primitives: .source(), .tap(), and .transform(). Together, they describe how data enters your system, how you observe it, and how you derive new data from existing records — all without writing any glue code.

In this post, we'll walk through each primitive, show real-world patterns, and explain how AimDB uses a dependency graph (DAG) at build time to catch wiring mistakes before your application starts.

The Three Primitives

Think of an AimDB database as a set of typed records — named slots that hold streaming values. The three primitives control how values flow in and out of those slots:

| Primitive | Direction | Purpose |
|---|---|---|
| `.source()` | In | Produces values into a record |
| `.tap()` | Out | Observes values from a record |
| `.transform()` | In ← In | Derives a record's values from another record |

Let's look at each one.

.source() — Where Data Enters

A source is the origin of data for a record. It receives a Producer<T> handle and pushes values into the record's buffer:

```rust
builder.configure::<Temperature>(TempKey::Berlin, |reg| {
    reg.buffer(BufferCfg::SpmcRing { capacity: 100 })
        .source(|ctx, producer| async move {
            let mut interval = ctx.runtime.interval(Duration::from_secs(30));
            loop {
                interval.tick().await;
                let reading = read_sensor().await;
                let _ = producer.produce(reading).await;
            }
        });
});
```

Key rules:

  • Each record can have at most one .source() — a record has exactly one origin of truth.
  • The closure receives a RuntimeContext and a typed Producer<T, R>.
  • Sources are typically sensors, API pollers, or protocol adapters.

For MQTT or KNX data arriving over the wire, you can use connector links instead of a manual source — the connector takes care of subscription and deserialization:

```rust
reg.link_from("weather/berlin/temperature")
    .with_deserializer(Temperature::from_bytes)
    .finish();
```

Under the hood, a connector link is a source.

.tap() — Observing Without Owning

A tap is a read-only observer attached to any record. Taps subscribe to the record's buffer and receive every value the source (or transform) produces:

```rust
reg.tap(|ctx, consumer| async move {
    let mut reader = consumer.subscribe().unwrap();
    while let Ok(value) = reader.recv().await {
        println!("Got: {:?}", value);
    }
});
```

Key rules:

  • A record can have any number of taps — they're independent consumers.
  • Taps never write back to the record they observe.
  • Common uses: logging, WebSocket broadcasting, metrics collection, alerting.

Because taps are decoupled from production, you can add or remove observers without affecting the data pipeline.
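The one-writer/many-readers shape is easy to see in miniature. Here is a deliberately simplified plain-Rust sketch — not AimDB's buffer implementation, and `Record`, `tap`, and `produce` are illustrative names only — showing a single producer fanning each value out to independent observers:

```rust
// Minimal sketch: one writer, many independent read-only observers.
// Each tap receives every produced value; none can write back.
struct Record<T> {
    taps: Vec<Box<dyn Fn(&T)>>,
}

impl<T> Record<T> {
    fn new() -> Self {
        Record { taps: Vec::new() }
    }

    // Attach an observer. Adding or removing taps never affects the writer.
    fn tap(&mut self, f: impl Fn(&T) + 'static) {
        self.taps.push(Box::new(f));
    }

    // The single writer fans each value out to every registered tap.
    fn produce(&self, value: T) {
        for tap in &self.taps {
            tap(&value);
        }
    }
}
```

In real AimDB the taps are async consumers subscribed to the record's buffer, but the invariant is the same: observers are additive and side-effect-free with respect to the record.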

.transform() — Reactive Derived Records

This is where things get interesting. A transform derives one record's values from another record. AimDB subscribes to the input, calls your function for each new value, and produces the result into the output record's buffer — automatically.

Stateless Map

The simplest transform: convert each input value into zero or one output value.

```rust
// Derive Fahrenheit from Celsius — every temp reading produces a conversion
builder.configure::<Fahrenheit>(FahrenheitKey::Berlin, |reg| {
    reg.buffer(BufferCfg::SpmcRing { capacity: 100 })
        .transform::<Temperature, _>(TempKey::Berlin, |t| {
            t.map(|temp| Some(Fahrenheit {
                value: temp.celsius * 9.0 / 5.0 + 32.0,
                timestamp: temp.timestamp,
            }))
        });
});
```

Returning None skips output for that input — useful for filtering.
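The `Option` return is what lets a transform double as a filter. As a standalone plain-Rust illustration of the semantics (the function name and range are hypothetical, not part of AimDB's API):

```rust
// Hypothetical filter: drop readings outside a plausible outdoor range.
// Returning None means "no output for this input" — the derived record
// simply never sees a value for that reading.
fn plausible_only(celsius: f32) -> Option<f32> {
    if (-60.0..=60.0).contains(&celsius) {
        Some(celsius)
    } else {
        None // likely a sensor glitch: skip it
    }
}
```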

Stateful Accumulator

Need to maintain state across values? Use .with_state():

// Running average: accumulates readings, emits updated average each time builder.configure::<RunningAverage>(AvgKey::Berlin, |reg| { reg.buffer(BufferCfg::SpmcRing { capacity: 100 }) .transform::<Temperature, _>(TempKey::Berlin, |t| { t.with_state(AccumulatorState { sum: 0.0, count: 0 }) .on_value(|temp, state| { state.sum += temp.celsius; state.count += 1; Some(RunningAverage { celsius: state.sum / state.count as f32, sample_count: state.count, }) }) }); });

The state is owned by AimDB and persists for the lifetime of the database — no external Arc<Mutex<>> needed.
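To make the state transition easy to follow in isolation, here is the same accumulator logic as plain Rust, outside the builder. In a real application AimDB owns the `AccumulatorState` and invokes the closure once per incoming value; here we just call the function directly:

```rust
// The running-average state machine from the example above, isolated.
struct AccumulatorState {
    sum: f32,
    count: u32,
}

// One step of the transform: fold the new reading into the state and
// return the updated average.
fn on_temp(celsius: f32, state: &mut AccumulatorState) -> f32 {
    state.sum += celsius;
    state.count += 1;
    state.sum / state.count as f32
}
```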

Transforms vs. Sources — Mutual Exclusion

A record gets its values from either a .source() or a .transform(), never both. This is enforced at configuration time — mixing them will panic with a clear message:

Cannot set transform: record already has a .source() registered

Taps, on the other hand, can always be added alongside either one. Think of it this way:

One writer (source or transform), many readers (taps).
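A simplified model of that configuration-time check — not AimDB's actual code, and `RecordConfig`, `Writer`, and `set_writer` are illustrative names — looks like this: the record remembers whether a writer is already registered and rejects a second one, while taps are always accepted.

```rust
// Simplified model of the one-writer invariant, checked at configure time.
#[derive(Debug, PartialEq)]
enum Writer {
    Source,
    Transform,
}

#[derive(Default)]
struct RecordConfig {
    writer: Option<Writer>,
    tap_count: usize,
}

impl RecordConfig {
    // A record accepts exactly one writer: a source or a transform.
    fn set_writer(&mut self, w: Writer) -> Result<(), String> {
        match &self.writer {
            Some(existing) => Err(format!(
                "Cannot set {:?}: record already has a {:?} registered",
                w, existing
            )),
            None => {
                self.writer = Some(w);
                Ok(())
            }
        }
    }

    // Taps are always allowed, regardless of which writer is present.
    fn add_tap(&mut self) {
        self.tap_count += 1;
    }
}
```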

Real-World Example: Forecast Validation

Here's a pattern from our weather mesh demo. Each city receives live temperature readings over MQTT. We want to validate forecast accuracy by comparing predictions against actual readings — reactively, as temperatures arrive.

Before (manual wiring with channels):

```rust
// Create a channel to bridge temperature → validation
let (temp_tx, temp_rx) = mpsc::unbounded_channel();

// Temperature record: relay each reading through the channel
reg.tap(move |_ctx, consumer| relay_temps(consumer, temp_tx));

// Validation record: consume from channel, produce validations
reg.source(move |_ctx, producer| {
    validate_forecasts(producer, temp_rx, trackers, city, tolerance)
});
```

This works, but the channel is invisible to AimDB — it doesn't know that Validation depends on Temperature.

After (transform API):

```rust
// Validation record: derived directly from Temperature
reg.transform::<Temperature, _>(TempKey::Berlin, |t| {
    t.with_state(ValidationState::new(trackers, city, tolerance))
        .on_value(validate_one)
});
```

One declaration replaces three moving parts. AimDB owns the subscription, manages the state, and — crucially — knows about the dependency.

The Dependency Graph (DAG)

Every .source(), .transform(), and connector link creates edges in a directed acyclic graph (DAG) that AimDB builds at startup. When you call builder.build(), AimDB validates the entire graph before spawning any tasks:

Temperature (Berlin)  ──source──→  [MQTT link]
       │
       └──transform──→  ForecastValidation (Berlin)
                              │
                              └──tap──→  [WebSocket broadcast]

What the DAG Catches

Missing inputs — If a transform references a record key that doesn't exist, the build fails immediately:

Error: TransformInputNotFound {
    output_key: "validation.berlin",
    input_key: "temp.berlin"   // ← typo? not registered?
}

Cyclic dependencies — If record A transforms from B, and B transforms from A, the build fails:

Error: CyclicDependency {
    records: ["temp.berlin", "validation.berlin"]
}

These checks happen at build time, not at runtime. Your application either starts with a valid dataflow graph, or it doesn't start at all — no silent deadlocks, no mysterious hangs.
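Cycle detection over a dependency graph is a classic topological-sort problem. Here is a self-contained sketch using Kahn's algorithm — an illustration of how such a build-time check can work, not AimDB's implementation — where each edge points from an input record to the record derived from it:

```rust
use std::collections::HashMap;

// If a topological sort cannot consume every node, the leftover nodes
// form (or depend on) a cycle, and the graph is invalid.
fn has_cycle(edges: &[(&str, &str)]) -> bool {
    let mut indegree: HashMap<&str, usize> = HashMap::new();
    let mut out: HashMap<&str, Vec<&str>> = HashMap::new();
    for &(from, to) in edges {
        indegree.entry(from).or_insert(0);
        *indegree.entry(to).or_insert(0) += 1;
        out.entry(from).or_default().push(to);
    }
    // Start from records with no inputs (pure sources).
    let mut ready: Vec<&str> = indegree
        .iter()
        .filter(|(_, &d)| d == 0)
        .map(|(&n, _)| n)
        .collect();
    let mut visited = 0;
    while let Some(node) = ready.pop() {
        visited += 1;
        for &next in out.get(node).into_iter().flatten() {
            let d = indegree.get_mut(next).unwrap();
            *d -= 1;
            if *d == 0 {
                ready.push(next);
            }
        }
    }
    visited != indegree.len() // any unvisited node is part of a cycle
}
```

The same traversal that proves the graph acyclic also yields a valid startup order, which is why a build-time check like this can double as a task-spawning plan.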

Why This Matters

Without a dependency graph, transforms would be opaque closures — you'd need to manually trace which records feed into which. The DAG makes dataflows:

  • Visible — Tooling (CLI, MCP, dashboards) can introspect the full topology from embedded edge devices to cloud
  • Safe — Cycles and dangling references are caught before the first value flows
  • Debuggable — When something goes wrong, you can see exactly which record feeds which

When to Use Which

| Scenario | Primitive |
|---|---|
| Sensor reading, API poll, protocol ingestion | `.source()` or connector link |
| Logging, metrics, WebSocket streaming | `.tap()` |
| Unit conversion, filtering, enrichment | `.transform()` with `.map()` |
| Running average, deduplication, windowing | `.transform()` with `.with_state()` |
| Combining data from multiple records | `.transform_join()` (std only) |

What's Next

The transform API is the foundation for more advanced patterns we're building:

  • Multi-input joins — Combine values from multiple input records with .transform_join()
  • Dependency visualization — See the full DAG rendered in the AimDB dashboard
  • Replay and backfill — Re-run transforms over historical data

Check out the Core Concepts documentation to get started, or explore the weather mesh demo for a complete working example.