"Two Ways In: AimDB's Sync and Async APIs"
AimDB's core is async. Every producer, consumer and connector runs as a lightweight task on an async runtime — Tokio on the edge, Embassy on an MCU. But not every caller lives in an async world. Legacy codebases, FFI boundaries, test harnesses and simple scripts all expect blocking calls. Forcing #[tokio::main] onto those callers would be the wrong trade-off.
That's why AimDB ships two first-class entry points into the same engine: an async API for reactive, event-driven systems and a sync API that wraps the async core with blocking channels. Same type safety. Same buffer semantics. Different calling conventions.
The Async API: .build() and .run()
If you own the async runtime, the async API is the natural fit. You configure records with .source(), .tap(), and .link_*(), then hand control to the engine:
#[tokio::main] async fn main() -> DbResult<()> { let runtime = Arc::new(TokioAdapter::new()?); let mut builder = AimDbBuilder::new().runtime(runtime); builder.configure::<Temperature>(SensorKey::TempIndoor, |reg| { reg.buffer(BufferCfg::SpmcRing { capacity: 10 }) .source(indoor_temp_producer) // async fn .tap(temperature_logger); // async fn }); builder.run().await }
Producers and consumers are plain async functions. They receive a RuntimeContext for timers and logging and a typed Producer<T> or Consumer<T> handle:
async fn indoor_temp_producer( ctx: RuntimeContext<TokioAdapter>, producer: Producer<Temperature, TokioAdapter>, ) { let time = ctx.time(); loop { let temp = read_sensor(); producer.produce(temp).await.ok(); time.sleep(time.secs(2)).await; } }
Everything is wired at build time. Connectors like MQTT plug in with .link_to() and .link_from(), serializers are typed and the whole graph is validated before the first message flows.
The Sync API: .attach() and .detach()
The sync API targets callers that can't or don't want to run inside an async context. Instead of .build(), you call .attach() — which spawns a dedicated runtime thread behind the scenes and hands back a blocking SyncHandle:
fn main() -> Result<(), Box<dyn std::error::Error>> { let adapter = Arc::new(TokioAdapter); let mut builder = AimDbBuilder::new().runtime(adapter); builder.configure::<Temperature>("sensor.temperature", |reg| { reg.buffer(BufferCfg::SpmcRing { capacity: 10 }) .tap(|_ctx, consumer| async move { let mut reader = consumer.subscribe().unwrap(); while let Ok(temp) = reader.recv().await { println!("Received: {:.1}°C", temp.celsius); } }); }); let handle = builder.attach()?; // Spawns runtime thread // Everything below is pure sync
From here, producers and consumers are blocking and thread-safe:
let producer = handle.producer::<Temperature>("sensor.temperature")?; let consumer = handle.consumer::<Temperature>("sensor.temperature")?; // Blocking set producer.set(Temperature { celsius: 21.5, timestamp_ms: 0 })?; // Three read strategies let val = consumer.get()?; // block forever let val = consumer.try_get(); // non-blocking let val = consumer.get_with_timeout(Duration::from_secs(1)); // bounded wait handle.detach()?; // Clean shutdown Ok(()) }
The SyncHandle is Send + Sync, so you can hand producers and consumers to std::thread::spawn and build multi-threaded pipelines with no async contamination in the caller.
Under the Hood
The sync wrapper doesn't bypass the async engine — it bridges into it. A hybrid channel strategy keeps things efficient:
| Direction | Channel | Why |
|---|---|---|
| Caller -> Engine | Tokio MPSC | Producer sends must wake the async runtime |
| Engine -> Caller | std::sync::mpsc | Consumer reads block on a standard condvar |
Each record type gets its own spawned task inside the runtime thread, so backpressure on one record doesn't stall another. Channel capacity is configurable per record for high-frequency data.
When to Use Which
| Scenario | API |
|---|---|
| Event-driven service, already on Tokio | Async (.build() / .run()) |
| Embedded MCU with Embassy | Async (.build() / .run()) |
| C/C++ FFI calling into Rust | Sync (.attach()) |
| Legacy codebase, no async runtime | Sync (.attach()) |
| Quick script or CLI tool | Sync (.attach()) |
| Test harness with blocking assertions | Sync (.attach()) |
The key insight: both APIs configure the same dataflow graph. A .tap() registered during configure runs identically whether the outer caller used .attach() or .build(). The engine doesn't know or care — it just runs async tasks either way.
Try It
The open-source repo ships working demos for both paths:
- Async:
examples/tokio-mqtt-connector-demo— multi-sensor MQTT pipeline with typed keys and connectors - Sync:
examples/sync-api-demo— multi-threaded producer-consumer with blocking reads and clean shutdown
Both compile against the same aimdb-core. Same types, same buffers, same compile-time safety — just a different front door.