diff --git a/aa-core/src/userop/deployment.rs b/aa-core/src/userop/deployment.rs index d66f436..6145f64 100644 --- a/aa-core/src/userop/deployment.rs +++ b/aa-core/src/userop/deployment.rs @@ -42,6 +42,15 @@ pub trait DeploymentLock: Send + Sync { chain_id: u64, account_address: &Address, ) -> impl Future> + Send; + + /// Release a deployment lock only if it still holds the given `lock_id`. + /// Atomic compare-and-delete; returns true if a matching lock was removed. + fn release_lock_if_owner( + &self, + chain_id: u64, + account_address: &Address, + lock_id: &str, + ) -> impl Future> + Send; } pub enum DeploymentStatus { @@ -104,9 +113,16 @@ where if is_deployed { return Ok(DeploymentStatus::Deployed); } + + // Stale lock, not deployed: previous holder died without releasing. + // Reclaim only the lock we observed, so we don't delete a lock that + // another worker acquired while we were checking chain state. + self.lock + .release_lock_if_owner(chain_id, account_address, &lock_id) + .await?; + return Ok(DeploymentStatus::NotDeployed); } - // Either fresh lock or stale but not deployed return Ok(DeploymentStatus::BeingDeployed { stale: is_stale, lock_id, diff --git a/executors/src/external_bundler/deployment.rs b/executors/src/external_bundler/deployment.rs index ac8a969..14b4fc1 100644 --- a/executors/src/external_bundler/deployment.rs +++ b/executors/src/external_bundler/deployment.rs @@ -7,13 +7,17 @@ use serde::{Deserialize, Serialize}; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use twmq::{ error::TwmqError, - redis::{AsyncCommands, Pipeline, aio::ConnectionManager}, + redis::{AsyncCommands, Pipeline, SetExpiry, SetOptions, aio::ConnectionManager}, }; use uuid::Uuid; const CACHE_PREFIX: &str = "deployment_cache"; const LOCK_PREFIX: &str = "deployment_lock"; +/// Fallback TTL so a lock that's never explicitly released (e.g. worker crash) +/// can't block the account forever. +const LOCK_TTL_SECONDS: u64 = 300; + #[derive(Clone)] pub struct RedisDeploymentCache { connection_manager: twmq::redis::aio::ConnectionManager, @@ -165,9 +169,13 @@ impl DeploymentLock for RedisDeploymentLock { message: format!("Serialization failed: {e}"), })?; - // Use SET NX EX for atomic acquire + // SET NX EX: atomic acquire with a fallback expiry. + let opts = SetOptions::default() + .conditional_set(twmq::redis::ExistenceCheck::NX) + .with_expiration(SetExpiry::EX(LOCK_TTL_SECONDS)); + let result: Option = - conn.set_nx(&key, &lock_data_str) + conn.set_options(&key, &lock_data_str, opts) .await .map_err(|e| EngineError::InternalError { message: format!("Lock acquire failed: {e}"), @@ -212,4 +220,38 @@ impl DeploymentLock for RedisDeploymentLock { Ok(deleted > 0) } + + async fn release_lock_if_owner( + &self, + chain_id: u64, + account_address: &Address, + lock_id: &str, + ) -> Result { + let mut conn = self.conn().clone(); + let key = self.lock_key(chain_id, account_address); + + // Atomic compare-and-delete: only DEL if the stored lock's lock_id matches. + let script = twmq::redis::Script::new( + r#" + local v = redis.call('GET', KEYS[1]) + if not v then return 0 end + local ok, data = pcall(cjson.decode, v) + if ok and data.lock_id == ARGV[1] then + return redis.call('DEL', KEYS[1]) + end + return 0 + "#, + ); + + let deleted: i64 = script + .key(&key) + .arg(lock_id) + .invoke_async(&mut conn) + .await + .map_err(|e| EngineError::InternalError { + message: format!("Failed to release lock for account {account_address}: {e}"), + })?; + + Ok(deleted > 0) + } }