From cf871a7d2c0cae872a4a0ce2803dfb7e70c42b51 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Thu, 12 Feb 2026 14:04:08 +0100 Subject: [PATCH 1/6] refactor: replace broadcast/watch shutdown channels with CancellationToken Switch from tokio broadcast and watch channels to tokio_util CancellationToken for shutdown signaling in both Devnet and RunningNode. This simplifies the shutdown logic and enables concurrent node shutdown in devnet. Co-Authored-By: Claude Opus 4.6 --- Cargo.toml | 1 + src/devnet.rs | 40 +++++++++++++++++++++++----------------- src/node.rs | 40 +++++++++++++++------------------------- tests/e2e/testnet.rs | 2 +- 4 files changed, 40 insertions(+), 43 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index fa4e2cfa..e3f0ca0d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,6 +51,7 @@ sha2 = "0.10" # Async runtime tokio = { version = "1.35", features = ["full", "signal"] } +tokio-util = { version = "0.7", features = ["rt"] } futures = "0.3" # CLI diff --git a/src/devnet.rs b/src/devnet.rs index 5eb08c26..122ca741 100644 --- a/src/devnet.rs +++ b/src/devnet.rs @@ -17,7 +17,8 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; -use tokio::sync::{broadcast, RwLock}; +use tokio::sync::RwLock; +use tokio_util::sync::CancellationToken; use tokio::task::JoinHandle; use tokio::time::Instant; use tracing::{debug, info, warn}; @@ -63,8 +64,6 @@ const STABILIZATION_MIN_CONNECTIONS_CAP: usize = 3; /// Health monitor check interval (seconds). const HEALTH_CHECK_INTERVAL_SECS: u64 = 5; -/// Shutdown broadcast channel capacity. -const SHUTDOWN_CHANNEL_CAPACITY: usize = 1; // ============================================================================= // AntProtocol Devnet Configuration @@ -293,7 +292,7 @@ impl DevnetNode { pub struct Devnet { config: DevnetConfig, nodes: Vec, - shutdown_tx: broadcast::Sender<()>, + shutdown: CancellationToken, state: Arc>, health_monitor: Option>, } @@ -348,12 +347,10 @@ impl Devnet { tokio::fs::create_dir_all(&config.data_dir).await?; - let (shutdown_tx, _) = broadcast::channel(SHUTDOWN_CHANNEL_CAPACITY); - Ok(Self { config, nodes: Vec::new(), - shutdown_tx, + shutdown: CancellationToken::new(), state: Arc::new(RwLock::new(NetworkState::Uninitialized)), health_monitor: None, }) @@ -396,24 +393,33 @@ impl Devnet { info!("Shutting down devnet"); *self.state.write().await = NetworkState::ShuttingDown; - let _ = self.shutdown_tx.send(()); + self.shutdown.cancel(); if let Some(handle) = self.health_monitor.take() { handle.abort(); } + let mut shutdown_futures = Vec::with_capacity(self.nodes.len()); for node in self.nodes.iter_mut().rev() { debug!("Stopping node {}", node.index); if let Some(handle) = node.protocol_task.take() { handle.abort(); } - if let Some(ref p2p) = node.p2p_node { - if let Err(e) = p2p.shutdown().await { - warn!("Error shutting down node {}: {}", node.index, e); + + let node_index = node.index; + let node_state = Arc::clone(&node.state); + let p2p_node = node.p2p_node.take(); + + shutdown_futures.push(async move { + if let Some(p2p) = p2p_node { + if let Err(e) = p2p.shutdown().await { + warn!("Error shutting down node {}: {}", node_index, e); + } } - } - *node.state.write().await = NodeState::Stopped; + *node_state.write().await = NodeState::Stopped; + }); } + futures::future::join_all(shutdown_futures).await; if self.config.cleanup_data_dir { if let Err(e) = tokio::fs::remove_dir_all(&self.config.data_dir).await { @@ -687,17 +693,17 @@ impl Devnet { .iter() .filter_map(|n| n.p2p_node.clone()) .collect(); - let mut shutdown_rx = self.shutdown_tx.subscribe(); + let shutdown = self.shutdown.clone(); self.health_monitor = Some(tokio::spawn(async move { let check_interval = Duration::from_secs(HEALTH_CHECK_INTERVAL_SECS); loop { tokio::select! { - _ = shutdown_rx.recv() => break, + () = shutdown.cancelled() => break, () = tokio::time::sleep(check_interval) => { for (i, node) in nodes.iter().enumerate() { - if !node.is_running().await { + if !node.is_running() { warn!("Node {} appears unhealthy", i); } } @@ -710,7 +716,7 @@ impl Devnet { impl Drop for Devnet { fn drop(&mut self) { - let _ = self.shutdown_tx.send(()); + self.shutdown.cancel(); if let Some(handle) = self.health_monitor.take() { handle.abort(); } diff --git a/src/node.rs b/src/node.rs index c2d824d4..a3e749e6 100644 --- a/src/node.rs +++ b/src/node.rs @@ -19,8 +19,8 @@ use saorsa_core::{ use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; -use tokio::sync::watch; use tokio::task::JoinHandle; +use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; /// Maximum number of records for quoting metrics. @@ -55,8 +55,8 @@ impl NodeBuilder { // Ensure root directory exists std::fs::create_dir_all(&self.config.root_dir)?; - // Create shutdown channel - let (shutdown_tx, shutdown_rx) = watch::channel(false); + // Create shutdown token + let shutdown = CancellationToken::new(); // Create event channel let (events_tx, events_rx) = create_event_channel(); @@ -97,8 +97,7 @@ impl NodeBuilder { let node = RunningNode { config: self.config, p2p_node: Arc::new(p2p_node), - shutdown_tx, - shutdown_rx, + shutdown, events_tx, events_rx: Some(events_rx), upgrade_monitor, @@ -272,8 +271,7 @@ impl NodeBuilder { pub struct RunningNode { config: NodeConfig, p2p_node: Arc, - shutdown_tx: watch::Sender, - shutdown_rx: watch::Receiver, + shutdown: CancellationToken, events_tx: NodeEventsSender, events_rx: Option, upgrade_monitor: Option>, @@ -336,17 +334,15 @@ impl RunningNode { if let Some(ref monitor) = self.upgrade_monitor { let monitor = Arc::clone(monitor); let events_tx = self.events_tx.clone(); - let mut shutdown_rx = self.shutdown_rx.clone(); + let shutdown = self.shutdown.clone(); tokio::spawn(async move { let upgrader = AutoApplyUpgrader::new(); loop { tokio::select! { - _ = shutdown_rx.changed() => { - if *shutdown_rx.borrow() { - break; - } + () = shutdown.cancelled() => { + break; } result = monitor.check_for_updates() => { if let Ok(Some(upgrade_info)) = result { @@ -435,11 +431,9 @@ impl RunningNode { loop { tokio::select! { - _ = self.shutdown_rx.changed() => { - if *self.shutdown_rx.borrow() { - info!("Shutdown signal received"); - break; - } + () = self.shutdown.cancelled() => { + info!("Shutdown signal received"); + break; } _ = tokio::signal::ctrl_c() => { info!("Received SIGINT (Ctrl-C), initiating shutdown"); @@ -465,11 +459,9 @@ impl RunningNode { async fn run_event_loop(&mut self) -> Result<()> { loop { tokio::select! { - _ = self.shutdown_rx.changed() => { - if *self.shutdown_rx.borrow() { - info!("Shutdown signal received"); - break; - } + () = self.shutdown.cancelled() => { + info!("Shutdown signal received"); + break; } _ = tokio::signal::ctrl_c() => { info!("Received Ctrl-C, initiating shutdown"); @@ -533,9 +525,7 @@ impl RunningNode { /// Request the node to shut down. pub fn shutdown(&self) { - if let Err(e) = self.shutdown_tx.send(true) { - warn!("Failed to send shutdown signal: {e}"); - } + self.shutdown.cancel(); } } diff --git a/tests/e2e/testnet.rs b/tests/e2e/testnet.rs index 209c6c70..c266f3f2 100644 --- a/tests/e2e/testnet.rs +++ b/tests/e2e/testnet.rs @@ -1138,7 +1138,7 @@ impl TestNetwork { () = tokio::time::sleep(check_interval) => { // Check each node's health for (i, node) in nodes.iter().enumerate() { - if !node.is_running().await { + if !node.is_running() { warn!("Node {} appears unhealthy", i); } } From eabd560632e41580fb70c75118cdfe8d4cd63364 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Thu, 12 Feb 2026 15:09:41 +0100 Subject: [PATCH 2/6] fix: resolve fmt and clippy warnings from CancellationToken refactor Co-Authored-By: Claude Opus 4.6 --- src/devnet.rs | 3 +-- src/node.rs | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/devnet.rs b/src/devnet.rs index 122ca741..610c78cc 100644 --- a/src/devnet.rs +++ b/src/devnet.rs @@ -18,9 +18,9 @@ use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; use tokio::sync::RwLock; -use tokio_util::sync::CancellationToken; use tokio::task::JoinHandle; use tokio::time::Instant; +use tokio_util::sync::CancellationToken; use tracing::{debug, info, warn}; // ============================================================================= @@ -64,7 +64,6 @@ const STABILIZATION_MIN_CONNECTIONS_CAP: usize = 3; /// Health monitor check interval (seconds). const HEALTH_CHECK_INTERVAL_SECS: u64 = 5; - // ============================================================================= // AntProtocol Devnet Configuration // ============================================================================= diff --git a/src/node.rs b/src/node.rs index a3e749e6..d5eb6ebc 100644 --- a/src/node.rs +++ b/src/node.rs @@ -425,7 +425,7 @@ impl RunningNode { /// Run the main event loop, handling shutdown and signals. #[cfg(unix)] - async fn run_event_loop(&mut self) -> Result<()> { + async fn run_event_loop(&self) -> Result<()> { let mut sigterm = signal(SignalKind::terminate())?; let mut sighup = signal(SignalKind::hangup())?; @@ -456,7 +456,7 @@ impl RunningNode { /// Run the main event loop, handling shutdown signals (non-Unix version). #[cfg(not(unix))] - async fn run_event_loop(&mut self) -> Result<()> { + async fn run_event_loop(&self) -> Result<()> { loop { tokio::select! { () = self.shutdown.cancelled() => { From f324e0c0facd9eaa31e7923e17ace118bf3e4d73 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Thu, 12 Feb 2026 15:26:33 +0100 Subject: [PATCH 3/6] fix: upgrade saorsa-core to 0.11.0 and await async is_running() Co-Authored-By: Claude Opus 4.6 --- Cargo.toml | 2 +- src/devnet.rs | 2 +- tests/e2e/testnet.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e3f0ca0d..8cb3ac06 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,7 +32,7 @@ path = "src/bin/saorsa-client/main.rs" [dependencies] # Core (provides EVERYTHING: networking, DHT, security, trust, storage) -saorsa-core = "0.10.4" +saorsa-core = "0.11.0" saorsa-pqc = "0.4.0" # Payment verification - autonomi network lookup + EVM payment diff --git a/src/devnet.rs b/src/devnet.rs index 610c78cc..28a58c93 100644 --- a/src/devnet.rs +++ b/src/devnet.rs @@ -702,7 +702,7 @@ impl Devnet { () = shutdown.cancelled() => break, () = tokio::time::sleep(check_interval) => { for (i, node) in nodes.iter().enumerate() { - if !node.is_running() { + if !node.is_running().await { warn!("Node {} appears unhealthy", i); } } diff --git a/tests/e2e/testnet.rs b/tests/e2e/testnet.rs index c266f3f2..209c6c70 100644 --- a/tests/e2e/testnet.rs +++ b/tests/e2e/testnet.rs @@ -1138,7 +1138,7 @@ impl TestNetwork { () = tokio::time::sleep(check_interval) => { // Check each node's health for (i, node) in nodes.iter().enumerate() { - if !node.is_running() { + if !node.is_running().await { warn!("Node {} appears unhealthy", i); } } From c3ddcef23079a3a5ca7197a121443a1357061ecd Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Thu, 12 Feb 2026 21:30:02 +0100 Subject: [PATCH 4/6] fix: remove unnecessary async call for is_running() in node health checks --- src/devnet.rs | 2 +- tests/e2e/testnet.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/devnet.rs b/src/devnet.rs index 28a58c93..610c78cc 100644 --- a/src/devnet.rs +++ b/src/devnet.rs @@ -702,7 +702,7 @@ impl Devnet { () = shutdown.cancelled() => break, () = tokio::time::sleep(check_interval) => { for (i, node) in nodes.iter().enumerate() { - if !node.is_running().await { + if !node.is_running() { warn!("Node {} appears unhealthy", i); } } diff --git a/tests/e2e/testnet.rs b/tests/e2e/testnet.rs index 209c6c70..c266f3f2 100644 --- a/tests/e2e/testnet.rs +++ b/tests/e2e/testnet.rs @@ -1138,7 +1138,7 @@ impl TestNetwork { () = tokio::time::sleep(check_interval) => { // Check each node's health for (i, node) in nodes.iter().enumerate() { - if !node.is_running().await { + if !node.is_running() { warn!("Node {} appears unhealthy", i); } } From 44997800467690e0300ffcd51bc198f2f96dd6ff Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Tue, 17 Feb 2026 11:36:07 +0100 Subject: [PATCH 5/6] chore: bump saorsa-core to 0.11.1 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 8cb3ac06..612237cc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,7 +32,7 @@ path = "src/bin/saorsa-client/main.rs" [dependencies] # Core (provides EVERYTHING: networking, DHT, security, trust, storage) -saorsa-core = "0.11.0" +saorsa-core = "0.11.1" saorsa-pqc = "0.4.0" # Payment verification - autonomi network lookup + EVM payment From 857398cd66df8b29855461ce44d865682809ee56 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Tue, 17 Feb 2026 12:50:07 +0100 Subject: [PATCH 6/6] fix(e2e): avoid teardown timeout accumulation in test harness --- tests/e2e/testnet.rs | 31 ++++++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/tests/e2e/testnet.rs b/tests/e2e/testnet.rs index c266f3f2..28625bda 100644 --- a/tests/e2e/testnet.rs +++ b/tests/e2e/testnet.rs @@ -15,6 +15,7 @@ use ant_evm::RewardsAddress; use bytes::Bytes; +use futures::future::join_all; use rand::Rng; use saorsa_core::{NodeConfig as CoreNodeConfig, P2PEvent, P2PNode}; use saorsa_node::ant_protocol::{ @@ -83,6 +84,12 @@ const SMALL_STABILIZATION_TIMEOUT_SECS: u64 = 60; /// Default timeout for chunk operations (seconds). const DEFAULT_CHUNK_OPERATION_TIMEOUT_SECS: u64 = 30; +/// Short node-level network timeout for E2E test harness. +/// +/// This bounds DHT leave/request waits during shutdown so tests do not spend +/// most of their runtime in graceful teardown. +const TEST_CORE_CONNECTION_TIMEOUT_SECS: u64 = 2; + // ============================================================================= // AntProtocol Test Configuration // ============================================================================= @@ -987,6 +994,7 @@ impl TestNetwork { core_config.listen_addr = node.address; core_config.listen_addrs = vec![node.address]; core_config.enable_ipv6 = false; // Disable IPv6 for local testing to avoid dual-stack binding issues + core_config.connection_timeout = Duration::from_secs(TEST_CORE_CONNECTION_TIMEOUT_SECS); core_config .bootstrap_peers .clone_from(&node.bootstrap_addrs); @@ -1165,17 +1173,30 @@ impl TestNetwork { handle.abort(); } - // Stop all nodes in reverse order + // Stop all nodes in reverse order. + // We shutdown nodes concurrently to avoid serially accumulating DHT + // graceful-leave waits across every node. + let mut shutdown_futures = Vec::with_capacity(self.nodes.len()); for node in self.nodes.iter_mut().rev() { debug!("Stopping node {}", node.index); if let Some(handle) = node.protocol_task.take() { handle.abort(); } - if let Some(ref p2p) = node.p2p_node { - if let Err(e) = p2p.shutdown().await { - warn!("Error shutting down node {}: {}", node.index, e); - } + *node.state.write().await = NodeState::Stopping; + + if let Some(p2p) = node.p2p_node.clone() { + let node_index = node.index; + shutdown_futures.push(async move { (node_index, p2p.shutdown().await) }); + } + } + + for (node_index, result) in join_all(shutdown_futures).await { + if let Err(e) = result { + warn!("Error shutting down node {}: {}", node_index, e); } + } + + for node in &self.nodes { *node.state.write().await = NodeState::Stopped; }