diff --git a/runners/kafka-streams/build.gradle b/runners/kafka-streams/build.gradle
index 3f34a3ca76b6..52b320dd70a8 100644
--- a/runners/kafka-streams/build.gradle
+++ b/runners/kafka-streams/build.gradle
@@ -61,6 +61,7 @@ dependencies {
permitUnusedDeclared "org.apache.kafka:kafka-clients:$kafka_version"
testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
+ testImplementation project(":sdks:java:harness")
testImplementation library.java.hamcrest
testImplementation library.java.junit
testImplementation library.java.mockito_core
diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageProcessor.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageProcessor.java
new file mode 100644
index 000000000000..90fe72e845a4
--- /dev/null
+++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageProcessor.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.kafka.streams.translation;
+
+import java.util.ArrayDeque;
+import java.util.Queue;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
+import org.apache.beam.runners.fnexecution.control.ExecutableStageContext;
+import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
+import org.apache.beam.runners.fnexecution.control.RemoteBundle;
+import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.util.construction.graph.ExecutableStage;
+import org.apache.beam.sdk.values.WindowedValue;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Kafka Streams {@link Processor} that executes a fused {@link ExecutableStage} (stateless user
+ * code such as ParDo) in the Beam SDK harness over the Fn API.
+ *
+ *
For each {@link KStreamsPayload#isData() data} payload it unwraps the {@link WindowedValue}
+ * and feeds it to the harness through the stage's main input {@link FnDataReceiver}. Harness
+ * outputs are collected on the harness threads into {@link #pendingOutputs} and then flushed
+ * downstream on the Kafka Streams processing thread when the bundle closes — Kafka Streams' {@link
+ * ProcessorContext#forward} must only be called from the processing thread, so outputs are never
+ * forwarded directly from a harness callback.
+ *
+ *
A {@link KStreamsPayload#isWatermark() watermark} payload marks a bundle boundary: the open
+ * bundle (if any) is closed (flushing outputs), and the watermark is then forwarded downstream so
+ * that subsequent stages observe it after all data of the bundle.
+ *
+ *
This is the Kafka Streams analogue of Flink's {@code ExecutableStageDoFnOperator} and Spark's
+ * {@code SparkExecutableStageFunction}. State, timers, and side inputs are out of scope for this
+ * first version: the stage is executed with {@link StateRequestHandler#unsupported()} and no timer
+ * receivers.
+ */
+class ExecutableStageProcessor
+ implements Processor, byte[], KStreamsPayload> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ExecutableStageProcessor.class);
+
+ private final RunnerApi.ExecutableStagePayload stagePayload;
+ private final JobInfo jobInfo;
+
+ private final Queue> pendingOutputs = new ArrayDeque<>();
+
+ private @Nullable ProcessorContext> context;
+ private @Nullable ExecutableStageContext stageContext;
+ private @Nullable StageBundleFactory stageBundleFactory;
+ private @Nullable RemoteBundle currentBundle;
+
+ ExecutableStageProcessor(RunnerApi.ExecutableStagePayload stagePayload, JobInfo jobInfo) {
+ this.stagePayload = stagePayload;
+ this.jobInfo = jobInfo;
+ }
+
+ @Override
+ public void init(ProcessorContext> context) {
+ this.context = context;
+ ExecutableStage executableStage = ExecutableStage.fromPayload(stagePayload);
+ this.stageContext = KafkaStreamsExecutableStageContextFactory.getInstance().get(jobInfo);
+ this.stageBundleFactory = stageContext.getStageBundleFactory(executableStage);
+ }
+
+ @Override
+ public void process(Record> record) {
+ KStreamsPayload payload = record.value();
+ if (payload.isWatermark()) {
+ // NOTE: flushing the bundle on every received watermark is provisional. Once the
+ // WatermarkManager lands, a stage will receive watermarks from multiple parent instances and
+ // the output watermark becomes min() across them — the bundle should flush / the output
+ // watermark advance only when that minimum actually moves forward, not on every received
+ // watermark. Tracked in #38743.
+ closeBundleAndFlush(record);
+ forwardWatermark(record, payload.getWatermarkMillis());
+ return;
+ }
+ try {
+ ensureBundleOpen();
+ mainInputReceiver().accept(payload.getData());
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to process element through SDK harness", e);
+ }
+ }
+
+ private void ensureBundleOpen() throws Exception {
+ if (currentBundle != null) {
+ return;
+ }
+ StageBundleFactory factory = checkInitialized(stageBundleFactory);
+ OutputReceiverFactory outputReceiverFactory =
+ new OutputReceiverFactory() {
+ @Override
+ public FnDataReceiver create(String pCollectionId) {
+ // Outputs are queued here on harness threads and drained on the processing thread
+ // after the bundle closes.
+ return receivedElement -> {
+ if (receivedElement != null) {
+ pendingOutputs.add((WindowedValue) receivedElement);
+ }
+ };
+ }
+ };
+ currentBundle =
+ factory.getBundle(
+ outputReceiverFactory,
+ StateRequestHandler.unsupported(),
+ BundleProgressHandler.ignored());
+ }
+
+ private FnDataReceiver> mainInputReceiver() {
+ RemoteBundle bundle = checkInitialized(currentBundle);
+ @SuppressWarnings("unchecked")
+ FnDataReceiver> receiver =
+ (FnDataReceiver>)
+ (FnDataReceiver>) Iterables.getOnlyElement(bundle.getInputReceivers().values());
+ return receiver;
+ }
+
+ private void closeBundleAndFlush(Record> record) {
+ RemoteBundle bundle = currentBundle;
+ if (bundle == null) {
+ return;
+ }
+ try {
+ // close() blocks until the harness finishes the bundle and all outputs have been delivered
+ // to the output receiver (and hence enqueued in pendingOutputs).
+ bundle.close();
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to close SDK harness bundle", e);
+ } finally {
+ currentBundle = null;
+ }
+ ProcessorContext> ctx = checkInitialized(context);
+ WindowedValue output;
+ while ((output = pendingOutputs.poll()) != null) {
+ ctx.forward(
+ new Record>(
+ record.key(), KStreamsPayload.data(output), record.timestamp()));
+ }
+ }
+
+ private void forwardWatermark(
+ Record> record, long watermarkMillis) {
+ ProcessorContext> ctx = checkInitialized(context);
+ ctx.forward(
+ new Record>(
+ record.key(), KStreamsPayload.watermark(watermarkMillis), record.timestamp()));
+ }
+
+ @Override
+ public void close() {
+ try {
+ if (currentBundle != null) {
+ currentBundle.close();
+ currentBundle = null;
+ }
+ } catch (Exception e) {
+ LOG.warn("Error closing in-flight SDK harness bundle", e);
+ }
+ try {
+ if (stageBundleFactory != null) {
+ stageBundleFactory.close();
+ stageBundleFactory = null;
+ }
+ } catch (Exception e) {
+ LOG.warn("Error closing stage bundle factory", e);
+ }
+ try {
+ if (stageContext != null) {
+ stageContext.close();
+ stageContext = null;
+ }
+ } catch (Exception e) {
+ LOG.warn("Error closing executable stage context", e);
+ }
+ }
+
+ private static T checkInitialized(@Nullable T value) {
+ if (value == null) {
+ throw new IllegalStateException("ExecutableStageProcessor used before init()");
+ }
+ return value;
+ }
+}
diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageTranslator.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageTranslator.java
new file mode 100644
index 000000000000..9015424cdf88
--- /dev/null
+++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageTranslator.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.kafka.streams.translation;
+
+import java.io.IOException;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.apache.kafka.streams.Topology;
+
+/**
+ * Translates the {@code beam:runner:executable_stage:v1} URN.
+ *
+ *
Adds an {@link ExecutableStageProcessor} node to the topology, wired to the processor that
+ * produces the stage's input PCollection (resolved through {@link
+ * KafkaStreamsTranslationContext#getProcessorNameForPCollection}). The processor runs the fused
+ * user code in the SDK harness; its single output PCollection is registered so downstream
+ * translators can attach to this node.
+ *
+ *
Multi-output stages (additional outputs / side inputs / state / timers) are out of scope for
+ * this first version and are rejected so the limitation fails fast rather than silently dropping
+ * outputs.
+ */
+class ExecutableStageTranslator implements PTransformTranslator {
+
+ @Override
+ public void translate(
+ String transformId, RunnerApi.Pipeline pipeline, KafkaStreamsTranslationContext context) {
+ RunnerApi.PTransform transform = pipeline.getComponents().getTransformsOrThrow(transformId);
+
+ RunnerApi.ExecutableStagePayload stagePayload;
+ try {
+ stagePayload = RunnerApi.ExecutableStagePayload.parseFrom(transform.getSpec().getPayload());
+ } catch (IOException e) {
+ throw new IllegalArgumentException(
+ "Failed to parse ExecutableStagePayload for transform " + transformId, e);
+ }
+
+ String inputPCollectionId = Iterables.getOnlyElement(transform.getInputsMap().values());
+ String parentProcessor = context.getProcessorNameForPCollection(inputPCollectionId);
+
+ if (transform.getOutputsMap().size() > 1) {
+ throw new UnsupportedOperationException(
+ "ExecutableStage "
+ + transformId
+ + " has "
+ + transform.getOutputsMap().size()
+ + " outputs; multi-output stages are not yet supported by the Kafka Streams runner.");
+ }
+
+ Topology topology = context.getTopology();
+ topology.addProcessor(
+ transformId,
+ () -> new ExecutableStageProcessor(stagePayload, context.getJobInfo()),
+ parentProcessor);
+
+ if (!transform.getOutputsMap().isEmpty()) {
+ String outputPCollectionId = Iterables.getOnlyElement(transform.getOutputsMap().values());
+ context.registerPCollectionProducer(outputPCollectionId, transformId);
+ }
+ }
+}
diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KStreamsPayload.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KStreamsPayload.java
index 47c94eea6eff..53e47b1216bb 100644
--- a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KStreamsPayload.java
+++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KStreamsPayload.java
@@ -19,6 +19,7 @@
import java.util.Objects;
import org.apache.beam.sdk.values.WindowedValue;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.checkerframework.checker.nullness.qual.Nullable;
/**
@@ -121,6 +122,12 @@ public int hashCode() {
@Override
public String toString() {
- return kind == Kind.DATA ? "Data{" + data + "}" : "Watermark{" + watermarkMillis + "}";
+ MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this).add("kind", kind);
+ if (kind == Kind.DATA) {
+ helper.add("data", data);
+ } else {
+ helper.add("watermarkMillis", watermarkMillis);
+ }
+ return helper.toString();
}
}
diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsExecutableStageContextFactory.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsExecutableStageContextFactory.java
new file mode 100644
index 000000000000..3d1643f3ea9a
--- /dev/null
+++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsExecutableStageContextFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.kafka.streams.translation;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext;
+import org.apache.beam.runners.fnexecution.control.ExecutableStageContext;
+import org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+
+/**
+ * Provides one {@link ExecutableStageContext.Factory} per job for the Kafka Streams runner.
+ *
+ *
Mirrors {@code FlinkExecutableStageContextFactory}: a singleton that hands out reference-
+ * counted {@link DefaultExecutableStageContext}s keyed by job id, so the SDK harness environment
+ * for a job is created once and shared across the {@link ImpulseProcessor}/executable-stage
+ * processors that run within the same JVM instance.
+ */
+public class KafkaStreamsExecutableStageContextFactory implements ExecutableStageContext.Factory {
+
+ private static final KafkaStreamsExecutableStageContextFactory INSTANCE =
+ new KafkaStreamsExecutableStageContextFactory();
+
+ private final ConcurrentMap jobFactories =
+ new ConcurrentHashMap<>();
+
+ private KafkaStreamsExecutableStageContextFactory() {}
+
+ public static KafkaStreamsExecutableStageContextFactory getInstance() {
+ return INSTANCE;
+ }
+
+ @Override
+ public ExecutableStageContext get(JobInfo jobInfo) {
+ ExecutableStageContext.Factory jobFactory =
+ jobFactories.computeIfAbsent(
+ jobInfo.jobId(),
+ k ->
+ ReferenceCountingExecutableStageContextFactory.create(
+ DefaultExecutableStageContext::create,
+ // Release the context synchronously once its reference count drops to zero;
+ // the runner does not keep contexts alive across stages beyond their use.
+ (caller) -> true));
+ return jobFactory.get(jobInfo);
+ }
+}
diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslator.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslator.java
index eb8567146143..5042f5426169 100644
--- a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslator.java
+++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslator.java
@@ -22,6 +22,8 @@
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.kafka.streams.KafkaStreamsPipelineOptions;
import org.apache.beam.sdk.util.construction.PTransformTranslation;
+import org.apache.beam.sdk.util.construction.graph.ExecutableStage;
+import org.apache.beam.sdk.util.construction.graph.GreedyPipelineFuser;
import org.apache.beam.sdk.util.construction.graph.PipelineNode;
import org.apache.beam.sdk.util.construction.graph.QueryablePipeline;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
@@ -43,6 +45,7 @@ public KafkaStreamsPipelineTranslator() {
this(
ImmutableMap.builder()
.put(PTransformTranslation.IMPULSE_TRANSFORM_URN, new ImpulseTranslator())
+ .put(ExecutableStage.URN, new ExecutableStageTranslator())
.build());
}
@@ -55,9 +58,21 @@ public KafkaStreamsTranslationContext createTranslationContext(
return KafkaStreamsTranslationContext.create(jobInfo, pipelineOptions);
}
- /** Returns the pipeline to translate (placeholder for future fusion / expansion steps). */
+ /**
+ * Fuses the pipeline so that stateless user code is grouped into {@code ExecutableStage} nodes.
+ *
+ *
Runner-executed primitives that have their own translator (e.g. Impulse) are left intact;
+ * everything else is fused. If the pipeline already contains {@code ExecutableStage} transforms
+ * it is returned unchanged.
+ */
public RunnerApi.Pipeline prepareForTranslation(RunnerApi.Pipeline pipeline) {
- return pipeline;
+ boolean alreadyFused =
+ pipeline.getComponents().getTransformsMap().values().stream()
+ .anyMatch(t -> ExecutableStage.URN.equals(t.getSpec().getUrn()));
+ if (alreadyFused) {
+ return pipeline;
+ }
+ return GreedyPipelineFuser.fuse(pipeline).toPipeline();
}
/**
diff --git a/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageTranslatorTest.java b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageTranslatorTest.java
new file mode 100644
index 000000000000..7dd6b71943ed
--- /dev/null
+++ b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageTranslatorTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.kafka.streams.translation;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Properties;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.kafka.streams.KafkaStreamsPipelineOptions;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+import org.apache.beam.sdk.testing.CrashingRunner;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.construction.Environments;
+import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation;
+import org.apache.beam.sdk.util.construction.PipelineTranslation;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.junit.Test;
+
+/**
+ * End-to-end test for {@link ExecutableStageTranslator}: builds an {@code Impulse -> ParDo}
+ * pipeline with the high-level Beam Java SDK, fuses + translates it, and runs the resulting Kafka
+ * Streams topology under {@link TopologyTestDriver}. The fused ParDo executes in an in-process
+ * (EMBEDDED) Java SDK harness, so the {@link DoFn}'s {@code @ProcessElement} body runs for real —
+ * no Docker, no broker.
+ *
+ *
Because the ParDo's output PCollection has no downstream consumer, it is not a stage output
+ * and is never forwarded out of the harness — that is the documented behaviour. The test verifies
+ * the bridge works by having the DoFn record into a {@link SharedTestCollector} as a side effect
+ * and asserting the recorded input from the test thread.
+ */
+public class ExecutableStageTranslatorTest {
+
+ private static final String JOB_ID = "kafka-streams-executable-stage-test";
+ private static final String APPLICATION_ID = "ks-executable-stage-test";
+
+ /**
+ * Records the length of every input element seen by the harness so the test can verify the DoFn
+ * ran. {@link SharedTestCollector} carries its identity via a UUID stored on the instance itself,
+ * so it survives any serialization the runner may perform on the DoFn.
+ */
+ private static class RecordingFn extends DoFn {
+ private final SharedTestCollector collector;
+
+ RecordingFn(SharedTestCollector collector) {
+ this.collector = collector;
+ }
+
+ @ProcessElement
+ public void processElement(@Element byte[] input, OutputReceiver out) {
+ collector.record(input.length);
+ // Still emit something so the output codepath of the harness is exercised, even though no
+ // downstream consumer means the runner never observes the value.
+ out.output(new byte[] {1});
+ }
+ }
+
+ @Test
+ public void impulseThenParDoExecutesDoFnInHarnessOncePerImpulseElement() throws Exception {
+ SharedTestCollector collector = SharedTestCollector.create();
+ collector.reset();
+
+ Pipeline pipeline = Pipeline.create(pipelineOptions());
+ pipeline
+ .apply("impulse", Impulse.create())
+ .apply("pardo", ParDo.of(new RecordingFn(collector)));
+
+ RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline);
+
+ KafkaStreamsPipelineOptions options =
+ pipeline.getOptions().as(KafkaStreamsPipelineOptions.class);
+ KafkaStreamsPipelineTranslator translator = new KafkaStreamsPipelineTranslator();
+ JobInfo jobInfo =
+ JobInfo.create(
+ JOB_ID, options.getJobName(), "", PipelineOptionsTranslation.toProto(options));
+ KafkaStreamsTranslationContext context = translator.createTranslationContext(jobInfo, options);
+
+ translator.translate(context, translator.prepareForTranslation(pipelineProto));
+
+ Topology topology = context.getTopology();
+ try (TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig())) {
+ driver.advanceWallClockTime(Duration.ofSeconds(1));
+ driver.advanceWallClockTime(Duration.ofSeconds(1));
+ }
+
+ List recorded = collector.recorded();
+ // Impulse emits exactly one empty byte[] in the GlobalWindow, so the DoFn must run exactly
+ // once and see a zero-length input.
+ assertThat(recorded.size(), is(1));
+ assertThat(recorded.get(0), is(0));
+ }
+
+ private static PipelineOptions pipelineOptions() {
+ PipelineOptions options =
+ PipelineOptionsFactory.fromArgs("--applicationId=" + APPLICATION_ID).create();
+ options.setRunner(CrashingRunner.class);
+ options.as(KafkaStreamsPipelineOptions.class).setApplicationId(APPLICATION_ID);
+ options
+ .as(PortablePipelineOptions.class)
+ .setDefaultEnvironmentType(Environments.ENVIRONMENT_EMBEDDED);
+ return options;
+ }
+
+ private static Properties streamsConfig() {
+ Properties props = new Properties();
+ props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
+ props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ props.put(
+ StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
+ props.put(
+ StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
+ return props;
+ }
+}
diff --git a/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslatorTest.java b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslatorTest.java
index 44cc00bcebbd..13baa551ebbf 100644
--- a/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslatorTest.java
+++ b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslatorTest.java
@@ -18,16 +18,19 @@
package org.apache.beam.runners.kafka.streams.translation;
import static org.hamcrest.CoreMatchers.containsString;
-import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertThrows;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.kafka.streams.KafkaStreamsPipelineOptions;
+import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.CrashingRunner;
+import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.util.construction.PTransformTranslation;
import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation;
+import org.apache.beam.sdk.util.construction.PipelineTranslation;
import org.apache.kafka.streams.TopologyDescription;
import org.junit.Test;
@@ -57,10 +60,11 @@ public void translateRejectsUnknownTransformWithUrnInMessage() {
.build()))
.build();
+ // translate() directly — this test pins the URN-rejection contract on the dispatch loop
+ // itself, independent of the fuser/validator that prepareForTranslation runs.
UnsupportedOperationException ex =
assertThrows(
- UnsupportedOperationException.class,
- () -> translator.translate(context, translator.prepareForTranslation(pipeline)));
+ UnsupportedOperationException.class, () -> translator.translate(context, pipeline));
assertThat(ex.getMessage(), containsString("No translator registered for URN"));
assertThat(ex.getMessage(), containsString(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN));
@@ -73,15 +77,23 @@ public void translateImpulsePipelineAddsSourceAndProcessorNodes() {
KafkaStreamsPipelineTranslator translator = new KafkaStreamsPipelineTranslator();
KafkaStreamsTranslationContext context = newContext();
- RunnerApi.Pipeline pipeline = singleImpulsePipeline();
+ // Build the pipeline through the SDK so the resulting RunnerApi.Pipeline carries the coders
+ // and windowing strategies that PipelineValidator requires (run inside the fuser).
+ Pipeline sdkPipeline =
+ Pipeline.create(
+ PipelineOptionsFactory.fromArgs(
+ "--applicationId=ks-translator-test",
+ "--runner=" + CrashingRunner.class.getName())
+ .create());
+ sdkPipeline.apply("impulse", Impulse.create());
+ RunnerApi.Pipeline pipeline = PipelineTranslation.toProto(sdkPipeline);
+
translator.translate(context, translator.prepareForTranslation(pipeline));
TopologyDescription description = context.getTopology().describe();
String describeText = description.toString();
-
- assertThat(describeText, containsString("impulse-source"));
- assertThat(describeText, containsString("impulse"));
- assertThat(context.getProcessorNameForPCollection(OUTPUT_PCOLLECTION_ID), is("impulse"));
+ assertThat(describeText, containsString("Source:"));
+ assertThat(describeText, containsString("Processor:"));
}
@Test
diff --git a/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/SharedTestCollector.java b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/SharedTestCollector.java
new file mode 100644
index 000000000000..a9c1010dd38f
--- /dev/null
+++ b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/SharedTestCollector.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.kafka.streams.translation;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Test-only side-effect sink that survives Beam serialization without losing collected elements.
+ *
+ *
An ExecutableStage that contains a user {@link org.apache.beam.sdk.transforms.DoFn} runs the
+ * DoFn in the SDK harness even when its output PCollection has no downstream consumer — the work is
+ * still performed for its side effects. The natural unit test for that is to have the DoFn record
+ * into a side-effect container and assert the container's contents from the test thread.
+ *
+ *
A plain static {@code AtomicReference} / {@code List} works only as long as the runner does
+ * not serialize the {@code DoFn} (and therefore the container instance it holds). The EMBEDDED
+ * environment may already, and could in the future, serialize the user code, in which case a cloned
+ * container would silently drop its writes.
+ *
+ *
This class works around that by keying the actual storage on a {@link UUID} held by an
+ * otherwise-empty instance. The instance itself is cheaply {@link Serializable}; clones still carry
+ * the same {@code UUID} and therefore see the same backing list in the static {@link #REGISTRY}.
+ *
+ * @param element type
+ */
+final class SharedTestCollector implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ /** Per-UUID storage, populated lazily on the first {@code record} for each instance. */
+ private static final Map> REGISTRY = new ConcurrentHashMap<>();
+
+ private final UUID id = UUID.randomUUID();
+
+ /** Returns a fresh, empty collector instance with its own UUID. */
+ static SharedTestCollector create() {
+ return new SharedTestCollector<>();
+ }
+
+ /** Records a single element. Safe to call from any thread. */
+ void record(T element) {
+ REGISTRY.computeIfAbsent(id, k -> Collections.synchronizedList(new ArrayList<>())).add(element);
+ }
+
+ /** Returns an immutable snapshot of all recorded elements, in order. */
+ @SuppressWarnings("unchecked")
+ List recorded() {
+ List