1use crate::{
36 coding::rlnc,
37 coding::rlnc::EncodedFragment,
38 network::{
39 qos::QosRegistry, state::NetworkState, FragmentResponse, NetworkCommand,
40 OutgoingAssignments, OutgoingFragment, FAULT_DEGRADED, FAULT_SUSPECTED,
41 },
42 service::ServiceType,
43};
44use libp2p::PeerId;
45use std::sync::{Arc, RwLock};
46use std::time::{Duration, SystemTime, UNIX_EPOCH};
47use tokio::sync::mpsc;
48
/// Seconds between Ping rounds against every known Pouch peer.
const PING_INTERVAL_SECS: u64 = 60;

/// Seconds to wait for a single Ping response before recording a timeout.
const PING_TIMEOUT_SECS: u64 = 5;

/// Seconds between Proof-of-Storage challenge rounds.
const POS_INTERVAL_SECS: u64 = 300;

/// Seconds to wait for a single PoS proof; also reused as the fragment-fetch
/// timeout during rerouting (see `reroute_suspected_peer`).
const POS_TIMEOUT_SECS: u64 = 10;
63pub async fn run_quality_monitor(
76 network_state: Arc<RwLock<NetworkState>>,
77 qos: Arc<RwLock<QosRegistry>>,
78 net_tx: mpsc::Sender<NetworkCommand>,
79 outgoing_assignments: OutgoingAssignments,
80) {
81 let mut ping_interval = tokio::time::interval(Duration::from_secs(PING_INTERVAL_SECS));
82 ping_interval.reset();
85
86 let mut pos_interval = tokio::time::interval(Duration::from_secs(POS_INTERVAL_SECS));
87 pos_interval.reset();
88
89 tracing::info!(
90 ping_interval_secs = PING_INTERVAL_SECS,
91 pos_interval_secs = POS_INTERVAL_SECS,
92 "Quality monitor started"
93 );
94
95 loop {
96 tokio::select! {
97 _ = ping_interval.tick() => {
98 run_ping_round(&network_state, &qos, &net_tx).await;
99 }
100 _ = pos_interval.tick() => {
101 run_pos_round(&outgoing_assignments, &network_state, &qos, &net_tx).await;
102 }
103 }
104 }
105}
106
107async fn run_ping_round(
110 network_state: &Arc<RwLock<NetworkState>>,
111 qos: &Arc<RwLock<QosRegistry>>,
112 net_tx: &mpsc::Sender<NetworkCommand>,
113) {
114 let pouch_peers: Vec<(String, PeerId)> = {
116 let ns = match network_state.read() {
117 Ok(g) => g,
118 Err(e) => {
119 tracing::warn!("quality_monitor: NetworkState lock poisoned: {e}");
120 return;
121 }
122 };
123 ns.all()
124 .into_iter()
125 .filter(|n| n.service_type == ServiceType::Pouch)
126 .filter_map(|n| {
127 n.peer_id
128 .parse::<PeerId>()
129 .ok()
130 .map(|pid| (n.peer_id.clone(), pid))
131 })
132 .collect()
133 };
134
135 if pouch_peers.is_empty() {
136 tracing::debug!("quality_monitor: no Pouch peers known, skipping Ping round");
137 return;
138 }
139
140 tracing::debug!(
141 peers = pouch_peers.len(),
142 "quality_monitor: starting Ping round"
143 );
144
145 let mut handles = Vec::with_capacity(pouch_peers.len());
146 for (peer_id_str, peer_id) in pouch_peers {
147 let qos = Arc::clone(qos);
148 let net_tx = net_tx.clone();
149 let handle = tokio::spawn(async move {
150 ping_one_peer(peer_id_str, peer_id, qos, net_tx).await;
151 });
152 handles.push(handle);
153 }
154
155 for h in handles {
156 let _ = h.await;
157 }
158
159 tracing::debug!("quality_monitor: Ping round complete");
160}
161
162async fn run_pos_round(
165 outgoing_assignments: &OutgoingAssignments,
166 network_state: &Arc<RwLock<NetworkState>>,
167 qos: &Arc<RwLock<QosRegistry>>,
168 net_tx: &mpsc::Sender<NetworkCommand>,
169) {
170 let challenges: Vec<(String, PeerId, String, String)> = {
172 let assignments = match outgoing_assignments.read() {
173 Ok(g) => g,
174 Err(e) => {
175 tracing::warn!("quality_monitor: outgoing_assignments lock poisoned: {e}");
176 return;
177 }
178 };
179 assignments
180 .iter()
181 .filter_map(|(peer_id_str, fragments)| {
182 if fragments.is_empty() {
183 return None;
184 }
185 let idx = SystemTime::now()
187 .duration_since(UNIX_EPOCH)
188 .unwrap_or_default()
189 .subsec_nanos() as usize
190 % fragments.len();
191 let frag = &fragments[idx];
192 peer_id_str.parse::<PeerId>().ok().map(|pid| {
193 (
194 peer_id_str.clone(),
195 pid,
196 frag.chunk_id.clone(),
197 frag.fragment_id.clone(),
198 )
199 })
200 })
201 .collect()
202 };
203
204 if challenges.is_empty() {
205 tracing::debug!("quality_monitor: no outgoing fragments, skipping PoS round");
206 return;
207 }
208
209 tracing::debug!(
210 peers = challenges.len(),
211 "quality_monitor: starting PoS round"
212 );
213
214 let mut handles = Vec::with_capacity(challenges.len());
215 for (peer_id_str, peer_id, chunk_id, fragment_id) in challenges.iter().cloned() {
216 let qos = Arc::clone(qos);
217 let net_tx = net_tx.clone();
218 let handle = tokio::spawn(async move {
219 pos_one_peer(peer_id_str, peer_id, chunk_id, fragment_id, qos, net_tx).await;
220 });
221 handles.push(handle);
222 }
223
224 for h in handles {
225 let _ = h.await;
226 }
227
228 tracing::debug!("quality_monitor: PoS round complete");
229
230 let suspected: Vec<(String, PeerId)> = challenges
235 .iter()
236 .filter(|(peer_id_str, _, _, _)| {
237 qos.read()
238 .map(|g| g.fault_score(peer_id_str) >= FAULT_SUSPECTED)
239 .unwrap_or(false)
240 })
241 .map(|(peer_id_str, peer_id, _, _)| (peer_id_str.clone(), *peer_id))
242 .fold(Vec::<(String, PeerId)>::new(), |mut acc, (s, p)| {
244 if !acc.iter().any(|(x, _)| x == &s) {
245 acc.push((s, p));
246 }
247 acc
248 });
249
250 for (peer_id_str, peer_id) in suspected {
251 reroute_suspected_peer(
252 &peer_id_str,
253 peer_id,
254 outgoing_assignments,
255 network_state,
256 qos,
257 net_tx,
258 )
259 .await;
260 }
261}
262
/// Preventively copies fragment data away from a peer suspected of failing.
///
/// For every chunk assigned to `suspected_peer_str`, fetches its fragments
/// from the suspected peer, recodes them into one fresh RLNC fragment, and
/// pushes that fragment to the first healthy Pouch peer found (fault score
/// below `FAULT_DEGRADED`). Successfully pushed fragments are recorded in
/// `outgoing` under the healthy peer.
///
/// Best effort throughout: lock poisoning, channel closure, fetch timeouts,
/// and recode failures skip the affected chunk rather than abort.
///
/// NOTE(review): the suspected peer's own entries are never removed from
/// `outgoing` here — confirm whether they should be pruned after rerouting.
async fn reroute_suspected_peer(
    suspected_peer_str: &str,
    suspected_peer: PeerId,
    outgoing: &OutgoingAssignments,
    network_state: &Arc<RwLock<NetworkState>>,
    qos: &Arc<RwLock<QosRegistry>>,
    net_tx: &mpsc::Sender<NetworkCommand>,
) {
    // Distinct chunk ids currently assigned to the suspected peer
    // (the HashSet round-trip removes duplicates across fragments).
    let chunks: Vec<String> = {
        let guard = match outgoing.read() {
            Ok(g) => g,
            Err(_) => return,
        };
        guard
            .get(suspected_peer_str)
            .cloned()
            .unwrap_or_default()
            .iter()
            .map(|f| f.chunk_id.clone())
            .collect::<std::collections::HashSet<_>>()
            .into_iter()
            .collect()
    };

    if chunks.is_empty() {
        tracing::debug!(
            peer = %suspected_peer_str,
            "reroute: no assigned chunks, nothing to reroute"
        );
        return;
    }

    // Pick the first Pouch peer that is not the suspect and not degraded.
    // Both read guards are confined to this block, never held across await.
    let healthy_peer: Option<(String, PeerId)> = {
        let ns = match network_state.read() {
            Ok(g) => g,
            Err(_) => return,
        };
        let qos_g = match qos.read() {
            Ok(g) => g,
            Err(_) => return,
        };
        ns.all()
            .into_iter()
            .filter(|n| n.service_type == ServiceType::Pouch)
            .filter(|n| n.peer_id != suspected_peer_str)
            .filter(|n| qos_g.fault_score(&n.peer_id) < FAULT_DEGRADED)
            .filter_map(|n| {
                n.peer_id
                    .parse::<PeerId>()
                    .ok()
                    .map(|pid| (n.peer_id.clone(), pid))
            })
            .next()
    };

    let (healthy_str, healthy_pid) = match healthy_peer {
        Some(p) => p,
        None => {
            tracing::warn!(
                peer = %suspected_peer_str,
                "reroute: no healthy peer available, aborting rerouting"
            );
            return;
        }
    };

    tracing::info!(
        suspected = %suspected_peer_str,
        healthy = %healthy_str,
        chunks = chunks.len(),
        "reroute: starting preventive fragment rerouting"
    );

    for chunk_id in chunks {
        // Ask the network layer for all fragments of this chunk held by the
        // suspected peer.
        let (tx, rx) = tokio::sync::oneshot::channel();
        if net_tx
            .send(NetworkCommand::FetchChunkFragments {
                peer_id: suspected_peer,
                chunk_id: chunk_id.clone(),
                resp_tx: tx,
            })
            .await
            .is_err()
        {
            tracing::debug!(chunk = %chunk_id, "reroute: net_tx closed");
            continue;
        }

        match tokio::time::timeout(Duration::from_secs(POS_TIMEOUT_SECS), rx).await {
            Ok(Ok(FragmentResponse::FoundMany { fragments })) if !fragments.is_empty() => {
                // Parse raw bytes back into coded fragments; unparseable
                // entries are silently dropped.
                let encoded: Vec<EncodedFragment> = fragments
                    .iter()
                    .filter_map(|(fid, bytes)| {
                        EncodedFragment::from_bytes(fid.clone(), chunk_id.clone(), bytes).ok()
                    })
                    .collect();

                if encoded.is_empty() {
                    tracing::debug!(chunk = %chunk_id, "reroute: all fragments unparseable");
                    continue;
                }

                // Recode into exactly one new fragment for the healthy peer.
                match rlnc::recode(&encoded, 1) {
                    Ok(mut recoded) => {
                        if let Some(new_frag) = recoded.pop() {
                            let frag_id = new_frag.id.clone();
                            // Push failure is ignored (best effort).
                            let _ = net_tx
                                .send(NetworkCommand::PushFragment {
                                    peer_id: healthy_pid,
                                    chunk_id: chunk_id.clone(),
                                    fragment_id: frag_id.clone(),
                                    data: new_frag.to_bytes(),
                                })
                                .await;

                            // Track the new assignment so future PoS rounds
                            // challenge the healthy peer for this fragment.
                            if let Ok(mut oa) = outgoing.write() {
                                oa.entry(healthy_str.clone())
                                    .or_default()
                                    .push(OutgoingFragment {
                                        chunk_id: chunk_id.clone(),
                                        fragment_id: frag_id,
                                    });
                            }

                            tracing::info!(
                                chunk = %chunk_id,
                                healthy = %healthy_str,
                                "reroute: recoded fragment pushed to healthy peer"
                            );
                        }
                    }
                    Err(e) => {
                        tracing::debug!(
                            chunk = %chunk_id,
                            err = %e,
                            "reroute: recode failed"
                        );
                    }
                }
            }
            _ => {
                tracing::debug!(
                    chunk = %chunk_id,
                    peer = %suspected_peer_str,
                    "reroute: fragment fetch failed or timed out"
                );
            }
        }
    }
}
427
428async fn pos_one_peer(
431 peer_id_str: String,
432 peer_id: PeerId,
433 chunk_id: String,
434 fragment_id: String,
435 qos: Arc<RwLock<QosRegistry>>,
436 net_tx: mpsc::Sender<NetworkCommand>,
437) {
438 let nonce = SystemTime::now()
439 .duration_since(UNIX_EPOCH)
440 .unwrap_or_default()
441 .as_millis() as u64;
442
443 let (resp_tx, resp_rx) = tokio::sync::oneshot::channel::<bool>();
444
445 if net_tx
446 .send(NetworkCommand::ProofOfStorage {
447 peer_id,
448 chunk_id: chunk_id.clone(),
449 fragment_id: fragment_id.clone(),
450 nonce,
451 resp_tx,
452 })
453 .await
454 .is_err()
455 {
456 tracing::warn!(peer = %peer_id_str, "quality_monitor: net_tx closed (PoS)");
457 return;
458 }
459
460 match tokio::time::timeout(Duration::from_secs(POS_TIMEOUT_SECS), resp_rx).await {
461 Ok(Ok(true)) => {
462 tracing::debug!(
463 peer = %peer_id_str,
464 chunk = %chunk_id,
465 frag = %fragment_id,
466 "PoS challenge: proof received"
467 );
468 if let Ok(mut qos_guard) = qos.write() {
469 qos_guard.entry(peer_id_str).record_pos_success();
470 }
471 }
472 Ok(Ok(false)) | Ok(Err(_)) => {
473 tracing::debug!(
474 peer = %peer_id_str,
475 chunk = %chunk_id,
476 frag = %fragment_id,
477 "PoS challenge: proof failed or fragment not found"
478 );
479 record_pos_failure(&peer_id_str, &qos);
480 }
481 Err(_) => {
482 tracing::debug!(
483 peer = %peer_id_str,
484 "PoS challenge: timeout (>{POS_TIMEOUT_SECS}s)"
485 );
486 record_pos_failure(&peer_id_str, &qos);
487 }
488 }
489}
490
491fn record_pos_failure(peer_id_str: &str, qos: &Arc<RwLock<QosRegistry>>) {
492 if let Ok(mut qos_guard) = qos.write() {
493 qos_guard.entry(peer_id_str).record_pos_failure();
494 }
495}
496
/// Sends one Ping to `peer_id` and records latency and liveness in the QoS
/// registry. Timeouts and dropped response channels are both recorded as
/// missed challenges.
async fn ping_one_peer(
    peer_id_str: String,
    peer_id: PeerId,
    qos: Arc<RwLock<QosRegistry>>,
    net_tx: mpsc::Sender<NetworkCommand>,
) {
    let sent_at_ms = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis() as u64;

    let (resp_tx, resp_rx) = tokio::sync::oneshot::channel::<u64>();

    let ping = NetworkCommand::Ping {
        peer_id,
        sent_at_ms,
        resp_tx,
    };
    if net_tx.send(ping).await.is_err() {
        tracing::warn!(peer = %peer_id_str, "quality_monitor: net_tx closed");
        return;
    }

    let outcome = tokio::time::timeout(Duration::from_secs(PING_TIMEOUT_SECS), resp_rx).await;
    match outcome {
        Ok(Ok(rtt_ms)) => {
            tracing::debug!(peer = %peer_id_str, rtt_ms = %rtt_ms, "Ping OK");
            if let Ok(mut registry) = qos.write() {
                let stats = registry.entry(peer_id_str.clone());
                stats.record_ping(rtt_ms as f64);
                stats.record_challenge(true);
            }
        }
        Ok(Err(_)) => {
            tracing::debug!(peer = %peer_id_str, "Ping: no response (channel dropped)");
            record_timeout(&peer_id_str, &qos);
        }
        Err(_) => {
            tracing::debug!(peer = %peer_id_str, "Ping: timeout (>{PING_TIMEOUT_SECS}s)");
            record_timeout(&peer_id_str, &qos);
        }
    }
}
545
546fn record_timeout(peer_id_str: &str, qos: &Arc<RwLock<QosRegistry>>) {
547 if let Ok(mut qos_guard) = qos.write() {
548 let entry = qos_guard.entry(peer_id_str);
549 entry.record_ping_timeout();
550 entry.record_challenge(false);
551 }
552}
553
#[cfg(test)]
mod tests {
    use super::*;
    use crate::network::OutgoingFragment;

    /// Repeated ping timeouts must pull the stability score down.
    #[test]
    fn record_timeout_degrades_score() {
        let registry = Arc::new(RwLock::new(QosRegistry::new()));
        {
            let mut guard = registry.write().unwrap();
            let entry = guard.entry("p1");
            entry.record_ping(10.0);
            entry.record_challenge(true);
        }
        let before = registry.read().unwrap().stability_score("p1");

        for _ in 0..5 {
            record_timeout("p1", &registry);
        }

        let after = registry.read().unwrap().stability_score("p1");
        assert!(after < before, "timeout should degrade score");
    }

    /// A timeout for a previously unknown peer creates an entry with a
    /// degraded score.
    #[test]
    fn new_peer_gets_timeout_entry() {
        let registry = Arc::new(RwLock::new(QosRegistry::new()));
        record_timeout("new-peer", &registry);

        let score = registry.read().unwrap().stability_score("new-peer");
        assert!(
            score < 0.95,
            "new peer after one timeout should have degraded score"
        );
    }

    /// Failed PoS challenges must raise the peer's fault score above zero.
    #[test]
    fn pos_failure_increments_fault_score() {
        let registry = Arc::new(RwLock::new(QosRegistry::new()));
        assert_eq!(registry.read().unwrap().fault_score("p1"), 0);

        for _ in 0..3 {
            record_pos_failure("p1", &registry);
        }

        let fault = registry.read().unwrap().fault_score("p1");
        assert!(fault > 0, "fault score should increase after PoS failures");
    }

    /// A successful proof must decay a previously accumulated fault score.
    #[test]
    fn pos_success_decays_fault_score() {
        let registry = Arc::new(RwLock::new(QosRegistry::new()));
        for _ in 0..4 {
            record_pos_failure("p1", &registry);
        }
        let fault_after_failures = registry.read().unwrap().fault_score("p1");

        {
            let mut guard = registry.write().unwrap();
            guard.entry("p1").record_pos_success();
        }

        let fault_after_success = registry.read().unwrap().fault_score("p1");
        assert!(
            fault_after_success < fault_after_failures,
            "PoS success should decay fault score"
        );
    }

    /// With no peers and no assignments the monitor must idle harmlessly.
    #[tokio::test]
    async fn monitor_noop_with_no_peers() {
        let state = Arc::new(RwLock::new(NetworkState::new()));
        let registry = Arc::new(RwLock::new(QosRegistry::new()));
        let (tx, _rx) = mpsc::channel(8);
        let assignments = Arc::new(RwLock::new(std::collections::HashMap::new()));

        // The monitor never returns on its own; just confirm it survives
        // 200 ms without panicking.
        let _ = tokio::time::timeout(
            Duration::from_millis(200),
            run_quality_monitor(state, registry, tx, assignments),
        )
        .await;
    }

    /// Sanity-check the shape of the shared outgoing-assignments map.
    #[test]
    fn outgoing_assignment_structure() {
        let assignments: OutgoingAssignments =
            Arc::new(RwLock::new(std::collections::HashMap::new()));
        {
            let mut guard = assignments.write().unwrap();
            guard
                .entry("peer-1".into())
                .or_default()
                .push(OutgoingFragment {
                    chunk_id: "chunk-a".into(),
                    fragment_id: "frag-1".into(),
                });
        }

        let guard = assignments.read().unwrap();
        assert_eq!(guard.get("peer-1").unwrap().len(), 1);
        assert_eq!(guard.get("peer-1").unwrap()[0].chunk_id, "chunk-a");
    }
}