[GSoC 2026] Kafka Streams runner — ExecutableStage (stateless ParDo) translator #38764
Summary of Changes
Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request implements the third sub-issue of the Kafka Streams runner GSoC 2026 project, focusing on the execution of stateless user code. By adding the ExecutableStage translator and the necessary bridge to the SDK harness, the runner can now execute fused ParDo operations using the Fn API. This change includes infrastructure for managing harness contexts and robust testing utilities to verify execution within the Kafka Streams topology.
Code Review
This pull request implements the translation and execution of fused executable stages (beam:runner:executable_stage:v1) for the Kafka Streams runner, allowing stateless user code to run in the Beam SDK harness. The feedback highlights two critical issues: a concurrency bug in ExecutableStageProcessor due to the use of a non-thread-safe ArrayDeque for queueing outputs across threads, which should be replaced with ConcurrentLinkedQueue, and a potential memory leak in KafkaStreamsExecutableStageContextFactory where job factories are not cleaned up from the map when their reference count drops to zero.
 */
package org.apache.beam.runners.kafka.streams.translation;

import java.util.ArrayDeque;
Since the queue is accessed concurrently by SDK harness threads (which enqueue outputs) and the Kafka Streams processing thread (which flushes them), we should use a thread-safe queue like ConcurrentLinkedQueue instead of ArrayDeque to avoid race conditions and data corruption.
- import java.util.ArrayDeque;
+ import java.util.concurrent.ConcurrentLinkedQueue;
  private final RunnerApi.ExecutableStagePayload stagePayload;
  private final JobInfo jobInfo;

  private final Queue<WindowedValue<byte[]>> pendingOutputs = new ArrayDeque<>();
Instantiate pendingOutputs as a ConcurrentLinkedQueue to ensure thread-safe operations when elements are added from harness threads and polled from the processing thread.
- private final Queue<WindowedValue<byte[]>> pendingOutputs = new ArrayDeque<>();
+ private final Queue<WindowedValue<byte[]>> pendingOutputs = new ConcurrentLinkedQueue<>();
  // Release the context synchronously once its reference count drops to zero;
  // the runner does not keep contexts alive across stages beyond their use.
  (caller) -> true));
The jobFactories map is a static ConcurrentHashMap that stores ExecutableStageContext.Factory instances per job. If we do not remove the factory from the map when the reference count drops to zero, it will cause a memory leak of the factory and its associated resources for every job run in the JVM. We should remove the job ID from jobFactories in the releaser callback.
  // Release the context synchronously once its reference count drops to zero;
  // the runner does not keep contexts alive across stages beyond their use.
- (caller) -> true));
+ (caller) -> {
+   jobFactories.remove(jobInfo.jobId());
+   return true;
+ }));
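The cleanup idea in the comment above (drop a per-job entry once nothing references it) can be sketched in isolation. This is a generic illustration only: `RefCountedRegistry`, `acquire`, and `release` are hypothetical names, and the sketch makes no claim about how Beam's reference-counting context factory or the PR's `jobFactories` map actually behave.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

/** Hypothetical per-key registry; illustrates removal at zero references, not Beam's API. */
final class RefCountedRegistry<K, V> {
  private static final class Entry<V> {
    final V value;
    int refCount; // only mutated inside compute(), which is atomic per key

    Entry(V value) {
      this.value = value;
    }
  }

  private final ConcurrentMap<K, Entry<V>> entries = new ConcurrentHashMap<>();

  /** Returns the shared value for the key, creating it on first use, and adds a reference. */
  V acquire(K key, Function<K, V> creator) {
    return entries.compute(key, (k, existing) -> {
      Entry<V> entry = (existing == null) ? new Entry<V>(creator.apply(k)) : existing;
      entry.refCount++;
      return entry;
    }).value;
  }

  /** Drops one reference; the mapping is removed when the count reaches zero. */
  void release(K key) {
    entries.computeIfPresent(key, (k, entry) -> {
      entry.refCount--;
      return entry.refCount == 0 ? null : entry; // returning null removes the mapping
    });
  }

  /** Number of keys that still hold at least one reference. */
  int size() {
    return entries.size();
  }
}
```

Doing the decrement and the removal inside one `computeIfPresent` call keeps them atomic for a given key, so a concurrent `acquire` cannot observe a half-released entry.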
Code Review
This pull request introduces support for executing fused ExecutableStage nodes (stateless user code) in the Beam SDK harness over the Fn API for the Kafka Streams runner. It adds the ExecutableStageProcessor to run the fused code, the ExecutableStageTranslator to handle the translation, and a context factory to manage shared SDK harness environments. Feedback on the changes highlights a concurrency issue in ExecutableStageProcessor where a non-thread-safe ArrayDeque is used for pending outputs, which should be replaced with ConcurrentLinkedQueue. Additionally, the translator should explicitly reject unsupported features like side inputs, user states, and timers to fail fast with descriptive errors.
  private final RunnerApi.ExecutableStagePayload stagePayload;
  private final JobInfo jobInfo;

  private final Queue<WindowedValue<byte[]>> pendingOutputs = new ArrayDeque<>();
pendingOutputs is accessed concurrently: it is populated by SDK harness threads inside the FnDataReceiver callback and drained by the Kafka Streams processing thread. Since ArrayDeque is not thread-safe and lacks memory visibility guarantees across threads, this can lead to race conditions or data corruption.
Using ConcurrentLinkedQueue provides a thread-safe, lock-free queue that is ideal for this producer-consumer pattern.
- private final Queue<WindowedValue<byte[]>> pendingOutputs = new ArrayDeque<>();
+ private final Queue<WindowedValue<byte[]>> pendingOutputs = new ConcurrentLinkedQueue<>();
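The producer-consumer handoff described in this comment can be sketched on its own. The sketch below is a minimal illustration, assuming one or more producer threads and a single draining thread; `PendingOutputBuffer`, `enqueue`, and `drain` are hypothetical names, not the PR's `ExecutableStageProcessor` code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical stand-in for the processor's output buffer; not the PR's class. */
final class PendingOutputBuffer<T> {
  // ConcurrentLinkedQueue is safe for concurrent add() and poll() without external locking.
  private final Queue<T> pending = new ConcurrentLinkedQueue<>();

  /** Producer side: would be called from harness-side callback threads. */
  void enqueue(T value) {
    pending.add(value);
  }

  /** Consumer side: drains every element that is visible at the time of the call. */
  List<T> drain() {
    List<T> drained = new ArrayList<>();
    T next;
    while ((next = pending.poll()) != null) {
      drained.add(next);
    }
    return drained;
  }
}

final class PendingOutputBufferDemo {
  public static void main(String[] args) throws InterruptedException {
    PendingOutputBuffer<Integer> buffer = new PendingOutputBuffer<>();
    Thread producer = new Thread(() -> {
      for (int i = 0; i < 1000; i++) {
        buffer.enqueue(i);
      }
    });
    producer.start();
    producer.join(); // wait for the producer so the drain below sees every element
    System.out.println("drained " + buffer.drain().size() + " elements");
  }
}
```

Note that a drain only collects what has been enqueued so far; whether the processor must also wait for the harness to finish a bundle before flushing is a separate question that this sketch does not address.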
    String inputPCollectionId = Iterables.getOnlyElement(transform.getInputsMap().values());
    String parentProcessor = context.getProcessorNameForPCollection(inputPCollectionId);
Instead of using Iterables.getOnlyElement(transform.getInputsMap().values()), which will fail with a generic IllegalArgumentException if there are side inputs, we can directly retrieve the main input PCollection ID from stagePayload.getInput().
Additionally, we should explicitly check and reject side inputs, user states, and timers using stagePayload to fail fast with a clear, descriptive error message.
    if (stagePayload.getSideInputsCount() > 0) {
      throw new UnsupportedOperationException(
          "ExecutableStage "
              + transformId
              + " has side inputs; side inputs are not yet supported by the Kafka Streams runner.");
    }
    if (stagePayload.getUserStatesCount() > 0 || stagePayload.getTimersCount() > 0) {
      throw new UnsupportedOperationException(
          "ExecutableStage "
              + transformId
              + " has user states or timers; these are not yet supported by the Kafka Streams runner.");
    }
    String inputPCollectionId = stagePayload.getInput();
    String parentProcessor = context.getProcessorNameForPCollection(inputPCollectionId);
Assigning reviewers: R: @chamikaramj added as fallback since no labels match configuration.
The PR bot will only process comments in the main thread (not review comments).
Summary
Third sub-issue under the Kafka Streams runner GSoC 2026 project. Adds the ExecutableStage translator and SDK-harness bridge so fused stateless user code (ParDo etc.) actually runs in the SDK harness over the Fn API and its outputs flow back into the topology.
Per design doc §4.2 and the live discussion with @je-ik on the issue.
What's in this PR
- KafkaStreamsExecutableStageContextFactory mirroring Flink's pattern.
- ExecutableStageProcessor (the harness bridge) + ExecutableStageTranslator.
- ExecutableStage.URN; prepareForTranslation now runs GreedyPipelineFuser.
- KStreamsPayload.toString uses MoreObjects.toStringHelper (post-merge tweak from [GSoC 2026] Kafka Streams runner — translation framework + Impulse translator #38689).
- testImplementation project(':sdks:java:harness') for the EMBEDDED environment.
Tests
- ExecutableStageTranslatorTest builds Impulse -> ParDo via the Beam Java SDK, drives the resulting topology under TopologyTestDriver with the EMBEDDED environment, and asserts via side effect that the DoFn ran in the SDK harness with the expected input. Approach discussed with @je-ik on [GSoC 2026] Kafka Streams Runner — ExecutableStage (stateless ParDo) translator #38743: because the ParDo's output has no downstream consumer, the stage has no output PCollection and the harness does not deliver the value back to the runner (per Beam semantics), so the bridge is verified by a recorded side effect.
- SharedTestCollector<T> helper: instances are Serializable but their identity is a UUID; the actual storage lives in a static registry keyed by UUID. Survives the runner cloning the DoFn (current or future EMBEDDED behaviour).
- KafkaStreamsPipelineTranslatorTest updated so the Impulse case builds via the Beam SDK (validator-compliant proto for the fuser) and the URN-rejection case calls translate directly to keep the dispatch-loop contract isolated.
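The UUID-keyed registry idea behind the collector helper can be sketched as follows. This is a minimal illustration under stated assumptions, not the PR's helper: `UuidBackedCollector`, `snapshot`, and `roundTrip` are hypothetical names, the sketch collects `String` values only, and a Java-serialization round trip stands in for whatever cloning the runner performs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical sketch of a collector whose identity is a UUID; the PR's helper may differ. */
final class UuidBackedCollector implements Serializable {
  private static final long serialVersionUID = 1L;

  // Storage lives in a static registry, so deserialized copies in the same JVM share it.
  private static final Map<UUID, List<String>> REGISTRY = new ConcurrentHashMap<>();

  // Only the id is serialized; it is restored from the stream on deserialization.
  private final UUID id = UUID.randomUUID();

  /** Records a value under this collector's id. */
  void add(String value) {
    REGISTRY.computeIfAbsent(id, k -> new CopyOnWriteArrayList<>()).add(value);
  }

  /** Returns a copy of everything recorded under this collector's id so far. */
  List<String> snapshot() {
    return new ArrayList<>(REGISTRY.getOrDefault(id, Collections.emptyList()));
  }

  /** Serializes and deserializes a collector, standing in for a runner cloning a DoFn. */
  static UuidBackedCollector roundTrip(UuidBackedCollector original)
      throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(original);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (UuidBackedCollector) in.readObject();
    }
  }
}
```

A value added through the deserialized copy is visible through the original instance because both resolve to the same registry entry. The approach only works while producer and test share one JVM, which matches the EMBEDDED environment named above.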
Validation
- ./gradlew :runners:kafka-streams:check green locally (14 tests).
- No @SuppressWarnings of any flavor in the new code.
Notes / deferred
- ExecutableStageProcessor is provisional. When the WatermarkManager lands, the output watermark will be min() across received watermarks and the flush should fire only when that minimum moves forward, not on every received watermark; a comment in the processor flags this.
- KStreamsPayloadCoder + KafkaSerde for topic-boundary serialization is still deferred to the first sub-issue that introduces a topic boundary (GBK / repartition).
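The deferred watermark rule (output watermark is the minimum across received watermarks, and a flush fires only when that minimum advances) can be sketched as below. The WatermarkManager has not landed, so everything here is an assumption: `MinWatermarkTracker` and `observe` are hypothetical names, and the sketch assumes a fixed number of upstream inputs, each reporting a watermark in milliseconds.

```java
import java.util.Arrays;

/** Hypothetical tracker for the deferred design; not the PR's WatermarkManager. */
final class MinWatermarkTracker {
  private final long[] perInput;
  private long outputWatermark = Long.MIN_VALUE;

  MinWatermarkTracker(int inputCount) {
    if (inputCount < 1) {
      throw new IllegalArgumentException("inputCount must be at least 1");
    }
    perInput = new long[inputCount];
    // Inputs that have not reported yet hold the minimum back.
    Arrays.fill(perInput, Long.MIN_VALUE);
  }

  /**
   * Records a watermark for one input and reports whether the minimum across all
   * inputs moved forward, which is the point at which a flush would fire.
   */
  boolean observe(int inputIndex, long watermarkMillis) {
    // Watermarks should not regress per input; keep the largest value seen.
    perInput[inputIndex] = Math.max(perInput[inputIndex], watermarkMillis);
    long min = Arrays.stream(perInput).min().getAsLong();
    if (min > outputWatermark) {
      outputWatermark = min;
      return true;
    }
    return false;
  }

  /** The last minimum that advanced, or Long.MIN_VALUE if none has yet. */
  long outputWatermark() {
    return outputWatermark;
  }
}
```

With two inputs, a watermark arriving on only one of them leaves the minimum unchanged and `observe` returns false; the flush is triggered only once the slower input catches up.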
Closes #38743
Refs #18479
cc @je-ik