bp_core/network/
fragment_gossip.rs1use serde::{Deserialize, Serialize};
25use std::collections::HashMap;
26
27#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
31pub struct FragmentPointer {
32 pub peer_id: String,
34 pub fragment_id: String,
36}
37
38#[derive(Debug, Clone, Serialize, Deserialize)]
43pub struct FragmentIndexAnnouncement {
44 pub network_id: String,
46 pub chunk_id: String,
48 pub announced_at: u64,
50 pub pointers: Vec<FragmentPointer>,
52}
53
54impl FragmentIndexAnnouncement {
55 pub fn topic_name(network_id: &str) -> String {
59 format!("billpouch/v1/{}/index", network_id)
60 }
61}
62
63#[derive(Debug, Default)]
71pub struct RemoteFragmentIndex {
72 inner: HashMap<String, Vec<FragmentPointer>>,
74}
75
76impl RemoteFragmentIndex {
77 pub fn new() -> Self {
79 Self::default()
80 }
81
82 pub fn upsert(&mut self, ann: FragmentIndexAnnouncement) {
87 let entry = self.inner.entry(ann.chunk_id).or_default();
88 for ptr in ann.pointers {
89 if !entry.iter().any(|e| e.fragment_id == ptr.fragment_id) {
90 entry.push(ptr);
91 }
92 }
93 }
94
95 pub fn pointers_for(&self, chunk_id: &str) -> &[FragmentPointer] {
98 self.inner
99 .get(chunk_id)
100 .map(Vec::as_slice)
101 .unwrap_or_default()
102 }
103
104 pub fn pointers_for_peer<'a, 'b>(
106 &'a self,
107 chunk_id: &'b str,
108 peer_id: &'b str,
109 ) -> impl Iterator<Item = &'a FragmentPointer> + use<'a, 'b> {
110 self.pointers_for(chunk_id)
111 .iter()
112 .filter(move |p| p.peer_id == peer_id)
113 }
114
115 pub fn evict_peer(&mut self, peer_id: &str) {
118 for ptrs in self.inner.values_mut() {
119 ptrs.retain(|p| p.peer_id != peer_id);
120 }
121 self.inner.retain(|_, v| !v.is_empty());
122 }
123
124 pub fn chunk_count(&self) -> usize {
126 self.inner.len()
127 }
128
129 pub fn pointer_count(&self) -> usize {
131 self.inner.values().map(Vec::len).sum()
132 }
133
134 pub fn chunk_ids(&self) -> impl Iterator<Item = &str> {
136 self.inner.keys().map(String::as_str)
137 }
138}
139
140#[cfg(test)]
143mod tests {
144 use super::*;
145
146 fn make_ann(chunk: &str, pointers: &[(&str, &str)]) -> FragmentIndexAnnouncement {
147 FragmentIndexAnnouncement {
148 network_id: "test".into(),
149 chunk_id: chunk.into(),
150 announced_at: 0,
151 pointers: pointers
152 .iter()
153 .map(|(peer, frag)| FragmentPointer {
154 peer_id: (*peer).into(),
155 fragment_id: (*frag).into(),
156 })
157 .collect(),
158 }
159 }
160
161 #[test]
162 fn topic_name_format() {
163 assert_eq!(
164 FragmentIndexAnnouncement::topic_name("amici"),
165 "billpouch/v1/amici/index"
166 );
167 }
168
169 #[test]
170 fn upsert_and_query() {
171 let mut idx = RemoteFragmentIndex::new();
172 idx.upsert(make_ann(
173 "chunk-1",
174 &[("peer-a", "frag-1"), ("peer-b", "frag-2")],
175 ));
176 let ptrs = idx.pointers_for("chunk-1");
177 assert_eq!(ptrs.len(), 2);
178 assert_eq!(idx.chunk_count(), 1);
179 assert_eq!(idx.pointer_count(), 2);
180 }
181
182 #[test]
183 fn upsert_is_idempotent() {
184 let mut idx = RemoteFragmentIndex::new();
185 idx.upsert(make_ann("chunk-1", &[("peer-a", "frag-1")]));
186 idx.upsert(make_ann("chunk-1", &[("peer-a", "frag-1")])); assert_eq!(idx.pointers_for("chunk-1").len(), 1);
188 }
189
190 #[test]
191 fn upsert_merges_new_frags() {
192 let mut idx = RemoteFragmentIndex::new();
193 idx.upsert(make_ann("chunk-1", &[("peer-a", "frag-1")]));
194 idx.upsert(make_ann("chunk-1", &[("peer-b", "frag-2")]));
195 assert_eq!(idx.pointers_for("chunk-1").len(), 2);
196 }
197
198 #[test]
199 fn evict_peer_removes_entries() {
200 let mut idx = RemoteFragmentIndex::new();
201 idx.upsert(make_ann(
202 "chunk-1",
203 &[("peer-a", "frag-1"), ("peer-b", "frag-2")],
204 ));
205 idx.evict_peer("peer-a");
206 let ptrs = idx.pointers_for("chunk-1");
207 assert_eq!(ptrs.len(), 1);
208 assert_eq!(ptrs[0].peer_id, "peer-b");
209 }
210
211 #[test]
212 fn evict_last_peer_removes_chunk() {
213 let mut idx = RemoteFragmentIndex::new();
214 idx.upsert(make_ann("chunk-1", &[("peer-a", "frag-1")]));
215 idx.evict_peer("peer-a");
216 assert_eq!(idx.chunk_count(), 0);
217 }
218
219 #[test]
220 fn pointers_for_unknown_chunk_is_empty() {
221 let idx = RemoteFragmentIndex::new();
222 assert!(idx.pointers_for("unknown").is_empty());
223 }
224
225 #[test]
226 fn pointers_for_peer_filters_correctly() {
227 let mut idx = RemoteFragmentIndex::new();
228 idx.upsert(make_ann(
229 "chunk-1",
230 &[
231 ("peer-a", "frag-1"),
232 ("peer-b", "frag-2"),
233 ("peer-a", "frag-3"),
234 ],
235 ));
236 let count = idx.pointers_for_peer("chunk-1", "peer-a").count();
237 assert_eq!(count, 2);
238 }
239}