1use crate::{
6 coding::{params as coding_params, rlnc},
7 control::protocol::{
8 ControlRequest, ControlResponse, FileEntry, FlockData, GetFileData, HatchData, InviteData,
9 ListFilesData, NetworkQosSummary, PouchStat, PutFileData, StatusData, StorageInfoData,
10 },
11 error::BpResult,
12 identity::Identity,
13 network::{
14 fragment_gossip::{FragmentIndexAnnouncement, FragmentPointer, RemoteFragmentIndex},
15 state::NodeInfo,
16 FragmentResponse, NetworkCommand, OutgoingAssignments, QosRegistry, ReputationStore,
17 StorageManagerMap,
18 },
19 service::{ServiceInfo, ServiceRegistry, ServiceStatus, ServiceType},
20 storage::{ChunkCipher, FileRegistry, StorageManager},
21};
22use libp2p::{Multiaddr, PeerId};
23use std::{
24 collections::HashMap,
25 path::Path,
26 sync::{Arc, RwLock},
27 time::{SystemTime, UNIX_EPOCH},
28};
29use tokio::{
30 io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
31 net::{UnixListener, UnixStream},
32 sync::mpsc,
33};
34
35pub fn load_cek_hints() -> HashMap<String, [u8; 32]> {
43 let path = match crate::config::cek_hints_path() {
44 Ok(p) => p,
45 Err(e) => {
46 tracing::warn!("cek_hints_path error: {e}");
47 return HashMap::new();
48 }
49 };
50 load_cek_hints_at(&path)
51}
52
53fn load_cek_hints_at(path: &std::path::Path) -> HashMap<String, [u8; 32]> {
55 if !path.exists() {
56 return HashMap::new();
57 }
58 let data = match std::fs::read_to_string(path) {
59 Ok(d) => d,
60 Err(e) => {
61 tracing::warn!("Failed to read cek_hints: {e}");
62 return HashMap::new();
63 }
64 };
65 let hex_map: HashMap<String, String> = match serde_json::from_str(&data) {
66 Ok(m) => m,
67 Err(e) => {
68 tracing::warn!("Failed to parse cek_hints: {e}");
69 return HashMap::new();
70 }
71 };
72 hex_map
73 .into_iter()
74 .filter_map(|(k, v)| {
75 let bytes = hex::decode(&v).ok()?;
76 if bytes.len() != 32 {
77 return None;
78 }
79 let mut arr = [0u8; 32];
80 arr.copy_from_slice(&bytes);
81 Some((k, arr))
82 })
83 .collect()
84}
85
86fn persist_cek_hints(hints: &HashMap<String, [u8; 32]>) {
92 let path = match crate::config::cek_hints_path() {
93 Ok(p) => p,
94 Err(e) => {
95 tracing::warn!("cek_hints_path error: {e}");
96 return;
97 }
98 };
99 persist_cek_hints_at(&path, hints);
100}
101
102fn persist_cek_hints_at(path: &std::path::Path, hints: &HashMap<String, [u8; 32]>) {
104 if let Some(parent) = path.parent() {
106 if let Err(e) = std::fs::create_dir_all(parent) {
107 tracing::warn!("Failed to create cek_hints dir: {e}");
108 return;
109 }
110 }
111 let hex_map: HashMap<&str, String> = hints
112 .iter()
113 .map(|(k, v)| (k.as_str(), hex::encode(v)))
114 .collect();
115 match serde_json::to_string(&hex_map) {
116 Ok(json) => {
117 if let Err(e) = std::fs::write(path, json) {
118 tracing::warn!("Failed to write cek_hints: {e}");
119 }
120 }
121 Err(e) => tracing::warn!("Failed to serialize cek_hints: {e}"),
122 }
123}
124
/// Shared, long-lived state for the daemon, handed to every control-socket
/// connection behind an `Arc`.
pub struct DaemonState {
    /// Node identity: peer id, fingerprint, alias/profile, and the secret
    /// material used to derive per-chunk ciphers.
    pub identity: Identity,
    /// Registry of locally hatched services (Pouches etc.), keyed by id.
    pub services: RwLock<ServiceRegistry>,
    /// View of peers/services observed on the network(s).
    pub network_state: Arc<RwLock<crate::network::state::NetworkState>>,
    /// Ids of the networks this node has joined.
    pub networks: RwLock<Vec<String>>,
    /// Command channel into the network layer (announce, join/leave,
    /// push/fetch fragments, dial relays).
    pub net_tx: mpsc::Sender<NetworkCommand>,
    /// Per-service storage managers, keyed by service id.
    pub storage_managers: StorageManagerMap,
    /// QoS observations per peer (stability scores, peer counts).
    pub qos: Arc<RwLock<QosRegistry>>,
    // NOTE(review): not touched in this file's visible code — presumably
    // tracks fragments assigned to remote peers; confirm at the usage sites.
    pub outgoing_assignments: OutgoingAssignments,
    /// Gossip-learned index of which remote peers hold fragments per chunk.
    pub remote_fragment_index: Arc<RwLock<RemoteFragmentIndex>>,
    /// chunk_id → blake3 hash of the chunk plaintext; used to re-derive the
    /// ChunkCipher on retrieval. Persisted via `persist_cek_hints`.
    pub chunk_cek_hints: RwLock<HashMap<String, [u8; 32]>>,
    /// Per-peer reputation records (tiers, scores, eviction penalties).
    pub reputation: RwLock<ReputationStore>,
    /// Registry of files this node has uploaded, persisted to disk.
    pub file_registry: RwLock<FileRegistry>,
}
168
169pub async fn run_control_server(
171 socket_path: impl AsRef<Path>,
172 state: Arc<DaemonState>,
173) -> BpResult<()> {
174 let path = socket_path.as_ref();
176 if path.exists() {
177 std::fs::remove_file(path).ok();
178 }
179
180 let listener =
181 UnixListener::bind(path).map_err(|e| crate::error::BpError::Control(e.to_string()))?;
182
183 tracing::info!("Control socket listening at {:?}", path);
184
185 loop {
186 match listener.accept().await {
187 Ok((stream, _)) => {
188 let state = Arc::clone(&state);
189 tokio::spawn(async move {
190 if let Err(e) = handle_connection(stream, state).await {
191 tracing::warn!("Control connection error: {}", e);
192 }
193 });
194 }
195 Err(e) => {
196 tracing::error!("Accept error on control socket: {}", e);
197 }
198 }
199 }
200}
201
202async fn handle_connection(stream: UnixStream, state: Arc<DaemonState>) -> anyhow::Result<()> {
203 let (read_half, mut write_half) = stream.into_split();
204 let mut reader = BufReader::new(read_half);
205 let mut line = String::new();
206
207 reader.read_line(&mut line).await?;
208 let line = line.trim();
209 if line.is_empty() {
210 return Ok(());
211 }
212
213 let request: ControlRequest = match serde_json::from_str(line) {
214 Ok(r) => r,
215 Err(e) => {
216 let resp = ControlResponse::err(format!("Invalid request JSON: {}", e));
217 send_response(&mut write_half, &resp).await?;
218 return Ok(());
219 }
220 };
221
222 let response = dispatch(request, &state).await;
223 send_response(&mut write_half, &response).await?;
224 Ok(())
225}
226
227async fn send_response(
228 writer: &mut tokio::net::unix::OwnedWriteHalf,
229 resp: &ControlResponse,
230) -> anyhow::Result<()> {
231 let mut json = serde_json::to_string(resp)?;
232 json.push('\n');
233 writer.write_all(json.as_bytes()).await?;
234 Ok(())
235}
236
237async fn dispatch(req: ControlRequest, state: &Arc<DaemonState>) -> ControlResponse {
239 match req {
240 ControlRequest::Ping => ControlResponse::ok("pong"),
242
243 ControlRequest::Status => {
245 let services = state.services.read().unwrap();
246 let networks = state.networks.read().unwrap();
247 let ns = state.network_state.read().unwrap();
248 let own_peer = state.identity.peer_id.to_string();
249
250 let (rep_tier, rep_score, avail_factor) = {
252 let (tier_str, score, tier_enum) = {
253 let rep = state.reputation.read().unwrap();
254 match rep.get(&own_peer) {
255 Some(r) => (r.tier.to_string(), r.reputation_score, r.tier),
256 None => (
257 crate::network::ReputationTier::R1.to_string(),
258 0i64,
259 crate::network::ReputationTier::R1,
260 ),
261 }
262 };
263 let factor = if tier_enum == crate::network::ReputationTier::R0 {
266 0.0_f64
267 } else {
268 let ph = tier_enum.qos_target_ph();
269 let qos = state.qos.read().unwrap();
270 let own_score = qos
271 .get(&own_peer)
272 .map(|q| q.stability_score())
273 .unwrap_or(0.4);
274 let mut stabilities = qos.all_stability_scores();
275 stabilities.push(own_score);
276 crate::coding::params::compute_network_storage_factor(&stabilities, ph)
277 };
278 (tier_str, score, factor)
279 };
280
281 let pouch_stats: Vec<crate::control::protocol::PouchStat> = {
283 let managers = state.storage_managers.read().unwrap();
284 services
285 .all()
286 .iter()
287 .filter(|s| s.service_type == ServiceType::Pouch)
288 .map(|s| {
289 let tier_label = s.metadata.get("tier").and_then(|v| v.as_str()).map(|t| {
290 format!(
291 "{} — {}",
292 t,
293 match t {
294 "T1" => "Pebble",
295 "T2" => "Stone",
296 "T3" => "Boulder",
297 "T4" => "Rock",
298 "T5" => "Monolith",
299 _ => "?",
300 }
301 )
302 });
303 if let Some(sm_lock) = managers.get(&s.id) {
304 let sm = sm_lock.read().unwrap();
305 crate::control::protocol::PouchStat {
306 service_id: s.id.clone(),
307 network_id: s.network_id.clone(),
308 storage_tier: tier_label,
309 storage_bid_bytes: sm.meta.storage_bytes_bid,
310 storage_used_bytes: sm.meta.storage_bytes_used,
311 available_bytes: ((sm.meta.available_bytes() as f64) * avail_factor)
312 .round()
313 as u64,
314 }
315 } else {
316 crate::control::protocol::PouchStat {
317 service_id: s.id.clone(),
318 network_id: s.network_id.clone(),
319 storage_tier: tier_label,
320 storage_bid_bytes: 0,
321 storage_used_bytes: 0,
322 available_bytes: 0,
323 }
324 }
325 })
326 .collect()
327 };
328
329 let qos_peer_count = state.qos.read().unwrap().peer_count();
331 let network_qos = {
332 let qos = state.qos.read().unwrap();
333 let rep = state.reputation.read().unwrap();
334 let peer_count = qos.peer_count();
335 if peer_count == 0 {
336 None
337 } else {
338 let scores = qos.all_stability_scores();
339 let avg_stability = if scores.is_empty() {
340 0.0
341 } else {
342 scores.iter().sum::<f64>() / scores.len() as f64
343 };
344 let mut tier_counts: std::collections::HashMap<String, usize> =
345 std::collections::HashMap::new();
346 for peer_id in qos.peer_ids() {
347 let tier = rep.tier(peer_id).to_string();
348 *tier_counts.entry(tier).or_insert(0) += 1;
349 }
350 Some(NetworkQosSummary {
351 observed_peers: peer_count,
352 avg_stability,
353 tier_counts,
354 })
355 }
356 };
357
358 let data = StatusData {
359 peer_id: own_peer,
360 fingerprint: state.identity.fingerprint.clone(),
361 alias: state.identity.profile.alias.clone(),
362 local_services: services.all().into_iter().cloned().collect(),
363 networks: networks.clone(),
364 known_peers: ns.len().max(qos_peer_count),
365 version: env!("CARGO_PKG_VERSION").to_string(),
366 reputation_tier: rep_tier,
367 reputation_score: rep_score,
368 pouch_stats,
369 network_qos,
370 };
371 ControlResponse::ok(data)
372 }
373
374 ControlRequest::AnnounceNow => {
376 let svcs: Vec<(String, ServiceType, String)> = {
379 state
380 .services
381 .read()
382 .unwrap()
383 .all()
384 .iter()
385 .filter(|s| s.status == ServiceStatus::Running)
386 .map(|s| (s.id.clone(), s.service_type, s.network_id.clone()))
387 .collect()
388 };
389 if svcs.is_empty() {
390 tracing::debug!("AnnounceNow: no running services, skipping");
391 } else {
392 for (id, stype, net) in &svcs {
393 announce_self(state, id, *stype, net).await;
394 }
395 tracing::debug!(
396 "AnnounceNow: announced {} service(s), waiting 2s",
397 svcs.len()
398 );
399 tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
400 }
401 ControlResponse::ok("announced")
402 }
403
404 ControlRequest::Hatch {
406 service_type,
407 network_id,
408 metadata,
409 } => {
410 let already_joined = {
412 let nets = state.networks.read().unwrap();
413 nets.contains(&network_id)
414 };
415 if !already_joined {
416 let _ = state
417 .net_tx
418 .send(NetworkCommand::JoinNetwork {
419 network_id: network_id.clone(),
420 })
421 .await;
422 state.networks.write().unwrap().push(network_id.clone());
423 }
424
425 if service_type == ServiceType::Pouch {
428 let already_exists =
429 state.services.read().unwrap().all().iter().any(|s| {
430 s.service_type == ServiceType::Pouch && s.network_id == network_id
431 });
432 if already_exists {
433 return ControlResponse::err(format!(
434 "a Pouch for network '{}' already exists on this node; \
435 two correlated Pouches defeat distributed redundancy",
436 network_id
437 ));
438 }
439 }
440
441 let info = ServiceInfo::new(service_type, network_id.clone(), metadata.clone());
442 let service_id = info.id.clone();
443
444 {
447 let mut reg = state.services.write().unwrap();
448 let mut running_info = info.clone();
449 running_info.status = ServiceStatus::Running;
450 reg.register(running_info);
451 } if service_type == ServiceType::Pouch {
455 let quota = metadata
456 .get("storage_bytes")
457 .or_else(|| metadata.get("storage_bytes_bid"))
458 .and_then(|v| v.as_u64())
459 .unwrap_or(0);
460
461 match StorageManager::init(network_id.clone(), service_id.clone(), quota) {
462 Ok(sm) => {
463 state
464 .storage_managers
465 .write()
466 .unwrap()
467 .insert(service_id.clone(), Arc::new(RwLock::new(sm)));
468 tracing::info!(service_id=%service_id, quota_bytes=%quota, "StorageManager initialised");
469 }
470 Err(e) => {
471 tracing::warn!("Failed to init StorageManager for {}: {}", service_id, e);
472 }
473 }
474 }
475
476 announce_self(state, &service_id, service_type, &network_id).await;
478
479 ControlResponse::ok(HatchData {
480 service_id: service_id.clone(),
481 service_type,
482 network_id,
483 message: format!("🦤 {} service hatched — id: {}", service_type, service_id),
484 })
485 }
486
487 ControlRequest::Farewell { service_id } => {
489 let mut reg = state.services.write().unwrap();
490 match reg.remove(&service_id) {
491 Some(info) => ControlResponse::ok(serde_json::json!({
492 "service_id": info.id,
493 "service_type": info.service_type,
494 "message": format!("Service {} stopped", info.id),
495 })),
496 None => ControlResponse::err(format!("No service with id '{}'", service_id)),
497 }
498 }
499 ControlRequest::FarewellEvict { service_id } => {
501 let now_secs = SystemTime::now()
502 .duration_since(UNIX_EPOCH)
503 .unwrap_or_default()
504 .as_secs();
505
506 let (network_id, service_type, peer_id_str) = {
508 let reg = state.services.read().unwrap();
509 match reg.get(&service_id) {
510 Some(info) => (
511 info.network_id.clone(),
512 info.service_type,
513 state.identity.peer_id.to_string(),
514 ),
515 None => {
516 return ControlResponse::err(format!(
517 "No service with id '{}'",
518 service_id
519 ));
520 }
521 }
522 };
523
524 let (chunk_count, fragment_count, bytes_freed) = {
526 let mut managers = state.storage_managers.write().unwrap();
527 if let Some(sm_lock) = managers.remove(&service_id) {
528 let sm = sm_lock.read().unwrap();
529 let summary = sm.storage_summary();
530 if let Err(e) = sm.purge() {
531 tracing::warn!("Failed to purge storage for {}: {}", service_id, e);
532 }
533 summary
534 } else {
535 (0, 0, 0)
536 }
537 };
538
539 {
541 let mut meta = HashMap::new();
542 meta.insert("evicting".to_string(), serde_json::Value::Bool(true));
543 let info = NodeInfo {
544 peer_id: peer_id_str,
545 user_fingerprint: state.identity.fingerprint.clone(),
546 user_alias: state.identity.profile.alias.clone(),
547 service_type,
548 service_id: service_id.clone(),
549 network_id: network_id.clone(),
550 listen_addrs: vec![],
551 announced_at: now_secs,
552 metadata: meta,
553 };
554 if let Ok(payload) = serde_json::to_vec(&info) {
555 let _ = state
556 .net_tx
557 .send(NetworkCommand::Announce {
558 network_id: network_id.clone(),
559 payload,
560 })
561 .await;
562 }
563 }
564
565 {
567 let local_peer = state.identity.peer_id.to_string();
568 let mut rep = state.reputation.write().unwrap();
569 rep.get_or_create(&local_peer, now_secs)
570 .evict_without_notice(now_secs);
571 }
572
573 state.services.write().unwrap().remove(&service_id);
575
576 tracing::info!(
577 service_id = %service_id,
578 chunks = chunk_count,
579 fragments = fragment_count,
580 bytes = bytes_freed,
581 "Pouch evicted"
582 );
583
584 ControlResponse::ok(serde_json::json!({
585 "service_id": service_id,
586 "network_id": network_id,
587 "chunks_removed": chunk_count,
588 "fragments_removed": fragment_count,
589 "bytes_freed": bytes_freed,
590 "message": format!(
591 "Pouch {} evicted — {} chunk(s), {} fragment(s), {} bytes freed. \
592 Network peers will rebalance via Proof-of-Storage.",
593 service_id, chunk_count, fragment_count, bytes_freed
594 ),
595 }))
596 }
597 ControlRequest::Pause {
599 service_id,
600 eta_minutes,
601 } => {
602 let now_secs = SystemTime::now()
603 .duration_since(UNIX_EPOCH)
604 .unwrap_or_default()
605 .as_secs();
606
607 let (network_id, service_type) = {
608 let mut reg = state.services.write().unwrap();
609 match reg.get_mut(&service_id) {
610 Some(info) => {
611 if info.status == crate::service::ServiceStatus::Stopped
612 || matches!(info.status, crate::service::ServiceStatus::Stopping)
613 {
614 return ControlResponse::err(format!(
615 "Service '{}' is not running",
616 service_id
617 ));
618 }
619 info.status = crate::service::ServiceStatus::Paused {
620 eta_minutes,
621 paused_at: now_secs,
622 };
623 info.metadata
624 .insert("maintenance".into(), serde_json::Value::Bool(true));
625 info.metadata
626 .insert("eta_minutes".into(), serde_json::Value::from(eta_minutes));
627 (info.network_id.clone(), info.service_type)
628 }
629 None => {
630 return ControlResponse::err(format!(
631 "No service with id '{}'",
632 service_id
633 ));
634 }
635 }
636 };
637
638 announce_self(state, &service_id, service_type, &network_id).await;
639
640 ControlResponse::ok(serde_json::json!({
641 "service_id": service_id,
642 "status": "paused",
643 "eta_minutes": eta_minutes,
644 "message": format!(
645 "Service {} paused — announcing maintenance (ETA {} min)",
646 service_id, eta_minutes
647 ),
648 }))
649 }
650
651 ControlRequest::Resume { service_id } => {
653 let (network_id, service_type) = {
654 let mut reg = state.services.write().unwrap();
655 match reg.get_mut(&service_id) {
656 Some(info) => {
657 if !matches!(info.status, crate::service::ServiceStatus::Paused { .. }) {
658 return ControlResponse::err(format!(
659 "Service '{}' is not paused",
660 service_id
661 ));
662 }
663 info.status = crate::service::ServiceStatus::Running;
664 info.metadata.remove("maintenance");
665 info.metadata.remove("eta_minutes");
666 (info.network_id.clone(), info.service_type)
667 }
668 None => {
669 return ControlResponse::err(format!(
670 "No service with id '{}'",
671 service_id
672 ));
673 }
674 }
675 };
676
677 announce_self(state, &service_id, service_type, &network_id).await;
678
679 ControlResponse::ok(serde_json::json!({
680 "service_id": service_id,
681 "status": "running",
682 "message": format!("Service {} resumed", service_id),
683 }))
684 }
685
686 ControlRequest::Flock => {
688 let services = state.services.read().unwrap();
689 let networks = state.networks.read().unwrap();
690 let ns = state.network_state.read().unwrap();
691 let data = FlockData {
692 local_services: services.all().into_iter().cloned().collect(),
693 known_peers: ns.all().into_iter().cloned().collect(),
694 networks: networks.clone(),
695 peer_count: ns.len(),
696 };
697 ControlResponse::ok(data)
698 }
699
700 ControlRequest::Join { network_id } => {
702 {
703 let nets = state.networks.read().unwrap();
704 if nets.contains(&network_id) {
705 return ControlResponse::err(format!(
706 "Already a member of network '{}'",
707 network_id
708 ));
709 }
710 }
711 if let Err(e) = crate::storage::manifest::NetworkMetaKey::load_or_create(&network_id) {
715 tracing::warn!(network = %network_id, "Failed to ensure NetworkMetaKey: {e}");
716 }
717 let _ = state
718 .net_tx
719 .send(NetworkCommand::JoinNetwork {
720 network_id: network_id.clone(),
721 })
722 .await;
723 state.networks.write().unwrap().push(network_id.clone());
724 ControlResponse::ok(serde_json::json!({
725 "network_id": network_id,
726 "message": format!("Joined network '{}'", network_id),
727 }))
728 }
729
730 ControlRequest::Leave { network_id, force } => {
732 let blocking: Vec<ServiceInfo> = {
734 let reg = state.services.read().unwrap();
735 reg.all()
736 .iter()
737 .filter(|s| s.network_id == network_id)
738 .map(|s| (*s).clone())
739 .collect()
740 };
741
742 if !blocking.is_empty() && !force {
743 let blocking_json: Vec<serde_json::Value> = blocking
745 .iter()
746 .map(|s| {
747 let hint = if s.service_type == ServiceType::Pouch {
748 format!("bp farewell {} --evict", s.id)
749 } else {
750 format!("bp farewell {}", s.id)
751 };
752 serde_json::json!({
753 "id": s.id,
754 "type": s.service_type.to_string(),
755 "hint": hint,
756 })
757 })
758 .collect();
759 return ControlResponse::ok(serde_json::json!({
760 "network_id": network_id,
761 "blocked": true,
762 "blocking_services": blocking_json,
763 "message": format!(
764 "Cannot leave '{}': {} active service(s) must be stopped first \
765 (see 'blocking_services' for commands, or use --force to auto-evict)",
766 network_id,
767 blocking.len()
768 ),
769 }));
770 }
771
772 let now_secs = SystemTime::now()
774 .duration_since(UNIX_EPOCH)
775 .unwrap_or_default()
776 .as_secs();
777 let mut evicted: Vec<serde_json::Value> = Vec::new();
778
779 for svc in &blocking {
780 let service_id = svc.id.clone();
781 let service_type = svc.service_type;
782 let peer_id_str = state.identity.peer_id.to_string();
783
784 if service_type == ServiceType::Pouch {
785 {
787 let mut managers = state.storage_managers.write().unwrap();
788 if let Some(sm_lock) = managers.remove(&service_id) {
789 let sm = sm_lock.read().unwrap();
790 if let Err(e) = sm.purge() {
791 tracing::warn!(
792 "leave --force: failed to purge storage for {}: {}",
793 service_id,
794 e
795 );
796 }
797 }
798 }
799 {
801 let mut meta = HashMap::new();
802 meta.insert("evicting".to_string(), serde_json::Value::Bool(true));
803 let info = NodeInfo {
804 peer_id: peer_id_str.clone(),
805 user_fingerprint: state.identity.fingerprint.clone(),
806 user_alias: state.identity.profile.alias.clone(),
807 service_type,
808 service_id: service_id.clone(),
809 network_id: network_id.clone(),
810 listen_addrs: vec![],
811 announced_at: now_secs,
812 metadata: meta,
813 };
814 if let Ok(payload) = serde_json::to_vec(&info) {
815 let _ = state
816 .net_tx
817 .send(NetworkCommand::Announce {
818 network_id: network_id.clone(),
819 payload,
820 })
821 .await;
822 }
823 }
824 {
826 let mut rep = state.reputation.write().unwrap();
827 rep.get_or_create(&peer_id_str, now_secs)
828 .evict_without_notice(now_secs);
829 }
830 tracing::info!(
831 service_id = %service_id,
832 network_id = %network_id,
833 "Pouch auto-evicted by leave --force"
834 );
835 } else {
836 tracing::info!(
837 service_id = %service_id,
838 service_type = %service_type,
839 network_id = %network_id,
840 "service stopped by leave --force"
841 );
842 }
843
844 state.services.write().unwrap().remove(&service_id);
846 evicted.push(serde_json::json!({
847 "service_id": service_id,
848 "service_type": service_type.to_string(),
849 "evicted": service_type == ServiceType::Pouch,
850 }));
851 }
852
853 let _ = state
855 .net_tx
856 .send(NetworkCommand::LeaveNetwork {
857 network_id: network_id.clone(),
858 })
859 .await;
860 state.networks.write().unwrap().retain(|n| n != &network_id);
861
862 let msg = if evicted.is_empty() {
863 format!("Left network '{}'", network_id)
864 } else {
865 format!(
866 "Left network '{}' — {} service(s) auto-evicted",
867 network_id,
868 evicted.len()
869 )
870 };
871 ControlResponse::ok(serde_json::json!({
872 "network_id": network_id,
873 "services_auto_evicted": evicted,
874 "message": msg,
875 }))
876 }
877
878 ControlRequest::ConnectRelay { relay_addr } => {
880 let addr: Multiaddr = match relay_addr.parse() {
881 Ok(a) => a,
882 Err(e) => {
883 return ControlResponse::err(format!("Invalid relay multiaddr: {}", e));
884 }
885 };
886 let _ = state
887 .net_tx
888 .send(NetworkCommand::DialRelay { relay_addr: addr })
889 .await;
890 ControlResponse::ok(serde_json::json!({
891 "relay_addr": relay_addr,
892 "message": format!("Dialing relay '{}'", relay_addr),
893 }))
894 }
895 ControlRequest::CreateInvite {
897 network_id,
898 invitee_fingerprint,
899 invite_password,
900 ttl_hours,
901 } => {
902 let ttl = ttl_hours.unwrap_or(24);
903 let expires_at =
904 (chrono::Utc::now() + chrono::Duration::hours(ttl as i64)).timestamp() as u64;
905 match crate::invite::create_invite(
906 &state.identity,
907 &network_id,
908 invitee_fingerprint,
909 ttl,
910 &invite_password,
911 ) {
912 Ok(blob) => ControlResponse::ok(InviteData {
913 blob,
914 network_id,
915 expires_at,
916 inviter_fingerprint: state.identity.fingerprint.clone(),
917 }),
918 Err(e) => ControlResponse::err(format!("Failed to create invite: {e}")),
919 }
920 }
921 ControlRequest::PutFile {
923 chunk_data,
924 ph,
925 q_target,
926 network_id,
927 file_name,
928 } => {
929 let ph = ph.unwrap_or(0.999);
930 let q_target = q_target.unwrap_or(1.0);
931
932 let managers_snap: Vec<(String, Arc<RwLock<StorageManager>>)> = {
934 let map = state.storage_managers.read().unwrap();
935 map.iter()
936 .map(|(id, sm)| (id.clone(), Arc::clone(sm)))
937 .collect()
938 };
939
940 let candidates: Vec<_> = managers_snap
942 .iter()
943 .filter(|(_, sm)| {
944 let meta = &sm.read().unwrap().meta;
945 network_id.is_empty() || meta.network_id == network_id
946 })
947 .collect();
948
949 let has_local_pouch = !candidates.is_empty();
953
954 let pouch_peer_ids: Vec<String> = {
956 let ns = state.network_state.read().unwrap();
957 ns.in_network(&network_id)
958 .into_iter()
959 .filter(|n| n.service_type == ServiceType::Pouch)
960 .map(|n| n.peer_id.clone())
961 .collect()
962 };
963
964 let stabilities: Vec<f64> = {
967 let qos = state.qos.read().unwrap();
968 if pouch_peer_ids.is_empty() {
969 vec![0.8; candidates.len().max(1)]
970 } else {
971 pouch_peer_ids
972 .iter()
973 .map(|pid| match qos.get(pid) {
974 Some(p) if p.is_observed() => p.stability_score(),
975 _ => 0.8,
976 })
977 .collect()
978 }
979 };
980
981 let (k, n, q, pe) =
982 match coding_params::compute_coding_params(&stabilities, ph, q_target) {
983 Ok(p) => {
984 let pe = coding_params::effective_recovery_probability(&stabilities, p.k);
985 (p.k, p.n, p.q, pe)
986 }
987 Err(e) => {
988 tracing::warn!("compute_coding_params failed ({e}), using fallback");
989 let peer_n = stabilities.len().max(2);
990 let fallback_k = (peer_n / 2).max(1);
991 let fallback_q = (peer_n - fallback_k) as f64 / fallback_k as f64;
992 let pe =
993 coding_params::effective_recovery_probability(&stabilities, fallback_k);
994 (fallback_k, peer_n, fallback_q, pe)
995 }
996 };
997
998 tracing::info!(
999 network = %network_id, k = %k, n = %n, q = %q,
1000 ph = %ph, pe = %pe, peers = %stabilities.len(),
1001 "PutFile: computed coding params"
1002 );
1003
1004 let plaintext_hash: [u8; 32] = *blake3::hash(&chunk_data).as_bytes();
1008 let secret_mat = state.identity.secret_material();
1009 let cipher = ChunkCipher::for_user(&secret_mat, &plaintext_hash);
1010 let encrypted_chunk = match cipher.encrypt(&chunk_data) {
1011 Ok(b) => b,
1012 Err(e) => return ControlResponse::err(format!("Chunk encryption error: {e}")),
1013 };
1014
1015 let fragments = match rlnc::encode(&encrypted_chunk, k, n) {
1016 Ok(f) => f,
1017 Err(e) => return ControlResponse::err(format!("Encode error: {e}")),
1018 };
1019
1020 let chunk_id = match fragments.first() {
1021 Some(f) => f.chunk_id.clone(),
1022 None => return ControlResponse::err("Encoding produced no fragments"),
1023 };
1024
1025 state
1027 .chunk_cek_hints
1028 .write()
1029 .unwrap()
1030 .insert(chunk_id.clone(), plaintext_hash);
1031 {
1033 let hints = state.chunk_cek_hints.read().unwrap();
1034 persist_cek_hints(&hints);
1035 }
1036
1037 let mut stored = 0usize;
1039 if has_local_pouch {
1040 let (_, sm_arc) = candidates[0];
1041 let mut sm = sm_arc.write().unwrap();
1042 for fragment in &fragments {
1043 match sm.store_fragment(fragment) {
1044 Ok(()) => stored += 1,
1045 Err(e) => {
1046 tracing::warn!(chunk_id=%chunk_id, "store_fragment failed: {e}");
1047 break;
1048 }
1049 }
1050 }
1051 } let remote_pouches: Vec<PeerId> = {
1055 let ns = state.network_state.read().unwrap();
1056 let local_peer = state.identity.peer_id.to_string();
1057 ns.in_network(&network_id)
1058 .into_iter()
1059 .filter(|n| n.service_type == ServiceType::Pouch && n.peer_id != local_peer)
1060 .filter_map(|n| n.peer_id.parse::<PeerId>().ok())
1061 .collect()
1062 };
1063
1064 if stored == 0 && remote_pouches.is_empty() {
1066 return ControlResponse::err(format!(
1067 "No Pouches available on network '{}' — hatch a Pouch or wait for Pouch peers to join",
1068 network_id
1069 ));
1070 }
1071
1072 let mut distributed = 0usize;
1073 let mut index_pointers: Vec<FragmentPointer> = Vec::new();
1074 if !remote_pouches.is_empty() {
1075 for (i, fragment) in fragments.iter().enumerate() {
1076 let peer = remote_pouches[i % remote_pouches.len()];
1077 if state
1078 .net_tx
1079 .send(NetworkCommand::PushFragment {
1080 peer_id: peer,
1081 chunk_id: chunk_id.clone(),
1082 fragment_id: fragment.id.clone(),
1083 data: fragment.to_bytes(),
1084 })
1085 .await
1086 .is_ok()
1087 {
1088 distributed += 1;
1089 index_pointers.push(FragmentPointer {
1090 peer_id: peer.to_string(),
1091 fragment_id: fragment.id.clone(),
1092 });
1093 }
1094 }
1095 }
1096
1097 if !index_pointers.is_empty() {
1100 let ann = FragmentIndexAnnouncement {
1101 network_id: network_id.clone(),
1102 chunk_id: chunk_id.clone(),
1103 announced_at: chrono::Utc::now().timestamp() as u64,
1104 pointers: index_pointers,
1105 };
1106 if let Ok(payload) = serde_json::to_vec(&ann) {
1107 let _ = state
1108 .net_tx
1109 .send(NetworkCommand::AnnounceIndex {
1110 network_id: network_id.clone(),
1111 payload,
1112 })
1113 .await;
1114 if let Ok(mut idx) = state.remote_fragment_index.write() {
1116 idx.upsert(ann);
1117 }
1118 }
1119 }
1120
1121 tracing::info!(
1122 chunk_id=%chunk_id, stored=%stored, distributed=%distributed,
1123 total=%fragments.len(), "PutFile stored + distributed"
1124 );
1125
1126 if let Some(name) = file_name {
1128 let entry = crate::storage::StoredFileEntry {
1129 file_name: name,
1130 size_bytes: chunk_data.len() as u64,
1131 chunk_id: chunk_id.clone(),
1132 network_id: network_id.clone(),
1133 uploaded_at: chrono::Utc::now().timestamp() as u64,
1134 };
1135 if let Ok(path) = crate::config::file_registry_path() {
1136 state
1137 .file_registry
1138 .write()
1139 .unwrap()
1140 .insert_and_save(entry, &path);
1141 }
1142 }
1143
1144 ControlResponse::ok(PutFileData {
1145 chunk_id: chunk_id.clone(),
1146 k,
1147 n,
1148 q,
1149 ph,
1150 pe,
1151 fragments_stored: stored,
1152 fragments_distributed: distributed,
1153 message: format!(
1154 "Stored {stored}/{total} locally, pushed {distributed} to {peers} remote pouch(es) — chunk_id: {chunk_id}",
1155 total = fragments.len(),
1156 peers = remote_pouches.len(),
1157 ),
1158 })
1159 }
1160
1161 ControlRequest::GetFile {
1163 chunk_id,
1164 network_id,
1165 } => {
1166 let managers_snap: Vec<Arc<RwLock<StorageManager>>> = {
1167 let map = state.storage_managers.read().unwrap();
1168 map.values()
1169 .filter(|sm| {
1170 let meta = &sm.read().unwrap().meta;
1171 network_id.is_empty() || meta.network_id == network_id
1172 })
1173 .map(Arc::clone)
1174 .collect()
1175 };
1176
1177 let remote_only = managers_snap.is_empty();
1180
1181 let mut all_fragments = Vec::new();
1183 for sm_arc in &managers_snap {
1184 let sm = sm_arc.read().unwrap();
1185 let metas = sm.index.fragments_for_chunk(&chunk_id).to_vec();
1186 for meta in metas {
                        // Pull each locally indexed fragment off disk; a failed
                        // load is logged and skipped rather than aborting the read.
                        match sm.load_fragment(&chunk_id, &meta.fragment_id) {
                            Ok(f) => all_fragments.push(f),
                            Err(e) => tracing::warn!("load_fragment failed: {e}"),
                        }
                    }
                }

                // Count of fragments recovered from local storage (for logging below).
                let local_count = all_fragments.len();

                // k = number of fragments required to decode. Taken from the first
                // fragment's header; 0 when nothing was found locally.
                let k_needed = all_fragments.first().map(|f| f.k).unwrap_or(0);

                let mut fragments_remote = 0usize;
                // Fetch from peers when explicitly requested (`remote_only`) or when
                // the local fragments alone are insufficient to decode.
                if remote_only || (k_needed > 0 && all_fragments.len() < k_needed) {
                    let local_peer = state.identity.peer_id.to_string();

                    // Candidate peers to query, preferring the gossip-built remote
                    // fragment index; fall back to all Pouch nodes in the network.
                    let target_peers: Vec<PeerId> = {
                        let idx = state.remote_fragment_index.read().unwrap();
                        let ptrs = idx.pointers_for(&chunk_id);
                        if !ptrs.is_empty() {
                            // Deduplicate by peer id and drop ourselves; keep only
                            // ids that parse as valid libp2p PeerIds.
                            let mut seen = std::collections::HashSet::new();
                            ptrs.iter()
                                .filter(|p| p.peer_id != local_peer)
                                .filter_map(|p| {
                                    if seen.insert(p.peer_id.clone()) {
                                        p.peer_id.parse::<PeerId>().ok()
                                    } else {
                                        None
                                    }
                                })
                                .collect()
                        } else {
                            // No pointers known: broadcast-style fallback to every
                            // other Pouch service announced in this network.
                            let ns = state.network_state.read().unwrap();
                            ns.in_network(&network_id)
                                .into_iter()
                                .filter(|n| {
                                    n.service_type == ServiceType::Pouch && n.peer_id != local_peer
                                })
                                .filter_map(|n| n.peer_id.parse::<PeerId>().ok())
                                .collect()
                        }
                    };

                    // Fire off one FetchChunkFragments request per peer; each gets a
                    // oneshot channel for its reply. Send failures drop that peer.
                    let mut receivers = Vec::new();
                    for peer in &target_peers {
                        let (tx, rx) = tokio::sync::oneshot::channel();
                        if state
                            .net_tx
                            .send(NetworkCommand::FetchChunkFragments {
                                peer_id: *peer,
                                chunk_id: chunk_id.clone(),
                                resp_tx: tx,
                            })
                            .await
                            .is_ok()
                        {
                            receivers.push(rx);
                        }
                    }

                    use crate::coding::rlnc::EncodedFragment;
                    // Collect replies sequentially, 10s timeout each, stopping as
                    // soon as k fragments have been gathered.
                    for rx in receivers {
                        match tokio::time::timeout(std::time::Duration::from_secs(10), rx).await {
                            // Peer returned a batch of (fragment id, bytes) pairs.
                            Ok(Ok(FragmentResponse::FoundMany { fragments })) => {
                                for (frag_id, bytes) in fragments {
                                    if all_fragments.len() >= k_needed {
                                        break;
                                    }
                                    match EncodedFragment::from_bytes(frag_id, chunk_id.clone(), &bytes)
                                    {
                                        Ok(f) => {
                                            all_fragments.push(f);
                                            fragments_remote += 1;
                                        }
                                        Err(e) => tracing::warn!("Remote fragment parse error: {e}"),
                                    }
                                }
                            }
                            // Peer returned a single raw fragment without an id;
                            // a fresh UUID is assigned locally.
                            Ok(Ok(FragmentResponse::Found { data })) => {
                                if all_fragments.len() < k_needed {
                                    let frag_id = uuid::Uuid::new_v4().to_string();
                                    match EncodedFragment::from_bytes(frag_id, chunk_id.clone(), &data)
                                    {
                                        Ok(f) => {
                                            all_fragments.push(f);
                                            fragments_remote += 1;
                                        }
                                        Err(e) => tracing::warn!("Remote fragment parse error: {e}"),
                                    }
                                }
                            }
                            // Any other response variant is ignored; a dropped
                            // oneshot or a timeout is only debug-logged.
                            Ok(Ok(_)) => {} Ok(Err(_)) => tracing::debug!("Remote fetch: oneshot dropped"),
                            Err(_) => tracing::debug!("Remote fetch: timeout"),
                        }
                        if all_fragments.len() >= k_needed {
                            break;
                        }
                    }
                }

                // Nothing recovered at all — report the chunk as missing.
                if all_fragments.is_empty() {
                    return ControlResponse::err(format!(
                        "Chunk '{chunk_id}' not found locally or on remote peers"
                    ));
                }

                let fragments_used = all_fragments.len();
                // RLNC-decode the gathered fragments back into the encrypted chunk.
                match rlnc::decode(&all_fragments) {
                    Ok(encrypted_data) => {
                        // Look up the in-memory CEK hint for this chunk (read lock
                        // is scoped to this block so it is released before decrypt).
                        let plaintext_hash = {
                            state
                                .chunk_cek_hints
                                .read()
                                .unwrap()
                                .get(&chunk_id)
                                .copied()
                        };
                        let data = match plaintext_hash {
                            Some(ph) => {
                                // Derive the per-user chunk cipher from the identity's
                                // secret material plus the hint, then decrypt.
                                let secret_mat = state.identity.secret_material();
                                let cipher = ChunkCipher::for_user(&secret_mat, &ph);
                                match cipher.decrypt(&encrypted_data) {
                                    Ok(d) => d,
                                    Err(e) => {
                                        return ControlResponse::err(format!(
                                            "Chunk decryption failed: {e}"
                                        ))
                                    }
                                }
                            }
                            // Without the hint the chunk cannot be decrypted;
                            // hints live in memory, so a restart loses them.
                            None => {
                                return ControlResponse::err(format!(
                                    "CEK hint not found for chunk '{}' — daemon may have restarted",
                                    chunk_id
                                ))
                            }
                        };
                        tracing::info!(
                            chunk_id=%chunk_id, local=%local_count,
                            remote=%fragments_remote, total=%fragments_used,
                            "GetFile decoded + decrypted"
                        );
                        ControlResponse::ok(GetFileData {
                            chunk_id,
                            data,
                            fragments_used,
                            fragments_remote,
                        })
                    }
                    Err(e) => ControlResponse::err(format!("Decode failed: {e}")),
                }
            }
1352
            // Aggregate per-pouch storage statistics, optionally filtered to one
            // network (empty `network_id` means "all networks").
            ControlRequest::StorageInfo { network_id } => {
                let managers = state.storage_managers.read().unwrap();
                let services = state.services.read().unwrap();
                let registry = state.file_registry.read().unwrap();

                // Fraction of raw capacity advertised as available, derived from
                // this node's reputation tier and network-wide QoS stability.
                let avail_factor = {
                    let own_peer = state.identity.peer_id.to_string();
                    // Unknown peers default to tier R1.
                    let tier_enum = {
                        let rep = state.reputation.read().unwrap();
                        match rep.get(&own_peer) {
                            Some(r) => r.tier,
                            None => crate::network::ReputationTier::R1,
                        }
                    };
                    if tier_enum == crate::network::ReputationTier::R0 {
                        // R0 tier advertises no available storage at all.
                        0.0_f64
                    } else {
                        let ph = tier_enum.qos_target_ph();
                        let qos = state.qos.read().unwrap();
                        // 0.4 is the fallback stability score when we have no
                        // QoS record for ourselves yet.
                        let own_score = qos
                            .get(&own_peer)
                            .map(|q| q.stability_score())
                            .unwrap_or(0.4);
                        let mut stabilities = qos.all_stability_scores();
                        stabilities.push(own_score);
                        crate::coding::params::compute_network_storage_factor(&stabilities, ph)
                    }
                };

                // One PouchStat per Pouch service matching the network filter.
                let pouches: Vec<PouchStat> = services
                    .all()
                    .iter()
                    .filter(|s| {
                        s.service_type == ServiceType::Pouch
                            && (network_id.is_empty() || s.network_id == network_id)
                    })
                    .map(|s| {
                        // Human-readable tier label, e.g. "T1 — Pebble", built from
                        // the service's "tier" metadata entry when present.
                        let tier_label = s.metadata.get("tier").and_then(|v| v.as_str()).map(|t| {
                            format!(
                                "{} — {}",
                                t,
                                match t {
                                    "T1" => "Pebble",
                                    "T2" => "Stone",
                                    "T3" => "Boulder",
                                    "T4" => "Rock",
                                    "T5" => "Monolith",
                                    _ => "?",
                                }
                            )
                        });
                        if let Some(sm_lock) = managers.get(&s.id) {
                            let sm = sm_lock.read().unwrap();
                            PouchStat {
                                service_id: s.id.clone(),
                                network_id: s.network_id.clone(),
                                storage_tier: tier_label,
                                storage_bid_bytes: sm.meta.storage_bytes_bid,
                                storage_used_bytes: sm.meta.storage_bytes_used,
                                // Scale raw availability by the network factor.
                                available_bytes: ((sm.meta.available_bytes() as f64) * avail_factor)
                                    .round() as u64,
                            }
                        } else {
                            // Service known but no storage manager attached:
                            // report zeroed byte counters.
                            PouchStat {
                                service_id: s.id.clone(),
                                network_id: s.network_id.clone(),
                                storage_tier: tier_label,
                                storage_bid_bytes: 0,
                                storage_used_bytes: 0,
                                available_bytes: 0,
                            }
                        }
                    })
                    .collect();

                let total_bid_bytes: u64 = pouches.iter().map(|p| p.storage_bid_bytes).sum();
                let total_used_bytes: u64 = pouches.iter().map(|p| p.storage_used_bytes).sum();
                let total_available_bytes: u64 = pouches.iter().map(|p| p.available_bytes).sum();

                // File-registry totals for the same filter ("" = no filter).
                let net_filter = if network_id.is_empty() {
                    ""
                } else {
                    &network_id
                };
                let file_entries = registry.list(net_filter);
                let total_files_uploaded = file_entries.len();
                let total_uploaded_bytes: u64 = file_entries.iter().map(|e| e.size_bytes).sum();

                ControlResponse::ok(StorageInfoData {
                    pouches,
                    total_bid_bytes,
                    total_used_bytes,
                    total_available_bytes,
                    total_files_uploaded,
                    total_uploaded_bytes,
                })
            }
1452
            // List registered files, optionally filtered to a single network
            // (empty `network_id` lists everything).
            ControlRequest::ListFiles { network_id } => {
                let registry = state.file_registry.read().unwrap();
                let net_filter = if network_id.is_empty() {
                    ""
                } else {
                    &network_id
                };
                // Convert the registry's entries into wire-format FileEntry values.
                let entries: Vec<FileEntry> = registry
                    .list(net_filter)
                    .into_iter()
                    .map(|e| FileEntry {
                        file_name: e.file_name.clone(),
                        size_bytes: e.size_bytes,
                        chunk_id: e.chunk_id.clone(),
                        network_id: e.network_id.clone(),
                        uploaded_at: e.uploaded_at,
                    })
                    .collect();
                let total_bytes = entries.iter().map(|e| e.size_bytes).sum();
                let total_files = entries.len();
                ControlResponse::ok(ListFilesData {
                    files: entries,
                    network_id: network_id.clone(),
                    total_files,
                    total_bytes,
                })
            }
1481 }
1482}
1483
1484async fn announce_self(
1487 state: &Arc<DaemonState>,
1488 service_id: &str,
1489 service_type: ServiceType,
1490 network_id: &str,
1491) {
1492 let listen_addrs: Vec<String> = vec![]; let info = NodeInfo {
1495 peer_id: state.identity.peer_id.to_string(),
1496 user_fingerprint: state.identity.fingerprint.clone(),
1497 user_alias: state.identity.profile.alias.clone(),
1498 service_type,
1499 service_id: service_id.to_string(),
1500 network_id: network_id.to_string(),
1501 listen_addrs,
1502 announced_at: chrono::Utc::now().timestamp() as u64,
1503 metadata: HashMap::new(),
1504 };
1505
1506 match serde_json::to_vec(&info) {
1507 Ok(payload) => {
1508 let _ = state
1509 .net_tx
1510 .send(NetworkCommand::Announce {
1511 network_id: network_id.to_string(),
1512 payload,
1513 })
1514 .await;
1515 }
1516 Err(e) => tracing::warn!("Failed to serialize NodeInfo: {}", e),
1517 }
1518}
1519
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// CEK-hint persistence tests. Each test works inside its own unique
    /// directory under the system temp dir and removes it when done, so
    /// repeated test runs no longer accumulate `bp_test_*` directories.

    #[test]
    fn load_cek_hints_missing_file_returns_empty() {
        let dir = tempdir();
        let path = dir.join("cek_hints.json");
        // No file was ever written at `path` — loading must yield an empty map.
        let hints = load_cek_hints_at(&path);
        cleanup(&dir);
        assert!(hints.is_empty(), "expected empty map for missing file");
    }

    #[test]
    fn cek_hints_persist_load_roundtrip() {
        let dir = tempdir();
        let path = dir.join("cek_hints.json");

        // Two distinct 32-byte keys exercise the hex encode/decode path.
        let mut hints: HashMap<String, [u8; 32]> = HashMap::new();
        let key_a = *b"abcdefghijklmnopqrstuvwxyz012345";
        let key_b = *b"ABCDEFGHIJKLMNOPQRSTUVWXYZ678901";
        hints.insert("chunk-aaa".to_string(), key_a);
        hints.insert("chunk-bbb".to_string(), key_b);

        persist_cek_hints_at(&path, &hints);
        let loaded = load_cek_hints_at(&path);
        cleanup(&dir);

        assert_eq!(loaded.len(), 2, "wrong number of hints loaded");
        assert_eq!(loaded["chunk-aaa"], key_a);
        assert_eq!(loaded["chunk-bbb"], key_b);
    }

    #[test]
    fn load_cek_hints_corrupt_file_returns_empty() {
        let dir = tempdir();
        let path = dir.join("cek_hints.json");
        // Invalid JSON must be tolerated and reported as "no hints".
        std::fs::write(&path, b"NOT_VALID_JSON").unwrap();

        let hints = load_cek_hints_at(&path);
        cleanup(&dir);
        assert!(hints.is_empty(), "corrupt file must yield empty map");
    }

    /// Creates a fresh, uniquely named directory under the system temp dir.
    /// Pair with `cleanup` so test runs do not leak directories.
    fn tempdir() -> std::path::PathBuf {
        let mut path = std::env::temp_dir();
        path.push(format!("bp_test_{}", uuid::Uuid::new_v4()));
        std::fs::create_dir_all(&path).unwrap();
        path
    }

    /// Best-effort removal of a test directory; errors are ignored so a
    /// failed removal never masks the test's real assertion outcome.
    fn cleanup(dir: &std::path::Path) {
        let _ = std::fs::remove_dir_all(dir);
    }
}