bp_core/network/
qos.rs

//! Per-peer Quality-of-Service (QoS) tracking.
//!
//! Every known Pouch in the network gets a [`PeerQos`] record that is updated
//! whenever the local node issues a Ping challenge or a Proof-of-Storage
//! challenge and receives (or times out on) a response.
//!
//! ## Stability score
//!
//! The **stability score** `s ∈ [0.0, 1.0]` combines two independent signals:
//!
//! ```text
//! s(p) = w_lat · latency_score(rtt_ewma) + w_rel · challenge_success_ewma
//!
//! latency_score(rtt_ms) = exp(−rtt_ms / RTT_REF_MS)   (RTT_REF_MS = 500 ms)
//! ```
//!
//! Default weights: `w_lat = 0.6`, `w_rel = 0.4`. Because the weights sum to 1
//! and both components lie in `[0.0, 1.0]`, `s` stays in `[0.0, 1.0]` as well.
//!
//! ## EWMA smoothing
//!
//! | Signal           | Factor    | Rationale                                   |
//! |------------------|-----------|---------------------------------------------|
//! | RTT              | α = 0.125 | TCP standard (RFC 6298)                     |
//! | Challenge result | β = 0.10  | Slower decay → more stable reliability view |
//!
//! A new peer starts with `rtt_ewma_ms = None` (latency component = 0) and
//! `challenge_success_ewma = 1.0` (benefit of the doubt; it decays with every
//! failed challenge).
28
29use serde::{Deserialize, Serialize};
30use std::collections::HashMap;
31
32// ── Constants ─────────────────────────────────────────────────────────────────
33
/// Reference round-trip time, in milliseconds, for the latency curve
/// `exp(-rtt / RTT_REF_MS)`: a 0 ms RTT scores 1.0, and an RTT equal to this
/// value scores `e⁻¹ ≈ 0.37`.
const RTT_REF_MS: f64 = 500.0;

/// Weight given to each new RTT sample in the moving average
/// (1/8, the SRTT gain used by TCP in RFC 6298).
const ALPHA_RTT: f64 = 1.0 / 8.0;

/// Weight given to each new challenge outcome in the moving average.
/// Smaller than [`ALPHA_RTT`], so the reliability view moves more slowly.
const BETA_CHALLENGE: f64 = 0.1;

/// Share of the composite stability score contributed by latency.
const W_LATENCY: f64 = 0.6;

/// Share of the composite stability score contributed by reliability
/// (challenge success). Together with [`W_LATENCY`] it sums to 1.0.
const W_RELIABILITY: f64 = 0.4;
48
49// ── Fault-score thresholds ────────────────────────────────────────────────────
50
/// Fault-score level (on the 0–100 scale) from which a peer counts as
/// *degraded*: it stops receiving newly assigned fragments.
pub const FAULT_DEGRADED: u8 = 70;

/// Fault-score level from which a peer is *suspected*: preventive recoding
/// of its fragments starts here.
pub const FAULT_SUSPECTED: u8 = 90;

/// Fault-score level at which a peer is *blacklisted*: it is evicted from
/// routing and announced as blacklisted over gossip. This is also the
/// highest value the fault score can reach.
pub const FAULT_BLACKLISTED: u8 = 100;

/// Penalty added to the fault score for every failed Proof-of-Storage
/// challenge.
const FAULT_INCREMENT: u8 = 5;

/// Credit subtracted from the fault score for every successful
/// Proof-of-Storage challenge.
const FAULT_DECAY: u8 = 1;
67
68// ── PeerQos ───────────────────────────────────────────────────────────────────
69
70/// Live QoS data for a single remote Pouch peer.
71///
72/// Serialisable so that it can be persisted across daemon restarts or
73/// gossipped as part of a `ChallengeResult` announcement.
74#[derive(Debug, Clone, Serialize, Deserialize)]
75pub struct PeerQos {
76    /// libp2p PeerId (base58).
77    pub peer_id: String,
78
79    /// EWMA round-trip time in milliseconds.  `None` until the first Ping
80    /// response is received.
81    pub rtt_ewma_ms: Option<f64>,
82
83    /// EWMA of challenge success rate in `[0.0, 1.0]`.
84    /// Starts at `1.0` (benefit of the doubt).
85    pub challenge_success_ewma: f64,
86
87    /// Unix timestamp (seconds) of the most recent Ping exchange.
88    pub last_ping_at: Option<u64>,
89
90    /// Total Ping + PoS challenges sent.
91    pub challenges_sent: u64,
92
93    /// Total successful challenge responses received.
94    pub challenges_succeeded: u64,
95
96    /// Accumulated fault score in `[0, 100]`.
97    ///
98    /// Incremented by `FAULT_INCREMENT` on each failed Proof-of-Storage
99    /// challenge; decremented by `FAULT_DECAY` on each success.
100    /// At [`FAULT_DEGRADED`] (70) no new fragments are sent; at
101    /// [`FAULT_SUSPECTED`] (90) preventive recoding begins; at
102    /// [`FAULT_BLACKLISTED`] (100) the peer is evicted.
103    pub fault_score: u8,
104}
105
106impl PeerQos {
107    /// Create a fresh QoS record for a newly discovered peer.
108    pub fn new(peer_id: impl Into<String>) -> Self {
109        Self {
110            peer_id: peer_id.into(),
111            rtt_ewma_ms: None,
112            challenge_success_ewma: 1.0,
113            last_ping_at: None,
114            challenges_sent: 0,
115            challenges_succeeded: 0,
116            fault_score: 0,
117        }
118    }
119
120    // ── Measurements ─────────────────────────────────────────────────────
121
122    /// Incorporate a new RTT sample (milliseconds) into the EWMA.
123    ///
124    /// Also updates `last_ping_at` to the current UTC time.
125    pub fn record_ping(&mut self, rtt_ms: f64) {
126        self.rtt_ewma_ms = Some(match self.rtt_ewma_ms {
127            None => rtt_ms,
128            Some(prev) => ALPHA_RTT * rtt_ms + (1.0 - ALPHA_RTT) * prev,
129        });
130        self.last_ping_at = Some(chrono::Utc::now().timestamp() as u64);
131    }
132
133    /// Record the outcome of a Ping timeout (no response within deadline).
134    ///
135    /// The latency component decays toward 0 by increasing the EWMA with a
136    /// very large RTT value (10 × reference), signalling poor reachability.
137    pub fn record_ping_timeout(&mut self) {
138        self.record_ping(RTT_REF_MS * 10.0);
139    }
140
141    /// Incorporate the result of a challenge (Ping or Proof-of-Storage).
142    ///
143    /// `success = true`  → response was received and validated.
144    /// `success = false` → response timed out or proof was invalid.
145    pub fn record_challenge(&mut self, success: bool) {
146        let outcome = if success { 1.0_f64 } else { 0.0_f64 };
147        self.challenge_success_ewma =
148            BETA_CHALLENGE * outcome + (1.0 - BETA_CHALLENGE) * self.challenge_success_ewma;
149        self.challenges_sent += 1;
150        if success {
151            self.challenges_succeeded += 1;
152        }
153    }
154
155    /// Record a successful Proof-of-Storage response.
156    ///
157    /// Decays `fault_score` by `FAULT_DECAY` and records a successful
158    /// challenge in the reliability EWMA.
159    pub fn record_pos_success(&mut self) {
160        self.fault_score = self.fault_score.saturating_sub(FAULT_DECAY);
161        self.record_challenge(true);
162    }
163
164    /// Record a failed Proof-of-Storage response (timeout or invalid proof).
165    ///
166    /// Increments `fault_score` by `FAULT_INCREMENT` (capped at 100) and
167    /// records a failed challenge in the reliability EWMA.
168    pub fn record_pos_failure(&mut self) {
169        self.fault_score = self.fault_score.saturating_add(FAULT_INCREMENT).min(100);
170        self.record_challenge(false);
171    }
172
173    /// Current fault status derived from [`Self::fault_score`].
174    ///
175    /// Returns `"ok"`, `"degraded"`, `"suspected"`, or `"blacklisted"`.
176    pub fn fault_status(&self) -> &'static str {
177        match self.fault_score {
178            s if s >= FAULT_BLACKLISTED => "blacklisted",
179            s if s >= FAULT_SUSPECTED => "suspected",
180            s if s >= FAULT_DEGRADED => "degraded",
181            _ => "ok",
182        }
183    }
184
185    // ── Derived scores ────────────────────────────────────────────────────
186
187    /// Latency component of the stability score: `exp(−rtt_ewma / RTT_REF_MS)`.
188    ///
189    /// Returns `0.0` if no Ping has been recorded yet.
190    pub fn latency_score(&self) -> f64 {
191        self.rtt_ewma_ms
192            .map_or(0.0, |rtt| (-rtt / RTT_REF_MS).exp())
193    }
194
195    /// Composite stability score in `[0.0, 1.0]`.
196    ///
197    /// ```text
198    /// s = W_LATENCY · latency_score + W_RELIABILITY · challenge_success_ewma
199    /// ```
200    ///
201    /// This value feeds directly into `coding::params::compute_coding_params` to
202    /// determine the recovery threshold `k` for a target high probability `Ph`.
203    pub fn stability_score(&self) -> f64 {
204        W_LATENCY * self.latency_score() + W_RELIABILITY * self.challenge_success_ewma
205    }
206
207    /// Whether the peer has been pinged at all.
208    pub fn is_observed(&self) -> bool {
209        self.rtt_ewma_ms.is_some()
210    }
211}
212
213// ── QosRegistry ───────────────────────────────────────────────────────────────
214
215/// In-memory map of `peer_id → PeerQos` for the local node.
216///
217/// Held inside `DaemonState` under a `RwLock`:
218///
219/// ```ignore
220/// pub qos: RwLock<QosRegistry>,
221/// ```
222#[derive(Debug, Default)]
223pub struct QosRegistry {
224    peers: HashMap<String, PeerQos>,
225}
226
227impl QosRegistry {
228    pub fn new() -> Self {
229        Self::default()
230    }
231
232    /// Return (or lazily create) the QoS record for `peer_id`.
233    pub fn entry(&mut self, peer_id: impl Into<String>) -> &mut PeerQos {
234        let key = peer_id.into();
235        self.peers
236            .entry(key.clone())
237            .or_insert_with(|| PeerQos::new(key))
238    }
239
240    /// Read-only view of a peer's QoS; returns `None` if never seen.
241    pub fn get(&self, peer_id: &str) -> Option<&PeerQos> {
242        self.peers.get(peer_id)
243    }
244
245    /// Remove a peer (e.g. on explicit Farewell or eviction).
246    pub fn remove(&mut self, peer_id: &str) {
247        self.peers.remove(peer_id);
248    }
249
250    /// Stability score for `peer_id`.  Returns `0.0` if never seen.
251    pub fn stability_score(&self, peer_id: &str) -> f64 {
252        self.peers
253            .get(peer_id)
254            .map_or(0.0, PeerQos::stability_score)
255    }
256
257    /// Snapshot of all stability scores as a `Vec<f64>` (order not defined).
258    ///
259    /// This is the primary input to [`crate::coding::params::compute_coding_params`].
260    pub fn all_stability_scores(&self) -> Vec<f64> {
261        self.peers.values().map(PeerQos::stability_score).collect()
262    }
263
264    /// Stability scores only for Pouch peers whose `peer_id` appears in
265    /// `pouch_peer_ids` — used to scope the k-calculation to active Pouches
266    /// in a specific network, rather than all connected peers.
267    pub fn stability_scores_for<'a>(
268        &self,
269        pouch_peer_ids: impl Iterator<Item = &'a str>,
270    ) -> Vec<f64> {
271        pouch_peer_ids.map(|id| self.stability_score(id)).collect()
272    }
273
274    /// Total number of tracked peers.
275    pub fn peer_count(&self) -> usize {
276        self.peers.len()
277    }
278
279    /// All peer IDs currently tracked.
280    pub fn peer_ids(&self) -> impl Iterator<Item = &str> {
281        self.peers.keys().map(String::as_str)
282    }
283
284    /// Fault status of a peer: `"ok"`, `"degraded"`, `"suspected"`, or
285    /// `"blacklisted"`.  Returns `"ok"` if the peer has never been seen.
286    pub fn fault_status(&self, peer_id: &str) -> &'static str {
287        self.peers.get(peer_id).map_or("ok", PeerQos::fault_status)
288    }
289
290    /// Fault score of a peer in `[0, 100]`.  Returns `0` if never seen.
291    pub fn fault_score(&self, peer_id: &str) -> u8 {
292        self.peers.get(peer_id).map_or(0, |p| p.fault_score)
293    }
294}
295
296// ── Tests ─────────────────────────────────────────────────────────────────────
297
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn new_peer_has_zero_latency_score() {
        let p = PeerQos::new("peer1");
        assert_eq!(p.latency_score(), 0.0);
        assert_eq!(p.challenge_success_ewma, 1.0);
    }

    #[test]
    fn stability_score_perfect_peer() {
        let mut p = PeerQos::new("peer1");
        // 0 ms RTT → latency_score = 1.0
        p.record_ping(0.0);
        assert!((p.latency_score() - 1.0).abs() < 1e-9);
        // stability = 0.6*1.0 + 0.4*1.0 = 1.0
        assert!((p.stability_score() - 1.0).abs() < 1e-9);
    }

    #[test]
    fn stability_score_at_rtt_ref() {
        let mut p = PeerQos::new("peer1");
        // After many samples of RTT_REF_MS the EWMA converges to RTT_REF_MS.
        // Use a single perfect-convergence shortcut:
        p.rtt_ewma_ms = Some(RTT_REF_MS);
        // latency_score = exp(-1) ≈ 0.3679
        let expected_lat = (-1.0_f64).exp();
        assert!((p.latency_score() - expected_lat).abs() < 1e-9);
    }

    #[test]
    fn challenge_failure_degrades_score() {
        let mut p = PeerQos::new("peer1");
        p.record_ping(10.0); // fast peer
        let score_before = p.stability_score();
        for _ in 0..20 {
            p.record_challenge(false);
        }
        assert!(p.stability_score() < score_before);
    }

    #[test]
    fn ping_timeout_degrades_latency() {
        let mut p = PeerQos::new("peer1");
        p.record_ping(10.0);
        let lat_before = p.latency_score();
        for _ in 0..20 {
            p.record_ping_timeout();
        }
        assert!(p.latency_score() < lat_before);
    }

    #[test]
    fn rtt_ewma_converges() {
        let mut p = PeerQos::new("peer1");
        // Feed many identical samples: EWMA should converge to that value.
        for _ in 0..100 {
            p.record_ping(200.0);
        }
        assert!((p.rtt_ewma_ms.unwrap() - 200.0).abs() < 1.0);
    }

    #[test]
    fn pos_failures_walk_through_fault_levels() {
        let mut p = PeerQos::new("peer1");
        assert_eq!(p.fault_status(), "ok");
        // FAULT_INCREMENT = 5: 14 failures → 70, 18 → 90, 20 → 100.
        for _ in 0..14 {
            p.record_pos_failure();
        }
        assert_eq!(p.fault_score, FAULT_DEGRADED);
        assert_eq!(p.fault_status(), "degraded");
        for _ in 0..4 {
            p.record_pos_failure();
        }
        assert_eq!(p.fault_score, FAULT_SUSPECTED);
        assert_eq!(p.fault_status(), "suspected");
        for _ in 0..2 {
            p.record_pos_failure();
        }
        assert_eq!(p.fault_score, FAULT_BLACKLISTED);
        assert_eq!(p.fault_status(), "blacklisted");
    }

    #[test]
    fn fault_score_is_capped_at_blacklist_level() {
        let mut p = PeerQos::new("peer1");
        for _ in 0..100 {
            p.record_pos_failure();
        }
        assert_eq!(p.fault_score, 100);
        assert_eq!(p.challenges_sent, 100);
        assert_eq!(p.challenges_succeeded, 0);
    }

    #[test]
    fn pos_success_decays_fault_score_without_underflow() {
        let mut p = PeerQos::new("peer1");
        p.record_pos_success(); // already 0 → stays 0
        assert_eq!(p.fault_score, 0);
        p.record_pos_failure(); // 5
        p.record_pos_success(); // 4
        assert_eq!(p.fault_score, 4);
        assert_eq!(p.challenges_sent, 3);
        assert_eq!(p.challenges_succeeded, 2);
    }

    #[test]
    fn degraded_peer_recovers_below_threshold() {
        let mut p = PeerQos::new("peer1");
        for _ in 0..14 {
            p.record_pos_failure();
        }
        assert_eq!(p.fault_status(), "degraded");
        p.record_pos_success();
        assert_eq!(p.fault_score, FAULT_DEGRADED - 1);
        assert_eq!(p.fault_status(), "ok");
    }

    #[test]
    fn registry_entry_creates_new_peer() {
        let mut reg = QosRegistry::new();
        reg.entry("p1").record_ping(50.0);
        assert!(reg.get("p1").is_some());
        assert_eq!(reg.peer_count(), 1);
    }

    #[test]
    fn registry_entry_reuses_existing_record() {
        let mut reg = QosRegistry::new();
        reg.entry("p1").record_pos_failure();
        reg.entry(String::from("p1")).record_pos_failure();
        assert_eq!(reg.peer_count(), 1);
        assert_eq!(reg.fault_score("p1"), 10);
        assert_eq!(reg.get("p1").map(|p| p.peer_id.as_str()), Some("p1"));
    }

    #[test]
    fn registry_defaults_for_unknown_peer() {
        let reg = QosRegistry::new();
        assert!(reg.get("ghost").is_none());
        assert_eq!(reg.stability_score("ghost"), 0.0);
        assert_eq!(reg.fault_status("ghost"), "ok");
        assert_eq!(reg.fault_score("ghost"), 0);
    }

    #[test]
    fn registry_remove_forgets_peer() {
        let mut reg = QosRegistry::new();
        reg.entry("p1").record_ping(50.0);
        reg.remove("p1");
        reg.remove("never-seen"); // no-op
        assert_eq!(reg.peer_count(), 0);
        assert!(reg.get("p1").is_none());
    }

    #[test]
    fn all_stability_scores_length() {
        let mut reg = QosRegistry::new();
        reg.entry("p1").record_ping(50.0);
        reg.entry("p2").record_ping(100.0);
        reg.entry("p3").record_ping(300.0);
        assert_eq!(reg.all_stability_scores().len(), 3);
    }

    #[test]
    fn stability_scores_for_preserves_order_and_length() {
        let mut reg = QosRegistry::new();
        reg.entry("p1").record_ping(0.0);
        let scores = reg.stability_scores_for(["p1", "missing"].iter().copied());
        assert_eq!(scores.len(), 2);
        assert!((scores[0] - 1.0).abs() < 1e-9);
        assert_eq!(scores[1], 0.0);
    }
}