diff --git a/TODO.md b/TODO.md index 10f42aa..ab63bb6 100644 --- a/TODO.md +++ b/TODO.md @@ -3,13 +3,13 @@ ## Marketplace Integration: Agent Registration & Local Deploy ### Agent Self-Registration (for curl one-liner and manual install entry points) -- [ ] **`POST /api/v1/register`** (local endpoint on Status Panel) — Triggered after install.sh completes +- [x] **`POST /api/v1/register`** (local endpoint on Status Panel) — Triggered after install.sh completes - Accept `{ purchase_token, stack_id }` from install script - Collect server fingerprint (hostname, IP, OS, CPU, RAM, disk) - Call Stacker Server: `POST /api/v1/agents/register { purchase_token, server_fingerprint, stack_id }` - Store returned `agent_id`, `deployment_hash`, `dashboard_url` locally - Begin heartbeat loop to Stacker Server -- [ ] **Local `stacker deploy` trigger** — After registration, Status Panel invokes Stacker CLI locally +- [x] **Local `stacker deploy` trigger** — After registration, Status Panel invokes Stacker CLI locally - `stacker deploy --from /opt/stacker/stacks/{stack_id}/` (the downloaded archive) - Monitor deploy progress, report status back to Stacker Server via existing agent report endpoint - No Install Service involved — fully local execution @@ -17,31 +17,63 @@ ### Dashboard Linking (optional, user-initiated) - [x] Provide web UI page at `http://localhost:{STATUS_PORT}/link` to connect Status Panel to TryDirect dashboard - [x] Support unlinking from dashboard (agent continues to work standalone) -- [ ] **Login-based linking flow (Entry Point C):** +- [x] **Login-based linking flow (Entry Point C):** - User logs in with TryDirect email + password from Status Panel UI - Status Panel calls Stacker: `POST /api/v1/auth/login { email, password }` → returns `session_token` + user's deployments - User selects a deployment from the list → Status Panel calls Stacker: `POST /api/v1/agents/link { session_token, deployment_id, server_fingerprint }` - Stacker validates session, checks user owns the deployment, issues `agent_id` + `agent_token` - No purchase_token needed — user's identity is the trust anchor - `purchase_token` flow retained only for headless Entry Point B (curl one-liner) -- [ ] Add "Use Standalone" option for users without TryDirect account (skip linking entirely) +- [x] Add "Use Standalone" option for users without TryDirect account (skip linking entirely) ### Standalone Status Panel Entry Point (Phase 2) -- [ ] **"Deploy a Stack" page** in Status Panel web UI +- [x] **"Deploy a Stack" page** in Status Panel web UI - Browse available stacks from marketplace API: `GET /api/v1/marketplace/stacks` - User selects stack → Status Panel downloads archive + calls `stacker deploy` locally - This enables Entry Point C: user installs Status Panel first, then deploys stacks from its UI ### Notifications Relay -- [ ] Forward marketplace notifications (stack published, update available) from Stacker Server to Status Panel UI -- [ ] Show "Update Available" badge when a newer version of the deployed stack exists +- [x] Forward marketplace notifications (stack published, update available) from Stacker Server to Status Panel UI +- [x] Show "Update Available" badge when a newer version of the deployed stack exists --- -- Align build and runtime images so the compiled `status` binary links against the same glibc version (or older) as production. -- Add a musl-based build target and image variant to provide a statically linked binary that avoids glibc drift. +- ~~Align build and runtime images so the compiled `status` binary links against the same glibc version (or older) as production.~~ ✅ Done — Dockerfiles use `clux/muslrust:stable` → `gcr.io/distroless/cc`, musl avoids glibc drift. +- ~~Add a musl-based build target and image variant to provide a statically linked binary that avoids glibc drift.~~ ✅ Done — CI builds `x86_64-unknown-linux-musl` target, releases musl binary. - Update CI to build/test using the production base image to prevent future GLIBC_x.y.z mismatches. - Add a simple container start-up check that surfaces linker/runtime errors early in the pipeline. +## Missing Features Implementation Plan (2026-04) + +### Phase 1 - Reliability and Production Readiness +- [x] **[status-auth-refresh]** Refresh agent auth immediately on 401/403 and retry polling/report calls with backoff. + - Wire the retry path into the polling loop instead of waiting for the periodic refresh task. + - Define the Vault path/role contract for `status_panel_token` and document failure handling. +- [x] **[status-alerting]** Add outbound alert delivery for unhealthy containers, command failures, and host-level incidents. + - Webhook delivery with env-configured thresholds (`ALERT_WEBHOOK_URL`, CPU/memory/disk thresholds). + - Includes alert deduplication, severity escalation, and recovery notifications. +- [x] **[status-command-provenance]** Surface which control plane executed each action (`status_panel` vs `compose_agent`). + - Expose provenance in command reports, health metrics, and `/capabilities`-driven diagnostics. + - Publish and implement the separate token/cache schema for `compose_agent_token`. +- [ ] **[status-ssl-renewal]** Automate SSL certificate renewal for hosts that enable HTTPS. + - Add renewal scheduling, renewal result logging, and certificate reload without manual intervention. + +### Phase 2 - Data Safety and Day-2 Operations +- [ ] **[status-volume-backups]** Add scheduled backup and restore support for Docker volumes. + - Support policy-driven backups for stateful services, retention, restore validation, and signed metadata. + - Reuse existing backup/security primitives where possible instead of introducing a separate backup path. + +### Phase 3 - Standalone and Dashboard UX +- [x] **[status-login-linking]** Complete the login-based dashboard linking flow and standalone mode. + - Finish the UI + daemon wiring for email/password linking to an owned deployment. + - Add "Use Standalone" so the panel is usable without a TryDirect account. +- [x] **[status-deploy-stack-ui]** Build the local "Deploy a Stack" flow in Status Panel. + - Browse marketplace stacks, download the selected archive, and trigger local `stacker deploy`. + - Show deployment progress, update availability, and compatibility checks in the local UI. + +### Cross-Project Coordination +- [ ] Coordinate `status-deploy-stack-ui` with Stacker marketplace archive/download validation. +- [ ] Coordinate `status-command-provenance` and future pipe execution with the Stacker control-plane roadmap. + ## Status Panel Agent Commands (Pull Model) **Key principle**: Agent polls Stacker; Stacker never pushes to the agent. Agent is responsible for adding HMAC headers on its outbound calls. @@ -51,14 +83,14 @@ - [x] Restart: restart container by app_code, then emit updated state in report payload; include errors array on failure. - [x] Reporting: call Stacker `POST /api/v1/agent/commands/report` with HMAC headers (`X-Agent-Id`, `X-Timestamp`, `X-Request-Id`, `X-Agent-Signature`) signed using Vault token. - [x] Wire agent to poll loop: `GET /api/v1/agent/commands/wait/{deployment_hash}` with HMAC headers. -- [ ] On 401/403, refresh token from Vault and retry with backoff (which Vault path/role should we use for the agent token?). +- [x] On 401/403, refresh token from Vault and retry with backoff (TokenProvider with Vault → env fallback, 10s cooldown). - [x] Ensure agent generates HMAC signature for every outbound request (wait + report + app status); no secrets expected from Stacker side. ## Compose Agent Sidecar - [x] Ship a separate `compose-agent` container (Docker Compose + MCP Gateway) deployed alongside the Status Panel container; Service file should ensure it mounts the Docker socket while Status Panel does not. - [x] Implement watchdog to restart only the compose container on failure/glibc mismatch without touching the Status Panel daemon; prove via integration test. -- [ ] Expose health metrics indicating which control plane executed each command (`status_panel` vs `compose_agent`) so ops can track rollout and fallbacks. -- [ ] Publish Vault secret schema: `secret/agent/{hash}/status_panel_token` and `secret/agent/{hash}/compose_agent_token`; refresh + cache them independently. +- [x] Expose health metrics indicating which control plane executed each command (`status_panel` vs `compose_agent`) so ops can track rollout and fallbacks. +- [x] Publish Vault secret schema: `secret/agent/{hash}/status_panel_token` and `secret/agent/{hash}/compose_agent_token`; refresh + cache them independently. - [x] Add config flag to disable compose agent (legacy mode) and emit warning log so Blog receives `compose_agent=false` via `/capabilities`. ## Kata Containers Support (Stacker Server) diff --git a/docs/AGENT_ROTATION_GUIDE.md b/docs/AGENT_ROTATION_GUIDE.md index 95b5373..4da38dc 100644 --- a/docs/AGENT_ROTATION_GUIDE.md +++ b/docs/AGENT_ROTATION_GUIDE.md @@ -143,3 +143,131 @@ spawn(refresh_loop(vault.clone(), deployment_hash.clone(), cache.clone())); - Action: check request headers, clock skew, and signature; ensure using current token - Symptoms: Vault errors - Action: verify `VAULT_ADDRESS`, `VAULT_TOKEN`, network connectivity, and KV path prefix + +--- + +## Auth Refresh on 401/403 — Implementation Details + +### Problem + +When the agent token expires or is rotated server-side, all outbound requests +(polling, reporting, notifications) receive 401/403 from Stacker. Previously +these were treated as generic errors with fixed backoff, causing prolonged +downtime until manual restart. + +### Solution: `TokenProvider` + Retry Helpers + +Two new modules handle automatic recovery: + +| Module | Path | Purpose | +|--------|------|---------| +| `TokenProvider` | `src/security/token_provider.rs` | Shared mutable token with on-demand refresh | +| `RetryClient` | `src/transport/retry.rs` | HTTP helpers that detect 401/403 and retry | + +### Request Flow + +``` +Daemon / Notification Poller + │ + ▼ +┌───────────────────┐ +│ TokenProvider │ .get() → current token +│ .get() │ +└────────┬──────────┘ + ▼ +┌───────────────────┐ +│ Build signed │ build_signed_headers(agent_id, token, body) +│ HMAC headers │ → Bearer + X-Agent-Signature + X-Timestamp +└────────┬──────────┘ + ▼ +┌───────────────────┐ +│ Send HTTP │ signed_get_with_retry / signed_post_with_retry +│ request │ +└────────┬──────────┘ + ▼ +┌────── Status code? ──────┐ +│ │ │ +200/204 401/403 5xx / network error +│ │ │ +✅ Done ▼ ▼ + ┌──────────────┐ Exponential backoff + │ TokenProvider │ 2s → 4s → 8s → … 60s cap + │ .refresh() │ retry up to 3× + └──────┬───────┘ + │ + ├─ 1. Try Vault: + │ vault_client.fetch_agent_token(deployment_hash) + │ + ├─ 2. If Vault fails or returns same token: + │ re-read AGENT_TOKEN from environment + │ + ├─ 3. Cooldown: 10s between refresh attempts + │ (prevents hammering Vault on repeated failures) + │ + ▼ + Retry request once with new token + │ + ┌────┴────┐ + 200 401 again + │ │ + ✅ Done Propagate error + (token truly invalid) +``` + +### TokenProvider API + +```rust +use crate::security::token_provider::TokenProvider; + +// Create (both daemon and serve mode) +let tp = TokenProvider::new(initial_token, Some(vault_client), deployment_hash); +// or +let tp = TokenProvider::from_env(Some(vault_client)); + +tp.get().await // → current token (Arc>) +tp.refresh().await // → Ok(true) if token changed, Ok(false) if unchanged +tp.swap(new).await // → direct swap (used by background rotation task) +``` + +### Wired Consumers + +| Consumer | File | Mechanism | +|----------|------|-----------| +| Daemon polling (`wait_for_command`) | `src/agent/daemon.rs` | `wait_for_command_with_retry` (auth-only retry) | +| Daemon reporting (`report_result`) | `src/agent/daemon.rs` | `report_result_with_retry` (full retry) | +| Daemon app status | `src/agent/daemon.rs` | `update_app_status_with_retry` (full retry) | +| Notification poller | `src/comms/notifications.rs` | Explicit 401/403 check → `refresh()` → 5s backoff | + +### RetryConfig Presets + +```rust +use crate::transport::retry::RetryConfig; + +RetryConfig::default() // 1 auth retry + 3 server retries (2–60s backoff) +RetryConfig::auth_only() // 1 auth retry + 0 server retries (for long-poll) +``` + +### Refresh Strategy + +1. **Vault first** — If `VaultClient` is configured, call + `fetch_agent_token(deployment_hash)`. If it returns a different token, + swap it in and retry. +2. **Environment fallback** — If Vault is unavailable or returns the same + token, re-read `AGENT_TOKEN` from the process environment. This covers + cases where an orchestrator (Docker, systemd) injects a new token via + env without restarting the process. +3. **Cooldown** — A 10-second minimum gap between refresh attempts prevents + hammering Vault during cascading failures. +4. **Single retry** — After refreshing, the request is retried exactly once. + If it still gets 401/403, the error propagates (the token is truly invalid + and requires operator intervention). + +### Environment Variables + +| Variable | Default | Purpose | +|----------|---------|---------| +| `AGENT_TOKEN` | _(empty)_ | Bearer token for Stacker API auth | +| `DEPLOYMENT_HASH` | `"default"` | Vault path isolation key | +| `VAULT_ADDRESS` | _(none)_ | Vault server URL (enables Vault refresh) | +| `VAULT_TOKEN` | _(none)_ | Vault auth token | +| `VAULT_AGENT_PATH_PREFIX` | `"status_panel"` | Vault KV path prefix | diff --git a/src/agent/daemon.rs b/src/agent/daemon.rs index c492906..a3a60d0 100644 --- a/src/agent/daemon.rs +++ b/src/agent/daemon.rs @@ -13,7 +13,11 @@ use crate::commands::executor::CommandExecutor; use crate::commands::firewall::FirewallPolicy; use crate::commands::validator::CommandValidator; use crate::commands::TimeoutStrategy; -use crate::monitoring::{spawn_heartbeat, MetricsCollector, MetricsSnapshot, MetricsStore}; +use crate::monitoring::{ + spawn_heartbeat, ControlPlane, MetricsCollector, MetricsSnapshot, MetricsStore, +}; +use crate::security::token_provider::TokenProvider; +use crate::security::vault_client::VaultClient; use crate::transport::{http_polling, CommandResult}; use serde_json::{json, Value}; @@ -28,12 +32,14 @@ pub async fn run(config_path: String) -> Result<()> { .or(Some(cfg.compose_agent_enabled)) .unwrap_or(false); - let control_plane = std::env::var("CONTROL_PLANE") - .ok() - .or(cfg.control_plane.clone()) - .unwrap_or_else(|| "status_panel".to_string()); + let control_plane = ControlPlane::from_value( + std::env::var("CONTROL_PLANE") + .ok() + .as_deref() + .or(cfg.control_plane.as_deref()), + ); - if !compose_agent_enabled && control_plane == "status_panel" { + if !compose_agent_enabled && control_plane == ControlPlane::StatusPanel { warn!("compose_agent=false - running in legacy mode (Status Panel handles all operations)"); } else if compose_agent_enabled { info!("compose_agent=true - compose-agent sidecar handling Docker operations"); @@ -51,7 +57,25 @@ pub async fn run(config_path: String) -> Result<()> { .map(Duration::from_secs) .unwrap_or(Duration::from_secs(10)); - let heartbeat_handle = spawn_heartbeat(collector, store, metrics_interval, tx, webhook.clone()); + let alert_manager = { + let cfg = crate::monitoring::alerting::AlertConfig::from_env(); + let mgr = crate::monitoring::alerting::AlertManager::new(cfg); + if mgr.is_enabled() { + info!("outbound alerting enabled"); + Some(std::sync::Arc::new(mgr)) + } else { + None + } + }; + + let heartbeat_handle = spawn_heartbeat( + collector, + store, + metrics_interval, + tx, + webhook.clone(), + alert_manager, + ); info!( interval_secs = metrics_interval.as_secs(), webhook = webhook.as_deref().unwrap_or("none"), @@ -81,6 +105,10 @@ pub async fn run(config_path: String) -> Result<()> { warn!("AGENT_TOKEN is not set; authenticated dashboard requests will fail"); } + // Build a shared token provider (Vault → env fallback on 401/403) + let vault_client = VaultClient::from_env().ok().flatten(); + let token_provider = TokenProvider::new(agent_token, vault_client, deployment_hash.clone()); + info!( dashboard_url = %dashboard_url, agent_id = %agent_id, @@ -98,11 +126,12 @@ pub async fn run(config_path: String) -> Result<()> { dashboard_url, deployment_hash, agent_id, - agent_token, + token_provider, polling_timeout, polling_backoff, command_timeout, firewall_policy, + control_plane, }; // Spawn the long-polling loop @@ -126,11 +155,12 @@ struct PollingContext { dashboard_url: String, deployment_hash: String, agent_id: String, - agent_token: String, + token_provider: TokenProvider, polling_timeout: u64, polling_backoff: u64, command_timeout: u64, firewall_policy: FirewallPolicy, + control_plane: ControlPlane, } /// Long-polling loop: continuously waits for commands and executes them @@ -138,11 +168,11 @@ async fn polling_loop(ctx: PollingContext) { let executor = CommandExecutor::new(); loop { - match http_polling::wait_for_command( + match http_polling::wait_for_command_with_retry( &ctx.dashboard_url, &ctx.deployment_hash, &ctx.agent_id, - &ctx.agent_token, + &ctx.token_provider, ctx.polling_timeout, None, ) @@ -211,6 +241,7 @@ async fn execute_and_report( result: None, error: Some(e.to_string()), completed_at: Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true), + executed_by: Some(ctx.control_plane.to_string()), ..CommandResult::default() } } @@ -232,6 +263,7 @@ async fn execute_and_report( result: None, error: Some(format!("Command validation failed: {}", e)), completed_at: Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true), + executed_by: Some(ctx.control_plane.to_string()), ..CommandResult::default() } } else { @@ -254,6 +286,7 @@ async fn execute_and_report( })), error: None, completed_at: Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true), + executed_by: Some(ctx.control_plane.to_string()), ..CommandResult::default() }, Err(e) => CommandResult { @@ -262,6 +295,7 @@ async fn execute_and_report( result: None, error: Some(e.to_string()), completed_at: Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true), + executed_by: Some(ctx.control_plane.to_string()), ..CommandResult::default() }, } @@ -276,6 +310,7 @@ async fn execute_and_report( result: None, error: Some(format!("Invalid command parameters: {}", e)), completed_at: Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true), + executed_by: Some(ctx.control_plane.to_string()), ..CommandResult::default() } } @@ -287,16 +322,17 @@ async fn execute_and_report( status = %cmd_result.status, "reporting command result to stacker" ); - http_polling::report_result( + http_polling::report_result_with_retry( &ctx.dashboard_url, &ctx.agent_id, - &ctx.agent_token, + &ctx.token_provider, &cmd_result.command_id, &ctx.deployment_hash, &cmd_result.status, &cmd_result.result, &cmd_result.error, &cmd_result.completed_at, + cmd_result.executed_by.as_deref(), ) .await?; info!( @@ -305,10 +341,10 @@ async fn execute_and_report( ); if let Some(app_status) = build_app_status_update(&cmd_result) { - if let Err(e) = http_polling::update_app_status( + if let Err(e) = http_polling::update_app_status_with_retry( &ctx.dashboard_url, &ctx.agent_id, - &ctx.agent_token, + &ctx.token_provider, &app_status, ) .await diff --git a/src/commands/stacker.rs b/src/commands/stacker.rs index ecc5bde..5aa3335 100644 --- a/src/commands/stacker.rs +++ b/src/commands/stacker.rs @@ -7,7 +7,6 @@ use serde::Deserialize; use serde::Serialize; #[cfg(any(feature = "docker", test))] use serde_json::json; -#[cfg(any(feature = "docker", test))] use serde_json::Value; #[cfg(feature = "docker")] use std::collections::{HashMap, HashSet}; @@ -37,6 +36,168 @@ pub enum ContainerRuntime { Kata, } +#[cfg(all(test, feature = "docker"))] +mod trigger_pipe_handler_tests { + use super::*; + use mockito::{Matcher, Server}; + + fn make_trigger_agent_command() -> AgentCommand { + AgentCommand { + id: "cmd-trigger".into(), + command_id: "cmd-trigger".into(), + name: "trigger_pipe".into(), + params: json!({}), + deployment_hash: Some("dep-123".into()), + app_code: None, + } + } + + #[tokio::test] + async fn handle_trigger_pipe_posts_mapped_payload_to_external_target() { + let mut server = Server::new_async().await; + let mock = server + .mock("POST", "/webhook/pipe") + .match_body(Matcher::Exact(r#"{"email":"dev@try.direct"}"#.into())) + .with_status(200) + .with_header("content-type", "application/json") + .with_body(r#"{"accepted":true}"#) + .create_async() + .await; + + let agent_cmd = make_trigger_agent_command(); + let data = TriggerPipeCommand { + deployment_hash: "dep-123".into(), + pipe_instance_id: "11111111-1111-1111-1111-111111111111".into(), + input_data: Some(json!({ "user": { "email": "dev@try.direct" } })), + source_container: None, + source_endpoint: "/".into(), + source_method: "GET".into(), + target_url: Some(server.url()), + target_container: None, + target_endpoint: "/webhook/pipe".into(), + target_method: "POST".into(), + field_mapping: Some(json!({ "email": "$.user.email" })), + trigger_type: "manual".into(), + }; + + let result = handle_trigger_pipe(&agent_cmd, &data) + .await + .expect("trigger_pipe should execute"); + + mock.assert_async().await; + assert_eq!(result.status, "completed"); + assert!(result.error.is_none()); + + let body = result.result.expect("trigger_pipe result body"); + assert_eq!(body["success"], true); + assert_eq!(body["mapped_data"], json!({ "email": "dev@try.direct" })); + assert_eq!(body["target_response"]["status"], 200); + assert_eq!(body["target_response"]["body"], json!({ "accepted": true })); + } + + #[tokio::test] + async fn handle_trigger_pipe_requires_external_target_url() { + let agent_cmd = make_trigger_agent_command(); + let data = TriggerPipeCommand { + deployment_hash: "dep-123".into(), + pipe_instance_id: "11111111-1111-1111-1111-111111111111".into(), + input_data: Some(json!({ "user": { "email": "dev@try.direct" } })), + source_container: None, + source_endpoint: "/".into(), + source_method: "GET".into(), + target_url: None, + target_container: None, + target_endpoint: "/webhook/pipe".into(), + target_method: "POST".into(), + field_mapping: Some(json!({ "email": "$.user.email" })), + trigger_type: "manual".into(), + }; + + let result = handle_trigger_pipe(&agent_cmd, &data) + .await + .expect("trigger_pipe should return structured failure"); + + assert_eq!(result.status, "failed"); + assert_eq!( + result.error.as_deref(), + Some("trigger_pipe requires target_url or target_container") + ); + } + + #[test] + fn build_trigger_pipe_container_command_posts_json_payload() { + let command = build_trigger_pipe_container_command( + "/webhook/pipe", + "POST", + &json!({ "email": "dev@try.direct", "name": "O'Reilly" }), + ); + + assert!(command.contains("curl -sS -X POST")); + assert!(command.contains("http://127.0.0.1/webhook/pipe")); + assert!(command.contains("\"email\":\"dev@try.direct\"")); + assert!(!command.contains("\"name\":\"O'Reilly\"")); + assert!(command.contains("Reilly")); + assert!(command.contains("%{http_code}")); + } + + #[test] + fn build_trigger_pipe_source_command_fetches_json_payload() { + let command = build_trigger_pipe_source_command("/source/data", "get"); + + assert!(command.contains("curl -sS -X GET")); + assert!(command.contains("http://127.0.0.1/source/data")); + assert!(command.contains("%{http_code}")); + } + + #[test] + fn build_trigger_pipe_container_command_normalizes_invalid_method() { + let command = + build_trigger_pipe_container_command("/webhook/pipe", "POST; rm -rf /", &json!({})); + + assert!(command.contains("curl -sS -X POST ")); + assert!(!command.contains("rm -rf")); + } + + #[test] + fn normalize_trigger_pipe_method_falls_back_to_default() { + assert_eq!(normalize_trigger_pipe_method(" patch ", "POST"), "PATCH"); + assert_eq!( + normalize_trigger_pipe_method("POST;echo nope", "POST"), + "POST" + ); + assert_eq!(normalize_trigger_pipe_method("", "GET"), "GET"); + } + + #[tokio::test] + async fn handle_trigger_pipe_requires_input_or_source_details() { + let agent_cmd = make_trigger_agent_command(); + let data = TriggerPipeCommand { + deployment_hash: "dep-123".into(), + pipe_instance_id: "11111111-1111-1111-1111-111111111111".into(), + input_data: None, + source_container: None, + source_endpoint: "/source/data".into(), + source_method: "GET".into(), + target_url: None, + target_container: Some("target-app".into()), + target_endpoint: "/webhook/pipe".into(), + target_method: "POST".into(), + field_mapping: Some(json!({ "email": "$.user.email" })), + trigger_type: "manual".into(), + }; + + let result = handle_trigger_pipe(&agent_cmd, &data) + .await + .expect("trigger_pipe should return structured failure"); + + assert_eq!(result.status, "failed"); + assert_eq!( + result.error.as_deref(), + Some("trigger_pipe requires input_data or source_container") + ); + } +} + impl std::fmt::Display for ContainerRuntime { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { @@ -138,6 +299,7 @@ pub enum StackerCommand { ListContainers(ListContainersCommand), ConfigureFirewall(ConfigureFirewallCommand), ProbeEndpoints(ProbeEndpointsCommand), + TriggerPipe(TriggerPipeCommand), } #[cfg_attr(not(feature = "docker"), allow(dead_code))] @@ -224,6 +386,62 @@ pub struct ErrorSummaryCommand { redact: bool, } +#[cfg_attr(not(feature = "docker"), allow(dead_code))] +#[derive(Debug, Clone, Deserialize)] +pub struct TriggerPipeCommand { + #[serde(default)] + deployment_hash: String, + pipe_instance_id: String, + #[serde(default)] + input_data: Option, + #[serde(default)] + source_container: Option, + #[serde(default = "default_pipe_source_endpoint")] + source_endpoint: String, + #[serde(default = "default_pipe_source_method")] + source_method: String, + #[serde(default)] + target_url: Option, + #[serde(default)] + target_container: Option, + #[serde(default = "default_pipe_target_endpoint")] + target_endpoint: String, + #[serde(default = "default_pipe_target_method")] + target_method: String, + #[serde(default)] + field_mapping: Option, + #[serde(default = "default_pipe_trigger_type")] + trigger_type: String, +} + +fn default_pipe_source_endpoint() -> String { + "/".to_string() +} + +fn default_pipe_source_method() -> String { + "GET".to_string() +} + +fn default_pipe_target_endpoint() -> String { + "/".to_string() +} + +fn default_pipe_target_method() -> String { + "POST".to_string() +} + +fn normalize_trigger_pipe_method(method: &str, default_method: &str) -> String { + let normalized = trimmed(method).to_ascii_uppercase(); + match normalized.as_str() { + "GET" | "POST" | "PUT" | "PATCH" | "DELETE" | "HEAD" | "OPTIONS" => normalized, + _ => default_method.to_string(), + } +} + +fn default_pipe_trigger_type() -> String { + "manual".to_string() +} + /// Command to fetch app configuration from Vault #[cfg_attr(not(feature = "docker"), allow(dead_code))] #[derive(Debug, Clone, Deserialize)] @@ -483,6 +701,9 @@ pub struct ProbeEndpointsCommand { /// Timeout per probe request in seconds #[serde(default = "default_probe_timeout")] probe_timeout: u32, + /// Whether to capture sample responses from discovered endpoints + #[serde(default)] + capture_samples: bool, } fn default_probe_protocols() -> Vec { @@ -645,6 +866,13 @@ pub fn parse_stacker_command(cmd: &AgentCommand) -> Result { + let payload: TriggerPipeCommand = serde_json::from_value(unwrap_params(&cmd.params)) + .context("invalid trigger_pipe payload")?; + let payload = payload.normalize().with_command_context(cmd); + payload.validate()?; + Ok(Some(StackerCommand::TriggerPipe(payload))) + } _ => Ok(None), } } @@ -952,6 +1180,52 @@ impl ErrorSummaryCommand { } } +impl TriggerPipeCommand { + fn normalize(mut self) -> Self { + self.deployment_hash = trimmed(&self.deployment_hash); + self.pipe_instance_id = trimmed(&self.pipe_instance_id); + self.source_container = self.source_container.map(|value| trimmed(&value)); + self.source_endpoint = trimmed(&self.source_endpoint); + if self.source_endpoint.is_empty() { + self.source_endpoint = default_pipe_source_endpoint(); + } + self.source_method = + normalize_trigger_pipe_method(&self.source_method, &default_pipe_source_method()); + self.target_url = self.target_url.map(|value| trimmed(&value)); + self.target_container = self.target_container.map(|value| trimmed(&value)); + self.target_endpoint = trimmed(&self.target_endpoint); + if self.target_endpoint.is_empty() { + self.target_endpoint = "/".to_string(); + } + self.target_method = + normalize_trigger_pipe_method(&self.target_method, &default_pipe_target_method()); + self.trigger_type = trimmed(&self.trigger_type).to_lowercase(); + if self.trigger_type.is_empty() { + self.trigger_type = default_pipe_trigger_type(); + } + self + } + + fn with_command_context(mut self, agent_cmd: &AgentCommand) -> Self { + if self.deployment_hash.is_empty() { + if let Some(hash) = &agent_cmd.deployment_hash { + self.deployment_hash = hash.clone(); + } + } + self + } + + fn validate(&self) -> Result<()> { + if self.deployment_hash.is_empty() { + bail!("deployment_hash is required"); + } + if self.pipe_instance_id.is_empty() { + bail!("pipe_instance_id is required"); + } + Ok(()) + } +} + impl FetchConfigCommand { fn normalize(mut self) -> Self { self.deployment_hash = trimmed(&self.deployment_hash); @@ -1541,12 +1815,403 @@ async fn execute_with_docker( StackerCommand::ServerResources(data) => handle_server_resources(agent_cmd, data).await, StackerCommand::ListContainers(data) => handle_list_containers(agent_cmd, data).await, StackerCommand::ProbeEndpoints(data) => handle_probe_endpoints(agent_cmd, data).await, + StackerCommand::TriggerPipe(data) => handle_trigger_pipe(agent_cmd, data).await, StackerCommand::ConfigureFirewall(data) => { firewall::handle_configure_firewall(agent_cmd, data, firewall_policy).await } } } +#[cfg(feature = "docker")] +fn extract_json_path_value(source: &Value, path: &str) -> Value { + let trimmed = path.trim(); + if !trimmed.starts_with("$.") { + return Value::Null; + } + + let mut current = source; + for segment in trimmed.trim_start_matches("$.").split('.') { + if segment.is_empty() { + continue; + } + match current { + Value::Object(map) => match map.get(segment) { + Some(value) => current = value, + None => return Value::Null, + }, + _ => return Value::Null, + } + } + + current.clone() +} + +#[cfg(feature = "docker")] +fn apply_pipe_field_mapping(source: &Value, field_mapping: Option<&Value>) -> Value { + let Some(Value::Object(mapping)) = field_mapping else { + return source.clone(); + }; + + if mapping.is_empty() { + return source.clone(); + } + + let mut mapped = serde_json::Map::new(); + for (key, rule) in mapping { + let value = match rule { + Value::String(path) if path.starts_with("$.") => extract_json_path_value(source, path), + other => other.clone(), + }; + mapped.insert(key.clone(), value); + } + Value::Object(mapped) +} + +#[cfg(feature = "docker")] +fn build_pipe_target_url(base: &str, endpoint: &str) -> String { + let trimmed_base = base.trim_end_matches('/'); + let trimmed_endpoint = endpoint.trim(); + if trimmed_endpoint.is_empty() || trimmed_endpoint == "/" { + return format!("{}/", trimmed_base); + } + format!( + "{}/{}", + trimmed_base, + trimmed_endpoint.trim_start_matches('/') + ) +} + +#[cfg(feature = "docker")] +fn shell_escape_single_quotes(value: &str) -> String { + value.replace('\'', r#"'\"'\"'"#) +} + +#[cfg(feature = "docker")] +fn build_trigger_pipe_container_command(endpoint: &str, method: &str, payload: &Value) -> String { + let json_payload = serde_json::to_string(payload).unwrap_or_else(|_| "{}".to_string()); + let escaped_payload = shell_escape_single_quotes(&json_payload); + let normalized_method = normalize_trigger_pipe_method(method, "POST"); + let url = build_pipe_target_url("http://127.0.0.1", endpoint); + let escaped_url = shell_escape_single_quotes(&url); + format!( + "curl -sS -X {} -H 'Content-Type: application/json' --data-raw '{}' -w '\\n%{{http_code}}' '{}'", + normalized_method, escaped_payload, escaped_url + ) +} + +#[cfg(feature = "docker")] +fn build_trigger_pipe_source_command(endpoint: &str, method: &str) -> String { + let normalized_method = normalize_trigger_pipe_method(method, "GET"); + let url = build_pipe_target_url("http://127.0.0.1", endpoint); + let escaped_url = shell_escape_single_quotes(&url); + format!( + "curl -sS -X {} -w '\\n%{{http_code}}' '{}'", + normalized_method, escaped_url + ) +} + +#[cfg(feature = "docker")] +async fn send_trigger_pipe_request( + url: &str, + method: &str, + payload: &Value, +) -> Result<(u16, Value)> { + let method = reqwest::Method::from_bytes(method.as_bytes()) + .with_context(|| format!("invalid target_method '{}'", method))?; + let client = reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(30)) + .build() + .context("building trigger_pipe http client")?; + + let response = client + .request(method, url) + .json(payload) + .send() + .await + .with_context(|| format!("sending trigger_pipe request to {}", url))?; + + let status = response.status().as_u16(); + let body_text = response + .text() + .await + .context("reading trigger_pipe response body")?; + let body = if body_text.trim().is_empty() { + Value::Null + } else { + serde_json::from_str(&body_text).unwrap_or(Value::String(body_text)) + }; + + Ok((status, body)) +} + +#[cfg(feature = "docker")] +async fn fetch_trigger_pipe_source_request( + container: &str, + endpoint: &str, + method: &str, +) -> Result<(u16, Value)> { + let command = build_trigger_pipe_source_command(endpoint, method); + let (exit_code, stdout, stderr) = docker::exec_in_container_with_output(container, &command) + .await + .with_context(|| { + format!( + "fetching trigger_pipe source inside container {}", + container + ) + })?; + + if exit_code != 0 { + bail!( + "source container request failed with code {}: {}", + exit_code, + stderr.trim() + ); + } + + let mut lines = stdout.lines().collect::>(); + let status_line = lines.pop().unwrap_or("000").trim(); + let status = status_line.parse::().unwrap_or(0); + let body_text = lines.join("\n"); + let body = if body_text.trim().is_empty() { + Value::Null + } else { + serde_json::from_str(&body_text).unwrap_or(Value::String(body_text)) + }; + + Ok((status, body)) +} + +#[cfg(feature = "docker")] +async fn send_trigger_pipe_container_request( + container: &str, + endpoint: &str, + method: &str, + payload: &Value, +) -> Result<(u16, Value)> { + let command = build_trigger_pipe_container_command(endpoint, method, payload); + let (exit_code, stdout, stderr) = docker::exec_in_container_with_output(container, &command) + .await + .with_context(|| { + format!( + "sending trigger_pipe request inside container {}", + container + ) + })?; + + if exit_code != 0 { + bail!( + "target container request failed with code {}: {}", + exit_code, + stderr.trim() + ); + } + + let mut lines = stdout.lines().collect::>(); + let status_line = lines.pop().unwrap_or("000").trim(); + let status = status_line.parse::().unwrap_or(0); + let body_text = lines.join("\n"); + let body = if body_text.trim().is_empty() { + Value::Null + } else { + serde_json::from_str(&body_text).unwrap_or(Value::String(body_text)) + }; + + Ok((status, body)) +} + +#[cfg(feature = "docker")] +async fn handle_trigger_pipe( + agent_cmd: &AgentCommand, + data: &TriggerPipeCommand, +) -> Result { + let mut result = base_result(agent_cmd, &data.deployment_hash, "", "trigger_pipe"); + let source_data = match data.input_data.clone() { + Some(value) => value, + None => match data + .source_container + .as_deref() + .filter(|value| !value.is_empty()) + { + Some(container) => match fetch_trigger_pipe_source_request( + container, + &data.source_endpoint, + &data.source_method, + ) + .await + { + Ok((status_code, response_body)) if (200..300).contains(&status_code) => { + response_body + } + Ok((status_code, response_body)) => { + let error = format!("source fetch failed with status {}", status_code); + result.status = "failed".into(); + result.result = Some(json!({ + "type": "trigger_pipe", + "deployment_hash": data.deployment_hash, + "pipe_instance_id": data.pipe_instance_id, + "success": false, + "source_data": response_body, + "mapped_data": Value::Null, + "target_response": Value::Null, + "error": error, + "triggered_at": Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true), + "trigger_type": data.trigger_type, + })); + result.error = Some(error); + return Ok(result); + } + Err(err) => { + let error = format!("failed to fetch trigger_pipe source: {}", err); + result.status = "failed".into(); + result.result = Some(json!({ + "type": "trigger_pipe", + "deployment_hash": data.deployment_hash, + "pipe_instance_id": data.pipe_instance_id, + "success": false, + "source_data": Value::Null, + "mapped_data": Value::Null, + "target_response": Value::Null, + "error": error, + "triggered_at": Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true), + "trigger_type": data.trigger_type, + })); + result.error = Some(error); + return Ok(result); + } + }, + None => { + let error = "trigger_pipe requires input_data or source_container"; + result.status = "failed".into(); + result.result = Some(json!({ + "type": "trigger_pipe", + "deployment_hash": data.deployment_hash, + "pipe_instance_id": data.pipe_instance_id, + "success": false, + "source_data": Value::Null, + "mapped_data": Value::Null, + "target_response": Value::Null, + "error": error, + "triggered_at": Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true), + "trigger_type": data.trigger_type, + })); + result.error = Some(error.into()); + return Ok(result); + } + }, + }; + + let mapped_data = apply_pipe_field_mapping(&source_data, data.field_mapping.as_ref()); + let target = match ( + data.target_url.as_deref().filter(|value| !value.is_empty()), + data.target_container + .as_deref() + .filter(|value| !value.is_empty()), + ) { + (Some(value), _) => Ok(( + "external", + build_pipe_target_url(value, &data.target_endpoint), + )), + (None, Some(value)) => Ok(("container", value.to_string())), + (None, None) => { + let error = "trigger_pipe requires target_url or target_container"; + result.status = "failed".into(); + result.result = Some(json!({ + "type": "trigger_pipe", + "deployment_hash": data.deployment_hash, + "pipe_instance_id": data.pipe_instance_id, + "success": false, + "source_data": source_data, + "mapped_data": mapped_data, + "target_response": Value::Null, + "error": error, + "triggered_at": Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true), + "trigger_type": data.trigger_type, + })); + result.error = Some(error.into()); + Err(()) + } + }; + if target.is_err() { + return Ok(result); + } + let (target_mode, target_value) = target.unwrap(); + + let send_result = match target_mode { + "external" => { + send_trigger_pipe_request(&target_value, &data.target_method, &mapped_data).await + } + "container" => { + send_trigger_pipe_container_request( + &target_value, + &data.target_endpoint, + &data.target_method, + &mapped_data, + ) + .await + } + _ => unreachable!(), + }; + + match send_result { + Ok((status_code, response_body)) if (200..300).contains(&status_code) => { + result.status = "completed".into(); + result.result = Some(json!({ + "type": "trigger_pipe", + "deployment_hash": data.deployment_hash, + "pipe_instance_id": data.pipe_instance_id, + "success": true, + "source_data": source_data, + "mapped_data": mapped_data, + "target_response": { + "status": status_code, + "body": response_body, + }, + "triggered_at": Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true), + "trigger_type": data.trigger_type, + })); + } + Ok((status_code, response_body)) => { + let error = format!("target request failed with status {}", status_code); + result.status = "failed".into(); + result.result = Some(json!({ + "type": "trigger_pipe", + "deployment_hash": data.deployment_hash, + "pipe_instance_id": data.pipe_instance_id, + "success": false, + "source_data": source_data, + "mapped_data": mapped_data, + "target_response": { + "status": status_code, + "body": response_body, + }, + "error": error, + "triggered_at": Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true), + "trigger_type": data.trigger_type, + })); + result.error = Some(error); + } + Err(err) => { + let error = err.to_string(); + result.status = "failed".into(); + result.result = Some(json!({ + "type": "trigger_pipe", + "deployment_hash": data.deployment_hash, + "pipe_instance_id": data.pipe_instance_id, + "success": false, + "source_data": source_data, + "mapped_data": mapped_data, + "target_response": Value::Null, + "error": error, + "triggered_at": Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true), + "trigger_type": data.trigger_type, + })); + result.error = Some(error); + } + } + + Ok(result) +} + #[cfg(feature = "docker")] async fn handle_health(agent_cmd: &AgentCommand, data: &HealthCommand) -> Result { let mut result = base_result(agent_cmd, &data.deployment_hash, &data.app_code, "health"); @@ -4452,7 +5117,7 @@ async fn get_container_ports(container_name: &str) -> Result> { } #[cfg(any(feature = "docker", test))] -fn extract_openapi_operations(spec: &Value) -> Vec { +fn extract_openapi_operations(spec: &Value, capture_samples: bool) -> Vec { let mut operations = Vec::new(); if let Some(paths) = spec.get("paths").and_then(|p| p.as_object()) { @@ -4476,12 +5141,21 @@ fn extract_openapi_operations(spec: &Value) -> Vec { // Extract field names from request body schema let fields = extract_request_fields(spec, details); - operations.push(json!({ + let mut op = json!({ "path": path, "method": method_upper, "summary": summary, "fields": fields, - })); + }); + + // Extract sample response from OpenAPI spec examples + if capture_samples { + if let Some(sample) = extract_response_example(spec, details) { + op["sample_response"] = sample; + } + } + + operations.push(op); } } } @@ -4490,6 +5164,67 @@ fn extract_openapi_operations(spec: &Value) -> Vec { operations } +/// Extract a sample response from an OpenAPI operation's response schema. +/// Looks for: responses -> 200 -> content -> application/json -> example/schema/examples +#[cfg(any(feature = "docker", test))] +fn extract_response_example(spec: &Value, operation: &Value) -> Option { + let responses = operation.get("responses")?; + + // Try 200, 201, then default + let response = responses + .get("200") + .or_else(|| responses.get("201")) + .or_else(|| responses.get("default"))?; + + // OpenAPI 3.x: content -> application/json -> example or schema -> example + if let Some(content) = response.get("content") { + if let Some(json_content) = content.get("application/json") { + // Direct example on the media type + if let Some(example) = json_content.get("example") { + return Some(example.clone()); + } + // Examples (named) — take the first one + if let Some(examples) = json_content.get("examples").and_then(|e| e.as_object()) { + if let Some((_, first)) = examples.iter().next() { + if let Some(value) = first.get("value") { + return Some(value.clone()); + } + } + } + // Schema example + if let Some(schema) = json_content.get("schema") { + if let Some(example) = schema.get("example") { + return Some(example.clone()); + } + // Resolve $ref if present + if let Some(ref_path) = schema.get("$ref").and_then(|r| r.as_str()) { + if let Some(resolved) = resolve_ref(spec, ref_path) { + if let Some(example) = resolved.get("example") { + return Some(example.clone()); + } + } + } + } + } + } + + // Swagger 2.x: examples -> application/json + if let Some(examples) = response.get("examples") { + if let Some(json_example) = examples.get("application/json") { + return Some(json_example.clone()); + } + } + + // Swagger 2.x: schema -> example + if let Some(schema) = response.get("schema") { + if let Some(example) = schema.get("example") { + return Some(example.clone()); + } + } + + None +} + #[cfg(any(feature = "docker", test))] fn extract_request_fields(spec: &Value, operation: &Value) -> Vec { let mut fields = Vec::new(); @@ -4687,7 +5422,8 @@ async fn handle_probe_endpoints( if !protocols_detected.contains(&"openapi".to_string()) { protocols_detected.push("openapi".to_string()); } - let operations = extract_openapi_operations(&spec); + let operations = + extract_openapi_operations(&spec, data.capture_samples); endpoints.push(json!({ "protocol": "openapi", "base_url": format!("http://{}:{}", data.app_code, port), @@ -4754,12 +5490,48 @@ async fn handle_probe_endpoints( if !protocols_detected.contains(&"rest".to_string()) { protocols_detected.push("rest".to_string()); } - endpoints.push(json!({ + + // Capture sample response body for REST endpoints + let mut sample_response = None; + if data.capture_samples && code == "200" { + let escaped_url = shell_escape_single_quotes(&format!( + "http://localhost:{}{}", + port, path + )); + let body_cmd = format!( + "curl -sf -m {} '{}' 2>/dev/null || true", + data.probe_timeout, escaped_url + ); + if let Ok(Ok((0, body, _))) = tokio::time::timeout( + std::time::Duration::from_secs((data.probe_timeout + 2) as u64), + docker::exec_in_container_with_output(&target_name, &body_cmd), + ) + .await + { + let body = body.trim(); + if !body.is_empty() { + // Try to parse as JSON; fall back to string + sample_response = Some( + serde_json::from_str::(body) + .unwrap_or_else(|_| json!(body)), + ); + } + } + } + + let mut ep = json!({ "protocol": "rest", "base_url": format!("http://{}:{}", data.app_code, port), "spec_url": path, "operations": [], - })); + }); + + // Attach sample_response at endpoint level for REST heuristic + if let Some(sample) = sample_response { + ep["sample_response"] = sample; + } + + endpoints.push(ep); } } _ => continue, @@ -4954,6 +5726,19 @@ mod tests { }), StackerCommand::ListContainers ); + stacker_test!( + parses_trigger_pipe_command, + "trigger_pipe", + json!({ + "params": { + "pipe_instance_id": "11111111-1111-1111-1111-111111111111", + "input_data": { + "invoice_id": "inv-1" + } + } + }), + StackerCommand::TriggerPipe + ); stacker_test!( parses_stacker_list_containers_command, "stacker.list_containers", @@ -4980,6 +5765,105 @@ mod tests { assert!(parsed.is_none()); } + #[test] + fn parses_trigger_pipe_external_target_fields() { + let cmd = AgentCommand { + id: "cmd-trigger".into(), + command_id: "cmd-trigger".into(), + name: "trigger_pipe".into(), + params: json!({ + "params": { + "pipe_instance_id": "11111111-1111-1111-1111-111111111111", + "target_url": "https://example.com", + "target_endpoint": "/webhook/pipe", + "target_method": "post", + "field_mapping": { "email": "$.user.email" }, + "trigger_type": "manual", + "input_data": { "user": { "email": "dev@try.direct" } } + } + }), + deployment_hash: Some("dep-123".into()), + app_code: None, + }; + + let parsed = parse_stacker_command(&cmd).unwrap(); + match parsed { + Some(StackerCommand::TriggerPipe(data)) => { + assert_eq!(data.deployment_hash, "dep-123"); + assert_eq!(data.target_url.as_deref(), Some("https://example.com")); + assert_eq!(data.target_endpoint, "/webhook/pipe"); + assert_eq!(data.target_method, "POST"); + assert_eq!(data.trigger_type, "manual"); + assert_eq!(data.field_mapping, Some(json!({ "email": "$.user.email" }))); + } + other => panic!("Expected TriggerPipe command, got {:?}", other), + } + } + + #[test] + fn parses_trigger_pipe_internal_target_fields() { + let cmd = AgentCommand { + id: "cmd-trigger".into(), + command_id: "cmd-trigger".into(), + name: "trigger_pipe".into(), + params: json!({ + "params": { + "pipe_instance_id": "11111111-1111-1111-1111-111111111111", + "target_container": "target-app", + "target_endpoint": "/hooks/pipe", + "target_method": "post", + "field_mapping": { "email": "$.user.email" }, + "input_data": { "user": { "email": "dev@try.direct" } } + } + }), + deployment_hash: Some("dep-123".into()), + app_code: None, + }; + + let parsed = parse_stacker_command(&cmd).unwrap(); + match parsed { + Some(StackerCommand::TriggerPipe(data)) => { + assert_eq!(data.target_container.as_deref(), Some("target-app")); + assert_eq!(data.target_endpoint, "/hooks/pipe"); + assert_eq!(data.target_method, "POST"); + } + other => panic!("Expected TriggerPipe command, got {:?}", other), + } + } + + #[test] + fn parses_trigger_pipe_source_fetch_fields() { + let cmd = AgentCommand { + id: "cmd-trigger".into(), + command_id: "cmd-trigger".into(), + name: "trigger_pipe".into(), + params: json!({ + "params": { + "pipe_instance_id": "11111111-1111-1111-1111-111111111111", + "source_container": "source-app", + "source_endpoint": "/source/data", + "source_method": "get", + "target_container": "target-app", + "target_endpoint": "/hooks/pipe", + "target_method": "post" + } + }), + deployment_hash: Some("dep-123".into()), + app_code: None, + }; + + let parsed = parse_stacker_command(&cmd).unwrap(); + match parsed { + Some(StackerCommand::TriggerPipe(data)) => { + assert_eq!(data.source_container.as_deref(), Some("source-app")); + assert_eq!(data.source_endpoint, "/source/data"); + assert_eq!(data.source_method, "GET"); + assert_eq!(data.target_container.as_deref(), Some("target-app")); + } + other => panic!("Expected TriggerPipe command, got {:?}", other), + } + } + // --- ContainerRuntime tests --- #[test] @@ -5707,6 +6591,7 @@ mod probe_endpoints_command_tests { container: Some(" crm-web ".to_string()), protocols: vec![" OpenAPI ".to_string(), " REST ".to_string()], probe_timeout: 5, + capture_samples: false, }; let normalized = cmd.normalize(); assert_eq!(normalized.deployment_hash, "abc123"); @@ -5723,6 +6608,7 @@ mod probe_endpoints_command_tests { container: None, protocols: vec![], probe_timeout: 5, + capture_samples: false, }; let normalized = cmd.normalize(); assert_eq!(normalized.protocols, vec!["openapi", "rest"]); @@ -5736,6 +6622,7 @@ mod probe_endpoints_command_tests { container: None, protocols: vec!["openapi".to_string(), " ".to_string(), "".to_string()], probe_timeout: 5, + capture_samples: false, }; let normalized = cmd.normalize(); assert_eq!(normalized.protocols, vec!["openapi"]); @@ -5749,6 +6636,7 @@ mod probe_endpoints_command_tests { container: None, protocols: vec![" ".to_string(), "".to_string()], probe_timeout: 5, + capture_samples: false, }; let normalized = cmd.normalize(); assert_eq!(normalized.protocols, vec!["openapi", "rest"]); @@ -5762,6 +6650,7 @@ mod probe_endpoints_command_tests { container: Some(" ".to_string()), protocols: vec!["openapi".to_string()], probe_timeout: 5, + capture_samples: false, }; let normalized = cmd.normalize(); assert!(normalized.container.is_none()); @@ -5777,6 +6666,7 @@ mod probe_endpoints_command_tests { container: None, protocols: vec!["openapi".to_string()], probe_timeout: 5, + capture_samples: false, }; let agent_cmd = AgentCommand { id: "test-id".into(), @@ -5798,6 +6688,7 @@ mod probe_endpoints_command_tests { container: None, protocols: vec!["openapi".to_string()], probe_timeout: 5, + capture_samples: false, }; let agent_cmd = AgentCommand { id: "test-id".into(), @@ -5819,6 +6710,7 @@ mod probe_endpoints_command_tests { container: None, protocols: vec!["openapi".to_string()], probe_timeout: 5, + capture_samples: false, }; let agent_cmd = AgentCommand { id: "test-id".into(), @@ -5843,6 +6735,7 @@ mod probe_endpoints_command_tests { container: None, protocols: vec!["openapi".to_string()], probe_timeout: 5, + capture_samples: false, }; let result = cmd.validate(); assert!(result.is_err()); @@ -5857,6 +6750,7 @@ mod probe_endpoints_command_tests { container: None, protocols: vec!["openapi".to_string()], probe_timeout: 5, + capture_samples: false, }; let result = cmd.validate(); assert!(result.is_err()); @@ -5871,6 +6765,7 @@ mod probe_endpoints_command_tests { container: None, protocols: vec!["openapi".to_string(), "invalid_proto".to_string()], probe_timeout: 5, + capture_samples: false, }; let result = cmd.validate(); assert!(result.is_err()); @@ -5894,6 +6789,7 @@ mod probe_endpoints_command_tests { "rest".to_string(), ], probe_timeout: 5, + capture_samples: false, }; assert!(cmd.validate().is_ok()); } @@ -5906,6 +6802,7 @@ mod probe_endpoints_command_tests { container: None, protocols: vec!["openapi".to_string()], probe_timeout: 5, + capture_samples: false, }; assert!(cmd.validate().is_ok()); } @@ -5940,7 +6837,7 @@ mod probe_endpoints_command_tests { } }); - let ops = extract_openapi_operations(&spec); + let ops = extract_openapi_operations(&spec, false); assert_eq!(ops.len(), 2); // Find the GET operation @@ -5976,7 +6873,7 @@ mod probe_endpoints_command_tests { } }); - let ops = extract_openapi_operations(&spec); + let ops = extract_openapi_operations(&spec, false); assert_eq!(ops.len(), 1); assert_eq!(ops[0]["method"], "GET"); } @@ -5988,7 +6885,7 @@ mod probe_endpoints_command_tests { "paths": {} }); - let ops = extract_openapi_operations(&spec); + let ops = extract_openapi_operations(&spec, false); assert!(ops.is_empty()); } @@ -5999,7 +6896,7 @@ mod probe_endpoints_command_tests { "info": { "title": "test" } }); - let ops = extract_openapi_operations(&spec); + let ops = extract_openapi_operations(&spec, false); assert!(ops.is_empty()); } @@ -6014,11 +6911,192 @@ mod probe_endpoints_command_tests { } }); - let ops = extract_openapi_operations(&spec); + let ops = extract_openapi_operations(&spec, false); assert_eq!(ops.len(), 1); assert_eq!(ops[0]["summary"], ""); } + #[test] + fn extract_openapi_operations_capture_samples_from_example() { + let spec = json!({ + "openapi": "3.0.0", + "paths": { + "/api/v1/posts": { + "get": { + "summary": "List posts", + "responses": { + "200": { + "content": { + "application/json": { + "example": [ + {"id": 1, "title": "Hello World", "author": 42} + ] + } + } + } + } + } + } + } + }); + + // Without capture_samples + let ops = extract_openapi_operations(&spec, false); + assert_eq!(ops.len(), 1); + assert!(ops[0].get("sample_response").is_none()); + + // With capture_samples + let ops = extract_openapi_operations(&spec, true); + assert_eq!(ops.len(), 1); + let sample = &ops[0]["sample_response"]; + assert!(sample.is_array()); + assert_eq!(sample[0]["id"], 1); + assert_eq!(sample[0]["title"], "Hello World"); + } + + #[test] + fn extract_openapi_operations_capture_samples_from_schema_example() { + let spec = json!({ + "openapi": "3.0.0", + "paths": { + "/api/users": { + "get": { + "summary": "Get users", + "responses": { + "200": { + "content": { + "application/json": { + "schema": { + "type": "object", + "example": {"id": 1, "name": "Alice"} + } + } + } + } + } + } + } + } + }); + + let ops = extract_openapi_operations(&spec, true); + assert_eq!(ops[0]["sample_response"]["name"], "Alice"); + } + + #[test] + fn extract_openapi_operations_capture_samples_swagger2() { + let spec = json!({ + "swagger": "2.0", + "paths": { + "/api/items": { + "get": { + "summary": "List items", + "responses": { + "200": { + "examples": { + "application/json": [ + {"id": 1, "name": "Widget"} + ] + } + } + } + } + } + } + }); + + let ops = extract_openapi_operations(&spec, true); + assert_eq!(ops[0]["sample_response"][0]["name"], "Widget"); + } + + #[test] + fn extract_openapi_operations_capture_samples_with_ref() { + let spec = json!({ + "openapi": "3.0.0", + "paths": { + "/api/posts": { + "get": { + "summary": "List posts", + "responses": { + "200": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/PostList" + } + } + } + } + } + } + } + }, + "components": { + "schemas": { + "PostList": { + "type": "array", + "example": [{"id": 1, "title": "First Post"}] + } + } + } + }); + + let ops = extract_openapi_operations(&spec, true); + assert_eq!(ops[0]["sample_response"][0]["title"], "First Post"); + } + + #[test] + fn probe_endpoints_command_capture_samples_defaults_false() { + let cmd: ProbeEndpointsCommand = serde_json::from_value(json!({ + "app_code": "wordpress" + })) + .unwrap(); + assert!(!cmd.capture_samples); + } + + #[test] + fn probe_endpoints_command_capture_samples_true() { + let cmd: ProbeEndpointsCommand = serde_json::from_value(json!({ + "app_code": "wordpress", + "capture_samples": true + })) + .unwrap(); + assert!(cmd.capture_samples); + } + + #[test] + fn extract_response_example_from_direct_example() { + let spec = json!({}); + let operation = json!({ + "responses": { + "200": { + "content": { + "application/json": { + "example": {"id": 1, "name": "Test"} + } + } + } + } + }); + let sample = extract_response_example(&spec, &operation); + assert!(sample.is_some()); + assert_eq!(sample.unwrap()["name"], "Test"); + } + + #[test] + fn extract_response_example_returns_none_when_missing() { + let spec = json!({}); + let operation = json!({ + "responses": { + "200": { + "description": "Success" + } + } + }); + let sample = extract_response_example(&spec, &operation); + assert!(sample.is_none()); + } + // ==================== EXTRACT_REQUEST_FIELDS TESTS ==================== #[test] diff --git a/src/comms/local_api.rs b/src/comms/local_api.rs index 9ef3e4b..ea273f6 100644 --- a/src/comms/local_api.rs +++ b/src/comms/local_api.rs @@ -45,8 +45,10 @@ use crate::commands::{ execute_stacker_command, parse_stacker_command, CommandValidator, DockerOperation, TimeoutStrategy, }; +use crate::comms::notifications::{self, MarkReadRequest, NotificationStore, UnreadCountResponse}; use crate::monitoring::{ - spawn_heartbeat, MetricsCollector, MetricsSnapshot, MetricsStore, MetricsTx, + spawn_heartbeat, CommandExecutionMetrics, CommandMetricsStore, ControlPlane, MetricsCollector, + MetricsSnapshot, MetricsStore, MetricsTx, }; use crate::security::audit_log::AuditLogger; use crate::security::auth::{Credentials, SessionStore, SessionUser}; @@ -108,6 +110,7 @@ pub struct AppState { pub with_ui: bool, pub metrics_collector: Arc, pub metrics_store: MetricsStore, + pub command_metrics: CommandMetricsStore, pub metrics_tx: MetricsTx, pub metrics_webhook: Option, pub backup_path: Option, @@ -123,6 +126,7 @@ pub struct AppState { pub update_jobs: UpdateJobs, pub firewall_policy: FirewallPolicy, pub login_limiter: RateLimiter, + pub notification_store: NotificationStore, } impl AppState { @@ -160,6 +164,7 @@ impl AppState { with_ui, metrics_collector: Arc::new(MetricsCollector::new()), metrics_store: Arc::new(tokio::sync::RwLock::new(MetricsSnapshot::default())), + command_metrics: Arc::new(tokio::sync::RwLock::new(CommandExecutionMetrics::default())), metrics_tx: broadcast::channel(32).0, metrics_webhook: std::env::var("METRICS_WEBHOOK").ok(), backup_path: std::env::var("BACKUP_PATH").ok(), @@ -187,6 +192,7 @@ impl AppState { update_jobs: Arc::new(tokio::sync::RwLock::new(std::collections::HashMap::new())), firewall_policy, login_limiter: RateLimiter::new_per_minute(5), + notification_store: notifications::new_notification_store(), } } } @@ -233,6 +239,7 @@ pub struct HealthResponse { pub status: String, pub token_age_seconds: u64, pub last_refresh_ok: Option, + pub command_metrics: CommandExecutionMetrics, } // ---- Marketplace types ---- @@ -301,13 +308,36 @@ async fn health(State(state): State) -> impl IntoResponse { None }; + let command_metrics = state.command_metrics.read().await.clone(); + Json(HealthResponse { status: "ok".to_string(), token_age_seconds, last_refresh_ok, + command_metrics, }) } +async fn command_metrics_handler(State(state): State) -> impl IntoResponse { + Json(state.command_metrics.read().await.clone()) +} + +async fn record_command_execution(state: &SharedState, executed_by: &str) { + let control_plane = ControlPlane::from_value(Some(executed_by)); + let mut metrics = state.command_metrics.write().await; + metrics.record_execution(control_plane); +} + +async fn attach_command_provenance( + state: &SharedState, + mut result: CommandResult, + executed_by: &str, +) -> CommandResult { + record_command_execution(state, executed_by).await; + result.executed_by = Some(executed_by.to_string()); + result +} + // Login form (GET) async fn login_page(State(state): State) -> impl IntoResponse { if state.with_ui { @@ -1425,12 +1455,35 @@ async fn unlink_handler(State(state): State) -> impl IntoResponse { } } +// ---- Notification API handlers ---- + +async fn notifications_list(State(state): State) -> impl IntoResponse { + let summary = notifications::get_summary(&state.notification_store).await; + Json(summary) +} + +async fn notifications_mark_read( + State(state): State, + Json(req): Json, +) -> impl IntoResponse { + notifications::mark_read(&state.notification_store, &req.ids, req.all).await; + Json(json!({"status": "ok"})) +} + +async fn notifications_unread_count(State(state): State) -> impl IntoResponse { + let count = notifications::get_unread_count(&state.notification_store).await; + Json(UnreadCountResponse { + unread_count: count, + }) +} + pub fn create_router(state: SharedState) -> Router { let mut router = Router::new() .route("/health", get(health)) .route("/capabilities", get(capabilities_handler)) .route("/metrics", get(metrics_handler)) .route("/metrics/stream", get(metrics_ws_handler)) + .route("/api/v1/diagnostics/commands", get(command_metrics_handler)) // Self-update endpoints .route("/api/self/version", get(self_version)) .route("/api/self/update/start", post(self_update_start)) @@ -1458,6 +1511,15 @@ pub fn create_router(state: SharedState) -> Router { .route("/link/select", post(link_select_handler)) .route("/link/unlink", post(unlink_handler)); + // Notifications + router = router + .route("/api/v1/notifications", get(notifications_list)) + .route("/api/v1/notifications/read", post(notifications_mark_read)) + .route( + "/api/v1/notifications/unread-count", + get(notifications_unread_count), + ); + #[cfg(feature = "docker")] { router = router @@ -1863,6 +1925,9 @@ async fn commands_report( .into_response() } }; + if let Some(executed_by) = res.executed_by.as_deref() { + record_command_execution(&state, executed_by).await; + } info!(command_id = %res.command_id, status = %res.status, "command result reported"); (StatusCode::OK, Json(json!({"accepted": true}))).into_response() } @@ -1896,10 +1961,18 @@ async fn commands_execute( .into_response() } }; + let executed_by = ControlPlane::from_value( + std::env::var("CONTROL_PLANE") + .ok() + .as_deref() + .or(state.config.control_plane.as_deref()), + ) + .to_string(); if let Some(stacker_cmd) = parsed_stacker_cmd { match execute_stacker_command(&cmd, &stacker_cmd, &state.firewall_policy).await { Ok(result) => { - return Json(result).into_response(); + return Json(attach_command_provenance(&state, result, &executed_by).await) + .into_response(); } Err(e) => { error!( @@ -1936,7 +2009,10 @@ async fn commands_execute( } #[cfg(feature = "docker")] match execute_docker_operation(&cmd.command_id, op).await { - Ok(result) => return Json(result).into_response(), + Ok(result) => { + return Json(attach_command_provenance(&state, result, &executed_by).await) + .into_response() + } Err(e) => { return ( StatusCode::INTERNAL_SERVER_ERROR, @@ -1983,7 +2059,10 @@ async fn commands_execute( let executor = CommandExecutor::new(); match executor.execute(&cmd, strategy).await { - Ok(exec) => Json(exec.to_command_result()).into_response(), + Ok(exec) => { + Json(attach_command_provenance(&state, exec.to_command_result(), &executed_by).await) + .into_response() + } Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": e.to_string()})), @@ -2094,14 +2173,64 @@ pub async fn serve(config: Config, port: u16, with_ui: bool) -> Result<()> { .and_then(|s| s.parse::().ok()) .map(Duration::from_secs) .unwrap_or(Duration::from_secs(30)); + + let alert_manager = { + let cfg = crate::monitoring::alerting::AlertConfig::from_env(); + let mgr = crate::monitoring::alerting::AlertManager::new(cfg); + if mgr.is_enabled() { + tracing::info!("outbound alerting enabled"); + Some(Arc::new(mgr)) + } else { + tracing::debug!("outbound alerting disabled (ALERT_WEBHOOK_URL not set)"); + None + } + }; + spawn_heartbeat( state.metrics_collector.clone(), state.metrics_store.clone(), heartbeat_interval, state.metrics_tx.clone(), state.metrics_webhook.clone(), + alert_manager, ); + // Spawn notification poller if dashboard connection is configured + { + let dashboard_url = + std::env::var("DASHBOARD_URL").unwrap_or_else(|_| "http://localhost:5000".to_string()); + let agent_id = std::env::var("AGENT_ID").unwrap_or_default(); + let agent_token = std::env::var("AGENT_TOKEN").unwrap_or_default(); + + if !agent_token.is_empty() { + // Build a TokenProvider so the poller can refresh on 401/403 + let token_provider = crate::security::token_provider::TokenProvider::from_env( + state.vault_client.clone(), + ); + + let poll_interval = std::env::var("NOTIFICATION_POLL_SECS") + .ok() + .and_then(|s| s.parse::().ok()) + .map(Duration::from_secs) + .unwrap_or(Duration::from_secs(300)); + + let deployment_hash = + std::env::var("DEPLOYMENT_HASH").unwrap_or_else(|_| "default".to_string()); + + notifications::spawn_notification_poller( + dashboard_url, + agent_id, + token_provider, + deployment_hash, + state.notification_store.clone(), + poll_interval, + ); + info!("Notification poller spawned"); + } else { + info!("Notification poller skipped (no AGENT_TOKEN configured)"); + } + } + // Periodic cleanup of rate limiter, login limiter, replay protection, and expired sessions { let state_cleanup = state.clone(); @@ -2137,3 +2266,68 @@ pub async fn serve(config: Config, port: u16, with_ui: bool) -> Result<()> { axum::serve(listener, app).into_future().await?; Ok(()) } + +#[cfg(test)] +mod tests { + use super::*; + use axum::body::to_bytes; + use serde_json::Value; + + fn test_state(control_plane: Option<&str>) -> SharedState { + Arc::new(AppState::new( + Arc::new(Config { + domain: None, + subdomains: None, + apps_info: None, + reqdata: crate::agent::config::ReqData { + email: "ops@example.com".to_string(), + }, + ssl: None, + compose_agent_enabled: false, + control_plane: control_plane.map(str::to_string), + firewall: None, + }), + false, + None, + )) + } + + #[tokio::test] + async fn health_includes_command_metrics() { + let state = test_state(Some("compose_agent")); + record_command_execution(&state, "compose_agent").await; + + let response = health(State(state)).await.into_response(); + let body = to_bytes(response.into_body(), usize::MAX) + .await + .expect("health body"); + let payload: Value = serde_json::from_slice(&body).expect("health json"); + + assert_eq!(payload["command_metrics"]["compose_agent_count"], 1); + assert_eq!(payload["command_metrics"]["total_count"], 1); + assert_eq!( + payload["command_metrics"]["last_control_plane"], + Value::String("compose_agent".to_string()) + ); + } + + #[tokio::test] + async fn command_metrics_handler_returns_snapshot() { + let state = test_state(Some("status_panel")); + record_command_execution(&state, "status_panel").await; + + let response = command_metrics_handler(State(state)).await.into_response(); + let body = to_bytes(response.into_body(), usize::MAX) + .await + .expect("metrics body"); + let payload: Value = serde_json::from_slice(&body).expect("metrics json"); + + assert_eq!(payload["status_panel_count"], 1); + assert_eq!(payload["compose_agent_count"], 0); + assert_eq!(payload["total_count"], 1); + assert_eq!( + payload["last_control_plane"], + Value::String("status_panel".to_string()) + ); + } +} diff --git a/src/comms/mod.rs b/src/comms/mod.rs index f2536a5..a6a3499 100644 --- a/src/comms/mod.rs +++ b/src/comms/mod.rs @@ -1 +1,2 @@ pub mod local_api; +pub mod notifications; diff --git a/src/comms/notifications.rs b/src/comms/notifications.rs new file mode 100644 index 0000000..357d2d3 --- /dev/null +++ b/src/comms/notifications.rs @@ -0,0 +1,358 @@ +use std::sync::Arc; +use std::time::Duration; + +use reqwest::Client; +use serde::{Deserialize, Serialize}; +use tokio::sync::RwLock; +use tokio::task::JoinHandle; +use tracing::{debug, error, info, warn}; + +use crate::security::token_provider::TokenProvider; +use crate::transport::http_polling::build_signed_headers; + +// ---- Types ---- + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum NotificationKind { + StackUpdateAvailable, + StackPublished, + SystemNotice, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Notification { + pub id: String, + pub kind: NotificationKind, + pub title: String, + pub message: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub stack_id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub stack_name: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub new_version: Option, + pub created_at: String, + #[serde(default)] + pub read: bool, +} + +#[derive(Debug, Serialize)] +pub struct NotificationSummary { + pub unread_count: usize, + pub notifications: Vec, +} + +#[derive(Debug, Deserialize)] +pub struct MarkReadRequest { + #[serde(default)] + pub ids: Vec, + #[serde(default)] + pub all: bool, +} + +#[derive(Debug, Serialize)] +pub struct UnreadCountResponse { + pub unread_count: usize, +} + +pub type NotificationStore = Arc>>; + +pub fn new_notification_store() -> NotificationStore { + Arc::new(RwLock::new(Vec::new())) +} + +// ---- Store operations ---- + +pub async fn get_unread_count(store: &NotificationStore) -> usize { + let notifications = store.read().await; + notifications.iter().filter(|n| !n.read).count() +} + +pub async fn get_summary(store: &NotificationStore) -> NotificationSummary { + let notifications = store.read().await; + let unread_count = notifications.iter().filter(|n| !n.read).count(); + NotificationSummary { + unread_count, + notifications: notifications.clone(), + } +} + +pub async fn mark_read(store: &NotificationStore, ids: &[String], all: bool) { + let mut notifications = store.write().await; + for n in notifications.iter_mut() { + if all || ids.contains(&n.id) { + n.read = true; + } + } +} + +/// Merge incoming notifications into the store, deduplicating by id. +/// New notifications are prepended (most recent first). +pub async fn merge_notifications(store: &NotificationStore, incoming: Vec) { + let mut notifications = store.write().await; + for n in incoming { + if !notifications.iter().any(|existing| existing.id == n.id) { + notifications.insert(0, n); + } + } + // Cap at 100 notifications to prevent unbounded growth + notifications.truncate(100); +} + +// ---- Poller ---- + +#[derive(Debug, Deserialize)] +struct StackerNotificationsResponse { + notifications: Vec, +} + +pub fn spawn_notification_poller( + dashboard_url: String, + agent_id: String, + token_provider: TokenProvider, + deployment_hash: String, + store: NotificationStore, + interval: Duration, +) -> JoinHandle<()> { + tokio::spawn(async move { + let client = Client::builder() + .timeout(Duration::from_secs(15)) + .build() + .expect("failed to build HTTP client for notification poller"); + + let mut suppressed_404 = false; + let mut backoff_secs = 0u64; + + info!( + interval_secs = interval.as_secs(), + "notification poller started" + ); + + loop { + tokio::time::sleep(if backoff_secs > 0 { + Duration::from_secs(backoff_secs) + } else { + interval + }) + .await; + + let url = format!( + "{}/api/v1/agent/notifications?deployment_hash={}", + dashboard_url, deployment_hash + ); + + let token = token_provider.get().await; + let headers = match build_signed_headers(&agent_id, &token, &[]) { + Ok(h) => h, + Err(e) => { + error!(error = %e, "failed to build HMAC headers for notification poll"); + backoff_secs = (backoff_secs * 2).clamp(5, 300); + continue; + } + }; + + match client.get(&url).headers(headers).send().await { + Ok(resp) => { + let status = resp.status().as_u16(); + + // Handle 401/403: refresh token and retry on next iteration + if status == 401 || status == 403 { + warn!( + status, + "auth error from notifications endpoint; refreshing token" + ); + if let Err(e) = token_provider.refresh().await { + warn!(error = %e, "token refresh failed"); + } + backoff_secs = 5; // short backoff before retry with new token + continue; + } + + backoff_secs = 0; + match status { + 200 => { + suppressed_404 = false; + match resp.json::().await { + Ok(body) => { + let count = body.notifications.len(); + if count > 0 { + debug!(count, "received notifications from Stacker"); + merge_notifications(&store, body.notifications).await; + } + } + Err(e) => { + warn!(error = %e, "failed to parse notifications response"); + } + } + } + 204 => { + // No new notifications + } + 404 => { + if !suppressed_404 { + info!("Stacker notifications endpoint not available (404), will retry silently"); + suppressed_404 = true; + } + } + _ => { + warn!(status, "unexpected status from notifications endpoint"); + } + } + } + Err(e) => { + debug!(error = %e, "notification poll failed (network)"); + backoff_secs = (backoff_secs * 2).clamp(5, 300); + } + } + } + }) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn sample_notification(id: &str, kind: NotificationKind) -> Notification { + Notification { + id: id.to_string(), + kind, + title: format!("Test {}", id), + message: "Test message".to_string(), + stack_id: Some("stack-1".to_string()), + stack_name: Some("MyStack".to_string()), + new_version: Some("2.0".to_string()), + created_at: "2026-04-12T00:00:00Z".to_string(), + read: false, + } + } + + #[test] + fn notification_kind_serialization() { + let json = serde_json::to_string(&NotificationKind::StackUpdateAvailable).unwrap(); + assert_eq!(json, r#""stack_update_available""#); + + let json = serde_json::to_string(&NotificationKind::StackPublished).unwrap(); + assert_eq!(json, r#""stack_published""#); + + let json = serde_json::to_string(&NotificationKind::SystemNotice).unwrap(); + assert_eq!(json, r#""system_notice""#); + } + + #[test] + fn notification_kind_deserialization() { + let kind: NotificationKind = serde_json::from_str(r#""stack_update_available""#).unwrap(); + assert_eq!(kind, NotificationKind::StackUpdateAvailable); + } + + #[test] + fn notification_roundtrip() { + let n = sample_notification("n1", NotificationKind::StackPublished); + let json = serde_json::to_string(&n).unwrap(); + let deserialized: Notification = serde_json::from_str(&json).unwrap(); + assert_eq!(deserialized.id, "n1"); + assert_eq!(deserialized.kind, NotificationKind::StackPublished); + assert!(!deserialized.read); + } + + #[test] + fn notification_read_defaults_false() { + let json = + r#"{"id":"x","kind":"system_notice","title":"t","message":"m","created_at":"now"}"#; + let n: Notification = serde_json::from_str(json).unwrap(); + assert!(!n.read); + } + + #[tokio::test] + async fn store_merge_deduplicates() { + let store = new_notification_store(); + let n1 = sample_notification("n1", NotificationKind::StackUpdateAvailable); + let n2 = sample_notification("n2", NotificationKind::StackPublished); + + merge_notifications(&store, vec![n1.clone(), n2]).await; + assert_eq!(store.read().await.len(), 2); + + // Merge again with duplicate id + let n1_dup = sample_notification("n1", NotificationKind::SystemNotice); + let n3 = sample_notification("n3", NotificationKind::SystemNotice); + merge_notifications(&store, vec![n1_dup, n3]).await; + assert_eq!(store.read().await.len(), 3); + + // Original n1 should still be StackUpdateAvailable (not replaced) + let locked = store.read().await; + let found = locked.iter().find(|n| n.id == "n1").unwrap(); + assert_eq!(found.kind, NotificationKind::StackUpdateAvailable); + } + + #[tokio::test] + async fn store_unread_count() { + let store = new_notification_store(); + let n1 = sample_notification("n1", NotificationKind::StackUpdateAvailable); + let mut n2 = sample_notification("n2", NotificationKind::StackPublished); + n2.read = true; + + merge_notifications(&store, vec![n1, n2]).await; + assert_eq!(get_unread_count(&store).await, 1); + } + + #[tokio::test] + async fn mark_read_by_ids() { + let store = new_notification_store(); + merge_notifications( + &store, + vec![ + sample_notification("n1", NotificationKind::StackUpdateAvailable), + sample_notification("n2", NotificationKind::StackPublished), + sample_notification("n3", NotificationKind::SystemNotice), + ], + ) + .await; + + mark_read(&store, &["n1".to_string(), "n3".to_string()], false).await; + + let locked = store.read().await; + assert!(locked.iter().find(|n| n.id == "n1").unwrap().read); + assert!(!locked.iter().find(|n| n.id == "n2").unwrap().read); + assert!(locked.iter().find(|n| n.id == "n3").unwrap().read); + } + + #[tokio::test] + async fn mark_read_all() { + let store = new_notification_store(); + merge_notifications( + &store, + vec![ + sample_notification("n1", NotificationKind::StackUpdateAvailable), + sample_notification("n2", NotificationKind::StackPublished), + ], + ) + .await; + + mark_read(&store, &[], true).await; + assert_eq!(get_unread_count(&store).await, 0); + } + + #[tokio::test] + async fn store_caps_at_100() { + let store = new_notification_store(); + let batch: Vec = (0..120) + .map(|i| sample_notification(&format!("n{}", i), NotificationKind::SystemNotice)) + .collect(); + merge_notifications(&store, batch).await; + assert_eq!(store.read().await.len(), 100); + } + + #[tokio::test] + async fn get_summary_returns_correct_data() { + let store = new_notification_store(); + let mut n1 = sample_notification("n1", NotificationKind::StackUpdateAvailable); + n1.read = true; + let n2 = sample_notification("n2", NotificationKind::StackPublished); + + merge_notifications(&store, vec![n1, n2]).await; + + let summary = get_summary(&store).await; + assert_eq!(summary.unread_count, 1); + assert_eq!(summary.notifications.len(), 2); + } +} diff --git a/src/monitoring/alerting.rs b/src/monitoring/alerting.rs new file mode 100644 index 0000000..c6d44c6 --- /dev/null +++ b/src/monitoring/alerting.rs @@ -0,0 +1,573 @@ +use std::collections::HashMap; +use std::sync::Arc; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +use reqwest::Client; +use serde::Serialize; +use tokio::sync::RwLock; +use tracing::{info, warn}; + +use crate::monitoring::MetricsSnapshot; + +// ---- Types ---- + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum AlertSeverity { + Warning, + Critical, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum AlertKind { + HighCpu, + HighMemory, + HighDisk, +} + +impl std::fmt::Display for AlertKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + AlertKind::HighCpu => write!(f, "high_cpu"), + AlertKind::HighMemory => write!(f, "high_memory"), + AlertKind::HighDisk => write!(f, "high_disk"), + } + } +} + +/// An alert event ready for dispatch. +#[derive(Debug, Clone, Serialize)] +pub struct Alert { + pub kind: AlertKind, + pub severity: AlertSeverity, + pub message: String, + pub value: f32, + pub threshold: f32, + pub recovered: bool, + pub timestamp_ms: u128, + #[serde(skip_serializing_if = "Option::is_none")] + pub agent_id: Option, +} + +/// Threshold configuration for a single metric. +#[derive(Debug, Clone, Copy)] +pub struct Threshold { + pub warning: f32, + pub critical: f32, +} + +impl Threshold { + fn evaluate(&self, value: f32) -> Option { + if value >= self.critical { + Some(AlertSeverity::Critical) + } else if value >= self.warning { + Some(AlertSeverity::Warning) + } else { + None + } + } +} + +/// Alert system configuration. +#[derive(Debug, Clone)] +pub struct AlertConfig { + pub webhook_url: Option, + pub cpu: Threshold, + pub memory: Threshold, + pub disk: Threshold, +} + +impl AlertConfig { + /// Build config from environment variables. + /// + /// | Variable | Default | Description | + /// |----------|---------|-------------| + /// | `ALERT_WEBHOOK_URL` | _(none)_ | Webhook endpoint; alerting disabled if unset | + /// | `ALERT_CPU_WARNING` | 80 | CPU % warning threshold | + /// | `ALERT_CPU_CRITICAL` | 95 | CPU % critical threshold | + /// | `ALERT_MEMORY_WARNING` | 80 | Memory % warning threshold | + /// | `ALERT_MEMORY_CRITICAL` | 95 | Memory % critical threshold | + /// | `ALERT_DISK_WARNING` | 80 | Disk % warning threshold | + /// | `ALERT_DISK_CRITICAL` | 95 | Disk % critical threshold | + pub fn from_env() -> Self { + let parse = |var: &str, default: f32| -> f32 { + std::env::var(var) + .ok() + .and_then(|v| v.parse::().ok()) + .unwrap_or(default) + }; + + Self { + webhook_url: std::env::var("ALERT_WEBHOOK_URL").ok(), + cpu: Threshold { + warning: parse("ALERT_CPU_WARNING", 80.0), + critical: parse("ALERT_CPU_CRITICAL", 95.0), + }, + memory: Threshold { + warning: parse("ALERT_MEMORY_WARNING", 80.0), + critical: parse("ALERT_MEMORY_CRITICAL", 95.0), + }, + disk: Threshold { + warning: parse("ALERT_DISK_WARNING", 80.0), + critical: parse("ALERT_DISK_CRITICAL", 95.0), + }, + } + } +} + +// ---- Alert State Tracker (deduplication + recovery) ---- + +/// Tracks which alerts are currently active so we avoid duplicates and detect recovery. +#[derive(Debug, Clone)] +struct ActiveAlert { + severity: AlertSeverity, + #[allow(dead_code)] + fired_at_ms: u128, +} + +/// Evaluates metrics against thresholds, deduplicates, and detects recovery. +#[derive(Debug)] +pub struct AlertManager { + config: AlertConfig, + active: RwLock>, + agent_id: Option, +} + +pub type SharedAlertManager = Arc; + +impl AlertManager { + pub fn new(config: AlertConfig) -> Self { + let agent_id = std::env::var("AGENT_ID").ok(); + Self { + config, + active: RwLock::new(HashMap::new()), + agent_id, + } + } + + /// Returns `true` if alerting is enabled (webhook URL configured). + pub fn is_enabled(&self) -> bool { + self.config + .webhook_url + .as_ref() + .is_some_and(|u| !u.is_empty()) + } + + /// Read-only access to the alert configuration. + pub fn config(&self) -> &AlertConfig { + &self.config + } + + /// Evaluate a metrics snapshot and return any new, escalated, or recovery alerts. + pub async fn evaluate(&self, snapshot: &MetricsSnapshot) -> Vec { + let checks: [(AlertKind, f32, &Threshold); 3] = [ + (AlertKind::HighCpu, snapshot.cpu_usage_pct, &self.config.cpu), + ( + AlertKind::HighMemory, + snapshot.memory_used_pct, + &self.config.memory, + ), + ( + AlertKind::HighDisk, + snapshot.disk_used_pct, + &self.config.disk, + ), + ]; + + let now_ms = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_millis()) + .unwrap_or_default(); + + let mut alerts = Vec::new(); + let mut active = self.active.write().await; + + for (kind, value, threshold) in checks { + match threshold.evaluate(value) { + Some(severity) => { + // Check if already firing at this severity (dedup) + let should_fire = match active.get(&kind) { + Some(existing) => existing.severity != severity, + None => true, + }; + + if should_fire { + let label = match kind { + AlertKind::HighCpu => "CPU usage", + AlertKind::HighMemory => "Memory usage", + AlertKind::HighDisk => "Disk usage", + }; + let threshold_val = match severity { + AlertSeverity::Warning => threshold.warning, + AlertSeverity::Critical => threshold.critical, + }; + alerts.push(Alert { + kind, + severity, + message: format!( + "{} at {:.1}% (threshold: {:.0}%)", + label, value, threshold_val + ), + value, + threshold: threshold_val, + recovered: false, + timestamp_ms: now_ms, + agent_id: self.agent_id.clone(), + }); + + active.insert( + kind, + ActiveAlert { + severity, + fired_at_ms: now_ms, + }, + ); + } + } + None => { + // Value dropped below all thresholds — recovery + if active.remove(&kind).is_some() { + let label = match kind { + AlertKind::HighCpu => "CPU usage", + AlertKind::HighMemory => "Memory usage", + AlertKind::HighDisk => "Disk usage", + }; + alerts.push(Alert { + kind, + severity: AlertSeverity::Warning, + message: format!("{} recovered to {:.1}%", label, value), + value, + threshold: threshold.warning, + recovered: true, + timestamp_ms: now_ms, + agent_id: self.agent_id.clone(), + }); + } + } + } + } + + alerts + } +} + +// ---- Webhook Dispatcher ---- + +/// Payload sent to the alert webhook. +#[derive(Debug, Serialize)] +struct WebhookPayload { + alerts: Vec, + agent_id: Option, + timestamp_ms: u128, +} + +/// Dispatch alerts to the configured webhook with retry and backoff. +pub async fn dispatch_alerts( + client: &Client, + webhook_url: &str, + alerts: Vec, + agent_id: Option, +) { + if alerts.is_empty() { + return; + } + + let now_ms = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_millis()) + .unwrap_or_default(); + + let payload = WebhookPayload { + alerts, + agent_id: agent_id.clone(), + timestamp_ms: now_ms, + }; + + let max_retries: u8 = 3; + let mut delay = Duration::from_secs(1); + + for attempt in 1..=max_retries { + let mut req = client.post(webhook_url).json(&payload); + if let Some(aid) = agent_id.as_ref() { + req = req.header("X-Agent-Id", aid); + } + + match req.send().await { + Ok(resp) => { + let status = resp.status(); + if status.is_success() { + info!(count = payload.alerts.len(), "alerts dispatched to webhook"); + return; + } + if status.is_client_error() { + warn!( + attempt, + status = %status, + "alert webhook client error; not retrying" + ); + return; + } + warn!( + attempt, + status = %status, + "alert webhook server error; retrying" + ); + } + Err(e) => { + warn!(attempt, error = %e, "alert webhook dispatch failed; retrying"); + } + } + + tokio::time::sleep(delay).await; + delay = (delay * 2).min(Duration::from_secs(16)); + } + + warn!("alert dispatch exhausted retries"); +} + +#[cfg(test)] +mod tests { + use super::*; + + fn test_config(warning: f32, critical: f32) -> AlertConfig { + AlertConfig { + webhook_url: Some("http://test/alerts".into()), + cpu: Threshold { warning, critical }, + memory: Threshold { warning, critical }, + disk: Threshold { warning, critical }, + } + } + + fn snapshot_with(cpu: f32, mem: f32, disk: f32) -> MetricsSnapshot { + MetricsSnapshot { + timestamp_ms: 1700000000000, + cpu_usage_pct: cpu, + memory_total_bytes: 16_000_000_000, + memory_used_bytes: 8_000_000_000, + memory_used_pct: mem, + disk_total_bytes: 500_000_000_000, + disk_used_bytes: 250_000_000_000, + disk_used_pct: disk, + } + } + + #[test] + fn threshold_evaluate_below() { + let t = Threshold { + warning: 80.0, + critical: 95.0, + }; + assert_eq!(t.evaluate(50.0), None); + } + + #[test] + fn threshold_evaluate_warning() { + let t = Threshold { + warning: 80.0, + critical: 95.0, + }; + assert_eq!(t.evaluate(85.0), Some(AlertSeverity::Warning)); + } + + #[test] + fn threshold_evaluate_critical() { + let t = Threshold { + warning: 80.0, + critical: 95.0, + }; + assert_eq!(t.evaluate(96.0), Some(AlertSeverity::Critical)); + } + + #[test] + fn threshold_evaluate_exact_boundary() { + let t = Threshold { + warning: 80.0, + critical: 95.0, + }; + assert_eq!(t.evaluate(80.0), Some(AlertSeverity::Warning)); + assert_eq!(t.evaluate(95.0), Some(AlertSeverity::Critical)); + } + + #[tokio::test] + async fn no_alerts_when_all_normal() { + let mgr = AlertManager::new(test_config(80.0, 95.0)); + let snap = snapshot_with(30.0, 40.0, 50.0); + let alerts = mgr.evaluate(&snap).await; + assert!(alerts.is_empty()); + } + + #[tokio::test] + async fn fires_warning_on_high_cpu() { + let mgr = AlertManager::new(test_config(80.0, 95.0)); + let snap = snapshot_with(85.0, 40.0, 50.0); + let alerts = mgr.evaluate(&snap).await; + assert_eq!(alerts.len(), 1); + assert_eq!(alerts[0].kind, AlertKind::HighCpu); + assert_eq!(alerts[0].severity, AlertSeverity::Warning); + assert!(!alerts[0].recovered); + } + + #[tokio::test] + async fn fires_critical_on_high_memory() { + let mgr = AlertManager::new(test_config(80.0, 95.0)); + let snap = snapshot_with(30.0, 96.0, 50.0); + let alerts = mgr.evaluate(&snap).await; + assert_eq!(alerts.len(), 1); + assert_eq!(alerts[0].kind, AlertKind::HighMemory); + assert_eq!(alerts[0].severity, AlertSeverity::Critical); + } + + #[tokio::test] + async fn dedup_same_severity() { + let mgr = AlertManager::new(test_config(80.0, 95.0)); + let snap = snapshot_with(85.0, 40.0, 50.0); + + let first = mgr.evaluate(&snap).await; + assert_eq!(first.len(), 1); + + // Same severity again → deduplicated + let second = mgr.evaluate(&snap).await; + assert!(second.is_empty()); + } + + #[tokio::test] + async fn escalation_fires_new_alert() { + let mgr = AlertManager::new(test_config(80.0, 95.0)); + + // Warning + let alerts = mgr.evaluate(&snapshot_with(85.0, 40.0, 50.0)).await; + assert_eq!(alerts.len(), 1); + assert_eq!(alerts[0].severity, AlertSeverity::Warning); + + // Escalate to critical + let alerts = mgr.evaluate(&snapshot_with(96.0, 40.0, 50.0)).await; + assert_eq!(alerts.len(), 1); + assert_eq!(alerts[0].severity, AlertSeverity::Critical); + } + + #[tokio::test] + async fn recovery_fires_alert() { + let mgr = AlertManager::new(test_config(80.0, 95.0)); + + // Fire + let _ = mgr.evaluate(&snapshot_with(85.0, 40.0, 50.0)).await; + + // Recover + let alerts = mgr.evaluate(&snapshot_with(50.0, 40.0, 50.0)).await; + assert_eq!(alerts.len(), 1); + assert!(alerts[0].recovered); + assert_eq!(alerts[0].kind, AlertKind::HighCpu); + } + + #[tokio::test] + async fn multiple_alerts_at_once() { + let mgr = AlertManager::new(test_config(80.0, 95.0)); + let snap = snapshot_with(96.0, 90.0, 85.0); + let alerts = mgr.evaluate(&snap).await; + assert_eq!(alerts.len(), 3); + } + + #[tokio::test] + async fn recovery_only_fires_once() { + let mgr = AlertManager::new(test_config(80.0, 95.0)); + + let _ = mgr.evaluate(&snapshot_with(85.0, 40.0, 50.0)).await; + let recovery = mgr.evaluate(&snapshot_with(50.0, 40.0, 50.0)).await; + assert_eq!(recovery.len(), 1); + + // No more recovery alerts + let again = mgr.evaluate(&snapshot_with(50.0, 40.0, 50.0)).await; + assert!(again.is_empty()); + } + + #[test] + fn alert_kind_display() { + assert_eq!(AlertKind::HighCpu.to_string(), "high_cpu"); + assert_eq!(AlertKind::HighMemory.to_string(), "high_memory"); + assert_eq!(AlertKind::HighDisk.to_string(), "high_disk"); + } + + #[test] + fn alert_serialization() { + let alert = Alert { + kind: AlertKind::HighCpu, + severity: AlertSeverity::Critical, + message: "CPU at 96%".into(), + value: 96.0, + threshold: 95.0, + recovered: false, + timestamp_ms: 1700000000000, + agent_id: Some("agent-1".into()), + }; + let json = serde_json::to_string(&alert).unwrap(); + assert!(json.contains("\"kind\":\"high_cpu\"")); + assert!(json.contains("\"severity\":\"critical\"")); + assert!(json.contains("\"recovered\":false")); + } + + #[test] + fn alert_config_defaults() { + // Don't set env vars — test defaults + let cfg = AlertConfig { + webhook_url: None, + cpu: Threshold { + warning: 80.0, + critical: 95.0, + }, + memory: Threshold { + warning: 80.0, + critical: 95.0, + }, + disk: Threshold { + warning: 80.0, + critical: 95.0, + }, + }; + assert!(cfg.webhook_url.is_none()); + assert_eq!(cfg.cpu.warning, 80.0); + assert_eq!(cfg.disk.critical, 95.0); + } + + #[test] + fn alert_manager_disabled_without_webhook() { + let config = AlertConfig { + webhook_url: None, + cpu: Threshold { + warning: 80.0, + critical: 95.0, + }, + memory: Threshold { + warning: 80.0, + critical: 95.0, + }, + disk: Threshold { + warning: 80.0, + critical: 95.0, + }, + }; + let mgr = AlertManager::new(config); + assert!(!mgr.is_enabled()); + } + + #[test] + fn alert_manager_enabled_with_webhook() { + let config = AlertConfig { + webhook_url: Some("http://hooks.example.com/alerts".into()), + cpu: Threshold { + warning: 80.0, + critical: 95.0, + }, + memory: Threshold { + warning: 80.0, + critical: 95.0, + }, + disk: Threshold { + warning: 80.0, + critical: 95.0, + }, + }; + let mgr = AlertManager::new(config); + assert!(mgr.is_enabled()); + } +} diff --git a/src/monitoring/mod.rs b/src/monitoring/mod.rs index 9a41984..5bb1573 100644 --- a/src/monitoring/mod.rs +++ b/src/monitoring/mod.rs @@ -1,3 +1,5 @@ +pub mod alerting; + use reqwest::Client; use serde::Serialize; use std::sync::Arc; @@ -8,6 +10,8 @@ use tokio::sync::{Mutex, RwLock}; use tokio::task::JoinHandle; use tracing::info; +use crate::monitoring::alerting::{dispatch_alerts, SharedAlertManager}; + #[derive(Debug, Clone, Serialize, Default)] pub struct MetricsSnapshot { pub timestamp_ms: u128, @@ -28,6 +32,15 @@ pub enum ControlPlane { ComposeAgent, } +impl ControlPlane { + pub fn from_value(value: Option<&str>) -> Self { + match value { + Some("compose_agent") => ControlPlane::ComposeAgent, + _ => ControlPlane::StatusPanel, + } + } +} + impl std::fmt::Display for ControlPlane { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { @@ -146,8 +159,14 @@ pub fn spawn_heartbeat( interval: Duration, tx: MetricsTx, webhook: Option, + alert_manager: Option, ) -> JoinHandle<()> { - let client = webhook.as_ref().map(|_| Client::new()); + let client = webhook.as_ref().map(|_| Client::new()).or_else(|| { + alert_manager + .as_ref() + .filter(|m| m.is_enabled()) + .map(|_| Client::new()) + }); let agent_id = std::env::var("AGENT_ID").ok(); tokio::spawn(async move { loop { @@ -184,7 +203,6 @@ pub fn spawn_heartbeat( tracing::debug!(attempt, status = %status, "metrics webhook push succeeded"); break; } else if status.is_client_error() { - // Do not retry on client-side errors (e.g., 401/403/404) tracing::warn!(attempt, status = %status, "metrics webhook push client error; not retrying"); break; } else { @@ -196,19 +214,32 @@ pub fn spawn_heartbeat( } } - // Jitter derived from current time to avoid herd effects let nanos = SystemTime::now() .duration_since(UNIX_EPOCH) .map(|d| d.subsec_nanos()) .unwrap_or(0); let jitter = Duration::from_millis(50 + (nanos % 200) as u64); tokio::time::sleep(delay + jitter).await; - // Exponential backoff capped at ~8s delay = delay.saturating_mul(2).min(Duration::from_secs(8)); } }); } + // Evaluate alert thresholds and dispatch if needed + if let (Some(mgr), Some(http)) = (alert_manager.as_ref(), client.as_ref()) { + if mgr.is_enabled() { + let alerts = mgr.evaluate(&snapshot).await; + if !alerts.is_empty() { + let http = http.clone(); + let url = mgr.config().webhook_url.clone().unwrap_or_default(); + let agent = agent_id.clone(); + tokio::spawn(async move { + dispatch_alerts(&http, &url, alerts, agent).await; + }); + } + } + } + info!( cpu = snapshot.cpu_usage_pct, mem_used_bytes = snapshot.memory_used_bytes, @@ -277,6 +308,23 @@ mod tests { assert_ne!(ControlPlane::StatusPanel, ControlPlane::ComposeAgent); } + #[test] + fn control_plane_from_value_defaults_to_status_panel() { + assert_eq!( + ControlPlane::from_value(Some("compose_agent")), + ControlPlane::ComposeAgent + ); + assert_eq!( + ControlPlane::from_value(Some("status_panel")), + ControlPlane::StatusPanel + ); + assert_eq!( + ControlPlane::from_value(Some("unexpected")), + ControlPlane::StatusPanel + ); + assert_eq!(ControlPlane::from_value(None), ControlPlane::StatusPanel); + } + #[test] fn command_execution_metrics_default() { let metrics = CommandExecutionMetrics::default(); diff --git a/src/security/mod.rs b/src/security/mod.rs index c2f5e55..86631ab 100644 --- a/src/security/mod.rs +++ b/src/security/mod.rs @@ -10,5 +10,6 @@ pub mod validation; // Vault integration for token rotation pub mod token_cache; +pub mod token_provider; pub mod token_refresh; pub mod vault_client; diff --git a/src/security/token_provider.rs b/src/security/token_provider.rs new file mode 100644 index 0000000..61f8bfc --- /dev/null +++ b/src/security/token_provider.rs @@ -0,0 +1,193 @@ +use std::sync::Arc; + +use anyhow::Result; +use chrono::{DateTime, Utc}; +use tokio::sync::RwLock; +use tracing::{debug, info, warn}; + +use super::vault_client::VaultClient; + +/// Minimum seconds between consecutive refresh attempts to prevent hammering. +const REFRESH_COOLDOWN_SECS: i64 = 10; + +/// Provides shared, refreshable access to the agent token. +/// +/// When a 401/403 is received from Stacker, callers invoke `refresh()` which: +/// 1. Checks a cooldown to avoid hammering Vault or env re-reads. +/// 2. Tries Vault (if configured) to get a new token. +/// 3. Falls back to re-reading `AGENT_TOKEN` from the environment. +/// 4. Returns whether the token actually changed. +#[derive(Debug, Clone)] +pub struct TokenProvider { + token: Arc>, + vault_client: Option, + deployment_hash: String, + last_refresh: Arc>>>, +} + +impl TokenProvider { + pub fn new( + initial_token: String, + vault_client: Option, + deployment_hash: String, + ) -> Self { + Self { + token: Arc::new(RwLock::new(initial_token)), + vault_client, + deployment_hash, + last_refresh: Arc::new(RwLock::new(None)), + } + } + + /// Build a provider from environment variables, optionally attaching a Vault client. + pub fn from_env(vault_client: Option) -> Self { + let token = std::env::var("AGENT_TOKEN").unwrap_or_default(); + let deployment_hash = + std::env::var("DEPLOYMENT_HASH").unwrap_or_else(|_| "default".to_string()); + Self::new(token, vault_client, deployment_hash) + } + + /// Get the current token value. + pub async fn get(&self) -> String { + self.token.read().await.clone() + } + + /// Attempt to refresh the token after a 401/403. + /// + /// Returns `Ok(true)` if the token was actually changed, `Ok(false)` if + /// it stayed the same (cooldown, Vault returned same token, no env change). + pub async fn refresh(&self) -> Result { + // Cooldown check + { + let last = self.last_refresh.read().await; + if let Some(t) = *last { + let elapsed = (Utc::now() - t).num_seconds(); + if elapsed < REFRESH_COOLDOWN_SECS { + debug!( + elapsed, + cooldown = REFRESH_COOLDOWN_SECS, + "token refresh skipped (cooldown)" + ); + return Ok(false); + } + } + } + + // Record this attempt + { + let mut last = self.last_refresh.write().await; + *last = Some(Utc::now()); + } + + let old_token = self.token.read().await.clone(); + + // Strategy 1: Vault + if let Some(vault) = &self.vault_client { + match vault.fetch_agent_token(&self.deployment_hash, None).await { + Ok(new_token) if new_token != old_token => { + let mut token = self.token.write().await; + *token = new_token; + info!("Agent token refreshed from Vault after auth error"); + return Ok(true); + } + Ok(_) => { + debug!("Vault returned same token; trying env fallback"); + } + Err(e) => { + warn!(error = %e, "Vault token refresh failed; trying env fallback"); + } + } + } + + // Strategy 2: re-read AGENT_TOKEN from environment + let env_token = std::env::var("AGENT_TOKEN").unwrap_or_default(); + if !env_token.is_empty() && env_token != old_token { + let mut token = self.token.write().await; + *token = env_token; + info!("Agent token refreshed from environment after auth error"); + return Ok(true); + } + + debug!("No new token available after refresh attempt"); + Ok(false) + } + + /// Directly swap the token (used by background rotation tasks). + pub async fn swap(&self, new_token: String) { + let mut token = self.token.write().await; + if *token != new_token { + *token = new_token; + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_utils::EnvGuard; + use std::sync::{Mutex, OnceLock}; + + /// Serializes tests that mutate AGENT_TOKEN env var. + fn env_lock() -> &'static Mutex<()> { + static LOCK: OnceLock> = OnceLock::new(); + LOCK.get_or_init(|| Mutex::new(())) + } + + #[tokio::test] + async fn get_returns_initial_token() { + let tp = TokenProvider::new("tok123".into(), None, "hash".into()); + assert_eq!(tp.get().await, "tok123"); + } + + #[tokio::test] + async fn swap_updates_token() { + let tp = TokenProvider::new("old".into(), None, "hash".into()); + tp.swap("new".into()).await; + assert_eq!(tp.get().await, "new"); + } + + #[tokio::test] + async fn refresh_without_vault_reads_env() { + let _guard = env_lock().lock().unwrap(); + let _env = EnvGuard::set("AGENT_TOKEN", "env_refreshed_tp"); + let tp = TokenProvider::new("stale".into(), None, "hash".into()); + + let changed = tp.refresh().await.unwrap(); + assert!(changed); + assert_eq!(tp.get().await, "env_refreshed_tp"); + } + + #[tokio::test] + async fn refresh_respects_cooldown() { + let _guard = env_lock().lock().unwrap(); + let _env = EnvGuard::set("AGENT_TOKEN", "fresh_tp"); + let tp = TokenProvider::new("stale".into(), None, "hash".into()); + + let first = tp.refresh().await.unwrap(); + assert!(first); + + // Second attempt within cooldown should be skipped + std::env::set_var("AGENT_TOKEN", "even_fresher_tp"); + let second = tp.refresh().await.unwrap(); + assert!(!second); + assert_eq!(tp.get().await, "fresh_tp"); + } + + #[tokio::test] + async fn refresh_noop_when_env_same() { + let _guard = env_lock().lock().unwrap(); + let _env = EnvGuard::set("AGENT_TOKEN", "same"); + let tp = TokenProvider::new("same".into(), None, "hash".into()); + + let changed = tp.refresh().await.unwrap(); + assert!(!changed); + } + + #[tokio::test] + async fn clone_shares_state() { + let tp = TokenProvider::new("a".into(), None, "h".into()); + let tp2 = tp.clone(); + tp2.swap("b".into()).await; + assert_eq!(tp.get().await, "b"); + } +} diff --git a/src/transport/http_polling.rs b/src/transport/http_polling.rs index a29b288..4845f50 100644 --- a/src/transport/http_polling.rs +++ b/src/transport/http_polling.rs @@ -11,6 +11,8 @@ use tracing::{debug, trace}; use uuid::Uuid; use crate::security::request_signer::compute_signature_base64; +use crate::security::token_provider::TokenProvider; +use crate::transport::retry::{signed_get_with_retry, signed_post_with_retry, RetryConfig}; use crate::transport::Command; const TS_OVERRIDE_ENV: &str = "HTTP_POLLING_TS_OVERRIDE"; @@ -34,7 +36,7 @@ fn signing_meta() -> (i64, String) { (ts, request_id) } -fn build_signed_headers(agent_id: &str, agent_token: &str, body: &[u8]) -> Result { +pub fn build_signed_headers(agent_id: &str, agent_token: &str, body: &[u8]) -> Result { let (ts, request_id) = signing_meta(); let sig = compute_signature_base64(agent_token, body); @@ -349,6 +351,7 @@ pub async fn report_result( result: &Option, error: &Option, completed_at: &str, + executed_by: Option<&str>, ) -> Result<()> { let url = format!("{}/api/v1/agent/commands/report", base_url); @@ -369,6 +372,12 @@ pub async fn report_result( "completed_at".to_string(), serde_json::Value::String(completed_at.to_string()), ); + if let Some(executed_by) = executed_by { + body.insert( + "executed_by".to_string(), + serde_json::Value::String(executed_by.to_string()), + ); + } if let Some(res) = result { body.insert("result".to_string(), res.clone()); @@ -424,6 +433,127 @@ pub async fn update_app_status( } } +// ---- Retry-aware variants (use TokenProvider + automatic 401/403 refresh) ---- + +/// Long-poll for a command with automatic token refresh on 401/403. +pub async fn wait_for_command_with_retry( + base_url: &str, + deployment_hash: &str, + agent_id: &str, + token_provider: &TokenProvider, + timeout_secs: u64, + priority: Option<&str>, +) -> Result { + let url = build_wait_command_url(base_url, deployment_hash, timeout_secs, priority); + let client = create_http_client()?; + + debug!( + url = %url, + deployment_hash = %deployment_hash, + timeout_secs = %timeout_secs, + "initiating long-poll with retry" + ); + + let config = RetryConfig::auth_only(); + let response = signed_get_with_retry( + &client, + &url, + agent_id, + token_provider, + Duration::from_secs(timeout_secs + 5), + &config, + ) + .await?; + + handle_poll_response(response, &url).await +} + +/// Report command result with automatic token refresh on 401/403. +#[allow(clippy::too_many_arguments)] +pub async fn report_result_with_retry( + base_url: &str, + agent_id: &str, + token_provider: &TokenProvider, + command_id: &str, + deployment_hash: &str, + status: &str, + result: &Option, + error: &Option, + completed_at: &str, + executed_by: Option<&str>, +) -> Result<()> { + let url = format!("{}/api/v1/agent/commands/report", base_url); + + let mut body = serde_json::Map::new(); + body.insert("command_id".into(), Value::String(command_id.into())); + body.insert( + "deployment_hash".into(), + Value::String(deployment_hash.into()), + ); + body.insert("status".into(), Value::String(status.into())); + body.insert("completed_at".into(), Value::String(completed_at.into())); + if let Some(executed_by) = executed_by { + body.insert("executed_by".into(), Value::String(executed_by.to_string())); + } + + if let Some(res) = result { + body.insert("result".into(), res.clone()); + } + body.insert( + "error".into(), + error + .as_ref() + .map(|e| Value::String(e.clone())) + .unwrap_or(Value::Null), + ); + + debug!(url = %url, body = ?body, "reporting result with retry"); + + let client = Client::new(); + let config = RetryConfig::default(); + let resp = + signed_post_with_retry(&client, &url, agent_id, token_provider, &body, &config).await?; + let status_code = resp.status(); + + if status_code.is_success() { + debug!(status_code = %status_code.as_u16(), "command result reported successfully"); + Ok(()) + } else { + let error_body = resp + .text() + .await + .unwrap_or_else(|_| "".to_string()); + Err(anyhow!( + "report failed: {} | body: {}", + status_code, + error_body + )) + } +} + +/// Update app status with automatic token refresh on 401/403. +pub async fn update_app_status_with_retry( + base_url: &str, + agent_id: &str, + token_provider: &TokenProvider, + payload: &T, +) -> Result<()> { + let url = format!("{}/api/v1/apps/status", base_url); + let client = Client::new(); + let config = RetryConfig::default(); + let resp = + signed_post_with_retry(&client, &url, agent_id, token_provider, payload, &config).await?; + + if resp.status().is_success() { + Ok(()) + } else { + Err(anyhow::anyhow!( + "app status update failed: {}", + resp.status() + )) + } +} + #[cfg(test)] mod tests { use super::*; @@ -453,6 +583,7 @@ mod tests { let result: Option = None; let error = None; let completed_at = "2023-11-15T10:00:00Z"; + let executed_by = Some("compose_agent"); let mut payload = serde_json::Map::new(); payload.insert( @@ -471,6 +602,10 @@ mod tests { "completed_at".to_string(), serde_json::Value::String(completed_at.to_string()), ); + payload.insert( + "executed_by".to_string(), + serde_json::Value::String(executed_by.unwrap().to_string()), + ); if let Some(value) = result.clone() { payload.insert("result".to_string(), value); } @@ -505,12 +640,92 @@ mod tests { &result, &error, completed_at, + executed_by, ) .await .expect("report_result should succeed"); mock.assert(); } + #[tokio::test] + async fn report_result_with_retry_posts_executed_by() { + let _guard = env_lock().lock().expect("env lock poisoned"); + env::set_var(TS_OVERRIDE_ENV, "1700000000"); + env::set_var(REQUEST_ID_OVERRIDE_ENV, "req-123-retry"); + + let mut server = Server::new_async().await; + let base_url = server.url(); + let agent_id = "agent-123"; + let agent_token = "token-abc"; + let token_provider = + TokenProvider::new(agent_token.to_string(), None, "dep-hash-123".into()); + let command_id = "cmd-1"; + let deployment_hash = "dep-hash-123"; + let status = "success"; + let result: Option = None; + let error = None; + let completed_at = "2023-11-15T10:00:00Z"; + let executed_by = Some("compose_agent"); + + let mut payload = serde_json::Map::new(); + payload.insert( + "command_id".into(), + serde_json::Value::String(command_id.to_string()), + ); + payload.insert( + "deployment_hash".into(), + serde_json::Value::String(deployment_hash.to_string()), + ); + payload.insert( + "status".into(), + serde_json::Value::String(status.to_string()), + ); + payload.insert( + "completed_at".into(), + serde_json::Value::String(completed_at.to_string()), + ); + payload.insert( + "executed_by".into(), + serde_json::Value::String(executed_by.unwrap().to_string()), + ); + payload.insert("error".into(), serde_json::Value::Null); + + let body = serde_json::to_vec(&payload).unwrap(); + let signature = compute_signature_base64(agent_token, &body); + let ts = env::var(TS_OVERRIDE_ENV).unwrap(); + let req_id = env::var(REQUEST_ID_OVERRIDE_ENV).unwrap(); + let mock = server + .mock("POST", "/api/v1/agent/commands/report") + .match_header("X-Agent-Id", Matcher::Exact(agent_id.into())) + .match_header( + "Authorization", + Matcher::Exact(format!("Bearer {}", agent_token)), + ) + .match_header("X-Timestamp", Matcher::Exact(ts)) + .match_header("X-Request-Id", Matcher::Exact(req_id)) + .match_header("X-Agent-Signature", Matcher::Exact(signature)) + .match_body(Matcher::Exact(String::from_utf8(body).unwrap())) + .with_status(200) + .create_async() + .await; + + report_result_with_retry( + &base_url, + agent_id, + &token_provider, + command_id, + deployment_hash, + status, + &result, + &error, + completed_at, + executed_by, + ) + .await + .expect("report_result_with_retry should succeed"); + mock.assert(); + } + #[tokio::test] async fn update_app_status_posts_payload() { let _guard = env_lock().lock().expect("env lock poisoned"); diff --git a/src/transport/mod.rs b/src/transport/mod.rs index 293213b..43afae4 100644 --- a/src/transport/mod.rs +++ b/src/transport/mod.rs @@ -1,4 +1,5 @@ pub mod http_polling; +pub mod retry; pub mod websocket; use serde::{Deserialize, Serialize}; @@ -35,6 +36,8 @@ pub struct CommandResult { pub truncated: Option, #[serde(skip_serializing_if = "Option::is_none")] pub cursor: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub executed_by: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -54,4 +57,6 @@ pub struct StackerCommandReport { pub result: Option, #[serde(skip_serializing_if = "Option::is_none")] pub error: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub executed_by: Option, } diff --git a/src/transport/retry.rs b/src/transport/retry.rs new file mode 100644 index 0000000..1d4f8db --- /dev/null +++ b/src/transport/retry.rs @@ -0,0 +1,227 @@ +use std::time::Duration; + +use anyhow::{Context, Result}; +use reqwest::header::CONTENT_TYPE; +use reqwest::{Client, Response}; +use serde::Serialize; +use tracing::warn; + +use crate::security::token_provider::TokenProvider; +use crate::transport::http_polling::build_signed_headers; + +/// Configuration for retry behaviour on outbound Stacker requests. +#[derive(Debug, Clone)] +pub struct RetryConfig { + /// How many times to retry after a 401/403 (each attempt refreshes the token first). + pub max_auth_retries: u32, + /// How many times to retry on 5xx / network errors with exponential backoff. + pub max_server_retries: u32, + /// Starting backoff duration for server/network retries. + pub initial_backoff: Duration, + /// Maximum backoff cap. + pub max_backoff: Duration, +} + +impl Default for RetryConfig { + fn default() -> Self { + Self { + max_auth_retries: 1, + max_server_retries: 3, + initial_backoff: Duration::from_secs(2), + max_backoff: Duration::from_secs(60), + } + } +} + +impl RetryConfig { + /// Suitable for long-poll requests where server retries are handled by the outer loop. + pub fn auth_only() -> Self { + Self { + max_auth_retries: 1, + max_server_retries: 0, + initial_backoff: Duration::from_secs(2), + max_backoff: Duration::from_secs(60), + } + } +} + +/// Returns `true` if the status code indicates an auth failure (401 or 403). +fn is_auth_error(status: u16) -> bool { + status == 401 || status == 403 +} + +/// Send a signed GET request, automatically refreshing the token on 401/403. +pub async fn signed_get_with_retry( + client: &Client, + url: &str, + agent_id: &str, + token_provider: &TokenProvider, + timeout: Duration, + config: &RetryConfig, +) -> Result { + let mut auth_retries = 0u32; + let mut server_retries = 0u32; + let mut backoff = config.initial_backoff; + + loop { + let token = token_provider.get().await; + let headers = build_signed_headers(agent_id, &token, &[])?; + + let result = client + .get(url) + .headers(headers) + .timeout(timeout) + .send() + .await; + + match result { + Ok(resp) => { + let status = resp.status().as_u16(); + + if is_auth_error(status) && auth_retries < config.max_auth_retries { + auth_retries += 1; + warn!( + status, + attempt = auth_retries, + url = %url, + "auth error from Stacker; refreshing token and retrying" + ); + token_provider.refresh().await?; + continue; + } + + if resp.status().is_server_error() && server_retries < config.max_server_retries { + server_retries += 1; + warn!( + status, + attempt = server_retries, + backoff_ms = backoff.as_millis() as u64, + "server error; retrying with backoff" + ); + tokio::time::sleep(backoff).await; + backoff = (backoff * 2).min(config.max_backoff); + continue; + } + + return Ok(resp); + } + Err(e) => { + if server_retries < config.max_server_retries { + server_retries += 1; + warn!( + error = %e, + attempt = server_retries, + "network error; retrying with backoff" + ); + tokio::time::sleep(backoff).await; + backoff = (backoff * 2).min(config.max_backoff); + continue; + } + return Err(e).context("signed GET failed after retries"); + } + } + } +} + +/// Send a signed POST (JSON body) request with 401/403 retry. +pub async fn signed_post_with_retry( + client: &Client, + url: &str, + agent_id: &str, + token_provider: &TokenProvider, + payload: &T, + config: &RetryConfig, +) -> Result { + let body_bytes = serde_json::to_vec(payload).context("serialize JSON body")?; + let mut auth_retries = 0u32; + let mut server_retries = 0u32; + let mut backoff = config.initial_backoff; + + loop { + let token = token_provider.get().await; + let headers = build_signed_headers(agent_id, &token, &body_bytes)?; + + let result = client + .post(url) + .headers(headers) + .header(CONTENT_TYPE, "application/json") + .body(body_bytes.clone()) + .send() + .await; + + match result { + Ok(resp) => { + let status = resp.status().as_u16(); + + if is_auth_error(status) && auth_retries < config.max_auth_retries { + auth_retries += 1; + warn!( + status, + attempt = auth_retries, + url = %url, + "auth error on POST; refreshing token and retrying" + ); + token_provider.refresh().await?; + continue; + } + + if resp.status().is_server_error() && server_retries < config.max_server_retries { + server_retries += 1; + warn!( + status, + attempt = server_retries, + backoff_ms = backoff.as_millis() as u64, + "server error on POST; retrying with backoff" + ); + tokio::time::sleep(backoff).await; + backoff = (backoff * 2).min(config.max_backoff); + continue; + } + + return Ok(resp); + } + Err(e) => { + if server_retries < config.max_server_retries { + server_retries += 1; + warn!( + error = %e, + attempt = server_retries, + "network error on POST; retrying with backoff" + ); + tokio::time::sleep(backoff).await; + backoff = (backoff * 2).min(config.max_backoff); + continue; + } + return Err(e).context("signed POST failed after retries"); + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn default_config_values() { + let cfg = RetryConfig::default(); + assert_eq!(cfg.max_auth_retries, 1); + assert_eq!(cfg.max_server_retries, 3); + } + + #[test] + fn auth_only_config_no_server_retries() { + let cfg = RetryConfig::auth_only(); + assert_eq!(cfg.max_server_retries, 0); + assert_eq!(cfg.max_auth_retries, 1); + } + + #[test] + fn is_auth_error_detects_401_403() { + assert!(is_auth_error(401)); + assert!(is_auth_error(403)); + assert!(!is_auth_error(200)); + assert!(!is_auth_error(500)); + assert!(!is_auth_error(404)); + } +} diff --git a/static/css/style.css b/static/css/style.css index eac406d..4eb10aa 100644 --- a/static/css/style.css +++ b/static/css/style.css @@ -265,6 +265,137 @@ h1, h2, h3, h4 { .badge-warning .badge-dot { background: var(--color-warning); } .badge-error .badge-dot { background: var(--color-error); } +/* ===== Notification bell ===== */ +.notification-bell { + position: relative; + cursor: pointer; + background: none; + border: none; + color: var(--color-text-light); + padding: 4px; + border-radius: var(--radius-md); + transition: color 0.15s, background 0.15s; +} +.notification-bell:hover { + color: var(--color-accent); + background: var(--color-accent-light); +} +.notification-bell .material-icons-outlined { + font-size: 24px; + display: block; +} +.notification-bell .bell-badge { + position: absolute; + top: 0; + right: -2px; + min-width: 18px; + height: 18px; + padding: 0 5px; + border-radius: 9px; + background: var(--color-error); + color: #fff; + font-size: 11px; + font-weight: 700; + line-height: 18px; + text-align: center; + display: none; +} +.notification-bell .bell-badge.has-unread { + display: block; +} + +.notification-dropdown { + position: absolute; + top: calc(100% + 8px); + right: 0; + width: 360px; + max-height: 420px; + background: var(--color-bg-card); + border: 1px solid var(--color-border); + border-radius: var(--radius-lg); + box-shadow: var(--shadow-lg); + z-index: 100; + display: none; + overflow: hidden; +} +.notification-dropdown.open { + display: flex; + flex-direction: column; +} +.notification-dropdown-header { + padding: 12px 16px; + border-bottom: 1px solid var(--color-border); + display: flex; + align-items: center; + justify-content: space-between; +} +.notification-dropdown-header h4 { + font-size: 14px; + font-weight: 700; + margin: 0; +} +.notification-dropdown-header button { + font-size: 12px; + color: var(--color-accent); + background: none; + border: none; + cursor: pointer; + padding: 0; +} +.notification-dropdown-header button:hover { + text-decoration: underline; +} +.notification-dropdown-list { + overflow-y: auto; + flex: 1; + max-height: 360px; +} +.notification-item { + padding: 12px 16px; + border-bottom: 1px solid var(--color-border); + display: flex; + gap: 10px; + align-items: flex-start; + transition: background 0.1s; +} +.notification-item:last-child { border-bottom: none; } +.notification-item:hover { background: var(--color-bg); } +.notification-item.unread { background: var(--color-accent-light); } +.notification-item .notif-icon { + font-size: 20px; + color: var(--color-accent); + flex-shrink: 0; + margin-top: 2px; +} +.notification-item .notif-icon.update { color: var(--color-warning); } +.notification-item .notif-icon.publish { color: var(--color-success); } +.notification-item .notif-icon.system { color: var(--color-info); } +.notification-item .notif-body { flex: 1; min-width: 0; } +.notification-item .notif-title { + font-size: 13px; + font-weight: 600; + color: var(--color-text-title); + margin: 0 0 2px; + white-space: nowrap; + overflow: hidden; + text-overflow: ellipsis; +} +.notification-item .notif-message { + font-size: 12px; + color: var(--color-text-light); + margin: 0; + display: -webkit-box; + -webkit-line-clamp: 2; + -webkit-box-orient: vertical; + overflow: hidden; +} +.notification-empty { + padding: 32px 16px; + text-align: center; + color: var(--color-text-muted); + font-size: 13px; +} + /* ===== Buttons ===== */ .btn { display: inline-flex; diff --git a/static/js/app.js b/static/js/app.js index 1a1f612..c4be247 100644 --- a/static/js/app.js +++ b/static/js/app.js @@ -58,4 +58,94 @@ document.addEventListener('DOMContentLoaded', () => { if (e.target === overlay) overlay.classList.remove('open'); }); }); + + // ---- Notification bell ---- + const bell = document.getElementById('notification-bell'); + const bellBadge = document.getElementById('bell-badge'); + const dropdown = document.getElementById('notification-dropdown'); + const notifList = document.getElementById('notification-list'); + const markAllBtn = document.getElementById('mark-all-read'); + + if (bell && dropdown) { + bell.addEventListener('click', (e) => { + e.stopPropagation(); + const isOpen = dropdown.classList.toggle('open'); + if (isOpen) fetchNotifications(); + }); + + document.addEventListener('click', (e) => { + if (!dropdown.contains(e.target) && !bell.contains(e.target)) { + dropdown.classList.remove('open'); + } + }); + + if (markAllBtn) { + markAllBtn.addEventListener('click', async () => { + try { + await fetch('/api/v1/notifications/read', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ all: true }), + }); + fetchNotifications(); + pollUnreadCount(); + } catch (_) {} + }); + } + + function notifIcon(kind) { + switch (kind) { + case 'stack_update_available': + return 'system_update'; + case 'stack_published': + return 'new_releases'; + default: + return 'info'; + } + } + + async function fetchNotifications() { + try { + const resp = await fetch('/api/v1/notifications'); + if (!resp.ok) return; + const data = await resp.json(); + if (!data.notifications || data.notifications.length === 0) { + notifList.innerHTML = '
No notifications
'; + return; + } + notifList.innerHTML = data.notifications.map(n => + `
+ ${notifIcon(n.kind)} +
+

${escapeHtml(n.title)}

+

${escapeHtml(n.message)}

+
+
` + ).join(''); + } catch (_) {} + } + + async function pollUnreadCount() { + try { + const resp = await fetch('/api/v1/notifications/unread-count'); + if (!resp.ok) return; + const data = await resp.json(); + const count = data.unread_count || 0; + if (bellBadge) { + bellBadge.textContent = count > 99 ? '99+' : String(count); + bellBadge.classList.toggle('has-unread', count > 0); + } + } catch (_) {} + } + + function escapeHtml(str) { + const d = document.createElement('div'); + d.textContent = str; + return d.innerHTML; + } + + // Initial poll + periodic refresh + pollUnreadCount(); + setInterval(pollUnreadCount, 60000); + } }); \ No newline at end of file diff --git a/templates/base.html b/templates/base.html index f3e76cb..1549fd8 100644 --- a/templates/base.html +++ b/templates/base.html @@ -52,6 +52,21 @@

{% block page_title %}Dashboard{% endblock %}

+
+ +
+
+

Notifications

+ +
+
+
No notifications
+
+
+
{% block topbar_actions %}{% endblock %}
diff --git a/templates/marketplace.html b/templates/marketplace.html index bee0fb8..9d73193 100644 --- a/templates/marketplace.html +++ b/templates/marketplace.html @@ -32,7 +32,15 @@ {% endif %}
-
{{ stack.name }}
+
+ {{ stack.name }} + +
{{ stack.description }}
{{ stack.author }} @@ -145,5 +153,26 @@

Deploy Stack

document.getElementById('deploy-status').textContent = 'Network error: ' + err.message; } } + +// Show "Update Available" badges from notifications +(async function checkUpdateBadges() { + try { + const resp = await fetch('/api/v1/notifications'); + if (!resp.ok) return; + const data = await resp.json(); + if (!data.notifications) return; + const updateIds = new Set(); + data.notifications.forEach(n => { + if (n.kind === 'stack_update_available' && n.stack_id) { + updateIds.add(n.stack_id); + } + }); + document.querySelectorAll('.update-badge[data-stack-id]').forEach(el => { + if (updateIds.has(el.dataset.stackId)) { + el.style.display = 'inline'; + } + }); + } catch (_) {} +})(); {% endblock %} diff --git a/tests/http_routes.rs b/tests/http_routes.rs index 8068dd7..1c1a3ee 100644 --- a/tests/http_routes.rs +++ b/tests/http_routes.rs @@ -479,3 +479,193 @@ async fn test_backup_download_success() { let body = response.into_body().collect().await.unwrap().to_bytes(); assert_eq!(body.as_ref(), b"test backup content"); } + +// ---- Notification endpoint tests ---- + +#[tokio::test] +async fn test_notifications_unread_count_starts_at_zero() { + let app = test_router(); + + let response = app + .oneshot( + Request::builder() + .uri("/api/v1/notifications/unread-count") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + let body = response.into_body().collect().await.unwrap().to_bytes(); + let json: Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(json["unread_count"], 0); +} + +#[tokio::test] +async fn test_notifications_list_empty() { + let app = test_router(); + + let response = app + .oneshot( + Request::builder() + .uri("/api/v1/notifications") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + let body = response.into_body().collect().await.unwrap().to_bytes(); + let json: Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(json["unread_count"], 0); + assert_eq!(json["notifications"].as_array().unwrap().len(), 0); +} + +#[tokio::test] +async fn test_notifications_mark_read_all_on_empty() { + let app = test_router(); + + let response = app + .oneshot( + Request::builder() + .method("POST") + .uri("/api/v1/notifications/read") + .header("Content-Type", "application/json") + .body(Body::from(r#"{"all": true}"#)) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + let body = response.into_body().collect().await.unwrap().to_bytes(); + let json: Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(json["status"], "ok"); +} + +#[tokio::test] +async fn test_notifications_full_lifecycle() { + use status_panel::comms::notifications::{self, Notification, NotificationKind}; + + let state = Arc::new(AppState::new(test_config(), false, None)); + let app = create_router(state.clone()); + + // Seed notifications into the store directly + let notifs = vec![ + Notification { + id: "test-1".to_string(), + kind: NotificationKind::StackUpdateAvailable, + title: "Update for MyStack".to_string(), + message: "Version 2.0 is available".to_string(), + stack_id: Some("stack-1".to_string()), + stack_name: Some("MyStack".to_string()), + new_version: Some("2.0".to_string()), + created_at: "2026-04-12T00:00:00Z".to_string(), + read: false, + }, + Notification { + id: "test-2".to_string(), + kind: NotificationKind::StackPublished, + title: "New stack: CoolApp".to_string(), + message: "CoolApp has been published".to_string(), + stack_id: Some("stack-2".to_string()), + stack_name: Some("CoolApp".to_string()), + new_version: None, + created_at: "2026-04-12T01:00:00Z".to_string(), + read: false, + }, + ]; + notifications::merge_notifications(&state.notification_store, notifs).await; + + // Check unread count = 2 + let response = app + .clone() + .oneshot( + Request::builder() + .uri("/api/v1/notifications/unread-count") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + let body = response.into_body().collect().await.unwrap().to_bytes(); + let json: Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(json["unread_count"], 2); + + // List all notifications + let response = app + .clone() + .oneshot( + Request::builder() + .uri("/api/v1/notifications") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + let body = response.into_body().collect().await.unwrap().to_bytes(); + let json: Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(json["unread_count"], 2); + assert_eq!(json["notifications"].as_array().unwrap().len(), 2); + + // Mark one as read + let response = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri("/api/v1/notifications/read") + .header("Content-Type", "application/json") + .body(Body::from(r#"{"ids": ["test-1"]}"#)) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + + // Unread count should now be 1 + let response = app + .clone() + .oneshot( + Request::builder() + .uri("/api/v1/notifications/unread-count") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + let body = response.into_body().collect().await.unwrap().to_bytes(); + let json: Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(json["unread_count"], 1); + + // Mark all as read + let response = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri("/api/v1/notifications/read") + .header("Content-Type", "application/json") + .body(Body::from(r#"{"all": true}"#)) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + + // Unread count should now be 0 + let response = app + .oneshot( + Request::builder() + .uri("/api/v1/notifications/unread-count") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + let body = response.into_body().collect().await.unwrap().to_bytes(); + let json: Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(json["unread_count"], 0); +}