Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dagger-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ configurations {
dependencies {
minimalJar project(path: ':dagger-common', configuration: 'minimalCommonJar')
minimalJar project(path: ':dagger-functions', configuration: 'minimalFunctionsJar')
minimalJar('io.odpf:depot:0.2.0') {
minimalJar('io.odpf:depot:0.3.1') {
exclude group: 'org.apache.httpcomponents'
exclude module: 'stencil', group: 'io.odpf'
exclude group: 'com.google.protobuf'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package io.odpf.dagger.core;

import io.odpf.dagger.common.configuration.Configuration;
import io.odpf.dagger.core.config.ConfigurationProvider;
import io.odpf.dagger.core.config.ConfigurationProviderFactory;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import io.odpf.dagger.common.configuration.Configuration;
import io.odpf.dagger.core.config.ConfigurationProvider;
import io.odpf.dagger.core.config.ConfigurationProviderFactory;

import java.util.TimeZone;

/**
Expand Down
70 changes: 33 additions & 37 deletions dagger-core/src/main/java/io/odpf/dagger/core/StreamManager.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,5 @@
package io.odpf.dagger.core;

import io.odpf.dagger.core.metrics.reporters.statsd.DaggerStatsDReporter;
import io.odpf.dagger.core.source.Stream;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.ApiExpression;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import io.odpf.dagger.common.configuration.Configuration;
import io.odpf.dagger.common.core.StencilClientOrchestrator;
import io.odpf.dagger.common.core.StreamInfo;
Expand All @@ -20,6 +9,7 @@
import io.odpf.dagger.common.watermark.StreamWatermarkAssigner;
import io.odpf.dagger.common.watermark.WatermarkStrategyDefinition;
import io.odpf.dagger.core.exception.UDFFactoryClassNotDefinedException;
import io.odpf.dagger.core.metrics.reporters.statsd.DaggerStatsDReporter;
import io.odpf.dagger.core.processors.PostProcessorFactory;
import io.odpf.dagger.core.processors.PreProcessorConfig;
import io.odpf.dagger.core.processors.PreProcessorFactory;
Expand All @@ -31,6 +21,14 @@
import io.odpf.dagger.core.utils.Constants;
import io.odpf.dagger.functions.udfs.python.PythonUdfConfig;
import io.odpf.dagger.functions.udfs.python.PythonUdfManager;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.ApiExpression;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.io.IOException;
import java.lang.reflect.Constructor;
Expand All @@ -48,11 +46,12 @@
*/
public class StreamManager {

private StencilClientOrchestrator stencilClientOrchestrator;
private final Configuration configuration;
private final StreamExecutionEnvironment executionEnvironment;
private StreamTableEnvironment tableEnvironment;
private MetricsTelemetryExporter telemetryExporter = new MetricsTelemetryExporter();
private final StreamTableEnvironment tableEnvironment;
private final MetricsTelemetryExporter telemetryExporter = new MetricsTelemetryExporter();
private StencilClientOrchestrator stencilClientOrchestrator;
private DaggerStatsDReporter daggerStatsDReporter;

/**
* Instantiates a new Stream manager.
Expand All @@ -74,6 +73,8 @@ public StreamManager(Configuration configuration, StreamExecutionEnvironment exe
*/
public StreamManager registerConfigs() {
stencilClientOrchestrator = new StencilClientOrchestrator(configuration);
org.apache.flink.configuration.Configuration flinkConfiguration = (org.apache.flink.configuration.Configuration) this.executionEnvironment.getConfiguration();
daggerStatsDReporter = DaggerStatsDReporter.Provider.provide(flinkConfiguration, configuration);

executionEnvironment.setMaxParallelism(configuration.getInteger(Constants.FLINK_PARALLELISM_MAX_KEY, Constants.FLINK_PARALLELISM_MAX_DEFAULT));
executionEnvironment.setParallelism(configuration.getInteger(FLINK_PARALLELISM_KEY, FLINK_PARALLELISM_DEFAULT));
Expand All @@ -86,7 +87,6 @@ public StreamManager registerConfigs() {
executionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(configuration.getInteger(FLINK_CHECKPOINT_MAX_CONCURRENT_KEY, FLINK_CHECKPOINT_MAX_CONCURRENT_DEFAULT));
executionEnvironment.getConfig().setGlobalJobParameters(configuration.getParam());


tableEnvironment.getConfig().setIdleStateRetention(Duration.ofMinutes(configuration.getInteger(FLINK_RETENTION_IDLE_STATE_MINUTE_KEY, FLINK_RETENTION_IDLE_STATE_MINUTE_DEFAULT)));
return this;
}
Expand All @@ -100,22 +100,23 @@ public StreamManager registerSourceWithPreProcessors() {
long watermarkDelay = configuration.getLong(FLINK_WATERMARK_DELAY_MS_KEY, FLINK_WATERMARK_DELAY_MS_DEFAULT);
Boolean enablePerPartitionWatermark = configuration.getBoolean(FLINK_WATERMARK_PER_PARTITION_ENABLE_KEY, FLINK_WATERMARK_PER_PARTITION_ENABLE_DEFAULT);
PreProcessorConfig preProcessorConfig = PreProcessorFactory.parseConfig(configuration);
getStreams().forEach(stream -> {
String tableName = stream.getStreamName();
WatermarkStrategyDefinition watermarkStrategyDefinition = getSourceWatermarkDefinition(enablePerPartitionWatermark);
DataStream<Row> dataStream = stream.registerSource(executionEnvironment, watermarkStrategyDefinition.getWatermarkStrategy(watermarkDelay));
StreamWatermarkAssigner streamWatermarkAssigner = new StreamWatermarkAssigner(new LastColumnWatermark());

DataStream<Row> rowSingleOutputStreamOperator = streamWatermarkAssigner
.assignTimeStampAndWatermark(dataStream, watermarkDelay, enablePerPartitionWatermark);

TableSchema tableSchema = TableSchema.fromTypeInfo(dataStream.getType());
StreamInfo streamInfo = new StreamInfo(rowSingleOutputStreamOperator, tableSchema.getFieldNames());
streamInfo = addPreProcessor(streamInfo, tableName, preProcessorConfig);

Table table = tableEnvironment.fromDataStream(streamInfo.getDataStream(), getApiExpressions(streamInfo));
tableEnvironment.createTemporaryView(tableName, table);
});
StreamsFactory.getStreams(configuration, stencilClientOrchestrator, daggerStatsDReporter)
.forEach(stream -> {
String tableName = stream.getStreamName();
WatermarkStrategyDefinition watermarkStrategyDefinition = getSourceWatermarkDefinition(enablePerPartitionWatermark);
DataStream<Row> dataStream = stream.registerSource(executionEnvironment, watermarkStrategyDefinition.getWatermarkStrategy(watermarkDelay));
StreamWatermarkAssigner streamWatermarkAssigner = new StreamWatermarkAssigner(new LastColumnWatermark());

DataStream<Row> rowSingleOutputStreamOperator = streamWatermarkAssigner
.assignTimeStampAndWatermark(dataStream, watermarkDelay, enablePerPartitionWatermark);

TableSchema tableSchema = TableSchema.fromTypeInfo(dataStream.getType());
StreamInfo streamInfo = new StreamInfo(rowSingleOutputStreamOperator, tableSchema.getFieldNames());
streamInfo = addPreProcessor(streamInfo, tableName, preProcessorConfig);

Table table = tableEnvironment.fromDataStream(streamInfo.getDataStream(), getApiExpressions(streamInfo));
tableEnvironment.createTemporaryView(tableName, table);
});
return this;
}

Expand Down Expand Up @@ -228,12 +229,7 @@ private StreamInfo addPreProcessor(StreamInfo streamInfo, String tableName, PreP
private void addSink(StreamInfo streamInfo) {
SinkOrchestrator sinkOrchestrator = new SinkOrchestrator(telemetryExporter);
sinkOrchestrator.addSubscriber(telemetryExporter);
streamInfo.getDataStream().sinkTo(sinkOrchestrator.getSink(configuration, streamInfo.getColumnNames(), stencilClientOrchestrator));
streamInfo.getDataStream().sinkTo(sinkOrchestrator.getSink(configuration, streamInfo.getColumnNames(), stencilClientOrchestrator, daggerStatsDReporter));
}

List<Stream> getStreams() {
org.apache.flink.configuration.Configuration flinkConfiguration = (org.apache.flink.configuration.Configuration) this.executionEnvironment.getConfiguration();
DaggerStatsDReporter daggerStatsDReporter = DaggerStatsDReporter.Provider.provide(flinkConfiguration, configuration);
return StreamsFactory.getStreams(configuration, stencilClientOrchestrator, daggerStatsDReporter);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.odpf.dagger.core.exception;

import java.io.IOException;

public class BigQueryWriterException extends IOException {

public BigQueryWriterException(String message, Throwable cause) {
super(message, cause);
}

public BigQueryWriterException(String message) {
super(message);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.odpf.dagger.core.sink;

import io.odpf.dagger.core.sink.bigquery.BigquerySinkBuilder;
import io.odpf.dagger.core.metrics.reporters.statsd.DaggerStatsDReporter;
import io.odpf.dagger.core.sink.bigquery.BigQuerySinkBuilder;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaSink;
Expand Down Expand Up @@ -48,7 +49,7 @@ public SinkOrchestrator(MetricsTelemetryExporter telemetryExporter) {
* @columnNames columnNames the column names
* @StencilClientOrchestrator stencilClientOrchestrator the stencil client orchestrator
*/
public Sink getSink(Configuration configuration, String[] columnNames, StencilClientOrchestrator stencilClientOrchestrator) {
public Sink getSink(Configuration configuration, String[] columnNames, StencilClientOrchestrator stencilClientOrchestrator, DaggerStatsDReporter daggerStatsDReporter) {
String sinkType = configuration.getString("SINK_TYPE", "influx");
addMetric(TelemetryTypes.SINK_TYPE.getValue(), sinkType);
Sink sink;
Expand All @@ -73,8 +74,9 @@ public Sink getSink(Configuration configuration, String[] columnNames, StencilCl
sink = new LogSink(columnNames);
break;
case "bigquery":
sink = BigquerySinkBuilder.create()
sink = BigQuerySinkBuilder.create()
.setColumnNames(columnNames)
.setDaggerStatsDReporter(daggerStatsDReporter)
.setConfiguration(configuration)
.setStencilClientOrchestrator(stencilClientOrchestrator)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.odpf.dagger.common.serde.proto.serialization.ProtoSerializer;
import io.odpf.dagger.core.metrics.reporters.ErrorReporter;
import io.odpf.dagger.core.metrics.reporters.ErrorReporterFactory;
import io.odpf.dagger.core.metrics.reporters.statsd.DaggerStatsDReporter;
import io.odpf.dagger.core.utils.Constants;
import io.odpf.depot.OdpfSink;
import io.odpf.depot.bigquery.BigQuerySinkFactory;
Expand All @@ -24,30 +25,32 @@
import java.util.Optional;
import java.util.Set;

public class BigquerySink implements Sink<Row, Void, Void, Void> {
public class BigQuerySink implements Sink<Row, Void, Void, Void> {
private final ProtoSerializer protoSerializer;
private final Configuration configuration;
private final DaggerStatsDReporter daggerStatsDReporter;
private transient BigQuerySinkFactory sinkFactory;

protected BigquerySink(Configuration configuration, ProtoSerializer protoSerializer) {
this(configuration, protoSerializer, null);
protected BigQuerySink(Configuration configuration, ProtoSerializer protoSerializer, DaggerStatsDReporter daggerStatsDReporter) {
this(configuration, protoSerializer, null, daggerStatsDReporter);
}

/**
* Constructor for testing.
*/
protected BigquerySink(Configuration configuration, ProtoSerializer protoSerializer, BigQuerySinkFactory sinkFactory) {
protected BigQuerySink(Configuration configuration, ProtoSerializer protoSerializer, BigQuerySinkFactory sinkFactory, DaggerStatsDReporter daggerStatsDReporter) {
this.configuration = configuration;
this.protoSerializer = protoSerializer;
this.sinkFactory = sinkFactory;
this.daggerStatsDReporter = daggerStatsDReporter;
}

@Override
public SinkWriter<Row, Void, Void> createWriter(InitContext context, List<Void> states) {
ErrorReporter errorReporter = ErrorReporterFactory.getErrorReporter(context.metricGroup(), configuration);
if (sinkFactory == null) {
BigQuerySinkConfig sinkConfig = ConfigFactory.create(BigQuerySinkConfig.class, configuration.getParam().toMap());
sinkFactory = new BigQuerySinkFactory(sinkConfig);
sinkFactory = new BigQuerySinkFactory(sinkConfig, daggerStatsDReporter.buildStatsDReporter());
try {
sinkFactory.init();
} catch (Exception e) {
Expand All @@ -66,7 +69,7 @@ public SinkWriter<Row, Void, Void> createWriter(InitContext context, List<Void>
for (String s : Splitter.on(",").omitEmptyStrings().split(errorsForFailing)) {
errorTypesForFailing.add(ErrorType.valueOf(s.trim()));
}
return new BigquerySinkWriter(protoSerializer, odpfSink, batchSize, errorReporter, errorTypesForFailing);
return new BigQuerySinkWriter(protoSerializer, odpfSink, batchSize, errorReporter, errorTypesForFailing);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,34 @@
import io.odpf.dagger.common.configuration.Configuration;
import io.odpf.dagger.common.core.StencilClientOrchestrator;
import io.odpf.dagger.common.serde.proto.serialization.ProtoSerializer;
import io.odpf.dagger.core.metrics.reporters.statsd.DaggerStatsDReporter;
import org.apache.flink.api.java.utils.ParameterTool;

import java.util.HashMap;
import java.util.Map;

public class BigquerySinkBuilder {
public class BigQuerySinkBuilder {

private String[] columnNames;
private StencilClientOrchestrator stencilClientOrchestrator;
private Configuration configuration;
private DaggerStatsDReporter daggerStatsDReporter;

private BigquerySinkBuilder() {
private BigQuerySinkBuilder() {
}

public static BigquerySinkBuilder create() {
return new BigquerySinkBuilder();
public static BigQuerySinkBuilder create() {
return new BigQuerySinkBuilder();
}

public BigquerySink build() {
public BigQuerySink build() {
ProtoSerializer protoSerializer = new ProtoSerializer(
configuration.getString("SINK_CONNECTOR_SCHEMA_PROTO_KEY_CLASS", ""),
configuration.getString("SINK_CONNECTOR_SCHEMA_PROTO_MESSAGE_CLASS", ""),
columnNames,
stencilClientOrchestrator);
Configuration conf = setDefaultValues(configuration);
return new BigquerySink(conf, protoSerializer);
return new BigQuerySink(conf, protoSerializer, daggerStatsDReporter);
}

private Configuration setDefaultValues(Configuration inputConf) {
Expand All @@ -40,22 +42,28 @@ private Configuration setDefaultValues(Configuration inputConf) {
configMap.put("SCHEMA_REGISTRY_STENCIL_REFRESH_STRATEGY", "LONG_POLLING");
configMap.put("SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS", "60000");
configMap.put("SCHEMA_REGISTRY_STENCIL_FETCH_HEADERS", "");
configMap.put("SINK_METRICS_APPLICATION_PREFIX", "dagger_");
configMap.put("SINK_BIGQUERY_ROW_INSERT_ID_ENABLE", "false");
return new Configuration(ParameterTool.fromMap(configMap));
}

public BigquerySinkBuilder setConfiguration(Configuration configuration) {
public BigQuerySinkBuilder setConfiguration(Configuration configuration) {
this.configuration = configuration;
return this;
}

public BigquerySinkBuilder setColumnNames(String[] columnNames) {
public BigQuerySinkBuilder setColumnNames(String[] columnNames) {
this.columnNames = columnNames;
return this;
}

public BigquerySinkBuilder setStencilClientOrchestrator(StencilClientOrchestrator stencilClientOrchestrator) {
public BigQuerySinkBuilder setStencilClientOrchestrator(StencilClientOrchestrator stencilClientOrchestrator) {
this.stencilClientOrchestrator = stencilClientOrchestrator;
return this;
}

public BigQuerySinkBuilder setDaggerStatsDReporter(DaggerStatsDReporter daggerStatsDReporter) {
this.daggerStatsDReporter = daggerStatsDReporter;
return this;
}
}
Loading