From 749c5592c41f037dda8f92bc36263d0401441c94 Mon Sep 17 00:00:00 2001 From: Arjun Komath Date: Mon, 29 Jun 2026 09:35:29 +1000 Subject: [PATCH] Trigger agent upgrades from control plane Co-authored-by: Amp Amp-Thread-ID: https://ampcode.com/threads/T-019f103a-e558-7226-a539-d795e27e4472 --- agent/cmd/agent/main.go | 1 + agent/internal/agent/upgrade.go | 207 ++++++++++++++++++ agent/internal/agent/workqueue.go | 22 +- web/actions/servers.ts | 10 + .../dashboard/servers/[id]/page.tsx | 6 +- web/components/server/agent-update-nudge.tsx | 82 ++++++- web/db/queries.ts | 8 + web/db/schema.ts | 25 ++- web/lib/agent-status.ts | 33 +++ web/lib/agent-upgrades.ts | 136 ++++++++++++ web/lib/inngest/functions/crons.ts | 15 ++ web/lib/inngest/functions/index.ts | 1 + web/lib/scheduler.ts | 39 ++++ web/lib/work-queue.ts | 89 +++++++- 14 files changed, 658 insertions(+), 16 deletions(-) create mode 100644 agent/internal/agent/upgrade.go create mode 100644 web/lib/agent-upgrades.ts diff --git a/agent/cmd/agent/main.go b/agent/cmd/agent/main.go index da0c7961..f8e12401 100644 --- a/agent/cmd/agent/main.go +++ b/agent/cmd/agent/main.go @@ -102,6 +102,7 @@ func main() { if err := os.MkdirAll(dataDir, 0o700); err != nil { log.Fatalf("Failed to create data directory: %v", err) } + agent.CheckPendingUpgradeMarker(dataDir) keyDir := filepath.Join(dataDir, "keys") diff --git a/agent/internal/agent/upgrade.go b/agent/internal/agent/upgrade.go new file mode 100644 index 00000000..d240e102 --- /dev/null +++ b/agent/internal/agent/upgrade.go @@ -0,0 +1,207 @@ +package agent + +import ( + "crypto/sha256" + "encoding/hex" + "encoding/json" + "errors" + "fmt" + "io" + "log" + "net/http" + "os" + "path/filepath" + "regexp" + "runtime" + "strings" + "time" + + agenthttp "techulus/cloud-agent/internal/http" + "techulus/cloud-agent/internal/paths" +) + +const ( + agentBinaryPath = "/usr/local/bin/techulus-agent" + agentPreviousPath = "/usr/local/bin/techulus-agent.previous" + agentUpgradeMarkerFile 
= "upgrade-pending.json" + agentReleaseBaseURL = "https://github.com/techulus/cloud/releases/download" +) + +var ( + targetVersionPattern = regexp.MustCompile(`^v\d+\.\d+\.\d+(?:-[0-9A-Za-z.-]+)?$`) + sha256Pattern = regexp.MustCompile(`^[0-9a-f]{64}$`) + errAgentUpgradeRestartNeeded = errors.New("agent upgrade restart needed") +) + +type agentUpgradeMarker struct { + TargetVersion string `json:"targetVersion"` +} + +func CheckPendingUpgradeMarker(dataDir string) { + marker, err := readAgentUpgradeMarker(dataDir) + if err != nil { + if !errors.Is(err, os.ErrNotExist) { + log.Printf("[upgrade] failed to read upgrade marker: %v", err) + } + return + } + + if marker.TargetVersion == Version { + if err := os.Remove(agentUpgradeMarkerPath(dataDir)); err != nil && !errors.Is(err, os.ErrNotExist) { + log.Printf("[upgrade] failed to remove upgrade marker: %v", err) + } + log.Printf("[upgrade] completed upgrade to %s", Version) + return + } + + if _, err := os.Stat(agentPreviousPath); err != nil { + log.Printf("[upgrade] pending upgrade to %s did not boot target version %s and no previous binary is available: %v", marker.TargetVersion, Version, err) + if removeErr := os.Remove(agentUpgradeMarkerPath(dataDir)); removeErr != nil && !errors.Is(removeErr, os.ErrNotExist) { + log.Printf("[upgrade] failed to remove unrecoverable upgrade marker: %v", removeErr) + } + return + } + + log.Printf("[upgrade] restoring previous binary after failed upgrade to %s (running %s)", marker.TargetVersion, Version) + if err := copyFile(agentPreviousPath, agentBinaryPath, 0o755); err != nil { + log.Printf("[upgrade] failed to restore previous binary: %v", err) + return + } + if err := os.Chmod(agentBinaryPath, 0o755); err != nil { + log.Printf("[upgrade] failed to chmod restored binary: %v", err) + return + } + if err := os.Remove(agentUpgradeMarkerPath(dataDir)); err != nil && !errors.Is(err, os.ErrNotExist) { + log.Printf("[upgrade] failed to remove upgrade marker after rollback: %v", err) + } 
+	os.Exit(0)
+}
+
+// ProcessAgentUpgrade handles an "upgrade_agent" work item.
+//
+// The item payload is JSON with "targetVersion" and "expectedSha256". The
+// method downloads the linux release asset for the current architecture,
+// verifies its SHA-256 digest, backs up the current binary to
+// agentPreviousPath, writes the upgrade marker and renames the new binary over
+// agentBinaryPath.
+//
+// It returns nil when the agent already runs the target version,
+// errAgentUpgradeRestartNeeded once the new binary is installed, and any other
+// error when the upgrade could not be performed.
+func (a *Agent) ProcessAgentUpgrade(item agenthttp.WorkQueueItem) error {
+	var payload struct {
+		TargetVersion  string `json:"targetVersion"`
+		ExpectedSHA256 string `json:"expectedSha256"`
+	}
+
+	if err := json.Unmarshal([]byte(item.Payload), &payload); err != nil {
+		return fmt.Errorf("failed to parse upgrade_agent payload: %w", err)
+	}
+
+	targetVersion := strings.TrimSpace(payload.TargetVersion)
+	if targetVersion == Version {
+		log.Printf("[upgrade] already running target version %s", targetVersion)
+		return nil
+	}
+	// The version is interpolated into a URL and a file name below, so it must
+	// match the strict pattern before it is used.
+	if !targetVersionPattern.MatchString(targetVersion) {
+		return fmt.Errorf("invalid target version: %s", targetVersion)
+	}
+	if runtime.GOOS != "linux" {
+		return fmt.Errorf("agent upgrades are only supported on linux")
+	}
+	arch := runtime.GOARCH
+	if arch != "amd64" && arch != "arm64" {
+		return fmt.Errorf("unsupported architecture: %s", arch)
+	}
+	expectedSHA256 := strings.ToLower(strings.TrimSpace(payload.ExpectedSHA256))
+	if !sha256Pattern.MatchString(expectedSHA256) {
+		return fmt.Errorf("expectedSha256 is required")
+	}
+
+	log.Printf("[upgrade] installing agent %s for linux/%s", targetVersion, arch)
+	// The temp file lives in the same directory as the binary so the final
+	// rename stays on one filesystem.
+	tmpPath := filepath.Join(filepath.Dir(agentBinaryPath), fmt.Sprintf(".techulus-agent-%s.tmp", targetVersion))
+	defer os.Remove(tmpPath)
+
+	assetURL := fmt.Sprintf("%s/%s/agent-linux-%s", agentReleaseBaseURL, targetVersion, arch)
+	if err := downloadFile(assetURL, tmpPath); err != nil {
+		return err
+	}
+	if err := verifySHA256(tmpPath, expectedSHA256); err != nil {
+		return err
+	}
+	if err := os.Chmod(tmpPath, 0o755); err != nil {
+		return fmt.Errorf("failed to chmod new agent binary: %w", err)
+	}
+
+	if err := copyFile(agentBinaryPath, agentPreviousPath, 0o755); err != nil {
+		return fmt.Errorf("failed to back up current agent binary: %w", err)
+	}
+	if err := os.Chmod(agentPreviousPath, 0o755); err != nil {
+		return fmt.Errorf("failed to chmod backed up agent binary: %w", err)
+	}
+	if err := writeAgentUpgradeMarker(a.DataDir, targetVersion); err != nil {
+		return err
+	}
+	if err := os.Rename(tmpPath, agentBinaryPath); err != nil {
+		// Nothing was installed, so the marker must not survive: otherwise the
+		// next start would see a target that differs from the running version
+		// and CheckPendingUpgradeMarker would roll back and exit needlessly.
+		if removeErr := os.Remove(agentUpgradeMarkerPath(a.DataDir)); removeErr != nil && !errors.Is(removeErr, os.ErrNotExist) {
+			log.Printf("[upgrade] failed to remove upgrade marker after failed install: %v", removeErr)
+		}
+		return fmt.Errorf("failed to install new agent binary: %w", err)
+	}
+
+	log.Printf("[upgrade] installed %s; restart required", targetVersion)
+	return errAgentUpgradeRestartNeeded
+}
+
+// agentUpgradeMarkerPath returns the marker file path inside dataDir, falling
+// back to paths.DataDir when dataDir is empty.
+func agentUpgradeMarkerPath(dataDir string) string {
+	if dataDir == "" {
+		dataDir = paths.DataDir
+	}
+	return filepath.Join(dataDir, agentUpgradeMarkerFile)
+}
+
+// readAgentUpgradeMarker loads and decodes the marker file. A missing file is
+// reported through the error returned by os.ReadFile.
+func readAgentUpgradeMarker(dataDir string) (*agentUpgradeMarker, error) {
+	data, err := os.ReadFile(agentUpgradeMarkerPath(dataDir))
+	if err != nil {
+		return nil, err
+	}
+	var marker agentUpgradeMarker
+	if err := json.Unmarshal(data, &marker); err != nil {
+		return nil, err
+	}
+	return &marker, nil
+}
+
+// writeAgentUpgradeMarker stores targetVersion in the marker file with mode
+// 0600.
+func writeAgentUpgradeMarker(dataDir, targetVersion string) error {
+	data, err := json.Marshal(agentUpgradeMarker{TargetVersion: targetVersion})
+	if err != nil {
+		return err
+	}
+	return os.WriteFile(agentUpgradeMarkerPath(dataDir), data, 0o600)
+}
+
+// downloadFile fetches url with a two minute overall timeout and writes the
+// response body to destPath, truncating any existing file. Any status other
+// than 200 is an error.
+func downloadFile(url, destPath string) error {
+	client := &http.Client{Timeout: 2 * time.Minute}
+	resp, err := client.Get(url)
+	if err != nil {
+		return fmt.Errorf("failed to download agent binary: %w", err)
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode != http.StatusOK {
+		return fmt.Errorf("agent binary download failed with status %d", resp.StatusCode)
+	}
+
+	file, err := os.OpenFile(destPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o600)
+	if err != nil {
+		return fmt.Errorf("failed to create temp agent binary: %w", err)
+	}
+	if _, err := io.Copy(file, resp.Body); err != nil {
+		file.Close()
+		return fmt.Errorf("failed to write temp agent binary: %w", err)
+	}
+	// Close can be the point where a write error surfaces, so it is checked
+	// rather than deferred and ignored.
+	if err := file.Close(); err != nil {
+		return fmt.Errorf("failed to write temp agent binary: %w", err)
+	}
+	return nil
+}
+
+// verifySHA256 hashes the file at path and compares the hex digest with
+// expected, which callers pass in lowercase.
+func verifySHA256(path, expected string) error {
+	file, err := os.Open(path)
+	if err != nil {
+		return fmt.Errorf("failed to open downloaded agent binary: %w", err)
+	}
+	defer file.Close()
+
+	hash := sha256.New()
+	if _, err
:= io.Copy(hash, file); err != nil {
+		return fmt.Errorf("failed to hash downloaded agent binary: %w", err)
+	}
+	actual := hex.EncodeToString(hash.Sum(nil))
+	if actual != expected {
+		return fmt.Errorf("checksum verification failed")
+	}
+	return nil
+}
diff --git a/agent/internal/agent/workqueue.go b/agent/internal/agent/workqueue.go
index 8eb02fa0..77aead2b 100644
--- a/agent/internal/agent/workqueue.go
+++ b/agent/internal/agent/workqueue.go
@@ -1,8 +1,10 @@
 package agent
 
 import (
+	"errors"
 	"fmt"
 	"log"
+	"os"
 	"time"
 
 	agenthttp "techulus/cloud-agent/internal/http"
@@ -84,16 +86,21 @@ func (a *Agent) processLeasedWorkItem(item agenthttp.WorkQueueItem) {
 
 	status := "completed"
 	errorMsg := ""
+	// restartAfterReport is set when the handler installed a new agent binary;
+	// the process then exits at the end of this function.
+	restartAfterReport := false
 	if err := a.ProcessWorkItem(item); err != nil {
-		status = "failed"
-		errorMsg = err.Error()
-		log.Printf("[work-queue] item %s failed: %v", Truncate(item.ID, 8), err)
+		// errAgentUpgradeRestartNeeded is a restart request, not a failure, so
+		// the item keeps the "completed" status.
+		if errors.Is(err, errAgentUpgradeRestartNeeded) {
+			restartAfterReport = true
+		} else {
+			status = "failed"
+			errorMsg = err.Error()
+			log.Printf("[work-queue] item %s failed: %v", Truncate(item.ID, 8), err)
+		}
 	} else {
 		log.Printf("[work-queue] item %s completed", Truncate(item.ID, 8))
 	}
 
 	a.workMutex.Lock()
-	if a.activeWorkItem != nil && a.activeWorkItem.ID == item.ID && a.activeWorkItem.Attempt == item.Attempt {
+	// The active item is left in place when a restart follows.
+	// NOTE(review): presumably so nothing else is picked up before exit — confirm.
+	if !restartAfterReport && a.activeWorkItem != nil && a.activeWorkItem.ID == item.ID && a.activeWorkItem.Attempt == item.Attempt {
 		a.activeWorkItem = nil
 	}
 	a.pendingWorkResults = append(a.pendingWorkResults, agenthttp.CompletedWorkItem{
@@ -105,6 +112,11 @@ func (a *Agent) processLeasedWorkItem(item agenthttp.WorkQueueItem) {
 	a.workMutex.Unlock()
 
 	a.RequestStatusReport("work item " + status)
+	// NOTE(review): reportStatus presumably sends the queued result before the
+	// process exits — confirm it is synchronous.
+	if restartAfterReport {
+		a.reportStatus("agent upgrade completed")
+		log.Printf("[upgrade] exiting so systemd restarts the upgraded agent")
+		os.Exit(0)
+	}
 }
 
 func (a *Agent) ProcessWorkItem(item agenthttp.WorkQueueItem) error {
@@ -128,6 +140,8 @@ func (a *Agent)
ProcessWorkItem(item agenthttp.WorkQueueItem) error { return a.ProcessRestoreVolume(item) case "create_manifest": return a.ProcessCreateManifest(item) + case "upgrade_agent": + return a.ProcessAgentUpgrade(item) default: return fmt.Errorf("unknown work item type: %s", item.Type) } diff --git a/web/actions/servers.ts b/web/actions/servers.ts index cda97409..36966743 100644 --- a/web/actions/servers.ts +++ b/web/actions/servers.ts @@ -2,9 +2,11 @@ import { randomBytes } from "node:crypto"; import { eq } from "drizzle-orm"; +import { revalidatePath } from "next/cache"; import { ZodError } from "zod"; import { db } from "@/db"; import { servers } from "@/db/schema"; +import { enqueueAgentUpgrade } from "@/lib/agent-upgrades"; import { requireAuth } from "@/lib/auth"; import { nameSchema } from "@/lib/schemas"; import { getZodErrorMessage } from "@/lib/utils"; @@ -66,3 +68,11 @@ export async function updateServerName(id: string, name: string) { throw error; } } + +export async function upgradeAgent(serverId: string, targetVersion: string) { + await requireAuth(); + const result = await enqueueAgentUpgrade(serverId, targetVersion); + revalidatePath("/dashboard/servers"); + revalidatePath(`/dashboard/servers/${serverId}`); + return result; +} diff --git a/web/app/(dashboard)/dashboard/servers/[id]/page.tsx b/web/app/(dashboard)/dashboard/servers/[id]/page.tsx index cd95ecd3..ca9ad179 100644 --- a/web/app/(dashboard)/dashboard/servers/[id]/page.tsx +++ b/web/app/(dashboard)/dashboard/servers/[id]/page.tsx @@ -68,9 +68,13 @@ export default async function ServerDetailPage({ {hasUpdate && ( )} diff --git a/web/components/server/agent-update-nudge.tsx b/web/components/server/agent-update-nudge.tsx index 0651befe..3764beea 100644 --- a/web/components/server/agent-update-nudge.tsx +++ b/web/components/server/agent-update-nudge.tsx @@ -1,9 +1,14 @@ "use client"; -import { useState } from "react"; import { ArrowUpCircle } from "lucide-react"; +import { useRouter } from 
"next/navigation"; +import { useState, useTransition } from "react"; +import { toast } from "sonner"; +import { upgradeAgent } from "@/actions/servers"; +import { Button } from "@/components/ui/button"; import { Dialog, + DialogClose, DialogContent, DialogDescription, DialogFooter, @@ -12,17 +17,49 @@ import { } from "@/components/ui/dialog"; interface AgentUpdateNudgeProps { + serverId: string; currentVersion: string; latestVersion: string; - appUrl: string; + serverStatus: "pending" | "online" | "offline" | "unknown"; + upgradeStatus: "idle" | "queued" | "upgrading" | "succeeded" | "failed"; + upgradeTargetVersion: string | null; + upgradeError: string | null; } export function AgentUpdateNudge({ + serverId, currentVersion, latestVersion, - appUrl, + serverStatus, + upgradeStatus, + upgradeTargetVersion, + upgradeError, }: AgentUpdateNudgeProps) { const [open, setOpen] = useState(false); + const [isPending, startTransition] = useTransition(); + const router = useRouter(); + const isTargetUpgradeActive = + upgradeTargetVersion === latestVersion && + (upgradeStatus === "queued" || upgradeStatus === "upgrading"); + const disabled = + serverStatus !== "online" || isTargetUpgradeActive || isPending; + + const handleUpgrade = () => { + startTransition(async () => { + try { + await upgradeAgent(serverId, latestVersion); + toast.success("Agent upgrade queued"); + setOpen(false); + router.refresh(); + } catch (error) { + toast.error( + error instanceof Error + ? error.message + : "Failed to queue agent upgrade", + ); + } + }); + }; return ( <> @@ -45,17 +82,46 @@ export function AgentUpdateNudge({ Update Agent - Run the following command on your server to update the agent from{" "} + Queue an upgrade for this server from{" "} {currentVersion} to{" "} {latestVersion}. - - sudo bash -c "$(curl -fsSL {appUrl}/update.sh)" - +
+ The control plane will send a signed work item to the agent. The + agent downloads the release binary, verifies its checksum, and + restarts itself after installation. +
+ + {isTargetUpgradeActive && ( +

+ Upgrade is already {upgradeStatus} for this version. +

+ )} + {upgradeStatus === "failed" && upgradeError && ( +

+ Last failure: {upgradeError} +

+ )} + {serverStatus !== "online" && ( +

+ Server must be online before an upgrade can be queued. +

+ )} - + + }> + Cancel + + + diff --git a/web/db/queries.ts b/web/db/queries.ts index 7a95274d..800895a4 100644 --- a/web/db/queries.ts +++ b/web/db/queries.ts @@ -112,6 +112,10 @@ export async function getServerDetails(id: string) { networkHealth: servers.networkHealth, containerHealth: servers.containerHealth, agentHealth: servers.agentHealth, + agentUpgradeTargetVersion: servers.agentUpgradeTargetVersion, + agentUpgradeStatus: servers.agentUpgradeStatus, + agentUpgradeStartedAt: servers.agentUpgradeStartedAt, + agentUpgradeError: servers.agentUpgradeError, }) .from(servers) .where(eq(servers.id, id)); @@ -129,6 +133,10 @@ export async function getClusterHealth() { networkHealth: servers.networkHealth, containerHealth: servers.containerHealth, agentHealth: servers.agentHealth, + agentUpgradeTargetVersion: servers.agentUpgradeTargetVersion, + agentUpgradeStatus: servers.agentUpgradeStatus, + agentUpgradeStartedAt: servers.agentUpgradeStartedAt, + agentUpgradeError: servers.agentUpgradeError, }) .from(servers); diff --git a/web/db/schema.ts b/web/db/schema.ts index da3751cd..a6f36982 100644 --- a/web/db/schema.ts +++ b/web/db/schema.ts @@ -1,4 +1,4 @@ -import { relations } from "drizzle-orm"; +import { relations, sql } from "drizzle-orm"; import { bigint, boolean, @@ -208,6 +208,13 @@ export type AgentHealth = { uptimeSecs: number; }; +export type AgentUpgradeStatus = + | "idle" + | "queued" + | "upgrading" + | "succeeded" + | "failed"; + export const servers = pgTable("servers", { id: text("id").primaryKey(), name: text("name").notNull(), @@ -229,6 +236,16 @@ export const servers = pgTable("servers", { networkHealth: jsonb("network_health").$type(), containerHealth: jsonb("container_health").$type(), agentHealth: jsonb("agent_health").$type(), + agentUpgradeTargetVersion: text("agent_upgrade_target_version"), + agentUpgradeStatus: text("agent_upgrade_status", { + enum: ["idle", "queued", "upgrading", "succeeded", "failed"], + }) + .notNull() + .default("idle"), + 
agentUpgradeStartedAt: timestamp("agent_upgrade_started_at", { + withTimezone: true, + }), + agentUpgradeError: text("agent_upgrade_error"), agentToken: text("agent_token"), tokenCreatedAt: timestamp("token_created_at", { withTimezone: true }), tokenUsedAt: timestamp("token_used_at", { withTimezone: true }), @@ -538,6 +555,7 @@ export const workQueue = pgTable( "backup_volume", "restore_volume", "create_manifest", + "upgrade_agent", ], }).notNull(), payload: text("payload").notNull(), @@ -554,6 +572,11 @@ export const workQueue = pgTable( }, (table) => [ index("work_queue_server_status_idx").on(table.serverId, table.status), + uniqueIndex("work_queue_one_active_agent_upgrade_idx") + .on(table.serverId) + .where( + sql`${table.type} = 'upgrade_agent' AND ${table.status} IN ('pending', 'processing')`, + ), ], ); diff --git a/web/lib/agent-status.ts b/web/lib/agent-status.ts index b1fadcf1..a6d49beb 100644 --- a/web/lib/agent-status.ts +++ b/web/lib/agent-status.ts @@ -9,6 +9,7 @@ import { rollouts, servers, services, + workQueue, } from "@/db/schema"; import { AUTOHEAL_MAX_RECREATES, @@ -160,6 +161,7 @@ export async function applyStatusReport( lastHeartbeat: new Date(), status: "online", }; + let completedAgentUpgradeTarget: string | null = null; if (report.resources) { if (report.resources.cpuCores !== undefined) { @@ -191,9 +193,40 @@ export async function applyStatusReport( } if (report.agentHealth) { updateData.agentHealth = report.agentHealth; + + const [server] = await db + .select({ + agentUpgradeTargetVersion: servers.agentUpgradeTargetVersion, + agentUpgradeStatus: servers.agentUpgradeStatus, + }) + .from(servers) + .where(eq(servers.id, serverId)) + .limit(1); + + if ( + server?.agentUpgradeTargetVersion === report.agentHealth.version && + server.agentUpgradeStatus !== "succeeded" && + server.agentUpgradeStatus !== "idle" + ) { + updateData.agentUpgradeStatus = "succeeded"; + updateData.agentUpgradeError = null; + completedAgentUpgradeTarget = 
report.agentHealth.version; + } } await db.update(servers).set(updateData).where(eq(servers.id, serverId)); + if (completedAgentUpgradeTarget) { + await db + .update(workQueue) + .set({ status: "completed" }) + .where( + and( + eq(workQueue.serverId, serverId), + eq(workQueue.type, "upgrade_agent"), + inArray(workQueue.status, ["pending", "processing"]), + ), + ); + } let serverLogName: string | undefined; const getCurrentServerLogName = async () => { diff --git a/web/lib/agent-upgrades.ts b/web/lib/agent-upgrades.ts new file mode 100644 index 00000000..5fbef917 --- /dev/null +++ b/web/lib/agent-upgrades.ts @@ -0,0 +1,136 @@ +import { randomUUID } from "node:crypto"; +import { and, eq } from "drizzle-orm"; +import { db } from "@/db"; +import { servers, workQueue } from "@/db/schema"; + +const GITHUB_RELEASE_BASE_URL = + "https://github.com/techulus/cloud/releases/download"; +const TARGET_VERSION_PATTERN = /^v\d+\.\d+\.\d+(?:-[0-9A-Za-z.-]+)?$/; + +type ServerMeta = { arch?: string; os?: string } | null; + +export function validateAgentTargetVersion(targetVersion: string) { + const version = targetVersion.trim(); + if (!TARGET_VERSION_PATTERN.test(version)) { + throw new Error("Invalid target version"); + } + return version; +} + +function getReleaseArch(meta: ServerMeta) { + if (meta?.os && meta.os !== "linux") { + throw new Error(`Agent upgrades are only supported for Linux servers`); + } + if (meta?.arch === "amd64" || meta?.arch === "arm64") return meta.arch; + throw new Error("Server architecture is unknown or unsupported"); +} + +async function fetchExpectedSha256(targetVersion: string, arch: string) { + const response = await fetch( + `${GITHUB_RELEASE_BASE_URL}/${targetVersion}/checksums.txt`, + { cache: "no-store" }, + ); + if (!response.ok) { + throw new Error(`Failed to fetch release checksums (${response.status})`); + } + + const assetName = `agent-linux-${arch}`; + const checksums = await response.text(); + for (const line of checksums.split("\n")) { + 
const [checksum, rawFileName] = line.trim().split(/\s+/);
+		// sha256sum writes binary-mode entries as "<hash> *<file>", so drop a
+		// leading "*" before comparing names.
+		const fileName = rawFileName?.replace(/^\*/, "");
+		if (fileName === assetName && /^[0-9a-f]{64}$/i.test(checksum)) {
+			return checksum.toLowerCase();
+		}
+	}
+
+	throw new Error(`Checksum for ${assetName} was not found`);
+}
+
+/**
+ * Queues an `upgrade_agent` work item for a server.
+ *
+ * Validates the target version, requires the server to be online, and returns
+ * `{ status: "succeeded" }` without queueing when the agent already reports the
+ * target version. Otherwise it resolves the release checksum for the server's
+ * architecture and, in one transaction, marks the server as `queued` and
+ * inserts the work item.
+ *
+ * @throws Error when the version is invalid, the server is missing or offline,
+ * the architecture is unsupported, the checksum cannot be resolved, or an
+ * upgrade is already pending/processing for this server.
+ */
+export async function enqueueAgentUpgrade(
+	serverId: string,
+	targetVersionInput: string,
+) {
+	const targetVersion = validateAgentTargetVersion(targetVersionInput);
+
+	const [server] = await db
+		.select({
+			id: servers.id,
+			status: servers.status,
+			meta: servers.meta,
+			agentHealth: servers.agentHealth,
+		})
+		.from(servers)
+		.where(eq(servers.id, serverId))
+		.limit(1);
+
+	if (!server) throw new Error("Server not found");
+	if (server.status !== "online") throw new Error("Server must be online");
+	if (server.agentHealth?.version === targetVersion) {
+		await db
+			.update(servers)
+			.set({
+				agentUpgradeTargetVersion: targetVersion,
+				agentUpgradeStatus: "succeeded",
+				agentUpgradeStartedAt: null,
+				agentUpgradeError: null,
+			})
+			.where(eq(servers.id, serverId));
+		return { status: "succeeded" as const };
+	}
+
+	const arch = getReleaseArch(server.meta);
+	const expectedSha256 = await fetchExpectedSha256(targetVersion, arch);
+
+	try {
+		// Both writes share a transaction so a rejected insert also undoes the
+		// status change on the server row.
+		await db.transaction(async (tx) => {
+			await tx
+				.update(servers)
+				.set({
+					agentUpgradeTargetVersion: targetVersion,
+					agentUpgradeStatus: "queued",
+					agentUpgradeStartedAt: null,
+					agentUpgradeError: null,
+				})
+				.where(eq(servers.id, serverId));
+
+			await tx.insert(workQueue).values({
+				id: randomUUID(),
+				serverId,
+				type: "upgrade_agent",
+				payload: JSON.stringify({ targetVersion, expectedSha256 }),
+			});
+		});
+	} catch (error) {
+		if (isUniqueViolation(error)) {
+			throw new Error("Agent upgrade already in progress");
+		}
+		throw error;
+	}
+
+	return { status: "queued" as const };
+}
+
+/**
+ * Reports whether `error`, or any error in its `cause` chain, carries the
+ * PostgreSQL unique-violation code 23505. The chain is followed because a
+ * query layer may wrap the driver error and expose it only as `cause`.
+ */
+function isUniqueViolation(error: unknown) {
+	let current: unknown = error;
+	// The depth cap guards against a cyclic cause chain.
+	for (let depth = 0; depth < 5 && current instanceof Error; depth++) {
+		if ((current as Error & { code?: string }).code === "23505") {
+			return true;
+		}
+		current = (current as Error & { cause?: unknown }).cause;
+	}
+	return false;
+}
+
+export async function
clearCompletedAgentUpgrade(serverId: string) { + await db + .update(servers) + .set({ + agentUpgradeTargetVersion: null, + agentUpgradeStatus: "idle", + agentUpgradeStartedAt: null, + agentUpgradeError: null, + }) + .where( + and( + eq(servers.id, serverId), + eq(servers.agentUpgradeStatus, "succeeded"), + ), + ); +} diff --git a/web/lib/inngest/functions/crons.ts b/web/lib/inngest/functions/crons.ts index 85e4d7ba..78983e04 100644 --- a/web/lib/inngest/functions/crons.ts +++ b/web/lib/inngest/functions/crons.ts @@ -9,6 +9,7 @@ import { checkAndRecoverStaleServers, checkAndRunScheduledDeployments, cleanupStaleItems, + failTimedOutAgentUpgrades, } from "@/lib/scheduler"; import { inngest } from "../client"; @@ -122,3 +123,17 @@ export const staleItemsCleanup = inngest.createFunction( }); }, ); + +export const agentUpgradeTimeoutCheck = inngest.createFunction( + { + id: "cron-agent-upgrade-timeout-check", + triggers: [cron("*/5 * * * *")], + singleton: { mode: "skip" }, + }, + async ({ step }) => { + await step.run("fail-timed-out-agent-upgrades", async () => { + console.log("[cron] checking timed out agent upgrades"); + await failTimedOutAgentUpgrades(); + }); + }, +); diff --git a/web/lib/inngest/functions/index.ts b/web/lib/inngest/functions/index.ts index dd6b545e..ef862f53 100644 --- a/web/lib/inngest/functions/index.ts +++ b/web/lib/inngest/functions/index.ts @@ -2,6 +2,7 @@ export { backupWorkflow } from "./backup-workflow"; export { buildTriggerWorkflow } from "./build-trigger-workflow"; export { buildWorkflow } from "./build-workflow"; export { + agentUpgradeTimeoutCheck, certificateRenewal, challengeCleanup, controlPlaneUpdateCheck, diff --git a/web/lib/scheduler.ts b/web/lib/scheduler.ts index e2edab16..ccb5ed3c 100644 --- a/web/lib/scheduler.ts +++ b/web/lib/scheduler.ts @@ -234,6 +234,45 @@ export async function checkAndRunScheduledDeployments(): Promise { } const OLD_ITEM_THRESHOLD_MS = 90 * 24 * 60 * 60 * 1000; +const AGENT_UPGRADE_TIMEOUT_MS = 5 * 60 
* 1000; + +export async function failTimedOutAgentUpgrades(): Promise { + const timeoutThreshold = new Date(Date.now() - AGENT_UPGRADE_TIMEOUT_MS); + + const timedOut = await db + .update(servers) + .set({ + agentUpgradeStatus: "failed", + agentUpgradeError: "Agent did not report the target version in time", + }) + .where( + and( + eq(servers.agentUpgradeStatus, "upgrading"), + lt(servers.agentUpgradeStartedAt, timeoutThreshold), + sql`(${servers.agentHealth}->>'version') IS DISTINCT FROM ${servers.agentUpgradeTargetVersion}`, + ), + ) + .returning({ id: servers.id }); + + if (timedOut.length > 0) { + await db + .update(workQueue) + .set({ status: "failed" }) + .where( + and( + inArray( + workQueue.serverId, + timedOut.map((server) => server.id), + ), + eq(workQueue.type, "upgrade_agent"), + inArray(workQueue.status, ["pending", "processing"]), + ), + ); + console.log( + `[scheduler] marked ${timedOut.length} agent upgrade(s) timed out`, + ); + } +} export async function cleanupStaleItems(): Promise { const workItemLeaseThreshold = new Date( diff --git a/web/lib/work-queue.ts b/web/lib/work-queue.ts index c6bb674a..c592442b 100644 --- a/web/lib/work-queue.ts +++ b/web/lib/work-queue.ts @@ -1,7 +1,7 @@ import { randomUUID } from "node:crypto"; -import { and, eq, sql } from "drizzle-orm"; +import { and, eq, inArray, sql } from "drizzle-orm"; import { db } from "@/db"; -import { deployments, workQueue } from "@/db/schema"; +import { deployments, servers, workQueue } from "@/db/schema"; import type { WorkQueue } from "@/db/types"; import { inngest } from "@/lib/inngest/client"; import { inngestEvents } from "@/lib/inngest/events"; @@ -162,6 +162,9 @@ export async function claimNextWorkItem( const row = rows[0]; if (!row) return null; + if (row.type === "upgrade_agent") { + await markAgentUpgradeStarted(serverId, row.payload); + } return { id: row.id, @@ -171,6 +174,29 @@ export async function claimNextWorkItem( }; } +async function markAgentUpgradeStarted(serverId: 
string, payloadText: string) { + try { + const payload = JSON.parse(payloadText) as { targetVersion?: string }; + if (!payload.targetVersion) return; + await db + .update(servers) + .set({ + agentUpgradeStatus: "upgrading", + agentUpgradeStartedAt: new Date(), + agentUpgradeError: null, + }) + .where( + and( + eq(servers.id, serverId), + eq(servers.agentUpgradeTargetVersion, payload.targetVersion), + inArray(servers.agentUpgradeStatus, ["queued", "upgrading"]), + ), + ); + } catch (error) { + console.error("[work-queue] failed to mark agent upgrade started:", error); + } +} + async function getRejectionReason( serverId: string, id: string, @@ -205,6 +231,11 @@ async function runWorkItemCompletionSideEffects( return; } + if (item.type === "upgrade_agent" && item.payload) { + await runAgentUpgradeCompletionSideEffects(item, result); + return; + } + if (item.type !== "create_manifest" || !item.payload) { return; } @@ -240,6 +271,60 @@ async function runWorkItemCompletionSideEffects( } } +async function runAgentUpgradeCompletionSideEffects( + item: WorkQueue, + result: WorkItemResult, +): Promise { + try { + const payload = JSON.parse(item.payload) as { targetVersion?: string }; + if (!payload.targetVersion) return; + + if (result.status === "failed") { + await db + .update(servers) + .set({ + agentUpgradeStatus: "failed", + agentUpgradeError: result.error || "Agent upgrade failed", + }) + .where( + and( + eq(servers.id, item.serverId), + eq(servers.agentUpgradeTargetVersion, payload.targetVersion), + ), + ); + return; + } + + const [server] = await db + .select({ agentHealth: servers.agentHealth }) + .from(servers) + .where(eq(servers.id, item.serverId)) + .limit(1); + + await db + .update(servers) + .set({ + agentUpgradeStatus: + server?.agentHealth?.version === payload.targetVersion + ? "succeeded" + : "upgrading", + agentUpgradeStartedAt: item.startedAt ?? 
new Date(), + agentUpgradeError: null, + }) + .where( + and( + eq(servers.id, item.serverId), + eq(servers.agentUpgradeTargetVersion, payload.targetVersion), + ), + ); + } catch (error) { + console.error( + "[work-queue] failed to run agent upgrade completion side effects:", + error, + ); + } +} + async function runForceCleanupCompletionSideEffects( item: WorkQueue, result: WorkItemResult,