diff --git a/Cargo.toml b/Cargo.toml index ce14c307..d5fa01a0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,12 +32,12 @@ path = "src/bin/saorsa-cli/main.rs" [dependencies] # Core (provides EVERYTHING: networking, DHT, security, trust, storage) -saorsa-core = "0.14.1" +saorsa-core = "0.17" saorsa-pqc = "0.5" # Payment verification - autonomi network lookup + EVM payment ant-evm = "0.1.19" -evmlib = "0.4.7" +evmlib = "0.4.9" xor_name = "5" libp2p = "0.56" # For PeerId in payment proofs multihash = "0.19" # For identity multihash in PeerId construction diff --git a/docs/DESIGN.md b/docs/DESIGN.md index f6497a27..ee1fea11 100644 --- a/docs/DESIGN.md +++ b/docs/DESIGN.md @@ -8,7 +8,7 @@ Build a **pure quantum-proof network node** (`saorsa-node`) that: 3. Auto-migrates local ant-node data on startup 4. Implements auto-upgrade with ML-DSA signature verification 5. Supports dual IPv4/IPv6 DHT for maximum connectivity -6. Features geographic routing, Sybil resistance, and EigenTrust +6. Features geographic routing, Sybil resistance, and trust-based routing ## Architecture Philosophy @@ -122,7 +122,7 @@ impl SaorsaNode { | Network Protocol | **Dual IPv4/IPv6 DHT** | Maximum connectivity and resilience | | Geographic Routing | **Enabled** | No datacenter concentration | | Sybil Resistance | **Required** | Prevent Sybil attacks | -| Node Reputation | **EigenTrust** | Measure and remove bad nodes | +| Node Reputation | **TrustEngine** | Measure and block bad nodes | | Auto-Upgrade | Phase 1 Critical | Essential for network transition | --- @@ -170,7 +170,7 @@ saorsa-node/ **REMOVED** (provided by saorsa-core): - `network/` - Use NetworkCoordinator + DualStackNetworkNode -- `trust/` - Use EigenTrustEngine +- `trust/` - Use TrustEngine - `storage/` - Use ContentStore - `replication/` - Use ReplicationManager @@ -182,20 +182,18 @@ saorsa-node/ ```rust use saorsa_core::{ - adaptive::coordinator::NetworkCoordinator, - adaptive::security::SecurityManager, - adaptive::trust::EigenTrustEngine, - bootstrap::BootstrapManager, - dht::trust_weighted_kademlia::TrustWeightedKademlia, - messaging::NetworkConfig, - security::{IPv6NodeID, IPDiversityEnforcer}, + P2PNode, NodeConfig, NodeMode, + adaptive::trust::TrustEngine, + adaptive::dht::AdaptiveDhtConfig, + BootstrapConfig, BootstrapManager, + IPDiversityConfig, + identity::peer_id::PeerId, }; pub struct RunningNode { shutdown_sender: watch::Sender, // USE SAORSA-CORE DIRECTLY - NO REIMPLEMENTATION! - coordinator: Arc, // Integrates ALL components - security: Arc, // Rate limiting, blacklist, eclipse detection + node: Arc, // Integrates ALL components bootstrap: Arc, // 30,000 peer cache // Events node_events_channel: NodeEventsChannel, @@ -203,7 +201,7 @@ pub struct RunningNode { } pub struct NodeBuilder { - network_config: NetworkConfig, // saorsa-core's config + node_config: NodeConfig, // saorsa-core's config identity: saorsa_core::identity::NodeIdentity, root_dir: PathBuf, auto_migrate_ant_data: bool, @@ -274,22 +272,19 @@ pub struct IPv6NodeID { **File:** `saorsa-core/src/adaptive/trust.rs` (825 lines) ```rust -// Just use saorsa-core's EigenTrust++ engine! -use saorsa_core::adaptive::trust::EigenTrustEngine; +// Just use saorsa-core's TrustEngine (formerly EigenTrust++)! +use saorsa_core::TrustEngine; // Multi-factor trust scoring ALREADY IMPLEMENTED: -// - 40% response_rate (correct/total responses) -// - 20% uptime_estimate -// - 15% storage_contributed -// - 15% bandwidth_contributed -// - 10% compute_contributed -// + Time decay (0.99 per hour) -// + Pre-trusted node bootstrap (0.9 initial) -// + Background computation every 5 minutes - -let engine = EigenTrustEngine::new(pre_trusted_nodes); -engine.update_local_trust(from, to, success).await; -let score = engine.get_trust_async(node_id).await; +// - Response rate tracking +// - Connection success/failure monitoring +// - Time decay +// - Pre-trusted node bootstrap +// - Background computation + +// Trust is accessed via P2PNode: +let score = node.peer_trust(&peer_id); +node.report_trust_event(&peer_id, TrustEvent::SuccessfulResponse); ``` #### 5. Geographic Routing - ALREADY IN SAORSA-CORE! @@ -306,44 +301,31 @@ use saorsa_core::dht::geographic_routing::{GeographicRegion, LatencyAwareSelecti // ASN diversity enforcement ``` -#### 6. Security Manager - ALREADY IN SAORSA-CORE! - -**File:** `saorsa-core/src/adaptive/security.rs` (1,326 lines) +#### 6. Security - ALREADY IN SAORSA-CORE! ```rust -// Comprehensive security - just configure and use! -use saorsa_core::adaptive::security::{SecurityManager, SecurityConfig}; - -let security = SecurityManager::new(config, identity); - -// ALREADY IMPLEMENTED: -// - Rate limiting: 100 req/min per node, 500/min per IP -// - Join rate: 20 new nodes/hour -// - Blacklist with 24-hour TTL -// - Eclipse attack detection via diversity scoring -// - Message integrity verification (ML-DSA) -// - Full audit logging with 30-day retention +// IP diversity enforcement for Sybil resistance +use saorsa_core::IPDiversityConfig; + +// Multi-layer subnet enforcement ALREADY IMPLEMENTED: +// - Per-subnet limits (/64, /48, /32) +// - ASN diversity +// - Configurable via IPDiversityConfig::permissive() / ::testnet() + +// Rate limiting and trust-based blocking handled by AdaptiveDHT ``` -#### 7. NetworkCoordinator - INTEGRATES EVERYTHING! +#### 7. P2PNode - INTEGRATES EVERYTHING! -**File:** `saorsa-core/src/adaptive/coordinator.rs` +**File:** `saorsa-core/src/network.rs` ```rust -// The coordinator brings ALL components together -pub struct NetworkCoordinator { - identity: Arc, - transport: Arc, - dht: Arc, // Trust-weighted Kademlia - router: Arc, // Geographic + trust routing - trust_engine: Arc, // EigenTrust++ - gossip: Arc, // Pub/sub messaging - storage: Arc, // DHT storage - replication: Arc, // k=8 replication - churn_handler: Arc, // Node churn handling - security: Arc, // All security features - // + ML optimization components -} +// P2PNode brings ALL components together +// Access trust via: +node.trust_engine() // Arc +node.adaptive_dht() // &AdaptiveDHT +node.peer_trust(&peer) // Quick trust score lookup +node.report_trust_event(&peer, event) // Report trust signals ``` #### 8. What saorsa-node ACTUALLY Needs to Build @@ -368,7 +350,7 @@ pub struct AntDataMigrator { /// Node lifecycle and CLI (wrapper around saorsa-core) pub struct NodeLifecycle { - coordinator: Arc, + node: Arc, upgrade_monitor: UpgradeMonitor, migrator: Option, } @@ -381,11 +363,10 @@ pub struct NodeLifecycle { **KEY INSIGHT**: saorsa-core already provides: - Dual IPv4/IPv6 with DualStackNetworkNode and Happy Eyeballs - Sybil Resistance with IPv6NodeID and IPDiversityEnforcer -- EigenTrust++ with full trust engine +- TrustEngine with trust scoring and blocking - Geographic Routing with 7 regions and latency-aware selection -- Security Manager with rate limiting, blacklist, eclipse detection -- NetworkCoordinator that integrates everything -- Storage and replication via ContentStore and ReplicationManager +- IP diversity enforcement for Sybil resistance +- P2PNode that integrates everything **saorsa-node only needs to build**: 1. Auto-upgrade system (Phase 1 Critical) @@ -476,7 +457,7 @@ pub struct NodeLifecycle { ### 5. Network Hardening - **Geographic routing**: No datacenter concentration in close groups - **Sybil resistance**: Join rate limiting, node age, resource verification -- **EigenTrust**: Node reputation and automatic bad node removal +- **TrustEngine**: Node reputation and automatic bad node blocking - **Rationale**: Production-grade security ### 6. Migration Strategy: Client-as-Bridge + Node Auto-Migration diff --git a/src/bin/saorsa-cli/cli.rs b/src/bin/saorsa-cli/cli.rs index 627ac628..00372153 100644 --- a/src/bin/saorsa-cli/cli.rs +++ b/src/bin/saorsa-cli/cli.rs @@ -21,6 +21,10 @@ pub struct Cli { #[arg(long, default_value_t = 60)] pub timeout_secs: u64, + /// Allow loopback connections (required for devnet/local testing). + #[arg(long)] + pub allow_loopback: bool, + /// Log level. #[arg(long, default_value = "info")] pub log_level: String, diff --git a/src/bin/saorsa-cli/main.rs b/src/bin/saorsa-cli/main.rs index c95ca72a..62d2cefe 100644 --- a/src/bin/saorsa-cli/main.rs +++ b/src/bin/saorsa-cli/main.rs @@ -60,7 +60,7 @@ async fn main() -> color_eyre::Result<()> { } let (bootstrap, manifest) = resolve_bootstrap(&cli)?; - let node = create_client_node(bootstrap).await?; + let node = create_client_node(bootstrap, cli.allow_loopback).await?; // Build client with timeout let mut client = QuantumClient::new(QuantumConfig { @@ -309,9 +309,14 @@ fn resolve_evm_network( fn resolve_bootstrap( cli: &Cli, -) -> color_eyre::Result<(Vec, Option)> { +) -> color_eyre::Result<(Vec, Option)> { if !cli.bootstrap.is_empty() { - return Ok((cli.bootstrap.clone(), None)); + let addrs = cli + .bootstrap + .iter() + .map(|addr| saorsa_core::MultiAddr::quic(*addr)) + .collect(); + return Ok((addrs, None)); } if let Some(ref manifest_path) = cli.devnet_manifest { @@ -326,17 +331,17 @@ fn resolve_bootstrap( )) } -async fn create_client_node(bootstrap: Vec) -> Result, Error> { - let mut core_config = saorsa_core::NodeConfig::new() +async fn create_client_node( + bootstrap: Vec, + allow_loopback: bool, +) -> Result, Error> { + let mut core_config = saorsa_core::NodeConfig::builder() + .local(allow_loopback) + .max_message_size(MAX_WIRE_MESSAGE_SIZE) + .mode(saorsa_core::NodeMode::Client) + .build() .map_err(|e| Error::Config(format!("Failed to create core config: {e}")))?; - core_config.listen_addr = "0.0.0.0:0" - .parse() - .map_err(|e| Error::Config(format!("Invalid listen addr: {e}")))?; - core_config.listen_addrs = vec![core_config.listen_addr]; - core_config.enable_ipv6 = false; core_config.bootstrap_peers = bootstrap; - core_config.max_message_size = Some(MAX_WIRE_MESSAGE_SIZE); - core_config.mode = saorsa_core::NodeMode::Client; let node = P2PNode::new(core_config) .await diff --git a/src/client/mod.rs b/src/client/mod.rs index 9a261d59..704a2c6c 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -62,4 +62,6 @@ pub use chunk_protocol::send_and_await_chunk_response; pub use data_types::{ compute_address, peer_id_to_xor_name, xor_distance, ChunkStats, DataChunk, XorName, }; -pub use quantum::{hex_node_id_to_encoded_peer_id, QuantumClient, QuantumConfig}; +pub use quantum::{ + hex_node_id_to_encoded_peer_id, PaidChunk, PreparedChunk, QuantumClient, QuantumConfig, +}; diff --git a/src/client/quantum.rs b/src/client/quantum.rs index 98402e88..e779416a 100644 --- a/src/client/quantum.rs +++ b/src/client/quantum.rs @@ -32,7 +32,7 @@ use evmlib::wallet::Wallet; use futures::stream::{FuturesUnordered, StreamExt}; use saorsa_core::identity::PeerId; use saorsa_core::P2PNode; -use std::collections::HashSet; +use std::collections::{BTreeMap, HashSet}; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -47,6 +47,40 @@ const CLOSE_GROUP_SIZE: usize = 8; /// Default number of replicas for data redundancy. const DEFAULT_REPLICA_COUNT: u8 = 4; +/// A chunk that has been quoted but not yet paid or stored. +/// +/// Produced by [`QuantumClient::prepare_chunk_payment`] and consumed by +/// [`QuantumClient::batch_pay`] or [`QuantumClient::batch_pay_and_store`]. +pub struct PreparedChunk { + /// The raw chunk content. + pub content: Bytes, + /// Content-address (BLAKE3 hash). + pub address: XorName, + /// Peer ID + quote pairs for building `ProofOfPayment`. + pub peer_quotes: Vec<(EncodedPeerId, PaymentQuote)>, + /// The payment structure (sorted quotes, median selected). + pub payment: SingleNodePayment, + /// The closest peer to the chunk address, pinned during quote collection + /// so that the storage target is always one of the paid peers. + pub target_peer: PeerId, +} + +/// A chunk that has been paid on-chain but not yet stored on the network. +/// +/// Produced by [`QuantumClient::batch_pay`]. Store via +/// [`QuantumClient::put_chunk_with_proof`]. +pub struct PaidChunk { + /// The raw chunk content. + pub content: Bytes, + /// Serialized payment proof (msgpack bytes). + pub proof_bytes: Vec, + /// Transaction hashes from this chunk's on-chain payment. + pub tx_hashes: Vec, + /// The closest peer to the chunk address, pinned during quote collection + /// so that the storage target is always one of the paid peers. + pub target_peer: PeerId, +} + /// Configuration for the quantum-resistant client. #[derive(Debug, Clone)] pub struct QuantumConfig { @@ -279,8 +313,9 @@ impl QuantumClient { let data_size = u64::try_from(content_size) .map_err(|e| Error::Network(format!("Content size too large: {e}")))?; - // Step 1: Request quotes from network nodes via DHT - let quotes_with_peers = self + // Step 1: Request quotes from network nodes via DHT. + // The closest peer is pinned here so we store to a peer that was paid. + let (target_peer, quotes_with_peers) = self .get_quotes_from_dht_for_address(&address, data_size) .await?; @@ -327,9 +362,7 @@ impl QuantumClient { let payment_proof = rmp_serde::to_vec(&proof) .map_err(|e| Error::Network(format!("Failed to serialize payment proof: {e}")))?; - // Step 6: Send chunk with payment proof to storage node - let target_peer = Self::pick_target_peer(node, &address).await?; - + // Step 6: Send chunk with payment proof to the peer pinned during quoting let request_id = self.next_request_id.fetch_add(1, Ordering::Relaxed); let request = ChunkPutRequest::with_payment(address, content.to_vec(), payment_proof); let message = ChunkMessage { @@ -356,13 +389,16 @@ impl QuantumClient { /// Store a chunk with a pre-built payment proof, skipping the internal payment flow. /// - /// Use this when you have already obtained quotes and paid on-chain externally - /// (e.g. via [`SingleNodePayment::pay`]) and want to avoid a redundant payment cycle. + /// The `target_peer` should be the peer pinned during quote collection so that + /// the storage target is guaranteed to be one of the paid peers. Use the + /// `target_peer` field from [`PaidChunk`] or the first element returned by + /// [`get_quotes_from_dht`](Self::get_quotes_from_dht). /// /// # Arguments /// /// * `content` - The data to store /// * `proof` - A serialised [`ProofOfPayment`] (msgpack bytes) + /// * `target_peer` - The peer to send the chunk to (pinned during quoting) /// /// # Returns /// @@ -372,9 +408,13 @@ impl QuantumClient { /// /// Returns an error if: /// - P2P node is not configured - /// - No remote peers found near the target address /// - Storage operation fails - pub async fn put_chunk_with_proof(&self, content: Bytes, proof: Vec) -> Result { + pub async fn put_chunk_with_proof( + &self, + content: Bytes, + proof: Vec, + target_peer: &PeerId, + ) -> Result { let Some(ref node) = self.p2p_node else { return Err(Error::Network("P2P node not configured".into())); }; @@ -382,8 +422,6 @@ impl QuantumClient { let address = compute_address(&content); let content_size = content.len(); - let target_peer = Self::pick_target_peer(node, &address).await?; - let request_id = self.next_request_id.fetch_add(1, Ordering::Relaxed); let request = ChunkPutRequest::with_payment(address, content.to_vec(), proof); let message = ChunkMessage { @@ -396,7 +434,7 @@ impl QuantumClient { Self::send_put_and_await( node, - &target_peer, + target_peer, message_bytes, request_id, self.config.timeout_secs, @@ -406,6 +444,193 @@ impl QuantumClient { .await } + /// Collect quotes for a chunk without paying. + /// + /// Returns a [`PreparedChunk`] containing all the information needed to + /// pay and store the chunk later. Use with [`batch_pay_and_store`](Self::batch_pay_and_store) + /// to pay for multiple chunks in a single EVM transaction. + /// + /// # Errors + /// + /// Returns an error if DHT lookup or quote collection fails. + pub async fn prepare_chunk_payment(&self, content: Bytes) -> Result { + let content_len = content.len(); + debug!("Preparing payment for chunk ({content_len} bytes)"); + + self.p2p_node + .as_ref() + .ok_or_else(|| Error::Network("P2P node not configured".into()))?; + + self.wallet.as_ref().ok_or_else(|| { + Error::Payment( + "Wallet not configured - use with_wallet() to enable payments".to_string(), + ) + })?; + + let address = compute_address(&content); + let data_size = u64::try_from(content.len()) + .map_err(|e| Error::Network(format!("Content size too large: {e}")))?; + + let (target_peer, quotes_with_peers) = self + .get_quotes_from_dht_for_address(&address, data_size) + .await?; + + if quotes_with_peers.len() != REQUIRED_QUOTES { + return Err(Error::Payment(format!( + "Expected {REQUIRED_QUOTES} quotes but received {}", + quotes_with_peers.len() + ))); + } + + let mut peer_quotes: Vec<(EncodedPeerId, PaymentQuote)> = + Vec::with_capacity(quotes_with_peers.len()); + let mut quotes_with_prices: Vec<(PaymentQuote, Amount)> = + Vec::with_capacity(quotes_with_peers.len()); + + for (peer_id, quote, price) in quotes_with_peers { + let encoded_peer_id = hex_node_id_to_encoded_peer_id(&peer_id.to_hex())?; + peer_quotes.push((encoded_peer_id, quote.clone())); + quotes_with_prices.push((quote, price)); + } + + let payment = SingleNodePayment::from_quotes(quotes_with_prices)?; + + Ok(PreparedChunk { + content, + address, + peer_quotes, + payment, + target_peer, + }) + } + + /// Pay for multiple prepared chunks in a single EVM transaction. + /// + /// Returns [`PaidChunk`]s ready for storage via [`put_chunk_with_proof`](Self::put_chunk_with_proof). + /// Use this for pipelined uploads where stores from wave N overlap with + /// quotes for wave N+1. + /// + /// # Errors + /// + /// Returns an error if the EVM payment fails. + pub async fn batch_pay(&self, prepared: Vec) -> Result> { + let Some(ref wallet) = self.wallet else { + return Err(Error::Payment( + "Wallet not configured - use with_wallet() to enable payments".to_string(), + )); + }; + + if prepared.is_empty() { + return Ok(Vec::new()); + } + + let total_amount: Amount = prepared.iter().map(|p| p.payment.total_amount()).sum(); + let chunk_count = prepared.len(); + info!("Batch payment for {chunk_count} chunks: {total_amount} atto total"); + + let all_quote_payments: Vec<(ant_evm::QuoteHash, ant_evm::RewardsAddress, Amount)> = + prepared + .iter() + .flat_map(|p| &p.payment.quotes) + .map(|q| (q.quote_hash, q.rewards_address, q.amount)) + .collect(); + + let (tx_hash_map, _gas_info) = wallet.pay_for_quotes(all_quote_payments).await.map_err( + |evmlib::wallet::PayForQuotesError(err, _)| { + Error::Payment(format!("Batch payment failed: {err}")) + }, + )?; + + let unique_tx_count = { + let mut txs: Vec<_> = tx_hash_map.values().collect(); + txs.sort(); + txs.dedup(); + txs.len() + }; + info!("Batch payment successful: {unique_tx_count} on-chain transaction(s) for {chunk_count} chunks"); + + prepared + .into_iter() + .map(|prep| { + let chunk_tx_hashes = Self::collect_chunk_tx_hashes(&prep.payment, &tx_hash_map); + let proof = PaymentProof { + proof_of_payment: ProofOfPayment { + peer_quotes: prep.peer_quotes, + }, + tx_hashes: chunk_tx_hashes.clone(), + }; + let proof_bytes = rmp_serde::to_vec(&proof).map_err(|e| { + Error::Network(format!("Failed to serialize payment proof: {e}")) + })?; + Ok(PaidChunk { + content: prep.content, + proof_bytes, + tx_hashes: chunk_tx_hashes, + target_peer: prep.target_peer, + }) + }) + .collect() + } + + /// Pay for multiple chunks in a single EVM transaction, then store them. + /// + /// Convenience wrapper around [`batch_pay`](Self::batch_pay) followed by + /// concurrent [`put_chunk_with_proof`](Self::put_chunk_with_proof) calls. + /// + /// # Errors + /// + /// Returns an error if payment or any chunk storage fails. + pub async fn batch_pay_and_store( + &self, + prepared: Vec, + ) -> Result)>> { + let chunk_count = prepared.len(); + let paid_chunks = self.batch_pay(prepared).await?; + + let mut store_futures = FuturesUnordered::new(); + for (idx, paid) in paid_chunks.into_iter().enumerate() { + let tx_hashes = paid.tx_hashes.clone(); + let target_peer = paid.target_peer; + let fut = async move { + let address = self + .put_chunk_with_proof(paid.content, paid.proof_bytes, &target_peer) + .await?; + Ok::<_, Error>((idx, address, tx_hashes)) + }; + store_futures.push(fut); + } + + let mut results: Vec)>> = + vec![None; chunk_count]; + while let Some(result) = store_futures.next().await { + let (idx, address, tx_hashes) = result?; + results[idx] = Some((address, tx_hashes)); + } + + results + .into_iter() + .enumerate() + .map(|(i, opt)| { + opt.ok_or_else(|| { + Error::Network(format!("Missing store result for chunk index {i}")) + }) + }) + .collect() + } + + /// Extract transaction hashes relevant to a single chunk's payment. + fn collect_chunk_tx_hashes( + payment: &SingleNodePayment, + tx_hash_map: &BTreeMap, + ) -> Vec { + payment + .quotes + .iter() + .filter(|q| q.amount > Amount::ZERO) + .filter_map(|q| tx_hash_map.get(&q.quote_hash).copied()) + .collect() + } + /// Store a chunk on the saorsa network. /// /// Requires a wallet to be configured. Delegates to @@ -570,7 +795,7 @@ impl QuantumClient { pub async fn get_quotes_from_dht( &self, content: &[u8], - ) -> Result> { + ) -> Result<(PeerId, Vec<(PeerId, PaymentQuote, Amount)>)> { let address = compute_address(content); let data_size = u64::try_from(content.len()) .map_err(|e| Error::Network(format!("Content size too large: {e}")))?; @@ -604,7 +829,7 @@ impl QuantumClient { &self, address: &XorName, data_size: u64, - ) -> Result> { + ) -> Result<(PeerId, Vec<(PeerId, PaymentQuote, Amount)>)> { let Some(ref node) = self.p2p_node else { return Err(Error::Network("P2P node not configured".into())); }; @@ -666,11 +891,16 @@ impl QuantumClient { ); } + // Pin the closest peer as the storage target. This peer is always + // among the quoted set, so the payment proof will include it. + let closest_peer = remote_peers[0]; + if tracing::enabled!(tracing::Level::DEBUG) { debug!( - "Found {} remote peers, requesting quotes from first {}", + "Found {} remote peers, requesting quotes from first {} (closest: {})", remote_peers.len(), - REQUIRED_QUOTES + REQUIRED_QUOTES, + closest_peer ); } @@ -781,7 +1011,7 @@ impl QuantumClient { info!("Collected {quote_count} quotes for chunk {addr_hex}"); } - Ok(quotes_with_peers) + Ok((closest_peer, quotes_with_peers)) } } diff --git a/src/client/self_encrypt.rs b/src/client/self_encrypt.rs index f028ea98..50c5487c 100644 --- a/src/client/self_encrypt.rs +++ b/src/client/self_encrypt.rs @@ -13,26 +13,32 @@ //! uploaded. Only the holder of the `DataMap` can access the file. use crate::client::data_types::XorName as ChunkAddress; -use crate::client::quantum::QuantumClient; +use crate::client::quantum::{PaidChunk, PreparedChunk, QuantumClient}; use crate::error::{Error, Result}; use bytes::Bytes; use futures::stream::{FuturesUnordered, StreamExt}; use self_encryption::DataMap; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; +use std::future::Future; use std::hash::BuildHasher; use std::io::{BufReader, Read, Write}; use std::path::Path; +use std::pin::Pin; use std::sync::{Arc, Mutex}; use tokio::runtime::Handle; -use tracing::{debug, info, warn}; +use tracing::{info, warn}; use xor_name::XorName; -/// Maximum number of concurrent chunk uploads. -const UPLOAD_CONCURRENCY: usize = 4; - /// Size of the read buffer used when streaming file data into the encryptor. const READ_BUFFER_SIZE: usize = 64 * 1024; +/// Maximum chunks per payment wave. +/// +/// Balances EVM gas efficiency (more chunks per tx = fewer on-chain transactions) +/// against pipeline responsiveness (smaller waves = earlier store overlap). +/// evmlib supports up to 256 non-zero payments per transaction. +const PAYMENT_WAVE_SIZE: usize = 64; + /// Shared error capture used by `open_encrypt_stream`. type ReadErrorCapture = Arc>>; @@ -127,18 +133,32 @@ fn write_stream_to_file( Ok(()) } -/// Encrypt a file using streaming self-encryption and upload chunks concurrently. +/// Encrypt a file using streaming self-encryption and upload chunks with +/// pipelined, wave-based EVM payment. +/// +/// The upload proceeds as follows: +/// 1. **Stream** encrypted chunks lazily from the file — at most one wave +/// of chunks lives in memory at a time. +/// 2. **Wave loop** — for each wave of `PAYMENT_WAVE_SIZE` chunks: +/// - **Quote** the wave concurrently, while draining completed stores +/// from the previous wave via `select!`. +/// - **Pay** the wave in a single EVM transaction. +/// - **Launch stores** for the wave (non-blocking, added to the shared +/// store pool). +/// 3. **Drain** — await any remaining in-flight stores. +/// 4. **`DataMap`** — extract the `DataMap` after the encryption stream is +/// exhausted. /// -/// Chunks are streamed lazily from the encryption iterator and uploaded with -/// bounded parallelism (`UPLOAD_CONCURRENCY` uploads in flight at once). -/// Peak memory is bounded by the concurrency limit, not the file size. +/// This gives us batched payments (no nonce collisions, fewer on-chain txs), +/// pipelining (stores from wave N overlap with quotes for wave N+1), and +/// bounded memory (only one wave of chunks buffered at a time). /// /// Returns the `DataMap` after all chunks are uploaded, plus the list of /// transaction hash strings from payment. /// /// # Errors /// -/// Returns an error if encryption fails, or any chunk upload fails. +/// Returns an error if encryption, quoting, payment, or storage fails. #[allow(clippy::too_many_lines)] pub async fn encrypt_and_upload_file( file_path: &Path, @@ -155,103 +175,141 @@ pub async fn encrypt_and_upload_file( ); let (mut stream, read_error) = open_encrypt_stream(file_path, file_size)?; - let mut all_tx_hashes: Vec = Vec::new(); - let mut chunk_num: usize = 0; - let mut uploaded_chunks: usize = 0; - + let mut chunk_count: usize = 0; + let mut duplicates_skipped: usize = 0; + + // Track chunk addresses already paid for to avoid duplicate payments + // across waves. Content-addressed chunks (BLAKE3) with identical content + // share the same address, so we only need to pay once. + let mut paid_addresses: HashSet = HashSet::new(); + + // Shared pool of in-flight store operations across all waves. + let mut store_futs: FuturesUnordered< + Pin> + Send + '_>>, + > = FuturesUnordered::new(); + + // Stream chunks lazily in waves — only one wave of content in memory at a time. + // The block scope ensures `chunks_iter` (which borrows `stream` mutably) + // is dropped before we call `stream.into_datamap()`. { - let mut in_flight = FuturesUnordered::new(); let mut chunks_iter = stream.chunks(); - let mut iter_exhausted = false; + let mut wave_idx: usize = 0; loop { - // Fill up to UPLOAD_CONCURRENCY uploads - while !iter_exhausted && in_flight.len() < UPLOAD_CONCURRENCY { - match chunks_iter.next() { - Some(chunk_result) => { - let (hash, content) = chunk_result - .map_err(|e| Error::Crypto(format!("Self-encryption failed: {e}")))?; - chunk_num += 1; - let num = chunk_num; - debug!("Uploading encrypted chunk {num}"); - let fut = async move { - let result = client.put_chunk_with_payment(content).await; - (num, hash, result) - }; - in_flight.push(fut); - } - None => { - iter_exhausted = true; - } + // Pull the next wave of chunks from the encryption stream, + // skipping any chunk whose address was already paid in a prior wave. + let mut wave: Vec = Vec::with_capacity(PAYMENT_WAVE_SIZE); + for chunk_result in chunks_iter.by_ref() { + let (hash, content) = chunk_result + .map_err(|e| Error::Crypto(format!("Self-encryption failed: {e}")))?; + if !paid_addresses.insert(hash) { + duplicates_skipped += 1; + continue; + } + wave.push(content); + if wave.len() >= PAYMENT_WAVE_SIZE { + break; } } - if in_flight.is_empty() { + if wave.is_empty() { break; } - // Await the next completed upload - let (num, hash, result) = in_flight - .next() - .await - .ok_or_else(|| Error::Crypto("Upload stream unexpectedly empty".into()))?; - match result { - Ok((address, tx_hashes)) => { - // Always capture tx hashes, even on mismatch - all_tx_hashes.extend(tx_hashes.iter().map(|tx| format!("{tx:?}"))); - uploaded_chunks += 1; - if address != hash.0 { - // Drain remaining in-flight futures before returning - while let Some((_, _, res)) = in_flight.next().await { - if let Ok((_, txs)) = res { - all_tx_hashes.extend(txs.iter().map(|tx| format!("{tx:?}"))); - uploaded_chunks += 1; - } - } - if uploaded_chunks > 0 { - warn!( - "{uploaded_chunks} chunk(s) already uploaded before hash mismatch on chunk {num}; \ - tx_hashes so far: {all_tx_hashes:?}" - ); - } - return Err(Error::Crypto(format!( - "Hash mismatch for chunk {num}: self_encryption={} network={}", - hex::encode(hash.0), - hex::encode(address) - ))); - } - } - Err(e) => { - // Drain remaining in-flight futures so we don't lose paid chunks - while let Some((_, _, res)) = in_flight.next().await { - if let Ok((_, txs)) = res { - all_tx_hashes.extend(txs.iter().map(|tx| format!("{tx:?}"))); - uploaded_chunks += 1; - } - } - if uploaded_chunks > 0 { - warn!( - "{uploaded_chunks} chunk(s) already uploaded successfully before failure on chunk {num}; \ - tx_hashes so far: {all_tx_hashes:?}" - ); - } - return Err(e); - } + let wave_size = wave.len(); + chunk_count += wave_size; + info!( + "Wave {wave_idx}: quoting {wave_size} chunks ({} stores in flight)", + store_futs.len() + ); + + // Quote this wave concurrently, draining completed stores in parallel. + let prepared = quote_wave_pipelined(&wave, client, &mut store_futs).await?; + + // Pay for this wave (single EVM transaction). + let paid = client.batch_pay(prepared).await?; + + // Launch stores for this wave — content moves into the futures, + // freeing the wave buffer for the next iteration. + for paid_chunk in paid { + all_tx_hashes.extend(paid_chunk.tx_hashes.iter().map(|tx| format!("{tx:?}"))); + store_futs.push(Box::pin(store_paid_chunk(client, paid_chunk))); } + + wave_idx += 1; } } + // Drain remaining stores. + while let Some(result) = store_futs.next().await { + result?; + } + check_read_error(&read_error)?; let data_map = stream .into_datamap() .ok_or_else(|| Error::Crypto("DataMap not available after encryption".into()))?; - info!("All {chunk_num} encrypted chunks uploaded"); + if duplicates_skipped > 0 { + info!( + "All {chunk_count} unique encrypted chunks uploaded ({duplicates_skipped} duplicates skipped)" + ); + } else { + info!("All {chunk_count} encrypted chunks uploaded"); + } Ok((data_map, all_tx_hashes)) } +/// Store a single paid chunk on the network. +async fn store_paid_chunk(client: &QuantumClient, paid: PaidChunk) -> Result { + client + .put_chunk_with_proof(paid.content, paid.proof_bytes, &paid.target_peer) + .await +} + +/// Quote a wave of chunks while draining completed stores from prior waves. +/// +/// Uses `select!` to multiplex between collecting quotes for the current wave +/// and acknowledging completed stores, so stores from the previous wave make +/// progress concurrently with the current wave's DHT quote requests. +async fn quote_wave_pipelined<'a>( + wave: &[Bytes], + client: &'a QuantumClient, + store_futs: &mut FuturesUnordered< + Pin> + Send + 'a>>, + >, +) -> Result> { + let wave_len = wave.len(); + let mut quote_futs = FuturesUnordered::new(); + + for (idx, content) in wave.iter().enumerate() { + let content = content.clone(); + let fut = async move { (idx, client.prepare_chunk_payment(content).await) }; + quote_futs.push(fut); + } + + let mut results: Vec<(usize, PreparedChunk)> = Vec::with_capacity(wave_len); + + while results.len() < wave_len { + tokio::select! { + biased; + // Drain completed stores from previous waves to free resources. + Some(store_result) = store_futs.next(), if !store_futs.is_empty() => { + store_result?; + } + // Collect quotes for this wave. + Some((idx, quote_result)) = quote_futs.next() => { + results.push((idx, quote_result?)); + } + } + } + + results.sort_by_key(|(idx, _)| *idx); + Ok(results.into_iter().map(|(_, prep)| prep).collect()) +} + /// Encrypt a file from disk using `stream_encrypt`, returning the `DataMap` /// and a list of `(XorName, Bytes)` encrypted chunks. fn encrypt_file_to_chunks( diff --git a/src/devnet.rs b/src/devnet.rs index 3059f475..e8a6f0da 100644 --- a/src/devnet.rs +++ b/src/devnet.rs @@ -14,9 +14,11 @@ use ant_evm::RewardsAddress; use evmlib::Network as EvmNetwork; use rand::Rng; use saorsa_core::identity::NodeIdentity; -use saorsa_core::{IPDiversityConfig, NodeConfig as CoreNodeConfig, P2PEvent, P2PNode, PeerId}; +use saorsa_core::{ + IPDiversityConfig, MultiAddr, NodeConfig as CoreNodeConfig, P2PEvent, P2PNode, PeerId, +}; use serde::{Deserialize, Serialize}; -use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::net::{Ipv4Addr, SocketAddr}; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; @@ -228,7 +230,7 @@ pub struct DevnetManifest { /// Node count. pub node_count: usize, /// Bootstrap addresses. - pub bootstrap: Vec, + pub bootstrap: Vec, /// Data directory. pub data_dir: PathBuf, /// Creation time in RFC3339. @@ -294,13 +296,12 @@ pub struct DevnetNode { label: String, peer_id: PeerId, port: u16, - address: SocketAddr, data_dir: PathBuf, p2p_node: Option>, ant_protocol: Option>, is_bootstrap: bool, state: Arc>, - bootstrap_addrs: Vec, + bootstrap_addrs: Vec, protocol_task: Option>, } @@ -465,11 +466,11 @@ impl Devnet { /// Get bootstrap addresses. #[must_use] - pub fn bootstrap_addrs(&self) -> Vec { + pub fn bootstrap_addrs(&self) -> Vec { self.nodes .iter() .take(self.config.bootstrap_count) - .map(|n| n.address) + .map(|n| MultiAddr::quic(SocketAddr::from((Ipv4Addr::LOCALHOST, n.port)))) .collect() } @@ -493,7 +494,7 @@ impl Devnet { let regular_count = self.config.node_count - self.config.bootstrap_count; info!("Starting {} regular nodes", regular_count); - let bootstrap_addrs: Vec = self + let bootstrap_addrs: Vec = self .nodes .get(0..self.config.bootstrap_count) .ok_or_else(|| { @@ -504,7 +505,7 @@ impl Devnet { )) })? .iter() - .map(|n| n.address) + .map(|n| MultiAddr::quic(SocketAddr::from((Ipv4Addr::LOCALHOST, n.port)))) .collect(); for i in self.config.bootstrap_count..self.config.node_count { @@ -521,12 +522,11 @@ impl Devnet { &self, index: usize, is_bootstrap: bool, - bootstrap_addrs: Vec, + bootstrap_addrs: Vec, ) -> Result { let index_u16 = u16::try_from(index) .map_err(|_| DevnetError::Config(format!("Node index {index} exceeds u16::MAX")))?; let port = self.config.base_port + index_u16; - let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port); // Generate identity first so we can use peer_id as the directory name let identity = NodeIdentity::generate() @@ -553,7 +553,6 @@ impl Devnet { label, peer_id, port, - address, data_dir, p2p_node: None, ant_protocol: Some(Arc::new(ant_protocol)), @@ -621,10 +620,14 @@ impl Devnet { debug!("Starting node {} on port {}", node.index, node.port); *node.state.write().await = NodeState::Starting; - let mut core_config = CoreNodeConfig::new() + let mut core_config = CoreNodeConfig::builder() + .port(node.port) + .local(true) + .max_message_size(crate::ant_protocol::MAX_WIRE_MESSAGE_SIZE) + .build() .map_err(|e| DevnetError::Core(format!("Failed to create core config: {e}")))?; - // Load the node identity for app-level message signing + // Load the node identity for app-level message signing. let identity = NodeIdentity::load_from_file( &node.data_dir.join(crate::config::NODE_IDENTITY_FILENAME), ) @@ -632,13 +635,7 @@ impl Devnet { .map_err(|e| DevnetError::Core(format!("Failed to load node identity: {e}")))?; core_config.node_identity = Some(Arc::new(identity)); - core_config.listen_addr = node.address; - core_config.listen_addrs = vec![node.address]; - core_config.enable_ipv6 = false; - core_config - .bootstrap_peers - .clone_from(&node.bootstrap_addrs); - core_config.max_message_size = Some(crate::ant_protocol::MAX_WIRE_MESSAGE_SIZE); + core_config.bootstrap_peers = node.bootstrap_addrs.clone(); core_config.diversity_config = Some(IPDiversityConfig::permissive()); let index = node.index; diff --git a/src/lib.rs b/src/lib.rs index 15bb5a70..42560d40 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,10 +10,10 @@ //! ## Architecture //! //! `saorsa-node` delegates all core functionality to `saorsa-core`: -//! - Networking via `NetworkCoordinator` -//! - DHT via `TrustWeightedKademlia` -//! - Trust via `EigenTrustEngine` -//! - Security via `SecurityManager` +//! - Networking via `P2PNode` +//! - DHT via `AdaptiveDHT` +//! - Trust via `TrustEngine` +//! - Security via `IPDiversityConfig` //! //! ## Data Types //! diff --git a/src/node.rs b/src/node.rs index 8a3f1ea0..552e03c9 100644 --- a/src/node.rs +++ b/src/node.rs @@ -17,10 +17,9 @@ use evmlib::Network as EvmNetwork; use saorsa_core::identity::NodeIdentity; use saorsa_core::{ BootstrapConfig as CoreBootstrapConfig, BootstrapManager, - IPDiversityConfig as CoreDiversityConfig, NodeConfig as CoreNodeConfig, P2PEvent, P2PNode, - ProductionConfig as CoreProductionConfig, + IPDiversityConfig as CoreDiversityConfig, MultiAddr, NodeConfig as CoreNodeConfig, P2PEvent, + P2PNode, }; -use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; use tokio::sync::Semaphore; @@ -172,44 +171,35 @@ impl NodeBuilder { /// Build the saorsa-core `NodeConfig` from our config. fn build_core_config(config: &NodeConfig) -> Result { - // Determine listen address based on port and IP version - let port = config.port; - let listen_addr: SocketAddr = match config.ip_version { - IpVersion::Ipv4 | IpVersion::Dual => format!("0.0.0.0:{port}") - .parse() - .map_err(|e| Error::Config(format!("Invalid listen address: {e}")))?, - IpVersion::Ipv6 => format!("[::]:{port}") - .parse() - .map_err(|e| Error::Config(format!("Invalid listen address: {e}")))?, - }; - - let mut core_config = CoreNodeConfig::new() + let ipv6 = matches!(config.ip_version, IpVersion::Ipv6 | IpVersion::Dual); + let local = matches!(config.network_mode, NetworkMode::Development); + + let mut core_config = CoreNodeConfig::builder() + .port(config.port) + .ipv6(ipv6) + .local(local) + .max_message_size(config.max_message_size) + .build() .map_err(|e| Error::Config(format!("Failed to create core config: {e}")))?; - // Set listen address - core_config.listen_addr = listen_addr; - core_config.listen_addrs = vec![listen_addr]; - - // Enable IPv6 if configured - core_config.enable_ipv6 = matches!(config.ip_version, IpVersion::Ipv6 | IpVersion::Dual); - // Add bootstrap peers. - core_config.bootstrap_peers.clone_from(&config.bootstrap); - - // Forward max_message_size to the transport layer. - core_config.max_message_size = Some(config.max_message_size); + core_config.bootstrap_peers = config + .bootstrap + .iter() + .map(|addr| MultiAddr::quic(*addr)) + .collect(); // Propagate network-mode tuning into saorsa-core where supported. match config.network_mode { NetworkMode::Production => { - core_config.production_config = Some(CoreProductionConfig::default()); core_config.diversity_config = Some(CoreDiversityConfig::default()); } NetworkMode::Testnet => { - core_config.production_config = Some(CoreProductionConfig::default()); + // Testnet allows loopback so nodes can be co-located on one machine. + core_config.allow_loopback = true; let mut diversity = CoreDiversityConfig::testnet(); diversity.max_nodes_per_asn = config.testnet.max_nodes_per_asn; - diversity.max_nodes_per_64 = config.testnet.max_nodes_per_64; + diversity.max_nodes_per_ipv6_64 = config.testnet.max_nodes_per_64; diversity.enable_geolocation_check = config.testnet.enable_geo_checks; diversity.min_geographic_diversity = if config.testnet.enable_geo_checks { 3 @@ -226,7 +216,6 @@ impl NodeBuilder { } } NetworkMode::Development => { - core_config.production_config = None; core_config.diversity_config = Some(CoreDiversityConfig::permissive()); } } @@ -565,17 +554,11 @@ impl RunningNode { // Log bootstrap cache stats before shutdown if let Some(ref manager) = self.bootstrap_manager { - match manager.get_stats().await { - Ok(stats) => { - info!( - "Bootstrap cache shutdown: {} contacts, avg quality {:.2}", - stats.total_contacts, stats.average_quality_score - ); - } - Err(e) => { - debug!("Failed to get bootstrap cache stats: {e}"); - } - } + let stats = manager.stats().await; + info!( + "Bootstrap cache shutdown: {} peers, avg quality {:.2}", + stats.total_peers, stats.average_quality + ); } // Stop protocol routing task @@ -748,7 +731,6 @@ mod tests { ..Default::default() }; let core = NodeBuilder::build_core_config(&config).expect("core config"); - assert!(core.production_config.is_some()); assert!(core.diversity_config.is_some()); } @@ -759,7 +741,6 @@ mod tests { ..Default::default() }; let core = NodeBuilder::build_core_config(&config).expect("core config"); - assert!(core.production_config.is_none()); let diversity = core.diversity_config.expect("diversity"); assert!(diversity.is_relaxed()); } diff --git a/tests/e2e/complete_payment_e2e.rs b/tests/e2e/complete_payment_e2e.rs index b65db98c..a6c65d7c 100644 --- a/tests/e2e/complete_payment_e2e.rs +++ b/tests/e2e/complete_payment_e2e.rs @@ -120,13 +120,13 @@ async fn test_complete_payment_flow_live_nodes() -> Result<(), Box { + Ok((target_peer, quotes)) => { info!("Got {} quotes on attempt {attempt}", quotes.len()); - quotes_with_prices = Some(quotes); + quote_result = Some((target_peer, quotes)); break; } Err(e) => { @@ -139,7 +139,8 @@ async fn test_complete_payment_flow_live_nodes() -> Result<(), Box Result<(), Box { @@ -384,11 +389,11 @@ async fn test_forged_signature_rejection() -> Result<(), Box { - quotes_with_prices = Some(quotes); + Ok((target_peer, quotes)) => { + quote_result = Some((target_peer, quotes)); break; } Err(e) => { @@ -400,7 +405,8 @@ async fn test_forged_signature_rejection() -> Result<(), Box = Vec::with_capacity(quotes_with_prices.len()); @@ -442,7 +448,11 @@ async fn test_forged_signature_rejection() -> Result<(), Box Result<(), Box { + Ok((_target_peer, quotes)) => { info!("Collected {} quotes despite failures", quotes.len()); match client.put_chunk(Bytes::from(test_data.to_vec())).await { Ok(_address) => { diff --git a/tests/e2e/integration_tests.rs b/tests/e2e/integration_tests.rs index d140c2fa..c9734241 100644 --- a/tests/e2e/integration_tests.rs +++ b/tests/e2e/integration_tests.rs @@ -312,8 +312,10 @@ async fn test_quantum_client_chunk_round_trip() { // client-side early-rejection fix). let content = Bytes::from("quantum client e2e test payload"); let dummy_proof = vec![0u8; 64]; + let peers = node.connected_peers().await; + let target_peer = peers.first().expect("Node should have connected peers"); let address = client - .put_chunk_with_proof(content.clone(), dummy_proof) + .put_chunk_with_proof(content.clone(), dummy_proof, target_peer) .await .expect("QuantumClient::put_chunk_with_proof should succeed"); diff --git a/tests/e2e/live_testnet.rs b/tests/e2e/live_testnet.rs index 87affe1b..ffaa867f 100644 --- a/tests/e2e/live_testnet.rs +++ b/tests/e2e/live_testnet.rs @@ -3,6 +3,9 @@ //! These tests connect to the live saorsa testnet for comprehensive testing. //! They are designed to be run via shell scripts that set environment variables. //! When environment variables are not set, the tests skip gracefully. +//! +//! TODO: Rewrite to use `QuantumClient` — `dht_put`/`dht_get` were removed +//! from `saorsa-core` v0.16 (`P2PNode` no longer exposes raw DHT operations). #![allow( clippy::unwrap_used, @@ -13,17 +16,10 @@ clippy::too_many_lines )] -use saorsa_core::{NodeConfig as CoreNodeConfig, P2PNode}; +use saorsa_core::{MultiAddr, NodeConfig as CoreNodeConfig, P2PNode}; use std::env; -use std::fs::File; -use std::io::{BufRead, BufReader, Write}; use std::net::SocketAddr; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; -use std::time::{Duration, Instant}; -use tokio::sync::Semaphore; - -type XorName = [u8; 32]; +use std::time::Duration; /// Get bootstrap addresses from environment or use defaults. fn get_bootstrap_addrs() -> Vec { @@ -41,12 +37,14 @@ async fn create_testnet_client() -> P2PNode { let bootstrap_addrs = get_bootstrap_addrs(); println!("Connecting to testnet via: {bootstrap_addrs:?}"); - let mut config = CoreNodeConfig::new().expect("Failed to create config"); - config.bootstrap_peers = bootstrap_addrs; - - // Use a random port for the client - config.listen_addr = "127.0.0.1:0".parse().unwrap(); - config.listen_addrs = vec![]; + let mut config = CoreNodeConfig::builder() + .local(true) + .build() + .expect("Failed to create config"); + config.bootstrap_peers = bootstrap_addrs + .iter() + .map(|addr| MultiAddr::quic(*addr)) + .collect(); let node = P2PNode::new(config) .await @@ -65,392 +63,32 @@ async fn create_testnet_client() -> P2PNode { node } -/// Compute content address (BLAKE3 hash). -fn compute_address(data: &[u8]) -> XorName { - saorsa_node::compute_address(data) -} - -/// Generate random chunk data. -fn generate_chunk(index: usize, size_kb: usize) -> Vec { - use std::collections::hash_map::DefaultHasher; - use std::hash::{Hash, Hasher}; - - let size = size_kb * 1024; - let mut data = vec![0u8; size]; - - // Use index to generate deterministic but unique data - let mut hasher = DefaultHasher::new(); - index.hash(&mut hasher); - let seed = hasher.finish(); - - for (i, byte) in data.iter_mut().enumerate() { - *byte = ((seed.wrapping_add(i as u64)) % 256) as u8; - } - - data -} - /// Load test: store thousands of chunks on the testnet. /// -/// Environment variables: -/// - `SAORSA_TEST_LIVE`: Must be set to "true" to run this test -/// - `SAORSA_TEST_CHUNK_COUNT`: Number of chunks to store (default: 1000) -/// - `SAORSA_TEST_CHUNK_SIZE_KB`: Size of each chunk in KB (default: 1) -/// - `SAORSA_TEST_CONCURRENCY`: Concurrent operations (default: 10) -/// - `SAORSA_TEST_ADDRESSES_FILE`: File to write chunk addresses to +/// Disabled until rewritten for `QuantumClient` (saorsa-core 0.16 removed `dht_put`/`dht_get`). #[tokio::test] +#[ignore = "needs rewrite: dht_put/dht_get removed in saorsa-core 0.16"] async fn run_load_test() { - if env::var("SAORSA_TEST_LIVE").as_deref() != Ok("true") { - println!("Skipping: SAORSA_TEST_LIVE not set to 'true'"); - return; - } - - let chunk_count: usize = env::var("SAORSA_TEST_CHUNK_COUNT") - .unwrap_or_else(|_| "1000".to_string()) - .parse() - .expect("Invalid SAORSA_TEST_CHUNK_COUNT"); - - let chunk_size_kb: usize = env::var("SAORSA_TEST_CHUNK_SIZE_KB") - .unwrap_or_else(|_| "1".to_string()) - .parse() - .expect("Invalid SAORSA_TEST_CHUNK_SIZE_KB"); - - let concurrency: usize = env::var("SAORSA_TEST_CONCURRENCY") - .unwrap_or_else(|_| "10".to_string()) - .parse() - .expect("Invalid SAORSA_TEST_CONCURRENCY"); - - let addresses_file = env::var("SAORSA_TEST_ADDRESSES_FILE") - .unwrap_or_else(|_| "chunk-addresses.txt".to_string()); - - println!("=== Load Test Configuration ==="); - println!("Chunk count: {chunk_count}"); - println!("Chunk size: {chunk_size_kb}KB"); - println!("Concurrency: {concurrency}"); - println!("Addresses file: {addresses_file}"); - println!(); - - let node = Arc::new(create_testnet_client().await); - let semaphore = Arc::new(Semaphore::new(concurrency)); - let stored_count = Arc::new(AtomicUsize::new(0)); - let failed_count = Arc::new(AtomicUsize::new(0)); - - // Open file for writing addresses - let file = Arc::new(std::sync::Mutex::new( - File::create(&addresses_file).expect("Failed to create addresses file"), - )); - - let start_time = Instant::now(); - - println!("=== Storing {chunk_count} chunks ==="); - - let mut handles = vec![]; - - for i in 0..chunk_count { - let node = Arc::clone(&node); - let semaphore = Arc::clone(&semaphore); - let stored = Arc::clone(&stored_count); - let failed = Arc::clone(&failed_count); - let file = Arc::clone(&file); - - let handle = tokio::spawn(async move { - let _permit = semaphore.acquire().await.expect("Semaphore closed"); - - let data = generate_chunk(i, chunk_size_kb); - let address = compute_address(&data); - - match node.dht_put(address, data).await { - Ok(()) => { - stored.fetch_add(1, Ordering::SeqCst); - - // Write address to file - let hex_addr = hex::encode(address); - if let Ok(mut f) = file.lock() { - writeln!(f, "{hex_addr}").ok(); - } - - if i % 100 == 0 { - println!("Stored chunk {} / {chunk_count}", i + 1); - } - } - Err(e) => { - failed.fetch_add(1, Ordering::SeqCst); - eprintln!("Failed to store chunk {i}: {e}"); - } - } - }); - - handles.push(handle); - } - - // Wait for all operations - for handle in handles { - let _ = handle.await; - } - - let duration = start_time.elapsed(); - let stored = stored_count.load(Ordering::SeqCst); - let failed = failed_count.load(Ordering::SeqCst); - - println!(); - println!("=== Load Test Results ==="); - println!("Duration: {duration:?}"); - println!("Stored: {stored} / {chunk_count}"); - println!("Failed: {failed}"); - println!( - "Throughput: {:.2} chunks/sec", - stored as f64 / duration.as_secs_f64() - ); - println!("Addresses written to: {addresses_file}"); - - // Cleanup - if let Err(e) = node.shutdown().await { - eprintln!("Error shutting down node: {e}"); - } - - assert!( - failed == 0, - "Some chunks failed to store: {failed} / {chunk_count}" - ); + let _node = create_testnet_client().await; + unimplemented!("rewrite with QuantumClient"); } /// Verify chunks: check that all stored chunks are retrievable. /// -/// Environment variables: -/// - `SAORSA_TEST_LIVE`: Must be set to "true" to run this test -/// - `SAORSA_TEST_ADDRESSES_FILE`: File containing chunk addresses to verify -/// - `SAORSA_TEST_SAMPLE_SIZE`: Number of chunks to sample (default: all) +/// Disabled until rewritten for `QuantumClient` (saorsa-core 0.16 removed `dht_put`/`dht_get`). #[tokio::test] +#[ignore = "needs rewrite: dht_put/dht_get removed in saorsa-core 0.16"] async fn run_verify_chunks() { - if env::var("SAORSA_TEST_LIVE").as_deref() != Ok("true") { - println!("Skipping: SAORSA_TEST_LIVE not set to 'true'"); - return; - } - - let addresses_file = - env::var("SAORSA_TEST_ADDRESSES_FILE").expect("SAORSA_TEST_ADDRESSES_FILE not set"); - - let sample_size: Option = env::var("SAORSA_TEST_SAMPLE_SIZE") - .ok() - .and_then(|s| s.parse().ok()); - - println!("=== Chunk Verification ==="); - println!("Addresses file: {addresses_file}"); - - // Read addresses from file - let file = File::open(&addresses_file).expect("Failed to open addresses file"); - let reader = BufReader::new(file); - let addresses: Vec = reader - .lines() - .filter_map(|line| { - line.ok().and_then(|s| { - let bytes = hex::decode(s.trim()).ok()?; - if bytes.len() == 32 { - let mut addr = [0u8; 32]; - addr.copy_from_slice(&bytes); - Some(addr) - } else { - None - } - }) - }) - .collect(); - - let total_addresses = addresses.len(); - println!("Total addresses: {total_addresses}"); - - // Sample if requested - let addresses_to_verify: Vec = if let Some(sample) = sample_size { - use rand::seq::SliceRandom; - let mut rng = rand::thread_rng(); - let mut sampled = addresses; - sampled.shuffle(&mut rng); - sampled.into_iter().take(sample).collect() - } else { - addresses - }; - - let addresses_len = addresses_to_verify.len(); - println!("Verifying: {addresses_len} chunks"); - println!(); - - let node = Arc::new(create_testnet_client().await); - let verified_count = Arc::new(AtomicUsize::new(0)); - let missing_count = Arc::new(AtomicUsize::new(0)); - let error_count = Arc::new(AtomicUsize::new(0)); - - let semaphore = Arc::new(Semaphore::new(20)); // Higher concurrency for reads - let start_time = Instant::now(); - - let mut handles = vec![]; - - for (i, address) in addresses_to_verify.iter().enumerate() { - let node = Arc::clone(&node); - let semaphore = Arc::clone(&semaphore); - let verified = Arc::clone(&verified_count); - let missing = Arc::clone(&missing_count); - let errors = Arc::clone(&error_count); - let addr = *address; - let total = addresses_to_verify.len(); - - let handle = tokio::spawn(async move { - let _permit = semaphore.acquire().await.expect("Semaphore closed"); - - match node.dht_get(addr).await { - Ok(Some(_data)) => { - verified.fetch_add(1, Ordering::SeqCst); - } - Ok(None) => { - missing.fetch_add(1, Ordering::SeqCst); - eprintln!("MISSING: {}", hex::encode(addr)); - } - Err(e) => { - errors.fetch_add(1, Ordering::SeqCst); - eprintln!("ERROR retrieving {}: {e}", hex::encode(addr)); - } - } - - if (i + 1) % 100 == 0 { - println!("Verified {} / {total}", i + 1); - } - }); - - handles.push(handle); - } - - // Wait for all operations - for handle in handles { - let _ = handle.await; - } - - let duration = start_time.elapsed(); - let verified = verified_count.load(Ordering::SeqCst); - let missing = missing_count.load(Ordering::SeqCst); - let errors = error_count.load(Ordering::SeqCst); - - println!(); - println!("=== Verification Results ==="); - println!("Duration: {duration:?}"); - println!("verified: {verified}"); - println!("total: {addresses_len}"); - println!("Missing: {missing}"); - println!("Errors: {errors}"); - println!( - "Availability: {:.2}%", - (verified as f64 / addresses_len as f64) * 100.0 - ); - - // Cleanup - if let Err(e) = node.shutdown().await { - eprintln!("Error shutting down node: {e}"); - } - - // Test passes if 100% available - if missing == 0 && errors == 0 { - println!("PASSED: All chunks are available!"); - } else { - panic!("FAILED: {missing} missing, {errors} errors out of {addresses_len} total"); - } + let _node = create_testnet_client().await; + unimplemented!("rewrite with QuantumClient"); } /// Comprehensive data test: store, retrieve, and verify. /// -/// This test stores a moderate number of chunks and immediately verifies -/// they can be retrieved from different parts of the network. -/// -/// Set `SAORSA_TEST_EXTERNAL=true` to run this test. +/// Disabled until rewritten for `QuantumClient` (saorsa-core 0.16 removed `dht_put`/`dht_get`). #[tokio::test] +#[ignore = "needs rewrite: dht_put/dht_get removed in saorsa-core 0.16"] async fn run_comprehensive_data_tests() { - if env::var("SAORSA_TEST_EXTERNAL").is_err() { - println!("Skipping: SAORSA_TEST_EXTERNAL not set"); - return; - } - - println!("=== Comprehensive Data Tests ==="); - println!(); - - let node = Arc::new(create_testnet_client().await); - - // Test 1: Store and retrieve various chunk sizes - println!("--- Test 1: Various Chunk Sizes ---"); - let sizes_kb = [1, 4, 16, 64, 256]; - - for size_kb in sizes_kb { - let data = generate_chunk(size_kb, size_kb); - let address = compute_address(&data); - - println!("Storing {size_kb}KB chunk..."); - node.dht_put(address, data.clone()) - .await - .expect("Failed to store chunk"); - - // Small delay to allow replication - tokio::time::sleep(Duration::from_millis(500)).await; - - println!("Retrieving {size_kb}KB chunk..."); - let retrieved = node - .dht_get(address) - .await - .expect("Failed to retrieve chunk") - .expect("Chunk not found"); - - assert_eq!(data, retrieved, "Data mismatch for {size_kb}KB chunk"); - println!(" OK: {size_kb}KB chunk verified"); - } - - // Test 2: Concurrent storage and retrieval - println!(); - println!("--- Test 2: Concurrent Operations ---"); - let concurrent_count = 50; - - let mut addresses = vec![]; - let mut handles = vec![]; - - for i in 0..concurrent_count { - let node = Arc::clone(&node); - let handle = tokio::spawn(async move { - let data = generate_chunk(1000 + i, 4); - let address = compute_address(&data); - - node.dht_put(address, data).await.expect("Store failed"); - address - }); - handles.push(handle); - } - - for handle in handles { - let addr = handle.await.expect("Task panicked"); - addresses.push(addr); - } - - println!("Stored {concurrent_count} chunks concurrently"); - - // Verify all can be retrieved - tokio::time::sleep(Duration::from_secs(2)).await; - - let mut verified = 0; - for addr in &addresses { - if node.dht_get(*addr).await.expect("Get failed").is_some() { - verified += 1; - } - } - - let addresses_len = addresses.len(); - println!("Retrieved {verified} / {addresses_len} chunks"); - assert_eq!(verified, addresses_len, "Not all chunks were retrievable"); - - // Test 3: Network distribution check - println!(); - println!("--- Test 3: Network Distribution ---"); - let peer_count = node.peer_count().await; - println!("Connected to {peer_count} peers"); - assert!(peer_count >= 3, "Should be connected to at least 3 peers"); - - // Cleanup - if let Err(e) = node.shutdown().await { - eprintln!("Error shutting down node: {e}"); - } - - println!(); - println!("=== All Comprehensive Tests Passed ==="); + let _node = create_testnet_client().await; + unimplemented!("rewrite with QuantumClient"); } diff --git a/tests/e2e/payment_flow.rs b/tests/e2e/payment_flow.rs index 1aa11eb5..3645255d 100644 --- a/tests/e2e/payment_flow.rs +++ b/tests/e2e/payment_flow.rs @@ -456,7 +456,7 @@ async fn test_quote_collection_via_dht() -> Result<(), Box Result<(), Box> // =========================================================================== /// Helper: get quotes from DHT with retries (up to 5 attempts, exponential backoff). +/// +/// Returns the target peer (closest to the chunk address, pinned during quoting) +/// alongside the quotes. async fn get_quotes_with_retries( client: &QuantumClient, test_data: &[u8], ) -> Result< - Vec<( + ( saorsa_core::identity::PeerId, - ant_evm::PaymentQuote, - ant_evm::Amount, - )>, + Vec<( + saorsa_core::identity::PeerId, + ant_evm::PaymentQuote, + ant_evm::Amount, + )>, + ), String, > { let mut last_err = String::new(); for attempt in 1..=5u32 { match client.get_quotes_from_dht(test_data).await { - Ok(quotes) => { + Ok((target_peer, quotes)) => { info!("Got {} quotes on attempt {attempt}", quotes.len()); - return Ok(quotes); + return Ok((target_peer, quotes)); } Err(e) => { last_err = format!("{e}"); @@ -340,7 +346,7 @@ async fn test_attack_forged_ml_dsa_signature() -> Result<(), Box Result<(), Box Result<(), Box Result<(), Box { @@ -475,7 +489,11 @@ async fn test_attack_replay_different_chunk() -> Result<(), Box Result<(), Box Result<(), Box Result<(), Box Result<(), Box Result<(), Box { @@ -638,7 +660,7 @@ async fn test_attack_double_spend_same_proof() -> Result<(), Box Result<(), Box Result<(), Box>, /// Bootstrap addresses this node connects to. - pub bootstrap_addrs: Vec, + pub bootstrap_addrs: Vec, /// ML-DSA-65 identity used for quote signing. /// @@ -481,8 +478,10 @@ impl TestNode { // Compute the chunk address let address = Self::compute_chunk_address(data); - // Get quotes from the network (includes peer IDs for proof of payment) - let quotes_with_peers = client + // Get quotes from the network (includes peer IDs for proof of payment). + // The target_peer is the closest peer pinned during quoting — we store to + // this peer to guarantee the storage target was paid. + let (target_peer, quotes_with_peers) = client .get_quotes_from_dht(data) .await .map_err(|e| TestnetError::Storage(format!("Failed to get quotes: {e}")))?; @@ -526,7 +525,7 @@ impl TestNode { // Use put_chunk_with_proof to send the pre-built proof, avoiding a // redundant quote+pay cycle that put_chunk_with_payment would perform. client - .put_chunk_with_proof(Bytes::from(data.to_vec()), proof_bytes) + .put_chunk_with_proof(Bytes::from(data.to_vec()), proof_bytes, &target_peer) .await .map_err(|e| TestnetError::Storage(format!("Client PUT error: {e}"))) } @@ -1109,12 +1108,12 @@ impl TestNetwork { let regular_count = self.config.node_count - self.config.bootstrap_count; info!("Starting {} regular nodes", regular_count); - let bootstrap_addrs: Vec = self + let bootstrap_addrs: Vec = self .nodes .get(0..self.config.bootstrap_count) .unwrap_or_default() .iter() - .map(|n| n.address) + .map(|n| MultiAddr::quic(SocketAddr::from((Ipv4Addr::LOCALHOST, n.port)))) .collect(); for i in self.config.bootstrap_count..self.config.node_count { @@ -1139,13 +1138,12 @@ impl TestNetwork { &self, index: usize, is_bootstrap: bool, - bootstrap_addrs: Vec, + bootstrap_addrs: Vec, ) -> Result { // Safe: node_count is validated in TestNetwork::new() to fit in u16 let index_u16 = u16::try_from(index) .map_err(|_| TestnetError::Config(format!("Node index {index} exceeds u16::MAX")))?; let port = self.config.base_port + index_u16; - let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port); let node_id = format!("test_node_{index}"); let data_dir = self.config.test_data_dir.join(&node_id); @@ -1170,7 +1168,6 @@ impl TestNetwork { index, node_id, port, - address, data_dir, p2p_node: None, ant_protocol: Some(Arc::new(ant_protocol)), @@ -1269,31 +1266,17 @@ impl TestNetwork { debug!("Starting node {} on port {}", node.index, node.port); *node.state.write().await = NodeState::Starting; - // Build configuration for saorsa-core P2PNode - let mut core_config = CoreNodeConfig::new() + // Build configuration for saorsa-core P2PNode. + // .local(true) auto-enables allow_loopback for test nodes on 127.0.0.1. + let mut core_config = CoreNodeConfig::builder() + .port(node.port) + .local(true) + .connection_timeout(Duration::from_secs(TEST_CORE_CONNECTION_TIMEOUT_SECS)) + .max_message_size(saorsa_node::ant_protocol::MAX_WIRE_MESSAGE_SIZE) + .build() .map_err(|e| TestnetError::Core(format!("Failed to create core config: {e}")))?; - core_config.listen_addr = node.address; - core_config.listen_addrs = vec![node.address]; - core_config.enable_ipv6 = false; // Disable IPv6 for local testing to avoid dual-stack binding issues - core_config.connection_timeout = Duration::from_secs(TEST_CORE_CONNECTION_TIMEOUT_SECS); - core_config - .bootstrap_peers - .clone_from(&node.bootstrap_addrs); - // Override the transport-layer message size to accommodate max-size - // chunks (4 MiB payload + serialization overhead = 5 MiB wire). - core_config.max_message_size = Some(saorsa_node::ant_protocol::MAX_WIRE_MESSAGE_SIZE); - // Generate a node identity so auto identity announce works on connect. - let identity = NodeIdentity::generate().map_err(|e| { - TestnetError::Core(format!( - "Failed to generate identity for node {}: {e}", - node.index - )) - })?; - core_config.node_identity = Some(Arc::new(identity)); - - // Allow localhost peers in DHT routing for test environments - // This prevents diversity filters from excluding peers on 127.0.0.1 + core_config.bootstrap_peers = node.bootstrap_addrs.clone(); core_config.diversity_config = Some(CoreDiversityConfig::permissive()); // Inject the ML-DSA identity so the P2PNode's transport peer ID