# Long-Term Record History: Persistence Comes to AimDB
AimDB has always been built around live data: sensors flow in from MCU and edge devices, transforms derive new values, taps fan out to consumers — all in memory, all in real time. But real-world applications need more than the latest value. You want to ask "what was the accuracy in Vienna over the last 24 hours?" or "give me the 10 most recent temperature readings per city." That's what long-term record history adds.
In this post we'll walk through the new persistence layer — how to enable it, what it stores, how to query it, and why it's designed the way it is.
## The Problem with Bolt-On Persistence
The obvious approach is to add a `.tap()` to every record you care about and write to a database from there. That works, but it means you're managing:
- Multiple database connections scattered across tap closures
- Retention logic written by hand for each record type
- Query APIs that live outside AimDB and don't know about record patterns or wildcards
- Serialization code duplicated between the tap and whatever reads from the store
We wanted persistence to feel like a first-class feature, not glue code bolted onto the side.
## One Trait, Any Backend
The persistence layer is split across two crates:
- `aimdb-persistence` — the traits and extension methods that integrate with the builder
- `aimdb-persistence-sqlite` — a concrete SQLite implementation (bundled, no system SQLite required)
The backend trait is minimal by design:
```rust
pub trait PersistenceBackend: Send + Sync {
    fn store<'a>(
        &'a self,
        record_name: &'a str,
        value: &'a Value,
        timestamp: u64,
    ) -> BoxFuture<'a, Result<(), PersistenceError>>;

    fn query<'a>(
        &'a self,
        record_pattern: &'a str,
        params: QueryParams,
    ) -> BoxFuture<'a, Result<Vec<StoredValue>, PersistenceError>>;

    fn cleanup(&self, older_than: u64) -> BoxFuture<'_, Result<u64, PersistenceError>>;
}
```
If you have a Postgres or TimescaleDB deployment on Kubernetes, you can implement this trait yourself and drop it in — the builder extension and query API work identically regardless of what's underneath.
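To make the shape of that contract concrete, here is a deliberately simplified, std-only stand-in: the real trait is async and returns `BoxFuture`, and the field layout of `StoredValue` below is an assumption for illustration, not the crate's actual definition. An in-memory backend like this is also a handy test double when developing against the trait.

```rust
use std::collections::BTreeMap;
use std::sync::Mutex;

// Assumed shape for illustration only; the real StoredValue may differ.
#[derive(Clone, Debug, PartialEq)]
struct StoredValue {
    record: String,
    value: String, // stands in for a serde_json::Value
    stored_at: u64,
}

// Synchronous stand-in for the async PersistenceBackend trait.
trait SimpleBackend: Send + Sync {
    fn store(&self, record_name: &str, value: &str, timestamp: u64);
    fn query_prefix(&self, prefix: &str) -> Vec<StoredValue>;
}

// In-memory backend: one Vec of rows per record name, behind a Mutex.
#[derive(Default)]
struct MemoryBackend {
    rows: Mutex<BTreeMap<String, Vec<StoredValue>>>,
}

impl SimpleBackend for MemoryBackend {
    fn store(&self, record_name: &str, value: &str, timestamp: u64) {
        let mut rows = self.rows.lock().unwrap();
        rows.entry(record_name.to_string()).or_default().push(StoredValue {
            record: record_name.to_string(),
            value: value.to_string(),
            stored_at: timestamp,
        });
    }

    fn query_prefix(&self, prefix: &str) -> Vec<StoredValue> {
        let rows = self.rows.lock().unwrap();
        rows.iter()
            .filter(|(name, _)| name.starts_with(prefix))
            .flat_map(|(_, vs)| vs.iter().cloned())
            .collect()
    }
}

fn main() {
    let backend = MemoryBackend::default();
    backend.store("accuracy::vienna", "{\"value\":0.94}", 1);
    backend.store("accuracy::berlin", "{\"value\":0.91}", 2);
    backend.store("temp::vienna", "{\"value\":21.5}", 3);
    let hits = backend.query_prefix("accuracy::");
    assert_eq!(hits.len(), 2);
    println!("matched {} rows", hits.len());
}
```

A production implementation replaces the `Mutex<BTreeMap>` with a connection pool and wraps each method body in a boxed future, but the store/query/cleanup split stays the same.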
## Enabling Persistence
Persistence is configured in two steps on the builder: first you register the backend and retention window, then you opt each individual record in.
```rust
use std::sync::Arc;
use std::time::Duration;

use aimdb_persistence::{AimDbBuilderPersistExt, RecordRegistrarPersistExt};
use aimdb_persistence_sqlite::SqliteBackend;

let backend = Arc::new(SqliteBackend::new("./data/history.db")?);

let mut builder = AimDbBuilder::new()
    .runtime(runtime)
    .with_persistence(backend, Duration::from_secs(7 * 24 * 3600));

builder.configure::<Accuracy>("accuracy::vienna", |reg| {
    reg.buffer(BufferCfg::SpmcRing { capacity: 500 })
        .source(accuracy_producer)
        .persist("accuracy::vienna"); // ← one call, that's it
});

builder.configure::<Accuracy>("accuracy::berlin", |reg| {
    reg.buffer(BufferCfg::SpmcRing { capacity: 500 })
        .source(accuracy_producer)
        .persist("accuracy::berlin");
});
```
`with_persistence` stores the backend in AimDB's `Extensions` TypeMap and registers a retention cleanup task that runs once at startup (to catch up on any data that accumulated while the process was down) and then every 24 hours.

`.persist("accuracy::vienna")` is the only thing you add to your existing record configuration. It doesn't require `.with_remote_access()`, it doesn't change your producer or consumer code, and it has no effect on the record's buffer semantics.
## How It Works Under the Hood
Persistence is implemented as a buffer tap — the same mechanism you'd use to attach a logger or a WebSocket broadcaster. When you call `.persist()`, AimDB registers a background subscriber that wakes up for every new value, serialises it to JSON, and writes it to the backend:
```
Producer → [record buffer] → consumers
                 │
                 └─── tap (persistence subscriber)
                         serde_json::to_value(&T)
                         backend.store(name, json, now_ms)
```
This means:
- **No coupling to `aimdb-core`** — the core crate has no dependency on storage types. Persistence is a pure extension.
- **`T: Serialize` is all you need** — any record type that derives `serde::Serialize` can be persisted.
- **Back-pressure is inherited** — the subscriber uses the same buffer subscription as any other tap. If the backend is slow, the tap will lag behind production; it won't block the producer.
The SQLite backend takes this one step further by running all database I/O on a dedicated OS thread. Async callers send commands over a bounded channel and await a one-shot reply — the executor is never blocked by a write or query:
```
async task         mpsc::SyncSender         OS thread
    │ ─── DbCommand::Store ─────────→  rusqlite::Connection
    │ ←── oneshot reply ─────────────  (WAL mode)
    │
 .await
```
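The pattern in the diagram can be sketched with nothing but the standard library. Everything here is a simplification: `DbCommand` is a name assumed from the diagram, a plain `mpsc` channel stands in for the oneshot reply, and a `Vec` stands in for the `rusqlite::Connection`:

```rust
use std::sync::mpsc::{channel, sync_channel, Sender, SyncSender};
use std::thread;

// Hypothetical command set; the real crate's enum is not shown in the post.
enum DbCommand {
    Store { record: String, reply: Sender<Result<(), String>> },
    Count { reply: Sender<usize> },
    Shutdown,
}

fn spawn_db_thread() -> SyncSender<DbCommand> {
    // Bounded channel: if the database thread falls behind, senders block
    // here instead of growing an unbounded in-memory queue.
    let (tx, rx) = sync_channel::<DbCommand>(64);
    thread::spawn(move || {
        // The thread exclusively owns the "connection" (a Vec stand-in),
        // so no async task ever touches blocking database state directly.
        let mut rows: Vec<String> = Vec::new();
        for cmd in rx {
            match cmd {
                DbCommand::Store { record, reply } => {
                    rows.push(record);
                    let _ = reply.send(Ok(()));
                }
                DbCommand::Count { reply } => {
                    let _ = reply.send(rows.len());
                }
                DbCommand::Shutdown => break,
            }
        }
    });
    tx
}

fn main() {
    let db = spawn_db_thread();

    // Each command carries its own reply channel, mimicking a oneshot.
    let (reply_tx, reply_rx) = channel();
    db.send(DbCommand::Store { record: "accuracy::vienna".into(), reply: reply_tx }).unwrap();
    reply_rx.recv().unwrap().unwrap();

    let (count_tx, count_rx) = channel();
    db.send(DbCommand::Count { reply: count_tx }).unwrap();
    assert_eq!(count_rx.recv().unwrap(), 1);
    db.send(DbCommand::Shutdown).unwrap();
    println!("one row stored");
}
```

In the real backend the caller `.await`s the reply instead of blocking on `recv()`, but the ownership story is identical: only one thread ever holds the connection.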
## Querying Historical Data
Once records are persisting, you can query them from the live AimDb handle at any time:
```rust
use aimdb_persistence::AimDbQueryExt;

// Latest 5 values per matching record — pattern supports * wildcard
let latest: Vec<Accuracy> = db.query_latest("accuracy::*", 5).await?;

// All values for one city in a time window (Unix milliseconds)
let start = 1_740_000_000_000_u64;
let end = 1_740_086_400_000_u64; // +24 hours
let history: Vec<Accuracy> = db
    .query_range("accuracy::vienna", start, end)
    .await?;
```
Pattern matching mirrors the record naming convention you already use. "accuracy::*" returns the latest N values from every record whose name starts with "accuracy::" — no extra configuration, no separate schema.
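One plausible way for a SQL-backed implementation to honour that wildcard is to translate it into a `LIKE` pattern. The post doesn't specify how the SQLite backend matches names internally, so treat this helper as an illustration of the convention, not the actual implementation:

```rust
// Translate the record-name wildcard into a SQL LIKE pattern.
// Hypothetical helper: escapes LIKE's own metacharacters first so that
// literal % or _ in a record name can't match unintended rows.
fn pattern_to_like(pattern: &str) -> String {
    pattern
        .replace('\\', "\\\\") // escape the escape character itself
        .replace('%', "\\%")   // escape LIKE metacharacters
        .replace('_', "\\_")
        .replace('*', "%")     // then map the record wildcard onto LIKE
}

fn main() {
    assert_eq!(pattern_to_like("accuracy::*"), "accuracy::%");
    assert_eq!(pattern_to_like("accuracy::vienna"), "accuracy::vienna");
    println!("{}", pattern_to_like("accuracy::*"));
}
```

The escaped pattern would then be used with `... WHERE record LIKE ? ESCAPE '\'` so that only the `*` the caller wrote acts as a wildcard.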
`query_range` returns every row in the time window — there is no implicit truncation. If you store a reading every 30 seconds and query 48 hours, you get 5,760 rows. The caller decides how much history they need.
Rows that can't be deserialised as T (for example, because the record's type has evolved since they were stored) are skipped with a warning rather than failing the entire query. This keeps historical queries resilient to schema changes over time.
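The skip-on-failure behaviour boils down to a `filter_map` over the raw rows: log and drop anything that won't decode, keep everything else. In this std-only sketch, parsing a string as `f64` stands in for the real serde deserialisation:

```rust
// Decode raw rows, skipping (with a warning) any that fail to parse
// instead of failing the entire query. f64 parsing stands in for
// serde_json deserialisation of the stored JSON values.
fn decode_rows(raw: &[&str]) -> Vec<f64> {
    raw.iter()
        .filter_map(|row| match row.parse::<f64>() {
            Ok(v) => Some(v),
            Err(e) => {
                eprintln!("skipping undecodable row {row:?}: {e}"); // warn, don't fail
                None
            }
        })
        .collect()
}

fn main() {
    let rows = ["0.94", "not-a-number", "0.91"];
    let values = decode_rows(&rows);
    assert_eq!(values, vec![0.94, 0.91]);
    println!("decoded {} of {} rows", values.len(), rows.len());
}
```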
## Retention
The cleanup task registered by `with_persistence` deletes rows older than the configured retention window:
```sql
DELETE FROM record_history WHERE stored_at < (now - retention_ms)
```
At startup the sweep runs unconditionally — this handles the case where the process was down for days and data exceeded retention while it was offline. After that it runs on a 24-hour cadence.
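The `now - retention_ms` cutoff in that statement can be computed with saturating arithmetic, so a retention window longer than the current clock value cannot underflow. The helper name below is invented for illustration:

```rust
use std::time::{SystemTime, UNIX_EPOCH};

// Everything with stored_at below this value is eligible for deletion.
// Saturating subtraction clamps at zero rather than panicking/underflowing.
fn cutoff_ms(now_ms: u64, retention_ms: u64) -> u64 {
    now_ms.saturating_sub(retention_ms)
}

fn main() {
    let now_ms = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_millis() as u64;
    let week_ms = 7 * 24 * 3600 * 1000; // the 7-day window from the example
    let cutoff = cutoff_ms(now_ms, week_ms);
    assert!(cutoff < now_ms);
    assert_eq!(cutoff_ms(1_000, 5_000), 0); // saturates instead of underflowing
    println!("delete rows with stored_at < {cutoff}");
}
```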
If a cleanup sweep fails (disk error, lock timeout, etc.) the error is always visible: tracing::warn! when the tracing feature is enabled, and an eprintln! fallback otherwise. Operators are never silently left with an unbounded database.
## The AimX Protocol Integration
The persistence layer also integrates with AimDB's remote access protocol (AimX). Once you've called `with_persistence`, the `record.query` method becomes available to any connected client — the CLI, the MCP server, or the dashboard:
```
→ { "id": 1, "method": "record.query",
    "params": { "name": "accuracy::*", "limit": 1 } }

← { "id": 1, "result": { "count": 2, "values": [
      { "record": "accuracy::vienna",
        "value": { "value": 0.94 }, "stored_at": 1740081234000 },
      { "record": "accuracy::berlin",
        "value": { "value": 0.91 }, "stored_at": 1740081190000 }
  ]}}
```
This happens without any extra configuration. The builder wires up a type-erased handler through the `Extensions` TypeMap during `with_persistence`, and the protocol handler delegates to it at query time — without importing any persistence types in `aimdb-core`.
## What's Not Persistence's Job
A few things worth calling out explicitly:
**Replay and backfill** — persisted history is for querying, not for re-running transforms over past data. Replay (re-feeding historical values through the live dataflow) is a separate concern we're exploring for a future milestone.

**Cross-node synchronisation** — if you run multiple AimDB processes, each maintains its own local store. The persistence layer has no distributed coordination. For shared history across edge and cloud nodes, use a shared backend implementation (e.g. Postgres over a connection pool).

**Caching or read-through** — `query_latest` always reads from the backend; it does not consult the record's live buffer. The live buffer and the history store are independent: one is in-memory and ephemeral, the other is durable and queryable.
## Summary
| Feature | Details |
|---|---|
| Opt-in per record | Add `.persist("key")` — no change to producers or consumers |
| Any `T: Serialize` | No `.with_remote_access()` needed |
| Pattern queries | `"accuracy::*"` wildcards work out of the box |
| Full time-range results | `query_range` has no implicit row cap |
| Pluggable backend | Implement `PersistenceBackend` for Postgres, TimescaleDB, etc. |
| Retention cleanup | Automatic on startup + every 24 hours |
| AimX protocol | `record.query` works once `with_persistence` is called |
Check out the aimdb-persistence documentation to get started, or browse the source on GitHub if you want to implement a custom backend.
Have questions? Open an issue on GitHub or join the discussion.