bp_core/storage/
mod.rs

1//! Local storage management for a Pouch service instance.
2//!
3//! ## Responsibilities
4//!
5//! - Create and maintain the on-disk directory structure for a Pouch bid.
6//! - Persist and load [`EncodedFragment`]s in the binary `.frag` format.
7//! - Maintain an in-memory [`FragmentIndex`] for fast chunk/fragment lookups.
8//! - Enforce the storage quota declared at bid time.
9//! - Expose recoding helpers so the daemon can generate new fragments for
10//!   newly-joined Pouches without touching the original data.
11//!
12//! ## Directory layout
13//!
14//! ```text
15//! ~/.local/share/billpouch/storage/<network_id>/<service_id>/
16//!   meta.json
17//!   fragments/
18//!     <chunk_id>/
19//!       <fragment_id>.frag
20//! ```
21
22pub mod encryption;
23pub mod file_registry;
24pub mod fragment;
25pub mod manifest;
26pub mod meta;
27pub mod tier;
28
29pub use encryption::ChunkCipher;
30pub use file_registry::{FileRegistry, StoredFileEntry};
31pub use fragment::{FragmentIndex, FragmentMeta};
32pub use manifest::{ChunkManifest, FileManifest, FragmentLocation, NetworkMetaKey};
33pub use meta::PouchMeta;
34pub use tier::StorageTier;
35
36use crate::{
37    coding::rlnc::{self, EncodedFragment},
38    config,
39    error::{BpError, BpResult},
40};
41use std::path::{Path, PathBuf};
42
43// ── StorageManager ────────────────────────────────────────────────────────────
44
45/// Manages on-disk storage for a single Pouch service instance.
46///
47/// One `StorageManager` exists per active Pouch service.
48/// It is held inside `DaemonState` (wrapped in `Arc<RwLock<StorageManager>>`)
49/// alongside the service registry.
50pub struct StorageManager {
51    /// Absolute path to `storage/<network_id>/<service_id>/`.
52    root: PathBuf,
53    /// Persistent quota and usage tracking.
54    pub meta: PouchMeta,
55    /// In-memory fragment index (rebuilt from disk on load).
56    pub index: FragmentIndex,
57}
58
59impl StorageManager {
60    // ── Construction ─────────────────────────────────────────────────────
61
62    /// Initialise a brand-new Pouch storage directory.
63    ///
64    /// Creates the directory structure, writes `meta.json`, and returns a
65    /// manager with an empty fragment index.
66    ///
67    /// # Errors
68    /// Fails if disk I/O fails or the XDG base directory cannot be resolved.
69    pub fn init(network_id: String, service_id: String, storage_bytes_bid: u64) -> BpResult<Self> {
70        let root = Self::root_path(&network_id, &service_id)?;
71        let fragments_dir = root.join("fragments");
72        std::fs::create_dir_all(&fragments_dir).map_err(BpError::Io)?;
73
74        let meta = PouchMeta::new(network_id, service_id, storage_bytes_bid);
75        meta.save(&root.join("meta.json"))?;
76
77        Ok(Self {
78            root,
79            meta,
80            index: FragmentIndex::new(),
81        })
82    }
83
84    /// Load an existing Pouch storage directory from disk.
85    ///
86    /// Reads `meta.json` and rebuilds the fragment index by scanning the
87    /// `fragments/` subdirectory.
88    pub fn load(network_id: &str, service_id: &str) -> BpResult<Self> {
89        let root = Self::root_path(network_id, service_id)?;
90        let meta_path = root.join("meta.json");
91        if !meta_path.exists() {
92            return Err(BpError::Storage(format!(
93                "No storage found for service {service_id} on network {network_id}"
94            )));
95        }
96
97        let meta = PouchMeta::load(&meta_path)?;
98        let index = Self::build_index(&root)?;
99
100        Ok(Self { root, meta, index })
101    }
102
103    // ── Fragment I/O ──────────────────────────────────────────────────────
104
105    /// Persist an encoded fragment to disk and update the index.
106    ///
107    /// Returns an error if the Pouch does not have enough remaining capacity.
108    pub fn store_fragment(&mut self, fragment: &EncodedFragment) -> BpResult<()> {
109        let bytes = fragment.to_bytes();
110        let size = bytes.len() as u64;
111
112        if !self.meta.has_capacity(size) {
113            return Err(BpError::Storage(format!(
114                "Quota exceeded: need {size} bytes, only {} available",
115                self.meta.available_bytes()
116            )));
117        }
118
119        // Ensure chunk subdirectory exists
120        let chunk_dir = self.root.join("fragments").join(&fragment.chunk_id);
121        std::fs::create_dir_all(&chunk_dir).map_err(BpError::Io)?;
122
123        // Write the .frag file
124        let frag_path = chunk_dir.join(format!("{}.frag", fragment.id));
125        std::fs::write(&frag_path, &bytes).map_err(BpError::Io)?;
126
127        // Update meta and index
128        self.meta.storage_bytes_used += size;
129        self.save_meta()?;
130
131        self.index.insert(FragmentMeta {
132            fragment_id: fragment.id.clone(),
133            chunk_id: fragment.chunk_id.clone(),
134            k: fragment.k,
135            size_bytes: size,
136        });
137
138        Ok(())
139    }
140
141    /// Load a fragment from disk by chunk_id and fragment_id.
142    pub fn load_fragment(&self, chunk_id: &str, fragment_id: &str) -> BpResult<EncodedFragment> {
143        let path = self
144            .root
145            .join("fragments")
146            .join(chunk_id)
147            .join(format!("{fragment_id}.frag"));
148
149        if !path.exists() {
150            return Err(BpError::Storage(format!(
151                "Fragment not found: {chunk_id}/{fragment_id}"
152            )));
153        }
154
155        let bytes = std::fs::read(&path).map_err(BpError::Io)?;
156        EncodedFragment::from_bytes(fragment_id.to_string(), chunk_id.to_string(), &bytes)
157    }
158
159    /// Remove a fragment from disk and update the index.
160    pub fn remove_fragment(&mut self, chunk_id: &str, fragment_id: &str) -> BpResult<()> {
161        let path = self
162            .root
163            .join("fragments")
164            .join(chunk_id)
165            .join(format!("{fragment_id}.frag"));
166
167        if path.exists() {
168            std::fs::remove_file(&path).map_err(BpError::Io)?;
169        }
170
171        // Remove empty chunk directory
172        let chunk_dir = self.root.join("fragments").join(chunk_id);
173        if chunk_dir.exists()
174            && std::fs::read_dir(&chunk_dir).map_or(true, |mut d| d.next().is_none())
175        {
176            std::fs::remove_dir(&chunk_dir).ok();
177        }
178
179        if let Some(removed) = self.index.remove(chunk_id, fragment_id) {
180            self.meta.storage_bytes_used = self
181                .meta
182                .storage_bytes_used
183                .saturating_sub(removed.size_bytes);
184            self.save_meta()?;
185        }
186
187        Ok(())
188    }
189
190    // ── Recoding ──────────────────────────────────────────────────────────
191
192    /// Generate `count` new fragments for `chunk_id` by recoding the local ones.
193    ///
194    /// This is the key operation for filling new Pouches: no decoding is performed,
195    /// no original data is required.
196    ///
197    /// Returns an error if this Pouch holds no fragments for the given chunk.
198    pub fn recode_chunk(&self, chunk_id: &str, count: usize) -> BpResult<Vec<EncodedFragment>> {
199        let metas = self.index.fragments_for_chunk(chunk_id);
200        if metas.is_empty() {
201            return Err(BpError::Storage(format!(
202                "recode: no fragments for chunk {chunk_id}"
203            )));
204        }
205
206        // Load all local fragments for this chunk
207        let fragments: Vec<EncodedFragment> = metas
208            .iter()
209            .map(|m| self.load_fragment(chunk_id, &m.fragment_id))
210            .collect::<BpResult<_>>()?;
211
212        rlnc::recode(&fragments, count).map_err(|e| BpError::Storage(e.to_string()))
213    }
214
215    // ── Accessors ─────────────────────────────────────────────────────────
216
217    /// Whether this Pouch has remaining capacity for `bytes` more data.
218    pub fn has_capacity(&self, bytes: u64) -> bool {
219        self.meta.has_capacity(bytes)
220    }
221
222    /// Root directory of this Pouch's storage.
223    pub fn root(&self) -> &Path {
224        &self.root
225    }
226
227    /// Permanently delete all on-disk storage for this Pouch.
228    ///
229    /// Removes the entire storage directory (`fragments/`, `meta.json`, etc.).
230    /// Call only after the service has been removed from `ServiceRegistry`.
231    pub fn purge(&self) -> BpResult<()> {
232        if self.root.exists() {
233            std::fs::remove_dir_all(&self.root).map_err(BpError::Io)?;
234        }
235        tracing::info!(root=?self.root, "Pouch storage purged");
236        Ok(())
237    }
238
239    /// Summary of what is stored: (chunk_count, fragment_count, total_bytes).
240    pub fn storage_summary(&self) -> (usize, usize, u64) {
241        let chunks = self.index.chunk_ids().count();
242        let frags = self.index.fragment_count();
243        let bytes = self.index.total_bytes();
244        (chunks, frags, bytes)
245    }
246
247    // ── Private helpers ───────────────────────────────────────────────────
248
249    fn root_path(network_id: &str, service_id: &str) -> BpResult<PathBuf> {
250        Ok(config::base_dir()?
251            .join("storage")
252            .join(network_id)
253            .join(service_id))
254    }
255
256    fn save_meta(&self) -> BpResult<()> {
257        self.meta.save(&self.root.join("meta.json"))
258    }
259
260    /// Rebuild the fragment index from the on-disk `fragments/` directory.
261    fn build_index(root: &Path) -> BpResult<FragmentIndex> {
262        let fragments_dir = root.join("fragments");
263        let mut index = FragmentIndex::new();
264
265        if !fragments_dir.exists() {
266            return Ok(index);
267        }
268
269        for chunk_entry in std::fs::read_dir(&fragments_dir).map_err(BpError::Io)? {
270            let chunk_entry = chunk_entry.map_err(BpError::Io)?;
271            let chunk_id = chunk_entry.file_name().to_string_lossy().to_string();
272
273            for frag_entry in std::fs::read_dir(chunk_entry.path()).map_err(BpError::Io)? {
274                let frag_entry = frag_entry.map_err(BpError::Io)?;
275                let fname = frag_entry.file_name().to_string_lossy().to_string();
276                if !fname.ends_with(".frag") {
277                    continue;
278                }
279                let fragment_id = fname.trim_end_matches(".frag").to_string();
280                let size_bytes = frag_entry.metadata().map(|m| m.len()).unwrap_or(0);
281
282                // Read the header to get k
283                let path = frag_entry.path();
284                let header = std::fs::read(&path).map_err(BpError::Io)?;
285                let k = if header.len() >= 8 {
286                    u32::from_le_bytes(header[4..8].try_into().unwrap()) as usize
287                } else {
288                    continue; // corrupt file, skip
289                };
290
291                index.insert(FragmentMeta {
292                    fragment_id,
293                    chunk_id: chunk_id.clone(),
294                    k,
295                    size_bytes,
296                });
297            }
298        }
299
300        Ok(index)
301    }
302}
303
304// ── Tests ─────────────────────────────────────────────────────────────────────
305
306#[cfg(test)]
307mod tests {
308    use super::*;
309    use crate::coding::rlnc;
310    use std::sync::Mutex;
311
312    static STORAGE_TEST_LOCK: Mutex<()> = Mutex::new(());
313
314    fn with_temp_home_storage<F: FnOnce() + std::panic::UnwindSafe>(f: F) {
315        let _guard = STORAGE_TEST_LOCK.lock().unwrap_or_else(|e| e.into_inner());
316        let tmp =
317            std::env::temp_dir().join(format!("bp_stor_test_{}", uuid::Uuid::new_v4().simple()));
318        std::fs::create_dir_all(&tmp).unwrap();
319        let old_home = std::env::var("HOME").ok();
320        std::env::set_var("HOME", &tmp);
321
322        let result = std::panic::catch_unwind(f);
323
324        if let Some(h) = old_home {
325            std::env::set_var("HOME", h);
326        } else {
327            std::env::remove_var("HOME");
328        }
329        std::fs::remove_dir_all(&tmp).ok();
330        if let Err(e) = result {
331            std::panic::resume_unwind(e);
332        }
333    }
334
335    const CHUNK: &[u8] = b"StorageManager test data - enough bytes to split into 4 symbols.";
336
337    #[test]
338    #[cfg(unix)]
339    fn init_creates_directory_and_meta() {
340        with_temp_home_storage(|| {
341            let sm = StorageManager::init("amici".into(), "svc-1".into(), 1_000_000).unwrap();
342            assert!(sm.root().exists());
343            assert!(sm.root().join("meta.json").exists());
344            assert!(sm.root().join("fragments").exists());
345            assert_eq!(sm.meta.storage_bytes_bid, 1_000_000);
346            assert_eq!(sm.meta.storage_bytes_used, 0);
347        });
348    }
349
350    #[test]
351    #[cfg(unix)]
352    fn store_and_load_fragment() {
353        with_temp_home_storage(|| {
354            let mut sm = StorageManager::init("net".into(), "svc".into(), 1_000_000).unwrap();
355            let frags = rlnc::encode(CHUNK, 4, 6).unwrap();
356
357            sm.store_fragment(&frags[0]).unwrap();
358
359            let loaded = sm.load_fragment(&frags[0].chunk_id, &frags[0].id).unwrap();
360            assert_eq!(loaded.coding_vector, frags[0].coding_vector);
361            assert_eq!(loaded.data, frags[0].data);
362            assert_eq!(sm.meta.storage_bytes_used, frags[0].to_bytes().len() as u64);
363        });
364    }
365
366    #[test]
367    #[cfg(unix)]
368    fn quota_exceeded_returns_error() {
369        with_temp_home_storage(|| {
370            // Tiny quota — won't fit even one fragment
371            let mut sm = StorageManager::init("net".into(), "svc".into(), 1).unwrap();
372            let frags = rlnc::encode(CHUNK, 4, 4).unwrap();
373            assert!(sm.store_fragment(&frags[0]).is_err());
374        });
375    }
376
377    #[test]
378    #[cfg(unix)]
379    fn remove_fragment_frees_space() {
380        with_temp_home_storage(|| {
381            let mut sm = StorageManager::init("net".into(), "svc".into(), 1_000_000).unwrap();
382            let frags = rlnc::encode(CHUNK, 4, 4).unwrap();
383            sm.store_fragment(&frags[0]).unwrap();
384            let used = sm.meta.storage_bytes_used;
385            sm.remove_fragment(&frags[0].chunk_id, &frags[0].id)
386                .unwrap();
387            assert_eq!(sm.meta.storage_bytes_used, 0);
388            let _ = used; // just checking it was non-zero before
389        });
390    }
391
392    #[test]
393    #[cfg(unix)]
394    fn recode_chunk_produces_valid_fragments() {
395        with_temp_home_storage(|| {
396            let mut sm = StorageManager::init("net".into(), "svc".into(), 1_000_000).unwrap();
397            let k = 4;
398            let frags = rlnc::encode(CHUNK, k, k + 4).unwrap();
399
400            // Store 3 fragments
401            for f in &frags[..3] {
402                sm.store_fragment(f).unwrap();
403            }
404
405            let recoded = sm.recode_chunk(&frags[0].chunk_id, 2).unwrap();
406            assert_eq!(recoded.len(), 2);
407            assert_eq!(recoded[0].k, k);
408        });
409    }
410
411    #[test]
412    #[cfg(unix)]
413    fn load_rebuilds_index_from_disk() {
414        with_temp_home_storage(|| {
415            let network_id = "net";
416            let service_id = "svc";
417            let frags = rlnc::encode(CHUNK, 4, 4).unwrap();
418
419            {
420                let mut sm =
421                    StorageManager::init(network_id.into(), service_id.into(), 1_000_000).unwrap();
422                sm.store_fragment(&frags[0]).unwrap();
423                sm.store_fragment(&frags[1]).unwrap();
424            }
425
426            // Load fresh manager — must rebuild index from disk
427            let sm2 = StorageManager::load(network_id, service_id).unwrap();
428            assert_eq!(sm2.index.fragment_count(), 2);
429            assert_eq!(sm2.meta.storage_bytes_used, sm2.index.total_bytes());
430        });
431    }
432}