diff --git a/airflow/providers/amazon/aws/sensors/quicksight.py b/airflow/providers/amazon/aws/sensors/quicksight.py index fed0faf3dadce..fc90ecbe45c5f 100644 --- a/airflow/providers/amazon/aws/sensors/quicksight.py +++ b/airflow/providers/amazon/aws/sensors/quicksight.py @@ -20,7 +20,7 @@ from functools import cached_property from typing import TYPE_CHECKING, Sequence -from airflow.exceptions import AirflowException +from airflow.exceptions import AirflowException, AirflowSkipException from airflow.providers.amazon.aws.hooks.quicksight import QuickSightHook from airflow.providers.amazon.aws.hooks.sts import StsHook from airflow.sensors.base import BaseSensorOperator @@ -78,7 +78,10 @@ def poke(self, context: Context) -> bool: self.log.info("QuickSight Status: %s", quicksight_ingestion_state) if quicksight_ingestion_state in self.errored_statuses: error = self.quicksight_hook.get_error_info(aws_account_id, self.data_set_id, self.ingestion_id) - raise AirflowException(f"The QuickSight Ingestion failed. Error info: {error}") + message = f"The QuickSight Ingestion failed. Error info: {error}" + if self.soft_fail: + raise AirflowSkipException(message) + raise AirflowException(message) return quicksight_ingestion_state == self.success_status @cached_property diff --git a/tests/providers/amazon/aws/sensors/test_quicksight.py b/tests/providers/amazon/aws/sensors/test_quicksight.py index e85c21e896547..ba3ce83789abb 100644 --- a/tests/providers/amazon/aws/sensors/test_quicksight.py +++ b/tests/providers/amazon/aws/sensors/test_quicksight.py @@ -23,7 +23,7 @@ from moto import mock_sts from moto.core import DEFAULT_ACCOUNT_ID -from airflow.exceptions import AirflowException +from airflow.exceptions import AirflowException, AirflowSkipException from airflow.providers.amazon.aws.hooks.quicksight import QuickSightHook from airflow.providers.amazon.aws.sensors.quicksight import QuickSightSensor @@ -71,3 +71,18 @@ def test_poke_initialized(self, mock_get_status): mock_get_status.return_value = "INITIALIZED" assert self.sensor.poke({}) is False mock_get_status.assert_called_once_with(DEFAULT_ACCOUNT_ID, DATA_SET_ID, INGESTION_ID) + + @pytest.mark.parametrize( + "soft_fail, expected_exception", ((False, AirflowException), (True, AirflowSkipException)) + ) + @mock.patch("airflow.providers.amazon.aws.hooks.sts.StsHook.get_account_number") + @mock.patch("airflow.providers.amazon.aws.hooks.quicksight.QuickSightHook.get_status") + @mock.patch("airflow.providers.amazon.aws.hooks.quicksight.QuickSightHook.get_error_info") + def test_fail_poke(self, get_error_info, get_status, _, soft_fail, expected_exception): + self.sensor.soft_fail = soft_fail + error = "expected error" + message = f"The QuickSight Ingestion failed. Error info: {error}" + with pytest.raises(expected_exception, match=message): + get_status.return_value = "FAILED" + get_error_info.return_value = message + self.sensor.poke(context={})