// bp_core/network/mod.rs

1//! P2P networking layer built on libp2p.
2//!
3//! This module owns the libp2p [`Swarm`] and exposes two public entry points:
4//!
5//! - [`build_swarm`] — constructs the swarm with all four behaviours
6//!   (gossipsub, Kademlia, Identify, mDNS) and the Noise+Yamux transport.
7//! - [`run_network_loop`] — async task that drives the swarm event loop and
8//!   processes [`NetworkCommand`]s sent by the daemon over a Tokio channel.
9//!
10//! ## Sub-modules
11//!
12//! - [`behaviour`] — the combined [`BillPouchBehaviour`] (`#[derive(NetworkBehaviour)]`).
13//! - [`state`]     — in-memory [`NetworkState`] updated from incoming gossip messages.
14
15pub mod behaviour;
16pub mod bootstrap;
17pub mod fragment_gossip;
18pub mod kad_store;
19pub mod qos;
20pub mod quality_monitor;
21pub mod reputation;
22pub mod state;
23
24pub use behaviour::{BillPouchBehaviour, FragmentRequest, FragmentResponse};
25pub use bootstrap::BootstrapList;
26pub use fragment_gossip::{FragmentIndexAnnouncement, FragmentPointer, RemoteFragmentIndex};
27pub use kad_store::KadPeers;
28pub use qos::{PeerQos, QosRegistry, FAULT_BLACKLISTED, FAULT_DEGRADED, FAULT_SUSPECTED};
29pub use quality_monitor::run_quality_monitor;
30pub use reputation::{ReputationRecord, ReputationStore, ReputationTier};
31pub use state::{NetworkState, NodeInfo};
32
33use crate::{
34    error::{BpError, BpResult},
35    storage::StorageManager,
36};
37use futures::StreamExt;
38use libp2p::{gossipsub, request_response, swarm::SwarmEvent, Multiaddr, PeerId, Swarm};
39use std::collections::{HashMap, HashSet};
40use std::sync::{Arc, RwLock};
41use std::time::Duration;
42use tokio::sync::mpsc;
43
/// Shared map of active Pouch `StorageManager`s, keyed by `service_id`.
///
/// The outer `RwLock` guards map membership (adding/removing services);
/// each inner `Arc<RwLock<…>>` guards one manager so individual stores can
/// be locked independently while serving fragment requests.
pub type StorageManagerMap = Arc<RwLock<HashMap<String, Arc<RwLock<StorageManager>>>>>;
46
/// A single fragment that was pushed to a remote Pouch.
///
/// Stored in [`OutgoingAssignments`] so the quality monitor can later issue
/// Proof-of-Storage challenges for exactly the fragments each Pouch is
/// supposed to hold.
#[derive(Debug, Clone)]
pub struct OutgoingFragment {
    /// BLAKE3 prefix that identifies the chunk.
    pub chunk_id: String,
    /// UUID of the specific fragment within the chunk.
    pub fragment_id: String,
}
59
/// Map of remote Pouch peer IDs → fragments pushed to them.
///
/// Keys are `PeerId`s rendered with `to_string()`. Updated by the network
/// loop whenever a `PushFragment` command is executed. Read by
/// [`run_quality_monitor`] to select challenge targets.
///
/// [`run_quality_monitor`]: crate::network::run_quality_monitor
pub type OutgoingAssignments = Arc<RwLock<HashMap<String, Vec<OutgoingFragment>>>>;
67
/// Commands the daemon can send to the network task over the [`mpsc`] channel.
///
/// The network loop runs inside a dedicated `tokio::spawn`ed task and receives
/// these commands via the `cmd_rx` half of the channel created in [`run_daemon`].
///
/// [`run_daemon`]: crate::daemon::run_daemon
#[derive(Debug)]
pub enum NetworkCommand {
    /// Subscribe to the gossipsub topic for `network_id`.
    JoinNetwork { network_id: String },
    /// Unsubscribe from the gossipsub topic for `network_id`.
    LeaveNetwork { network_id: String },
    /// Publish a serialised [`NodeInfo`] on the gossipsub topic for `network_id`.
    Announce {
        /// Network whose topic to publish on.
        network_id: String,
        /// Serialised [`NodeInfo`] bytes.
        payload: Vec<u8>,
    },
    /// Publish a serialised [`FragmentIndexAnnouncement`] on the index gossip
    /// topic (`billpouch/v1/{network_id}/index`) for `network_id`.
    AnnounceIndex {
        /// Network whose index topic to publish on.
        network_id: String,
        /// Serialised [`FragmentIndexAnnouncement`] bytes.
        payload: Vec<u8>,
    },
    /// Dial a remote peer at a known [`Multiaddr`].
    Dial { addr: Multiaddr },
    /// Push a fragment to a remote Pouch peer for storage.
    ///
    /// Fire-and-forget: the loop records the assignment in
    /// [`OutgoingAssignments`] but does not await the peer's reply.
    PushFragment {
        /// Target Pouch peer.
        peer_id: PeerId,
        /// BLAKE3 prefix identifying the chunk the fragment belongs to.
        chunk_id: String,
        /// UUID of the fragment within the chunk.
        fragment_id: String,
        /// Serialised fragment bytes to store remotely.
        data: Vec<u8>,
    },
    /// Fetch all fragments a remote Pouch holds for a given chunk.
    /// The response is sent back through the oneshot channel.
    FetchChunkFragments {
        /// Peer to query.
        peer_id: PeerId,
        /// Chunk whose fragments are requested.
        chunk_id: String,
        /// Receives the [`FragmentResponse`] when it arrives; dropped
        /// (without sending) on outbound failure.
        resp_tx: tokio::sync::oneshot::Sender<FragmentResponse>,
    },
    /// Ask the network loop to exit cleanly.
    Shutdown,
    /// Dial a relay node and create a circuit reservation.
    ///
    /// After a successful reservation the node becomes reachable at
    /// `/p2p-circuit` addresses routed through the relay, enabling
    /// connectivity even when behind symmetric NAT.
    DialRelay { relay_addr: Multiaddr },
    /// Ping a remote peer for RTT measurement.
    ///
    /// The network loop sends a `FragmentRequest::Ping`, waits for the
    /// `FragmentResponse::Pong`, computes the RTT in milliseconds and
    /// forwards it to `resp_tx`.  On timeout or failure the sender is
    /// dropped without sending.
    Ping {
        peer_id: PeerId,
        /// Monotonic timestamp (ms) at send time — used to compute RTT.
        sent_at_ms: u64,
        /// Receives `rtt_ms` when the Pong arrives.
        resp_tx: tokio::sync::oneshot::Sender<u64>,
    },
    /// Issue a Proof-of-Storage challenge to a remote Pouch.
    ///
    /// The network loop sends `FragmentRequest::ProofOfStorage`, awaits a
    /// `FragmentResponse::ProofOfStorageOk`, and delivers `true` (proof
    /// received) or `false` (no response / fragment not found) to `resp_tx`.
    ProofOfStorage {
        /// Peer being challenged.
        peer_id: PeerId,
        /// Chunk the challenged fragment belongs to.
        chunk_id: String,
        /// Fragment the peer must prove it still holds.
        fragment_id: String,
        /// Challenge nonce echoed back in the proof.
        nonce: u64,
        /// Receives `true` if a proof was returned, `false` otherwise.
        resp_tx: tokio::sync::oneshot::Sender<bool>,
    },
}
146
147/// Construct a fully configured libp2p [`Swarm`] using the Tokio runtime.
148///
149/// The transport stack is: TCP → Noise (encryption) → Yamux (multiplexing) +
150/// a relay-circuit transport for NAT traversal.
151/// The behaviour stack is: gossipsub + Kademlia + Identify + mDNS +
152/// AutoNAT + relay client.
153///
154/// # Errors
155/// Returns an error if any behaviour or transport component fails to initialise
156/// (typically a key encoding issue or an OS-level socket error).
157pub fn build_swarm(
158    keypair: libp2p::identity::Keypair,
159) -> anyhow::Result<Swarm<BillPouchBehaviour>> {
160    let swarm = libp2p::SwarmBuilder::with_existing_identity(keypair)
161        .with_tokio()
162        .with_tcp(
163            libp2p::tcp::Config::default(),
164            libp2p::noise::Config::new,
165            libp2p::yamux::Config::default,
166        )?
167        .with_relay_client(libp2p::noise::Config::new, libp2p::yamux::Config::default)?
168        .with_behaviour(|key, relay_client| {
169            BillPouchBehaviour::new(key, relay_client)
170                .map_err(|e| Box::<dyn std::error::Error + Send + Sync>::from(e.to_string()))
171        })
172        .map_err(|e| anyhow::anyhow!("{}", e))?
173        .build();
174
175    Ok(swarm)
176}
177
/// Drive the P2P event loop until shutdown.
///
/// Spawned as a background task by [`run_daemon`].  Runs a `tokio::select!`
/// loop that concurrently:
/// - polls the libp2p swarm for events (gossip, mDNS, Identify, Kademlia),
/// - reads [`NetworkCommand`]s from `cmd_rx` and mutates swarm state,
/// - periodically flushes known Kademlia peers to disk.
///
/// The loop exits when the command channel is closed or a `Shutdown` command
/// is received; in both cases the known-peer table is saved before returning.
///
/// # Arguments
/// - `swarm` — the libp2p swarm built by [`build_swarm`].
/// - `cmd_rx` — receiving end of the daemon → network command channel.
/// - `state` — shared [`NetworkState`] updated from gossip messages.
/// - `listen_addr` — multiaddr to bind the TCP listener on.
/// - `storage_managers` — local Pouch stores used to serve fragment requests.
/// - `outgoing_assignments` — records which fragments were pushed to whom.
/// - `remote_fragment_index` — updated from incoming index announcements.
///
/// # Errors
/// Fails only if the initial `listen_on` call fails; every later failure is
/// logged and the loop keeps running.
///
/// [`run_daemon`]: crate::daemon::run_daemon
#[allow(clippy::too_many_arguments)]
pub async fn run_network_loop(
    mut swarm: Swarm<BillPouchBehaviour>,
    mut cmd_rx: mpsc::Receiver<NetworkCommand>,
    state: Arc<RwLock<NetworkState>>,
    listen_addr: Multiaddr,
    storage_managers: StorageManagerMap,
    outgoing_assignments: OutgoingAssignments,
    remote_fragment_index: Arc<RwLock<fragment_gossip::RemoteFragmentIndex>>,
) -> BpResult<()> {
    swarm
        .listen_on(listen_addr.clone())
        .map_err(|e| BpError::Network(e.to_string()))?;

    tracing::info!("Network listening on {}", listen_addr);

    // Networks whose gossip topics we are currently subscribed to; used to
    // make `JoinNetwork` idempotent.
    let mut subscribed_networks: HashSet<String> = HashSet::new();

    // Map outbound request IDs → oneshot response channels for FetchChunkFragments.
    let mut pending_fetches: HashMap<
        request_response::OutboundRequestId,
        tokio::sync::oneshot::Sender<FragmentResponse>,
    > = HashMap::new();

    // Map outbound request IDs → (sent_at_ms, oneshot) for Ping RTT tracking.
    let mut pending_pings: HashMap<
        request_response::OutboundRequestId,
        (u64, tokio::sync::oneshot::Sender<u64>),
    > = HashMap::new();

    // Map outbound request IDs → oneshot for Proof-of-Storage challenges.
    let mut pending_pos: HashMap<
        request_response::OutboundRequestId,
        tokio::sync::oneshot::Sender<bool>,
    > = HashMap::new();

    // ── Kademlia peer persistence ─────────────────────────────────────────
    // Load known peer addresses from the last run and dial them immediately
    // so the node can reconnect without waiting for the mDNS warm-up period.
    let mut local_kad_peers: kad_store::KadPeers = crate::config::kad_peers_path()
        .map(|p| kad_store::KadPeers::load(&p))
        .unwrap_or_default();

    if !local_kad_peers.0.is_empty() {
        tracing::info!(
            peers = local_kad_peers.0.len(),
            "kad_store: dialing saved peers"
        );
        // Dial failures are ignored on purpose: stale addresses are expected
        // after a restart and mDNS/Kademlia will rediscover live peers.
        for addrs in local_kad_peers.0.values() {
            for addr_str in addrs {
                if let Ok(addr) = addr_str.parse::<libp2p::Multiaddr>() {
                    let _ = swarm.dial(addr);
                }
            }
        }
    }

    // ── Bootstrap nodes ───────────────────────────────────────────────────
    // Load user-configured bootstrap peers (supports WAN discovery where
    // mDNS multicast is unavailable) and add them to Kademlia + gossipsub.
    if let Ok(bp_path) = crate::config::bootstrap_path() {
        bootstrap::BootstrapList::load(&bp_path).apply(&mut swarm);
    }

    // How often (in seconds) to flush known peers to disk.
    const KAD_SAVE_INTERVAL_SECS: u64 = 600;
    let mut kad_save_interval = tokio::time::interval(Duration::from_secs(KAD_SAVE_INTERVAL_SECS));
    kad_save_interval.reset(); // skip the first immediate tick

    loop {
        tokio::select! {
            // ── Incoming swarm event ──────────────────────────────────────────
            event = swarm.select_next_some() => {
                handle_swarm_event(event, &mut swarm, &state, &mut subscribed_networks, &storage_managers, &mut pending_fetches, &mut pending_pings, &mut pending_pos, &remote_fragment_index, &mut local_kad_peers).await;
            }
            // ── Command from daemon ───────────────────────────────────────────
            cmd = cmd_rx.recv() => {
                match cmd {
                    None => {
                        tracing::info!("Network command channel closed — shutting down");
                        save_kad_peers(&local_kad_peers);
                        break;
                    }
                    Some(NetworkCommand::Shutdown) => {
                        tracing::info!("Network shutdown requested");
                        save_kad_peers(&local_kad_peers);
                        break;
                    }
                    Some(NetworkCommand::JoinNetwork { network_id }) => {
                        if !subscribed_networks.contains(&network_id) {
                            let topic = gossipsub::IdentTopic::new(NodeInfo::topic_name(&network_id));
                            if let Err(e) = swarm.behaviour_mut().gossipsub.subscribe(&topic) {
                                tracing::warn!("Failed to subscribe to {}: {}", network_id, e);
                            } else {
                                subscribed_networks.insert(network_id.clone());
                                tracing::info!("Joined network gossip: {}", network_id);
                            }
                            // Also subscribe to the fragment-index topic.
                            // NOTE(review): this runs even when the primary
                            // subscribe above failed — confirm that is intended.
                            let idx_topic = gossipsub::IdentTopic::new(
                                fragment_gossip::FragmentIndexAnnouncement::topic_name(&network_id),
                            );
                            if let Err(e) = swarm.behaviour_mut().gossipsub.subscribe(&idx_topic) {
                                tracing::warn!("Failed to subscribe to index topic {}: {}", network_id, e);
                            }
                        }
                    }
                    Some(NetworkCommand::LeaveNetwork { network_id }) => {
                        // Unsubscribe from both the NodeInfo and the index topic;
                        // errors are ignored (unsubscribing twice is harmless).
                        let topic = gossipsub::IdentTopic::new(NodeInfo::topic_name(&network_id));
                        let _ = swarm.behaviour_mut().gossipsub.unsubscribe(&topic);
                        let idx_topic = gossipsub::IdentTopic::new(
                            fragment_gossip::FragmentIndexAnnouncement::topic_name(&network_id),
                        );
                        let _ = swarm.behaviour_mut().gossipsub.unsubscribe(&idx_topic);

                        subscribed_networks.remove(&network_id);
                    }
                    Some(NetworkCommand::Announce { network_id, payload }) => {
                        let topic = gossipsub::IdentTopic::new(NodeInfo::topic_name(&network_id));
                        if let Err(e) = swarm.behaviour_mut().gossipsub.publish(topic, payload) {
                            tracing::warn!("Gossip publish failed: {}", e);
                        }
                    }
                    Some(NetworkCommand::AnnounceIndex { network_id, payload }) => {
                        let topic = gossipsub::IdentTopic::new(
                            fragment_gossip::FragmentIndexAnnouncement::topic_name(&network_id),
                        );
                        if let Err(e) = swarm.behaviour_mut().gossipsub.publish(topic, payload) {
                            tracing::warn!("FragmentIndex gossip publish failed: {}", e);
                        }
                    }
                    Some(NetworkCommand::Dial { addr }) => {
                        if let Err(e) = swarm.dial(addr.clone()) {
                            tracing::warn!("Dial {} failed: {}", addr, e);
                        }
                    }
                    Some(NetworkCommand::PushFragment {
                        peer_id,
                        chunk_id,
                        fragment_id,
                        data,
                    }) => {
                        let req = FragmentRequest::Store {
                            chunk_id: chunk_id.clone(),
                            fragment_id: fragment_id.clone(),
                            data,
                        };
                        // Fire-and-forget: the request ID is not tracked, so the
                        // peer's reply is handled as an "untracked" response.
                        let _ = swarm
                            .behaviour_mut()
                            .fragment_exchange
                            .send_request(&peer_id, req);
                        // Record the assignment so the quality monitor can issue PoS challenges.
                        if let Ok(mut assignments) = outgoing_assignments.write() {
                            assignments
                                .entry(peer_id.to_string())
                                .or_default()
                                .push(OutgoingFragment {
                                    chunk_id: chunk_id.clone(),
                                    fragment_id: fragment_id.clone(),
                                });
                        }
                        tracing::debug!(peer=%peer_id, chunk=%chunk_id, frag=%fragment_id, "PushFragment sent");
                    }
                    Some(NetworkCommand::FetchChunkFragments {
                        peer_id,
                        chunk_id,
                        resp_tx,
                    }) => {
                        let req = FragmentRequest::FetchChunkFragments {
                            chunk_id: chunk_id.clone(),
                        };
                        let req_id = swarm
                            .behaviour_mut()
                            .fragment_exchange
                            .send_request(&peer_id, req);
                        // The response (or an outbound failure) is routed back
                        // through this map in `handle_swarm_event`.
                        pending_fetches.insert(req_id, resp_tx);
                        tracing::debug!(peer=%peer_id, chunk=%chunk_id, "FetchChunkFragments sent");
                    }
                    Some(NetworkCommand::DialRelay { relay_addr }) => {
                        if let Err(e) = swarm.dial(relay_addr.clone()) {
                            tracing::warn!(addr=%relay_addr, "Failed to dial relay: {}", e);
                        } else {
                            tracing::info!(addr=%relay_addr, "Dialing relay node for NAT traversal");
                        }
                    }
                    Some(NetworkCommand::Ping {
                        peer_id,
                        sent_at_ms,
                        resp_tx,
                    }) => {
                        let nonce = sent_at_ms; // reuse timestamp as nonce
                        let req = FragmentRequest::Ping { nonce };
                        let req_id = swarm
                            .behaviour_mut()
                            .fragment_exchange
                            .send_request(&peer_id, req);
                        pending_pings.insert(req_id, (sent_at_ms, resp_tx));
                        tracing::debug!(peer=%peer_id, nonce=%nonce, "Ping sent");
                    }
                    Some(NetworkCommand::ProofOfStorage {
                        peer_id,
                        chunk_id,
                        fragment_id,
                        nonce,
                        resp_tx,
                    }) => {
                        let req = FragmentRequest::ProofOfStorage {
                            chunk_id: chunk_id.clone(),
                            fragment_id: fragment_id.clone(),
                            nonce,
                        };
                        let req_id = swarm
                            .behaviour_mut()
                            .fragment_exchange
                            .send_request(&peer_id, req);
                        pending_pos.insert(req_id, resp_tx);
                        tracing::debug!(
                            peer = %peer_id,
                            chunk = %chunk_id,
                            frag = %fragment_id,
                            nonce = %nonce,
                            "ProofOfStorage challenge sent"
                        );
                    }
                }
            }
            // ── Periodic Kademlia peer save ───────────────────────────────────
            _ = kad_save_interval.tick() => {
                save_kad_peers(&local_kad_peers);
            }
        }
    }

    Ok(())
}
429
430/// Flush `peers` to disk using the configured path; silently skips on error.
431fn save_kad_peers(peers: &kad_store::KadPeers) {
432    if let Ok(path) = crate::config::kad_peers_path() {
433        peers.save(&path);
434        tracing::debug!(n = peers.0.len(), "kad_store: peers saved");
435    }
436}
437
/// Process a single [`SwarmEvent`] emitted by the swarm.
///
/// Called from the `select!` loop in [`run_network_loop`]. Updates shared
/// state (`state`, `remote_fragment_index`, `known_peers`), serves incoming
/// fragment requests from local storage, and routes incoming responses to
/// the oneshot senders stashed in the three `pending_*` maps.
///
/// Note: match-arm order matters — the specific AutoNAT and relay-client
/// arms must precede their catch-all siblings below them.
#[allow(clippy::too_many_arguments)]
async fn handle_swarm_event(
    event: SwarmEvent<behaviour::BillPouchBehaviourEvent>,
    swarm: &mut Swarm<BillPouchBehaviour>,
    state: &Arc<RwLock<NetworkState>>,
    // Currently unused here; subscription tracking lives in the caller.
    _subscribed: &mut HashSet<String>,
    storage_managers: &StorageManagerMap,
    pending_fetches: &mut HashMap<
        request_response::OutboundRequestId,
        tokio::sync::oneshot::Sender<FragmentResponse>,
    >,
    pending_pings: &mut HashMap<
        request_response::OutboundRequestId,
        (u64, tokio::sync::oneshot::Sender<u64>),
    >,
    pending_pos: &mut HashMap<
        request_response::OutboundRequestId,
        tokio::sync::oneshot::Sender<bool>,
    >,
    remote_fragment_index: &Arc<RwLock<fragment_gossip::RemoteFragmentIndex>>,
    known_peers: &mut kad_store::KadPeers,
) {
    match event {
        // ── Gossipsub: incoming NodeInfo announcement ─────────────────────
        // Payloads are untyped JSON; try NodeInfo first, then fall back to
        // FragmentIndexAnnouncement before giving up.
        SwarmEvent::Behaviour(behaviour::BillPouchBehaviourEvent::Gossipsub(
            gossipsub::Event::Message {
                propagation_source,
                message,
                ..
            },
        )) => match serde_json::from_slice::<NodeInfo>(&message.data) {
            Ok(node_info) => {
                tracing::debug!(
                    peer=%propagation_source,
                    fingerprint=%node_info.user_fingerprint,
                    svc=%node_info.service_type,
                    "Gossip NodeInfo received"
                );
                if let Ok(mut st) = state.write() {
                    st.upsert(node_info);
                }
            }
            Err(_) => {
                // Try as FragmentIndexAnnouncement.
                match serde_json::from_slice::<fragment_gossip::FragmentIndexAnnouncement>(
                    &message.data,
                ) {
                    Ok(ann) => {
                        tracing::debug!(
                            peer = %propagation_source,
                            chunk = %ann.chunk_id,
                            ptrs = ann.pointers.len(),
                            "Gossip FragmentIndex received"
                        );
                        if let Ok(mut idx) = remote_fragment_index.write() {
                            idx.upsert(ann);
                        }
                    }
                    Err(e) => {
                        tracing::warn!("Failed to deserialize gossip message: {}", e);
                    }
                }
            }
        },

        // ── mDNS: discovered a local peer ─────────────────────────────────
        // Feed the address into gossipsub, the persisted peer table, and the
        // Kademlia routing table.
        SwarmEvent::Behaviour(behaviour::BillPouchBehaviourEvent::Mdns(
            libp2p::mdns::Event::Discovered(list),
        )) => {
            for (peer_id, addr) in list {
                tracing::info!(peer=%peer_id, addr=%addr, "mDNS peer discovered");
                swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer_id);
                known_peers.add(peer_id.to_string(), addr.to_string());
                swarm.behaviour_mut().kad.add_address(&peer_id, addr);
            }
        }

        // ── mDNS: peer expired ────────────────────────────────────────────
        // Only gossipsub is pruned; the Kademlia/known_peers entries remain
        // so the address can still be retried after a restart.
        SwarmEvent::Behaviour(behaviour::BillPouchBehaviourEvent::Mdns(
            libp2p::mdns::Event::Expired(list),
        )) => {
            for (peer_id, _addr) in list {
                tracing::debug!(peer=%peer_id, "mDNS peer expired");
                swarm
                    .behaviour_mut()
                    .gossipsub
                    .remove_explicit_peer(&peer_id);
            }
        }

        // ── Identify: received remote peer info ───────────────────────────
        SwarmEvent::Behaviour(behaviour::BillPouchBehaviourEvent::Identify(
            libp2p::identify::Event::Received { peer_id, info, .. },
        )) => {
            tracing::debug!(peer=%peer_id, agent=%info.agent_version, "Identify received");
            for addr in info.listen_addrs {
                known_peers.add(peer_id.to_string(), addr.to_string());
                swarm.behaviour_mut().kad.add_address(&peer_id, addr);
            }
        }
        // ── AutoNAT: public reachability status changed ────────────────────
        SwarmEvent::Behaviour(behaviour::BillPouchBehaviourEvent::Autonat(
            libp2p::autonat::Event::StatusChanged { old, new },
        )) => {
            tracing::info!(
                old = ?old,
                new = ?new,
                "AutoNAT: public reachability status changed"
            );
        }

        // ── AutoNAT: probe events (debug only) ───────────────────────────
        SwarmEvent::Behaviour(behaviour::BillPouchBehaviourEvent::Autonat(ev)) => {
            tracing::debug!(event = ?ev, "AutoNAT probe event");
        }

        // ── Relay client: reservation accepted ─────────────────────────────
        SwarmEvent::Behaviour(behaviour::BillPouchBehaviourEvent::Relay(
            libp2p::relay::client::Event::ReservationReqAccepted { relay_peer_id, .. },
        )) => {
            tracing::info!(
                relay = %relay_peer_id,
                "Relay reservation accepted — node is reachable via relay"
            );
        }

        // ── Relay client: outbound circuit established ───────────────────────
        SwarmEvent::Behaviour(behaviour::BillPouchBehaviourEvent::Relay(
            libp2p::relay::client::Event::OutboundCircuitEstablished { relay_peer_id, .. },
        )) => {
            tracing::info!(
                relay = %relay_peer_id,
                "Relay outbound circuit established"
            );
        }

        // ── Relay client: other events (debug) ────────────────────────────
        SwarmEvent::Behaviour(behaviour::BillPouchBehaviourEvent::Relay(ev)) => {
            tracing::debug!(event = ?ev, "Relay client event");
        }
        // ── New listen address ────────────────────────────────────────────
        SwarmEvent::NewListenAddr { address, .. } => {
            tracing::info!(addr=%address, "Now listening");
        }

        // ── Connection established ────────────────────────────────────────
        SwarmEvent::ConnectionEstablished { peer_id, .. } => {
            tracing::debug!(peer=%peer_id, "Connection established");
        }

        // ── Connection closed ─────────────────────────────────────────────
        SwarmEvent::ConnectionClosed { peer_id, cause, .. } => {
            tracing::debug!(peer=%peer_id, cause=?cause, "Connection closed");
        }

        // ── Fragment exchange: incoming request ───────────────────────────
        // Served synchronously from local storage; the response goes back on
        // the request's own channel.
        SwarmEvent::Behaviour(behaviour::BillPouchBehaviourEvent::FragmentExchange(
            request_response::Event::Message {
                peer,
                message:
                    request_response::Message::Request {
                        request, channel, ..
                    },
            },
        )) => {
            let response = serve_fragment_request(&request, storage_managers);
            if let Err(e) = swarm
                .behaviour_mut()
                .fragment_exchange
                .send_response(channel, response)
            {
                tracing::warn!(peer=%peer, "send_response failed: {:?}", e);
            }
        }

        // ── Fragment exchange: incoming response ──────────────────────────
        // Dispatch priority: Pong → PoS proof → PoS NotFound → fetch.
        // A response is only consumed by the map that tracked its request_id.
        SwarmEvent::Behaviour(behaviour::BillPouchBehaviourEvent::FragmentExchange(
            request_response::Event::Message {
                peer,
                message:
                    request_response::Message::Response {
                        request_id,
                        response,
                    },
            },
        )) => {
            // Route Pong responses to the waiting Ping oneshot channel.
            if let FragmentResponse::Pong { nonce } = &response {
                if let Some((sent_at_ms, tx)) = pending_pings.remove(&request_id) {
                    // NOTE(review): RTT is computed from wall-clock SystemTime,
                    // but `sent_at_ms` is documented as monotonic — a clock
                    // adjustment can skew or zero the RTT; confirm the caller's
                    // clock source matches this one.
                    let now_ms = std::time::SystemTime::now()
                        .duration_since(std::time::UNIX_EPOCH)
                        .unwrap_or_default()
                        .as_millis() as u64;
                    let rtt_ms = now_ms.saturating_sub(sent_at_ms);
                    tracing::debug!(peer=%peer, nonce=%nonce, rtt_ms=%rtt_ms, "Pong received");
                    let _ = tx.send(rtt_ms);
                    return;
                }
            }
            // Route ProofOfStorageOk to the waiting PoS oneshot channel.
            if let FragmentResponse::ProofOfStorageOk { .. } = &response {
                if let Some(tx) = pending_pos.remove(&request_id) {
                    tracing::debug!(peer = %peer, "ProofOfStorage: proof received");
                    let _ = tx.send(true);
                    return;
                }
            }
            // NotFound on a PoS challenge counts as a failure.
            if let FragmentResponse::NotFound = &response {
                if let Some(tx) = pending_pos.remove(&request_id) {
                    tracing::debug!(peer = %peer, "ProofOfStorage: fragment not found");
                    let _ = tx.send(false);
                    return;
                }
            }
            // Route fragment responses to the waiting fetch oneshot channel.
            if let Some(tx) = pending_fetches.remove(&request_id) {
                let _ = tx.send(response);
            } else {
                tracing::debug!(peer=%peer, "Untracked fragment response (fire-and-forget)");
            }
        }

        // ── Fragment exchange: outbound failure ─────────────────────────
        SwarmEvent::Behaviour(behaviour::BillPouchBehaviourEvent::FragmentExchange(
            request_response::Event::OutboundFailure {
                request_id, error, ..
            },
        )) => {
            tracing::warn!("Fragment request failed: {:?}", error);
            // Drop all channels so callers get notified via RecvError.
            // PoS is the exception: it gets an explicit `false` because its
            // contract promises a definite success/failure answer.
            pending_fetches.remove(&request_id);
            pending_pings.remove(&request_id);
            if let Some(tx) = pending_pos.remove(&request_id) {
                let _ = tx.send(false);
            }
        }

        _ => {}
    }
}
679
680/// Serve a [`FragmentRequest`] from a remote peer by looking up local storage.
681fn serve_fragment_request(
682    req: &FragmentRequest,
683    storage_managers: &StorageManagerMap,
684) -> FragmentResponse {
685    let managers = storage_managers.read().unwrap();
686    match req {
687        FragmentRequest::Fetch {
688            chunk_id,
689            fragment_id,
690        } => {
691            for sm_arc in managers.values() {
692                let sm = sm_arc.read().unwrap();
693                if sm
694                    .index
695                    .fragments_for_chunk(chunk_id)
696                    .iter()
697                    .any(|m| m.fragment_id == *fragment_id)
698                {
699                    match sm.load_fragment(chunk_id, fragment_id) {
700                        Ok(fragment) => {
701                            return FragmentResponse::Found {
702                                data: fragment.to_bytes(),
703                            }
704                        }
705                        Err(e) => tracing::warn!("serve_fragment load error: {e}"),
706                    }
707                }
708            }
709            FragmentResponse::NotFound
710        }
711        FragmentRequest::FetchChunkFragments { chunk_id } => {
712            let mut fragments = Vec::new();
713            for sm_arc in managers.values() {
714                let sm = sm_arc.read().unwrap();
715                for meta in sm.index.fragments_for_chunk(chunk_id) {
716                    match sm.load_fragment(chunk_id, &meta.fragment_id) {
717                        Ok(f) => fragments.push((meta.fragment_id.clone(), f.to_bytes())),
718                        Err(e) => tracing::warn!("serve_fragment load error: {e}"),
719                    }
720                }
721            }
722            if fragments.is_empty() {
723                FragmentResponse::NotFound
724            } else {
725                FragmentResponse::FoundMany { fragments }
726            }
727        }
728        FragmentRequest::Store {
729            chunk_id,
730            fragment_id,
731            data,
732        } => {
733            // Try to store in any manager that has capacity.
734            use crate::coding::rlnc::EncodedFragment;
735            match EncodedFragment::from_bytes(fragment_id.clone(), chunk_id.clone(), data) {
736                Ok(fragment) => {
737                    for sm_arc in managers.values() {
738                        let mut sm = sm_arc.write().unwrap();
739                        match sm.store_fragment(&fragment) {
740                            Ok(()) => {
741                                tracing::debug!(chunk=%chunk_id, frag=%fragment_id, "Remote fragment stored");
742                                return FragmentResponse::Stored;
743                            }
744                            Err(e) => {
745                                tracing::debug!("Store attempt failed: {e}");
746                            }
747                        }
748                    }
749                    FragmentResponse::StoreFailed {
750                        reason: "No storage manager with capacity".into(),
751                    }
752                }
753                Err(e) => FragmentResponse::StoreFailed {
754                    reason: format!("Invalid fragment data: {e}"),
755                },
756            }
757        }
758        FragmentRequest::Ping { nonce } => FragmentResponse::Pong { nonce: *nonce },
759        FragmentRequest::ProofOfStorage {
760            chunk_id,
761            fragment_id,
762            nonce,
763        } => {
764            // Load the fragment data and compute BLAKE3(data || nonce.to_le_bytes()).
765            for sm_arc in managers.values() {
766                let sm = sm_arc.read().unwrap();
767                if sm
768                    .index
769                    .fragments_for_chunk(chunk_id)
770                    .iter()
771                    .any(|m| m.fragment_id == *fragment_id)
772                {
773                    match sm.load_fragment(chunk_id, fragment_id) {
774                        Ok(fragment) => {
775                            let data = fragment.to_bytes();
776                            let mut hasher = blake3::Hasher::new();
777                            hasher.update(&data);
778                            hasher.update(&nonce.to_le_bytes());
779                            let proof: [u8; 32] = hasher.finalize().into();
780                            return FragmentResponse::ProofOfStorageOk { proof };
781                        }
782                        Err(e) => {
783                            tracing::warn!(
784                                chunk = %chunk_id,
785                                frag = %fragment_id,
786                                "serve_fragment PoS load error: {e}"
787                            );
788                        }
789                    }
790                }
791            }
792            FragmentResponse::NotFound
793        }
794    }
795}