# Source, Tap, Transform: How Data Flows Through AimDB's Rust Dataflow Engine
Every AimDB application is built from the same three Rust dataflow primitives: .source(), .tap(), and .transform(). Together, they describe how data enters your system, how you observe it, and how you derive new data from existing records — all without writing any glue code.
In this post, we'll walk through each primitive, show real-world patterns, and explain how AimDB uses a dependency graph (DAG) at build time to catch wiring mistakes before your application starts.
## The Three Primitives
Think of an AimDB dataflow engine as a set of typed records — named slots that hold streaming values. The three primitives control how values flow in and out of those slots:
| Primitive | Direction | Purpose |
|---|---|---|
| `.source()` | In | Produces values into a record |
| `.tap()` | Out | Observes values from a record |
| `.transform()` | In ← In | Derives a record's values from another record |
Let's look at each one.
## `.source()` — Where Data Enters
A source is the origin of data for a record. It receives a Producer<T> handle and pushes values into the record's buffer:
```rust
builder.configure::<Temperature>(TempKey::Berlin, |reg| {
    reg.buffer(BufferCfg::SpmcRing { capacity: 100 })
        .source(|ctx, producer| async move {
            let mut interval = ctx.runtime.interval(Duration::from_secs(30));
            loop {
                interval.tick().await;
                let reading = read_sensor().await;
                let _ = producer.produce(reading).await;
            }
        });
});
```
Key rules:
- Each record can have at most one `.source()` — a record has exactly one origin of truth.
- The closure receives a `RuntimeContext` and a typed `Producer<T, R>`.
- Sources are typically sensors, API pollers, or protocol adapters.
For MQTT or KNX data arriving over the wire, you can use connector links instead of a manual source — the connector takes care of subscription and deserialization:
```rust
reg.link_from("weather/berlin/temperature")
    .with_deserializer(Temperature::from_bytes)
    .finish();
```
Under the hood, a connector link is a source.
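Conceptually, the link does the same job a hand-written source would: pull frames off the wire, deserialize them, and produce the results. Here's a hypothetical, dependency-free sketch of that shape — none of these names are AimDB's, and the real `from_bytes` signature may differ:

```rust
// Hypothetical sketch of what a connector link boils down to:
// "deserialize each frame, then produce the result".

#[derive(Debug, PartialEq)]
struct Temperature {
    celsius: f32,
}

impl Temperature {
    /// Parse a 4-byte little-endian float payload; reject anything else.
    fn from_bytes(bytes: &[u8]) -> Option<Temperature> {
        let arr: [u8; 4] = bytes.try_into().ok()?;
        Some(Temperature { celsius: f32::from_le_bytes(arr) })
    }
}

/// Stand-in for a record's producer side: here, just a Vec we push into.
fn run_link(frames: &[Vec<u8>], out: &mut Vec<Temperature>) {
    for frame in frames {
        // A real connector would receive frames from MQTT/KNX; malformed
        // payloads are dropped instead of crashing the pipeline.
        if let Some(value) = Temperature::from_bytes(frame) {
            out.push(value);
        }
    }
}

fn main() {
    let frames = vec![
        21.5f32.to_le_bytes().to_vec(),
        vec![0xFF], // malformed frame: skipped
        19.0f32.to_le_bytes().to_vec(),
    ];
    let mut record = Vec::new();
    run_link(&frames, &mut record);
    println!("{:?}", record); // two valid readings, the bad frame dropped
}
```

The key property is that the deserializer, not your application code, decides what counts as a valid frame — subscription and parsing live together behind the link.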
## `.tap()` — Observing Without Owning
A tap is a read-only observer attached to any record. Taps subscribe to the record's buffer and receive every value the source (or transform) produces:
```rust
reg.tap(|_ctx, consumer| async move {
    let mut reader = consumer.subscribe().unwrap();
    while let Ok(value) = reader.recv().await {
        println!("Got: {:?}", value);
    }
});
```
Key rules:
- A record can have any number of taps — they're independent consumers.
- Taps never write back to the record they observe.
- Common uses: logging, WebSocket broadcasting, metrics collection, alerting.
Because taps are decoupled from production, you can add or remove observers without affecting the data pipeline.
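The fan-out itself is simple to picture. Here's a minimal standalone sketch — these names are ours, not AimDB's API — where each tap owns an independent channel:

```rust
// Minimal sketch of "one writer, many readers": each tap gets its own
// channel, so observers never interfere with each other or the writer.
use std::sync::mpsc::{channel, Receiver, Sender};

struct Record<T: Clone> {
    taps: Vec<Sender<T>>,
}

impl<T: Clone> Record<T> {
    fn new() -> Self {
        Record { taps: Vec::new() }
    }

    /// Adding a tap never disturbs existing ones — it only appends a sender.
    fn tap(&mut self) -> Receiver<T> {
        let (tx, rx) = channel();
        self.taps.push(tx);
        rx
    }

    /// The single writer pushes each value to every subscribed tap.
    fn produce(&self, value: T) {
        for tap in &self.taps {
            let _ = tap.send(value.clone()); // a dropped tap is simply skipped
        }
    }
}

fn main() {
    let mut record = Record::new();
    let logger = record.tap();
    let metrics = record.tap();

    record.produce(21.5f32);

    // Both observers see the same value, independently.
    println!("logger:  {:?}", logger.recv().unwrap());
    println!("metrics: {:?}", metrics.recv().unwrap());
}
```

AimDB's actual buffers (like the SPMC ring above) are more sophisticated, but the decoupling is the same: producers never know how many readers exist.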
## `.transform()` — Reactive Derived Records
This is where things get interesting. A transform derives one record's values from another record. AimDB subscribes to the input, calls your function for each new value, and produces the result into the output record's buffer — automatically.
### Stateless Map
The simplest transform: convert each input value into zero or one output value.
```rust
// Derive Fahrenheit from Celsius — every temp reading produces a conversion
builder.configure::<Fahrenheit>(FahrenheitKey::Berlin, |reg| {
    reg.buffer(BufferCfg::SpmcRing { capacity: 100 })
        .transform::<Temperature, _>(TempKey::Berlin, |t| {
            t.map(|temp| {
                Some(Fahrenheit {
                    value: temp.celsius * 9.0 / 5.0 + 32.0,
                    timestamp: temp.timestamp,
                })
            })
        });
});
```
Returning `None` skips output for that input — useful for filtering.
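As a standalone illustration of that contract (the function name is invented, and this runs entirely outside AimDB), here's a map that converts to Fahrenheit but filters out implausible readings:

```rust
// Sketch of the stateless-map contract: Some(output) forwards a derived
// value, None filters the input out entirely.
fn only_plausible(celsius: f32) -> Option<f32> {
    // Forward a Fahrenheit conversion, but drop readings outside the range
    // ever observed on Earth — returning None filters them.
    if (-90.0..=60.0).contains(&celsius) {
        Some(celsius * 9.0 / 5.0 + 32.0)
    } else {
        None
    }
}

fn main() {
    let readings = [21.5, 999.0, -10.0];
    let converted: Vec<f32> = readings
        .iter()
        .filter_map(|&c| only_plausible(c))
        .collect();
    println!("{:?}", converted); // the 999.0 reading is silently dropped
}
```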
### Stateful Accumulator
Need to maintain state across values? Use .with_state():
```rust
// Running average: accumulates readings, emits updated average each time
builder.configure::<RunningAverage>(AvgKey::Berlin, |reg| {
    reg.buffer(BufferCfg::SpmcRing { capacity: 100 })
        .transform::<Temperature, _>(TempKey::Berlin, |t| {
            t.with_state(AccumulatorState { sum: 0.0, count: 0 })
                .on_value(|temp, state| {
                    state.sum += temp.celsius;
                    state.count += 1;
                    Some(RunningAverage {
                        celsius: state.sum / state.count as f32,
                        sample_count: state.count,
                    })
                })
        });
});
```
The state is owned by AimDB and persists for the lifetime of the database — no external `Arc<Mutex<T>>` needed.
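Because the closure gets exclusive `&mut` access to state that AimDB owns, the accumulator logic itself needs no synchronization at all. A standalone sketch of that contract, with our own illustrative names:

```rust
// The accumulator's logic in isolation: plain fields suffice because the
// caller guarantees exclusive &mut access — no locks, no Arc.
struct Accumulator {
    sum: f32,
    count: u32,
}

impl Accumulator {
    /// Mirrors the on_value closure above: fold a reading in, emit the mean.
    fn on_value(&mut self, celsius: f32) -> f32 {
        self.sum += celsius;
        self.count += 1;
        self.sum / self.count as f32
    }
}

fn main() {
    let mut state = Accumulator { sum: 0.0, count: 0 };
    for reading in [10.0, 20.0, 30.0] {
        println!("running average: {}", state.on_value(reading));
    }
}
```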
## Transforms vs. Sources — Mutual Exclusion
A record gets its values from either a .source() or a .transform(), never both. This is enforced at configuration time — mixing them will panic with a clear message:
```
Cannot set transform: record already has a .source() registered
```
Taps, on the other hand, can always be added alongside either one. Think of it this way:
> One writer (source or transform), many readers (taps).
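That rule is easy to model. Here's a toy, hypothetical sketch of a per-record writer slot — unlike AimDB, which panics at configuration time, this version returns a `Result` so the conflict shows up as a value:

```rust
// Toy model of the single-writer rule. Names and Result-based errors are
// ours; AimDB itself panics during configuration instead.
#[derive(Debug, PartialEq)]
enum Writer {
    None,
    Source,
    Transform,
}

struct RecordConfig {
    writer: Writer,
    taps: u32,
}

impl RecordConfig {
    fn new() -> Self {
        RecordConfig { writer: Writer::None, taps: 0 }
    }

    fn set_source(&mut self) -> Result<(), String> {
        match self.writer {
            Writer::None => {
                self.writer = Writer::Source;
                Ok(())
            }
            _ => Err("record already has a writer registered".into()),
        }
    }

    fn set_transform(&mut self) -> Result<(), String> {
        match self.writer {
            Writer::None => {
                self.writer = Writer::Transform;
                Ok(())
            }
            _ => Err("record already has a writer registered".into()),
        }
    }

    /// Taps are never exclusive: any number may be added at any time.
    fn add_tap(&mut self) {
        self.taps += 1;
    }
}

fn main() {
    let mut rec = RecordConfig::new();
    rec.set_source().unwrap();
    rec.add_tap();
    rec.add_tap();
    // A second writer is rejected; more taps are always fine.
    println!("second writer allowed? {:?}", rec.set_transform());
    println!("writer: {:?}, taps: {}", rec.writer, rec.taps);
}
```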
## Real-World Example: Forecast Validation
Here's a pattern from our weather mesh demo. Each city receives live temperature readings over MQTT. We want to validate forecast accuracy by comparing predictions against actual readings — reactively, as temperatures arrive.
**Before (manual wiring with channels):**

```rust
// Create a channel to bridge temperature → validation
let (temp_tx, temp_rx) = mpsc::unbounded_channel();

// Temperature record: relay each reading through the channel
reg.tap(move |_ctx, consumer| relay_temps(consumer, temp_tx));

// Validation record: consume from channel, produce validations
reg.source(move |_ctx, producer| {
    validate_forecasts(producer, temp_rx, trackers, city, tolerance)
});
```
This works, but the channel is invisible to AimDB — it doesn't know that Validation depends on Temperature.
**After (transform API):**

```rust
// Validation record: derived directly from Temperature
reg.transform::<Temperature, _>(TempKey::Berlin, |t| {
    t.with_state(ValidationState::new(trackers, city, tolerance))
        .on_value(validate_one)
});
```
One declaration replaces three moving parts. AimDB owns the subscription, manages the state, and — crucially — knows about the dependency.
## The Dependency Graph (DAG)
Every .source(), .transform(), and connector link creates edges in a directed acyclic graph (DAG) that AimDB builds at startup. When you call builder.build(), AimDB validates the entire graph before spawning any tasks:
```
[MQTT link] ──source──→ Temperature (Berlin)
                             │
                             └──transform──→ ForecastValidation (Berlin)
                                                  │
                                                  └──tap──→ [WebSocket broadcast]
```
### What the DAG Catches
**Missing inputs** — If a transform references a record key that doesn't exist, the build fails immediately:

```
Error: TransformInputNotFound {
    output_key: "validation.berlin",
    input_key: "temp.berlin"  // ← typo? not registered?
}
```
**Cyclic dependencies** — If record A transforms from B, and B transforms from A, the build fails:

```
Error: CyclicDependency {
    records: ["temp.berlin", "validation.berlin"]
}
```
These checks happen at build time, not at runtime. Your application either starts with a valid dataflow graph, or it doesn't start at all — no silent deadlocks, no mysterious hangs.
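To make those checks concrete, here's a toy validator over `(output, input)` transform edges — not AimDB's implementation, just the idea. Because each record has at most one writer, cycle detection reduces to following each record's single input chain:

```rust
// Toy build-time validation: every transform input must be a registered
// record, and following input chains must never revisit a record.
use std::collections::{HashMap, HashSet};

/// Edges are (output record, input record it derives from).
fn validate(records: &HashSet<&str>, edges: &[(&str, &str)]) -> Result<(), String> {
    // 1) Missing inputs: a transform must reference a registered record.
    for (out, inp) in edges {
        if !records.contains(inp) {
            return Err(format!(
                "TransformInputNotFound {{ output_key: {out:?}, input_key: {inp:?} }}"
            ));
        }
    }

    // 2) Cycles: each record has at most one input (single-writer rule),
    //    so we can just walk the chain and flag any record seen twice.
    let deps: HashMap<&str, &str> = edges.iter().copied().collect();
    for start in records {
        let mut seen = HashSet::new();
        let mut cur = *start;
        while let Some(&next) = deps.get(cur) {
            if !seen.insert(cur) {
                return Err(format!("CyclicDependency involving {cur:?}"));
            }
            cur = next;
        }
    }
    Ok(())
}

fn main() {
    let records: HashSet<&str> = ["temp.berlin", "validation.berlin"].into_iter().collect();

    // A valid graph: validation derives from temperature.
    println!("{:?}", validate(&records, &[("validation.berlin", "temp.berlin")]));

    // A cycle: each record derives from the other — rejected.
    println!(
        "{:?}",
        validate(
            &records,
            &[("validation.berlin", "temp.berlin"), ("temp.berlin", "validation.berlin")],
        )
    );
}
```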
## Why This Matters
Without a dependency graph, transforms would be opaque closures — you'd need to manually trace which records feed into which. The DAG makes dataflows:
- Visible — Tooling (CLI, MCP, dashboards) can introspect the full topology from embedded edge devices to cloud
- Safe — Cycles and dangling references are caught before the first value flows
- Debuggable — When something goes wrong, you can see exactly which record feeds which
## When to Use Which
| Scenario | Primitive |
|---|---|
| Sensor reading, API poll, protocol ingestion | .source() or connector link |
| Logging, metrics, WebSocket streaming | .tap() |
| Unit conversion, filtering, enrichment | .transform() with .map() |
| Running average, deduplication, windowing | .transform() with .with_state() |
| Combining data from multiple records | .transform_join() (std only) |
## What's Next
The transform API is the foundation for more advanced patterns we're building:
- Multi-input joins — Combine values from multiple input records with `.transform_join()`
- Dependency visualization — See the full DAG rendered in the AimDB dashboard
- Replay and backfill — Re-run transforms over historical data
Check out the Core Concepts documentation to get started, or explore the weather mesh demo for a complete working example.