Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ path = "src/bin/saorsa-client/main.rs"

[dependencies]
# Core (provides EVERYTHING: networking, DHT, security, trust, storage)
saorsa-core = "0.11.0"
saorsa-core = "0.11.1"
saorsa-pqc = "0.4.0"

# Payment verification - autonomi network lookup + EVM payment
Expand All @@ -51,6 +51,7 @@ sha2 = "0.10"

# Async runtime
tokio = { version = "1.35", features = ["full", "signal"] }
tokio-util = { version = "0.7", features = ["rt"] }
Copy link

Copilot AI Feb 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CancellationToken lives under tokio_util::sync and typically requires enabling the tokio-util crate's sync feature. With only features = [\"rt\"], this may fail to compile. Consider switching to tokio-util = { version = \"0.7\", features = [\"sync\"] } (or include both \"sync\" and any other needed features).

Suggested change
tokio-util = { version = "0.7", features = ["rt"] }
tokio-util = { version = "0.7", features = ["rt", "sync"] }

Copilot uses AI. Check for mistakes.
futures = "0.3"

# CLI
Expand Down
41 changes: 23 additions & 18 deletions src/devnet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, RwLock};
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};

// =============================================================================
Expand Down Expand Up @@ -63,9 +64,6 @@ const STABILIZATION_MIN_CONNECTIONS_CAP: usize = 3;
/// Health monitor check interval (seconds).
const HEALTH_CHECK_INTERVAL_SECS: u64 = 5;

/// Shutdown broadcast channel capacity.
const SHUTDOWN_CHANNEL_CAPACITY: usize = 1;

// =============================================================================
// AntProtocol Devnet Configuration
// =============================================================================
Expand Down Expand Up @@ -293,7 +291,7 @@ impl DevnetNode {
pub struct Devnet {
config: DevnetConfig,
nodes: Vec<DevnetNode>,
shutdown_tx: broadcast::Sender<()>,
shutdown: CancellationToken,
state: Arc<RwLock<NetworkState>>,
health_monitor: Option<JoinHandle<()>>,
}
Expand Down Expand Up @@ -348,12 +346,10 @@ impl Devnet {

tokio::fs::create_dir_all(&config.data_dir).await?;

let (shutdown_tx, _) = broadcast::channel(SHUTDOWN_CHANNEL_CAPACITY);

Ok(Self {
config,
nodes: Vec::new(),
shutdown_tx,
shutdown: CancellationToken::new(),
state: Arc::new(RwLock::new(NetworkState::Uninitialized)),
health_monitor: None,
})
Expand Down Expand Up @@ -396,24 +392,33 @@ impl Devnet {
info!("Shutting down devnet");
*self.state.write().await = NetworkState::ShuttingDown;

let _ = self.shutdown_tx.send(());
self.shutdown.cancel();

if let Some(handle) = self.health_monitor.take() {
handle.abort();
}

let mut shutdown_futures = Vec::with_capacity(self.nodes.len());
for node in self.nodes.iter_mut().rev() {
debug!("Stopping node {}", node.index);
if let Some(handle) = node.protocol_task.take() {
handle.abort();
}
if let Some(ref p2p) = node.p2p_node {
if let Err(e) = p2p.shutdown().await {
warn!("Error shutting down node {}: {}", node.index, e);

let node_index = node.index;
let node_state = Arc::clone(&node.state);
let p2p_node = node.p2p_node.take();

shutdown_futures.push(async move {
if let Some(p2p) = p2p_node {
if let Err(e) = p2p.shutdown().await {
warn!("Error shutting down node {}: {}", node_index, e);
}
}
}
*node.state.write().await = NodeState::Stopped;
*node_state.write().await = NodeState::Stopped;
});
Comment on lines +410 to +419
Copy link

Copilot AI Feb 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The async block pushed into shutdown_futures is not spawned as a task or wrapped in a future that will execute independently. Since join_all expects futures that are already running or self-contained, this code will execute the shutdown logic sequentially when join_all polls each future, defeating the purpose of concurrent shutdown. Spawn each shutdown as a tokio task (e.g., tokio::spawn(async move { ... })) and collect the join handles instead.

Copilot uses AI. Check for mistakes.
}
futures::future::join_all(shutdown_futures).await;
Copy link

Copilot AI Feb 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing import for futures crate. The code uses futures::future::join_all but there is no corresponding use futures statement at the top of the file. This will cause a compilation error. Add use futures or use futures::future to the imports.

Copilot uses AI. Check for mistakes.

if self.config.cleanup_data_dir {
if let Err(e) = tokio::fs::remove_dir_all(&self.config.data_dir).await {
Expand Down Expand Up @@ -687,17 +692,17 @@ impl Devnet {
.iter()
.filter_map(|n| n.p2p_node.clone())
.collect();
let mut shutdown_rx = self.shutdown_tx.subscribe();
let shutdown = self.shutdown.clone();

self.health_monitor = Some(tokio::spawn(async move {
let check_interval = Duration::from_secs(HEALTH_CHECK_INTERVAL_SECS);

loop {
tokio::select! {
_ = shutdown_rx.recv() => break,
() = shutdown.cancelled() => break,
() = tokio::time::sleep(check_interval) => {
for (i, node) in nodes.iter().enumerate() {
if !node.is_running().await {
if !node.is_running() {
warn!("Node {} appears unhealthy", i);
}
}
Expand All @@ -710,7 +715,7 @@ impl Devnet {

impl Drop for Devnet {
fn drop(&mut self) {
let _ = self.shutdown_tx.send(());
self.shutdown.cancel();
if let Some(handle) = self.health_monitor.take() {
handle.abort();
}
Expand Down
44 changes: 17 additions & 27 deletions src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use saorsa_core::{
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::watch;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};

/// Maximum number of records for quoting metrics.
Expand Down Expand Up @@ -55,8 +55,8 @@ impl NodeBuilder {
// Ensure root directory exists
std::fs::create_dir_all(&self.config.root_dir)?;

// Create shutdown channel
let (shutdown_tx, shutdown_rx) = watch::channel(false);
// Create shutdown token
let shutdown = CancellationToken::new();

// Create event channel
let (events_tx, events_rx) = create_event_channel();
Expand Down Expand Up @@ -97,8 +97,7 @@ impl NodeBuilder {
let node = RunningNode {
config: self.config,
p2p_node: Arc::new(p2p_node),
shutdown_tx,
shutdown_rx,
shutdown,
events_tx,
events_rx: Some(events_rx),
upgrade_monitor,
Expand Down Expand Up @@ -272,8 +271,7 @@ impl NodeBuilder {
pub struct RunningNode {
config: NodeConfig,
p2p_node: Arc<P2PNode>,
shutdown_tx: watch::Sender<bool>,
shutdown_rx: watch::Receiver<bool>,
shutdown: CancellationToken,
events_tx: NodeEventsSender,
events_rx: Option<NodeEventsChannel>,
upgrade_monitor: Option<Arc<UpgradeMonitor>>,
Expand Down Expand Up @@ -336,17 +334,15 @@ impl RunningNode {
if let Some(ref monitor) = self.upgrade_monitor {
let monitor = Arc::clone(monitor);
let events_tx = self.events_tx.clone();
let mut shutdown_rx = self.shutdown_rx.clone();
let shutdown = self.shutdown.clone();

tokio::spawn(async move {
let upgrader = AutoApplyUpgrader::new();

loop {
tokio::select! {
_ = shutdown_rx.changed() => {
if *shutdown_rx.borrow() {
break;
}
() = shutdown.cancelled() => {
break;
}
result = monitor.check_for_updates() => {
if let Ok(Some(upgrade_info)) = result {
Expand Down Expand Up @@ -429,17 +425,15 @@ impl RunningNode {

/// Run the main event loop, handling shutdown and signals.
#[cfg(unix)]
async fn run_event_loop(&mut self) -> Result<()> {
async fn run_event_loop(&self) -> Result<()> {
let mut sigterm = signal(SignalKind::terminate())?;
let mut sighup = signal(SignalKind::hangup())?;

loop {
tokio::select! {
_ = self.shutdown_rx.changed() => {
if *self.shutdown_rx.borrow() {
info!("Shutdown signal received");
break;
}
() = self.shutdown.cancelled() => {
info!("Shutdown signal received");
break;
}
_ = tokio::signal::ctrl_c() => {
info!("Received SIGINT (Ctrl-C), initiating shutdown");
Expand All @@ -462,14 +456,12 @@ impl RunningNode {

/// Run the main event loop, handling shutdown signals (non-Unix version).
#[cfg(not(unix))]
async fn run_event_loop(&mut self) -> Result<()> {
async fn run_event_loop(&self) -> Result<()> {
loop {
tokio::select! {
_ = self.shutdown_rx.changed() => {
if *self.shutdown_rx.borrow() {
info!("Shutdown signal received");
break;
}
() = self.shutdown.cancelled() => {
info!("Shutdown signal received");
break;
}
_ = tokio::signal::ctrl_c() => {
info!("Received Ctrl-C, initiating shutdown");
Expand Down Expand Up @@ -533,9 +525,7 @@ impl RunningNode {

/// Request the node to shut down.
pub fn shutdown(&self) {
if let Err(e) = self.shutdown_tx.send(true) {
warn!("Failed to send shutdown signal: {e}");
}
self.shutdown.cancel();
}
}

Expand Down
33 changes: 27 additions & 6 deletions tests/e2e/testnet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

use ant_evm::RewardsAddress;
use bytes::Bytes;
use futures::future::join_all;
use rand::Rng;
use saorsa_core::{NodeConfig as CoreNodeConfig, P2PEvent, P2PNode};
use saorsa_node::ant_protocol::{
Expand Down Expand Up @@ -83,6 +84,12 @@ const SMALL_STABILIZATION_TIMEOUT_SECS: u64 = 60;
/// Default timeout for chunk operations (seconds).
const DEFAULT_CHUNK_OPERATION_TIMEOUT_SECS: u64 = 30;

/// Short node-level network timeout for E2E test harness.
///
/// This bounds DHT leave/request waits during shutdown so tests do not spend
/// most of their runtime in graceful teardown.
const TEST_CORE_CONNECTION_TIMEOUT_SECS: u64 = 2;

// =============================================================================
// AntProtocol Test Configuration
// =============================================================================
Expand Down Expand Up @@ -987,6 +994,7 @@ impl TestNetwork {
core_config.listen_addr = node.address;
core_config.listen_addrs = vec![node.address];
core_config.enable_ipv6 = false; // Disable IPv6 for local testing to avoid dual-stack binding issues
core_config.connection_timeout = Duration::from_secs(TEST_CORE_CONNECTION_TIMEOUT_SECS);
core_config
.bootstrap_peers
.clone_from(&node.bootstrap_addrs);
Expand Down Expand Up @@ -1138,7 +1146,7 @@ impl TestNetwork {
() = tokio::time::sleep(check_interval) => {
// Check each node's health
for (i, node) in nodes.iter().enumerate() {
if !node.is_running().await {
if !node.is_running() {
warn!("Node {} appears unhealthy", i);
}
}
Expand All @@ -1165,17 +1173,30 @@ impl TestNetwork {
handle.abort();
}

// Stop all nodes in reverse order
// Stop all nodes in reverse order.
// We shutdown nodes concurrently to avoid serially accumulating DHT
// graceful-leave waits across every node.
let mut shutdown_futures = Vec::with_capacity(self.nodes.len());
for node in self.nodes.iter_mut().rev() {
debug!("Stopping node {}", node.index);
if let Some(handle) = node.protocol_task.take() {
handle.abort();
}
if let Some(ref p2p) = node.p2p_node {
if let Err(e) = p2p.shutdown().await {
warn!("Error shutting down node {}: {}", node.index, e);
}
*node.state.write().await = NodeState::Stopping;

if let Some(p2p) = node.p2p_node.clone() {
let node_index = node.index;
shutdown_futures.push(async move { (node_index, p2p.shutdown().await) });
}
}

for (node_index, result) in join_all(shutdown_futures).await {
if let Err(e) = result {
warn!("Error shutting down node {}: {}", node_index, e);
}
}

for node in &self.nodes {
*node.state.write().await = NodeState::Stopped;
}

Expand Down
Loading