1 change: 1 addition & 0 deletions runners/google-cloud-dataflow-java/build.gradle
@@ -72,6 +72,7 @@ dependencies {
compile project(path: ":model:pipeline", configuration: "shadow")
compile project(path: ":sdks:java:core", configuration: "shadow")
compile project(":sdks:java:extensions:google-cloud-platform-core")
compile project(":sdks:java:io:kafka")
compile project(":sdks:java:io:google-cloud-platform")
compile project(":runners:core-construction-java")
compile library.java.avro
@@ -115,6 +115,7 @@
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
@@ -491,6 +492,9 @@ private List<PTransformOverride> getOverrides(boolean streaming) {
new StreamingPubsubIOWriteOverrideFactory(this)));
}
}
if (useUnifiedWorker(options)) {
overridesBuilder.add(KafkaIO.Read.KAFKA_READ_OVERRIDE);
}
overridesBuilder.add(
PTransformOverride.of(
PTransformMatchers.writeWithRunnerDeterminedSharding(),
@@ -37,12 +37,14 @@
import org.apache.beam.runners.spark.structuredstreaming.translation.streaming.PipelineTranslatorStreaming;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.metrics.MetricsOptions;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.spark.SparkEnv$;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.metrics.MetricsSystem;
@@ -193,6 +195,7 @@ private TranslationContext translatePipeline(Pipeline pipeline) {
|| ExperimentalOptions.hasExperiment(
pipeline.getOptions(), "beam_fn_api_use_deprecated_read")
|| ExperimentalOptions.hasExperiment(pipeline.getOptions(), "use_deprecated_read")) {
pipeline.replaceAll(ImmutableList.of(KafkaIO.Read.KAFKA_READ_OVERRIDE));
SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(pipeline);
}

1 change: 1 addition & 0 deletions runners/spark/spark_runner.gradle
@@ -152,6 +152,7 @@ dependencies {
compile project(":runners:core-java")
compile project(":runners:java-fn-execution")
compile project(":runners:java-job-service")
compile project(":sdks:java:io:kafka")
compile project(":sdks:java:extensions:google-cloud-platform-core")
compile library.java.jackson_annotations
compile library.java.slf4j_api
@@ -48,6 +48,7 @@
import org.apache.beam.runners.spark.util.SparkCompat;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.metrics.MetricsOptions;
import org.apache.beam.sdk.options.ExperimentalOptions;
@@ -66,6 +67,7 @@
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.spark.SparkEnv$;
import org.apache.spark.api.java.JavaSparkContext;
@@ -181,6 +183,7 @@ public SparkPipelineResult run(final Pipeline pipeline) {
|| ExperimentalOptions.hasExperiment(
pipeline.getOptions(), "beam_fn_api_use_deprecated_read")
|| ExperimentalOptions.hasExperiment(pipeline.getOptions(), "use_deprecated_read")) {
pipeline.replaceAll(ImmutableList.of(KafkaIO.Read.KAFKA_READ_OVERRIDE));
SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(pipeline);
}

@@ -26,9 +26,11 @@
import org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.joda.time.Duration;
@@ -85,6 +87,7 @@ public SparkPipelineResult run(Pipeline pipeline) {
|| ExperimentalOptions.hasExperiment(
pipeline.getOptions(), "beam_fn_api_use_deprecated_read")
|| ExperimentalOptions.hasExperiment(pipeline.getOptions(), "use_deprecated_read")) {
pipeline.replaceAll(ImmutableList.of(KafkaIO.Read.KAFKA_READ_OVERRIDE));
SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(pipeline);
}
JavaSparkContext jsc = new JavaSparkContext("local[1]", "Debug_Pipeline");
1 change: 1 addition & 0 deletions sdks/java/io/kafka/build.gradle
@@ -48,6 +48,7 @@ kafkaVersions.each{k,v -> configurations.create("kafkaVersion$k")}
dependencies {
compile library.java.vendored_guava_26_0_jre
compile project(path: ":sdks:java:core", configuration: "shadow")
compile project(":runners:core-construction-java")
compile project(":sdks:java:expansion-service")
permitUnusedDeclared project(":sdks:java:expansion-service") // BEAM-11761
compile library.java.avro
@@ -33,8 +33,11 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.beam.runners.core.construction.PTransformMatchers;
import org.apache.beam.runners.core.construction.ReplacementOutputs;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
@@ -51,6 +54,9 @@
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.transforms.Convert;
import org.apache.beam.sdk.transforms.DoFn;
@@ -72,6 +78,7 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
@@ -1209,67 +1216,144 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
Coder<K> keyCoder = getKeyCoder(coderRegistry);
Coder<V> valueCoder = getValueCoder(coderRegistry);

// The Read will be expanded into SDF transform when "beam_fn_api" is enabled.
if (!ExperimentalOptions.hasExperiment(input.getPipeline().getOptions(), "beam_fn_api")
|| ExperimentalOptions.hasExperiment(
// For read from unbounded in a bounded manner, we actually are not going through Read or SDF.
if (ExperimentalOptions.hasExperiment(
input.getPipeline().getOptions(), "beam_fn_api_use_deprecated_read")
|| ExperimentalOptions.hasExperiment(
input.getPipeline().getOptions(), "use_deprecated_read")
|| getMaxNumRecords() < Long.MAX_VALUE
|| getMaxReadTime() != null) {
return input.apply(new ReadFromKafkaViaUnbounded<>(this, keyCoder, valueCoder));
}
return input.apply(new ReadFromKafkaViaSDF<>(this, keyCoder, valueCoder));
}
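For context, a minimal sketch of how a pipeline author would opt into the legacy UnboundedSource-based read via the experiment flag checked above; the broker address, topic name, and deserializers are placeholder assumptions, not part of this change:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class LegacyKafkaReadExample {
  public static void main(String[] args) {
    // "use_deprecated_read" makes the expand() above take the
    // ReadFromKafkaViaUnbounded branch instead of ReadFromKafkaViaSDF.
    PipelineOptions options =
        PipelineOptionsFactory.fromArgs("--experiments=use_deprecated_read").create();
    Pipeline pipeline = Pipeline.create(options);
    pipeline.apply(
        KafkaIO.<Long, String>read()
            .withBootstrapServers("localhost:9092") // placeholder broker
            .withTopic("my_topic") // placeholder topic
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class));
    pipeline.run().waitUntilFinish();
  }
}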

/**
* A {@link PTransformOverride} for runners to swap {@link ReadFromKafkaViaSDF} to the legacy Kafka
* read when the runner does not have good support for executing unbounded Splittable DoFn.
*/
@Internal
public static final PTransformOverride KAFKA_READ_OVERRIDE =
Member:

Mark @Internal just to be clear that it is not for pipeline authors. Would be good to document why this exists and when to use it.

Perhaps it could be in runners-core-construction, but I actually want to merge that back into the core SDK so no need.

Contributor Author:

runners-core-construction cannot depend on kafka-io because expansion-service introduces circular dependency, just like pubsub

PTransformOverride.of(
PTransformMatchers.classEqualTo(ReadFromKafkaViaSDF.class),
new KafkaReadOverrideFactory<>());

private static class KafkaReadOverrideFactory<K, V>
implements PTransformOverrideFactory<
PBegin, PCollection<KafkaRecord<K, V>>, ReadFromKafkaViaSDF<K, V>> {

@Override
public PTransformReplacement<PBegin, PCollection<KafkaRecord<K, V>>> getReplacementTransform(
AppliedPTransform<PBegin, PCollection<KafkaRecord<K, V>>, ReadFromKafkaViaSDF<K, V>>
transform) {
return PTransformReplacement.of(
transform.getPipeline().begin(),
new ReadFromKafkaViaUnbounded<>(
transform.getTransform().kafkaRead,
transform.getTransform().keyCoder,
transform.getTransform().valueCoder));
}

@Override
public Map<PCollection<?>, ReplacementOutput> mapOutputs(
Map<TupleTag<?>, PCollection<?>> outputs, PCollection<KafkaRecord<K, V>> newOutput) {
return ReplacementOutputs.singleton(outputs, newOutput);
}
}
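
As noted in the review thread above, this override is meant for runners rather than pipeline authors. A minimal sketch of how a runner without good unbounded Splittable DoFn support would apply it before translation, mirroring the Spark runner changes in this PR (the surrounding runner class is assumed):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;

// Called from the runner's run() before the pipeline is translated.
private static void swapSdfKafkaReadForLegacyRead(Pipeline pipeline) {
  // Replaces each ReadFromKafkaViaSDF composite with ReadFromKafkaViaUnbounded.
  pipeline.replaceAll(ImmutableList.of(KafkaIO.Read.KAFKA_READ_OVERRIDE));
}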

private static class ReadFromKafkaViaUnbounded<K, V>
extends PTransform<PBegin, PCollection<KafkaRecord<K, V>>> {
Read<K, V> kafkaRead;
Coder<K> keyCoder;
Coder<V> valueCoder;

ReadFromKafkaViaUnbounded(Read<K, V> kafkaRead, Coder<K> keyCoder, Coder<V> valueCoder) {
this.kafkaRead = kafkaRead;
this.keyCoder = keyCoder;
this.valueCoder = valueCoder;
}

@Override
public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
// Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set.
Unbounded<KafkaRecord<K, V>> unbounded =
org.apache.beam.sdk.io.Read.from(
toBuilder().setKeyCoder(keyCoder).setValueCoder(valueCoder).build().makeSource());
kafkaRead
.toBuilder()
.setKeyCoder(keyCoder)
.setValueCoder(valueCoder)
.build()
.makeSource());

PTransform<PBegin, PCollection<KafkaRecord<K, V>>> transform = unbounded;

if (getMaxNumRecords() < Long.MAX_VALUE || getMaxReadTime() != null) {
if (kafkaRead.getMaxNumRecords() < Long.MAX_VALUE || kafkaRead.getMaxReadTime() != null) {
transform =
unbounded.withMaxReadTime(getMaxReadTime()).withMaxNumRecords(getMaxNumRecords());
unbounded
.withMaxReadTime(kafkaRead.getMaxReadTime())
.withMaxNumRecords(kafkaRead.getMaxNumRecords());
}

return input.getPipeline().apply(transform);
}
ReadSourceDescriptors<K, V> readTransform =
ReadSourceDescriptors.<K, V>read()
.withConsumerConfigOverrides(getConsumerConfig())
.withOffsetConsumerConfigOverrides(getOffsetConsumerConfig())
.withConsumerFactoryFn(getConsumerFactoryFn())
.withKeyDeserializerProvider(getKeyDeserializerProvider())
.withValueDeserializerProvider(getValueDeserializerProvider())
.withManualWatermarkEstimator()
.withTimestampPolicyFactory(getTimestampPolicyFactory())
.withCheckStopReadingFn(getCheckStopReadingFn());
if (isCommitOffsetsInFinalizeEnabled()) {
readTransform = readTransform.commitOffsets();
}

static class ReadFromKafkaViaSDF<K, V>
extends PTransform<PBegin, PCollection<KafkaRecord<K, V>>> {
Read<K, V> kafkaRead;
Coder<K> keyCoder;
Coder<V> valueCoder;

ReadFromKafkaViaSDF(Read<K, V> kafkaRead, Coder<K> keyCoder, Coder<V> valueCoder) {
this.kafkaRead = kafkaRead;
this.keyCoder = keyCoder;
this.valueCoder = valueCoder;
}
PCollection<KafkaSourceDescriptor> output;
if (isDynamicRead()) {
output =
input
.getPipeline()
.apply(Impulse.create())
.apply(
MapElements.into(
TypeDescriptors.kvs(
new TypeDescriptor<byte[]>() {}, new TypeDescriptor<byte[]>() {}))
.via(element -> KV.of(element, element)))
.apply(
ParDo.of(
new WatchKafkaTopicPartitionDoFn(
getWatchTopicPartitionDuration(),
getConsumerFactoryFn(),
getCheckStopReadingFn(),
getConsumerConfig(),
getStartReadTime())));

} else {
output =
input
.getPipeline()
.apply(Impulse.create())
.apply(ParDo.of(new GenerateKafkaSourceDescriptor(this)));
@Override
public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
ReadSourceDescriptors<K, V> readTransform =
ReadSourceDescriptors.<K, V>read()
.withConsumerConfigOverrides(kafkaRead.getConsumerConfig())
.withOffsetConsumerConfigOverrides(kafkaRead.getOffsetConsumerConfig())
.withConsumerFactoryFn(kafkaRead.getConsumerFactoryFn())
.withKeyDeserializerProvider(kafkaRead.getKeyDeserializerProvider())
.withValueDeserializerProvider(kafkaRead.getValueDeserializerProvider())
.withManualWatermarkEstimator()
.withTimestampPolicyFactory(kafkaRead.getTimestampPolicyFactory())
.withCheckStopReadingFn(kafkaRead.getCheckStopReadingFn());
if (kafkaRead.isCommitOffsetsInFinalizeEnabled()) {
readTransform = readTransform.commitOffsets();
}
PCollection<KafkaSourceDescriptor> output;
if (kafkaRead.isDynamicRead()) {
output =
input
.getPipeline()
.apply(Impulse.create())
.apply(
MapElements.into(
TypeDescriptors.kvs(
new TypeDescriptor<byte[]>() {}, new TypeDescriptor<byte[]>() {}))
.via(element -> KV.of(element, element)))
.apply(
ParDo.of(
new WatchKafkaTopicPartitionDoFn(
kafkaRead.getWatchTopicPartitionDuration(),
kafkaRead.getConsumerFactoryFn(),
kafkaRead.getCheckStopReadingFn(),
kafkaRead.getConsumerConfig(),
kafkaRead.getStartReadTime())));

} else {
output =
input
.getPipeline()
.apply(Impulse.create())
.apply(ParDo.of(new GenerateKafkaSourceDescriptor(kafkaRead)));
}
return output.apply(readTransform).setCoder(KafkaRecordCoder.of(keyCoder, valueCoder));
}
return output.apply(readTransform).setCoder(KafkaRecordCoder.of(keyCoder, valueCoder));
}

/**
@@ -1798,10 +1882,6 @@ ReadSourceDescriptors<K, V> withTimestampPolicyFactory(

@Override
public PCollection<KafkaRecord<K, V>> expand(PCollection<KafkaSourceDescriptor> input) {
checkArgument(
ExperimentalOptions.hasExperiment(input.getPipeline().getOptions(), "beam_fn_api"),
"The ReadSourceDescriptors can only used when beam_fn_api is enabled.");

checkArgument(getKeyDeserializerProvider() != null, "withKeyDeserializer() is required");
checkArgument(getValueDeserializerProvider() != null, "withValueDeserializer() is required");

@@ -128,8 +128,10 @@ public void testConstructKafkaRead() throws Exception {
assertThat(transform.getInputsCount(), Matchers.is(0));
assertThat(transform.getOutputsCount(), Matchers.is(1));

RunnerApi.PTransform kafkaComposite =
RunnerApi.PTransform kafkaReadComposite =
result.getComponents().getTransformsOrThrow(transform.getSubtransforms(0));
RunnerApi.PTransform kafkaComposite =
result.getComponents().getTransformsOrThrow(kafkaReadComposite.getSubtransforms(0));
assertThat(
kafkaComposite.getSubtransformsList(),
Matchers.hasItem(MatchesPattern.matchesPattern(".*Impulse.*")));