Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 37 additions & 8 deletions sdks/python/apache_beam/io/gcp/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -1019,7 +1019,8 @@ def __init__(
test_client=None,
max_buffered_rows=None,
retry_strategy=None,
additional_bq_parameters=None):
additional_bq_parameters=None,
ignore_insert_ids=False):
"""Initialize a WriteToBigQuery transform.

Args:
Expand Down Expand Up @@ -1054,6 +1055,12 @@ def __init__(
to be passed when creating a BigQuery table. These are passed when
triggering a load job for FILE_LOADS, and when creating a new table for
STREAMING_INSERTS.
ignore_insert_ids: When using the STREAMING_INSERTS method to write data
to BigQuery, `insert_ids` are a feature of BigQuery that support
deduplication of events. If your use case is not sensitive to
duplication of data inserted to BigQuery, set `ignore_insert_ids`
to True to increase the throughput for BQ writing. See:
https://cloud.google.com/bigquery/streaming-data-into-bigquery#disabling_best_effort_de-duplication
"""
self.schema = schema
self.test_client = test_client
Expand All @@ -1068,6 +1075,7 @@ def __init__(
self._max_buffered_rows = (
max_buffered_rows or BigQueryWriteFn.DEFAULT_MAX_BUFFERED_ROWS)
self._retry_strategy = retry_strategy or RetryStrategy.RETRY_ALWAYS
self.ignore_insert_ids = ignore_insert_ids

self.additional_bq_parameters = additional_bq_parameters or {}

Expand All @@ -1085,7 +1093,8 @@ def display_data(self):
'retry_strategy': self._retry_strategy,
'create_disposition': str(self.create_disposition),
'write_disposition': str(self.write_disposition),
'additional_bq_parameters': str(self.additional_bq_parameters)
'additional_bq_parameters': str(self.additional_bq_parameters),
'ignore_insert_ids': str(self.ignore_insert_ids)
}

def _reset_rows_buffer(self):
Expand Down Expand Up @@ -1209,7 +1218,10 @@ def _flush_batch(self, destination):
self.batch_size_metric.update(len(rows_and_insert_ids))

rows = [r[0] for r in rows_and_insert_ids]
insert_ids = [r[1] for r in rows_and_insert_ids]
if self.ignore_insert_ids:
insert_ids = None
else:
insert_ids = [r[1] for r in rows_and_insert_ids]

while True:
start = time.time()
Expand Down Expand Up @@ -1270,6 +1282,7 @@ def __init__(
kms_key,
retry_strategy,
additional_bq_parameters,
ignore_insert_ids,
test_client=None):
self.table_reference = table_reference
self.table_side_inputs = table_side_inputs
Expand All @@ -1282,6 +1295,7 @@ def __init__(
self.retry_strategy = retry_strategy
self.test_client = test_client
self.additional_bq_parameters = additional_bq_parameters
self.ignore_insert_ids = ignore_insert_ids

class InsertIdPrefixFn(DoFn):
def __init__(self, shards=DEFAULT_SHARDS_PER_DESTINATION):
Expand Down Expand Up @@ -1309,22 +1323,28 @@ def expand(self, input):
kms_key=self.kms_key,
retry_strategy=self.retry_strategy,
test_client=self.test_client,
additional_bq_parameters=self.additional_bq_parameters)
additional_bq_parameters=self.additional_bq_parameters,
ignore_insert_ids=self.ignore_insert_ids)

def drop_shard(elms):
  """Remove the shard component from a keyed element.

  Elements arrive shaped as ((destination, shard), row); the shard was only
  needed to spread insert-id generation, so emit (destination, row) for the
  downstream write step.
  """
  keyed = elms[0]
  return (keyed[0], elms[1])

return (
sharded_data = (
input
| 'AppendDestination' >> beam.ParDo(
bigquery_tools.AppendDestinationsFn(self.table_reference),
*self.table_side_inputs)
| 'AddInsertIdsWithRandomKeys' >> beam.ParDo(
_StreamToBigQuery.InsertIdPrefixFn())
| 'CommitInsertIds' >> ReshufflePerKey()
_StreamToBigQuery.InsertIdPrefixFn()))

if not self.ignore_insert_ids:
sharded_data = (sharded_data | 'CommitInsertIds' >> ReshufflePerKey())

return (
sharded_data
| 'DropShard' >> beam.Map(drop_shard)
| 'StreamInsertRows' >> ParDo(
bigquery_write_fn, *self.schema_side_inputs).with_outputs(
Expand Down Expand Up @@ -1368,7 +1388,8 @@ def __init__(
schema_side_inputs=None,
triggering_frequency=None,
validate=True,
temp_file_format=None):
temp_file_format=None,
ignore_insert_ids=False):
"""Initialize a WriteToBigQuery transform.

Args:
Expand Down Expand Up @@ -1486,6 +1507,12 @@ def __init__(
https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro
and
https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-json.
ignore_insert_ids: When using the STREAMING_INSERTS method to write data
to BigQuery, `insert_ids` are a feature of BigQuery that support
deduplication of events. If your use case is not sensitive to
duplication of data inserted to BigQuery, set `ignore_insert_ids`
to True to increase the throughput for BQ writing. See:
https://cloud.google.com/bigquery/streaming-data-into-bigquery#disabling_best_effort_de-duplication
"""
self._table = table
self._dataset = dataset
Expand Down Expand Up @@ -1517,6 +1544,7 @@ def __init__(
self.additional_bq_parameters = additional_bq_parameters or {}
self.table_side_inputs = table_side_inputs or ()
self.schema_side_inputs = schema_side_inputs or ()
self._ignore_insert_ids = ignore_insert_ids

# Dict/schema methods were moved to bigquery_tools, but keep references
# here for backward compatibility.
Expand Down Expand Up @@ -1571,6 +1599,7 @@ def expand(self, pcoll):
self.kms_key,
self.insert_retry_strategy,
self.additional_bq_parameters,
self._ignore_insert_ids,
test_client=self.test_client)

return {BigQueryWriteFn.FAILED_ROWS: outputs[BigQueryWriteFn.FAILED_ROWS]}
Expand Down
1 change: 1 addition & 0 deletions sdks/python/apache_beam/io/gcp/bigquery_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,7 @@ def store_callback(arg):
None,
None,
None, [],
ignore_insert_ids=False,
test_client=client))

with open(file_name_1) as f1, open(file_name_2) as f2:
Expand Down