bp_core/
daemon.rs

1//! BillPouch daemon entry point.
2//!
3//! The daemon is started by the CLI (`bp hatch <type>`) when no running daemon
4//! is detected.  It owns the libp2p Swarm and the control Unix socket.
5
6use crate::{
7    config,
8    control::server::{load_cek_hints, run_control_server, DaemonState},
9    error::{BpError, BpResult},
10    identity::Identity,
11    network::{
12        self, run_quality_monitor, NetworkState, OutgoingAssignments, RemoteFragmentIndex,
13        ReputationStore, StorageManagerMap,
14    },
15    service::ServiceRegistry,
16    storage::FileRegistry,
17};
18use std::{
19    collections::HashMap,
20    sync::{Arc, RwLock},
21};
22use tokio::sync::mpsc;
23
24/// Start the daemon process.  This function blocks until the daemon is killed.
25///
26/// `passphrase` is forwarded to [`Identity::load`].  Pass `None` for
27/// plaintext identities; pass `Some(p)` when the identity was created with
28/// `--passphrase`.  As a convenience the daemon also checks the
29/// `BP_PASSPHRASE` environment variable when `passphrase` is `None`.
30pub async fn run_daemon(passphrase: Option<String>) -> BpResult<()> {
31    config::ensure_dirs()?;
32
33    // Resolve passphrase: explicit arg → BP_PASSPHRASE env var → None.
34    let resolved_pass = passphrase
35        .or_else(|| std::env::var("BP_PASSPHRASE").ok())
36        .filter(|s| !s.is_empty());
37
38    // ── Load identity ─────────────────────────────────────────────────────
39    let identity = Identity::load(resolved_pass.as_deref())?;
40    tracing::info!(
41        peer_id = %identity.peer_id,
42        fingerprint = %identity.fingerprint,
43        "BillPouch daemon starting"
44    );
45
46    // Write PID file so the CLI can detect a running daemon.
47    write_pid()?;
48
49    // ── Build shared state ────────────────────────────────────────────────
50    let network_state = Arc::new(RwLock::new(NetworkState::new()));
51    let (net_tx, net_rx) = mpsc::channel::<crate::network::NetworkCommand>(64);
52
53    // Shared storage managers: populated when Pouch services are hatched.
54    let storage_managers: StorageManagerMap = Arc::new(RwLock::new(HashMap::new()));
55
56    let qos = Arc::new(RwLock::new(crate::network::QosRegistry::new()));
57    let outgoing_assignments: OutgoingAssignments = Arc::new(RwLock::new(HashMap::new()));
58    let remote_fragment_index = Arc::new(RwLock::new(RemoteFragmentIndex::new()));
59
60    let daemon_state = Arc::new(DaemonState {
61        identity: identity.clone(),
62        services: RwLock::new(ServiceRegistry::new()),
63        network_state: Arc::clone(&network_state),
64        networks: RwLock::new(Vec::new()),
65        net_tx: net_tx.clone(),
66        storage_managers: Arc::clone(&storage_managers),
67        qos,
68        outgoing_assignments: Arc::clone(&outgoing_assignments),
69        remote_fragment_index: Arc::clone(&remote_fragment_index),
70        chunk_cek_hints: RwLock::new(load_cek_hints()),
71        reputation: RwLock::new(ReputationStore::new()),
72        file_registry: RwLock::new(
73            config::file_registry_path()
74                .map(|p| FileRegistry::load(&p))
75                .unwrap_or_default(),
76        ),
77    });
78
79    // ── Build libp2p swarm ────────────────────────────────────────────────
80    let swarm = network::build_swarm(identity.keypair.clone())
81        .map_err(|e| BpError::Network(e.to_string()))?;
82
83    // Allow the operator to bind a fixed port via BP_LISTEN_PORT (useful for
84    // Docker playgrounds and bootstrap peers).  Defaults to 0 (OS assigns a
85    // random port, which is fine for normal use and mDNS-based discovery).
86    let listen_port: u16 = std::env::var("BP_LISTEN_PORT")
87        .ok()
88        .and_then(|s| s.parse().ok())
89        .unwrap_or(0);
90    let listen_addr = format!("/ip4/0.0.0.0/tcp/{listen_port}")
91        .parse()
92        .map_err(|e: libp2p::multiaddr::Error| BpError::Network(e.to_string()))?;
93
94    // ── Spawn network loop ────────────────────────────────────────────────
95    let net_state = Arc::clone(&network_state);
96    let net_storage = Arc::clone(&storage_managers);
97    let net_outgoing = Arc::clone(&outgoing_assignments);
98    let net_fragment_idx = Arc::clone(&remote_fragment_index);
99    tokio::spawn(async move {
100        if let Err(e) = network::run_network_loop(
101            swarm,
102            net_rx,
103            net_state,
104            listen_addr,
105            net_storage,
106            net_outgoing,
107            net_fragment_idx,
108        )
109        .await
110        {
111            tracing::error!("Network loop exited with error: {}", e);
112        }
113    });
114
115    // ── Gossip eviction task — remove stale peers every 60 s ─────────────
116    let evict_state = Arc::clone(&network_state);
117    tokio::spawn(async move {
118        let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));
119        loop {
120            interval.tick().await;
121            if let Ok(mut ns) = evict_state.write() {
122                ns.evict_stale(120); // evict nodes silent for >2 min
123            }
124        }
125    });
126
127    // ── Periodic NodeInfo re-announce — keep gossip state fresh ──────────
128    // announce_self is called once on Hatch; this loop re-broadcasts every
129    // 30 s so that peers joining the mesh late still receive our NodeInfo,
130    // and evict_stale(120) never drops live peers from neighbours.
131    let reannounce_state = Arc::clone(&daemon_state);
132    tokio::spawn(async move {
133        // First burst: re-announce at 10s, 20s, 30s after daemon start,
134        // then settle into 30s intervals. This covers gossipsub mesh
135        // formation delay in Docker playgrounds.
136        for delay in [10u64, 10, 10] {
137            tokio::time::sleep(std::time::Duration::from_secs(delay)).await;
138            let services_snapshot: Vec<(String, crate::service::ServiceType, String)> = {
139                let svc = reannounce_state.services.read().unwrap();
140                svc.all()
141                    .iter()
142                    .map(|s| (s.id.clone(), s.service_type, s.network_id.clone()))
143                    .collect()
144            };
145            for (service_id, service_type, network_id) in services_snapshot {
146                let info = crate::network::state::NodeInfo {
147                    peer_id: reannounce_state.identity.peer_id.to_string(),
148                    user_fingerprint: reannounce_state.identity.fingerprint.clone(),
149                    user_alias: reannounce_state.identity.profile.alias.clone(),
150                    service_type,
151                    service_id,
152                    network_id: network_id.clone(),
153                    listen_addrs: vec![],
154                    announced_at: chrono::Utc::now().timestamp() as u64,
155                    metadata: std::collections::HashMap::new(),
156                };
157                if let Ok(payload) = serde_json::to_vec(&info) {
158                    let _ = reannounce_state
159                        .net_tx
160                        .send(crate::network::NetworkCommand::Announce {
161                            network_id,
162                            payload,
163                        })
164                        .await;
165                }
166            }
167        }
168        // Steady-state: re-announce every 30 s.
169        let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
170        loop {
171            interval.tick().await;
172            let services_snapshot: Vec<(String, crate::service::ServiceType, String)> = {
173                let svc = reannounce_state.services.read().unwrap();
174                svc.all()
175                    .iter()
176                    .map(|s| (s.id.clone(), s.service_type, s.network_id.clone()))
177                    .collect()
178            };
179            for (service_id, service_type, network_id) in services_snapshot {
180                let info = crate::network::state::NodeInfo {
181                    peer_id: reannounce_state.identity.peer_id.to_string(),
182                    user_fingerprint: reannounce_state.identity.fingerprint.clone(),
183                    user_alias: reannounce_state.identity.profile.alias.clone(),
184                    service_type,
185                    service_id,
186                    network_id: network_id.clone(),
187                    listen_addrs: vec![],
188                    announced_at: chrono::Utc::now().timestamp() as u64,
189                    metadata: std::collections::HashMap::new(),
190                };
191                if let Ok(payload) = serde_json::to_vec(&info) {
192                    let _ = reannounce_state
193                        .net_tx
194                        .send(crate::network::NetworkCommand::Announce {
195                            network_id,
196                            payload,
197                        })
198                        .await;
199                }
200            }
201        }
202    });
203
204    // ── Network quality monitor — Ping all Pouch peers, update QoS ──────
205    let monitor_state = Arc::clone(&network_state);
206    let monitor_qos = Arc::clone(&daemon_state.qos);
207    let monitor_tx = net_tx.clone();
208    let monitor_outgoing = Arc::clone(&outgoing_assignments);
209    tokio::spawn(async move {
210        run_quality_monitor(monitor_state, monitor_qos, monitor_tx, monitor_outgoing).await;
211    });
212
213    // ── Run control socket (blocks until daemon is killed) ────────────────
214    let socket_path = config::socket_path()?;
215    run_control_server(socket_path, daemon_state).await?;
216
217    remove_pid();
218    Ok(())
219}
220
221fn write_pid() -> BpResult<()> {
222    let pid = std::process::id();
223    let path = config::pid_path()?;
224    std::fs::write(&path, pid.to_string()).map_err(BpError::Io)
225}
226
227fn remove_pid() {
228    if let Ok(path) = config::pid_path() {
229        let _ = std::fs::remove_file(path);
230    }
231}
232
233/// Returns true if a daemon process is currently running (PID file exists and
234/// process is alive).
235pub fn is_running() -> bool {
236    let path = match config::pid_path() {
237        Ok(p) => p,
238        Err(_) => return false,
239    };
240    let content = match std::fs::read_to_string(&path) {
241        Ok(c) => c,
242        Err(_) => return false,
243    };
244    let pid: u32 = match content.trim().parse() {
245        Ok(p) => p,
246        Err(_) => return false,
247    };
248    // Check if process exists — kill(pid, 0) equivalent via /proc on Linux.
249    let proc_path = format!("/proc/{}", pid);
250    std::path::Path::new(&proc_path).exists()
251}