// bp_core/network/quality_monitor.rs

//! Network quality monitor — Ping and Proof-of-Storage challenge loops.
//!
//! ## What it does
//!
//! The monitor runs as a background Tokio task spawned by the daemon.
//!
//! **Ping loop (every 60 s)**:
//! 1. Reads the live [`NetworkState`] to enumerate all known Pouch peers.
//! 2. Sends a [`NetworkCommand::Ping`] for each Pouch (fire-and-forget per peer).
//! 3. Each Ping waits up to 5 seconds for a Pong from the network loop.
//! 4. On success: records the RTT into [`QosRegistry`] via
//!    [`crate::network::qos::PeerQos::record_ping`] and
//!    [`crate::network::qos::PeerQos::record_challenge`].
//! 5. On timeout: records a timeout via
//!    [`crate::network::qos::PeerQos::record_ping_timeout`] and a failed challenge.
//!
//! **Proof-of-Storage loop (every 300 s)**:
//! 1. For each Pouch peer that has entries in [`OutgoingAssignments`]:
//!    picks one random fragment ID assigned to that peer.
//! 2. Sends a [`NetworkCommand::ProofOfStorage`] challenge.
//! 3. Waits up to 10 s for a `bool` proof result.
//! 4. On success: calls [`crate::network::qos::PeerQos::record_pos_success`].
//! 5. On failure/timeout: calls
//!    [`crate::network::qos::PeerQos::record_pos_failure`] — increments
//!    `fault_score` toward the `degraded`/`suspected`/`blacklisted` thresholds.
//!
//! The `QosRegistry` populated here is consumed by `PutFile` in the control
//! server to compute adaptive `k`/`n` coding parameters.
//!
//! ## Concurrency model
//!
//! All per-peer tasks within a round are spawned concurrently so a slow or
//! dead peer does not delay the others.
use crate::{
    coding::rlnc,
    coding::rlnc::EncodedFragment,
    network::{
        qos::QosRegistry, state::NetworkState, FragmentResponse, NetworkCommand,
        OutgoingAssignments, OutgoingFragment, FAULT_DEGRADED, FAULT_SUSPECTED,
    },
    service::ServiceType,
};
use libp2p::PeerId;
use std::sync::{Arc, RwLock};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::mpsc;
// ── Configuration constants ───────────────────────────────────────────────────

/// How often (in seconds) a full Ping round is run against all known Pouches.
const PING_INTERVAL_SECS: u64 = 60;

/// Maximum time (in seconds) to wait for a Pong before declaring a timeout.
const PING_TIMEOUT_SECS: u64 = 5;

/// How often (in seconds) a Proof-of-Storage round is run.
const POS_INTERVAL_SECS: u64 = 300;

/// Maximum time (in seconds) to wait for a PoS proof response.
const POS_TIMEOUT_SECS: u64 = 10;
63// ── Entry point ───────────────────────────────────────────────────────────────
64
65/// Run the quality monitor loop indefinitely.
66///
67/// Spawned by [`crate::daemon::run_daemon`] as a background Tokio task.
68///
69/// # Arguments
70///
71/// - `network_state`        — shared view of all known peers (read-only here).
72/// - `qos`                  — shared QoS registry updated after every challenge.
73/// - `net_tx`               — channel to submit network commands.
74/// - `outgoing_assignments` — map of `peer_id → fragments pushed to that peer`.
75pub async fn run_quality_monitor(
76    network_state: Arc<RwLock<NetworkState>>,
77    qos: Arc<RwLock<QosRegistry>>,
78    net_tx: mpsc::Sender<NetworkCommand>,
79    outgoing_assignments: OutgoingAssignments,
80) {
81    let mut ping_interval = tokio::time::interval(Duration::from_secs(PING_INTERVAL_SECS));
82    // Skip the first (immediate) tick so the monitor waits a full interval
83    // after daemon startup before the first round.
84    ping_interval.reset();
85
86    let mut pos_interval = tokio::time::interval(Duration::from_secs(POS_INTERVAL_SECS));
87    pos_interval.reset();
88
89    tracing::info!(
90        ping_interval_secs = PING_INTERVAL_SECS,
91        pos_interval_secs = POS_INTERVAL_SECS,
92        "Quality monitor started"
93    );
94
95    loop {
96        tokio::select! {
97            _ = ping_interval.tick() => {
98                run_ping_round(&network_state, &qos, &net_tx).await;
99            }
100            _ = pos_interval.tick() => {
101                run_pos_round(&outgoing_assignments, &network_state, &qos, &net_tx).await;
102            }
103        }
104    }
105}
106
107// ── Ping round ────────────────────────────────────────────────────────────────
108
109async fn run_ping_round(
110    network_state: &Arc<RwLock<NetworkState>>,
111    qos: &Arc<RwLock<QosRegistry>>,
112    net_tx: &mpsc::Sender<NetworkCommand>,
113) {
114    // Collect Pouch peer IDs from the current NetworkState snapshot.
115    let pouch_peers: Vec<(String, PeerId)> = {
116        let ns = match network_state.read() {
117            Ok(g) => g,
118            Err(e) => {
119                tracing::warn!("quality_monitor: NetworkState lock poisoned: {e}");
120                return;
121            }
122        };
123        ns.all()
124            .into_iter()
125            .filter(|n| n.service_type == ServiceType::Pouch)
126            .filter_map(|n| {
127                n.peer_id
128                    .parse::<PeerId>()
129                    .ok()
130                    .map(|pid| (n.peer_id.clone(), pid))
131            })
132            .collect()
133    };
134
135    if pouch_peers.is_empty() {
136        tracing::debug!("quality_monitor: no Pouch peers known, skipping Ping round");
137        return;
138    }
139
140    tracing::debug!(
141        peers = pouch_peers.len(),
142        "quality_monitor: starting Ping round"
143    );
144
145    let mut handles = Vec::with_capacity(pouch_peers.len());
146    for (peer_id_str, peer_id) in pouch_peers {
147        let qos = Arc::clone(qos);
148        let net_tx = net_tx.clone();
149        let handle = tokio::spawn(async move {
150            ping_one_peer(peer_id_str, peer_id, qos, net_tx).await;
151        });
152        handles.push(handle);
153    }
154
155    for h in handles {
156        let _ = h.await;
157    }
158
159    tracing::debug!("quality_monitor: Ping round complete");
160}
161
162// ── PoS round ─────────────────────────────────────────────────────────────────
163
164async fn run_pos_round(
165    outgoing_assignments: &OutgoingAssignments,
166    network_state: &Arc<RwLock<NetworkState>>,
167    qos: &Arc<RwLock<QosRegistry>>,
168    net_tx: &mpsc::Sender<NetworkCommand>,
169) {
170    // Snapshot the current assignments (peer_id → random fragment to challenge).
171    let challenges: Vec<(String, PeerId, String, String)> = {
172        let assignments = match outgoing_assignments.read() {
173            Ok(g) => g,
174            Err(e) => {
175                tracing::warn!("quality_monitor: outgoing_assignments lock poisoned: {e}");
176                return;
177            }
178        };
179        assignments
180            .iter()
181            .filter_map(|(peer_id_str, fragments)| {
182                if fragments.is_empty() {
183                    return None;
184                }
185                // Pick a random fragment index using a simple modular approach.
186                let idx = SystemTime::now()
187                    .duration_since(UNIX_EPOCH)
188                    .unwrap_or_default()
189                    .subsec_nanos() as usize
190                    % fragments.len();
191                let frag = &fragments[idx];
192                peer_id_str.parse::<PeerId>().ok().map(|pid| {
193                    (
194                        peer_id_str.clone(),
195                        pid,
196                        frag.chunk_id.clone(),
197                        frag.fragment_id.clone(),
198                    )
199                })
200            })
201            .collect()
202    };
203
204    if challenges.is_empty() {
205        tracing::debug!("quality_monitor: no outgoing fragments, skipping PoS round");
206        return;
207    }
208
209    tracing::debug!(
210        peers = challenges.len(),
211        "quality_monitor: starting PoS round"
212    );
213
214    let mut handles = Vec::with_capacity(challenges.len());
215    for (peer_id_str, peer_id, chunk_id, fragment_id) in challenges.iter().cloned() {
216        let qos = Arc::clone(qos);
217        let net_tx = net_tx.clone();
218        let handle = tokio::spawn(async move {
219            pos_one_peer(peer_id_str, peer_id, chunk_id, fragment_id, qos, net_tx).await;
220        });
221        handles.push(handle);
222    }
223
224    for h in handles {
225        let _ = h.await;
226    }
227
228    tracing::debug!("quality_monitor: PoS round complete");
229
230    // ── Preventive rerouting ────────────────────────────────────────────────
231    // After all PoS results are recorded, check if any challenged peer has
232    // crossed the FAULT_SUSPECTED threshold.  If so, recode its assigned
233    // fragments and push new copies to a healthy peer.
234    let suspected: Vec<(String, PeerId)> = challenges
235        .iter()
236        .filter(|(peer_id_str, _, _, _)| {
237            qos.read()
238                .map(|g| g.fault_score(peer_id_str) >= FAULT_SUSPECTED)
239                .unwrap_or(false)
240        })
241        .map(|(peer_id_str, peer_id, _, _)| (peer_id_str.clone(), *peer_id))
242        // Deduplicate by peer_id_str.
243        .fold(Vec::<(String, PeerId)>::new(), |mut acc, (s, p)| {
244            if !acc.iter().any(|(x, _)| x == &s) {
245                acc.push((s, p));
246            }
247            acc
248        });
249
250    for (peer_id_str, peer_id) in suspected {
251        reroute_suspected_peer(
252            &peer_id_str,
253            peer_id,
254            outgoing_assignments,
255            network_state,
256            qos,
257            net_tx,
258        )
259        .await;
260    }
261}
262
// ── Preventive rerouting ──────────────────────────────────────────────────────

/// Recode fragments assigned to `suspected_peer` and push new copies to a
/// healthy peer when `fault_score` has crossed [`FAULT_SUSPECTED`].
///
/// Steps
/// 1. Collect unique `chunk_id`s assigned to the suspected peer.
/// 2. Find a healthy Pouch whose `fault_score < FAULT_DEGRADED`.
/// 3. For each chunk: fetch all fragments → [`rlnc::recode`] (1 new fragment)
///    → [`NetworkCommand::PushFragment`] to the healthy peer → record the new
///    assignment in [`OutgoingAssignments`].
///
/// All failures are best-effort: a poisoned lock, closed channel, timeout,
/// or recode error skips the affected chunk (or aborts) without retrying.
async fn reroute_suspected_peer(
    suspected_peer_str: &str,
    suspected_peer: PeerId,
    outgoing: &OutgoingAssignments,
    network_state: &Arc<RwLock<NetworkState>>,
    qos: &Arc<RwLock<QosRegistry>>,
    net_tx: &mpsc::Sender<NetworkCommand>,
) {
    // 1. Collect unique chunk_ids assigned to the suspected peer.
    //    The HashSet round-trip deduplicates chunk_ids, since several
    //    fragments of the same chunk may sit on this peer.
    let chunks: Vec<String> = {
        let guard = match outgoing.read() {
            Ok(g) => g,
            Err(_) => return,
        };
        guard
            .get(suspected_peer_str)
            .cloned()
            .unwrap_or_default()
            .iter()
            .map(|f| f.chunk_id.clone())
            .collect::<std::collections::HashSet<_>>()
            .into_iter()
            .collect()
    };

    if chunks.is_empty() {
        tracing::debug!(
            peer = %suspected_peer_str,
            "reroute: no assigned chunks, nothing to reroute"
        );
        return;
    }

    // 2. Find a healthy replacement peer (not the suspected one, fault < FAULT_DEGRADED).
    //    `.next()` takes the first match in NetworkState iteration order; no
    //    load balancing across candidates is attempted here.
    let healthy_peer: Option<(String, PeerId)> = {
        let ns = match network_state.read() {
            Ok(g) => g,
            Err(_) => return,
        };
        let qos_g = match qos.read() {
            Ok(g) => g,
            Err(_) => return,
        };
        ns.all()
            .into_iter()
            .filter(|n| n.service_type == ServiceType::Pouch)
            .filter(|n| n.peer_id != suspected_peer_str)
            .filter(|n| qos_g.fault_score(&n.peer_id) < FAULT_DEGRADED)
            .filter_map(|n| {
                n.peer_id
                    .parse::<PeerId>()
                    .ok()
                    .map(|pid| (n.peer_id.clone(), pid))
            })
            .next()
    };

    let (healthy_str, healthy_pid) = match healthy_peer {
        Some(p) => p,
        None => {
            tracing::warn!(
                peer = %suspected_peer_str,
                "reroute: no healthy peer available, aborting rerouting"
            );
            return;
        }
    };

    tracing::info!(
        suspected = %suspected_peer_str,
        healthy   = %healthy_str,
        chunks    = chunks.len(),
        "reroute: starting preventive fragment rerouting"
    );

    // 3. For each chunk: fetch → recode → push.
    //    NOTE(review): fragments are fetched back from the *suspected* peer
    //    itself.  If that peer is fully offline, the fetch below times out
    //    and the chunk keeps its current placement — confirm whether a
    //    healthier replica source could serve the fetch instead.
    for chunk_id in chunks {
        let (tx, rx) = tokio::sync::oneshot::channel();
        if net_tx
            .send(NetworkCommand::FetchChunkFragments {
                peer_id: suspected_peer,
                chunk_id: chunk_id.clone(),
                resp_tx: tx,
            })
            .await
            .is_err()
        {
            tracing::debug!(chunk = %chunk_id, "reroute: net_tx closed");
            continue;
        }

        // The PoS timeout budget is reused for the fetch as well.
        match tokio::time::timeout(Duration::from_secs(POS_TIMEOUT_SECS), rx).await {
            Ok(Ok(FragmentResponse::FoundMany { fragments })) if !fragments.is_empty() => {
                // Parse raw fragment bytes; unparseable fragments are dropped.
                let encoded: Vec<EncodedFragment> = fragments
                    .iter()
                    .filter_map(|(fid, bytes)| {
                        EncodedFragment::from_bytes(fid.clone(), chunk_id.clone(), bytes).ok()
                    })
                    .collect();

                if encoded.is_empty() {
                    tracing::debug!(chunk = %chunk_id, "reroute: all fragments unparseable");
                    continue;
                }

                // Recode into exactly one fresh fragment for the healthy peer.
                match rlnc::recode(&encoded, 1) {
                    Ok(mut recoded) => {
                        if let Some(new_frag) = recoded.pop() {
                            let frag_id = new_frag.id.clone();
                            // Push is fire-and-forget: a closed channel here
                            // is deliberately ignored.
                            let _ = net_tx
                                .send(NetworkCommand::PushFragment {
                                    peer_id: healthy_pid,
                                    chunk_id: chunk_id.clone(),
                                    fragment_id: frag_id.clone(),
                                    data: new_frag.to_bytes(),
                                })
                                .await;

                            // Best-effort bookkeeping: a poisoned lock skips
                            // the record, but the push above already happened.
                            if let Ok(mut oa) = outgoing.write() {
                                oa.entry(healthy_str.clone())
                                    .or_default()
                                    .push(OutgoingFragment {
                                        chunk_id: chunk_id.clone(),
                                        fragment_id: frag_id,
                                    });
                            }

                            tracing::info!(
                                chunk   = %chunk_id,
                                healthy = %healthy_str,
                                "reroute: recoded fragment pushed to healthy peer"
                            );
                        }
                    }
                    Err(e) => {
                        tracing::debug!(
                            chunk = %chunk_id,
                            err   = %e,
                            "reroute: recode failed"
                        );
                    }
                }
            }
            // Covers: timeout, dropped oneshot, NotFound-style responses,
            // and FoundMany with an empty fragment list.
            _ => {
                tracing::debug!(
                    chunk = %chunk_id,
                    peer  = %suspected_peer_str,
                    "reroute: fragment fetch failed or timed out"
                );
            }
        }
    }
}
428// ── Per-peer PoS helper ───────────────────────────────────────────────────────
429
430async fn pos_one_peer(
431    peer_id_str: String,
432    peer_id: PeerId,
433    chunk_id: String,
434    fragment_id: String,
435    qos: Arc<RwLock<QosRegistry>>,
436    net_tx: mpsc::Sender<NetworkCommand>,
437) {
438    let nonce = SystemTime::now()
439        .duration_since(UNIX_EPOCH)
440        .unwrap_or_default()
441        .as_millis() as u64;
442
443    let (resp_tx, resp_rx) = tokio::sync::oneshot::channel::<bool>();
444
445    if net_tx
446        .send(NetworkCommand::ProofOfStorage {
447            peer_id,
448            chunk_id: chunk_id.clone(),
449            fragment_id: fragment_id.clone(),
450            nonce,
451            resp_tx,
452        })
453        .await
454        .is_err()
455    {
456        tracing::warn!(peer = %peer_id_str, "quality_monitor: net_tx closed (PoS)");
457        return;
458    }
459
460    match tokio::time::timeout(Duration::from_secs(POS_TIMEOUT_SECS), resp_rx).await {
461        Ok(Ok(true)) => {
462            tracing::debug!(
463                peer = %peer_id_str,
464                chunk = %chunk_id,
465                frag = %fragment_id,
466                "PoS challenge: proof received"
467            );
468            if let Ok(mut qos_guard) = qos.write() {
469                qos_guard.entry(peer_id_str).record_pos_success();
470            }
471        }
472        Ok(Ok(false)) | Ok(Err(_)) => {
473            tracing::debug!(
474                peer = %peer_id_str,
475                chunk = %chunk_id,
476                frag = %fragment_id,
477                "PoS challenge: proof failed or fragment not found"
478            );
479            record_pos_failure(&peer_id_str, &qos);
480        }
481        Err(_) => {
482            tracing::debug!(
483                peer = %peer_id_str,
484                "PoS challenge: timeout (>{POS_TIMEOUT_SECS}s)"
485            );
486            record_pos_failure(&peer_id_str, &qos);
487        }
488    }
489}
490
491fn record_pos_failure(peer_id_str: &str, qos: &Arc<RwLock<QosRegistry>>) {
492    if let Ok(mut qos_guard) = qos.write() {
493        qos_guard.entry(peer_id_str).record_pos_failure();
494    }
495}
496
/// Send a single Ping to `peer_id` and record the outcome in the registry:
/// RTT plus a successful challenge on a Pong, a timeout plus a failed
/// challenge otherwise.
async fn ping_one_peer(
    peer_id_str: String,
    peer_id: PeerId,
    qos: Arc<RwLock<QosRegistry>>,
    net_tx: mpsc::Sender<NetworkCommand>,
) {
    let sent_at_ms = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis() as u64;

    let (resp_tx, resp_rx) = tokio::sync::oneshot::channel::<u64>();

    // Hand the Ping to the network loop; bail out if it has shut down.
    let command = NetworkCommand::Ping {
        peer_id,
        sent_at_ms,
        resp_tx,
    };
    if net_tx.send(command).await.is_err() {
        tracing::warn!(peer = %peer_id_str, "quality_monitor: net_tx closed");
        return;
    }

    // Wait for the Pong (RTT in ms) or time out.
    let outcome = tokio::time::timeout(Duration::from_secs(PING_TIMEOUT_SECS), resp_rx).await;
    match outcome {
        Ok(Ok(rtt_ms)) => {
            tracing::debug!(peer = %peer_id_str, rtt_ms = %rtt_ms, "Ping OK");
            if let Ok(mut registry) = qos.write() {
                let peer_entry = registry.entry(peer_id_str.clone());
                peer_entry.record_ping(rtt_ms as f64);
                peer_entry.record_challenge(true);
            }
        }
        Ok(Err(_)) => {
            // Oneshot dropped by network loop (peer unreachable / send failed).
            tracing::debug!(peer = %peer_id_str, "Ping: no response (channel dropped)");
            record_timeout(&peer_id_str, &qos);
        }
        Err(_) => {
            tracing::debug!(peer = %peer_id_str, "Ping: timeout (>{PING_TIMEOUT_SECS}s)");
            record_timeout(&peer_id_str, &qos);
        }
    }
}
546fn record_timeout(peer_id_str: &str, qos: &Arc<RwLock<QosRegistry>>) {
547    if let Ok(mut qos_guard) = qos.write() {
548        let entry = qos_guard.entry(peer_id_str);
549        entry.record_ping_timeout();
550        entry.record_challenge(false);
551    }
552}
553
// ── Tests ─────────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;
    use crate::network::OutgoingFragment;

    /// A recorded timeout must lower the stability score of a peer that was
    /// previously healthy.
    #[test]
    fn record_timeout_degrades_score() {
        let qos = Arc::new(RwLock::new(QosRegistry::new()));
        // Prime with a good ping.
        {
            let mut g = qos.write().unwrap();
            let e = g.entry("p1");
            e.record_ping(10.0);
            e.record_challenge(true);
        }
        let score_before = qos.read().unwrap().stability_score("p1");

        // Record several timeouts.
        for _ in 0..5 {
            record_timeout("p1", &qos);
        }
        let score_after = qos.read().unwrap().stability_score("p1");
        assert!(score_after < score_before, "timeout should degrade score");
    }

    /// `record_timeout` on an unknown peer must create the entry and leave
    /// it below a perfect score.
    #[test]
    fn new_peer_gets_timeout_entry() {
        let qos = Arc::new(RwLock::new(QosRegistry::new()));
        record_timeout("new-peer", &qos);
        let score = qos.read().unwrap().stability_score("new-peer");
        // After one timeout: challenge_success < 1.0, latency still 0.0
        // → stability < 0.4 (pure reliability component capped by β=0.10)
        assert!(
            score < 0.95,
            "new peer after one timeout should have degraded score"
        );
    }

    /// Repeated PoS failures must raise the fault score from its zero start.
    #[test]
    fn pos_failure_increments_fault_score() {
        let qos = Arc::new(RwLock::new(QosRegistry::new()));
        // Initial fault score is 0.
        assert_eq!(qos.read().unwrap().fault_score("p1"), 0);

        for _ in 0..3 {
            record_pos_failure("p1", &qos);
        }
        let fault = qos.read().unwrap().fault_score("p1");
        assert!(fault > 0, "fault score should increase after PoS failures");
    }

    /// A successful proof must decay a previously accumulated fault score.
    #[test]
    fn pos_success_decays_fault_score() {
        let qos = Arc::new(RwLock::new(QosRegistry::new()));
        // Prime with some failures.
        for _ in 0..4 {
            record_pos_failure("p1", &qos);
        }
        let fault_after_failures = qos.read().unwrap().fault_score("p1");

        // A success should decay the fault score.
        {
            let mut g = qos.write().unwrap();
            g.entry("p1").record_pos_success();
        }
        let fault_after_success = qos.read().unwrap().fault_score("p1");
        assert!(
            fault_after_success < fault_after_failures,
            "PoS success should decay fault score"
        );
    }

    /// Ensure `run_quality_monitor` can be started and does not panic
    /// immediately when there are no peers and no assignments.
    #[tokio::test]
    async fn monitor_noop_with_no_peers() {
        let ns = Arc::new(RwLock::new(NetworkState::new()));
        let qos = Arc::new(RwLock::new(QosRegistry::new()));
        let (net_tx, _net_rx) = mpsc::channel(8);
        let outgoing = Arc::new(RwLock::new(std::collections::HashMap::new()));

        // Run the monitor for a very short time — it should not panic.
        tokio::time::timeout(
            Duration::from_millis(200),
            run_quality_monitor(ns, qos, net_tx, outgoing),
        )
        .await
        .ok(); // timeout is expected
    }

    /// Sanity-check the `OutgoingAssignments` map shape the monitor relies on.
    #[test]
    fn outgoing_assignment_structure() {
        // Verify we can build OutgoingAssignments and look up entries.
        let outgoing: OutgoingAssignments = Arc::new(RwLock::new(std::collections::HashMap::new()));
        {
            let mut g = outgoing.write().unwrap();
            g.entry("peer-1".into())
                .or_default()
                .push(OutgoingFragment {
                    chunk_id: "chunk-a".into(),
                    fragment_id: "frag-1".into(),
                });
        }
        let g = outgoing.read().unwrap();
        assert_eq!(g.get("peer-1").unwrap().len(), 1);
        assert_eq!(g.get("peer-1").unwrap()[0].chunk_id, "chunk-a");
    }
}