1pub mod behaviour;
16pub mod bootstrap;
17pub mod fragment_gossip;
18pub mod kad_store;
19pub mod qos;
20pub mod quality_monitor;
21pub mod reputation;
22pub mod state;
23
24pub use behaviour::{BillPouchBehaviour, FragmentRequest, FragmentResponse};
25pub use bootstrap::BootstrapList;
26pub use fragment_gossip::{FragmentIndexAnnouncement, FragmentPointer, RemoteFragmentIndex};
27pub use kad_store::KadPeers;
28pub use qos::{PeerQos, QosRegistry, FAULT_BLACKLISTED, FAULT_DEGRADED, FAULT_SUSPECTED};
29pub use quality_monitor::run_quality_monitor;
30pub use reputation::{ReputationRecord, ReputationStore, ReputationTier};
31pub use state::{NetworkState, NodeInfo};
32
33use crate::{
34 error::{BpError, BpResult},
35 storage::StorageManager,
36};
37use futures::StreamExt;
38use libp2p::{gossipsub, request_response, swarm::SwarmEvent, Multiaddr, PeerId, Swarm};
39use std::collections::{HashMap, HashSet};
40use std::sync::{Arc, RwLock};
41use std::time::Duration;
42use tokio::sync::mpsc;
43
/// Shared map from an identifier string to that entry's [`StorageManager`].
/// The nested `Arc<RwLock<…>>` lets the network loop lock one manager
/// without holding the map-level lock for the duration.
pub type StorageManagerMap = Arc<RwLock<HashMap<String, Arc<RwLock<StorageManager>>>>>;

/// Record of a fragment sent to a remote peer via
/// `NetworkCommand::PushFragment` (see `run_network_loop`).
#[derive(Debug, Clone)]
pub struct OutgoingFragment {
    /// Chunk the fragment belongs to.
    pub chunk_id: String,
    /// Identifier of the individual coded fragment.
    pub fragment_id: String,
}

/// Peer id (string form) → fragments we have pushed to that peer.
pub type OutgoingAssignments = Arc<RwLock<HashMap<String, Vec<OutgoingFragment>>>>;
67
/// Commands accepted by the network event loop ([`run_network_loop`]),
/// delivered over an mpsc channel from the rest of the application.
#[derive(Debug)]
pub enum NetworkCommand {
    /// Subscribe to a network's gossip topics (node-info + fragment index).
    JoinNetwork { network_id: String },
    /// Unsubscribe from both of a network's gossip topics.
    LeaveNetwork { network_id: String },
    /// Publish an opaque payload on the network's node-info topic.
    Announce {
        network_id: String,
        /// Serialized announcement bytes, published as-is.
        payload: Vec<u8>,
    },
    /// Publish a payload on the network's fragment-index topic.
    AnnounceIndex {
        network_id: String,
        payload: Vec<u8>,
    },
    /// Dial an arbitrary multiaddress (best effort; failures are logged).
    Dial { addr: Multiaddr },
    /// Fire-and-forget store of a fragment on a remote peer; the assignment
    /// is recorded in `OutgoingAssignments`.
    PushFragment {
        peer_id: PeerId,
        chunk_id: String,
        fragment_id: String,
        data: Vec<u8>,
    },
    /// Request every fragment a peer holds for `chunk_id`; the
    /// `FragmentResponse` is delivered on `resp_tx`.
    FetchChunkFragments {
        peer_id: PeerId,
        chunk_id: String,
        resp_tx: tokio::sync::oneshot::Sender<FragmentResponse>,
    },
    /// Stop the network loop (the peer book is persisted before exit).
    Shutdown,
    /// Dial a relay node for NAT traversal.
    DialRelay { relay_addr: Multiaddr },
    /// Application-level ping; `resp_tx` receives the measured RTT in ms.
    Ping {
        peer_id: PeerId,
        /// Send timestamp in ms since epoch; also reused as the ping nonce.
        sent_at_ms: u64,
        resp_tx: tokio::sync::oneshot::Sender<u64>,
    },
    /// Challenge a peer to prove it still stores a fragment; `resp_tx`
    /// receives `true` when a proof arrives, `false` on NotFound or failure.
    ProofOfStorage {
        peer_id: PeerId,
        chunk_id: String,
        fragment_id: String,
        /// Challenge nonce the remote must mix into its proof hash.
        nonce: u64,
        resp_tx: tokio::sync::oneshot::Sender<bool>,
    },
}
146
/// Build the libp2p swarm: TCP transport with noise encryption and yamux
/// multiplexing, a relay client for NAT traversal, and the combined
/// [`BillPouchBehaviour`] driven by the tokio runtime.
///
/// # Errors
/// Returns an error if transport setup or behaviour construction fails.
pub fn build_swarm(
    keypair: libp2p::identity::Keypair,
) -> anyhow::Result<Swarm<BillPouchBehaviour>> {
    let swarm = libp2p::SwarmBuilder::with_existing_identity(keypair)
        .with_tokio()
        .with_tcp(
            libp2p::tcp::Config::default(),
            libp2p::noise::Config::new,
            libp2p::yamux::Config::default,
        )?
        .with_relay_client(libp2p::noise::Config::new, libp2p::yamux::Config::default)?
        .with_behaviour(|key, relay_client| {
            BillPouchBehaviour::new(key, relay_client)
                // The builder wants a boxed error; erase the concrete type.
                .map_err(|e| Box::<dyn std::error::Error + Send + Sync>::from(e.to_string()))
        })
        .map_err(|e| anyhow::anyhow!("{}", e))?
        .build();

    Ok(swarm)
}
177
/// Main network event loop.
///
/// Drives the libp2p swarm, services [`NetworkCommand`]s from `cmd_rx`, and
/// periodically persists the known-peer address book. Returns `Ok(())` when
/// the command channel closes or a [`NetworkCommand::Shutdown`] arrives.
///
/// * `state` — shared registry of gossiped [`NodeInfo`], updated by events.
/// * `storage_managers` — local storage, served to inbound fragment requests.
/// * `outgoing_assignments` — records which fragments were pushed to which peer.
/// * `remote_fragment_index` — index of fragments announced by other peers.
#[allow(clippy::too_many_arguments)]
pub async fn run_network_loop(
    mut swarm: Swarm<BillPouchBehaviour>,
    mut cmd_rx: mpsc::Receiver<NetworkCommand>,
    state: Arc<RwLock<NetworkState>>,
    listen_addr: Multiaddr,
    storage_managers: StorageManagerMap,
    outgoing_assignments: OutgoingAssignments,
    remote_fragment_index: Arc<RwLock<fragment_gossip::RemoteFragmentIndex>>,
) -> BpResult<()> {
    swarm
        .listen_on(listen_addr.clone())
        .map_err(|e| BpError::Network(e.to_string()))?;

    tracing::info!("Network listening on {}", listen_addr);

    // Gossip topics we are currently subscribed to, by network id.
    let mut subscribed_networks: HashSet<String> = HashSet::new();

    // In-flight request/response correlation maps, keyed by outbound request
    // id; the matching oneshot sender is resolved in handle_swarm_event.
    let mut pending_fetches: HashMap<
        request_response::OutboundRequestId,
        tokio::sync::oneshot::Sender<FragmentResponse>,
    > = HashMap::new();

    // Pings carry (send timestamp in ms, channel for the measured RTT).
    let mut pending_pings: HashMap<
        request_response::OutboundRequestId,
        (u64, tokio::sync::oneshot::Sender<u64>),
    > = HashMap::new();

    // Proof-of-storage: channel receives true on proof, false otherwise.
    let mut pending_pos: HashMap<
        request_response::OutboundRequestId,
        tokio::sync::oneshot::Sender<bool>,
    > = HashMap::new();

    // Load the persisted peer address book; missing path/file yields the
    // empty default.
    let mut local_kad_peers: kad_store::KadPeers = crate::config::kad_peers_path()
        .map(|p| kad_store::KadPeers::load(&p))
        .unwrap_or_default();

    if !local_kad_peers.0.is_empty() {
        tracing::info!(
            peers = local_kad_peers.0.len(),
            "kad_store: dialing saved peers"
        );
        // Best-effort re-dial of every saved address to rebuild connectivity.
        for addrs in local_kad_peers.0.values() {
            for addr_str in addrs {
                if let Ok(addr) = addr_str.parse::<libp2p::Multiaddr>() {
                    let _ = swarm.dial(addr);
                }
            }
        }
    }

    // Apply the configured bootstrap list, if one exists.
    if let Ok(bp_path) = crate::config::bootstrap_path() {
        bootstrap::BootstrapList::load(&bp_path).apply(&mut swarm);
    }

    // Persist the peer book every 10 minutes (and again on shutdown below).
    const KAD_SAVE_INTERVAL_SECS: u64 = 600;
    let mut kad_save_interval = tokio::time::interval(Duration::from_secs(KAD_SAVE_INTERVAL_SECS));
    // tokio intervals fire immediately on the first tick; reset() skips that
    // so the first save happens a full interval from now.
    kad_save_interval.reset();

    loop {
        tokio::select! {
            event = swarm.select_next_some() => {
                handle_swarm_event(event, &mut swarm, &state, &mut subscribed_networks, &storage_managers, &mut pending_fetches, &mut pending_pings, &mut pending_pos, &remote_fragment_index, &mut local_kad_peers).await;
            }
            cmd = cmd_rx.recv() => {
                match cmd {
                    // Channel closed: all senders dropped — treat as shutdown.
                    None => {
                        tracing::info!("Network command channel closed — shutting down");
                        save_kad_peers(&local_kad_peers);
                        break;
                    }
                    Some(NetworkCommand::Shutdown) => {
                        tracing::info!("Network shutdown requested");
                        save_kad_peers(&local_kad_peers);
                        break;
                    }
                    Some(NetworkCommand::JoinNetwork { network_id }) => {
                        if !subscribed_networks.contains(&network_id) {
                            let topic = gossipsub::IdentTopic::new(NodeInfo::topic_name(&network_id));
                            if let Err(e) = swarm.behaviour_mut().gossipsub.subscribe(&topic) {
                                tracing::warn!("Failed to subscribe to {}: {}", network_id, e);
                            } else {
                                subscribed_networks.insert(network_id.clone());
                                tracing::info!("Joined network gossip: {}", network_id);
                            }
                            // Also subscribe to the companion fragment-index topic.
                            let idx_topic = gossipsub::IdentTopic::new(
                                fragment_gossip::FragmentIndexAnnouncement::topic_name(&network_id),
                            );
                            if let Err(e) = swarm.behaviour_mut().gossipsub.subscribe(&idx_topic) {
                                tracing::warn!("Failed to subscribe to index topic {}: {}", network_id, e);
                            }
                        }
                    }
                    Some(NetworkCommand::LeaveNetwork { network_id }) => {
                        // Unsubscribe from both topics; errors are ignored.
                        let topic = gossipsub::IdentTopic::new(NodeInfo::topic_name(&network_id));
                        let _ = swarm.behaviour_mut().gossipsub.unsubscribe(&topic);
                        let idx_topic = gossipsub::IdentTopic::new(
                            fragment_gossip::FragmentIndexAnnouncement::topic_name(&network_id),
                        );
                        let _ = swarm.behaviour_mut().gossipsub.unsubscribe(&idx_topic);

                        subscribed_networks.remove(&network_id);
                    }
                    Some(NetworkCommand::Announce { network_id, payload }) => {
                        let topic = gossipsub::IdentTopic::new(NodeInfo::topic_name(&network_id));
                        if let Err(e) = swarm.behaviour_mut().gossipsub.publish(topic, payload) {
                            tracing::warn!("Gossip publish failed: {}", e);
                        }
                    }
                    Some(NetworkCommand::AnnounceIndex { network_id, payload }) => {
                        let topic = gossipsub::IdentTopic::new(
                            fragment_gossip::FragmentIndexAnnouncement::topic_name(&network_id),
                        );
                        if let Err(e) = swarm.behaviour_mut().gossipsub.publish(topic, payload) {
                            tracing::warn!("FragmentIndex gossip publish failed: {}", e);
                        }
                    }
                    Some(NetworkCommand::Dial { addr }) => {
                        if let Err(e) = swarm.dial(addr.clone()) {
                            tracing::warn!("Dial {} failed: {}", addr, e);
                        }
                    }
                    Some(NetworkCommand::PushFragment {
                        peer_id,
                        chunk_id,
                        fragment_id,
                        data,
                    }) => {
                        let req = FragmentRequest::Store {
                            chunk_id: chunk_id.clone(),
                            fragment_id: fragment_id.clone(),
                            data,
                        };
                        // Fire-and-forget: the request id is not tracked; any
                        // response arrives as an untracked fragment response.
                        let _ = swarm
                            .behaviour_mut()
                            .fragment_exchange
                            .send_request(&peer_id, req);
                        // Record the assignment so callers can audit what was
                        // pushed where. Skipped silently on a poisoned lock.
                        if let Ok(mut assignments) = outgoing_assignments.write() {
                            assignments
                                .entry(peer_id.to_string())
                                .or_default()
                                .push(OutgoingFragment {
                                    chunk_id: chunk_id.clone(),
                                    fragment_id: fragment_id.clone(),
                                });
                        }
                        tracing::debug!(peer=%peer_id, chunk=%chunk_id, frag=%fragment_id, "PushFragment sent");
                    }
                    Some(NetworkCommand::FetchChunkFragments {
                        peer_id,
                        chunk_id,
                        resp_tx,
                    }) => {
                        let req = FragmentRequest::FetchChunkFragments {
                            chunk_id: chunk_id.clone(),
                        };
                        let req_id = swarm
                            .behaviour_mut()
                            .fragment_exchange
                            .send_request(&peer_id, req);
                        // Track the request so the response reaches the caller.
                        pending_fetches.insert(req_id, resp_tx);
                        tracing::debug!(peer=%peer_id, chunk=%chunk_id, "FetchChunkFragments sent");
                    }
                    Some(NetworkCommand::DialRelay { relay_addr }) => {
                        if let Err(e) = swarm.dial(relay_addr.clone()) {
                            tracing::warn!(addr=%relay_addr, "Failed to dial relay: {}", e);
                        } else {
                            tracing::info!(addr=%relay_addr, "Dialing relay node for NAT traversal");
                        }
                    }
                    Some(NetworkCommand::Ping {
                        peer_id,
                        sent_at_ms,
                        resp_tx,
                    }) => {
                        // The send timestamp doubles as the ping nonce.
                        let nonce = sent_at_ms;
                        let req = FragmentRequest::Ping { nonce };
                        let req_id = swarm
                            .behaviour_mut()
                            .fragment_exchange
                            .send_request(&peer_id, req);
                        pending_pings.insert(req_id, (sent_at_ms, resp_tx));
                        tracing::debug!(peer=%peer_id, nonce=%nonce, "Ping sent");
                    }
                    Some(NetworkCommand::ProofOfStorage {
                        peer_id,
                        chunk_id,
                        fragment_id,
                        nonce,
                        resp_tx,
                    }) => {
                        let req = FragmentRequest::ProofOfStorage {
                            chunk_id: chunk_id.clone(),
                            fragment_id: fragment_id.clone(),
                            nonce,
                        };
                        let req_id = swarm
                            .behaviour_mut()
                            .fragment_exchange
                            .send_request(&peer_id, req);
                        pending_pos.insert(req_id, resp_tx);
                        tracing::debug!(
                            peer = %peer_id,
                            chunk = %chunk_id,
                            frag = %fragment_id,
                            nonce = %nonce,
                            "ProofOfStorage challenge sent"
                        );
                    }
                }
            }
            // Periodic persistence of the peer address book.
            _ = kad_save_interval.tick() => {
                save_kad_peers(&local_kad_peers);
            }
        }
    }

    Ok(())
}
429
430fn save_kad_peers(peers: &kad_store::KadPeers) {
432 if let Ok(path) = crate::config::kad_peers_path() {
433 peers.save(&path);
434 tracing::debug!(n = peers.0.len(), "kad_store: peers saved");
435 }
436}
437
/// Dispatch a single swarm event.
///
/// Handles gossipsub messages (NodeInfo and fragment-index announcements),
/// peer discovery (mDNS, Identify), AutoNAT/relay status, and the fragment
/// request/response protocol, resolving matching entries in the pending
/// oneshot maps (`pending_fetches`, `pending_pings`, `pending_pos`).
#[allow(clippy::too_many_arguments)]
async fn handle_swarm_event(
    event: SwarmEvent<behaviour::BillPouchBehaviourEvent>,
    swarm: &mut Swarm<BillPouchBehaviour>,
    state: &Arc<RwLock<NetworkState>>,
    _subscribed: &mut HashSet<String>,
    storage_managers: &StorageManagerMap,
    pending_fetches: &mut HashMap<
        request_response::OutboundRequestId,
        tokio::sync::oneshot::Sender<FragmentResponse>,
    >,
    pending_pings: &mut HashMap<
        request_response::OutboundRequestId,
        (u64, tokio::sync::oneshot::Sender<u64>),
    >,
    pending_pos: &mut HashMap<
        request_response::OutboundRequestId,
        tokio::sync::oneshot::Sender<bool>,
    >,
    remote_fragment_index: &Arc<RwLock<fragment_gossip::RemoteFragmentIndex>>,
    known_peers: &mut kad_store::KadPeers,
) {
    match event {
        // Gossip payloads are tried as NodeInfo first, then as a
        // FragmentIndexAnnouncement; anything else is logged and dropped.
        SwarmEvent::Behaviour(behaviour::BillPouchBehaviourEvent::Gossipsub(
            gossipsub::Event::Message {
                propagation_source,
                message,
                ..
            },
        )) => match serde_json::from_slice::<NodeInfo>(&message.data) {
            Ok(node_info) => {
                tracing::debug!(
                    peer=%propagation_source,
                    fingerprint=%node_info.user_fingerprint,
                    svc=%node_info.service_type,
                    "Gossip NodeInfo received"
                );
                // Insert or refresh the node record; skipped on lock poison.
                if let Ok(mut st) = state.write() {
                    st.upsert(node_info);
                }
            }
            Err(_) => {
                match serde_json::from_slice::<fragment_gossip::FragmentIndexAnnouncement>(
                    &message.data,
                ) {
                    Ok(ann) => {
                        tracing::debug!(
                            peer = %propagation_source,
                            chunk = %ann.chunk_id,
                            ptrs = ann.pointers.len(),
                            "Gossip FragmentIndex received"
                        );
                        if let Ok(mut idx) = remote_fragment_index.write() {
                            idx.upsert(ann);
                        }
                    }
                    Err(e) => {
                        tracing::warn!("Failed to deserialize gossip message: {}", e);
                    }
                }
            }
        },

        // mDNS discovery: wire the peer into gossipsub, remember its
        // address, and feed it to the Kademlia routing table.
        SwarmEvent::Behaviour(behaviour::BillPouchBehaviourEvent::Mdns(
            libp2p::mdns::Event::Discovered(list),
        )) => {
            for (peer_id, addr) in list {
                tracing::info!(peer=%peer_id, addr=%addr, "mDNS peer discovered");
                swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer_id);
                known_peers.add(peer_id.to_string(), addr.to_string());
                swarm.behaviour_mut().kad.add_address(&peer_id, addr);
            }
        }

        SwarmEvent::Behaviour(behaviour::BillPouchBehaviourEvent::Mdns(
            libp2p::mdns::Event::Expired(list),
        )) => {
            // Note: the peer is only removed from gossipsub here, not from
            // known_peers or the Kademlia table.
            for (peer_id, _addr) in list {
                tracing::debug!(peer=%peer_id, "mDNS peer expired");
                swarm
                    .behaviour_mut()
                    .gossipsub
                    .remove_explicit_peer(&peer_id);
            }
        }

        // Identify gives us the peer's self-reported listen addresses;
        // record them and feed them to Kademlia.
        SwarmEvent::Behaviour(behaviour::BillPouchBehaviourEvent::Identify(
            libp2p::identify::Event::Received { peer_id, info, .. },
        )) => {
            tracing::debug!(peer=%peer_id, agent=%info.agent_version, "Identify received");
            for addr in info.listen_addrs {
                known_peers.add(peer_id.to_string(), addr.to_string());
                swarm.behaviour_mut().kad.add_address(&peer_id, addr);
            }
        }
        SwarmEvent::Behaviour(behaviour::BillPouchBehaviourEvent::Autonat(
            libp2p::autonat::Event::StatusChanged { old, new },
        )) => {
            tracing::info!(
                old = ?old,
                new = ?new,
                "AutoNAT: public reachability status changed"
            );
        }

        // Other AutoNAT probe traffic — log only.
        SwarmEvent::Behaviour(behaviour::BillPouchBehaviourEvent::Autonat(ev)) => {
            tracing::debug!(event = ?ev, "AutoNAT probe event");
        }

        SwarmEvent::Behaviour(behaviour::BillPouchBehaviourEvent::Relay(
            libp2p::relay::client::Event::ReservationReqAccepted { relay_peer_id, .. },
        )) => {
            tracing::info!(
                relay = %relay_peer_id,
                "Relay reservation accepted — node is reachable via relay"
            );
        }

        SwarmEvent::Behaviour(behaviour::BillPouchBehaviourEvent::Relay(
            libp2p::relay::client::Event::OutboundCircuitEstablished { relay_peer_id, .. },
        )) => {
            tracing::info!(
                relay = %relay_peer_id,
                "Relay outbound circuit established"
            );
        }

        SwarmEvent::Behaviour(behaviour::BillPouchBehaviourEvent::Relay(ev)) => {
            tracing::debug!(event = ?ev, "Relay client event");
        }
        SwarmEvent::NewListenAddr { address, .. } => {
            tracing::info!(addr=%address, "Now listening");
        }

        SwarmEvent::ConnectionEstablished { peer_id, .. } => {
            tracing::debug!(peer=%peer_id, "Connection established");
        }

        SwarmEvent::ConnectionClosed { peer_id, cause, .. } => {
            tracing::debug!(peer=%peer_id, cause=?cause, "Connection closed");
        }

        // Inbound fragment request: serve it from local storage and reply
        // on the provided response channel.
        SwarmEvent::Behaviour(behaviour::BillPouchBehaviourEvent::FragmentExchange(
            request_response::Event::Message {
                peer,
                message:
                    request_response::Message::Request {
                        request, channel, ..
                    },
            },
        )) => {
            let response = serve_fragment_request(&request, storage_managers);
            if let Err(e) = swarm
                .behaviour_mut()
                .fragment_exchange
                .send_response(channel, response)
            {
                tracing::warn!(peer=%peer, "send_response failed: {:?}", e);
            }
        }

        // Outbound response: route it to whichever pending map tracked the
        // request id — pings first, then proof-of-storage, then fetches.
        SwarmEvent::Behaviour(behaviour::BillPouchBehaviourEvent::FragmentExchange(
            request_response::Event::Message {
                peer,
                message:
                    request_response::Message::Response {
                        request_id,
                        response,
                    },
            },
        )) => {
            if let FragmentResponse::Pong { nonce } = &response {
                if let Some((sent_at_ms, tx)) = pending_pings.remove(&request_id) {
                    // RTT = now - send timestamp; saturating guards clock skew.
                    let now_ms = std::time::SystemTime::now()
                        .duration_since(std::time::UNIX_EPOCH)
                        .unwrap_or_default()
                        .as_millis() as u64;
                    let rtt_ms = now_ms.saturating_sub(sent_at_ms);
                    tracing::debug!(peer=%peer, nonce=%nonce, rtt_ms=%rtt_ms, "Pong received");
                    let _ = tx.send(rtt_ms);
                    return;
                }
            }
            if let FragmentResponse::ProofOfStorageOk { .. } = &response {
                if let Some(tx) = pending_pos.remove(&request_id) {
                    tracing::debug!(peer = %peer, "ProofOfStorage: proof received");
                    let _ = tx.send(true);
                    return;
                }
            }
            // NotFound answering a tracked PoS challenge means the peer no
            // longer holds the fragment — report failure.
            if let FragmentResponse::NotFound = &response {
                if let Some(tx) = pending_pos.remove(&request_id) {
                    tracing::debug!(peer = %peer, "ProofOfStorage: fragment not found");
                    let _ = tx.send(false);
                    return;
                }
            }
            if let Some(tx) = pending_fetches.remove(&request_id) {
                let _ = tx.send(response);
            } else {
                // e.g. the Stored ack for a fire-and-forget PushFragment.
                tracing::debug!(peer=%peer, "Untracked fragment response (fire-and-forget)");
            }
        }

        // Outbound failure: drop any pending sender (the receiver observes
        // the closed channel); PoS callers explicitly get `false`.
        SwarmEvent::Behaviour(behaviour::BillPouchBehaviourEvent::FragmentExchange(
            request_response::Event::OutboundFailure {
                request_id, error, ..
            },
        )) => {
            tracing::warn!("Fragment request failed: {:?}", error);
            pending_fetches.remove(&request_id);
            pending_pings.remove(&request_id);
            if let Some(tx) = pending_pos.remove(&request_id) {
                let _ = tx.send(false);
            }
        }

        _ => {}
    }
}
679
/// Serve an inbound fragment-protocol request against local storage.
///
/// Scans every registered [`StorageManager`] until one can satisfy the
/// request. NOTE(review): the RwLocks are `unwrap`ed here — a poisoned lock
/// panics, unlike the `if let Ok(…)` style used elsewhere in this file.
fn serve_fragment_request(
    req: &FragmentRequest,
    storage_managers: &StorageManagerMap,
) -> FragmentResponse {
    let managers = storage_managers.read().unwrap();
    match req {
        // Return the raw bytes of one specific fragment, if any manager
        // indexes it and it loads cleanly.
        FragmentRequest::Fetch {
            chunk_id,
            fragment_id,
        } => {
            for sm_arc in managers.values() {
                let sm = sm_arc.read().unwrap();
                if sm
                    .index
                    .fragments_for_chunk(chunk_id)
                    .iter()
                    .any(|m| m.fragment_id == *fragment_id)
                {
                    match sm.load_fragment(chunk_id, fragment_id) {
                        Ok(fragment) => {
                            return FragmentResponse::Found {
                                data: fragment.to_bytes(),
                            }
                        }
                        // Indexed but unreadable: log and keep scanning.
                        Err(e) => tracing::warn!("serve_fragment load error: {e}"),
                    }
                }
            }
            FragmentResponse::NotFound
        }
        // Collect every loadable fragment for the chunk across all managers.
        FragmentRequest::FetchChunkFragments { chunk_id } => {
            let mut fragments = Vec::new();
            for sm_arc in managers.values() {
                let sm = sm_arc.read().unwrap();
                for meta in sm.index.fragments_for_chunk(chunk_id) {
                    match sm.load_fragment(chunk_id, &meta.fragment_id) {
                        Ok(f) => fragments.push((meta.fragment_id.clone(), f.to_bytes())),
                        Err(e) => tracing::warn!("serve_fragment load error: {e}"),
                    }
                }
            }
            if fragments.is_empty() {
                FragmentResponse::NotFound
            } else {
                FragmentResponse::FoundMany { fragments }
            }
        }
        // Decode the incoming bytes and store the fragment in the first
        // manager that accepts it.
        FragmentRequest::Store {
            chunk_id,
            fragment_id,
            data,
        } => {
            use crate::coding::rlnc::EncodedFragment;
            match EncodedFragment::from_bytes(fragment_id.clone(), chunk_id.clone(), data) {
                Ok(fragment) => {
                    for sm_arc in managers.values() {
                        let mut sm = sm_arc.write().unwrap();
                        match sm.store_fragment(&fragment) {
                            Ok(()) => {
                                tracing::debug!(chunk=%chunk_id, frag=%fragment_id, "Remote fragment stored");
                                return FragmentResponse::Stored;
                            }
                            Err(e) => {
                                // Try the next manager on failure.
                                tracing::debug!("Store attempt failed: {e}");
                            }
                        }
                    }
                    FragmentResponse::StoreFailed {
                        reason: "No storage manager with capacity".into(),
                    }
                }
                Err(e) => FragmentResponse::StoreFailed {
                    reason: format!("Invalid fragment data: {e}"),
                },
            }
        }
        // Liveness check: echo the nonce.
        FragmentRequest::Ping { nonce } => FragmentResponse::Pong { nonce: *nonce },
        // Proof of storage: blake3(fragment bytes || nonce_le) over the
        // locally held fragment, so the challenger can verify we still
        // possess the data.
        FragmentRequest::ProofOfStorage {
            chunk_id,
            fragment_id,
            nonce,
        } => {
            for sm_arc in managers.values() {
                let sm = sm_arc.read().unwrap();
                if sm
                    .index
                    .fragments_for_chunk(chunk_id)
                    .iter()
                    .any(|m| m.fragment_id == *fragment_id)
                {
                    match sm.load_fragment(chunk_id, fragment_id) {
                        Ok(fragment) => {
                            let data = fragment.to_bytes();
                            let mut hasher = blake3::Hasher::new();
                            hasher.update(&data);
                            hasher.update(&nonce.to_le_bytes());
                            let proof: [u8; 32] = hasher.finalize().into();
                            return FragmentResponse::ProofOfStorageOk { proof };
                        }
                        Err(e) => {
                            tracing::warn!(
                                chunk = %chunk_id,
                                frag = %fragment_id,
                                "serve_fragment PoS load error: {e}"
                            );
                        }
                    }
                }
            }
            FragmentResponse::NotFound
        }
    }
}