Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,22 @@ All notable changes to this project will be documented in this file.
- Automatic metrics wiring: `ConductorClient.Builder.withMetricsCollector(...)` installs the HTTP interceptor and auto-registers listeners on `TaskClient`, `WorkflowClient`, and `TaskRunnerConfigurer`
- HTTP API client metrics via OkHttp interceptor (`http_api_client_request_seconds`, `task_result_size_bytes`, `workflow_input_size_bytes`)
- Event-driven metrics architecture with `EventDispatcher` and typed event POJOs
- File storage support: `FileClient` for uploading and downloading files via S3, Azure Blob, GCS, or local storage backends, with single-part and multipart upload support
- `FileHandler` abstraction for passing files into and out of workers — the SDK auto-resolves `conductor://file/` references in task input and uploads `FileHandler` values in task output
- `@InputParam`-annotated worker parameters of type `FileHandler` are automatically deserialized from file references
- Spring auto-configuration for `FileClient` and `FileClientProperties` (`conductor.file-client.*` properties)
- Automatic token refresh via `TokenRefreshInterceptor`: transparently retries requests that fail with `EXPIRED_TOKEN` or `INVALID_TOKEN` (401/403), minting a fresh token and replaying the request once
- `FatalAuthenticationException` and JVM termination when token refresh is permanently exhausted (5 consecutive failures), preventing workers from silently spinning on bad credentials

### Changed

- `PrometheusMetricsCollector` metric names updated to the harmonized cross-SDK catalog (e.g. `task_poll_total`, `task_execute_time_seconds`)
- `micrometer-registry-prometheus` is now a transitive (`api`) dependency
- Token refresh reworked to a reactive interceptor model replacing the previous scheduled refresh mechanism; includes exponential backoff and thundering-herd prevention

### Removed

- Removed non-functioning scheduled token refresh mechanism (replaced by automatic reactive refresh)

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskResult;

import io.orkes.conductor.client.http.FatalAuthenticationException;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.Uninterruptibles;
Expand Down Expand Up @@ -93,6 +95,7 @@ class TaskRunner {
private final ScheduledExecutorService leaseExtendExecutorService;
private Map<String, ScheduledFuture<?>> leaseExtendMap = new ConcurrentHashMap<>();
private final AtomicInteger activeWorkerCount = new AtomicInteger(0);
private Runnable exitAction = () -> System.exit(1);

TaskRunner(Worker worker,
TaskClient taskClient,
Expand Down Expand Up @@ -168,6 +171,11 @@ class TaskRunner {
);
}

@VisibleForTesting
void setExitAction(Runnable exitAction) {
this.exitAction = exitAction;
}

public void pollAndExecute() {
Stopwatch stopwatch = null;
while (pollingAndExecuting) {
Expand Down Expand Up @@ -296,6 +304,11 @@ private List<Task> pollTasksForWorker() {
} catch (Throwable e) {
permits.release(pollCount - tasks.size());

if (hasFatalAuthCause(e)) {
LOGGER.error("Fatal authentication failure — terminating JVM");
exitAction.run();
}

//For the first 100 errors, just print them as is...
boolean printError = pollingErrorCount < 100 || pollingErrorCount % errorAt == 0;
pollingErrorCount++;
Expand Down Expand Up @@ -350,6 +363,15 @@ private void onUncaughtException(Thread thread, Throwable error) {
}
}

private static boolean hasFatalAuthCause(Throwable t) {
for (Throwable cause = t; cause != null; cause = cause.getCause()) {
if (cause instanceof FatalAuthenticationException) {
return true;
}
}
return false;
}

private Task processTask(Task task) {
eventDispatcher.publish(new TaskExecutionStarted(taskType, task.getTaskId(), worker.getIdentity()));
eventDispatcher.publish(new ActiveWorkersChanged(taskType, activeWorkerCount.incrementAndGet()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import io.orkes.conductor.client.http.ApiResponse;
import io.orkes.conductor.client.http.OrkesAuthentication;
import io.orkes.conductor.client.http.Pair;
import io.orkes.conductor.client.http.TokenRefreshAuthenticator;
import io.orkes.conductor.client.http.TokenRefreshInterceptor;

import lombok.extern.slf4j.Slf4j;
import okhttp3.Call;
Expand Down Expand Up @@ -194,7 +194,7 @@ public ApiClientBuilder credentials(String key, String secret) {

this.authentication = new OrkesAuthentication(key, secret);
this.addHeaderSupplier(this.authentication);
this.configureOkHttp(b -> b.authenticator(new TokenRefreshAuthenticator(this.authentication)));
this.configureOkHttp(b -> b.addInterceptor(new TokenRefreshInterceptor(this.authentication)));
return this;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2024 Conductor Authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.orkes.conductor.client.http;

/**
* Thrown when authentication has permanently failed (e.g. max token refresh
* attempts exhausted). This deliberately does NOT extend
* {@link com.netflix.conductor.client.exception.ConductorClientException} so
* that generic catch blocks for transient client errors won't swallow it.
* The worker polling loop in {@code TaskRunner} re-throws this to its
* uncaught-exception handler, which terminates the JVM.
*/
public class FatalAuthenticationException extends RuntimeException {

public FatalAuthenticationException(String message) {
super(message);
}

public FatalAuthenticationException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.netflix.conductor.client.exception.ConductorClientException;
import com.netflix.conductor.client.http.ConductorClient;
import com.netflix.conductor.client.http.HeaderSupplier;

Expand All @@ -35,12 +36,19 @@ public class OrkesAuthentication implements HeaderSupplier {
private static final Logger LOGGER = LoggerFactory.getLogger(OrkesAuthentication.class);
private static final String TOKEN_CACHE_KEY = "TOKEN";

// Stop minting after this many consecutive failures.
private static final int MAX_TOKEN_REFRESH_FAILURES = 5;
Comment thread
chrishagglund-ship-it marked this conversation as resolved.

private final Cache<String, String> tokenCache;
private final String keyId;
private final String keySecret;
private final long tokenRefreshInSeconds;
private final Object refreshLock = new Object();

// Guarded by refreshLock.
private int tokenRefreshFailures = 0;
private long lastTokenRefreshAttempt = 0;

private TokenResource tokenResource;

public OrkesAuthentication(String keyId, String keySecret) {
Expand Down Expand Up @@ -122,14 +130,52 @@ public String refreshIfStale(String staleToken) {
}
}

/**
* Mints a fresh token. Shared by the lazy cache-TTL reload ({@link #getToken()})
Comment thread
chrishagglund-ship-it marked this conversation as resolved.
* and the reactive {@link #refreshIfStale(String)} path. After a failure,
* attempts are spaced by an exponential delay
* ({@code 2^failures} seconds) and stop entirely once
* {@link #MAX_TOKEN_REFRESH_FAILURES} consecutive failures are reached. A
* successful mint resets the failure counter.
*/
private String refreshToken() {
LOGGER.debug("Refreshing token @ {}", Instant.now());
if (keyId == null || keySecret == null) {
throw new RuntimeException("KeyId and KeySecret must be set in order to get an authentication token");
}
synchronized (refreshLock) {
if (tokenRefreshFailures >= MAX_TOKEN_REFRESH_FAILURES) {
throw new FatalAuthenticationException("Token refresh has failed " + tokenRefreshFailures
+ " times. Please check your authentication credentials (CONDUCTOR_AUTH_KEY/"
+ "CONDUCTOR_AUTH_SECRET). Stopping token refresh attempts.");
}

GenerateTokenRequest generateTokenRequest = new GenerateTokenRequest(keyId, keySecret);
TokenResponse response = tokenResource.generate(generateTokenRequest).getData();
return response.getToken();
if (tokenRefreshFailures > 0) {
long backoffMillis = (1L << tokenRefreshFailures) * 1000L; // 2^failures seconds
long sinceLastAttempt = System.currentTimeMillis() - lastTokenRefreshAttempt;
if (sinceLastAttempt < backoffMillis) {
long remaining = backoffMillis - sinceLastAttempt;
throw new ConductorClientException("Token refresh backoff active. Please wait " + remaining
+ "ms before the next attempt. (Failure count: " + tokenRefreshFailures + ")");
}
}

lastTokenRefreshAttempt = System.currentTimeMillis();

if (keyId == null || keySecret == null) {
tokenRefreshFailures++;
throw new ConductorClientException(
"KeyId and KeySecret must be set in order to get an authentication token");
}

LOGGER.debug("Refreshing token @ {}", Instant.now());
try {
GenerateTokenRequest generateTokenRequest = new GenerateTokenRequest(keyId, keySecret);
TokenResponse response = tokenResource.generate(generateTokenRequest).getData();
tokenRefreshFailures = 0;
return response.getToken();
} catch (RuntimeException e) {
tokenRefreshFailures++;
LOGGER.error("Failed to refresh authentication token (attempt {}): {}",
tokenRefreshFailures, e.getMessage());
throw e;
}
}
}
}

This file was deleted.

Loading
Loading