diff --git a/agent/cmd/agent/main.go b/agent/cmd/agent/main.go
index da0c796..f8e1240 100644
--- a/agent/cmd/agent/main.go
+++ b/agent/cmd/agent/main.go
@@ -102,6 +102,7 @@ func main() {
if err := os.MkdirAll(dataDir, 0o700); err != nil {
log.Fatalf("Failed to create data directory: %v", err)
}
+ agent.CheckPendingUpgradeMarker(dataDir)
keyDir := filepath.Join(dataDir, "keys")
diff --git a/agent/internal/agent/upgrade.go b/agent/internal/agent/upgrade.go
new file mode 100644
index 0000000..d240e10
--- /dev/null
+++ b/agent/internal/agent/upgrade.go
@@ -0,0 +1,207 @@
+package agent
+
+import (
+ "crypto/sha256"
+ "encoding/hex"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "log"
+ "net/http"
+ "os"
+ "path/filepath"
+ "regexp"
+ "runtime"
+ "strings"
+ "time"
+
+ agenthttp "techulus/cloud-agent/internal/http"
+ "techulus/cloud-agent/internal/paths"
+)
+
+const (
+ agentBinaryPath = "/usr/local/bin/techulus-agent"
+ agentPreviousPath = "/usr/local/bin/techulus-agent.previous"
+ agentUpgradeMarkerFile = "upgrade-pending.json"
+ agentReleaseBaseURL = "https://github.com/techulus/cloud/releases/download"
+)
+
+var (
+ targetVersionPattern = regexp.MustCompile(`^v\d+\.\d+\.\d+(?:-[0-9A-Za-z.-]+)?$`)
+ sha256Pattern = regexp.MustCompile(`^[0-9a-f]{64}$`)
+ errAgentUpgradeRestartNeeded = errors.New("agent upgrade restart needed")
+)
+
+type agentUpgradeMarker struct {
+ TargetVersion string `json:"targetVersion"`
+}
+
+func CheckPendingUpgradeMarker(dataDir string) {
+ marker, err := readAgentUpgradeMarker(dataDir)
+ if err != nil {
+ if !errors.Is(err, os.ErrNotExist) {
+ log.Printf("[upgrade] failed to read upgrade marker: %v", err)
+ }
+ return
+ }
+
+ if marker.TargetVersion == Version {
+ if err := os.Remove(agentUpgradeMarkerPath(dataDir)); err != nil && !errors.Is(err, os.ErrNotExist) {
+ log.Printf("[upgrade] failed to remove upgrade marker: %v", err)
+ }
+ log.Printf("[upgrade] completed upgrade to %s", Version)
+ return
+ }
+
+ if _, err := os.Stat(agentPreviousPath); err != nil {
+ log.Printf("[upgrade] pending upgrade to %s did not boot target version %s and no previous binary is available: %v", marker.TargetVersion, Version, err)
+ if removeErr := os.Remove(agentUpgradeMarkerPath(dataDir)); removeErr != nil && !errors.Is(removeErr, os.ErrNotExist) {
+ log.Printf("[upgrade] failed to remove unrecoverable upgrade marker: %v", removeErr)
+ }
+ return
+ }
+
+ log.Printf("[upgrade] restoring previous binary after failed upgrade to %s (running %s)", marker.TargetVersion, Version)
+ if err := copyFile(agentPreviousPath, agentBinaryPath, 0o755); err != nil {
+ log.Printf("[upgrade] failed to restore previous binary: %v", err)
+ return
+ }
+ if err := os.Chmod(agentBinaryPath, 0o755); err != nil {
+ log.Printf("[upgrade] failed to chmod restored binary: %v", err)
+ return
+ }
+ if err := os.Remove(agentUpgradeMarkerPath(dataDir)); err != nil && !errors.Is(err, os.ErrNotExist) {
+ log.Printf("[upgrade] failed to remove upgrade marker after rollback: %v", err)
+ }
+ os.Exit(0)
+}
+
+func (a *Agent) ProcessAgentUpgrade(item agenthttp.WorkQueueItem) error {
+ var payload struct {
+ TargetVersion string `json:"targetVersion"`
+ ExpectedSHA256 string `json:"expectedSha256"`
+ }
+
+ if err := json.Unmarshal([]byte(item.Payload), &payload); err != nil {
+ return fmt.Errorf("failed to parse upgrade_agent payload: %w", err)
+ }
+
+ targetVersion := strings.TrimSpace(payload.TargetVersion)
+ if targetVersion == Version {
+ log.Printf("[upgrade] already running target version %s", targetVersion)
+ return nil
+ }
+ if !targetVersionPattern.MatchString(targetVersion) {
+ return fmt.Errorf("invalid target version: %s", targetVersion)
+ }
+ if runtime.GOOS != "linux" {
+ return fmt.Errorf("agent upgrades are only supported on linux")
+ }
+ arch := runtime.GOARCH
+ if arch != "amd64" && arch != "arm64" {
+ return fmt.Errorf("unsupported architecture: %s", arch)
+ }
+ expectedSHA256 := strings.ToLower(strings.TrimSpace(payload.ExpectedSHA256))
+ if !sha256Pattern.MatchString(expectedSHA256) {
+ return fmt.Errorf("expectedSha256 is required")
+ }
+
+ log.Printf("[upgrade] installing agent %s for linux/%s", targetVersion, arch)
+ tmpPath := filepath.Join(filepath.Dir(agentBinaryPath), fmt.Sprintf(".techulus-agent-%s.tmp", targetVersion))
+ defer os.Remove(tmpPath)
+
+ assetURL := fmt.Sprintf("%s/%s/agent-linux-%s", agentReleaseBaseURL, targetVersion, arch)
+ if err := downloadFile(assetURL, tmpPath); err != nil {
+ return err
+ }
+ if err := verifySHA256(tmpPath, expectedSHA256); err != nil {
+ return err
+ }
+ if err := os.Chmod(tmpPath, 0o755); err != nil {
+ return fmt.Errorf("failed to chmod new agent binary: %w", err)
+ }
+
+ if err := copyFile(agentBinaryPath, agentPreviousPath, 0o755); err != nil {
+ return fmt.Errorf("failed to back up current agent binary: %w", err)
+ }
+ if err := os.Chmod(agentPreviousPath, 0o755); err != nil {
+ return fmt.Errorf("failed to chmod backed up agent binary: %w", err)
+ }
+ if err := writeAgentUpgradeMarker(a.DataDir, targetVersion); err != nil {
+ return err
+ }
+ if err := os.Rename(tmpPath, agentBinaryPath); err != nil {
+ return fmt.Errorf("failed to install new agent binary: %w", err)
+ }
+
+ log.Printf("[upgrade] installed %s; restart required", targetVersion)
+ return errAgentUpgradeRestartNeeded
+}
+
+func agentUpgradeMarkerPath(dataDir string) string {
+ if dataDir == "" {
+ dataDir = paths.DataDir
+ }
+ return filepath.Join(dataDir, agentUpgradeMarkerFile)
+}
+
+func readAgentUpgradeMarker(dataDir string) (*agentUpgradeMarker, error) {
+ data, err := os.ReadFile(agentUpgradeMarkerPath(dataDir))
+ if err != nil {
+ return nil, err
+ }
+ var marker agentUpgradeMarker
+ if err := json.Unmarshal(data, &marker); err != nil {
+ return nil, err
+ }
+ return &marker, nil
+}
+
+func writeAgentUpgradeMarker(dataDir, targetVersion string) error {
+ data, err := json.Marshal(agentUpgradeMarker{TargetVersion: targetVersion})
+ if err != nil {
+ return err
+ }
+ return os.WriteFile(agentUpgradeMarkerPath(dataDir), data, 0o600)
+}
+
+func downloadFile(url, destPath string) error {
+ client := &http.Client{Timeout: 2 * time.Minute}
+ resp, err := client.Get(url)
+ if err != nil {
+ return fmt.Errorf("failed to download agent binary: %w", err)
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != http.StatusOK {
+ return fmt.Errorf("agent binary download failed with status %d", resp.StatusCode)
+ }
+
+ file, err := os.OpenFile(destPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o600)
+ if err != nil {
+ return fmt.Errorf("failed to create temp agent binary: %w", err)
+ }
+ defer file.Close()
+ if _, err := io.Copy(file, resp.Body); err != nil {
+ return fmt.Errorf("failed to write temp agent binary: %w", err)
+ }
+ return nil
+}
+
+func verifySHA256(path, expected string) error {
+ file, err := os.Open(path)
+ if err != nil {
+ return fmt.Errorf("failed to open downloaded agent binary: %w", err)
+ }
+ defer file.Close()
+
+ hash := sha256.New()
+ if _, err := io.Copy(hash, file); err != nil {
+ return fmt.Errorf("failed to hash downloaded agent binary: %w", err)
+ }
+ actual := hex.EncodeToString(hash.Sum(nil))
+ if actual != expected {
+ return fmt.Errorf("checksum verification failed")
+ }
+ return nil
+}
diff --git a/agent/internal/agent/workqueue.go b/agent/internal/agent/workqueue.go
index 8eb02fa..77aead2 100644
--- a/agent/internal/agent/workqueue.go
+++ b/agent/internal/agent/workqueue.go
@@ -1,8 +1,10 @@
package agent
import (
+ "errors"
"fmt"
"log"
+ "os"
"time"
agenthttp "techulus/cloud-agent/internal/http"
@@ -84,16 +86,21 @@ func (a *Agent) processLeasedWorkItem(item agenthttp.WorkQueueItem) {
status := "completed"
errorMsg := ""
+ restartAfterReport := false
if err := a.ProcessWorkItem(item); err != nil {
- status = "failed"
- errorMsg = err.Error()
- log.Printf("[work-queue] item %s failed: %v", Truncate(item.ID, 8), err)
+ if errors.Is(err, errAgentUpgradeRestartNeeded) {
+ restartAfterReport = true
+ } else {
+ status = "failed"
+ errorMsg = err.Error()
+ log.Printf("[work-queue] item %s failed: %v", Truncate(item.ID, 8), err)
+ }
} else {
log.Printf("[work-queue] item %s completed", Truncate(item.ID, 8))
}
a.workMutex.Lock()
- if a.activeWorkItem != nil && a.activeWorkItem.ID == item.ID && a.activeWorkItem.Attempt == item.Attempt {
+ if !restartAfterReport && a.activeWorkItem != nil && a.activeWorkItem.ID == item.ID && a.activeWorkItem.Attempt == item.Attempt {
a.activeWorkItem = nil
}
a.pendingWorkResults = append(a.pendingWorkResults, agenthttp.CompletedWorkItem{
@@ -105,6 +112,11 @@ func (a *Agent) processLeasedWorkItem(item agenthttp.WorkQueueItem) {
a.workMutex.Unlock()
a.RequestStatusReport("work item " + status)
+ if restartAfterReport {
+ a.reportStatus("agent upgrade completed")
+ log.Printf("[upgrade] exiting so systemd restarts the upgraded agent")
+ os.Exit(0)
+ }
}
func (a *Agent) ProcessWorkItem(item agenthttp.WorkQueueItem) error {
@@ -128,6 +140,8 @@ func (a *Agent) ProcessWorkItem(item agenthttp.WorkQueueItem) error {
return a.ProcessRestoreVolume(item)
case "create_manifest":
return a.ProcessCreateManifest(item)
+ case "upgrade_agent":
+ return a.ProcessAgentUpgrade(item)
default:
return fmt.Errorf("unknown work item type: %s", item.Type)
}
diff --git a/web/actions/servers.ts b/web/actions/servers.ts
index cda9740..3696674 100644
--- a/web/actions/servers.ts
+++ b/web/actions/servers.ts
@@ -2,9 +2,11 @@
import { randomBytes } from "node:crypto";
import { eq } from "drizzle-orm";
+import { revalidatePath } from "next/cache";
import { ZodError } from "zod";
import { db } from "@/db";
import { servers } from "@/db/schema";
+import { enqueueAgentUpgrade } from "@/lib/agent-upgrades";
import { requireAuth } from "@/lib/auth";
import { nameSchema } from "@/lib/schemas";
import { getZodErrorMessage } from "@/lib/utils";
@@ -66,3 +68,11 @@ export async function updateServerName(id: string, name: string) {
throw error;
}
}
+
+export async function upgradeAgent(serverId: string, targetVersion: string) {
+ await requireAuth();
+ const result = await enqueueAgentUpgrade(serverId, targetVersion);
+ revalidatePath("/dashboard/servers");
+ revalidatePath(`/dashboard/servers/${serverId}`);
+ return result;
+}
diff --git a/web/app/(dashboard)/dashboard/servers/[id]/page.tsx b/web/app/(dashboard)/dashboard/servers/[id]/page.tsx
index cd95ecd..ca9ad17 100644
--- a/web/app/(dashboard)/dashboard/servers/[id]/page.tsx
+++ b/web/app/(dashboard)/dashboard/servers/[id]/page.tsx
@@ -68,9 +68,13 @@ export default async function ServerDetailPage({
{hasUpdate && (
)}
diff --git a/web/components/server/agent-update-nudge.tsx b/web/components/server/agent-update-nudge.tsx
index 0651bef..3764bee 100644
--- a/web/components/server/agent-update-nudge.tsx
+++ b/web/components/server/agent-update-nudge.tsx
@@ -1,9 +1,14 @@
"use client";
-import { useState } from "react";
import { ArrowUpCircle } from "lucide-react";
+import { useRouter } from "next/navigation";
+import { useState, useTransition } from "react";
+import { toast } from "sonner";
+import { upgradeAgent } from "@/actions/servers";
+import { Button } from "@/components/ui/button";
import {
Dialog,
+ DialogClose,
DialogContent,
DialogDescription,
DialogFooter,
@@ -12,17 +17,49 @@ import {
} from "@/components/ui/dialog";
interface AgentUpdateNudgeProps {
+ serverId: string;
currentVersion: string;
latestVersion: string;
- appUrl: string;
+ serverStatus: "pending" | "online" | "offline" | "unknown";
+ upgradeStatus: "idle" | "queued" | "upgrading" | "succeeded" | "failed";
+ upgradeTargetVersion: string | null;
+ upgradeError: string | null;
}
export function AgentUpdateNudge({
+ serverId,
currentVersion,
latestVersion,
- appUrl,
+ serverStatus,
+ upgradeStatus,
+ upgradeTargetVersion,
+ upgradeError,
}: AgentUpdateNudgeProps) {
const [open, setOpen] = useState(false);
+ const [isPending, startTransition] = useTransition();
+ const router = useRouter();
+ const isTargetUpgradeActive =
+ upgradeTargetVersion === latestVersion &&
+ (upgradeStatus === "queued" || upgradeStatus === "upgrading");
+ const disabled =
+ serverStatus !== "online" || isTargetUpgradeActive || isPending;
+
+ const handleUpgrade = () => {
+ startTransition(async () => {
+ try {
+ await upgradeAgent(serverId, latestVersion);
+ toast.success("Agent upgrade queued");
+ setOpen(false);
+ router.refresh();
+ } catch (error) {
+ toast.error(
+ error instanceof Error
+ ? error.message
+ : "Failed to queue agent upgrade",
+ );
+ }
+ });
+ };
return (
<>
@@ -45,17 +82,46 @@ export function AgentUpdateNudge({
Update Agent
- Run the following command on your server to update the agent from{" "}
+ Queue an upgrade for this server from{" "}
{currentVersion} to{" "}
{latestVersion} .
-
- sudo bash -c "$(curl -fsSL {appUrl}/update.sh)"
-
+
+ The control plane will send a signed work item to the agent. The
+ agent downloads the release binary, verifies its checksum, and
+ restarts itself after installation.
+
+
+ {isTargetUpgradeActive && (
+
+ Upgrade is already {upgradeStatus} for this version.
+
+ )}
+ {upgradeStatus === "failed" && upgradeError && (
+
+ Last failure: {upgradeError}
+
+ )}
+ {serverStatus !== "online" && (
+
+ Server must be online before an upgrade can be queued.
+
+ )}
-
+
+ }>
+ Cancel
+
+
+ {isPending
+ ? "Queueing..."
+ : isTargetUpgradeActive
+ ? "Upgrade queued"
+ : "Queue upgrade"}
+
+
>
diff --git a/web/db/queries.ts b/web/db/queries.ts
index 7a95274..800895a 100644
--- a/web/db/queries.ts
+++ b/web/db/queries.ts
@@ -112,6 +112,10 @@ export async function getServerDetails(id: string) {
networkHealth: servers.networkHealth,
containerHealth: servers.containerHealth,
agentHealth: servers.agentHealth,
+ agentUpgradeTargetVersion: servers.agentUpgradeTargetVersion,
+ agentUpgradeStatus: servers.agentUpgradeStatus,
+ agentUpgradeStartedAt: servers.agentUpgradeStartedAt,
+ agentUpgradeError: servers.agentUpgradeError,
})
.from(servers)
.where(eq(servers.id, id));
@@ -129,6 +133,10 @@ export async function getClusterHealth() {
networkHealth: servers.networkHealth,
containerHealth: servers.containerHealth,
agentHealth: servers.agentHealth,
+ agentUpgradeTargetVersion: servers.agentUpgradeTargetVersion,
+ agentUpgradeStatus: servers.agentUpgradeStatus,
+ agentUpgradeStartedAt: servers.agentUpgradeStartedAt,
+ agentUpgradeError: servers.agentUpgradeError,
})
.from(servers);
diff --git a/web/db/schema.ts b/web/db/schema.ts
index da3751c..a6f3698 100644
--- a/web/db/schema.ts
+++ b/web/db/schema.ts
@@ -1,4 +1,4 @@
-import { relations } from "drizzle-orm";
+import { relations, sql } from "drizzle-orm";
import {
bigint,
boolean,
@@ -208,6 +208,13 @@ export type AgentHealth = {
uptimeSecs: number;
};
+export type AgentUpgradeStatus =
+ | "idle"
+ | "queued"
+ | "upgrading"
+ | "succeeded"
+ | "failed";
+
export const servers = pgTable("servers", {
id: text("id").primaryKey(),
name: text("name").notNull(),
@@ -229,6 +236,16 @@ export const servers = pgTable("servers", {
networkHealth: jsonb("network_health").$type(),
containerHealth: jsonb("container_health").$type(),
agentHealth: jsonb("agent_health").$type(),
+ agentUpgradeTargetVersion: text("agent_upgrade_target_version"),
+ agentUpgradeStatus: text("agent_upgrade_status", {
+ enum: ["idle", "queued", "upgrading", "succeeded", "failed"],
+ })
+ .notNull()
+ .default("idle"),
+ agentUpgradeStartedAt: timestamp("agent_upgrade_started_at", {
+ withTimezone: true,
+ }),
+ agentUpgradeError: text("agent_upgrade_error"),
agentToken: text("agent_token"),
tokenCreatedAt: timestamp("token_created_at", { withTimezone: true }),
tokenUsedAt: timestamp("token_used_at", { withTimezone: true }),
@@ -538,6 +555,7 @@ export const workQueue = pgTable(
"backup_volume",
"restore_volume",
"create_manifest",
+ "upgrade_agent",
],
}).notNull(),
payload: text("payload").notNull(),
@@ -554,6 +572,11 @@ export const workQueue = pgTable(
},
(table) => [
index("work_queue_server_status_idx").on(table.serverId, table.status),
+ uniqueIndex("work_queue_one_active_agent_upgrade_idx")
+ .on(table.serverId)
+ .where(
+ sql`${table.type} = 'upgrade_agent' AND ${table.status} IN ('pending', 'processing')`,
+ ),
],
);
diff --git a/web/lib/agent-status.ts b/web/lib/agent-status.ts
index b1fadcf..a6d49be 100644
--- a/web/lib/agent-status.ts
+++ b/web/lib/agent-status.ts
@@ -9,6 +9,7 @@ import {
rollouts,
servers,
services,
+ workQueue,
} from "@/db/schema";
import {
AUTOHEAL_MAX_RECREATES,
@@ -160,6 +161,7 @@ export async function applyStatusReport(
lastHeartbeat: new Date(),
status: "online",
};
+ let completedAgentUpgradeTarget: string | null = null;
if (report.resources) {
if (report.resources.cpuCores !== undefined) {
@@ -191,9 +193,40 @@ export async function applyStatusReport(
}
if (report.agentHealth) {
updateData.agentHealth = report.agentHealth;
+
+ const [server] = await db
+ .select({
+ agentUpgradeTargetVersion: servers.agentUpgradeTargetVersion,
+ agentUpgradeStatus: servers.agentUpgradeStatus,
+ })
+ .from(servers)
+ .where(eq(servers.id, serverId))
+ .limit(1);
+
+ if (
+ server?.agentUpgradeTargetVersion === report.agentHealth.version &&
+ server.agentUpgradeStatus !== "succeeded" &&
+ server.agentUpgradeStatus !== "idle"
+ ) {
+ updateData.agentUpgradeStatus = "succeeded";
+ updateData.agentUpgradeError = null;
+ completedAgentUpgradeTarget = report.agentHealth.version;
+ }
}
await db.update(servers).set(updateData).where(eq(servers.id, serverId));
+ if (completedAgentUpgradeTarget) {
+ await db
+ .update(workQueue)
+ .set({ status: "completed" })
+ .where(
+ and(
+ eq(workQueue.serverId, serverId),
+ eq(workQueue.type, "upgrade_agent"),
+ inArray(workQueue.status, ["pending", "processing"]),
+ ),
+ );
+ }
let serverLogName: string | undefined;
const getCurrentServerLogName = async () => {
diff --git a/web/lib/agent-upgrades.ts b/web/lib/agent-upgrades.ts
new file mode 100644
index 0000000..5fbef91
--- /dev/null
+++ b/web/lib/agent-upgrades.ts
@@ -0,0 +1,136 @@
+import { randomUUID } from "node:crypto";
+import { and, eq } from "drizzle-orm";
+import { db } from "@/db";
+import { servers, workQueue } from "@/db/schema";
+
+const GITHUB_RELEASE_BASE_URL =
+ "https://github.com/techulus/cloud/releases/download";
+const TARGET_VERSION_PATTERN = /^v\d+\.\d+\.\d+(?:-[0-9A-Za-z.-]+)?$/;
+
+type ServerMeta = { arch?: string; os?: string } | null;
+
+export function validateAgentTargetVersion(targetVersion: string) {
+ const version = targetVersion.trim();
+ if (!TARGET_VERSION_PATTERN.test(version)) {
+ throw new Error("Invalid target version");
+ }
+ return version;
+}
+
+function getReleaseArch(meta: ServerMeta) {
+ if (meta?.os && meta.os !== "linux") {
+ throw new Error(`Agent upgrades are only supported for Linux servers`);
+ }
+ if (meta?.arch === "amd64" || meta?.arch === "arm64") return meta.arch;
+ throw new Error("Server architecture is unknown or unsupported");
+}
+
+async function fetchExpectedSha256(targetVersion: string, arch: string) {
+ const response = await fetch(
+ `${GITHUB_RELEASE_BASE_URL}/${targetVersion}/checksums.txt`,
+ { cache: "no-store" },
+ );
+ if (!response.ok) {
+ throw new Error(`Failed to fetch release checksums (${response.status})`);
+ }
+
+ const assetName = `agent-linux-${arch}`;
+ const checksums = await response.text();
+ for (const line of checksums.split("\n")) {
+ const [checksum, fileName] = line.trim().split(/\s+/);
+ if (fileName === assetName && /^[0-9a-f]{64}$/i.test(checksum)) {
+ return checksum.toLowerCase();
+ }
+ }
+
+ throw new Error(`Checksum for ${assetName} was not found`);
+}
+
+export async function enqueueAgentUpgrade(
+ serverId: string,
+ targetVersionInput: string,
+) {
+ const targetVersion = validateAgentTargetVersion(targetVersionInput);
+
+ const [server] = await db
+ .select({
+ id: servers.id,
+ status: servers.status,
+ meta: servers.meta,
+ agentHealth: servers.agentHealth,
+ })
+ .from(servers)
+ .where(eq(servers.id, serverId))
+ .limit(1);
+
+ if (!server) throw new Error("Server not found");
+ if (server.status !== "online") throw new Error("Server must be online");
+ if (server.agentHealth?.version === targetVersion) {
+ await db
+ .update(servers)
+ .set({
+ agentUpgradeTargetVersion: targetVersion,
+ agentUpgradeStatus: "succeeded",
+ agentUpgradeStartedAt: null,
+ agentUpgradeError: null,
+ })
+ .where(eq(servers.id, serverId));
+ return { status: "succeeded" as const };
+ }
+
+ const arch = getReleaseArch(server.meta);
+ const expectedSha256 = await fetchExpectedSha256(targetVersion, arch);
+
+ try {
+ await db.transaction(async (tx) => {
+ await tx
+ .update(servers)
+ .set({
+ agentUpgradeTargetVersion: targetVersion,
+ agentUpgradeStatus: "queued",
+ agentUpgradeStartedAt: null,
+ agentUpgradeError: null,
+ })
+ .where(eq(servers.id, serverId));
+
+ await tx.insert(workQueue).values({
+ id: randomUUID(),
+ serverId,
+ type: "upgrade_agent",
+ payload: JSON.stringify({ targetVersion, expectedSha256 }),
+ });
+ });
+ } catch (error) {
+ if (isUniqueViolation(error)) {
+ throw new Error("Agent upgrade already in progress");
+ }
+ throw error;
+ }
+
+ return { status: "queued" as const };
+}
+
+function isUniqueViolation(error: unknown) {
+ return (
+ error instanceof Error &&
+ "code" in error &&
+ (error as Error & { code?: string }).code === "23505"
+ );
+}
+
+export async function clearCompletedAgentUpgrade(serverId: string) {
+ await db
+ .update(servers)
+ .set({
+ agentUpgradeTargetVersion: null,
+ agentUpgradeStatus: "idle",
+ agentUpgradeStartedAt: null,
+ agentUpgradeError: null,
+ })
+ .where(
+ and(
+ eq(servers.id, serverId),
+ eq(servers.agentUpgradeStatus, "succeeded"),
+ ),
+ );
+}
diff --git a/web/lib/inngest/functions/crons.ts b/web/lib/inngest/functions/crons.ts
index 85e4d7b..78983e0 100644
--- a/web/lib/inngest/functions/crons.ts
+++ b/web/lib/inngest/functions/crons.ts
@@ -9,6 +9,7 @@ import {
checkAndRecoverStaleServers,
checkAndRunScheduledDeployments,
cleanupStaleItems,
+ failTimedOutAgentUpgrades,
} from "@/lib/scheduler";
import { inngest } from "../client";
@@ -122,3 +123,17 @@ export const staleItemsCleanup = inngest.createFunction(
});
},
);
+
+export const agentUpgradeTimeoutCheck = inngest.createFunction(
+ {
+ id: "cron-agent-upgrade-timeout-check",
+ triggers: [cron("*/5 * * * *")],
+ singleton: { mode: "skip" },
+ },
+ async ({ step }) => {
+ await step.run("fail-timed-out-agent-upgrades", async () => {
+ console.log("[cron] checking timed out agent upgrades");
+ await failTimedOutAgentUpgrades();
+ });
+ },
+);
diff --git a/web/lib/inngest/functions/index.ts b/web/lib/inngest/functions/index.ts
index dd6b545..ef862f5 100644
--- a/web/lib/inngest/functions/index.ts
+++ b/web/lib/inngest/functions/index.ts
@@ -2,6 +2,7 @@ export { backupWorkflow } from "./backup-workflow";
export { buildTriggerWorkflow } from "./build-trigger-workflow";
export { buildWorkflow } from "./build-workflow";
export {
+ agentUpgradeTimeoutCheck,
certificateRenewal,
challengeCleanup,
controlPlaneUpdateCheck,
diff --git a/web/lib/scheduler.ts b/web/lib/scheduler.ts
index e2edab1..ccb5ed3 100644
--- a/web/lib/scheduler.ts
+++ b/web/lib/scheduler.ts
@@ -234,6 +234,45 @@ export async function checkAndRunScheduledDeployments(): Promise {
}
const OLD_ITEM_THRESHOLD_MS = 90 * 24 * 60 * 60 * 1000;
+const AGENT_UPGRADE_TIMEOUT_MS = 5 * 60 * 1000;
+
+export async function failTimedOutAgentUpgrades(): Promise {
+ const timeoutThreshold = new Date(Date.now() - AGENT_UPGRADE_TIMEOUT_MS);
+
+ const timedOut = await db
+ .update(servers)
+ .set({
+ agentUpgradeStatus: "failed",
+ agentUpgradeError: "Agent did not report the target version in time",
+ })
+ .where(
+ and(
+ eq(servers.agentUpgradeStatus, "upgrading"),
+ lt(servers.agentUpgradeStartedAt, timeoutThreshold),
+ sql`(${servers.agentHealth}->>'version') IS DISTINCT FROM ${servers.agentUpgradeTargetVersion}`,
+ ),
+ )
+ .returning({ id: servers.id });
+
+ if (timedOut.length > 0) {
+ await db
+ .update(workQueue)
+ .set({ status: "failed" })
+ .where(
+ and(
+ inArray(
+ workQueue.serverId,
+ timedOut.map((server) => server.id),
+ ),
+ eq(workQueue.type, "upgrade_agent"),
+ inArray(workQueue.status, ["pending", "processing"]),
+ ),
+ );
+ console.log(
+ `[scheduler] marked ${timedOut.length} agent upgrade(s) timed out`,
+ );
+ }
+}
export async function cleanupStaleItems(): Promise {
const workItemLeaseThreshold = new Date(
diff --git a/web/lib/work-queue.ts b/web/lib/work-queue.ts
index c6bb674..c592442 100644
--- a/web/lib/work-queue.ts
+++ b/web/lib/work-queue.ts
@@ -1,7 +1,7 @@
import { randomUUID } from "node:crypto";
-import { and, eq, sql } from "drizzle-orm";
+import { and, eq, inArray, sql } from "drizzle-orm";
import { db } from "@/db";
-import { deployments, workQueue } from "@/db/schema";
+import { deployments, servers, workQueue } from "@/db/schema";
import type { WorkQueue } from "@/db/types";
import { inngest } from "@/lib/inngest/client";
import { inngestEvents } from "@/lib/inngest/events";
@@ -162,6 +162,9 @@ export async function claimNextWorkItem(
const row = rows[0];
if (!row) return null;
+ if (row.type === "upgrade_agent") {
+ await markAgentUpgradeStarted(serverId, row.payload);
+ }
return {
id: row.id,
@@ -171,6 +174,29 @@ export async function claimNextWorkItem(
};
}
+async function markAgentUpgradeStarted(serverId: string, payloadText: string) {
+ try {
+ const payload = JSON.parse(payloadText) as { targetVersion?: string };
+ if (!payload.targetVersion) return;
+ await db
+ .update(servers)
+ .set({
+ agentUpgradeStatus: "upgrading",
+ agentUpgradeStartedAt: new Date(),
+ agentUpgradeError: null,
+ })
+ .where(
+ and(
+ eq(servers.id, serverId),
+ eq(servers.agentUpgradeTargetVersion, payload.targetVersion),
+ inArray(servers.agentUpgradeStatus, ["queued", "upgrading"]),
+ ),
+ );
+ } catch (error) {
+ console.error("[work-queue] failed to mark agent upgrade started:", error);
+ }
+}
+
async function getRejectionReason(
serverId: string,
id: string,
@@ -205,6 +231,11 @@ async function runWorkItemCompletionSideEffects(
return;
}
+ if (item.type === "upgrade_agent" && item.payload) {
+ await runAgentUpgradeCompletionSideEffects(item, result);
+ return;
+ }
+
if (item.type !== "create_manifest" || !item.payload) {
return;
}
@@ -240,6 +271,60 @@ async function runWorkItemCompletionSideEffects(
}
}
+async function runAgentUpgradeCompletionSideEffects(
+ item: WorkQueue,
+ result: WorkItemResult,
+): Promise {
+ try {
+ const payload = JSON.parse(item.payload) as { targetVersion?: string };
+ if (!payload.targetVersion) return;
+
+ if (result.status === "failed") {
+ await db
+ .update(servers)
+ .set({
+ agentUpgradeStatus: "failed",
+ agentUpgradeError: result.error || "Agent upgrade failed",
+ })
+ .where(
+ and(
+ eq(servers.id, item.serverId),
+ eq(servers.agentUpgradeTargetVersion, payload.targetVersion),
+ ),
+ );
+ return;
+ }
+
+ const [server] = await db
+ .select({ agentHealth: servers.agentHealth })
+ .from(servers)
+ .where(eq(servers.id, item.serverId))
+ .limit(1);
+
+ await db
+ .update(servers)
+ .set({
+ agentUpgradeStatus:
+ server?.agentHealth?.version === payload.targetVersion
+ ? "succeeded"
+ : "upgrading",
+ agentUpgradeStartedAt: item.startedAt ?? new Date(),
+ agentUpgradeError: null,
+ })
+ .where(
+ and(
+ eq(servers.id, item.serverId),
+ eq(servers.agentUpgradeTargetVersion, payload.targetVersion),
+ ),
+ );
+ } catch (error) {
+ console.error(
+ "[work-queue] failed to run agent upgrade completion side effects:",
+ error,
+ );
+ }
+}
+
async function runForceCleanupCompletionSideEffects(
item: WorkQueue,
result: WorkItemResult,