bp_core/control/
server.rs

1//! Unix socket control server — runs inside the daemon.
2//! Each CLI invocation connects, sends one JSON request, reads one JSON response,
3//! then closes the connection.
4
5use crate::{
6    coding::{params as coding_params, rlnc},
7    control::protocol::{
8        ControlRequest, ControlResponse, FileEntry, FlockData, GetFileData, HatchData, InviteData,
9        ListFilesData, NetworkQosSummary, PouchStat, PutFileData, StatusData, StorageInfoData,
10    },
11    error::BpResult,
12    identity::Identity,
13    network::{
14        fragment_gossip::{FragmentIndexAnnouncement, FragmentPointer, RemoteFragmentIndex},
15        state::NodeInfo,
16        FragmentResponse, NetworkCommand, OutgoingAssignments, QosRegistry, ReputationStore,
17        StorageManagerMap,
18    },
19    service::{ServiceInfo, ServiceRegistry, ServiceStatus, ServiceType},
20    storage::{ChunkCipher, FileRegistry, StorageManager},
21};
22use libp2p::{Multiaddr, PeerId};
23use std::{
24    collections::HashMap,
25    path::Path,
26    sync::{Arc, RwLock},
27    time::{SystemTime, UNIX_EPOCH},
28};
29use tokio::{
30    io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
31    net::{UnixListener, UnixStream},
32    sync::mpsc,
33};
34
35// ── CEK hint persistence ──────────────────────────────────────────────────────
36
37/// Load persisted CEK hints from `cek_hints.json`.
38///
39/// Returns an empty map if the file does not exist or cannot be parsed.
40/// Called once at daemon startup so files encrypted in previous sessions
41/// remain decryptable.
42pub fn load_cek_hints() -> HashMap<String, [u8; 32]> {
43    let path = match crate::config::cek_hints_path() {
44        Ok(p) => p,
45        Err(e) => {
46            tracing::warn!("cek_hints_path error: {e}");
47            return HashMap::new();
48        }
49    };
50    load_cek_hints_at(&path)
51}
52
53/// Core deserialization logic — reads from an explicit `path`.
54fn load_cek_hints_at(path: &std::path::Path) -> HashMap<String, [u8; 32]> {
55    if !path.exists() {
56        return HashMap::new();
57    }
58    let data = match std::fs::read_to_string(path) {
59        Ok(d) => d,
60        Err(e) => {
61            tracing::warn!("Failed to read cek_hints: {e}");
62            return HashMap::new();
63        }
64    };
65    let hex_map: HashMap<String, String> = match serde_json::from_str(&data) {
66        Ok(m) => m,
67        Err(e) => {
68            tracing::warn!("Failed to parse cek_hints: {e}");
69            return HashMap::new();
70        }
71    };
72    hex_map
73        .into_iter()
74        .filter_map(|(k, v)| {
75            let bytes = hex::decode(&v).ok()?;
76            if bytes.len() != 32 {
77                return None;
78            }
79            let mut arr = [0u8; 32];
80            arr.copy_from_slice(&bytes);
81            Some((k, arr))
82        })
83        .collect()
84}
85
86/// Persist CEK hints to `cek_hints.json`.
87///
88/// Failures are logged as warnings and silently ignored — a missing hint
89/// means the file cannot be decrypted after restart, but it does not
90/// corrupt any other state.
91fn persist_cek_hints(hints: &HashMap<String, [u8; 32]>) {
92    let path = match crate::config::cek_hints_path() {
93        Ok(p) => p,
94        Err(e) => {
95            tracing::warn!("cek_hints_path error: {e}");
96            return;
97        }
98    };
99    persist_cek_hints_at(&path, hints);
100}
101
102/// Core serialization logic — writes to an explicit `path`.
103fn persist_cek_hints_at(path: &std::path::Path, hints: &HashMap<String, [u8; 32]>) {
104    // Ensure the parent directory exists before writing.
105    if let Some(parent) = path.parent() {
106        if let Err(e) = std::fs::create_dir_all(parent) {
107            tracing::warn!("Failed to create cek_hints dir: {e}");
108            return;
109        }
110    }
111    let hex_map: HashMap<&str, String> = hints
112        .iter()
113        .map(|(k, v)| (k.as_str(), hex::encode(v)))
114        .collect();
115    match serde_json::to_string(&hex_map) {
116        Ok(json) => {
117            if let Err(e) = std::fs::write(path, json) {
118                tracing::warn!("Failed to write cek_hints: {e}");
119            }
120        }
121        Err(e) => tracing::warn!("Failed to serialize cek_hints: {e}"),
122    }
123}
124
125/// Shared daemon state accessible from the control server.
126pub struct DaemonState {
127    pub identity: Identity,
128    pub services: RwLock<ServiceRegistry>,
129    pub network_state: Arc<RwLock<crate::network::state::NetworkState>>,
130    pub networks: RwLock<Vec<String>>,
131    pub net_tx: mpsc::Sender<NetworkCommand>,
132    /// One `StorageManager` per active Pouch service, keyed by `service_id`.
133    ///
134    /// Shared with the network loop so incoming fragment-fetch requests can
135    /// be served directly from the P2P event handler.
136    pub storage_managers: StorageManagerMap,
137    /// Per-peer QoS data updated by the network quality monitor.
138    /// Used by `PutFile` to derive adaptive k/n from live stability scores.
139    pub qos: Arc<RwLock<QosRegistry>>,
140    /// Tracks which fragments were pushed to which remote Pouch peer.
141    ///
142    /// Updated by the network loop on every `PushFragment`; read by the
143    /// quality monitor to select Proof-of-Storage challenge targets.
144    pub outgoing_assignments: OutgoingAssignments,
145    /// In-memory index of `chunk_id → [(peer_id, fragment_id)]` learned from
146    /// gossipped [`FragmentIndexAnnouncement`]s.
147    ///
148    /// Populated when received from the network; used by `GetFile` to issue
149    /// targeted `FetchChunkFragments` requests instead of broadcasting to all
150    /// known Pouches.
151    pub remote_fragment_index: Arc<RwLock<RemoteFragmentIndex>>,
152    /// Maps `chunk_id → BLAKE3(plaintext_chunk)` so that `GetFile` can
153    /// re-derive the per-user CEK for decryption after RLNC decode.
154    ///
155    /// Populated by `PutFile`; persisted only for the daemon’s lifetime
156    /// (lost on restart — full persistence comes with the manifest system).
157    pub chunk_cek_hints: RwLock<HashMap<String, [u8; 32]>>,
158    /// Per-peer reputation state (tier R0–R4, score, uptime, PoS pass rate).
159    ///
160    /// Updated by the quality monitor and by eviction events.
161    pub reputation: RwLock<ReputationStore>,
162    /// Local catalogue of files uploaded by this node's Bill services.
163    ///
164    /// Populated on each successful `PutFile`.
165    /// Persisted to `file_registry.json`.
166    pub file_registry: RwLock<FileRegistry>,
167}
168
169/// Accept connections on the Unix socket and dispatch requests.
170pub async fn run_control_server(
171    socket_path: impl AsRef<Path>,
172    state: Arc<DaemonState>,
173) -> BpResult<()> {
174    // Remove stale socket from a previous run.
175    let path = socket_path.as_ref();
176    if path.exists() {
177        std::fs::remove_file(path).ok();
178    }
179
180    let listener =
181        UnixListener::bind(path).map_err(|e| crate::error::BpError::Control(e.to_string()))?;
182
183    tracing::info!("Control socket listening at {:?}", path);
184
185    loop {
186        match listener.accept().await {
187            Ok((stream, _)) => {
188                let state = Arc::clone(&state);
189                tokio::spawn(async move {
190                    if let Err(e) = handle_connection(stream, state).await {
191                        tracing::warn!("Control connection error: {}", e);
192                    }
193                });
194            }
195            Err(e) => {
196                tracing::error!("Accept error on control socket: {}", e);
197            }
198        }
199    }
200}
201
202async fn handle_connection(stream: UnixStream, state: Arc<DaemonState>) -> anyhow::Result<()> {
203    let (read_half, mut write_half) = stream.into_split();
204    let mut reader = BufReader::new(read_half);
205    let mut line = String::new();
206
207    reader.read_line(&mut line).await?;
208    let line = line.trim();
209    if line.is_empty() {
210        return Ok(());
211    }
212
213    let request: ControlRequest = match serde_json::from_str(line) {
214        Ok(r) => r,
215        Err(e) => {
216            let resp = ControlResponse::err(format!("Invalid request JSON: {}", e));
217            send_response(&mut write_half, &resp).await?;
218            return Ok(());
219        }
220    };
221
222    let response = dispatch(request, &state).await;
223    send_response(&mut write_half, &response).await?;
224    Ok(())
225}
226
227async fn send_response(
228    writer: &mut tokio::net::unix::OwnedWriteHalf,
229    resp: &ControlResponse,
230) -> anyhow::Result<()> {
231    let mut json = serde_json::to_string(resp)?;
232    json.push('\n');
233    writer.write_all(json.as_bytes()).await?;
234    Ok(())
235}
236
237/// Dispatch a `ControlRequest` to the appropriate handler.
238async fn dispatch(req: ControlRequest, state: &Arc<DaemonState>) -> ControlResponse {
239    match req {
240        // ── Ping ──────────────────────────────────────────────────────────
241        ControlRequest::Ping => ControlResponse::ok("pong"),
242
243        // ── Status ────────────────────────────────────────────────────────
244        ControlRequest::Status => {
245            let services = state.services.read().unwrap();
246            let networks = state.networks.read().unwrap();
247            let ns = state.network_state.read().unwrap();
248            let own_peer = state.identity.peer_id.to_string();
249
250            // Own reputation tier, score, and dynamic availability factor.
251            let (rep_tier, rep_score, avail_factor) = {
252                let (tier_str, score, tier_enum) = {
253                    let rep = state.reputation.read().unwrap();
254                    match rep.get(&own_peer) {
255                        Some(r) => (r.tier.to_string(), r.reputation_score, r.tier),
256                        None => (
257                            crate::network::ReputationTier::R1.to_string(),
258                            0i64,
259                            crate::network::ReputationTier::R1,
260                        ),
261                    }
262                };
263                // k/N: fraction of raw bid usable as recoverable content.
264                // N = remote Pouch peers + own node (score 0.4 if unobserved).
265                let factor = if tier_enum == crate::network::ReputationTier::R0 {
266                    0.0_f64
267                } else {
268                    let ph = tier_enum.qos_target_ph();
269                    let qos = state.qos.read().unwrap();
270                    let own_score = qos
271                        .get(&own_peer)
272                        .map(|q| q.stability_score())
273                        .unwrap_or(0.4);
274                    let mut stabilities = qos.all_stability_scores();
275                    stabilities.push(own_score);
276                    crate::coding::params::compute_network_storage_factor(&stabilities, ph)
277                };
278                (tier_str, score, factor)
279            };
280
281            // Per-Pouch storage stats.
282            let pouch_stats: Vec<crate::control::protocol::PouchStat> = {
283                let managers = state.storage_managers.read().unwrap();
284                services
285                    .all()
286                    .iter()
287                    .filter(|s| s.service_type == ServiceType::Pouch)
288                    .map(|s| {
289                        let tier_label = s.metadata.get("tier").and_then(|v| v.as_str()).map(|t| {
290                            format!(
291                                "{} — {}",
292                                t,
293                                match t {
294                                    "T1" => "Pebble",
295                                    "T2" => "Stone",
296                                    "T3" => "Boulder",
297                                    "T4" => "Rock",
298                                    "T5" => "Monolith",
299                                    _ => "?",
300                                }
301                            )
302                        });
303                        if let Some(sm_lock) = managers.get(&s.id) {
304                            let sm = sm_lock.read().unwrap();
305                            crate::control::protocol::PouchStat {
306                                service_id: s.id.clone(),
307                                network_id: s.network_id.clone(),
308                                storage_tier: tier_label,
309                                storage_bid_bytes: sm.meta.storage_bytes_bid,
310                                storage_used_bytes: sm.meta.storage_bytes_used,
311                                available_bytes: ((sm.meta.available_bytes() as f64) * avail_factor)
312                                    .round()
313                                    as u64,
314                            }
315                        } else {
316                            crate::control::protocol::PouchStat {
317                                service_id: s.id.clone(),
318                                network_id: s.network_id.clone(),
319                                storage_tier: tier_label,
320                                storage_bid_bytes: 0,
321                                storage_used_bytes: 0,
322                                available_bytes: 0,
323                            }
324                        }
325                    })
326                    .collect()
327            };
328
329            // Network QoS summary.
330            let qos_peer_count = state.qos.read().unwrap().peer_count();
331            let network_qos = {
332                let qos = state.qos.read().unwrap();
333                let rep = state.reputation.read().unwrap();
334                let peer_count = qos.peer_count();
335                if peer_count == 0 {
336                    None
337                } else {
338                    let scores = qos.all_stability_scores();
339                    let avg_stability = if scores.is_empty() {
340                        0.0
341                    } else {
342                        scores.iter().sum::<f64>() / scores.len() as f64
343                    };
344                    let mut tier_counts: std::collections::HashMap<String, usize> =
345                        std::collections::HashMap::new();
346                    for peer_id in qos.peer_ids() {
347                        let tier = rep.tier(peer_id).to_string();
348                        *tier_counts.entry(tier).or_insert(0) += 1;
349                    }
350                    Some(NetworkQosSummary {
351                        observed_peers: peer_count,
352                        avg_stability,
353                        tier_counts,
354                    })
355                }
356            };
357
358            let data = StatusData {
359                peer_id: own_peer,
360                fingerprint: state.identity.fingerprint.clone(),
361                alias: state.identity.profile.alias.clone(),
362                local_services: services.all().into_iter().cloned().collect(),
363                networks: networks.clone(),
364                known_peers: ns.len().max(qos_peer_count),
365                version: env!("CARGO_PKG_VERSION").to_string(),
366                reputation_tier: rep_tier,
367                reputation_score: rep_score,
368                pouch_stats,
369                network_qos,
370            };
371            ControlResponse::ok(data)
372        }
373
374        // ── AnnounceNow ───────────────────────────────────────────────────
375        ControlRequest::AnnounceNow => {
376            // Re-broadcast NodeInfo for every running service, then pause
377            // 2 s to let gossipsub deliver the messages to peers.
378            let svcs: Vec<(String, ServiceType, String)> = {
379                state
380                    .services
381                    .read()
382                    .unwrap()
383                    .all()
384                    .iter()
385                    .filter(|s| s.status == ServiceStatus::Running)
386                    .map(|s| (s.id.clone(), s.service_type, s.network_id.clone()))
387                    .collect()
388            };
389            if svcs.is_empty() {
390                tracing::debug!("AnnounceNow: no running services, skipping");
391            } else {
392                for (id, stype, net) in &svcs {
393                    announce_self(state, id, *stype, net).await;
394                }
395                tracing::debug!(
396                    "AnnounceNow: announced {} service(s), waiting 2s",
397                    svcs.len()
398                );
399                tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
400            }
401            ControlResponse::ok("announced")
402        }
403
404        // ── Hatch ─────────────────────────────────────────────────────────
405        ControlRequest::Hatch {
406            service_type,
407            network_id,
408            metadata,
409        } => {
410            // Ensure we're subscribed to that network's gossip
411            let already_joined = {
412                let nets = state.networks.read().unwrap();
413                nets.contains(&network_id)
414            };
415            if !already_joined {
416                let _ = state
417                    .net_tx
418                    .send(NetworkCommand::JoinNetwork {
419                        network_id: network_id.clone(),
420                    })
421                    .await;
422                state.networks.write().unwrap().push(network_id.clone());
423            }
424
425            // Enforce one-Pouch-per-network: two correlated Pouches on the
426            // same node defeat distributed redundancy.
427            if service_type == ServiceType::Pouch {
428                let already_exists =
429                    state.services.read().unwrap().all().iter().any(|s| {
430                        s.service_type == ServiceType::Pouch && s.network_id == network_id
431                    });
432                if already_exists {
433                    return ControlResponse::err(format!(
434                        "a Pouch for network '{}' already exists on this node; \
435                         two correlated Pouches defeat distributed redundancy",
436                        network_id
437                    ));
438                }
439            }
440
441            let info = ServiceInfo::new(service_type, network_id.clone(), metadata.clone());
442            let service_id = info.id.clone();
443
444            // Set to Running immediately — scope the lock guard so it is dropped
445            // BEFORE the .await below (std::sync guards are !Send across await).
446            {
447                let mut reg = state.services.write().unwrap();
448                let mut running_info = info.clone();
449                running_info.status = ServiceStatus::Running;
450                reg.register(running_info);
451            } // ← lock released here
452
453            // Initialise a StorageManager when hatching a Pouch service.
454            if service_type == ServiceType::Pouch {
455                let quota = metadata
456                    .get("storage_bytes")
457                    .or_else(|| metadata.get("storage_bytes_bid"))
458                    .and_then(|v| v.as_u64())
459                    .unwrap_or(0);
460
461                match StorageManager::init(network_id.clone(), service_id.clone(), quota) {
462                    Ok(sm) => {
463                        state
464                            .storage_managers
465                            .write()
466                            .unwrap()
467                            .insert(service_id.clone(), Arc::new(RwLock::new(sm)));
468                        tracing::info!(service_id=%service_id, quota_bytes=%quota, "StorageManager initialised");
469                    }
470                    Err(e) => {
471                        tracing::warn!("Failed to init StorageManager for {}: {}", service_id, e);
472                    }
473                }
474            }
475
476            // Announce to the network (await-safe: no lock held)
477            announce_self(state, &service_id, service_type, &network_id).await;
478
479            ControlResponse::ok(HatchData {
480                service_id: service_id.clone(),
481                service_type,
482                network_id,
483                message: format!("🦤 {} service hatched — id: {}", service_type, service_id),
484            })
485        }
486
487        // ── Farewell ──────────────────────────────────────────────────────
488        ControlRequest::Farewell { service_id } => {
489            let mut reg = state.services.write().unwrap();
490            match reg.remove(&service_id) {
491                Some(info) => ControlResponse::ok(serde_json::json!({
492                    "service_id": info.id,
493                    "service_type": info.service_type,
494                    "message": format!("Service {} stopped", info.id),
495                })),
496                None => ControlResponse::err(format!("No service with id '{}'", service_id)),
497            }
498        }
499        // ── FarewellEvict ─────────────────────────────────────────────────
500        ControlRequest::FarewellEvict { service_id } => {
501            let now_secs = SystemTime::now()
502                .duration_since(UNIX_EPOCH)
503                .unwrap_or_default()
504                .as_secs();
505
506            // Look up the service.
507            let (network_id, service_type, peer_id_str) = {
508                let reg = state.services.read().unwrap();
509                match reg.get(&service_id) {
510                    Some(info) => (
511                        info.network_id.clone(),
512                        info.service_type,
513                        state.identity.peer_id.to_string(),
514                    ),
515                    None => {
516                        return ControlResponse::err(format!(
517                            "No service with id '{}'",
518                            service_id
519                        ));
520                    }
521                }
522            };
523
524            // Collect storage stats and purge disk (if Pouch).
525            let (chunk_count, fragment_count, bytes_freed) = {
526                let mut managers = state.storage_managers.write().unwrap();
527                if let Some(sm_lock) = managers.remove(&service_id) {
528                    let sm = sm_lock.read().unwrap();
529                    let summary = sm.storage_summary();
530                    if let Err(e) = sm.purge() {
531                        tracing::warn!("Failed to purge storage for {}: {}", service_id, e);
532                    }
533                    summary
534                } else {
535                    (0, 0, 0)
536                }
537            };
538
539            // Announce eviction via gossip before removing from registry.
540            {
541                let mut meta = HashMap::new();
542                meta.insert("evicting".to_string(), serde_json::Value::Bool(true));
543                let info = NodeInfo {
544                    peer_id: peer_id_str,
545                    user_fingerprint: state.identity.fingerprint.clone(),
546                    user_alias: state.identity.profile.alias.clone(),
547                    service_type,
548                    service_id: service_id.clone(),
549                    network_id: network_id.clone(),
550                    listen_addrs: vec![],
551                    announced_at: now_secs,
552                    metadata: meta,
553                };
554                if let Ok(payload) = serde_json::to_vec(&info) {
555                    let _ = state
556                        .net_tx
557                        .send(NetworkCommand::Announce {
558                            network_id: network_id.clone(),
559                            payload,
560                        })
561                        .await;
562                }
563            }
564
565            // Record reputation eviction.
566            {
567                let local_peer = state.identity.peer_id.to_string();
568                let mut rep = state.reputation.write().unwrap();
569                rep.get_or_create(&local_peer, now_secs)
570                    .evict_without_notice(now_secs);
571            }
572
573            // Remove from registry.
574            state.services.write().unwrap().remove(&service_id);
575
576            tracing::info!(
577                service_id = %service_id,
578                chunks = chunk_count,
579                fragments = fragment_count,
580                bytes = bytes_freed,
581                "Pouch evicted"
582            );
583
584            ControlResponse::ok(serde_json::json!({
585                "service_id": service_id,
586                "network_id": network_id,
587                "chunks_removed": chunk_count,
588                "fragments_removed": fragment_count,
589                "bytes_freed": bytes_freed,
590                "message": format!(
591                    "Pouch {} evicted — {} chunk(s), {} fragment(s), {} bytes freed. \
592                     Network peers will rebalance via Proof-of-Storage.",
593                    service_id, chunk_count, fragment_count, bytes_freed
594                ),
595            }))
596        }
597        // ── Pause ─────────────────────────────────────────────────────────
598        ControlRequest::Pause {
599            service_id,
600            eta_minutes,
601        } => {
602            let now_secs = SystemTime::now()
603                .duration_since(UNIX_EPOCH)
604                .unwrap_or_default()
605                .as_secs();
606
607            let (network_id, service_type) = {
608                let mut reg = state.services.write().unwrap();
609                match reg.get_mut(&service_id) {
610                    Some(info) => {
611                        if info.status == crate::service::ServiceStatus::Stopped
612                            || matches!(info.status, crate::service::ServiceStatus::Stopping)
613                        {
614                            return ControlResponse::err(format!(
615                                "Service '{}' is not running",
616                                service_id
617                            ));
618                        }
619                        info.status = crate::service::ServiceStatus::Paused {
620                            eta_minutes,
621                            paused_at: now_secs,
622                        };
623                        info.metadata
624                            .insert("maintenance".into(), serde_json::Value::Bool(true));
625                        info.metadata
626                            .insert("eta_minutes".into(), serde_json::Value::from(eta_minutes));
627                        (info.network_id.clone(), info.service_type)
628                    }
629                    None => {
630                        return ControlResponse::err(format!(
631                            "No service with id '{}'",
632                            service_id
633                        ));
634                    }
635                }
636            };
637
638            announce_self(state, &service_id, service_type, &network_id).await;
639
640            ControlResponse::ok(serde_json::json!({
641                "service_id": service_id,
642                "status": "paused",
643                "eta_minutes": eta_minutes,
644                "message": format!(
645                    "Service {} paused — announcing maintenance (ETA {} min)",
646                    service_id, eta_minutes
647                ),
648            }))
649        }
650
651        // ── Resume ────────────────────────────────────────────────────────
652        ControlRequest::Resume { service_id } => {
653            let (network_id, service_type) = {
654                let mut reg = state.services.write().unwrap();
655                match reg.get_mut(&service_id) {
656                    Some(info) => {
657                        if !matches!(info.status, crate::service::ServiceStatus::Paused { .. }) {
658                            return ControlResponse::err(format!(
659                                "Service '{}' is not paused",
660                                service_id
661                            ));
662                        }
663                        info.status = crate::service::ServiceStatus::Running;
664                        info.metadata.remove("maintenance");
665                        info.metadata.remove("eta_minutes");
666                        (info.network_id.clone(), info.service_type)
667                    }
668                    None => {
669                        return ControlResponse::err(format!(
670                            "No service with id '{}'",
671                            service_id
672                        ));
673                    }
674                }
675            };
676
677            announce_self(state, &service_id, service_type, &network_id).await;
678
679            ControlResponse::ok(serde_json::json!({
680                "service_id": service_id,
681                "status": "running",
682                "message": format!("Service {} resumed", service_id),
683            }))
684        }
685
686        // ── Flock ─────────────────────────────────────────────────────────
687        ControlRequest::Flock => {
688            let services = state.services.read().unwrap();
689            let networks = state.networks.read().unwrap();
690            let ns = state.network_state.read().unwrap();
691            let data = FlockData {
692                local_services: services.all().into_iter().cloned().collect(),
693                known_peers: ns.all().into_iter().cloned().collect(),
694                networks: networks.clone(),
695                peer_count: ns.len(),
696            };
697            ControlResponse::ok(data)
698        }
699
700        // ── Join ──────────────────────────────────────────────────────────
701        ControlRequest::Join { network_id } => {
702            {
703                let nets = state.networks.read().unwrap();
704                if nets.contains(&network_id) {
705                    return ControlResponse::err(format!(
706                        "Already a member of network '{}'",
707                        network_id
708                    ));
709                }
710            }
711            // Ensure a NetworkMetaKey exists for this network.
712            // If none was installed via an invite token, we generate a fresh
713            // random one (this node is creating the network).
714            if let Err(e) = crate::storage::manifest::NetworkMetaKey::load_or_create(&network_id) {
715                tracing::warn!(network = %network_id, "Failed to ensure NetworkMetaKey: {e}");
716            }
717            let _ = state
718                .net_tx
719                .send(NetworkCommand::JoinNetwork {
720                    network_id: network_id.clone(),
721                })
722                .await;
723            state.networks.write().unwrap().push(network_id.clone());
724            ControlResponse::ok(serde_json::json!({
725                "network_id": network_id,
726                "message": format!("Joined network '{}'", network_id),
727            }))
728        }
729
730        // ── Leave ─────────────────────────────────────────────────────────
731        ControlRequest::Leave { network_id, force } => {
732            // Collect blocking services on this network.
733            let blocking: Vec<ServiceInfo> = {
734                let reg = state.services.read().unwrap();
735                reg.all()
736                    .iter()
737                    .filter(|s| s.network_id == network_id)
738                    .map(|s| (*s).clone())
739                    .collect()
740            };
741
742            if !blocking.is_empty() && !force {
743                // v1: return helpful error with per-service hints.
744                let blocking_json: Vec<serde_json::Value> = blocking
745                    .iter()
746                    .map(|s| {
747                        let hint = if s.service_type == ServiceType::Pouch {
748                            format!("bp farewell {} --evict", s.id)
749                        } else {
750                            format!("bp farewell {}", s.id)
751                        };
752                        serde_json::json!({
753                            "id":   s.id,
754                            "type": s.service_type.to_string(),
755                            "hint": hint,
756                        })
757                    })
758                    .collect();
759                return ControlResponse::ok(serde_json::json!({
760                    "network_id": network_id,
761                    "blocked": true,
762                    "blocking_services": blocking_json,
763                    "message": format!(
764                        "Cannot leave '{}': {} active service(s) must be stopped first \
765                         (see 'blocking_services' for commands, or use --force to auto-evict)",
766                        network_id,
767                        blocking.len()
768                    ),
769                }));
770            }
771
772            // force=true (or no blocking services): auto-evict then leave.
773            let now_secs = SystemTime::now()
774                .duration_since(UNIX_EPOCH)
775                .unwrap_or_default()
776                .as_secs();
777            let mut evicted: Vec<serde_json::Value> = Vec::new();
778
779            for svc in &blocking {
780                let service_id = svc.id.clone();
781                let service_type = svc.service_type;
782                let peer_id_str = state.identity.peer_id.to_string();
783
784                if service_type == ServiceType::Pouch {
785                    // Purge on-disk storage.
786                    {
787                        let mut managers = state.storage_managers.write().unwrap();
788                        if let Some(sm_lock) = managers.remove(&service_id) {
789                            let sm = sm_lock.read().unwrap();
790                            if let Err(e) = sm.purge() {
791                                tracing::warn!(
792                                    "leave --force: failed to purge storage for {}: {}",
793                                    service_id,
794                                    e
795                                );
796                            }
797                        }
798                    }
799                    // Gossip eviction notice before removing from registry.
800                    {
801                        let mut meta = HashMap::new();
802                        meta.insert("evicting".to_string(), serde_json::Value::Bool(true));
803                        let info = NodeInfo {
804                            peer_id: peer_id_str.clone(),
805                            user_fingerprint: state.identity.fingerprint.clone(),
806                            user_alias: state.identity.profile.alias.clone(),
807                            service_type,
808                            service_id: service_id.clone(),
809                            network_id: network_id.clone(),
810                            listen_addrs: vec![],
811                            announced_at: now_secs,
812                            metadata: meta,
813                        };
814                        if let Ok(payload) = serde_json::to_vec(&info) {
815                            let _ = state
816                                .net_tx
817                                .send(NetworkCommand::Announce {
818                                    network_id: network_id.clone(),
819                                    payload,
820                                })
821                                .await;
822                        }
823                    }
824                    // Record reputation eviction.
825                    {
826                        let mut rep = state.reputation.write().unwrap();
827                        rep.get_or_create(&peer_id_str, now_secs)
828                            .evict_without_notice(now_secs);
829                    }
830                    tracing::info!(
831                        service_id = %service_id,
832                        network_id = %network_id,
833                        "Pouch auto-evicted by leave --force"
834                    );
835                } else {
836                    tracing::info!(
837                        service_id = %service_id,
838                        service_type = %service_type,
839                        network_id = %network_id,
840                        "service stopped by leave --force"
841                    );
842                }
843
844                // Remove from registry regardless of type.
845                state.services.write().unwrap().remove(&service_id);
846                evicted.push(serde_json::json!({
847                    "service_id":   service_id,
848                    "service_type": service_type.to_string(),
849                    "evicted":       service_type == ServiceType::Pouch,
850                }));
851            }
852
853            // Unsubscribe gossip and remove from active_networks.
854            let _ = state
855                .net_tx
856                .send(NetworkCommand::LeaveNetwork {
857                    network_id: network_id.clone(),
858                })
859                .await;
860            state.networks.write().unwrap().retain(|n| n != &network_id);
861
862            let msg = if evicted.is_empty() {
863                format!("Left network '{}'", network_id)
864            } else {
865                format!(
866                    "Left network '{}' — {} service(s) auto-evicted",
867                    network_id,
868                    evicted.len()
869                )
870            };
871            ControlResponse::ok(serde_json::json!({
872                "network_id": network_id,
873                "services_auto_evicted": evicted,
874                "message": msg,
875            }))
876        }
877
878        // ── ConnectRelay ──────────────────────────────────────────────────
879        ControlRequest::ConnectRelay { relay_addr } => {
880            let addr: Multiaddr = match relay_addr.parse() {
881                Ok(a) => a,
882                Err(e) => {
883                    return ControlResponse::err(format!("Invalid relay multiaddr: {}", e));
884                }
885            };
886            let _ = state
887                .net_tx
888                .send(NetworkCommand::DialRelay { relay_addr: addr })
889                .await;
890            ControlResponse::ok(serde_json::json!({
891                "relay_addr": relay_addr,
892                "message": format!("Dialing relay '{}'", relay_addr),
893            }))
894        }
895        // ── CreateInvite ──────────────────────────────────────────────────
896        ControlRequest::CreateInvite {
897            network_id,
898            invitee_fingerprint,
899            invite_password,
900            ttl_hours,
901        } => {
902            let ttl = ttl_hours.unwrap_or(24);
903            let expires_at =
904                (chrono::Utc::now() + chrono::Duration::hours(ttl as i64)).timestamp() as u64;
905            match crate::invite::create_invite(
906                &state.identity,
907                &network_id,
908                invitee_fingerprint,
909                ttl,
910                &invite_password,
911            ) {
912                Ok(blob) => ControlResponse::ok(InviteData {
913                    blob,
914                    network_id,
915                    expires_at,
916                    inviter_fingerprint: state.identity.fingerprint.clone(),
917                }),
918                Err(e) => ControlResponse::err(format!("Failed to create invite: {e}")),
919            }
920        }
921        // ── PutFile ───────────────────────────────────────────────────────
922        ControlRequest::PutFile {
923            chunk_data,
924            ph,
925            q_target,
926            network_id,
927            file_name,
928        } => {
929            let ph = ph.unwrap_or(0.999);
930            let q_target = q_target.unwrap_or(1.0);
931
932            // Find a Pouch StorageManager for this network.
933            let managers_snap: Vec<(String, Arc<RwLock<StorageManager>>)> = {
934                let map = state.storage_managers.read().unwrap();
935                map.iter()
936                    .map(|(id, sm)| (id.clone(), Arc::clone(sm)))
937                    .collect()
938            };
939
940            // Filter to managers whose network matches (or any if network_id is empty).
941            let candidates: Vec<_> = managers_snap
942                .iter()
943                .filter(|(_, sm)| {
944                    let meta = &sm.read().unwrap().meta;
945                    network_id.is_empty() || meta.network_id == network_id
946                })
947                .collect();
948
949            // bill/post nodes have no local Pouch — they act as pure coordinators
950            // that encode the chunk and distribute all fragments to remote Pouches.
951            // We only fail if there are neither local nor remote Pouches.
952            let has_local_pouch = !candidates.is_empty();
953
954            // ── Compute adaptive k/n from live QoS data ──────────────────────
955            let pouch_peer_ids: Vec<String> = {
956                let ns = state.network_state.read().unwrap();
957                ns.in_network(&network_id)
958                    .into_iter()
959                    .filter(|n| n.service_type == ServiceType::Pouch)
960                    .map(|n| n.peer_id.clone())
961                    .collect()
962            };
963
964            // For peers not yet observed via Ping, assume 0.8 stability
965            // (benefit of the doubt for new entries).
966            let stabilities: Vec<f64> = {
967                let qos = state.qos.read().unwrap();
968                if pouch_peer_ids.is_empty() {
969                    vec![0.8; candidates.len().max(1)]
970                } else {
971                    pouch_peer_ids
972                        .iter()
973                        .map(|pid| match qos.get(pid) {
974                            Some(p) if p.is_observed() => p.stability_score(),
975                            _ => 0.8,
976                        })
977                        .collect()
978                }
979            };
980
981            let (k, n, q, pe) =
982                match coding_params::compute_coding_params(&stabilities, ph, q_target) {
983                    Ok(p) => {
984                        let pe = coding_params::effective_recovery_probability(&stabilities, p.k);
985                        (p.k, p.n, p.q, pe)
986                    }
987                    Err(e) => {
988                        tracing::warn!("compute_coding_params failed ({e}), using fallback");
989                        let peer_n = stabilities.len().max(2);
990                        let fallback_k = (peer_n / 2).max(1);
991                        let fallback_q = (peer_n - fallback_k) as f64 / fallback_k as f64;
992                        let pe =
993                            coding_params::effective_recovery_probability(&stabilities, fallback_k);
994                        (fallback_k, peer_n, fallback_q, pe)
995                    }
996                };
997
998            tracing::info!(
999                network = %network_id, k = %k, n = %n, q = %q,
1000                ph = %ph, pe = %pe, peers = %stabilities.len(),
1001                "PutFile: computed coding params"
1002            );
1003
1004            // Encrypt the chunk with a per-user CEK before RLNC encoding.
1005            // The CEK is derived from the owner’s secret material and the
1006            // BLAKE3 hash of the plaintext — Pouch nodes never see plaintext.
1007            let plaintext_hash: [u8; 32] = *blake3::hash(&chunk_data).as_bytes();
1008            let secret_mat = state.identity.secret_material();
1009            let cipher = ChunkCipher::for_user(&secret_mat, &plaintext_hash);
1010            let encrypted_chunk = match cipher.encrypt(&chunk_data) {
1011                Ok(b) => b,
1012                Err(e) => return ControlResponse::err(format!("Chunk encryption error: {e}")),
1013            };
1014
1015            let fragments = match rlnc::encode(&encrypted_chunk, k, n) {
1016                Ok(f) => f,
1017                Err(e) => return ControlResponse::err(format!("Encode error: {e}")),
1018            };
1019
1020            let chunk_id = match fragments.first() {
1021                Some(f) => f.chunk_id.clone(),
1022                None => return ControlResponse::err("Encoding produced no fragments"),
1023            };
1024
1025            // Store the plaintext hash so GetFile can re-derive the CEK.
1026            state
1027                .chunk_cek_hints
1028                .write()
1029                .unwrap()
1030                .insert(chunk_id.clone(), plaintext_hash);
1031            // Persist hints so the CEK survives a daemon restart.
1032            {
1033                let hints = state.chunk_cek_hints.read().unwrap();
1034                persist_cek_hints(&hints);
1035            }
1036
1037            // Store all fragments locally if a local Pouch is available.
1038            let mut stored = 0usize;
1039            if has_local_pouch {
1040                let (_, sm_arc) = candidates[0];
1041                let mut sm = sm_arc.write().unwrap();
1042                for fragment in &fragments {
1043                    match sm.store_fragment(fragment) {
1044                        Ok(()) => stored += 1,
1045                        Err(e) => {
1046                            tracing::warn!(chunk_id=%chunk_id, "store_fragment failed: {e}");
1047                            break;
1048                        }
1049                    }
1050                }
1051            } // write lock released before async work
1052
1053            // ── Distribute fragments to remote Pouch peers (round-robin) ────
1054            let remote_pouches: Vec<PeerId> = {
1055                let ns = state.network_state.read().unwrap();
1056                let local_peer = state.identity.peer_id.to_string();
1057                ns.in_network(&network_id)
1058                    .into_iter()
1059                    .filter(|n| n.service_type == ServiceType::Pouch && n.peer_id != local_peer)
1060                    .filter_map(|n| n.peer_id.parse::<PeerId>().ok())
1061                    .collect()
1062            };
1063
1064            // Fail only if there is nowhere to store fragments at all.
1065            if stored == 0 && remote_pouches.is_empty() {
1066                return ControlResponse::err(format!(
1067                    "No Pouches available on network '{}' — hatch a Pouch or wait for Pouch peers to join",
1068                    network_id
1069                ));
1070            }
1071
1072            let mut distributed = 0usize;
1073            let mut index_pointers: Vec<FragmentPointer> = Vec::new();
1074            if !remote_pouches.is_empty() {
1075                for (i, fragment) in fragments.iter().enumerate() {
1076                    let peer = remote_pouches[i % remote_pouches.len()];
1077                    if state
1078                        .net_tx
1079                        .send(NetworkCommand::PushFragment {
1080                            peer_id: peer,
1081                            chunk_id: chunk_id.clone(),
1082                            fragment_id: fragment.id.clone(),
1083                            data: fragment.to_bytes(),
1084                        })
1085                        .await
1086                        .is_ok()
1087                    {
1088                        distributed += 1;
1089                        index_pointers.push(FragmentPointer {
1090                            peer_id: peer.to_string(),
1091                            fragment_id: fragment.id.clone(),
1092                        });
1093                    }
1094                }
1095            }
1096
1097            // Publish the fragment-index announcement so peers learn which
1098            // Pouch holds each fragment.
1099            if !index_pointers.is_empty() {
1100                let ann = FragmentIndexAnnouncement {
1101                    network_id: network_id.clone(),
1102                    chunk_id: chunk_id.clone(),
1103                    announced_at: chrono::Utc::now().timestamp() as u64,
1104                    pointers: index_pointers,
1105                };
1106                if let Ok(payload) = serde_json::to_vec(&ann) {
1107                    let _ = state
1108                        .net_tx
1109                        .send(NetworkCommand::AnnounceIndex {
1110                            network_id: network_id.clone(),
1111                            payload,
1112                        })
1113                        .await;
1114                    // Also update our own local index eagerly.
1115                    if let Ok(mut idx) = state.remote_fragment_index.write() {
1116                        idx.upsert(ann);
1117                    }
1118                }
1119            }
1120
1121            tracing::info!(
1122                chunk_id=%chunk_id, stored=%stored, distributed=%distributed,
1123                total=%fragments.len(), "PutFile stored + distributed"
1124            );
1125
1126            // Record the upload in the local file registry so `bp ls` can list it.
1127            if let Some(name) = file_name {
1128                let entry = crate::storage::StoredFileEntry {
1129                    file_name: name,
1130                    size_bytes: chunk_data.len() as u64,
1131                    chunk_id: chunk_id.clone(),
1132                    network_id: network_id.clone(),
1133                    uploaded_at: chrono::Utc::now().timestamp() as u64,
1134                };
1135                if let Ok(path) = crate::config::file_registry_path() {
1136                    state
1137                        .file_registry
1138                        .write()
1139                        .unwrap()
1140                        .insert_and_save(entry, &path);
1141                }
1142            }
1143
1144            ControlResponse::ok(PutFileData {
1145                chunk_id: chunk_id.clone(),
1146                k,
1147                n,
1148                q,
1149                ph,
1150                pe,
1151                fragments_stored: stored,
1152                fragments_distributed: distributed,
1153                message: format!(
1154                    "Stored {stored}/{total} locally, pushed {distributed} to {peers} remote pouch(es) — chunk_id: {chunk_id}",
1155                    total = fragments.len(),
1156                    peers = remote_pouches.len(),
1157                ),
1158            })
1159        }
1160
1161        // ── GetFile ───────────────────────────────────────────────────────
1162        ControlRequest::GetFile {
1163            chunk_id,
1164            network_id,
1165        } => {
1166            let managers_snap: Vec<Arc<RwLock<StorageManager>>> = {
1167                let map = state.storage_managers.read().unwrap();
1168                map.values()
1169                    .filter(|sm| {
1170                        let meta = &sm.read().unwrap().meta;
1171                        network_id.is_empty() || meta.network_id == network_id
1172                    })
1173                    .map(Arc::clone)
1174                    .collect()
1175            };
1176
1177            // bill/post nodes have no local Pouch: skip the local load step and
1178            // go straight to remote fragment fetching.
1179            let remote_only = managers_snap.is_empty();
1180
1181            // Collect fragments from all matching local managers.
1182            let mut all_fragments = Vec::new();
1183            for sm_arc in &managers_snap {
1184                let sm = sm_arc.read().unwrap();
1185                let metas = sm.index.fragments_for_chunk(&chunk_id).to_vec();
1186                for meta in metas {
1187                    match sm.load_fragment(&chunk_id, &meta.fragment_id) {
1188                        Ok(f) => all_fragments.push(f),
1189                        Err(e) => tracing::warn!("load_fragment failed: {e}"),
1190                    }
1191                }
1192            }
1193
1194            let local_count = all_fragments.len();
1195
1196            // Determine k from existing fragments (if any) to know if we need more.
1197            let k_needed = all_fragments.first().map(|f| f.k).unwrap_or(0);
1198
1199            // ── Fetch from remote Pouches if local fragments < k ────────────
1200            // When there is no local Pouch (remote_only), always attempt remote fetch
1201            // since k_needed == 0 would otherwise skip this block entirely.
1202            let mut fragments_remote = 0usize;
1203            if remote_only || (k_needed > 0 && all_fragments.len() < k_needed) {
1204                let local_peer = state.identity.peer_id.to_string();
1205
1206                // Build list of remote peers to query: prefer the fragment
1207                // index (targeted fetch) and fall back to broadcasting to all
1208                // known Pouches if the index has no entries for this chunk.
1209                let target_peers: Vec<PeerId> = {
1210                    let idx = state.remote_fragment_index.read().unwrap();
1211                    let ptrs = idx.pointers_for(&chunk_id);
1212                    if !ptrs.is_empty() {
1213                        // Deduplicate peer IDs from the index.
1214                        let mut seen = std::collections::HashSet::new();
1215                        ptrs.iter()
1216                            .filter(|p| p.peer_id != local_peer)
1217                            .filter_map(|p| {
1218                                if seen.insert(p.peer_id.clone()) {
1219                                    p.peer_id.parse::<PeerId>().ok()
1220                                } else {
1221                                    None
1222                                }
1223                            })
1224                            .collect()
1225                    } else {
1226                        // Fallback: broadcast to all known Pouches in the network.
1227                        let ns = state.network_state.read().unwrap();
1228                        ns.in_network(&network_id)
1229                            .into_iter()
1230                            .filter(|n| {
1231                                n.service_type == ServiceType::Pouch && n.peer_id != local_peer
1232                            })
1233                            .filter_map(|n| n.peer_id.parse::<PeerId>().ok())
1234                            .collect()
1235                    }
1236                };
1237
1238                // Send FetchChunkFragments to each target peer.
1239                let mut receivers = Vec::new();
1240                for peer in &target_peers {
1241                    let (tx, rx) = tokio::sync::oneshot::channel();
1242                    if state
1243                        .net_tx
1244                        .send(NetworkCommand::FetchChunkFragments {
1245                            peer_id: *peer,
1246                            chunk_id: chunk_id.clone(),
1247                            resp_tx: tx,
1248                        })
1249                        .await
1250                        .is_ok()
1251                    {
1252                        receivers.push(rx);
1253                    }
1254                }
1255
1256                // Await responses with a timeout.
1257                use crate::coding::rlnc::EncodedFragment;
1258                for rx in receivers {
1259                    match tokio::time::timeout(std::time::Duration::from_secs(10), rx).await {
1260                        Ok(Ok(FragmentResponse::FoundMany { fragments })) => {
1261                            for (frag_id, bytes) in fragments {
1262                                if all_fragments.len() >= k_needed {
1263                                    break;
1264                                }
1265                                match EncodedFragment::from_bytes(frag_id, chunk_id.clone(), &bytes)
1266                                {
1267                                    Ok(f) => {
1268                                        all_fragments.push(f);
1269                                        fragments_remote += 1;
1270                                    }
1271                                    Err(e) => tracing::warn!("Remote fragment parse error: {e}"),
1272                                }
1273                            }
1274                        }
1275                        Ok(Ok(FragmentResponse::Found { data })) => {
1276                            if all_fragments.len() < k_needed {
1277                                let frag_id = uuid::Uuid::new_v4().to_string();
1278                                match EncodedFragment::from_bytes(frag_id, chunk_id.clone(), &data)
1279                                {
1280                                    Ok(f) => {
1281                                        all_fragments.push(f);
1282                                        fragments_remote += 1;
1283                                    }
1284                                    Err(e) => tracing::warn!("Remote fragment parse error: {e}"),
1285                                }
1286                            }
1287                        }
1288                        Ok(Ok(_)) => {} // NotFound, Stored, StoreFailed — skip
1289                        Ok(Err(_)) => tracing::debug!("Remote fetch: oneshot dropped"),
1290                        Err(_) => tracing::debug!("Remote fetch: timeout"),
1291                    }
1292                    if all_fragments.len() >= k_needed {
1293                        break;
1294                    }
1295                }
1296            }
1297
1298            if all_fragments.is_empty() {
1299                return ControlResponse::err(format!(
1300                    "Chunk '{chunk_id}' not found locally or on remote peers"
1301                ));
1302            }
1303
1304            let fragments_used = all_fragments.len();
1305            match rlnc::decode(&all_fragments) {
1306                Ok(encrypted_data) => {
1307                    // Decrypt the recovered chunk using the per-user CEK.
1308                    // Re-derive the CEK from the stored plaintext hash hint.
1309                    let plaintext_hash = {
1310                        state
1311                            .chunk_cek_hints
1312                            .read()
1313                            .unwrap()
1314                            .get(&chunk_id)
1315                            .copied()
1316                    };
1317                    let data = match plaintext_hash {
1318                        Some(ph) => {
1319                            let secret_mat = state.identity.secret_material();
1320                            let cipher = ChunkCipher::for_user(&secret_mat, &ph);
1321                            match cipher.decrypt(&encrypted_data) {
1322                                Ok(d) => d,
1323                                Err(e) => {
1324                                    return ControlResponse::err(format!(
1325                                        "Chunk decryption failed: {e}"
1326                                    ))
1327                                }
1328                            }
1329                        }
1330                        None => {
1331                            return ControlResponse::err(format!(
1332                                "CEK hint not found for chunk '{}' — daemon may have restarted",
1333                                chunk_id
1334                            ))
1335                        }
1336                    };
1337                    tracing::info!(
1338                        chunk_id=%chunk_id, local=%local_count,
1339                        remote=%fragments_remote, total=%fragments_used,
1340                        "GetFile decoded + decrypted"
1341                    );
1342                    ControlResponse::ok(GetFileData {
1343                        chunk_id,
1344                        data,
1345                        fragments_used,
1346                        fragments_remote,
1347                    })
1348                }
1349                Err(e) => ControlResponse::err(format!("Decode failed: {e}")),
1350            }
1351        }
1352
1353        // ── StorageInfo ───────────────────────────────────────────────────
1354        ControlRequest::StorageInfo { network_id } => {
1355            let managers = state.storage_managers.read().unwrap();
1356            let services = state.services.read().unwrap();
1357            let registry = state.file_registry.read().unwrap();
1358
1359            // k/N availability factor: dynamic, from current network QoS.
1360            let avail_factor = {
1361                let own_peer = state.identity.peer_id.to_string();
1362                let tier_enum = {
1363                    let rep = state.reputation.read().unwrap();
1364                    match rep.get(&own_peer) {
1365                        Some(r) => r.tier,
1366                        None => crate::network::ReputationTier::R1,
1367                    }
1368                };
1369                if tier_enum == crate::network::ReputationTier::R0 {
1370                    0.0_f64
1371                } else {
1372                    let ph = tier_enum.qos_target_ph();
1373                    let qos = state.qos.read().unwrap();
1374                    let own_score = qos
1375                        .get(&own_peer)
1376                        .map(|q| q.stability_score())
1377                        .unwrap_or(0.4);
1378                    let mut stabilities = qos.all_stability_scores();
1379                    stabilities.push(own_score);
1380                    crate::coding::params::compute_network_storage_factor(&stabilities, ph)
1381                }
1382            };
1383
1384            let pouches: Vec<PouchStat> = services
1385                .all()
1386                .iter()
1387                .filter(|s| {
1388                    s.service_type == ServiceType::Pouch
1389                        && (network_id.is_empty() || s.network_id == network_id)
1390                })
1391                .map(|s| {
1392                    let tier_label = s.metadata.get("tier").and_then(|v| v.as_str()).map(|t| {
1393                        format!(
1394                            "{} — {}",
1395                            t,
1396                            match t {
1397                                "T1" => "Pebble",
1398                                "T2" => "Stone",
1399                                "T3" => "Boulder",
1400                                "T4" => "Rock",
1401                                "T5" => "Monolith",
1402                                _ => "?",
1403                            }
1404                        )
1405                    });
1406                    if let Some(sm_lock) = managers.get(&s.id) {
1407                        let sm = sm_lock.read().unwrap();
1408                        PouchStat {
1409                            service_id: s.id.clone(),
1410                            network_id: s.network_id.clone(),
1411                            storage_tier: tier_label,
1412                            storage_bid_bytes: sm.meta.storage_bytes_bid,
1413                            storage_used_bytes: sm.meta.storage_bytes_used,
1414                            available_bytes: ((sm.meta.available_bytes() as f64) * avail_factor)
1415                                .round() as u64,
1416                        }
1417                    } else {
1418                        PouchStat {
1419                            service_id: s.id.clone(),
1420                            network_id: s.network_id.clone(),
1421                            storage_tier: tier_label,
1422                            storage_bid_bytes: 0,
1423                            storage_used_bytes: 0,
1424                            available_bytes: 0,
1425                        }
1426                    }
1427                })
1428                .collect();
1429
1430            let total_bid_bytes: u64 = pouches.iter().map(|p| p.storage_bid_bytes).sum();
1431            let total_used_bytes: u64 = pouches.iter().map(|p| p.storage_used_bytes).sum();
1432            let total_available_bytes: u64 = pouches.iter().map(|p| p.available_bytes).sum();
1433
1434            let net_filter = if network_id.is_empty() {
1435                ""
1436            } else {
1437                &network_id
1438            };
1439            let file_entries = registry.list(net_filter);
1440            let total_files_uploaded = file_entries.len();
1441            let total_uploaded_bytes: u64 = file_entries.iter().map(|e| e.size_bytes).sum();
1442
1443            ControlResponse::ok(StorageInfoData {
1444                pouches,
1445                total_bid_bytes,
1446                total_used_bytes,
1447                total_available_bytes,
1448                total_files_uploaded,
1449                total_uploaded_bytes,
1450            })
1451        }
1452
1453        // ── ListFiles ─────────────────────────────────────────────────────
1454        ControlRequest::ListFiles { network_id } => {
1455            let registry = state.file_registry.read().unwrap();
1456            let net_filter = if network_id.is_empty() {
1457                ""
1458            } else {
1459                &network_id
1460            };
1461            let entries: Vec<FileEntry> = registry
1462                .list(net_filter)
1463                .into_iter()
1464                .map(|e| FileEntry {
1465                    file_name: e.file_name.clone(),
1466                    size_bytes: e.size_bytes,
1467                    chunk_id: e.chunk_id.clone(),
1468                    network_id: e.network_id.clone(),
1469                    uploaded_at: e.uploaded_at,
1470                })
1471                .collect();
1472            let total_bytes = entries.iter().map(|e| e.size_bytes).sum();
1473            let total_files = entries.len();
1474            ControlResponse::ok(ListFilesData {
1475                files: entries,
1476                network_id: network_id.clone(),
1477                total_files,
1478                total_bytes,
1479            })
1480        }
1481    }
1482}
1483
1484/// Build a `NodeInfo` from the current daemon state for a given service and
1485/// broadcast it via gossipsub.
1486async fn announce_self(
1487    state: &Arc<DaemonState>,
1488    service_id: &str,
1489    service_type: ServiceType,
1490    network_id: &str,
1491) {
1492    let listen_addrs: Vec<String> = vec![]; // Populated once we know our Multiaddrs
1493
1494    let info = NodeInfo {
1495        peer_id: state.identity.peer_id.to_string(),
1496        user_fingerprint: state.identity.fingerprint.clone(),
1497        user_alias: state.identity.profile.alias.clone(),
1498        service_type,
1499        service_id: service_id.to_string(),
1500        network_id: network_id.to_string(),
1501        listen_addrs,
1502        announced_at: chrono::Utc::now().timestamp() as u64,
1503        metadata: HashMap::new(),
1504    };
1505
1506    match serde_json::to_vec(&info) {
1507        Ok(payload) => {
1508            let _ = state
1509                .net_tx
1510                .send(NetworkCommand::Announce {
1511                    network_id: network_id.to_string(),
1512                    payload,
1513                })
1514                .await;
1515        }
1516        Err(e) => tracing::warn!("Failed to serialize NodeInfo: {}", e),
1517    }
1518}
1519
1520// ─────────────────────────────────────────────────────────────────────────────
1521// Unit tests
1522// ─────────────────────────────────────────────────────────────────────────────
1523
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// `load_cek_hints_at` on a non-existent path returns an empty map.
    #[test]
    fn load_cek_hints_missing_file_returns_empty() {
        let dir = tempdir();
        let path = dir.join("cek_hints.json");
        let hints = load_cek_hints_at(&path);
        assert!(hints.is_empty(), "expected empty map for missing file");
    }

    /// `persist_cek_hints_at` + `load_cek_hints_at` round-trip.
    #[test]
    fn cek_hints_persist_load_roundtrip() {
        let dir = tempdir();
        let path = dir.join("cek_hints.json");

        let mut hints: HashMap<String, [u8; 32]> = HashMap::new();
        let key_a = *b"abcdefghijklmnopqrstuvwxyz012345";
        let key_b = *b"ABCDEFGHIJKLMNOPQRSTUVWXYZ678901";
        hints.insert("chunk-aaa".to_string(), key_a);
        hints.insert("chunk-bbb".to_string(), key_b);

        persist_cek_hints_at(&path, &hints);
        let loaded = load_cek_hints_at(&path);

        assert_eq!(loaded.len(), 2, "wrong number of hints loaded");
        assert_eq!(loaded["chunk-aaa"], key_a);
        assert_eq!(loaded["chunk-bbb"], key_b);
    }

    /// `load_cek_hints_at` with a corrupt JSON file returns an empty map without panicking.
    #[test]
    fn load_cek_hints_corrupt_file_returns_empty() {
        let dir = tempdir();
        let path = dir.join("cek_hints.json");
        std::fs::write(&path, b"NOT_VALID_JSON").unwrap();

        let hints = load_cek_hints_at(&path);
        assert!(hints.is_empty(), "corrupt file must yield empty map");
    }

    // ── helpers ───────────────────────────────────────────────────────────

    /// Scratch directory that is deleted when the guard goes out of scope.
    ///
    /// Cleanup runs from `Drop`, so it also happens when a test assertion
    /// panics and unwinds; previously every test run left a `bp_test_*`
    /// directory behind in the system temp dir.
    struct TestDir(std::path::PathBuf);

    impl TestDir {
        /// Path of `name` inside the scratch directory.
        fn join(&self, name: &str) -> std::path::PathBuf {
            self.0.join(name)
        }
    }

    impl Drop for TestDir {
        fn drop(&mut self) {
            // Best-effort: a failed cleanup must not turn a passing test
            // into a failure (or panic during unwinding).
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    /// Create a uniquely named directory under the system temp dir.
    ///
    /// Keep the returned guard bound to a local for the whole test; the
    /// directory is removed as soon as the guard is dropped.
    fn tempdir() -> TestDir {
        let mut path = std::env::temp_dir();
        path.push(format!("bp_test_{}", uuid::Uuid::new_v4()));
        std::fs::create_dir_all(&path).unwrap();
        TestDir(path)
    }
}