bp_core/network/
fragment_gossip.rs

1//! Fragment-index gossip — distributed discovery of which Pouch holds which fragment.
2//!
3//! ## Purpose
4//!
5//! When a Bill node runs `PutFile` and distributes RLNC fragments to remote
6//! Pouch peers, it publishes a [`FragmentIndexAnnouncement`] on a dedicated
7//! gossipsub topic.  All nodes that receive the announcement update their
8//! local [`RemoteFragmentIndex`], so they know exactly which Pouch holds
9//! each fragment of a chunk.
10//!
11//! This enables **targeted** `FetchChunk` requests:
12//! instead of broadcasting a [`crate::network::NetworkCommand::FetchChunkFragments`]
13//! to every Pouch in the network, `GetFile` can send a
14//! [`crate::network::NetworkCommand::FetchChunkFragments`] only to the peers
15//! listed in [`RemoteFragmentIndex::pointers_for`].
16//!
17//! ## Topic
18//!
19//! `billpouch/v1/{network_id}/index`
20//!
21//! Separate from the NodeInfo topic (`billpouch/v1/{network_id}/nodes`) so
22//! that consumers can subscribe selectively.
23
24use serde::{Deserialize, Serialize};
25use std::collections::HashMap;
26
27// ── Message types ─────────────────────────────────────────────────────────────
28
29/// A gossipped pointer: which `peer_id` (Pouch) holds `fragment_id`.
30#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
31pub struct FragmentPointer {
32    /// libp2p PeerId (base58) of the Pouch that stores the fragment.
33    pub peer_id: String,
34    /// UUID of the specific RLNC fragment within the chunk.
35    pub fragment_id: String,
36}
37
38/// Gossip announcement published after a `PutFile` distributes fragments.
39///
40/// Published on [`FragmentIndexAnnouncement::topic_name`] by the Bill node
41/// that executed the distribution.
42#[derive(Debug, Clone, Serialize, Deserialize)]
43pub struct FragmentIndexAnnouncement {
44    /// Network this chunk belongs to.
45    pub network_id: String,
46    /// BLAKE3-derived chunk identifier (same as `chunk_id` in `PutFile` response).
47    pub chunk_id: String,
48    /// Unix timestamp (seconds) of this announcement.
49    pub announced_at: u64,
50    /// One entry per distributed fragment: which Pouch holds it.
51    pub pointers: Vec<FragmentPointer>,
52}
53
54impl FragmentIndexAnnouncement {
55    /// Gossipsub topic name for the fragment index of `network_id`.
56    ///
57    /// Format: `billpouch/v1/{network_id}/index`
58    pub fn topic_name(network_id: &str) -> String {
59        format!("billpouch/v1/{}/index", network_id)
60    }
61}
62
63// ── RemoteFragmentIndex ───────────────────────────────────────────────────────
64
65/// In-memory index accumulated from received [`FragmentIndexAnnouncement`]s.
66///
67/// Maps `chunk_id → Vec<FragmentPointer>`.
68///
69/// Held in [`crate::control::server::DaemonState`] under an `Arc<RwLock<>>`.
70#[derive(Debug, Default)]
71pub struct RemoteFragmentIndex {
72    /// chunk_id → list of (peer_id, fragment_id) pointers.
73    inner: HashMap<String, Vec<FragmentPointer>>,
74}
75
76impl RemoteFragmentIndex {
77    /// Create an empty index.
78    pub fn new() -> Self {
79        Self::default()
80    }
81
82    /// Incorporate a [`FragmentIndexAnnouncement`].
83    ///
84    /// Merges new pointers into the existing list for the chunk, de-duplicating
85    /// by `fragment_id` so repeated announcements are idempotent.
86    pub fn upsert(&mut self, ann: FragmentIndexAnnouncement) {
87        let entry = self.inner.entry(ann.chunk_id).or_default();
88        for ptr in ann.pointers {
89            if !entry.iter().any(|e| e.fragment_id == ptr.fragment_id) {
90                entry.push(ptr);
91            }
92        }
93    }
94
95    /// All known fragment pointers for a chunk.  Returns an empty slice if
96    /// the chunk has never been announced.
97    pub fn pointers_for(&self, chunk_id: &str) -> &[FragmentPointer] {
98        self.inner
99            .get(chunk_id)
100            .map(Vec::as_slice)
101            .unwrap_or_default()
102    }
103
104    /// Pointers for a chunk hosted by a specific peer.
105    pub fn pointers_for_peer<'a, 'b>(
106        &'a self,
107        chunk_id: &'b str,
108        peer_id: &'b str,
109    ) -> impl Iterator<Item = &'a FragmentPointer> + use<'a, 'b> {
110        self.pointers_for(chunk_id)
111            .iter()
112            .filter(move |p| p.peer_id == peer_id)
113    }
114
115    /// Remove all pointer entries for a given `peer_id` (e.g. after it is
116    /// blacklisted or evicted from the network).
117    pub fn evict_peer(&mut self, peer_id: &str) {
118        for ptrs in self.inner.values_mut() {
119            ptrs.retain(|p| p.peer_id != peer_id);
120        }
121        self.inner.retain(|_, v| !v.is_empty());
122    }
123
124    /// Total number of chunks tracked.
125    pub fn chunk_count(&self) -> usize {
126        self.inner.len()
127    }
128
129    /// Total number of fragment pointers across all chunks.
130    pub fn pointer_count(&self) -> usize {
131        self.inner.values().map(Vec::len).sum()
132    }
133
134    /// All chunk IDs for which at least one pointer is known.
135    pub fn chunk_ids(&self) -> impl Iterator<Item = &str> {
136        self.inner.keys().map(String::as_str)
137    }
138}
139
140// ── Tests ─────────────────────────────────────────────────────────────────────
141
142#[cfg(test)]
143mod tests {
144    use super::*;
145
146    fn make_ann(chunk: &str, pointers: &[(&str, &str)]) -> FragmentIndexAnnouncement {
147        FragmentIndexAnnouncement {
148            network_id: "test".into(),
149            chunk_id: chunk.into(),
150            announced_at: 0,
151            pointers: pointers
152                .iter()
153                .map(|(peer, frag)| FragmentPointer {
154                    peer_id: (*peer).into(),
155                    fragment_id: (*frag).into(),
156                })
157                .collect(),
158        }
159    }
160
161    #[test]
162    fn topic_name_format() {
163        assert_eq!(
164            FragmentIndexAnnouncement::topic_name("amici"),
165            "billpouch/v1/amici/index"
166        );
167    }
168
169    #[test]
170    fn upsert_and_query() {
171        let mut idx = RemoteFragmentIndex::new();
172        idx.upsert(make_ann(
173            "chunk-1",
174            &[("peer-a", "frag-1"), ("peer-b", "frag-2")],
175        ));
176        let ptrs = idx.pointers_for("chunk-1");
177        assert_eq!(ptrs.len(), 2);
178        assert_eq!(idx.chunk_count(), 1);
179        assert_eq!(idx.pointer_count(), 2);
180    }
181
182    #[test]
183    fn upsert_is_idempotent() {
184        let mut idx = RemoteFragmentIndex::new();
185        idx.upsert(make_ann("chunk-1", &[("peer-a", "frag-1")]));
186        idx.upsert(make_ann("chunk-1", &[("peer-a", "frag-1")])); // duplicate
187        assert_eq!(idx.pointers_for("chunk-1").len(), 1);
188    }
189
190    #[test]
191    fn upsert_merges_new_frags() {
192        let mut idx = RemoteFragmentIndex::new();
193        idx.upsert(make_ann("chunk-1", &[("peer-a", "frag-1")]));
194        idx.upsert(make_ann("chunk-1", &[("peer-b", "frag-2")]));
195        assert_eq!(idx.pointers_for("chunk-1").len(), 2);
196    }
197
198    #[test]
199    fn evict_peer_removes_entries() {
200        let mut idx = RemoteFragmentIndex::new();
201        idx.upsert(make_ann(
202            "chunk-1",
203            &[("peer-a", "frag-1"), ("peer-b", "frag-2")],
204        ));
205        idx.evict_peer("peer-a");
206        let ptrs = idx.pointers_for("chunk-1");
207        assert_eq!(ptrs.len(), 1);
208        assert_eq!(ptrs[0].peer_id, "peer-b");
209    }
210
211    #[test]
212    fn evict_last_peer_removes_chunk() {
213        let mut idx = RemoteFragmentIndex::new();
214        idx.upsert(make_ann("chunk-1", &[("peer-a", "frag-1")]));
215        idx.evict_peer("peer-a");
216        assert_eq!(idx.chunk_count(), 0);
217    }
218
219    #[test]
220    fn pointers_for_unknown_chunk_is_empty() {
221        let idx = RemoteFragmentIndex::new();
222        assert!(idx.pointers_for("unknown").is_empty());
223    }
224
225    #[test]
226    fn pointers_for_peer_filters_correctly() {
227        let mut idx = RemoteFragmentIndex::new();
228        idx.upsert(make_ann(
229            "chunk-1",
230            &[
231                ("peer-a", "frag-1"),
232                ("peer-b", "frag-2"),
233                ("peer-a", "frag-3"),
234            ],
235        ));
236        let count = idx.pointers_for_peer("chunk-1", "peer-a").count();
237        assert_eq!(count, 2);
238    }
239}