// bp_core/network/qos.rs
1//! Per-peer Quality-of-Service (QoS) tracking.
2//!
3//! Every known Pouch in the network gets a [`PeerQos`] record that is updated
4//! whenever the local node issues a Ping challenge or a Proof-of-Storage
5//! challenge and receives (or times out on) a response.
6//!
7//! ## Stability score
8//!
9//! The **stability score** `s ∈ [0.0, 1.0]` summarises two independent signals:
10//!
11//! ```text
12//! s(p) = w_lat · latency_score(rtt_ewma) + w_rel · challenge_success_ewma
13//!
14//! latency_score(rtt_ms) = exp(−rtt_ms / RTT_REF_MS) (RTT_REF_MS = 500 ms)
15//! ```
16//!
17//! Default weights: `w_lat = 0.6`, `w_rel = 0.4`.
18//!
19//! ## EWMA smoothing
20//!
21//! | Signal | Factor | Rationale |
22//! |-----------------|--------|----------------------------------------|
23//! | RTT | α=0.125| TCP-standard (RFC 6298) |
24//! | Challenge result| β=0.10 | Slower decay → stable reliability view |
25//!
//! A new peer starts with `rtt_ewma_ms = None` (latency component = 0) and
//! `challenge_success_ewma = 1.0` (benefit of the doubt); each failed challenge
//! multiplies it by `1 − β`, so it decays toward 0 as failures accumulate.
28
29use serde::{Deserialize, Serialize};
30use std::collections::HashMap;
31
// ── Constants ─────────────────────────────────────────────────────────────────

/// Reference RTT in ms: `latency_score` is `1.0` at 0 ms and `e⁻¹ ≈ 0.37`
/// when the smoothed RTT equals `RTT_REF_MS`.
const RTT_REF_MS: f64 = 500.0;

/// EWMA smoothing factor for RTT (TCP-standard, RFC 6298).
const ALPHA_RTT: f64 = 0.125;

/// EWMA smoothing factor for challenge success rate (smaller than
/// `ALPHA_RTT`, so the reliability view changes more slowly).
const BETA_CHALLENGE: f64 = 0.10;

/// Weight of the latency component in the composite stability score.
const W_LATENCY: f64 = 0.6;

/// Weight of the reliability component in the composite stability score.
///
/// Together with `W_LATENCY` the weights sum to `1.0`, which keeps the
/// composite score in `[0.0, 1.0]` whenever both components are.
const W_RELIABILITY: f64 = 0.4;

// ── Fault-score thresholds ────────────────────────────────────────────────────

/// Fault score (0–100) at which a peer is considered *degraded* — no new
/// fragments will be assigned to it.
pub const FAULT_DEGRADED: u8 = 70;

/// Fault score at which a peer is *suspected* — preventive recoding begins.
pub const FAULT_SUSPECTED: u8 = 90;

/// Fault score at which a peer is *blacklisted* — evicted from routing and
/// announced as blacklisted on gossip.
pub const FAULT_BLACKLISTED: u8 = 100;

// Compile-time guard: `PeerQos::fault_status` checks these thresholds from the
// highest down and `record_pos_failure` caps the score at 100, so they must be
// strictly increasing and the blacklist threshold must stay reachable.
const _: () = assert!(
    FAULT_DEGRADED < FAULT_SUSPECTED
        && FAULT_SUSPECTED < FAULT_BLACKLISTED
        && FAULT_BLACKLISTED <= 100
);

/// Points added to `fault_score` per failed Proof-of-Storage challenge.
const FAULT_INCREMENT: u8 = 5;

/// Points subtracted from `fault_score` per successful PoS challenge.
const FAULT_DECAY: u8 = 1;
67
68// ── PeerQos ───────────────────────────────────────────────────────────────────
69
70/// Live QoS data for a single remote Pouch peer.
71///
72/// Serialisable so that it can be persisted across daemon restarts or
73/// gossipped as part of a `ChallengeResult` announcement.
74#[derive(Debug, Clone, Serialize, Deserialize)]
75pub struct PeerQos {
76 /// libp2p PeerId (base58).
77 pub peer_id: String,
78
79 /// EWMA round-trip time in milliseconds. `None` until the first Ping
80 /// response is received.
81 pub rtt_ewma_ms: Option<f64>,
82
83 /// EWMA of challenge success rate in `[0.0, 1.0]`.
84 /// Starts at `1.0` (benefit of the doubt).
85 pub challenge_success_ewma: f64,
86
87 /// Unix timestamp (seconds) of the most recent Ping exchange.
88 pub last_ping_at: Option<u64>,
89
90 /// Total Ping + PoS challenges sent.
91 pub challenges_sent: u64,
92
93 /// Total successful challenge responses received.
94 pub challenges_succeeded: u64,
95
96 /// Accumulated fault score in `[0, 100]`.
97 ///
98 /// Incremented by `FAULT_INCREMENT` on each failed Proof-of-Storage
99 /// challenge; decremented by `FAULT_DECAY` on each success.
100 /// At [`FAULT_DEGRADED`] (70) no new fragments are sent; at
101 /// [`FAULT_SUSPECTED`] (90) preventive recoding begins; at
102 /// [`FAULT_BLACKLISTED`] (100) the peer is evicted.
103 pub fault_score: u8,
104}
105
106impl PeerQos {
107 /// Create a fresh QoS record for a newly discovered peer.
108 pub fn new(peer_id: impl Into<String>) -> Self {
109 Self {
110 peer_id: peer_id.into(),
111 rtt_ewma_ms: None,
112 challenge_success_ewma: 1.0,
113 last_ping_at: None,
114 challenges_sent: 0,
115 challenges_succeeded: 0,
116 fault_score: 0,
117 }
118 }
119
120 // ── Measurements ─────────────────────────────────────────────────────
121
122 /// Incorporate a new RTT sample (milliseconds) into the EWMA.
123 ///
124 /// Also updates `last_ping_at` to the current UTC time.
125 pub fn record_ping(&mut self, rtt_ms: f64) {
126 self.rtt_ewma_ms = Some(match self.rtt_ewma_ms {
127 None => rtt_ms,
128 Some(prev) => ALPHA_RTT * rtt_ms + (1.0 - ALPHA_RTT) * prev,
129 });
130 self.last_ping_at = Some(chrono::Utc::now().timestamp() as u64);
131 }
132
133 /// Record the outcome of a Ping timeout (no response within deadline).
134 ///
135 /// The latency component decays toward 0 by increasing the EWMA with a
136 /// very large RTT value (10 × reference), signalling poor reachability.
137 pub fn record_ping_timeout(&mut self) {
138 self.record_ping(RTT_REF_MS * 10.0);
139 }
140
141 /// Incorporate the result of a challenge (Ping or Proof-of-Storage).
142 ///
143 /// `success = true` → response was received and validated.
144 /// `success = false` → response timed out or proof was invalid.
145 pub fn record_challenge(&mut self, success: bool) {
146 let outcome = if success { 1.0_f64 } else { 0.0_f64 };
147 self.challenge_success_ewma =
148 BETA_CHALLENGE * outcome + (1.0 - BETA_CHALLENGE) * self.challenge_success_ewma;
149 self.challenges_sent += 1;
150 if success {
151 self.challenges_succeeded += 1;
152 }
153 }
154
155 /// Record a successful Proof-of-Storage response.
156 ///
157 /// Decays `fault_score` by `FAULT_DECAY` and records a successful
158 /// challenge in the reliability EWMA.
159 pub fn record_pos_success(&mut self) {
160 self.fault_score = self.fault_score.saturating_sub(FAULT_DECAY);
161 self.record_challenge(true);
162 }
163
164 /// Record a failed Proof-of-Storage response (timeout or invalid proof).
165 ///
166 /// Increments `fault_score` by `FAULT_INCREMENT` (capped at 100) and
167 /// records a failed challenge in the reliability EWMA.
168 pub fn record_pos_failure(&mut self) {
169 self.fault_score = self.fault_score.saturating_add(FAULT_INCREMENT).min(100);
170 self.record_challenge(false);
171 }
172
173 /// Current fault status derived from [`Self::fault_score`].
174 ///
175 /// Returns `"ok"`, `"degraded"`, `"suspected"`, or `"blacklisted"`.
176 pub fn fault_status(&self) -> &'static str {
177 match self.fault_score {
178 s if s >= FAULT_BLACKLISTED => "blacklisted",
179 s if s >= FAULT_SUSPECTED => "suspected",
180 s if s >= FAULT_DEGRADED => "degraded",
181 _ => "ok",
182 }
183 }
184
185 // ── Derived scores ────────────────────────────────────────────────────
186
187 /// Latency component of the stability score: `exp(−rtt_ewma / RTT_REF_MS)`.
188 ///
189 /// Returns `0.0` if no Ping has been recorded yet.
190 pub fn latency_score(&self) -> f64 {
191 self.rtt_ewma_ms
192 .map_or(0.0, |rtt| (-rtt / RTT_REF_MS).exp())
193 }
194
195 /// Composite stability score in `[0.0, 1.0]`.
196 ///
197 /// ```text
198 /// s = W_LATENCY · latency_score + W_RELIABILITY · challenge_success_ewma
199 /// ```
200 ///
201 /// This value feeds directly into `coding::params::compute_coding_params` to
202 /// determine the recovery threshold `k` for a target high probability `Ph`.
203 pub fn stability_score(&self) -> f64 {
204 W_LATENCY * self.latency_score() + W_RELIABILITY * self.challenge_success_ewma
205 }
206
207 /// Whether the peer has been pinged at all.
208 pub fn is_observed(&self) -> bool {
209 self.rtt_ewma_ms.is_some()
210 }
211}
212
213// ── QosRegistry ───────────────────────────────────────────────────────────────
214
215/// In-memory map of `peer_id → PeerQos` for the local node.
216///
217/// Held inside `DaemonState` under a `RwLock`:
218///
219/// ```ignore
220/// pub qos: RwLock<QosRegistry>,
221/// ```
222#[derive(Debug, Default)]
223pub struct QosRegistry {
224 peers: HashMap<String, PeerQos>,
225}
226
227impl QosRegistry {
228 pub fn new() -> Self {
229 Self::default()
230 }
231
232 /// Return (or lazily create) the QoS record for `peer_id`.
233 pub fn entry(&mut self, peer_id: impl Into<String>) -> &mut PeerQos {
234 let key = peer_id.into();
235 self.peers
236 .entry(key.clone())
237 .or_insert_with(|| PeerQos::new(key))
238 }
239
240 /// Read-only view of a peer's QoS; returns `None` if never seen.
241 pub fn get(&self, peer_id: &str) -> Option<&PeerQos> {
242 self.peers.get(peer_id)
243 }
244
245 /// Remove a peer (e.g. on explicit Farewell or eviction).
246 pub fn remove(&mut self, peer_id: &str) {
247 self.peers.remove(peer_id);
248 }
249
250 /// Stability score for `peer_id`. Returns `0.0` if never seen.
251 pub fn stability_score(&self, peer_id: &str) -> f64 {
252 self.peers
253 .get(peer_id)
254 .map_or(0.0, PeerQos::stability_score)
255 }
256
257 /// Snapshot of all stability scores as a `Vec<f64>` (order not defined).
258 ///
259 /// This is the primary input to [`crate::coding::params::compute_coding_params`].
260 pub fn all_stability_scores(&self) -> Vec<f64> {
261 self.peers.values().map(PeerQos::stability_score).collect()
262 }
263
264 /// Stability scores only for Pouch peers whose `peer_id` appears in
265 /// `pouch_peer_ids` — used to scope the k-calculation to active Pouches
266 /// in a specific network, rather than all connected peers.
267 pub fn stability_scores_for<'a>(
268 &self,
269 pouch_peer_ids: impl Iterator<Item = &'a str>,
270 ) -> Vec<f64> {
271 pouch_peer_ids.map(|id| self.stability_score(id)).collect()
272 }
273
274 /// Total number of tracked peers.
275 pub fn peer_count(&self) -> usize {
276 self.peers.len()
277 }
278
279 /// All peer IDs currently tracked.
280 pub fn peer_ids(&self) -> impl Iterator<Item = &str> {
281 self.peers.keys().map(String::as_str)
282 }
283
284 /// Fault status of a peer: `"ok"`, `"degraded"`, `"suspected"`, or
285 /// `"blacklisted"`. Returns `"ok"` if the peer has never been seen.
286 pub fn fault_status(&self, peer_id: &str) -> &'static str {
287 self.peers.get(peer_id).map_or("ok", PeerQos::fault_status)
288 }
289
290 /// Fault score of a peer in `[0, 100]`. Returns `0` if never seen.
291 pub fn fault_score(&self, peer_id: &str) -> u8 {
292 self.peers.get(peer_id).map_or(0, |p| p.fault_score)
293 }
294}
295
// ── Tests ─────────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn new_peer_has_zero_latency_score() {
        let p = PeerQos::new("peer1");
        assert_eq!(p.latency_score(), 0.0);
        assert_eq!(p.challenge_success_ewma, 1.0);
    }

    #[test]
    fn stability_score_perfect_peer() {
        let mut p = PeerQos::new("peer1");
        // 0 ms RTT → latency_score = 1.0
        p.record_ping(0.0);
        assert!((p.latency_score() - 1.0).abs() < 1e-9);
        // stability = 0.6*1.0 + 0.4*1.0 = 1.0
        assert!((p.stability_score() - 1.0).abs() < 1e-9);
    }

    #[test]
    fn stability_score_at_rtt_ref() {
        let mut p = PeerQos::new("peer1");
        // Set the EWMA directly to its converged value instead of feeding
        // many identical samples.
        p.rtt_ewma_ms = Some(RTT_REF_MS);
        // latency_score = exp(-1) ≈ 0.3679
        let expected_lat = (-1.0_f64).exp();
        assert!((p.latency_score() - expected_lat).abs() < 1e-9);
    }

    #[test]
    fn challenge_failure_degrades_score() {
        let mut p = PeerQos::new("peer1");
        p.record_ping(10.0); // fast peer
        let score_before = p.stability_score();
        for _ in 0..20 {
            p.record_challenge(false);
        }
        assert!(p.stability_score() < score_before);
    }

    #[test]
    fn record_challenge_updates_ewma_and_counters() {
        let mut p = PeerQos::new("peer1");
        p.record_challenge(false);
        // 0.1·0 + 0.9·1.0
        assert!((p.challenge_success_ewma - 0.9).abs() < 1e-12);
        p.record_challenge(true);
        // 0.1·1 + 0.9·0.9
        assert!((p.challenge_success_ewma - 0.91).abs() < 1e-12);
        assert_eq!(p.challenges_sent, 2);
        assert_eq!(p.challenges_succeeded, 1);
    }

    #[test]
    fn ping_timeout_degrades_latency() {
        let mut p = PeerQos::new("peer1");
        p.record_ping(10.0);
        let lat_before = p.latency_score();
        for _ in 0..20 {
            p.record_ping_timeout();
        }
        assert!(p.latency_score() < lat_before);
    }

    #[test]
    fn rtt_ewma_converges() {
        let mut p = PeerQos::new("peer1");
        // Feed many identical samples: EWMA should converge to that value.
        for _ in 0..100 {
            p.record_ping(200.0);
        }
        assert!((p.rtt_ewma_ms.unwrap() - 200.0).abs() < 1.0);
    }

    #[test]
    fn pos_failures_walk_through_fault_states() {
        let mut p = PeerQos::new("peer1");
        assert_eq!(p.fault_status(), "ok");
        for _ in 0..13 {
            p.record_pos_failure();
        }
        assert_eq!(p.fault_score, 65);
        assert_eq!(p.fault_status(), "ok");
        p.record_pos_failure();
        assert_eq!(p.fault_score, FAULT_DEGRADED);
        assert_eq!(p.fault_status(), "degraded");
        for _ in 0..4 {
            p.record_pos_failure();
        }
        assert_eq!(p.fault_score, FAULT_SUSPECTED);
        assert_eq!(p.fault_status(), "suspected");
        for _ in 0..2 {
            p.record_pos_failure();
        }
        assert_eq!(p.fault_score, FAULT_BLACKLISTED);
        assert_eq!(p.fault_status(), "blacklisted");
    }

    #[test]
    fn fault_score_is_capped_and_decays() {
        let mut p = PeerQos::new("peer1");
        for _ in 0..50 {
            p.record_pos_failure();
        }
        assert_eq!(p.fault_score, 100);
        p.record_pos_success();
        assert_eq!(p.fault_score, 99);
        assert_eq!(p.fault_status(), "suspected");
        assert_eq!(p.challenges_sent, 51);
        assert_eq!(p.challenges_succeeded, 1);
    }

    #[test]
    fn fault_score_never_underflows() {
        let mut p = PeerQos::new("peer1");
        p.record_pos_success();
        assert_eq!(p.fault_score, 0);
        assert_eq!(p.fault_status(), "ok");
    }

    #[test]
    fn registry_entry_creates_new_peer() {
        let mut reg = QosRegistry::new();
        reg.entry("p1").record_ping(50.0);
        assert!(reg.get("p1").is_some());
        assert_eq!(reg.peer_count(), 1);
    }

    #[test]
    fn registry_entry_is_idempotent_and_remove_works() {
        let mut reg = QosRegistry::new();
        reg.entry("p1").record_pos_failure();
        reg.entry("p1").record_pos_failure();
        assert_eq!(reg.peer_count(), 1);
        assert_eq!(reg.fault_score("p1"), 10);
        assert_eq!(reg.get("p1").map(|p| p.peer_id.as_str()), Some("p1"));
        reg.remove("p1");
        assert_eq!(reg.peer_count(), 0);
        assert!(reg.peer_ids().next().is_none());
    }

    #[test]
    fn registry_defaults_for_unknown_peer() {
        let reg = QosRegistry::new();
        assert!(reg.get("ghost").is_none());
        assert_eq!(reg.stability_score("ghost"), 0.0);
        assert_eq!(reg.fault_status("ghost"), "ok");
        assert_eq!(reg.fault_score("ghost"), 0);
    }

    #[test]
    fn all_stability_scores_length() {
        let mut reg = QosRegistry::new();
        reg.entry("p1").record_ping(50.0);
        reg.entry("p2").record_ping(100.0);
        reg.entry("p3").record_ping(300.0);
        assert_eq!(reg.all_stability_scores().len(), 3);
    }

    #[test]
    fn stability_scores_for_keeps_order_and_unknowns() {
        let mut reg = QosRegistry::new();
        reg.entry("p1").record_ping(0.0);
        let scores = reg.stability_scores_for(["p1", "ghost"].iter().copied());
        assert_eq!(scores.len(), 2);
        assert!((scores[0] - 1.0).abs() < 1e-9);
        assert_eq!(scores[1], 0.0);
    }
}