1pub mod encryption;
23pub mod file_registry;
24pub mod fragment;
25pub mod manifest;
26pub mod meta;
27pub mod tier;
28
29pub use encryption::ChunkCipher;
30pub use file_registry::{FileRegistry, StoredFileEntry};
31pub use fragment::{FragmentIndex, FragmentMeta};
32pub use manifest::{ChunkManifest, FileManifest, FragmentLocation, NetworkMetaKey};
33pub use meta::PouchMeta;
34pub use tier::StorageTier;
35
36use crate::{
37 coding::rlnc::{self, EncodedFragment},
38 config,
39 error::{BpError, BpResult},
40};
41use std::path::{Path, PathBuf};
42
43pub struct StorageManager {
51 root: PathBuf,
53 pub meta: PouchMeta,
55 pub index: FragmentIndex,
57}
58
59impl StorageManager {
60 pub fn init(network_id: String, service_id: String, storage_bytes_bid: u64) -> BpResult<Self> {
70 let root = Self::root_path(&network_id, &service_id)?;
71 let fragments_dir = root.join("fragments");
72 std::fs::create_dir_all(&fragments_dir).map_err(BpError::Io)?;
73
74 let meta = PouchMeta::new(network_id, service_id, storage_bytes_bid);
75 meta.save(&root.join("meta.json"))?;
76
77 Ok(Self {
78 root,
79 meta,
80 index: FragmentIndex::new(),
81 })
82 }
83
84 pub fn load(network_id: &str, service_id: &str) -> BpResult<Self> {
89 let root = Self::root_path(network_id, service_id)?;
90 let meta_path = root.join("meta.json");
91 if !meta_path.exists() {
92 return Err(BpError::Storage(format!(
93 "No storage found for service {service_id} on network {network_id}"
94 )));
95 }
96
97 let meta = PouchMeta::load(&meta_path)?;
98 let index = Self::build_index(&root)?;
99
100 Ok(Self { root, meta, index })
101 }
102
103 pub fn store_fragment(&mut self, fragment: &EncodedFragment) -> BpResult<()> {
109 let bytes = fragment.to_bytes();
110 let size = bytes.len() as u64;
111
112 if !self.meta.has_capacity(size) {
113 return Err(BpError::Storage(format!(
114 "Quota exceeded: need {size} bytes, only {} available",
115 self.meta.available_bytes()
116 )));
117 }
118
119 let chunk_dir = self.root.join("fragments").join(&fragment.chunk_id);
121 std::fs::create_dir_all(&chunk_dir).map_err(BpError::Io)?;
122
123 let frag_path = chunk_dir.join(format!("{}.frag", fragment.id));
125 std::fs::write(&frag_path, &bytes).map_err(BpError::Io)?;
126
127 self.meta.storage_bytes_used += size;
129 self.save_meta()?;
130
131 self.index.insert(FragmentMeta {
132 fragment_id: fragment.id.clone(),
133 chunk_id: fragment.chunk_id.clone(),
134 k: fragment.k,
135 size_bytes: size,
136 });
137
138 Ok(())
139 }
140
141 pub fn load_fragment(&self, chunk_id: &str, fragment_id: &str) -> BpResult<EncodedFragment> {
143 let path = self
144 .root
145 .join("fragments")
146 .join(chunk_id)
147 .join(format!("{fragment_id}.frag"));
148
149 if !path.exists() {
150 return Err(BpError::Storage(format!(
151 "Fragment not found: {chunk_id}/{fragment_id}"
152 )));
153 }
154
155 let bytes = std::fs::read(&path).map_err(BpError::Io)?;
156 EncodedFragment::from_bytes(fragment_id.to_string(), chunk_id.to_string(), &bytes)
157 }
158
159 pub fn remove_fragment(&mut self, chunk_id: &str, fragment_id: &str) -> BpResult<()> {
161 let path = self
162 .root
163 .join("fragments")
164 .join(chunk_id)
165 .join(format!("{fragment_id}.frag"));
166
167 if path.exists() {
168 std::fs::remove_file(&path).map_err(BpError::Io)?;
169 }
170
171 let chunk_dir = self.root.join("fragments").join(chunk_id);
173 if chunk_dir.exists()
174 && std::fs::read_dir(&chunk_dir).map_or(true, |mut d| d.next().is_none())
175 {
176 std::fs::remove_dir(&chunk_dir).ok();
177 }
178
179 if let Some(removed) = self.index.remove(chunk_id, fragment_id) {
180 self.meta.storage_bytes_used = self
181 .meta
182 .storage_bytes_used
183 .saturating_sub(removed.size_bytes);
184 self.save_meta()?;
185 }
186
187 Ok(())
188 }
189
190 pub fn recode_chunk(&self, chunk_id: &str, count: usize) -> BpResult<Vec<EncodedFragment>> {
199 let metas = self.index.fragments_for_chunk(chunk_id);
200 if metas.is_empty() {
201 return Err(BpError::Storage(format!(
202 "recode: no fragments for chunk {chunk_id}"
203 )));
204 }
205
206 let fragments: Vec<EncodedFragment> = metas
208 .iter()
209 .map(|m| self.load_fragment(chunk_id, &m.fragment_id))
210 .collect::<BpResult<_>>()?;
211
212 rlnc::recode(&fragments, count).map_err(|e| BpError::Storage(e.to_string()))
213 }
214
215 pub fn has_capacity(&self, bytes: u64) -> bool {
219 self.meta.has_capacity(bytes)
220 }
221
222 pub fn root(&self) -> &Path {
224 &self.root
225 }
226
227 pub fn purge(&self) -> BpResult<()> {
232 if self.root.exists() {
233 std::fs::remove_dir_all(&self.root).map_err(BpError::Io)?;
234 }
235 tracing::info!(root=?self.root, "Pouch storage purged");
236 Ok(())
237 }
238
239 pub fn storage_summary(&self) -> (usize, usize, u64) {
241 let chunks = self.index.chunk_ids().count();
242 let frags = self.index.fragment_count();
243 let bytes = self.index.total_bytes();
244 (chunks, frags, bytes)
245 }
246
247 fn root_path(network_id: &str, service_id: &str) -> BpResult<PathBuf> {
250 Ok(config::base_dir()?
251 .join("storage")
252 .join(network_id)
253 .join(service_id))
254 }
255
256 fn save_meta(&self) -> BpResult<()> {
257 self.meta.save(&self.root.join("meta.json"))
258 }
259
260 fn build_index(root: &Path) -> BpResult<FragmentIndex> {
262 let fragments_dir = root.join("fragments");
263 let mut index = FragmentIndex::new();
264
265 if !fragments_dir.exists() {
266 return Ok(index);
267 }
268
269 for chunk_entry in std::fs::read_dir(&fragments_dir).map_err(BpError::Io)? {
270 let chunk_entry = chunk_entry.map_err(BpError::Io)?;
271 let chunk_id = chunk_entry.file_name().to_string_lossy().to_string();
272
273 for frag_entry in std::fs::read_dir(chunk_entry.path()).map_err(BpError::Io)? {
274 let frag_entry = frag_entry.map_err(BpError::Io)?;
275 let fname = frag_entry.file_name().to_string_lossy().to_string();
276 if !fname.ends_with(".frag") {
277 continue;
278 }
279 let fragment_id = fname.trim_end_matches(".frag").to_string();
280 let size_bytes = frag_entry.metadata().map(|m| m.len()).unwrap_or(0);
281
282 let path = frag_entry.path();
284 let header = std::fs::read(&path).map_err(BpError::Io)?;
285 let k = if header.len() >= 8 {
286 u32::from_le_bytes(header[4..8].try_into().unwrap()) as usize
287 } else {
288 continue; };
290
291 index.insert(FragmentMeta {
292 fragment_id,
293 chunk_id: chunk_id.clone(),
294 k,
295 size_bytes,
296 });
297 }
298 }
299
300 Ok(index)
301 }
302}
303
#[cfg(test)]
mod tests {
    use super::*;
    use crate::coding::rlnc;
    use std::sync::Mutex;

    /// Serializes the tests in this module: each one redirects the
    /// process-wide `HOME` environment variable.
    static STORAGE_TEST_LOCK: Mutex<()> = Mutex::new(());

    /// Runs `f` with `HOME` pointing at a fresh temporary directory, then
    /// restores `HOME` and deletes the directory even if `f` panicked.
    fn with_temp_home_storage<F: FnOnce() + std::panic::UnwindSafe>(f: F) {
        // A poisoned lock only means an earlier test panicked; keep going.
        let _guard = STORAGE_TEST_LOCK.lock().unwrap_or_else(|e| e.into_inner());
        let tmp =
            std::env::temp_dir().join(format!("bp_stor_test_{}", uuid::Uuid::new_v4().simple()));
        std::fs::create_dir_all(&tmp).unwrap();
        let old_home = std::env::var("HOME").ok();
        std::env::set_var("HOME", &tmp);

        let result = std::panic::catch_unwind(f);

        match old_home {
            Some(h) => std::env::set_var("HOME", h),
            None => std::env::remove_var("HOME"),
        }
        std::fs::remove_dir_all(&tmp).ok();
        if let Err(e) = result {
            std::panic::resume_unwind(e);
        }
    }

    /// Payload encoded into fragments by the tests below.
    const CHUNK: &[u8] = b"StorageManager test data - enough bytes to split into 4 symbols.";

    #[test]
    #[cfg(unix)]
    fn init_creates_directory_and_meta() {
        with_temp_home_storage(|| {
            let sm = StorageManager::init("amici".into(), "svc-1".into(), 1_000_000).unwrap();
            assert!(sm.root().exists());
            assert!(sm.root().join("meta.json").exists());
            assert!(sm.root().join("fragments").exists());
            assert_eq!(sm.meta.storage_bytes_bid, 1_000_000);
            assert_eq!(sm.meta.storage_bytes_used, 0);
        });
    }

    #[test]
    #[cfg(unix)]
    fn store_and_load_fragment() {
        with_temp_home_storage(|| {
            let mut sm = StorageManager::init("net".into(), "svc".into(), 1_000_000).unwrap();
            let frags = rlnc::encode(CHUNK, 4, 6).unwrap();

            sm.store_fragment(&frags[0]).unwrap();

            let loaded = sm.load_fragment(&frags[0].chunk_id, &frags[0].id).unwrap();
            assert_eq!(loaded.coding_vector, frags[0].coding_vector);
            assert_eq!(loaded.data, frags[0].data);
            assert_eq!(sm.meta.storage_bytes_used, frags[0].to_bytes().len() as u64);
        });
    }

    #[test]
    #[cfg(unix)]
    fn quota_exceeded_returns_error() {
        with_temp_home_storage(|| {
            // A one-byte bid cannot hold any fragment.
            let mut sm = StorageManager::init("net".into(), "svc".into(), 1).unwrap();
            let frags = rlnc::encode(CHUNK, 4, 4).unwrap();
            assert!(sm.store_fragment(&frags[0]).is_err());
        });
    }

    #[test]
    #[cfg(unix)]
    fn remove_fragment_frees_space() {
        with_temp_home_storage(|| {
            let mut sm = StorageManager::init("net".into(), "svc".into(), 1_000_000).unwrap();
            let frags = rlnc::encode(CHUNK, 4, 4).unwrap();
            sm.store_fragment(&frags[0]).unwrap();

            // Storing must have consumed space, otherwise the check below
            // would pass vacuously.
            let used = sm.meta.storage_bytes_used;
            assert!(used > 0);

            sm.remove_fragment(&frags[0].chunk_id, &frags[0].id)
                .unwrap();

            assert_eq!(sm.meta.storage_bytes_used, 0);
            assert_eq!(sm.index.fragment_count(), 0);
            assert!(sm.load_fragment(&frags[0].chunk_id, &frags[0].id).is_err());
        });
    }

    #[test]
    #[cfg(unix)]
    fn recode_chunk_produces_valid_fragments() {
        with_temp_home_storage(|| {
            let mut sm = StorageManager::init("net".into(), "svc".into(), 1_000_000).unwrap();
            let k = 4;
            let frags = rlnc::encode(CHUNK, k, k + 4).unwrap();

            // Store only a subset of the encoded fragments before recoding.
            for f in &frags[..3] {
                sm.store_fragment(f).unwrap();
            }

            let recoded = sm.recode_chunk(&frags[0].chunk_id, 2).unwrap();
            assert_eq!(recoded.len(), 2);
            assert_eq!(recoded[0].k, k);
        });
    }

    #[test]
    #[cfg(unix)]
    fn load_rebuilds_index_from_disk() {
        with_temp_home_storage(|| {
            let network_id = "net";
            let service_id = "svc";
            let frags = rlnc::encode(CHUNK, 4, 4).unwrap();

            {
                let mut sm =
                    StorageManager::init(network_id.into(), service_id.into(), 1_000_000).unwrap();
                sm.store_fragment(&frags[0]).unwrap();
                sm.store_fragment(&frags[1]).unwrap();
            }

            // A fresh manager must rediscover both fragments from the files.
            let sm2 = StorageManager::load(network_id, service_id).unwrap();
            assert_eq!(sm2.index.fragment_count(), 2);
            assert_eq!(sm2.meta.storage_bytes_used, sm2.index.total_bytes());
        });
    }
}