terraphim_persistence - Multi-Backend Storage Abstraction
Overview
terraphim_persistence provides a multi-backend storage abstraction layer for the Terraphim AI system. It supports multiple storage backends with automatic fallback, cache warm-up, and transparent compression, building on Apache OpenDAL for unified access to heterogeneous storage services.
Domain Model
Core Concepts
DeviceStorage
Multi-backend storage manager with speed-based selection and caching.
pub struct DeviceStorage {
    pub ops: HashMap<String, (Operator, u128)>,
    pub fastest_op: Operator,
}
Key Responsibilities:
- Manage multiple storage backends
- Track backend performance
- Provide fastest backend selection
- Handle backend fallbacks
Operator
Storage backend handle (an OpenDAL abstraction). Note that opendal::Operator is a concrete struct, not a trait: it is built from a backend service builder and exposes a uniform read/write/stat/delete API regardless of the backend behind it.
pub struct Operator { /* opendal internals */ }
Common Backend Services:
- Memory: In-memory storage
- DashMap: Concurrent hashmap
- SQLite: File-based database
- ReDB: Embedded database
- S3: Cloud object storage
Persistence Interface
Persistable
Trait for objects that can be saved and loaded from storage.
#[async_trait]
pub trait Persistable: Serialize + DeserializeOwned {
    fn new(key: String) -> Self;
    async fn save(&self) -> Result<()>;
    async fn save_to_one(&self, profile_name: &str) -> Result<()>;
    async fn load(&mut self) -> Result<Self>
    where
        Self: Sized;
    async fn load_config(&self) -> Result<(HashMap<String, (Operator, u128)>, Operator)>;
    async fn save_to_all(&self) -> Result<()>;
    async fn save_to_profile(&self, profile_name: &str) -> Result<()>;
}
Key Responsibilities:
- Define save/load interface
- Support multi-profile writes
- Enable selective profile writes
- Provide configuration loading
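The save/load shape can be sketched synchronously with only the standard library. This is illustrative, not the crate's code: `Document`, `get_key`, and `to_json` are hypothetical stand-ins for a Persistable type, serde serialisation, and an OpenDAL-backed store.

```rust
use std::collections::HashMap;

// Objects know their key and serialise themselves; a key-value map
// stands in for a storage backend.
trait Persistable {
    fn get_key(&self) -> String;
    fn to_json(&self) -> String;
}

struct Document {
    id: String,
    body: String,
}

impl Persistable for Document {
    fn get_key(&self) -> String {
        format!("document/{}", self.id)
    }
    fn to_json(&self) -> String {
        format!("{{\"id\":\"{}\",\"body\":\"{}\"}}", self.id, self.body)
    }
}

fn main() {
    let mut store: HashMap<String, String> = HashMap::new();
    let doc = Document { id: "42".into(), body: "hello".into() };
    // save(): write the serialised object under its key.
    store.insert(doc.get_key(), doc.to_json());
    // load(): read it back by the same key.
    let loaded = store.get("document/42").unwrap();
    assert_eq!(loaded, "{\"id\":\"42\",\"body\":\"hello\"}");
    println!("persistable roundtrip ok");
}
```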
Data Models
Storage Profiles
Profile
Storage backend configuration.
{
  "type": "memory",
  "root": "/path/to/storage"
}
Profile Types:
- memory: In-memory storage (fastest, no persistence)
- dashmap: Concurrent hashmap (fast, in-memory)
- sqlite: File-based database (persistent, moderate speed)
- redb: Embedded database (persistent, fast)
- s3: Cloud object storage (persistent, network latency)
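A multi-profile settings file might look like the following. This is a hedged sketch: the `datadir` key matches the initialisation code later in this document, but the surrounding schema, the profile names, and the `bucket` key for s3 are illustrative, not the crate's confirmed format.

```json
{
  "profiles": {
    "cache": { "type": "memory" },
    "local": { "type": "sqlite", "datadir": "~/.terraphim/sqlite" },
    "cloud": { "type": "s3", "root": "/terraphim", "bucket": "my-bucket" }
  }
}
```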
Compression
Compression Strategy
Automatic compression for large objects.
fn maybe_compress(data: &[u8]) -> Vec<u8> {
    const COMPRESSION_THRESHOLD: usize = 1_000_000;
    if data.len() > COMPRESSION_THRESHOLD {
        zstd::encode_all(data, 3).unwrap_or_else(|_| data.to_vec())
    } else {
        data.to_vec()
    }
}

fn maybe_decompress(data: &[u8]) -> Result<Vec<u8>> {
    // Try zstd first; payloads that are not zstd frames are returned as-is,
    // so objects stored below the threshold round-trip unchanged.
    match zstd::decode_all(data) {
        Ok(decompressed) => Ok(decompressed),
        Err(_) => Ok(data.to_vec()),
    }
}
Pattern:
- Compress objects over 1MB
- Use zstd compression (level 3)
- Handle decompression gracefully
- Fall back to raw data on error
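One way a decompression helper can distinguish compressed payloads from raw JSON without extra metadata is the zstd frame magic number, 0xFD2FB528 (bytes 0x28 0xB5 0x2F 0xFD in little-endian order). A stdlib-only sketch of that check (`looks_like_zstd` is a hypothetical helper; the crate may instead rely on the decoder's error, as shown above):

```rust
// A zstd frame always begins with the 4-byte magic number 0xFD2FB528,
// serialised little-endian. Raw JSON starts with '{' or '[' and can
// never collide with it.
fn looks_like_zstd(data: &[u8]) -> bool {
    data.len() >= 4 && data[..4] == [0x28, 0xB5, 0x2F, 0xFD]
}

fn main() {
    // A zstd frame header is detected...
    assert!(looks_like_zstd(&[0x28, 0xB5, 0x2F, 0xFD, 0x00]));
    // ...while raw JSON is passed through untouched.
    assert!(!looks_like_zstd(b"{\"key\":\"value\"}"));
    println!("detection ok");
}
```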
Implementation Patterns
Storage Initialisation
Multi-Backend Setup
async fn init_device_storage_with_settings(settings: DeviceSettings) -> Result<DeviceStorage> {
    // Ensure on-disk backends have their data directories before use.
    for profile in settings.profiles.values() {
        let profile_type = profile.get("type").map(String::as_str).unwrap_or("unknown");
        let dir = match profile_type {
            "sqlite" | "redb" => profile.get("datadir"),
            "dashmap" => profile.get("root"),
            _ => None,
        };
        if let Some(dir) = dir {
            if !dir.is_empty() {
                let expanded = expand_tilde(dir);
                std::fs::create_dir_all(&expanded)?;
            }
        }
    }
    let operators = settings::parse_profiles(&settings).await?;
    // Sort by measured speed (lower is faster) and pick the fastest from the
    // *sorted* list, before collecting into a HashMap, whose iteration order
    // is arbitrary.
    let mut ops_vec: Vec<(&String, &(Operator, u128))> = operators.iter().collect();
    ops_vec.sort_by_key(|&(_, &(_, speed))| speed);
    let fastest_op = match ops_vec.first() {
        Some((_, (op, _))) => op.clone(),
        None => return Err(Error::NoOperator),
    };
    let ops: HashMap<String, (Operator, u128)> = ops_vec
        .into_iter()
        .map(|(k, v)| (k.clone(), v.clone()))
        .collect();
    Ok(DeviceStorage { ops, fastest_op })
}
Pattern:
- Parse configuration profiles
- Sort by speed (lower = faster)
- Select fastest as default
- Create operator map
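The speed-based selection can be reduced to this shape. A stdlib sketch: `String` stands in for the real `Operator`, and the u128 is the measured latency, as in the crate. The point it demonstrates is that the fastest entry must come from an explicit comparison, not from HashMap iteration order, which is arbitrary.

```rust
use std::collections::HashMap;

// Pick the backend with the lowest measured latency.
fn pick_fastest(ops: &HashMap<String, (String, u128)>) -> Option<String> {
    ops.iter()
        .min_by_key(|(_, (_, speed))| *speed)
        .map(|(_, (op, _))| op.clone())
}

fn main() {
    let mut ops = HashMap::new();
    ops.insert("s3".to_string(), ("s3-operator".to_string(), 45_000u128));
    ops.insert("memory".to_string(), ("memory-operator".to_string(), 12u128));
    ops.insert("sqlite".to_string(), ("sqlite-operator".to_string(), 800u128));
    assert_eq!(pick_fastest(&ops).as_deref(), Some("memory-operator"));
    println!("fastest selected");
}
```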
Singleton Pattern
static DEVICE_STORAGE: AsyncOnceCell<DeviceStorage> = AsyncOnceCell::new();

impl DeviceStorage {
    pub async fn instance() -> Result<&'static DeviceStorage> {
        let storage = DEVICE_STORAGE
            .get_or_try_init(async {
                let initialised_storage = init_device_storage().await?;
                Ok::<DeviceStorage, Error>(initialised_storage)
            })
            .await?;
        Ok(storage)
    }

    pub async fn init_memory_only() -> Result<&'static DeviceStorage> {
        let storage = DEVICE_STORAGE
            .get_or_try_init(async {
                let settings = memory::create_memory_only_device_settings()?;
                let initialised_storage = init_device_storage_with_settings(settings).await?;
                Ok::<DeviceStorage, Error>(initialised_storage)
            })
            .await?;
        Ok(storage)
    }
}
Pattern:
- Use AsyncOnceCell for lazy initialisation
- Support normal and memory-only modes (whichever initialiser runs first wins the cell)
- Return shared reference
- Initialise once per process
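The same lazy-singleton shape can be shown with std's OnceLock, the synchronous analogue of AsyncOnceCell: the first caller initialises, later callers get the cached reference. `Storage` is a stand-in for DeviceStorage.

```rust
use std::sync::OnceLock;

struct Storage {
    backend: &'static str,
}

static STORAGE: OnceLock<Storage> = OnceLock::new();

// First call runs the initialiser; subsequent calls return the cached value.
fn instance() -> &'static Storage {
    STORAGE.get_or_init(|| Storage { backend: "memory" })
}

fn main() {
    let a = instance();
    let b = instance();
    // Both calls return the same initialised instance.
    assert!(std::ptr::eq(a, b));
    assert_eq!(a.backend, "memory");
    println!("singleton ok");
}
```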
Multi-Backend Operations
Save to All Profiles
async fn save_to_all(&self) -> Result<()> {
    let (ops, _fastest_op) = &self.load_config().await?;
    let key = self.get_key();
    let serde_str = serde_json::to_string(&self)?;
    for (op, _time) in ops.values() {
        log::debug!("Saving to operator: {:?}", op);
        op.write(&key, serde_str.clone()).await?;
    }
    Ok(())
}
Pattern:
- Load configuration
- Serialise object once
- Write to every profile
- Propagate the first write failure (the `?` aborts the remaining writes)
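The version above stops at the first failed backend. A best-effort alternative (a sketch, not the crate's behaviour) attempts every backend and reports which writes failed; `write` here is a hypothetical stand-in for Operator::write with one simulated outage.

```rust
// Simulated backend write: s3 is unreachable, everything else succeeds.
fn write(backend: &str, _key: &str, _data: &str) -> Result<(), String> {
    if backend == "s3" {
        Err("network unreachable".to_string())
    } else {
        Ok(())
    }
}

// Attempt all backends; collect failures instead of aborting on the first.
fn save_to_all(backends: &[&str], key: &str, data: &str) -> Vec<(String, String)> {
    let mut failures = Vec::new();
    for backend in backends {
        if let Err(e) = write(backend, key, data) {
            failures.push((backend.to_string(), e));
        }
    }
    failures
}

fn main() {
    let failures = save_to_all(&["memory", "sqlite", "s3"], "k", "{}");
    // Only the simulated s3 outage is reported; the other writes succeeded.
    assert_eq!(failures.len(), 1);
    assert_eq!(failures[0].0, "s3");
    println!("best-effort writes ok");
}
```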
Load with Fallback
async fn load_from_operator(&self, key: &str, _op: &Operator) -> Result<Self>
where
    Self: Sized,
{
    let span = debug_span!("load_from_operator", key = %key);
    async {
        let (ops, fastest_op) = &self.load_config().await?;

        async fn try_read_from_op<T: DeserializeOwned>(
            op: &Operator,
            key: &str,
            profile_name: Option<&str>,
        ) -> Option<std::result::Result<T, Error>> {
            let span = debug_span!("try_read", profile = ?profile_name);
            async {
                match op.stat(key).await {
                    Ok(_) => match op.read(key).await {
                        Ok(bs) => {
                            let data = match maybe_decompress(&bs.to_vec()) {
                                Ok(decompressed) => decompressed,
                                Err(e) => {
                                    log::debug!("Decompression failed for '{}', using raw data: {}", key, e);
                                    bs.to_vec()
                                }
                            };
                            match serde_json::from_slice(&data) {
                                Ok(obj) => {
                                    if let Some(name) = profile_name {
                                        log::debug!("Loaded '{}' from profile '{}'", key, name);
                                    } else {
                                        log::debug!("Loaded '{}' from fastest operator (cache hit)", key);
                                    }
                                    Some(Ok(obj))
                                }
                                Err(e) => {
                                    log::warn!("Failed to deserialize '{}': {}", key, e);
                                    Some(Err(Error::Json(e)))
                                }
                            }
                        }
                        Err(e) => {
                            log::debug!("Failed to read '{}' after stat: {}", key, e);
                            Some(Err(e.into()))
                        }
                    },
                    Err(e) if e.kind() == opendal::ErrorKind::NotFound => {
                        log::debug!("File '{}' not found in storage (cache miss)", key);
                        None
                    }
                    Err(e) => {
                        log::debug!("Failed to stat '{}': {}", key, e);
                        Some(Err(e.into()))
                    }
                }
            }
            .instrument(span)
            .await
        }

        // Fast path: the fastest operator acts as a cache. A JSON error here
        // signals schema evolution (a stale cached shape).
        let schema_evolution_detected = {
            let fastest_result = try_read_from_op::<Self>(fastest_op, key, None).await;
            match fastest_result {
                Some(Ok(obj)) => return Ok(obj),
                Some(Err(Error::Json(_))) => true,
                Some(Err(_)) => false,
                None => false,
            }
        };

        if schema_evolution_detected {
            log::info!("Schema evolution detected for '{}': clearing cache and refetching", key);
            let delete_span = debug_span!("cache_clear", key = %key);
            async {
                if let Err(e) = fastest_op.delete(key).await {
                    log::debug!("Failed to delete stale cache entry '{}': {}", key, e);
                }
            }
            .instrument(delete_span)
            .await;
        }

        // Fallback: remaining operators in speed order, skipping the fastest.
        let mut ops_vec: Vec<(&String, &(Operator, u128))> = ops.iter().collect();
        ops_vec.sort_by_key(|&(_, &(_, speed))| speed);
        for (profile_name, (op, _speed)) in ops_vec {
            if std::ptr::eq(op as *const Operator, fastest_op as *const Operator) {
                continue;
            }
            log::debug!("Trying to load '{}' from profile '{}'", key, profile_name);
            if let Some(result) = try_read_from_op::<Self>(op, key, Some(profile_name)).await {
                match result {
                    Ok(obj) => {
                        log::info!("Successfully loaded '{}' from fallback profile '{}'", key, profile_name);
                        // Warm the fastest operator asynchronously.
                        if let Ok(serialised) = serde_json::to_vec(&obj) {
                            let fastest = fastest_op.clone();
                            let k = key.to_string();
                            let data_len = serialised.len();
                            tokio::spawn(async move {
                                let span = debug_span!("cache_writeback", key = %k, size = data_len);
                                async {
                                    let data = maybe_compress(&serialised);
                                    let compressed = data.len() < serialised.len();
                                    match fastest.write(&k, data).await {
                                        Ok(_) => {
                                            if compressed {
                                                log::debug!("Cached '{}' to fastest operator ({} bytes compressed)", k, data_len);
                                            } else {
                                                log::debug!("Cached '{}' to fastest operator ({} bytes)", k, data_len);
                                            }
                                        }
                                        Err(e) => {
                                            log::debug!("Failed to cache '{}' to fastest operator: {}", k, e);
                                        }
                                    }
                                }
                                .instrument(span)
                                .await
                            });
                        }
                        return Ok(obj);
                    }
                    Err(e) => {
                        log::error!("Failed to load '{}' from profile '{}': {}", key, profile_name, e);
                    }
                }
            }
        }
        Err(Error::NoOperatorAvailable)
    }
    .instrument(span)
    .await
}
Pattern:
- Try fastest operator first
- Detect schema evolution
- Clear stale cache if needed
- Try fallback operators in speed order
- Asynchronous cache write-back
- Handle all error cases
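Stripped of logging, spans, and the cache write-back, the fallback walk reduces to this shape. A stdlib sketch: an `Option<&str>` stands in for a backend that may or may not hold the key, and the slice is already in speed order.

```rust
// Try each backend in speed order; return the first hit along with the
// profile that served it, or an error if every backend misses.
fn load_with_fallback(
    backends: &[(&str, Option<&str>)],
    key: &str,
) -> Result<(String, String), String> {
    for (name, store) in backends {
        if let Some(value) = store {
            return Ok((name.to_string(), value.to_string()));
        }
    }
    Err(format!("'{key}' not found in any backend"))
}

fn main() {
    // The fastest backend (memory) misses; sqlite serves the value.
    let backends = [
        ("memory", None),
        ("sqlite", Some("{\"v\":1}")),
        ("s3", Some("{\"v\":1}")),
    ];
    let (profile, value) = load_with_fallback(&backends, "doc").unwrap();
    assert_eq!(profile, "sqlite");
    assert_eq!(value, "{\"v\":1}");
    println!("fallback ok");
}
```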
Cache Warm-Up
Cache Write-Back
if let Ok(serialised) = serde_json::to_vec(&obj) {
    let fastest = fastest_op.clone();
    let k = key.to_string();
    let data_len = serialised.len();
    tokio::spawn(async move {
        let span = debug_span!("cache_writeback", key = %k, size = data_len);
        async {
            let data = maybe_compress(&serialised);
            let compressed = data.len() < serialised.len();
            match fastest.write(&k, data).await {
                Ok(_) => {
                    if compressed {
                        log::debug!("Cached '{}' to fastest operator ({} bytes compressed)", k, data_len);
                    } else {
                        log::debug!("Cached '{}' to fastest operator ({} bytes)", k, data_len);
                    }
                }
                Err(e) => {
                    log::debug!("Failed to cache '{}' to fastest operator: {}", k, e);
                }
            }
        }
        .instrument(span)
        .await
    });
}
Pattern:
- Spawn fire-and-forget task
- Compress if beneficial
- Log at debug level
- Don't block load operation
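The fire-and-forget shape can be shown with std::thread in place of tokio::spawn (a sketch, not the crate's code): the load path returns immediately while the cache write completes in the background; the channel here only exists so the sketch can observe completion.

```rust
use std::sync::mpsc;
use std::thread;

// Spawn the write-back and return immediately; the receiver lets a test
// observe that the background work finished.
fn spawn_writeback(value: String) -> mpsc::Receiver<String> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // ...compression + write to the fastest backend would happen here...
        let _ = tx.send(value);
    });
    rx
}

fn main() {
    let rx = spawn_writeback("cached-value".to_string());
    // The caller is free to return the loaded object right away;
    // only the sketch waits, to verify the background write ran.
    assert_eq!(rx.recv().unwrap(), "cached-value");
    println!("write-back completed");
}
```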
Save to a Named Profile
async fn save_to_profile(&self, profile_name: &str) -> Result<()> {
    let (ops, _fastest_op) = &self.load_config().await?;
    let key = self.get_key();
    let serde_str = serde_json::to_string(&self)?;
    ops.get(profile_name)
        .ok_or_else(|| {
            Error::Profile(format!(
                "Unknown profile name: {profile_name}. Available profiles: {}",
                ops.keys().map(|k| k.as_str()).collect::<Vec<&str>>().join(", ")
            ))
        })?
        .0
        .write(&key, serde_str.clone())
        .await
        .map_err(|e| Error::OpenDal(Box::new(e)))?;
    Ok(())
}
Pattern:
- Look up the operator by profile name
- Report the available profiles on an unknown name
- Write only to that profile (the fallback loader, by contrast, skips the fastest operator via pointer equality to avoid redundant reads)
Error Handling
Error Types
#[derive(thiserror::Error, Debug)]
pub enum Error {
    #[error("Profile error: {0}")]
    Profile(String),
    #[error("OpenDal error: {0}")]
    OpenDal(Box<opendal::Error>),
    #[error("No storage operator configured")]
    NoOperator,
    #[error("No operator could serve the request")]
    NoOperatorAvailable,
    #[error("Serde JSON error: {0}")]
    Json(#[from] serde_json::Error),
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),
}
Categories:
- Configuration: Profile errors
- Storage: OpenDal errors
- Availability: No operator errors
- Serialisation: JSON errors
- I/O: File system errors
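What the thiserror derive generates can be written out by hand with only the standard library. A sketch of the same error shape: Display carries context, and a From impl enables `?` conversion from io::Error; the variant set is abbreviated for illustration.

```rust
use std::fmt;

#[derive(Debug)]
enum Error {
    Profile(String),
    NoOperator,
    Io(std::io::Error),
}

// Display provides the human-readable context thiserror's #[error("...")]
// attribute would otherwise generate.
impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Error::Profile(msg) => write!(f, "Profile error: {msg}"),
            Error::NoOperator => write!(f, "No operator available"),
            Error::Io(e) => write!(f, "IO error: {e}"),
        }
    }
}

// From<io::Error> is what #[from] generates; it makes `?` work on
// filesystem calls inside functions returning Result<_, Error>.
impl From<std::io::Error> for Error {
    fn from(e: std::io::Error) -> Self {
        Error::Io(e)
    }
}

fn main() {
    let e = Error::Profile("unknown profile: foo".to_string());
    assert_eq!(e.to_string(), "Profile error: unknown profile: foo");
    let io: Error = std::io::Error::new(std::io::ErrorKind::NotFound, "gone").into();
    assert!(matches!(io, Error::Io(_)));
    println!("errors ok");
}
```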
Performance Optimisations
Compression Strategy
Adaptive Compression
fn maybe_compress(data: &[u8]) -> Vec<u8> {
    const COMPRESSION_THRESHOLD: usize = 1_000_000;
    if data.len() > COMPRESSION_THRESHOLD {
        zstd::encode_all(data, 3).unwrap_or_else(|_| data.to_vec())
    } else {
        data.to_vec()
    }
}
Threshold:
- 1MB compression threshold
- Level 3 compression (balanced)
- Fall back to raw on failure
Backend Selection
Speed-Based Ordering
let mut ops_vec: Vec<(&String, &(Operator, u128))> = operators.iter().collect();
ops_vec.sort_by_key(|&(_, &(_, speed))| speed);
Strategy:
- Measure backend speed during initialisation
- Sort by speed (lower = faster)
- Read from the fastest first, falling back in speed order
- Write to all backends
Caching
Lazy Initialisation
static DEVICE_STORAGE: AsyncOnceCell<DeviceStorage> = AsyncOnceCell::new();

impl DeviceStorage {
    pub async fn instance() -> Result<&'static DeviceStorage> {
        let storage = DEVICE_STORAGE
            .get_or_try_init(async {
                let initialised_storage = init_device_storage().await?;
                Ok::<DeviceStorage, Error>(initialised_storage)
            })
            .await?;
        Ok(storage)
    }
}
Pattern:
- Initialise on first access
- Cache initialised instance
- Thread-safe lazy loading
Testing Patterns
Unit Tests
#[cfg(test)]
mod tests {
    use super::*;
    use serde::{Deserialize, Serialize};

    #[tokio::test]
    async fn test_memory_only_storage() {
        let storage = DeviceStorage::init_memory_only().await.unwrap();
        // fastest_op is a concrete Operator, not an Option; assert on the
        // operator map instead.
        assert!(!storage.ops.is_empty());
    }

    // Deriving Serialize/Deserialize is far less error-prone than the
    // hand-written impls these traits otherwise require.
    #[derive(Clone, Serialize, Deserialize)]
    struct TestData {
        key: String,
        value: String,
    }

    #[async_trait]
    impl Persistable for TestData {
        fn new(key: String) -> Self {
            Self { key, value: String::new() }
        }

        async fn save(&self) -> Result<()> {
            Ok(())
        }

        async fn save_to_one(&self, _profile_name: &str) -> Result<()> {
            Ok(())
        }

        async fn load(&mut self) -> Result<Self>
        where
            Self: Sized,
        {
            Ok(self.clone())
        }

        async fn load_config(&self) -> Result<(HashMap<String, (opendal::Operator, u128)>, opendal::Operator)> {
            unimplemented!()
        }

        async fn save_to_all(&self) -> Result<()> {
            Ok(())
        }

        async fn save_to_profile(&self, _profile_name: &str) -> Result<()> {
            Ok(())
        }
    }

    #[tokio::test]
    async fn test_persistable_trait() {
        let mut test_data = TestData {
            key: "test-key".to_string(),
            value: "test-value".to_string(),
        };
        test_data.save().await.unwrap();
        let loaded = test_data.load().await.unwrap();
        assert_eq!(loaded.value, "test-value");
    }
}
Best Practices
Storage Design
- Use multiple backends for resilience
- Prioritise speed for reads
- Write to all backends for safety
- Implement graceful degradation
Error Handling
- Provide context in errors
- Categorise error types
- Support fallback mechanisms
- Log at appropriate levels
Performance
- Use async throughout
- Minimise lock duration
- Implement caching
- Use compression for large objects
Testing
- Test with memory-only backend
- Test multi-backend scenarios
- Test cache warm-up
- Test error handling
Future Enhancements
Planned Features
Write-Ahead Caching
async fn cache_prefetch(&self, keys: Vec<String>) -> Result<()> {
}
Storage Quotas
struct StorageQuota {
    max_size: u64,
    current_size: u64,
}
async fn enforce_quota(&self, data_size: usize) -> Result<()> {
}
Automatic Cleanup
async fn cleanup_old_entries(&self, ttl: Duration) -> Result<()> {
}