Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion aa-core/src/userop/deployment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,15 @@ pub trait DeploymentLock: Send + Sync {
chain_id: u64,
account_address: &Address,
) -> impl Future<Output = Result<bool, EngineError>> + Send;

/// Release a deployment lock only if it still holds the given `lock_id`.
///
/// Atomic compare-and-delete: a lock stored under a different `lock_id` is
/// left untouched, so a caller can only remove the lock it actually observed.
///
/// * `chain_id`, `account_address` - identify the account whose lock is targeted.
/// * `lock_id` - identifier the stored lock must still carry to be removed.
///
/// Resolves to `Ok(true)` if a matching lock was removed, `Ok(false)` if
/// nothing was removed, and `Err(EngineError)` if the operation itself failed.
fn release_lock_if_owner(
&self,
chain_id: u64,
account_address: &Address,
lock_id: &str,
) -> impl Future<Output = Result<bool, EngineError>> + Send;
}

pub enum DeploymentStatus {
Expand Down Expand Up @@ -104,9 +113,16 @@ where
if is_deployed {
return Ok(DeploymentStatus::Deployed);
}

// Stale lock, not deployed: previous holder died without releasing.
// Reclaim only the lock we observed, so we don't delete a lock that
// another worker acquired while we were checking chain state.
self.lock
.release_lock_if_owner(chain_id, account_address, &lock_id)
.await?;
return Ok(DeploymentStatus::NotDeployed);
Comment thread
0xFirekeeper marked this conversation as resolved.
}

// Lock is still fresh here: both stale cases (deployed / not deployed) returned above.
return Ok(DeploymentStatus::BeingDeployed {
stale: is_stale,
lock_id,
Expand Down
48 changes: 45 additions & 3 deletions executors/src/external_bundler/deployment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,17 @@ use serde::{Deserialize, Serialize};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use twmq::{
error::TwmqError,
redis::{AsyncCommands, Pipeline, aio::ConnectionManager},
redis::{AsyncCommands, Pipeline, SetExpiry, SetOptions, aio::ConnectionManager},
};
use uuid::Uuid;

const CACHE_PREFIX: &str = "deployment_cache";
const LOCK_PREFIX: &str = "deployment_lock";

/// Fallback TTL so a lock that's never explicitly released (e.g. worker crash)
/// can't block the account forever.
///
/// Expressed in seconds (300 s = 5 minutes); handed to `SetExpiry::EX` when the
/// lock key is written with `SET NX EX`.
const LOCK_TTL_SECONDS: u64 = 300;

#[derive(Clone)]
pub struct RedisDeploymentCache {
connection_manager: twmq::redis::aio::ConnectionManager,
Expand Down Expand Up @@ -165,9 +169,13 @@ impl DeploymentLock for RedisDeploymentLock {
message: format!("Serialization failed: {e}"),
})?;

// Use SET NX EX for atomic acquire
// SET NX EX: atomic acquire with a fallback expiry.
let opts = SetOptions::default()
.conditional_set(twmq::redis::ExistenceCheck::NX)
.with_expiration(SetExpiry::EX(LOCK_TTL_SECONDS));

let result: Option<String> =
conn.set_nx(&key, &lock_data_str)
conn.set_options(&key, &lock_data_str, opts)
.await
.map_err(|e| EngineError::InternalError {
message: format!("Lock acquire failed: {e}"),
Expand Down Expand Up @@ -212,4 +220,38 @@ impl DeploymentLock for RedisDeploymentLock {

Ok(deleted > 0)
}

/// Removes the deployment lock of `(chain_id, account_address)` only when the
/// value currently stored under the lock key carries the given `lock_id`.
///
/// The GET / compare / DEL sequence is executed as one Lua script.
///
/// Returns `Ok(true)` when the script reports a deleted key and `Ok(false)`
/// otherwise (key missing, value not decodable as JSON, or a different
/// `lock_id`). A failed script invocation becomes `EngineError::InternalError`.
async fn release_lock_if_owner(
    &self,
    chain_id: u64,
    account_address: &Address,
    lock_id: &str,
) -> Result<bool, EngineError> {
    // Compare-and-delete: DEL is issued only if the decoded value's
    // `lock_id` field equals ARGV[1]; every other outcome yields 0.
    const COMPARE_AND_DELETE: &str = r#"
local v = redis.call('GET', KEYS[1])
if not v then return 0 end
local ok, data = pcall(cjson.decode, v)
if ok and data.lock_id == ARGV[1] then
return redis.call('DEL', KEYS[1])
end
return 0
"#;

    let redis_key = self.lock_key(chain_id, account_address);
    let mut connection = self.conn().clone();
    let compare_and_delete = twmq::redis::Script::new(COMPARE_AND_DELETE);

    // The script's integer reply is the number of keys it removed (0 or 1).
    let removed: i64 = match compare_and_delete
        .key(&redis_key)
        .arg(lock_id)
        .invoke_async(&mut connection)
        .await
    {
        Ok(count) => count,
        Err(e) => {
            return Err(EngineError::InternalError {
                message: format!("Failed to release lock for account {account_address}: {e}"),
            });
        }
    };

    Ok(removed > 0)
}
}