From 810e4a39260c20e15f00ce3344392bef36c3bbc8 Mon Sep 17 00:00:00 2001 From: Akash Sharma <2akash111998@gmail.com> Date: Mon, 3 Jul 2023 04:06:54 +0530 Subject: [PATCH 1/3] bug-fix --- .../amazon/aws/transfers/gcs_to_s3.py | 3 + .../amazon/aws/transfers/test_gcs_to_s3.py | 66 +++++++++++++++++-- 2 files changed, 65 insertions(+), 4 deletions(-) diff --git a/airflow/providers/amazon/aws/transfers/gcs_to_s3.py b/airflow/providers/amazon/aws/transfers/gcs_to_s3.py index 2213de2b60c1a..dd30b314caa4d 100644 --- a/airflow/providers/amazon/aws/transfers/gcs_to_s3.py +++ b/airflow/providers/amazon/aws/transfers/gcs_to_s3.py @@ -161,6 +161,9 @@ def execute(self, context: Context) -> list[str]: # and only keep those files which are present in # Google Cloud Storage and not in S3 bucket_name, prefix = S3Hook.parse_s3_url(self.dest_s3_key) + # only if prefix is not empty + if prefix: + prefix = prefix if prefix.endswith("/") else f"{prefix}/" # look for the bucket and the prefix to avoid look into # parent directories/keys existing_files = s3_hook.list_keys(bucket_name, prefix=prefix) diff --git a/tests/providers/amazon/aws/transfers/test_gcs_to_s3.py b/tests/providers/amazon/aws/transfers/test_gcs_to_s3.py index a7a0b2e4305ac..0ec4db38d9bd3 100644 --- a/tests/providers/amazon/aws/transfers/test_gcs_to_s3.py +++ b/tests/providers/amazon/aws/transfers/test_gcs_to_s3.py @@ -128,7 +128,65 @@ def test_execute_without_replace(self, mock_hook): assert [] == uploaded_files assert sorted(MOCK_FILES) == sorted(hook.list_keys("bucket", delimiter="/")) - # Test3: There are no files in destination bucket + # Test3: All the files (within some folders) are already in origin and destination without replace + @mock.patch("airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSHook") + def test_execute_without_replace_with_folder_structure(self, mock_hook): + mock_files_gcs = [f"test{idx}/{mock_file}" for idx, mock_file in enumerate(MOCK_FILES)] + mock_files_s3 = [f"test/test{idx}/{mock_file}" for idx, mock_file in enumerate(MOCK_FILES)] + mock_hook.return_value.list.return_value = mock_files_gcs + + hook, bucket = _create_test_bucket() + for mock_file_s3 in mock_files_s3: + bucket.put_object(Key=mock_file_s3, Body=b"testing") + + with NamedTemporaryFile() as f: + gcs_provide_file = mock_hook.return_value.provide_file + gcs_provide_file.return_value.__enter__.return_value.name = f.name + + with pytest.deprecated_call(): + operator = GCSToS3Operator( + task_id=TASK_ID, + bucket=GCS_BUCKET, + # prefix value for gcs bucket does not matter + prefix=PREFIX, + delimiter=DELIMITER, + dest_aws_conn_id="aws_default", + # endswith "/" + dest_s3_key=f"{S3_BUCKET}/test/", + replace=False, + ) + + # we expect nothing to be uploaded + # and all the MOCK_FILES to be present at the S3 bucket + uploaded_files = operator.execute(None) + + assert [] == uploaded_files + assert sorted(mock_files_s3) == sorted(hook.list_keys("bucket", prefix="test/")) + + with NamedTemporaryFile() as f: + gcs_provide_file = mock_hook.return_value.provide_file + gcs_provide_file.return_value.__enter__.return_value.name = f.name + + with pytest.deprecated_call(): + operator = GCSToS3Operator( + task_id=TASK_ID, + bucket=GCS_BUCKET, + # prefix value for gcs bucket does not matter + prefix=PREFIX, + delimiter=DELIMITER, + dest_aws_conn_id="aws_default", + # does not endswith "/" + dest_s3_key=f"{S3_BUCKET}/test", + replace=False, + ) + # we expect nothing to be uploaded + # and all the MOCK_FILES to be present at the S3 bucket + uploaded_files = operator.execute(None) + + assert [] == uploaded_files + assert sorted(mock_files_s3) == sorted(hook.list_keys("bucket", prefix="test")) + + # Test4: There are no files in destination bucket @mock.patch("airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSHook") def test_execute(self, mock_hook): mock_hook.return_value.list.return_value = MOCK_FILES @@ -154,7 +212,7 @@ def test_execute(self, mock_hook): assert sorted(MOCK_FILES) == sorted(uploaded_files) assert sorted(MOCK_FILES) == sorted(hook.list_keys("bucket", delimiter="/")) - # Test4: Destination and Origin are in sync but replace all files in destination + # Test5: Destination and Origin are in sync but replace all files in destination @mock.patch("airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSHook") def test_execute_with_replace(self, mock_hook): mock_hook.return_value.list.return_value = MOCK_FILES @@ -182,7 +240,7 @@ def test_execute_with_replace(self, mock_hook): assert sorted(MOCK_FILES) == sorted(uploaded_files) assert sorted(MOCK_FILES) == sorted(hook.list_keys("bucket", delimiter="/")) - # Test5: Incremental sync with replace + # Test6: Incremental sync with replace @mock.patch("airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSHook") def test_execute_incremental_with_replace(self, mock_hook): mock_hook.return_value.list.return_value = MOCK_FILES @@ -259,7 +317,7 @@ def test_execute_should_pass_dest_s3_extra_args_to_s3_hook(self, s3_mock_hook, m aws_conn_id="aws_default", extra_args={"ContentLanguage": "value"}, verify=None ) - # Test6: s3_acl_policy parameter is set + # Test7: s3_acl_policy parameter is set @mock.patch("airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSHook") @mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.load_file") def test_execute_with_s3_acl_policy(self, mock_load_file, mock_gcs_hook): From c45f28a3e8ce922190bb4fb03da7749def0951d9 Mon Sep 17 00:00:00 2001 From: Akash Sharma <2akash111998@gmail.com> Date: Mon, 3 Jul 2023 12:10:15 +0530 Subject: [PATCH 2/3] code review changes --- .../amazon/aws/transfers/gcs_to_s3.py | 4 +- .../amazon/aws/transfers/test_gcs_to_s3.py | 63 +++++-------------- 2 files changed, 20 insertions(+), 47 deletions(-) diff --git a/airflow/providers/amazon/aws/transfers/gcs_to_s3.py b/airflow/providers/amazon/aws/transfers/gcs_to_s3.py index dd30b314caa4d..d57de7e11efb2 100644 --- a/airflow/providers/amazon/aws/transfers/gcs_to_s3.py +++ b/airflow/providers/amazon/aws/transfers/gcs_to_s3.py @@ -161,7 +161,9 @@ def execute(self, context: Context) -> list[str]: # and only keep those files which are present in # Google Cloud Storage and not in S3 bucket_name, prefix = S3Hook.parse_s3_url(self.dest_s3_key) - # only if prefix is not empty + # if prefix is empty, do not add "/" at end since it would + # filter all the objects (return empty list) instead of empty + # prefix returning all the objects if prefix: prefix = prefix if prefix.endswith("/") else f"{prefix}/" # look for the bucket and the prefix to avoid look into diff --git a/tests/providers/amazon/aws/transfers/test_gcs_to_s3.py b/tests/providers/amazon/aws/transfers/test_gcs_to_s3.py index 0ec4db38d9bd3..16030a85f9fd9 100644 --- a/tests/providers/amazon/aws/transfers/test_gcs_to_s3.py +++ b/tests/providers/amazon/aws/transfers/test_gcs_to_s3.py @@ -33,6 +33,7 @@ S3_BUCKET = "s3://bucket/" MOCK_FILES = ["TEST1.csv", "TEST2.csv", "TEST3.csv"] S3_ACL_POLICY = "private-read" +deprecated_call_match = "Usage of 'delimiter' is deprecated, please use 'match_glob' instead" def _create_test_bucket(): @@ -47,8 +48,6 @@ def _create_test_bucket(): @mock_s3 class TestGCSToS3Operator: - - # Test0: match_glob @mock.patch("airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSHook") def test_execute__match_glob(self, mock_hook): mock_hook.return_value.list.return_value = MOCK_FILES @@ -73,7 +72,6 @@ def test_execute__match_glob(self, mock_hook): bucket_name=GCS_BUCKET, delimiter=None, match_glob=f"**/*{DELIMITER}", prefix=PREFIX ) - # Test1: incremental behaviour (just some files missing) @mock.patch("airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSHook") def test_execute_incremental(self, mock_hook): mock_hook.return_value.list.return_value = MOCK_FILES @@ -81,7 +79,7 @@ def test_execute_incremental(self, mock_hook): gcs_provide_file = mock_hook.return_value.provide_file gcs_provide_file.return_value.__enter__.return_value.name = f.name - with pytest.deprecated_call(): + with pytest.deprecated_call(match=deprecated_call_match): operator = GCSToS3Operator( task_id=TASK_ID, bucket=GCS_BUCKET, @@ -100,7 +98,6 @@ def test_execute_incremental(self, mock_hook): assert sorted(MOCK_FILES[1:]) == sorted(uploaded_files) assert sorted(MOCK_FILES) == sorted(hook.list_keys("bucket", delimiter="/")) - # Test2: All the files are already in origin and destination without replace @mock.patch("airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSHook") def test_execute_without_replace(self, mock_hook): mock_hook.return_value.list.return_value = MOCK_FILES @@ -108,7 +105,7 @@ def test_execute_without_replace(self, mock_hook): gcs_provide_file = mock_hook.return_value.provide_file gcs_provide_file.return_value.__enter__.return_value.name = f.name - with pytest.deprecated_call(): + with pytest.deprecated_call(match=deprecated_call_match): operator = GCSToS3Operator( task_id=TASK_ID, bucket=GCS_BUCKET, @@ -128,9 +125,12 @@ def test_execute_without_replace(self, mock_hook): assert [] == uploaded_files assert sorted(MOCK_FILES) == sorted(hook.list_keys("bucket", delimiter="/")) - # Test3: All the files (within some folders) are already in origin and destination without replace + @pytest.mark.parametrize( + argnames="dest_s3_url", + argvalues=[f"{S3_BUCKET}/test/", f"{S3_BUCKET}/test"], + ) @mock.patch("airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSHook") - def test_execute_without_replace_with_folder_structure(self, mock_hook): + def test_execute_without_replace_with_folder_structure(self, mock_hook, dest_s3_url): mock_files_gcs = [f"test{idx}/{mock_file}" for idx, mock_file in enumerate(MOCK_FILES)] mock_files_s3 = [f"test/test{idx}/{mock_file}" for idx, mock_file in enumerate(MOCK_FILES)] mock_hook.return_value.list.return_value = mock_files_gcs @@ -143,16 +143,14 @@ def test_execute_without_replace_with_folder_structure(self, mock_hook): gcs_provide_file = mock_hook.return_value.provide_file gcs_provide_file.return_value.__enter__.return_value.name = f.name - with pytest.deprecated_call(): + with pytest.deprecated_call(match=deprecated_call_match): operator = GCSToS3Operator( task_id=TASK_ID, bucket=GCS_BUCKET, - # prefix value for gcs bucket does not matter prefix=PREFIX, delimiter=DELIMITER, dest_aws_conn_id="aws_default", - # endswith "/" - dest_s3_key=f"{S3_BUCKET}/test/", + dest_s3_key=dest_s3_url, replace=False, ) @@ -163,30 +161,6 @@ def test_execute_without_replace_with_folder_structure(self, mock_hook): assert [] == uploaded_files assert sorted(mock_files_s3) == sorted(hook.list_keys("bucket", prefix="test/")) - with NamedTemporaryFile() as f: - gcs_provide_file = mock_hook.return_value.provide_file - gcs_provide_file.return_value.__enter__.return_value.name = f.name - - with pytest.deprecated_call(): - operator = GCSToS3Operator( - task_id=TASK_ID, - bucket=GCS_BUCKET, - # prefix value for gcs bucket does not matter - prefix=PREFIX, - delimiter=DELIMITER, - dest_aws_conn_id="aws_default", - # does not endswith "/" - dest_s3_key=f"{S3_BUCKET}/test", - replace=False, - ) - # we expect nothing to be uploaded - # and all the MOCK_FILES to be present at the S3 bucket - uploaded_files = operator.execute(None) - - assert [] == uploaded_files - assert sorted(mock_files_s3) == sorted(hook.list_keys("bucket", prefix="test")) - - # Test4: There are no files in destination bucket @mock.patch("airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSHook") def test_execute(self, mock_hook): mock_hook.return_value.list.return_value = MOCK_FILES @@ -194,7 +168,7 @@ def test_execute(self, mock_hook): gcs_provide_file = mock_hook.return_value.provide_file gcs_provide_file.return_value.__enter__.return_value.name = f.name - with pytest.deprecated_call(): + with pytest.deprecated_call(match=deprecated_call_match): operator = GCSToS3Operator( task_id=TASK_ID, bucket=GCS_BUCKET, @@ -212,7 +186,6 @@ def test_execute(self, mock_hook): assert sorted(MOCK_FILES) == sorted(uploaded_files) assert sorted(MOCK_FILES) == sorted(hook.list_keys("bucket", delimiter="/")) - # Test5: Destination and Origin are in sync but replace all files in destination @mock.patch("airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSHook") def test_execute_with_replace(self, mock_hook): mock_hook.return_value.list.return_value = MOCK_FILES @@ -220,7 +193,7 @@ def test_execute_with_replace(self, mock_hook): gcs_provide_file = mock_hook.return_value.provide_file gcs_provide_file.return_value.__enter__.return_value.name = f.name - with pytest.deprecated_call(): + with pytest.deprecated_call(match=deprecated_call_match): operator = GCSToS3Operator( task_id=TASK_ID, bucket=GCS_BUCKET, @@ -240,7 +213,6 @@ def test_execute_with_replace(self, mock_hook): assert sorted(MOCK_FILES) == sorted(uploaded_files) assert sorted(MOCK_FILES) == sorted(hook.list_keys("bucket", delimiter="/")) - # Test6: Incremental sync with replace @mock.patch("airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSHook") def test_execute_incremental_with_replace(self, mock_hook): mock_hook.return_value.list.return_value = MOCK_FILES @@ -248,7 +220,7 @@ def test_execute_incremental_with_replace(self, mock_hook): gcs_provide_file = mock_hook.return_value.provide_file gcs_provide_file.return_value.__enter__.return_value.name = f.name - with pytest.deprecated_call(): + with pytest.deprecated_call(match=deprecated_call_match): operator = GCSToS3Operator( task_id=TASK_ID, bucket=GCS_BUCKET, @@ -276,7 +248,7 @@ def test_execute_should_handle_with_default_dest_s3_extra_args(self, s3_mock_hoo s3_mock_hook.return_value = mock.Mock() s3_mock_hook.parse_s3_url.return_value = mock.Mock() - with pytest.deprecated_call(): + with pytest.deprecated_call(match=deprecated_call_match): operator = GCSToS3Operator( task_id=TASK_ID, bucket=GCS_BUCKET, @@ -299,7 +271,7 @@ def test_execute_should_pass_dest_s3_extra_args_to_s3_hook(self, s3_mock_hook, m s3_mock_hook.return_value = mock.Mock() s3_mock_hook.parse_s3_url.return_value = mock.Mock() - with pytest.deprecated_call(): + with pytest.deprecated_call(match=deprecated_call_match): operator = GCSToS3Operator( task_id=TASK_ID, bucket=GCS_BUCKET, @@ -317,7 +289,6 @@ def test_execute_should_pass_dest_s3_extra_args_to_s3_hook(self, s3_mock_hook, m aws_conn_id="aws_default", extra_args={"ContentLanguage": "value"}, verify=None ) - # Test7: s3_acl_policy parameter is set @mock.patch("airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSHook") @mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.load_file") def test_execute_with_s3_acl_policy(self, mock_load_file, mock_gcs_hook): @@ -326,7 +297,7 @@ def test_execute_with_s3_acl_policy(self, mock_load_file, mock_gcs_hook): gcs_provide_file = mock_gcs_hook.return_value.provide_file gcs_provide_file.return_value.__enter__.return_value.name = f.name - with pytest.deprecated_call(): + with pytest.deprecated_call(match=deprecated_call_match): operator = GCSToS3Operator( task_id=TASK_ID, bucket=GCS_BUCKET, @@ -351,7 +322,7 @@ def test_execute_without_keep_director_structure(self, mock_hook): gcs_provide_file = mock_hook.return_value.provide_file gcs_provide_file.return_value.__enter__.return_value.name = f.name - with pytest.deprecated_call(): + with pytest.deprecated_call(match=deprecated_call_match): operator = GCSToS3Operator( task_id=TASK_ID, bucket=GCS_BUCKET, From 74ce3a417fa35a1d07e2a40d7f56685561bee62b Mon Sep 17 00:00:00 2001 From: Akash Sharma <2akash111998@gmail.com> Date: Mon, 3 Jul 2023 13:13:42 +0530 Subject: [PATCH 3/3] added docstrings --- tests/providers/amazon/aws/transfers/test_gcs_to_s3.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/providers/amazon/aws/transfers/test_gcs_to_s3.py b/tests/providers/amazon/aws/transfers/test_gcs_to_s3.py index 16030a85f9fd9..5e64f167ba453 100644 --- a/tests/providers/amazon/aws/transfers/test_gcs_to_s3.py +++ b/tests/providers/amazon/aws/transfers/test_gcs_to_s3.py @@ -100,6 +100,9 @@ def test_execute_incremental(self, mock_hook): @mock.patch("airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSHook") def test_execute_without_replace(self, mock_hook): + """ + Tests scenario where all the files are already in origin and destination without replace + """ mock_hook.return_value.list.return_value = MOCK_FILES with NamedTemporaryFile() as f: gcs_provide_file = mock_hook.return_value.provide_file @@ -163,6 +166,9 @@ def test_execute_without_replace_with_folder_structure(self, mock_hook, dest_s3_ @mock.patch("airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSHook") def test_execute(self, mock_hook): + """ + Tests the scenario where there are no files in destination bucket + """ mock_hook.return_value.list.return_value = MOCK_FILES with NamedTemporaryFile() as f: gcs_provide_file = mock_hook.return_value.provide_file