From 07756e9de527699c047acc039164e7183ba2be07 Mon Sep 17 00:00:00 2001 From: Roy Berkowitz Date: Sun, 5 Jul 2020 19:20:39 +0300 Subject: [PATCH 1/7] Allow `replace` flag in gcs_to_gcs operator. If we are not replacing, list all files in the Destination GCS bucket and only keep those files which are present in Source GCS bucket and not in Destination GCS bucket --- .../google/cloud/transfers/gcs_to_gcs.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py index a3d6db8636b85..0340e1aefa694 100644 --- a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py +++ b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py @@ -69,6 +69,8 @@ class GCSToGCSOperator(BaseOperator): of copied to the new location. This is the equivalent of a mv command as opposed to a cp command. :type move_object: bool + :param replace: Whether you want to replace existing destination files or not. + :type replace: bool :param delimiter: This is used to restrict the result to only the 'files' in a given 'folder'. If source_objects = ['foo/bah/'] and delimiter = '.avro', then only the 'files' in the folder 'foo/bah/' with '.avro' delimiter will be copied to the destination object. @@ -176,6 +178,7 @@ def __init__(self, # pylint: disable=too-many-arguments destination_object=None, delimiter=None, move_object=False, + replace=True, gcp_conn_id='google_cloud_default', google_cloud_storage_conn_id=None, delegate_to=None, @@ -198,6 +201,7 @@ def __init__(self, # pylint: disable=too-many-arguments self.destination_object = destination_object self.delimiter = delimiter self.move_object = move_object + self.replace = replace self.gcp_conn_id = gcp_conn_id self.delegate_to = delegate_to self.last_modified_time = last_modified_time @@ -276,6 +280,21 @@ def _copy_source_with_wildcard(self, hook, prefix): self.log.info('Delimiter ignored because wildcard is in prefix') prefix_, delimiter = prefix.split(WILDCARD, 1) objects = hook.list(self.source_bucket, prefix=prefix_, delimiter=delimiter) + if not self.replace: + # If we are not replacing, list all files in the Destination GCS bucket + # and only keep those files which are present in + # Source GCS bucket and not in Destination GCS bucket + + existing_objects = hook.list(self.destination_bucket, prefix=prefix_, delimiter=delimiter) + + objects = list(set(objects) - set(existing_objects)) + if len(objects) > 0: + self.log.info( + '%s files are going to be synced: %s.', len(objects), objects + ) + else: + self.log.info( + 'There are no new files to sync. Have a nice day!') for source_object in objects: if self.destination_object is None: destination_object = source_object From 7943c4bc5b1bc33c3c4941fab4a27bd8c2494830 Mon Sep 17 00:00:00 2001 From: Roy Berkowitz Date: Mon, 6 Jul 2020 14:36:23 +0300 Subject: [PATCH 2/7] Adding a unit test for 'replace' flag set to false in GcsToGcsOperator test_execute_wildcard_with_replace_flag_false --- .../google/cloud/transfers/test_gcs_to_gcs.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py index 49c067b6aec53..cce839dd6d3cd 100644 --- a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py +++ b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py @@ -86,6 +86,20 @@ def test_execute_no_suffix(self, mock_hook): TEST_BUCKET, prefix="test_object", delimiter="" ) + @mock.patch('airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook') + def test_execute_wildcard_with_replace_flag_false(self, mock_hook): + mock_hook.return_value.list.return_value = True + operator = GCSToGCSOperator( + task_id=TASK_ID, source_bucket=TEST_BUCKET, + source_object=SOURCE_OBJECT_WILDCARD_SUFFIX, + destination_bucket=DESTINATION_BUCKET, + replace=False) + + operator.execute(None) + mock_hook.return_value.list.assert_called_once_with( + TEST_BUCKET, prefix="test_object", delimiter="", replace=False + ) + @mock.patch('airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook') def test_execute_prefix_and_suffix(self, mock_hook): operator = GCSToGCSOperator( From f9beb05f70b503f35ef48b82795bcf9f68d4157b Mon Sep 17 00:00:00 2001 From: Roy Berkowitz Date: Tue, 7 Jul 2020 12:56:35 +0300 Subject: [PATCH 3/7] not setting return value at the begging of test_execute_wildcard_with_replace_flag_false (GcsToGcsOperator) --- tests/providers/google/cloud/transfers/test_gcs_to_gcs.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py index cce839dd6d3cd..4c77cd44c36e4 100644 --- a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py +++ b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py @@ -88,7 +88,6 @@ def test_execute_no_suffix(self, mock_hook): @mock.patch('airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook') def test_execute_wildcard_with_replace_flag_false(self, mock_hook): - mock_hook.return_value.list.return_value = True operator = GCSToGCSOperator( task_id=TASK_ID, source_bucket=TEST_BUCKET, source_object=SOURCE_OBJECT_WILDCARD_SUFFIX, From 85f28a87e96c3ae473d8cb4bcc1c6e484238214d Mon Sep 17 00:00:00 2001 From: Roy Berkowitz Date: Tue, 7 Jul 2020 15:31:37 +0300 Subject: [PATCH 4/7] 1. assert_has_calls as this mock makes 2 calls 2. removing list() in set() - set() result --- airflow/providers/google/cloud/transfers/gcs_to_gcs.py | 2 +- tests/providers/google/cloud/transfers/test_gcs_to_gcs.py | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py index 0340e1aefa694..70b6288e7241f 100644 --- a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py +++ b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py @@ -287,7 +287,7 @@ def _copy_source_with_wildcard(self, hook, prefix): existing_objects = hook.list(self.destination_bucket, prefix=prefix_, delimiter=delimiter) - objects = list(set(objects) - set(existing_objects)) + objects = set(objects) - set(existing_objects) if len(objects) > 0: self.log.info( '%s files are going to be synced: %s.', len(objects), objects diff --git a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py index 4c77cd44c36e4..058914f448a2c 100644 --- a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py +++ b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py @@ -95,9 +95,11 @@ def test_execute_wildcard_with_replace_flag_false(self, mock_hook): replace=False) operator.execute(None) - mock_hook.return_value.list.assert_called_once_with( - TEST_BUCKET, prefix="test_object", delimiter="", replace=False - ) + calls = [ + mock.Call(TEST_BUCKET, prefix="test_object", delimiter=""), + mock.Call(DESTINATION_BUCKET, prefix="test_object", delimiter=""), + ] + mock_hook.return_value.assert_has_calls(calls) @mock.patch('airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook') def test_execute_prefix_and_suffix(self, mock_hook): From 3274f860540d768cc870f5bae0cd1cac120342cb Mon Sep 17 00:00:00 2001 From: Roy Berkowitz Date: Tue, 7 Jul 2020 15:40:39 +0300 Subject: [PATCH 5/7] fixing call method name --- tests/providers/google/cloud/transfers/test_gcs_to_gcs.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py index 058914f448a2c..c3f68bb26ae3d 100644 --- a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py +++ b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py @@ -96,8 +96,8 @@ def test_execute_wildcard_with_replace_flag_false(self, mock_hook): operator.execute(None) calls = [ - mock.Call(TEST_BUCKET, prefix="test_object", delimiter=""), - mock.Call(DESTINATION_BUCKET, prefix="test_object", delimiter=""), + mock.call(TEST_BUCKET, prefix="test_object", delimiter=""), + mock.call(DESTINATION_BUCKET, prefix="test_object", delimiter=""), ] mock_hook.return_value.assert_has_calls(calls) From ac2c7dfec31cc02a3e59f5d47f1b668bae5111b2 Mon Sep 17 00:00:00 2001 From: Roy Berkowitz Date: Tue, 7 Jul 2020 17:20:15 +0300 Subject: [PATCH 6/7] rename calls to 'mock_calls' rewrite return value fix. --- tests/providers/google/cloud/transfers/test_gcs_to_gcs.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py index c3f68bb26ae3d..50187996f8d49 100644 --- a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py +++ b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py @@ -95,11 +95,11 @@ def test_execute_wildcard_with_replace_flag_false(self, mock_hook): replace=False) operator.execute(None) - calls = [ + mock_calls = [ mock.call(TEST_BUCKET, prefix="test_object", delimiter=""), mock.call(DESTINATION_BUCKET, prefix="test_object", delimiter=""), ] - mock_hook.return_value.assert_has_calls(calls) + mock_hook.return_value.rewrite.assert_has_calls(mock_calls) @mock.patch('airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook') def test_execute_prefix_and_suffix(self, mock_hook): From 40946c442ee5849447433594a72f6348cd22400e Mon Sep 17 00:00:00 2001 From: royberkoweee <61996194+royberkoweee@users.noreply.github.com> Date: Tue, 7 Jul 2020 21:10:24 +0300 Subject: [PATCH 7/7] Update tests/providers/google/cloud/transfers/test_gcs_to_gcs.py Co-authored-by: Tomek Urbaszek --- tests/providers/google/cloud/transfers/test_gcs_to_gcs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py index 50187996f8d49..a51b101147151 100644 --- a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py +++ b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py @@ -99,7 +99,7 @@ def test_execute_wildcard_with_replace_flag_false(self, mock_hook): mock.call(TEST_BUCKET, prefix="test_object", delimiter=""), mock.call(DESTINATION_BUCKET, prefix="test_object", delimiter=""), ] - mock_hook.return_value.rewrite.assert_has_calls(mock_calls) + mock_hook.return_value.list.assert_has_calls(mock_calls) @mock.patch('airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook') def test_execute_prefix_and_suffix(self, mock_hook):