1use crate::{
7 config,
8 control::server::{load_cek_hints, run_control_server, DaemonState},
9 error::{BpError, BpResult},
10 identity::Identity,
11 network::{
12 self, run_quality_monitor, NetworkState, OutgoingAssignments, RemoteFragmentIndex,
13 ReputationStore, StorageManagerMap,
14 },
15 service::ServiceRegistry,
16 storage::FileRegistry,
17};
18use std::{
19 collections::HashMap,
20 sync::{Arc, RwLock},
21};
22use tokio::sync::mpsc;
23
24pub async fn run_daemon(passphrase: Option<String>) -> BpResult<()> {
31 config::ensure_dirs()?;
32
33 let resolved_pass = passphrase
35 .or_else(|| std::env::var("BP_PASSPHRASE").ok())
36 .filter(|s| !s.is_empty());
37
38 let identity = Identity::load(resolved_pass.as_deref())?;
40 tracing::info!(
41 peer_id = %identity.peer_id,
42 fingerprint = %identity.fingerprint,
43 "BillPouch daemon starting"
44 );
45
46 write_pid()?;
48
49 let network_state = Arc::new(RwLock::new(NetworkState::new()));
51 let (net_tx, net_rx) = mpsc::channel::<crate::network::NetworkCommand>(64);
52
53 let storage_managers: StorageManagerMap = Arc::new(RwLock::new(HashMap::new()));
55
56 let qos = Arc::new(RwLock::new(crate::network::QosRegistry::new()));
57 let outgoing_assignments: OutgoingAssignments = Arc::new(RwLock::new(HashMap::new()));
58 let remote_fragment_index = Arc::new(RwLock::new(RemoteFragmentIndex::new()));
59
60 let daemon_state = Arc::new(DaemonState {
61 identity: identity.clone(),
62 services: RwLock::new(ServiceRegistry::new()),
63 network_state: Arc::clone(&network_state),
64 networks: RwLock::new(Vec::new()),
65 net_tx: net_tx.clone(),
66 storage_managers: Arc::clone(&storage_managers),
67 qos,
68 outgoing_assignments: Arc::clone(&outgoing_assignments),
69 remote_fragment_index: Arc::clone(&remote_fragment_index),
70 chunk_cek_hints: RwLock::new(load_cek_hints()),
71 reputation: RwLock::new(ReputationStore::new()),
72 file_registry: RwLock::new(
73 config::file_registry_path()
74 .map(|p| FileRegistry::load(&p))
75 .unwrap_or_default(),
76 ),
77 });
78
79 let swarm = network::build_swarm(identity.keypair.clone())
81 .map_err(|e| BpError::Network(e.to_string()))?;
82
83 let listen_port: u16 = std::env::var("BP_LISTEN_PORT")
87 .ok()
88 .and_then(|s| s.parse().ok())
89 .unwrap_or(0);
90 let listen_addr = format!("/ip4/0.0.0.0/tcp/{listen_port}")
91 .parse()
92 .map_err(|e: libp2p::multiaddr::Error| BpError::Network(e.to_string()))?;
93
94 let net_state = Arc::clone(&network_state);
96 let net_storage = Arc::clone(&storage_managers);
97 let net_outgoing = Arc::clone(&outgoing_assignments);
98 let net_fragment_idx = Arc::clone(&remote_fragment_index);
99 tokio::spawn(async move {
100 if let Err(e) = network::run_network_loop(
101 swarm,
102 net_rx,
103 net_state,
104 listen_addr,
105 net_storage,
106 net_outgoing,
107 net_fragment_idx,
108 )
109 .await
110 {
111 tracing::error!("Network loop exited with error: {}", e);
112 }
113 });
114
115 let evict_state = Arc::clone(&network_state);
117 tokio::spawn(async move {
118 let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));
119 loop {
120 interval.tick().await;
121 if let Ok(mut ns) = evict_state.write() {
122 ns.evict_stale(120); }
124 }
125 });
126
127 let reannounce_state = Arc::clone(&daemon_state);
132 tokio::spawn(async move {
133 for delay in [10u64, 10, 10] {
137 tokio::time::sleep(std::time::Duration::from_secs(delay)).await;
138 let services_snapshot: Vec<(String, crate::service::ServiceType, String)> = {
139 let svc = reannounce_state.services.read().unwrap();
140 svc.all()
141 .iter()
142 .map(|s| (s.id.clone(), s.service_type, s.network_id.clone()))
143 .collect()
144 };
145 for (service_id, service_type, network_id) in services_snapshot {
146 let info = crate::network::state::NodeInfo {
147 peer_id: reannounce_state.identity.peer_id.to_string(),
148 user_fingerprint: reannounce_state.identity.fingerprint.clone(),
149 user_alias: reannounce_state.identity.profile.alias.clone(),
150 service_type,
151 service_id,
152 network_id: network_id.clone(),
153 listen_addrs: vec![],
154 announced_at: chrono::Utc::now().timestamp() as u64,
155 metadata: std::collections::HashMap::new(),
156 };
157 if let Ok(payload) = serde_json::to_vec(&info) {
158 let _ = reannounce_state
159 .net_tx
160 .send(crate::network::NetworkCommand::Announce {
161 network_id,
162 payload,
163 })
164 .await;
165 }
166 }
167 }
168 let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
170 loop {
171 interval.tick().await;
172 let services_snapshot: Vec<(String, crate::service::ServiceType, String)> = {
173 let svc = reannounce_state.services.read().unwrap();
174 svc.all()
175 .iter()
176 .map(|s| (s.id.clone(), s.service_type, s.network_id.clone()))
177 .collect()
178 };
179 for (service_id, service_type, network_id) in services_snapshot {
180 let info = crate::network::state::NodeInfo {
181 peer_id: reannounce_state.identity.peer_id.to_string(),
182 user_fingerprint: reannounce_state.identity.fingerprint.clone(),
183 user_alias: reannounce_state.identity.profile.alias.clone(),
184 service_type,
185 service_id,
186 network_id: network_id.clone(),
187 listen_addrs: vec![],
188 announced_at: chrono::Utc::now().timestamp() as u64,
189 metadata: std::collections::HashMap::new(),
190 };
191 if let Ok(payload) = serde_json::to_vec(&info) {
192 let _ = reannounce_state
193 .net_tx
194 .send(crate::network::NetworkCommand::Announce {
195 network_id,
196 payload,
197 })
198 .await;
199 }
200 }
201 }
202 });
203
204 let monitor_state = Arc::clone(&network_state);
206 let monitor_qos = Arc::clone(&daemon_state.qos);
207 let monitor_tx = net_tx.clone();
208 let monitor_outgoing = Arc::clone(&outgoing_assignments);
209 tokio::spawn(async move {
210 run_quality_monitor(monitor_state, monitor_qos, monitor_tx, monitor_outgoing).await;
211 });
212
213 let socket_path = config::socket_path()?;
215 run_control_server(socket_path, daemon_state).await?;
216
217 remove_pid();
218 Ok(())
219}
220
221fn write_pid() -> BpResult<()> {
222 let pid = std::process::id();
223 let path = config::pid_path()?;
224 std::fs::write(&path, pid.to_string()).map_err(BpError::Io)
225}
226
227fn remove_pid() {
228 if let Ok(path) = config::pid_path() {
229 let _ = std::fs::remove_file(path);
230 }
231}
232
233pub fn is_running() -> bool {
236 let path = match config::pid_path() {
237 Ok(p) => p,
238 Err(_) => return false,
239 };
240 let content = match std::fs::read_to_string(&path) {
241 Ok(c) => c,
242 Err(_) => return false,
243 };
244 let pid: u32 = match content.trim().parse() {
245 Ok(p) => p,
246 Err(_) => return false,
247 };
248 let proc_path = format!("/proc/{}", pid);
250 std::path::Path::new(&proc_path).exists()
251}