Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import com.google.auto.service.AutoService;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
Expand All @@ -33,6 +31,7 @@
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* Exposes {@link JdbcIO.Write} and {@link JdbcIO.ReadRows} as the external transforms for
Expand All @@ -46,17 +45,17 @@ public class JdbcTransformsRegistrar implements ExternalTransformRegistrar {
public static final String WRITE_URN = "beam:external:java:jdbc:write:v1";

@Override
public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
return ImmutableMap.of(READ_ROWS_URN, ReadRowsBuilder.class, WRITE_URN, WriteBuilder.class);
public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
return ImmutableMap.of(READ_ROWS_URN, new ReadRowsBuilder(), WRITE_URN, new WriteBuilder());
}

private static class CrossLanguageConfiguration {
String driverClassName;
String jdbcUrl;
String username;
String password;
String connectionProperties;
Iterable<String> connectionInitSqls;
@Nullable String connectionProperties;
@Nullable List<String> connectionInitSqls;

public void setDriverClassName(String driverClassName) {
this.driverClassName = driverClassName;
Expand All @@ -74,11 +73,11 @@ public void setPassword(String password) {
this.password = password;
}

public void setConnectionProperties(String connectionProperties) {
public void setConnectionProperties(@Nullable String connectionProperties) {
this.connectionProperties = connectionProperties;
}

public void setConnectionInitSqls(Iterable<String> connectionInitSqls) {
public void setConnectionInitSqls(@Nullable List<String> connectionInitSqls) {
this.connectionInitSqls = connectionInitSqls;
}

Expand All @@ -94,11 +93,10 @@ protected JdbcIO.DataSourceConfiguration getDataSourceConfiguration() {
}

if (connectionInitSqls != null) {
List<String> initSqls =
StreamSupport.stream(connectionInitSqls.spliterator(), false)
.collect(Collectors.toList());
dataSourceConfiguration = dataSourceConfiguration.withConnectionInitSqls(initSqls);
dataSourceConfiguration =
dataSourceConfiguration.withConnectionInitSqls(connectionInitSqls);
}

return dataSourceConfiguration;
}
}
Expand All @@ -108,15 +106,17 @@ public static class ReadRowsBuilder

public static class Configuration extends CrossLanguageConfiguration {
private String query;
private Integer fetchSize;
private Boolean outputParallelization;
private @Nullable Integer fetchSize;
private @Nullable Boolean outputParallelization;

public void setOutputParallelization(Boolean outputParallelization) {
public void setOutputParallelization(@Nullable Boolean outputParallelization) {
this.outputParallelization = outputParallelization;
}

public void setFetchSize(Integer fetchSize) {
this.fetchSize = fetchSize;
public void setFetchSize(@Nullable Long fetchSize) {
if (fetchSize != null) {
this.fetchSize = fetchSize.intValue();
}
}

public void setQuery(String query) {
Expand Down