Reactive Pipelines: The Engine of Data-First Architecture
The Data Graph Is Just a Map
In "The Next Era of Software Architecture Is Data-First", we argued that the data model is the architecture. Records, buffers and producer/consumer relationships form a graph.
But declaring a graph is not the same as running one. A struct CheckoutEvent and a BufferCfg::SpmcRing describe potential. They don't move bytes. They don't react to anything. They don't do anything until something drives them.
That something is the foundation of AimDB: reactive pipelines.
Imperative Pipelines Pull. Reactive Pipelines React.
Most data pipelines today are imperative. A scheduler wakes up, a worker fetches inputs, a transformer runs, an output is written. ETL jobs, cron tasks, request handlers — all variations of the same shape: code is in charge and data is fetched on demand.
This is the same code-first inversion we called out before, just shifted down to the pipeline layer. The data is passive. The code does the asking.
A reactive pipeline flips that:
Data arrives. Code reacts. Outputs propagate.
Nothing polls. Nothing schedules. The presence of a new record is the trigger. Every downstream computation is a function of upstream change. The graph stops being a diagram of what could happen; it becomes the flow of execution.
What Makes a Pipeline Reactive in AimDB
A reactive pipeline in AimDB is built from three operators acting on typed buffers:
- Source — a producer that pushes new records into the graph. A sensor reading, a checkout click, an inbound MQTT message. Sources are the only nodes without inputs.
- Transform — a function from one record type to another, validated against the type graph at compile time. A transform fires when its input buffer receives a new value — not when something asks for output.
- Tap — a side-effecting consumer. Logging, alerting, persistence, network publication. Taps observe the graph without re-injecting into it.
Three operators, three buffer primitives, one runtime. That's the entire surface area.
The Inversion in Code
Here's a small pipeline that shows the shift. Each incoming checkout event triggers a transform, and events whose duration_ms exceeds the threshold produce alerts into a separate buffer for downstream consumers.
```rust
#[derive(Clone, Debug)]
struct CheckoutEvent {
    step: String,
    duration_ms: u64,
}

#[derive(Clone, Debug)]
struct CheckoutAlert {
    step: String,
    duration_ms: u64,
    severity: &'static str,
}

// Source — emits events as they happen, no caller asks for them
async fn checkout_source<R: Runtime>(ctx: RuntimeContext<R>, p: Producer<CheckoutEvent, R>) {
    while let Some(event) = next_checkout_event().await {
        p.produce(event).await.ok();
    }
}

// Transform — reacts to every CheckoutEvent, emits an alert when it matters
fn to_alert(e: &CheckoutEvent) -> Option<CheckoutAlert> {
    (e.duration_ms > 2_000).then(|| CheckoutAlert {
        step: e.step.clone(),
        duration_ms: e.duration_ms,
        severity: if e.duration_ms > 10_000 { "critical" } else { "warn" },
    })
}

// Tap — observes the alert stream
async fn alert_tap<R: Runtime>(ctx: RuntimeContext<R>, c: Consumer<CheckoutAlert, R>) {
    let mut r = c.subscribe().unwrap();
    while let Ok(alert) = r.recv().await {
        ctx.log().warn(&format!(
            "[{}] {} took {}ms",
            alert.severity, alert.step, alert.duration_ms
        ));
    }
}

// Wiring — declare the graph, runtime drives it
builder.configure::<CheckoutEvent>(CheckoutKey::Event, |reg| {
    reg.buffer(BufferCfg::SpmcRing { capacity: 1024 })
        .source(checkout_source)
        .finish();
});

builder.configure::<CheckoutAlert>(AlertKey::Checkout, |reg| {
    reg.buffer(BufferCfg::SpmcRing { capacity: 256 })
        .transform::<CheckoutEvent, _>(CheckoutKey::Event, |t| t.map(to_alert))
        .tap(alert_tap)
        .finish();
});
```
Notice what's missing: no scheduler, no orchestration loop, no run_pipeline_every(Duration::from_secs(n)). The wiring is the program. The runtime drives the graph by pushing change through it.
Why Reactive Is the Right Primitive for Data-First
If the data graph is the architecture, then the system needs an execution model where data drives the code, not the other way around. Reactive pipelines are that model. Three properties follow without effort:
Causality is explicit. Every output exists because of an input, captured in a typed transform. Trace a record back through its transforms and you have the full causal chain — no separate tracing layer required.
Backpressure is structural. Buffers are bounded. When a consumer falls behind, the buffer's policy — overwrite, drop, block — is part of the schema. There is no separate flow-control layer to forget about, mis-tune or bypass.
Distribution is incidental. A pipeline that runs in-process on an MCU runs the same way when one of its edges crosses an MQTT connector to a cloud service. The reactive boundary is the buffer; whether it's wired to a ring in shared memory or a topic on a broker, the source/transform/tap nodes don't know and don't care.
The Pipeline Closes the Loop
Data-first architecture says: describe the records and the system is the graph they form. Reactive pipelines say: data in motion executes it.
Together they close the loop. The schema stops being a hint. The DAG stops being documentation. The graph runs because every node is a function of the records flowing through it and every record exists because a reactive pipeline put it there.
Define your data. Wire the pipeline. The system is what flows.
Get Involved
AimDB ships reactive source/transform/tap pipelines today across Tokio, Embassy and WASM, with MQTT, WebSocket and KNX connectors out of the box. The same operators, the same buffers, the same record types from MCU to cloud to browser.
Star AimDB on GitHub · Live demo · Docs
Graphics and spell-checking in this post were done with the help of an LLM.