Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions packages/workflow-executor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ npm install -g @forestadmin/workflow-executor
| `DATABASE_URL` | ✓* | — | Postgres connection string (*not needed with `--in-memory`) |
| `HTTP_PORT` | — | `3400` | Port for the executor HTTP server |
| `FOREST_SERVER_URL` | — | `https://api.forestadmin.com` | Orchestrator URL |
| `POLLING_INTERVAL_MS` | — | `5000` | Poll cadence for pending steps |
| `STOP_TIMEOUT_MS` | — | `30000` | Graceful shutdown deadline |
| `POLLING_INTERVAL_S` | — | `30` | Poll cadence for pending steps |
| `STOP_TIMEOUT_S` | — | `30` | Graceful shutdown deadline |

Optional AI configuration (all-or-nothing — falls back to server AI if any is missing):

Expand All @@ -44,7 +44,7 @@ forest-workflow-executor
You should see (pretty format when stdout is a TTY):

```
13:33:42 info Workflow executor starting mode="database" forestServerUrl="https://api.forestadmin.com" agentUrl="http://localhost:3351" httpPort=3400 pollingIntervalMs=5000 aiConfig="server fallback"
13:33:42 info Workflow executor starting mode="database" forestServerUrl="https://api.forestadmin.com" agentUrl="http://localhost:3351" httpPort=3400 pollingIntervalS=5 aiConfig="server fallback"
13:33:42 info Workflow executor ready url="http://localhost:3400"
13:33:47 info Poll cycle completed fetched=0 dispatching=0
```
Expand Down Expand Up @@ -77,7 +77,7 @@ curl http://localhost:3400/health
### Graceful shutdown

Send `SIGTERM` or `SIGINT`. The executor drains in-flight steps, closes the HTTP
server, and exits with code `0`. Steps that don't drain within `STOP_TIMEOUT_MS`
server, and exits with code `0`. Steps that don't drain within `STOP_TIMEOUT_S`
are force-killed and the process exits with code `1`.

### Exit codes
Expand Down
2 changes: 1 addition & 1 deletion packages/workflow-executor/example/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ DATABASE_URL=postgres://executor:password@localhost:5459/workflow_executor
# Optional — defaults shown
HTTP_PORT=3400
FOREST_SERVER_URL=https://api.development.forestadmin.com
POLLING_INTERVAL_MS=5000
POLLING_INTERVAL_S=5

# Optional local AI (all-or-nothing — falls back to server AI if any is missing)
# AI_PROVIDER=anthropic
Expand Down
2 changes: 1 addition & 1 deletion packages/workflow-executor/example/.env.executors.example
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@ NODE_TLS_REJECT_UNAUTHORIZED=0


# Optional — default shown.
POLLING_INTERVAL_MS=5000
POLLING_INTERVAL_S=5
4 changes: 2 additions & 2 deletions packages/workflow-executor/example/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ Expected output:
Forest server : https://api.forestadmin.com
Agent URL : http://localhost:3351
HTTP port : 3400
Polling interval : 5000ms
Polling interval : 5s
AI config : server fallback (no local AI)
[forest-workflow-executor] Ready on http://localhost:3400
{"message":"Poll cycle completed","timestamp":"...","fetched":0,"dispatching":0}
Expand All @@ -67,7 +67,7 @@ curl http://localhost:3400/health

The executor will:
- Auto-create the `workflow_step_executions` table via Umzug migrations
- Poll the Forest Admin orchestrator every `POLLING_INTERVAL_MS` (5s default)
- Poll the Forest Admin orchestrator every `POLLING_INTERVAL_S` (5s default)
- Execute steps locally and report results back

## Available scripts
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ x-executor-env: &executor-env
AGENT_URL: ${EXECUTOR_AGENT_URL:-http://host.docker.internal:3351}
FOREST_SERVER_URL: ${FOREST_SERVER_URL:-https://api.development.forestadmin.com}
DATABASE_URL: ${EXECUTOR_DATABASE_URL}
POLLING_INTERVAL_MS: ${POLLING_INTERVAL_MS:-5000}
POLLING_INTERVAL_S: ${POLLING_INTERVAL_S:-5}
HTTP_PORT: "3400"
NODE_TLS_REJECT_UNAUTHORIZED: 0

Expand Down
34 changes: 17 additions & 17 deletions packages/workflow-executor/src/build-workflow-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,19 @@ import ForestServerWorkflowPort from './adapters/forest-server-workflow-port';
import ForestadminClientActivityLogPortFactory from './adapters/forestadmin-client-activity-log-port-factory';
import ServerAiAdapter from './adapters/server-ai-adapter';
import {
DEFAULT_AI_INVOKE_TIMEOUT_MS,
DEFAULT_AI_INVOKE_TIMEOUT_S,
DEFAULT_FOREST_SERVER_URL,
DEFAULT_POLLING_INTERVAL_MS,
DEFAULT_SCHEMA_CACHE_TTL_MS,
DEFAULT_STEP_TIMEOUT_MS,
DEFAULT_POLLING_INTERVAL_S,
DEFAULT_SCHEMA_CACHE_TTL_S,
DEFAULT_STEP_TIMEOUT_S,
} from './defaults';
import ExecutorHttpServer from './http/executor-http-server';
import Runner from './runner';
import SchemaCache from './schema-cache';
import DatabaseStore from './stores/database-store';
import InMemoryStore from './stores/in-memory-store';

const FORCE_EXIT_DELAY_MS = 5000;
const FORCE_EXIT_DELAY_S = 5;

export interface WorkflowExecutor {
start(): Promise<void>;
Expand All @@ -41,15 +41,15 @@ export interface ExecutorOptions {
httpPort: number;
forestServerUrl?: string;
aiConfigurations?: AiConfiguration[];
pollingIntervalMs?: number;
pollingIntervalS?: number;
logger?: Logger;
stopTimeoutMs?: number;
stepTimeoutMs?: number;
aiInvokeTimeoutMs?: number;
stopTimeoutS?: number;
stepTimeoutS?: number;
aiInvokeTimeoutS?: number;
// Max auto-chained steps per entry (see RunnerConfig.maxChainDepth). 0 disables chaining.
maxChainDepth?: number;
// Collection schema cache TTL in ms. Lower it to pick up orchestrator schema changes sooner.
schemaCacheTtlMs?: number;
// Collection schema cache TTL in seconds. Lower it to pick up orchestrator schema changes sooner.
schemaCacheTtlS?: number;
// Dev only: makes every AI call fail immediately so error paths can be exercised locally.
forceAiError?: boolean;
}
Expand Down Expand Up @@ -96,7 +96,7 @@ function buildCommonDependencies(options: ExecutorOptions) {

// A TTL of 0/negative/non-finite would silently make the cache always-stale, so fall back.
const schemaCache = new SchemaCache(
positiveOrDefault(options.schemaCacheTtlMs, DEFAULT_SCHEMA_CACHE_TTL_MS),
positiveOrDefault(options.schemaCacheTtlS, DEFAULT_SCHEMA_CACHE_TTL_S),
);

const agentPort = new AgentClientAgentPort({
Expand All @@ -121,12 +121,12 @@ function buildCommonDependencies(options: ExecutorOptions) {
aiModelPort,
activityLogPortFactory,
logger,
pollingIntervalMs: options.pollingIntervalMs ?? DEFAULT_POLLING_INTERVAL_MS,
pollingIntervalS: options.pollingIntervalS ?? DEFAULT_POLLING_INTERVAL_S,
envSecret: options.envSecret,
authSecret: options.authSecret,
stopTimeoutMs: options.stopTimeoutMs,
stepTimeoutMs: positiveOrDefault(options.stepTimeoutMs, DEFAULT_STEP_TIMEOUT_MS),
aiInvokeTimeoutMs: positiveOrDefault(options.aiInvokeTimeoutMs, DEFAULT_AI_INVOKE_TIMEOUT_MS),
stopTimeoutS: options.stopTimeoutS,
stepTimeoutS: positiveOrDefault(options.stepTimeoutS, DEFAULT_STEP_TIMEOUT_S),
aiInvokeTimeoutS: positiveOrDefault(options.aiInvokeTimeoutS, DEFAULT_AI_INVOKE_TIMEOUT_S),
maxChainDepth: options.maxChainDepth,
};
}
Expand Down Expand Up @@ -169,7 +169,7 @@ function createWorkflowExecutor(
setTimeout(() => {
logger.error('Process did not exit after shutdown — forcing exit', {});
process.exit(process.exitCode ?? 1);
}, FORCE_EXIT_DELAY_MS).unref();
}, FORCE_EXIT_DELAY_S * 1000).unref();
};

return {
Expand Down
32 changes: 16 additions & 16 deletions packages/workflow-executor/src/cli-core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ import {
type WorkflowExecutor,
} from './build-workflow-executor';
import {
DEFAULT_AI_INVOKE_TIMEOUT_MS,
DEFAULT_AI_INVOKE_TIMEOUT_S,
DEFAULT_FOREST_SERVER_URL,
DEFAULT_HTTP_PORT,
DEFAULT_MAX_CHAIN_DEPTH,
DEFAULT_POLLING_INTERVAL_MS,
DEFAULT_SCHEMA_CACHE_TTL_MS,
DEFAULT_STEP_TIMEOUT_MS,
DEFAULT_STOP_TIMEOUT_MS,
DEFAULT_POLLING_INTERVAL_S,
DEFAULT_SCHEMA_CACHE_TTL_S,
DEFAULT_STEP_TIMEOUT_S,
DEFAULT_STOP_TIMEOUT_S,
} from './defaults';
import { ConfigurationError, extractErrorMessage } from './errors';

Expand Down Expand Up @@ -157,12 +157,12 @@ export function readEnvConfig(env: NodeJS.ProcessEnv, args: CliArgs): CliConfig
agentUrl: env.AGENT_URL as string,
httpPort: parsePositiveIntEnv('HTTP_PORT', env.HTTP_PORT) ?? DEFAULT_HTTP_PORT,
forestServerUrl: env.FOREST_SERVER_URL,
pollingIntervalMs: parsePositiveIntEnv('POLLING_INTERVAL_MS', env.POLLING_INTERVAL_MS),
stopTimeoutMs: parsePositiveIntEnv('STOP_TIMEOUT_MS', env.STOP_TIMEOUT_MS),
stepTimeoutMs: parsePositiveIntEnv('STEP_TIMEOUT_MS', env.STEP_TIMEOUT_MS),
aiInvokeTimeoutMs: parsePositiveIntEnv('AI_INVOKE_TIMEOUT_MS', env.AI_INVOKE_TIMEOUT_MS),
pollingIntervalS: parsePositiveIntEnv('POLLING_INTERVAL_S', env.POLLING_INTERVAL_S),
stopTimeoutS: parsePositiveIntEnv('STOP_TIMEOUT_S', env.STOP_TIMEOUT_S),
stepTimeoutS: parsePositiveIntEnv('STEP_TIMEOUT_S', env.STEP_TIMEOUT_S),
aiInvokeTimeoutS: parsePositiveIntEnv('AI_INVOKE_TIMEOUT_S', env.AI_INVOKE_TIMEOUT_S),
maxChainDepth: parsePositiveIntEnv('MAX_CHAIN_DEPTH', env.MAX_CHAIN_DEPTH),
schemaCacheTtlMs: parsePositiveIntEnv('SCHEMA_CACHE_TTL_MS', env.SCHEMA_CACHE_TTL_MS),
schemaCacheTtlS: parsePositiveIntEnv('SCHEMA_CACHE_TTL_S', env.SCHEMA_CACHE_TTL_S),
...(aiConfigurations && { aiConfigurations }),
...(env.FORCE_AI_ERROR === 'true' && { forceAiError: true }),
};
Expand Down Expand Up @@ -195,12 +195,12 @@ Required environment variables:
Optional environment variables:
HTTP_PORT Default: ${DEFAULT_HTTP_PORT}
FOREST_SERVER_URL Default: ${DEFAULT_FOREST_SERVER_URL}
POLLING_INTERVAL_MS Default: ${DEFAULT_POLLING_INTERVAL_MS}
STOP_TIMEOUT_MS Default: ${DEFAULT_STOP_TIMEOUT_MS}
STEP_TIMEOUT_MS Max duration of a step in ms (default: ${DEFAULT_STEP_TIMEOUT_MS})
AI_INVOKE_TIMEOUT_MS Max duration of a single AI provider invocation in ms (default: ${DEFAULT_AI_INVOKE_TIMEOUT_MS})
POLLING_INTERVAL_S Default: ${DEFAULT_POLLING_INTERVAL_S}
STOP_TIMEOUT_S Default: ${DEFAULT_STOP_TIMEOUT_S}
STEP_TIMEOUT_S Max duration of a step in seconds (default: ${DEFAULT_STEP_TIMEOUT_S})
AI_INVOKE_TIMEOUT_S Max duration of a single AI provider invocation in seconds (default: ${DEFAULT_AI_INVOKE_TIMEOUT_S})
MAX_CHAIN_DEPTH Max steps auto-executed per run before yielding (default: ${DEFAULT_MAX_CHAIN_DEPTH})
SCHEMA_CACHE_TTL_MS Collection schema cache TTL in ms (default: ${DEFAULT_SCHEMA_CACHE_TTL_MS})
SCHEMA_CACHE_TTL_S Collection schema cache TTL in seconds (default: ${DEFAULT_SCHEMA_CACHE_TTL_S})
NO_COLOR Set to any value to disable ANSI colors in pretty logs
FORCE_AI_ERROR Set to "true" to make every AI call fail (dev only, to test error paths)

Expand Down Expand Up @@ -234,7 +234,7 @@ export function logStartup(logger: Logger, config: CliConfig): void {
forestServerUrl: opts.forestServerUrl ?? DEFAULT_FOREST_SERVER_URL,
agentUrl: opts.agentUrl,
httpPort: opts.httpPort,
pollingIntervalMs: opts.pollingIntervalMs ?? DEFAULT_POLLING_INTERVAL_MS,
pollingIntervalS: opts.pollingIntervalS ?? DEFAULT_POLLING_INTERVAL_S,
aiConfig: aiLabel,
});
}
Expand Down
10 changes: 5 additions & 5 deletions packages/workflow-executor/src/defaults.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
export const DEFAULT_HTTP_PORT = 3400;
export const DEFAULT_FOREST_SERVER_URL = 'https://api.forestadmin.com';
export const DEFAULT_POLLING_INTERVAL_MS = 30_000;
export const DEFAULT_STEP_TIMEOUT_MS = 5 * 60_000;
export const DEFAULT_AI_INVOKE_TIMEOUT_MS = 30_000;
export const DEFAULT_STOP_TIMEOUT_MS = 30_000;
export const DEFAULT_POLLING_INTERVAL_S = 30;
export const DEFAULT_STEP_TIMEOUT_S = 5 * 60;
export const DEFAULT_AI_INVOKE_TIMEOUT_S = 30;
export const DEFAULT_STOP_TIMEOUT_S = 30;
export const DEFAULT_MAX_CHAIN_DEPTH = 50;
export const DEFAULT_SCHEMA_CACHE_TTL_MS = 10 * 60_000;
export const DEFAULT_SCHEMA_CACHE_TTL_S = 10 * 60;
8 changes: 4 additions & 4 deletions packages/workflow-executor/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,9 @@ export class ActivityLogCreationError extends WorkflowExecutorError {
}

export class StepTimeoutError extends WorkflowExecutorError {
constructor(timeoutMs: number) {
constructor(timeoutS: number) {
super(
`Step execution exceeded timeout of ${timeoutMs}ms`,
`Step execution exceeded timeout of ${timeoutS}s`,
'The step took too long to complete. Please try again, or contact your administrator if the problem persists.',
);
}
Expand All @@ -224,9 +224,9 @@ export class StepTimeoutError extends WorkflowExecutorError {
// StepTimeoutError so we can surface a provider-specific message and tune the AI timeout
// independently of the step timeout (AI hangs are common; record fetches are not).
export class AiInvokeTimeoutError extends WorkflowExecutorError {
constructor(timeoutMs: number) {
constructor(timeoutS: number) {
super(
`AI provider did not respond within ${timeoutMs}ms`,
`AI provider did not respond within ${timeoutS}s`,
'The AI provider did not respond in time. Please try again, or contact your administrator if the problem persists.',
);
}
Expand Down
18 changes: 9 additions & 9 deletions packages/workflow-executor/src/executors/base-step-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ export default abstract class BaseStepExecutor<TStep extends StepDefinition = St
if (error instanceof StepTimeoutError) {
this.context.logger.error(error.message, {
...this.logCtx,
timeoutMs: this.context.stepTimeoutMs,
timeoutS: this.context.stepTimeoutS,
});

return this.buildOutcomeResult({ status: 'error', error: error.userMessage });
Expand Down Expand Up @@ -112,8 +112,8 @@ export default abstract class BaseStepExecutor<TStep extends StepDefinition = St
// on execPromise must be attached BEFORE the race so a late rejection doesn't trigger
// UnhandledPromiseRejection. Late resolutions are silently discarded.
private async runWithTimeout(): Promise<StepExecutionResult> {
const timeoutMs = this.context.stepTimeoutMs;
if (!timeoutMs || timeoutMs <= 0) return this.doExecute();
const timeoutS = this.context.stepTimeoutS;
if (!timeoutS || timeoutS <= 0) return this.doExecute();

let timer: NodeJS.Timeout | undefined;
let hasTimeoutFired = false;
Expand All @@ -133,8 +133,8 @@ export default abstract class BaseStepExecutor<TStep extends StepDefinition = St
new Promise<never>((_, reject) => {
timer = setTimeout(() => {
hasTimeoutFired = true;
reject(new StepTimeoutError(timeoutMs));
}, timeoutMs);
reject(new StepTimeoutError(timeoutS));
}, timeoutS * 1000);
}),
]);
} finally {
Expand Down Expand Up @@ -314,9 +314,9 @@ export default abstract class BaseStepExecutor<TStep extends StepDefinition = St
BaseStepExecutor.assertNoMidArraySystemMessages(messages);
const modelWithTools = this.context.model.bindTools(tools, { tool_choice: 'any' });
const preparedMessages = BaseStepExecutor.mergeLeadingSystemMessages(messages);
const aiTimeoutMs = this.context.aiInvokeTimeoutMs;
const timeoutMs = aiTimeoutMs && aiTimeoutMs > 0 ? aiTimeoutMs : undefined;
const signal = timeoutMs ? AbortSignal.timeout(timeoutMs) : undefined;
const aiTimeoutS = this.context.aiInvokeTimeoutS;
const timeoutS = aiTimeoutS && aiTimeoutS > 0 ? aiTimeoutS : undefined;
const signal = timeoutS ? AbortSignal.timeout(timeoutS * 1000) : undefined;

let response;

Expand All @@ -326,7 +326,7 @@ export default abstract class BaseStepExecutor<TStep extends StepDefinition = St
} catch (err) {
// Detect the timeout via our own signal, not the thrown error's name: providers wrap an
// aborted request differently (AbortError, TimeoutError, APIUserAbortError, …).
if (timeoutMs !== undefined && signal?.aborted) throw new AiInvokeTimeoutError(timeoutMs);
if (timeoutS !== undefined && signal?.aborted) throw new AiInvokeTimeoutError(timeoutS);
throw err;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ export interface StepContextConfig {
runStore: RunStore;
schemaCache: SchemaCache;
logger: Logger;
stepTimeoutMs?: number;
aiInvokeTimeoutMs?: number;
stepTimeoutS?: number;
aiInvokeTimeoutS?: number;
}

export default class StepExecutorFactory {
Expand Down Expand Up @@ -159,8 +159,8 @@ export default class StepExecutorFactory {
schemaResolver,
logger: cfg.logger,
incomingPendingData,
stepTimeoutMs: cfg.stepTimeoutMs,
aiInvokeTimeoutMs: cfg.aiInvokeTimeoutMs,
stepTimeoutS: cfg.stepTimeoutS,
aiInvokeTimeoutS: cfg.aiInvokeTimeoutS,
};
}
}
Loading
Loading