52 changes: 52 additions & 0 deletions architecture/gateway.md
Original file line number Diff line number Diff line change
@@ -83,6 +83,7 @@ The storage schema is intentionally narrow:
| `version` | Optional monotonically increasing version for scoped records. |
| `status` | Optional workflow state for records such as policy revisions or draft policy chunks. |
| `dedup_key` and `hit_count` | Optional policy-advisor fields for coalescing repeated observations. |
| `resource_version` | Monotonically increasing counter for optimistic concurrency control. Incremented atomically on each update. |
| `payload` | Prost-encoded protobuf payload for the full domain object. |
| `created_at_ms` and `updated_at_ms` | Gateway timestamps used for ordering and list output. |
| `labels` | JSON object carrying Kubernetes-style object labels for filtering and organization. |
@@ -113,6 +114,57 @@ default WAL journal mode), which mirror the same sensitive contents.
Persisted state includes sandboxes, providers, SSH sessions, policy revisions,
settings, inference configuration, and deployment records.

### Optimistic Concurrency (CAS)

Every object row carries a `resource_version` that the database increments
atomically on each write. Concurrent mutations use compare-and-swap (CAS): the
writer reads the current version, applies changes, and writes back with a
`WHERE resource_version = <expected>` guard. If another writer updated the row
in between, the guard fails and the caller receives a `Conflict` error.
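
As a rough illustration of the guard described above, the following is a minimal in-memory sketch, not the gateway's persistence code. `Row`, `StoreError`, and `write_if_version_matches` are hypothetical names, and the `u64` version type is an assumption. The function plays the role of an `UPDATE ... WHERE resource_version = <expected>` statement against a single row.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for one object row; only the fields needed here.
#[derive(Debug, Clone, PartialEq)]
pub struct Row {
    pub resource_version: u64, // assumed integer width
    pub payload: String,
}

/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
pub enum StoreError {
    NotFound,
    Conflict { expected: u64, actual: u64 },
}

/// In-memory analogue of a guarded update: the write is applied only if the
/// stored version still equals `expected`, and the version is bumped by one.
pub fn write_if_version_matches(
    rows: &mut HashMap<String, Row>,
    id: &str,
    expected: u64,
    payload: String,
) -> Result<u64, StoreError> {
    let row = rows.get_mut(id).ok_or(StoreError::NotFound)?;
    if row.resource_version != expected {
        // Another writer got there first; the caller must re-read and retry.
        return Err(StoreError::Conflict {
            expected,
            actual: row.resource_version,
        });
    }
    row.payload = payload;
    row.resource_version += 1;
    Ok(row.resource_version)
}
```

Two writers that both read version 1 would behave as expected under this sketch: the first write succeeds and moves the row to version 2, and the second fails with `Conflict` because its expected version is stale.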

This matters for HA deployments where multiple gateway replicas share the same
Postgres database, and for single-node deployments where concurrent gRPC
handlers or the reconciler mutate the same sandbox.

**When to use CAS** -- any mutation that merges caller-supplied fields into an
existing object:

- Provider credential and config updates (merge maps).
- Sandbox provider attach/detach (append/remove from a list).
- Policy version bumps and draft operations.
- Compute status updates (sandbox phase transitions and reconciliation).

**When CAS is not needed** -- create operations that generate a unique ID
(conflicts are caught by the primary key constraint), unconditional deletes,
and idempotent overwrites where the full payload is self-contained.

The `update_message_cas` helper makes a single CAS attempt: it fetches the
current object, applies a mutation closure, and writes with a
`MatchResourceVersion` condition. On conflict the persistence layer returns a
`Conflict` error, which gRPC handlers map to `ABORTED` status so clients can
read fresh state and retry.
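
The single-attempt flow can be sketched as follows. This is an illustrative model under stated assumptions, not the real `update_message_cas`: `Store`, `Stored`, `PersistenceError`, `update_once`, and `status_for` are hypothetical, the store is an in-memory map, and the status mapping returns a plain string instead of a gRPC status type.

```rust
use std::collections::HashMap;

/// Hypothetical stored object with a provider list as its mutable payload.
#[derive(Debug, Clone, PartialEq)]
pub struct Stored {
    pub resource_version: u64,
    pub providers: Vec<String>,
}

#[derive(Debug, PartialEq)]
pub enum PersistenceError {
    NotFound,
    Conflict,
}

/// Write condition modelled on the `MatchResourceVersion` condition named in the text.
pub enum WriteCondition {
    MatchResourceVersion(u64),
}

pub struct Store {
    rows: HashMap<String, Stored>,
}

impl Store {
    pub fn new() -> Self {
        Store { rows: HashMap::new() }
    }

    /// Creates a row at version 1.
    pub fn insert(&mut self, id: &str, providers: Vec<String>) {
        self.rows
            .insert(id.to_string(), Stored { resource_version: 1, providers });
    }

    pub fn get(&self, id: &str) -> Option<Stored> {
        self.rows.get(id).cloned()
    }

    /// Replaces the row only if its version matches the condition.
    pub fn put(
        &mut self,
        id: &str,
        mut obj: Stored,
        cond: WriteCondition,
    ) -> Result<u64, PersistenceError> {
        let WriteCondition::MatchResourceVersion(expected) = cond;
        let row = self.rows.get_mut(id).ok_or(PersistenceError::NotFound)?;
        if row.resource_version != expected {
            return Err(PersistenceError::Conflict);
        }
        obj.resource_version = expected + 1;
        *row = obj;
        Ok(expected + 1)
    }
}

/// One CAS attempt: fetch, apply the mutation closure, write with a version guard.
pub fn update_once<F>(store: &mut Store, id: &str, mutate: F) -> Result<u64, PersistenceError>
where
    F: FnOnce(&mut Stored),
{
    let mut current = store.get(id).ok_or(PersistenceError::NotFound)?;
    let read_version = current.resource_version;
    mutate(&mut current);
    store.put(id, current, WriteCondition::MatchResourceVersion(read_version))
}

/// Stand-in for the handler-side mapping of persistence errors to gRPC status names.
pub fn status_for(err: &PersistenceError) -> &'static str {
    match err {
        PersistenceError::Conflict => "ABORTED",
        PersistenceError::NotFound => "NOT_FOUND",
    }
}
```

The helper deliberately does not loop. A retry after `Conflict` needs a fresh read, and in the design described here that decision is left to the caller.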

The helper accepts an `expected_version` parameter that selects between two
modes:

- **Server-driven** (`expected_version = 0`): the helper uses the version it
just read from the database. Internal operations (reconciler, policy status
reports, compute phase transitions) use this mode because the caller does
not track versions.
- **Client-driven** (`expected_version != 0`): the helper validates that the
caller's version matches the current database version before applying the
mutation. If they diverge it returns `Conflict` without attempting the
write. Client-facing operations that carry an `expected_resource_version`
field use this mode: `AttachSandboxProvider`, `DetachSandboxProvider`,
`UpdateProvider`, and `UpdateConfig` (policy backfill path).
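
The mode selection can be illustrated with a small sketch. `guard_version` and `Conflict` are hypothetical names, not the helper's actual signature. The sketch also assumes that `0` is never a stored version, which would be consistent with rows starting at version 1.

```rust
/// Hypothetical conflict value carrying both versions for diagnostics.
#[derive(Debug, PartialEq)]
pub struct Conflict {
    pub expected: u64,
    pub actual: u64,
}

/// Picks the version to use in the write guard.
///
/// * `expected_version == 0`: server-driven, trust the version just read.
/// * `expected_version != 0`: client-driven, reject early if it is stale.
pub fn guard_version(expected_version: u64, current_version: u64) -> Result<u64, Conflict> {
    if expected_version == 0 {
        return Ok(current_version);
    }
    if expected_version != current_version {
        return Err(Conflict {
            expected: expected_version,
            actual: current_version,
        });
    }
    Ok(expected_version)
}
```

One consequence of the sentinel is that a client sending `0` opts out of the client-side check and only gets the server-driven guarantee.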

Settings updates are an exception: they use a Tokio `Mutex` instead of CAS
because settings operations require multi-step validation that is simpler under
an exclusive lock than within a CAS write.

The `resource_version` is surfaced to clients through `ObjectMeta` in proto
responses. Database migrations backfill existing rows with version 1.

Policy and runtime settings are delivered together through the effective sandbox
config path. A gateway-global policy can override sandbox-scoped policy. The
sandbox supervisor polls for config revisions and hot-reloads dynamic policy
81 changes: 75 additions & 6 deletions crates/openshell-cli/src/run.rs
Original file line number Diff line number Diff line change
@@ -2279,6 +2279,11 @@ pub async fn sandbox_get(
println!(" {} {}", "Id:".dimmed(), id);
println!(" {} {}", "Name:".dimmed(), name);
println!(" {} {}", "Phase:".dimmed(), phase_name(sandbox.phase));
println!(
" {} {}",
"Resource version:".dimmed(),
sandbox.metadata.as_ref().map_or(0, |m| m.resource_version)
);

// Display labels if present
if let Some(metadata) = &sandbox.metadata
@@ -2888,14 +2893,38 @@ pub async fn sandbox_provider_attach(
tls: &TlsOptions,
) -> Result<()> {
let mut client = grpc_client(server, tls).await?;
let response = client

// Fetch current sandbox to get resource_version for CAS
let sandbox = client
.get_sandbox(GetSandboxRequest {
name: name.to_string(),
})
.await
.into_diagnostic()?
.into_inner()
.sandbox
.ok_or_else(|| miette::miette!("sandbox not found"))?;

let resource_version = sandbox.metadata.as_ref().map_or(0, |m| m.resource_version);

let response = match client
.attach_sandbox_provider(AttachSandboxProviderRequest {
sandbox_name: name.to_string(),
provider_name: provider.to_string(),
expected_resource_version: resource_version,
})
.await
.into_diagnostic()?
.into_inner();
{
Ok(response) => response.into_inner(),
Err(status) if status.code() == Code::Aborted => {
return Err(miette::miette!(
"Failed to attach provider: sandbox was modified by another operation.\n\
Please retry the command."
)
.with_source_code(status.message().to_string()));
}
Err(e) => return Err(e).into_diagnostic(),
};

if response.attached {
println!(
@@ -2917,14 +2946,38 @@ pub async fn sandbox_provider_detach(
tls: &TlsOptions,
) -> Result<()> {
let mut client = grpc_client(server, tls).await?;
let response = client

// Fetch current sandbox to get resource_version for CAS
let sandbox = client
.get_sandbox(GetSandboxRequest {
name: name.to_string(),
})
.await
.into_diagnostic()?
.into_inner()
.sandbox
.ok_or_else(|| miette::miette!("sandbox not found"))?;

let resource_version = sandbox.metadata.as_ref().map_or(0, |m| m.resource_version);

let response = match client
.detach_sandbox_provider(DetachSandboxProviderRequest {
sandbox_name: name.to_string(),
provider_name: provider.to_string(),
expected_resource_version: resource_version,
})
.await
.into_diagnostic()?
.into_inner();
{
Ok(response) => response.into_inner(),
Err(status) if status.code() == Code::Aborted => {
return Err(miette::miette!(
"Failed to detach provider: sandbox was modified by another operation.\n\
Please retry the command."
)
.with_source_code(status.message().to_string()));
}
Err(e) => return Err(e).into_diagnostic(),
};

if response.detached {
println!(
@@ -3259,6 +3312,7 @@ async fn auto_create_provider(
name: exact_name.to_string(),
created_at_ms: 0,
labels: HashMap::new(),
resource_version: 0,
}),
r#type: provider_type.to_string(),
credentials: discovered.credentials.clone(),
@@ -3299,6 +3353,7 @@ async fn auto_create_provider(
name: name.clone(),
created_at_ms: 0,
labels: HashMap::new(),
resource_version: 0,
}),
r#type: provider_type.to_string(),
credentials: discovered.credentials.clone(),
@@ -3711,6 +3766,7 @@ pub async fn provider_create(
name: name.to_string(),
created_at_ms: 0,
labels: HashMap::new(),
resource_version: 0,
}),
r#type: provider_type.clone(),
credentials: credential_map,
@@ -3755,6 +3811,11 @@ pub async fn provider_get(server: &str, name: &str, tls: &TlsOptions) -> Result<
println!(" {} {}", "Id:".dimmed(), provider.object_id());
println!(" {} {}", "Name:".dimmed(), provider.object_name());
println!(" {} {}", "Type:".dimmed(), provider.r#type);
println!(
" {} {}",
"Resource version:".dimmed(),
provider.metadata.as_ref().map_or(0, |m| m.resource_version)
);
println!(
" {} {}",
"Credential keys:".dimmed(),
@@ -4211,6 +4272,7 @@ pub async fn provider_update(
name: name.to_string(),
created_at_ms: 0,
labels: HashMap::new(),
resource_version: 0,
}),
r#type: String::new(),
credentials: credential_map,
@@ -4765,6 +4827,7 @@ pub async fn sandbox_policy_set_global(
delete_setting: false,
global: true,
merge_operations: vec![],
expected_resource_version: 0,
})
.await
.into_diagnostic()?
@@ -4963,6 +5026,7 @@ pub async fn gateway_setting_set(
delete_setting: false,
global: true,
merge_operations: vec![],
expected_resource_version: 0,
})
.await
.into_diagnostic()?
@@ -4997,6 +5061,7 @@ pub async fn sandbox_setting_set(
delete_setting: false,
global: false,
merge_operations: vec![],
expected_resource_version: 0,
})
.await
.into_diagnostic()?
@@ -5031,6 +5096,7 @@ pub async fn gateway_setting_delete(
delete_setting: true,
global: true,
merge_operations: vec![],
expected_resource_version: 0,
})
.await
.into_diagnostic()?
@@ -5065,6 +5131,7 @@ pub async fn sandbox_setting_delete(
delete_setting: true,
global: false,
merge_operations: vec![],
expected_resource_version: 0,
})
.await
.into_diagnostic()?
@@ -5123,6 +5190,7 @@ pub async fn sandbox_policy_set(
delete_setting: false,
global: false,
merge_operations: vec![],
expected_resource_version: 0,
})
.await
.into_diagnostic()?;
@@ -5297,6 +5365,7 @@ pub async fn sandbox_policy_update(
delete_setting: false,
global: false,
merge_operations: plan.merge_operations,
expected_resource_version: 0,
})
.await
.into_diagnostic()?
2 changes: 2 additions & 0 deletions crates/openshell-cli/tests/ensure_providers_integration.rs
Original file line number Diff line number Diff line change
@@ -113,6 +113,7 @@ impl TestOpenShell {
name: name.to_string(),
created_at_ms: 0,
labels: HashMap::new(),
resource_version: 0,
}),
r#type: provider_type.to_string(),
credentials: HashMap::new(),
@@ -377,6 +378,7 @@ impl OpenShell for TestOpenShell {
name: provider_metadata.name,
created_at_ms: existing_metadata.created_at_ms,
labels: existing_metadata.labels,
resource_version: 0,
}),
r#type: existing.r#type,
credentials: merge(existing.credentials, provider.credentials),
27 changes: 22 additions & 5 deletions crates/openshell-cli/tests/provider_commands_integration.rs
Original file line number Diff line number Diff line change
@@ -15,7 +15,7 @@ use openshell_core::proto::{
HealthRequest, HealthResponse, ListProvidersRequest, ListProvidersResponse,
ListSandboxProvidersRequest, ListSandboxProvidersResponse, ListSandboxesRequest,
ListSandboxesResponse, Provider, ProviderProfile, ProviderResponse, RevokeSshSessionRequest,
RevokeSshSessionResponse, SandboxResponse, SandboxStreamEvent, ServiceStatus,
RevokeSshSessionResponse, Sandbox, SandboxResponse, SandboxStreamEvent, ServiceStatus,
SupervisorMessage, UpdateProviderRequest, WatchSandboxRequest,
};
use openshell_core::{ObjectId, ObjectName};
@@ -111,9 +111,25 @@ impl OpenShell for TestOpenShell {

async fn get_sandbox(
&self,
_request: tonic::Request<GetSandboxRequest>,
request: tonic::Request<GetSandboxRequest>,
) -> Result<Response<SandboxResponse>, Status> {
Ok(Response::new(SandboxResponse::default()))
let name = request.into_inner().name;
// Return a minimal sandbox with metadata for CAS operations
Ok(Response::new(SandboxResponse {
sandbox: Some(Sandbox {
metadata: Some(openshell_core::proto::datamodel::v1::ObjectMeta {
id: format!("sb-{name}"),
name,
created_at_ms: 0,
labels: HashMap::new(),
resource_version: 1,
}),
spec: None,
status: None,
phase: 0,
current_policy_version: 0,
}),
}))
}

async fn list_sandboxes(
@@ -183,7 +199,7 @@ impl OpenShell for TestOpenShell {
providers.push(request.provider_name.clone());
true
};
let sandbox = openshell_core::proto::Sandbox {
let sandbox = Sandbox {
metadata: Some(openshell_core::proto::datamodel::v1::ObjectMeta {
name: request.sandbox_name,
..Default::default()
@@ -220,7 +236,7 @@ impl OpenShell for TestOpenShell {
let before_len = providers.len();
providers.retain(|name| name != &request.provider_name);
let detached = providers.len() != before_len;
let sandbox = openshell_core::proto::Sandbox {
let sandbox = Sandbox {
metadata: Some(openshell_core::proto::datamodel::v1::ObjectMeta {
name: request.sandbox_name,
..Default::default()
@@ -475,6 +491,7 @@ impl OpenShell for TestOpenShell {
name: provider_metadata.name,
created_at_ms: existing_metadata.created_at_ms,
labels: existing_metadata.labels,
resource_version: 0,
}),
r#type: existing.r#type,
credentials: merge(existing.credentials, provider.credentials),
Original file line number Diff line number Diff line change
@@ -121,6 +121,7 @@ impl OpenShell for TestOpenShell {
name: sandbox_name,
created_at_ms: 0,
labels: HashMap::new(),
resource_version: 0,
}),
phase: SandboxPhase::Provisioning as i32,
..Sandbox::default()
@@ -140,6 +141,7 @@ impl OpenShell for TestOpenShell {
name,
created_at_ms: 0,
labels: HashMap::new(),
resource_version: 0,
}),
phase: SandboxPhase::Ready as i32,
..Sandbox::default()
@@ -354,6 +356,7 @@ impl OpenShell for TestOpenShell {
name: sandbox_id.trim_start_matches("id-").to_string(),
created_at_ms: 0,
labels: HashMap::new(),
resource_version: 0,
}),
phase: SandboxPhase::Provisioning as i32,
..Sandbox::default()
Original file line number Diff line number Diff line change
@@ -119,6 +119,7 @@ impl OpenShell for TestOpenShell {
name,
created_at_ms: 0,
labels: std::collections::HashMap::new(),
resource_version: 0,
}),
..Default::default()
}),
2 changes: 1 addition & 1 deletion crates/openshell-core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -23,7 +23,7 @@ pub mod settings;

pub use config::{ComputeDriverKind, Config, OidcConfig, TlsConfig};
pub use error::{ComputeDriverError, Error, Result};
pub use metadata::{ObjectId, ObjectLabels, ObjectName};
pub use metadata::{GetResourceVersion, ObjectId, ObjectLabels, ObjectName, SetResourceVersion};

/// Build version string derived from git metadata.
///