Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions airflow/providers/google/cloud/transfers/gcs_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ class GCSToGCSOperator(BaseOperator):
of copied to the new location. This is the equivalent of a mv command
as opposed to a cp command.
:type move_object: bool
:param replace: Whether you want to replace existing destination files or not.
:type replace: bool
:param delimiter: This is used to restrict the result to only the 'files' in a given 'folder'.
If source_objects = ['foo/bah/'] and delimiter = '.avro', then only the 'files' in the
folder 'foo/bah/' with '.avro' delimiter will be copied to the destination object.
Expand Down Expand Up @@ -176,6 +178,7 @@ def __init__(self, # pylint: disable=too-many-arguments
destination_object=None,
delimiter=None,
move_object=False,
replace=True,
gcp_conn_id='google_cloud_default',
google_cloud_storage_conn_id=None,
delegate_to=None,
Expand All @@ -198,6 +201,7 @@ def __init__(self, # pylint: disable=too-many-arguments
self.destination_object = destination_object
self.delimiter = delimiter
self.move_object = move_object
self.replace = replace
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to
self.last_modified_time = last_modified_time
Expand Down Expand Up @@ -276,6 +280,21 @@ def _copy_source_with_wildcard(self, hook, prefix):
self.log.info('Delimiter ignored because wildcard is in prefix')
prefix_, delimiter = prefix.split(WILDCARD, 1)
objects = hook.list(self.source_bucket, prefix=prefix_, delimiter=delimiter)
if not self.replace:
# If we are not replacing, list all files in the Destination GCS bucket
# and only keep those files which are present in
# Source GCS bucket and not in Destination GCS bucket

existing_objects = hook.list(self.destination_bucket, prefix=prefix_, delimiter=delimiter)

objects = set(objects) - set(existing_objects)
if len(objects) > 0:
self.log.info(
'%s files are going to be synced: %s.', len(objects), objects
)
else:
self.log.info(
'There are no new files to sync. Have a nice day!')
for source_object in objects:
if self.destination_object is None:
destination_object = source_object
Expand Down
15 changes: 15 additions & 0 deletions tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,21 @@ def test_execute_no_suffix(self, mock_hook):
TEST_BUCKET, prefix="test_object", delimiter=""
)

@mock.patch('airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook')
def test_execute_wildcard_with_replace_flag_false(self, mock_hook):
operator = GCSToGCSOperator(
task_id=TASK_ID, source_bucket=TEST_BUCKET,
source_object=SOURCE_OBJECT_WILDCARD_SUFFIX,
destination_bucket=DESTINATION_BUCKET,
replace=False)

operator.execute(None)
mock_calls = [
mock.call(TEST_BUCKET, prefix="test_object", delimiter=""),
mock.call(DESTINATION_BUCKET, prefix="test_object", delimiter=""),
]
mock_hook.return_value.list.assert_has_calls(mock_calls)

@mock.patch('airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook')
def test_execute_prefix_and_suffix(self, mock_hook):
operator = GCSToGCSOperator(
Expand Down