From 8f306d1e917f1e850d5679bdc99ecc49502ef5b9 Mon Sep 17 00:00:00 2001 From: utkarsh sharma Date: Thu, 28 Sep 2023 17:12:46 +0530 Subject: [PATCH 1/3] Add testcase to ensure the soft_fail param is respected --- tests/sensors/test_external_task_sensor.py | 169 ++++++++++++++++++++- 1 file changed, 168 insertions(+), 1 deletion(-) diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py index 4af96d2d40d27..8eb70e44095f4 100644 --- a/tests/sensors/test_external_task_sensor.py +++ b/tests/sensors/test_external_task_sensor.py @@ -19,6 +19,7 @@ import logging import os +import re import tempfile import zipfile from datetime import time, timedelta @@ -28,7 +29,7 @@ from airflow import exceptions, settings from airflow.decorators import task as task_deco -from airflow.exceptions import AirflowException, AirflowSensorTimeout, TaskDeferred +from airflow.exceptions import AirflowException, AirflowSensorTimeout, AirflowSkipException, TaskDeferred from airflow.models import DagBag, DagRun, TaskInstance from airflow.models.dag import DAG from airflow.models.serialized_dag import SerializedDagModel @@ -837,6 +838,172 @@ def test_external_task_group_when_there_is_no_TIs(self): ignore_ti_state=True, ) + @pytest.mark.parametrize( + "soft_fail, expected_exception, kwargs, expected_message", + ( + ( + False, + AirflowException, + { + "external_task_ids": [TEST_TASK_ID, TEST_TASK_ID_ALTERNATE], + "failed_states": [State.FAILED], + }, + f"Some of the external tasks {re.escape(str([TEST_TASK_ID, TEST_TASK_ID_ALTERNATE]))}" + f" in DAG {TEST_DAG_ID} failed.", + ), + ( + False, + AirflowException, + { + "external_task_group_id": [TEST_TASK_ID, TEST_TASK_ID_ALTERNATE], + "failed_states": [State.FAILED], + }, + f"The external task_group '{re.escape(str([TEST_TASK_ID, TEST_TASK_ID_ALTERNATE]))}'" + f" in DAG '{TEST_DAG_ID}' failed.", + ), + ( + False, + AirflowException, + {"failed_states": [State.FAILED]}, + f"The external DAG {TEST_DAG_ID} failed.", + ), + ( + True, + AirflowSkipException, + { + "external_task_ids": [TEST_TASK_ID, TEST_TASK_ID_ALTERNATE], + "failed_states": [State.FAILED], + }, + "Skipping due to soft_fail is set to True.", + ), + ( + True, + AirflowSkipException, + { + "external_task_group_id": [TEST_TASK_ID, TEST_TASK_ID_ALTERNATE], + "failed_states": [State.FAILED], + }, + "Skipping due to soft_fail is set to True.", + ), + ( + True, + AirflowSkipException, + {"failed_states": [State.FAILED]}, + "Skipping due to soft_fail is set to True.", + ), + ), + ) + @mock.patch("airflow.sensors.external_task.ExternalTaskSensor.get_count") + @mock.patch("airflow.sensors.external_task.ExternalTaskSensor._get_dttm_filter") + def test_fail_poke( + self, _get_dttm_filter, get_count, soft_fail, expected_exception, kwargs, expected_message + ): + _get_dttm_filter.return_value = [] + get_count.return_value = 1 + op = ExternalTaskSensor( + task_id="test_external_task_duplicate_task_ids", + external_dag_id=TEST_DAG_ID, + allowed_states=["success"], + dag=self.dag, + soft_fail=soft_fail, + deferrable=False, + **kwargs, + ) + with pytest.raises(expected_exception, match=expected_message): + op.execute(context={}) + + @pytest.mark.parametrize( + "soft_fail, expected_exception, response_get_current, response_exists, kwargs, expected_message", + ( + (False, AirflowException, None, None, {}, f"The external DAG {TEST_DAG_ID} does not exist."), + ( + False, + AirflowException, + DAG(dag_id="test"), + False, + {}, + f"The external DAG {TEST_DAG_ID} was deleted.", + ), + ( + False, + AirflowException, + DAG(dag_id="test"), + True, + {"external_task_ids": [TEST_TASK_ID, TEST_TASK_ID_ALTERNATE]}, + f"The external task {TEST_TASK_ID} in DAG {TEST_DAG_ID} does not exist.", + ), + ( + False, + AirflowException, + DAG(dag_id="test"), + True, + {"external_task_group_id": [TEST_TASK_ID, TEST_TASK_ID_ALTERNATE]}, + f"The external task group '{re.escape(str([TEST_TASK_ID, TEST_TASK_ID_ALTERNATE]))}'" + f" in DAG '{TEST_DAG_ID}' does not exist.", + ), + (True, AirflowSkipException, None, None, {}, "Skipping due to soft_fail is set to True."), + ( + True, + AirflowSkipException, + DAG(dag_id="test"), + False, + {}, + "Skipping due to soft_fail is set to True.", + ), + ( + True, + AirflowSkipException, + DAG(dag_id="test"), + True, + {"external_task_ids": [TEST_TASK_ID, TEST_TASK_ID_ALTERNATE]}, + "Skipping due to soft_fail is set to True.", + ), + ( + True, + AirflowSkipException, + DAG(dag_id="test"), + True, + {"external_task_group_id": [TEST_TASK_ID, TEST_TASK_ID_ALTERNATE]}, + "Skipping due to soft_fail is set to True.", + ), + ), + ) + @mock.patch("airflow.sensors.external_task.ExternalTaskSensor._get_dttm_filter") + @mock.patch("airflow.models.dagbag.DagBag.get_dag") + @mock.patch("os.path.exists") + @mock.patch("airflow.models.dag.DagModel.get_current") + def test_fail__check_for_existence( + self, + get_current, + exists, + get_dag, + _get_dttm_filter, + soft_fail, + expected_exception, + response_get_current, + response_exists, + kwargs, + expected_message, + ): + _get_dttm_filter.return_value = [] + get_current.return_value = response_get_current + exists.return_value = response_exists + get_dag_response = mock.MagicMock() + get_dag.return_value = get_dag_response + get_dag_response.has_task.return_value = False + get_dag_response.has_task_group.return_value = False + op = ExternalTaskSensor( + task_id="test_external_task_duplicate_task_ids", + external_dag_id=TEST_DAG_ID, + allowed_states=["success"], + dag=self.dag, + soft_fail=soft_fail, + check_existence=True, + **kwargs, + ) + with pytest.raises(expected_exception, match=expected_message): + op.execute(context={}) + class TestExternalTaskAsyncSensor: TASK_ID = "external_task_sensor_check" From c2db74685b489bbabed63fe18f49e5f27c0da1c8 Mon Sep 17 00:00:00 2001 From: utkarsh sharma Date: Thu, 28 Sep 2023 21:25:58 +0530 Subject: [PATCH 2/3] Refactor testcase --- tests/sensors/test_external_task_sensor.py | 33 ++++++---------------- 1 file changed, 8 insertions(+), 25 deletions(-) diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py index 1bbd22923c2ba..539ce25dfd2cd 100644 --- a/tests/sensors/test_external_task_sensor.py +++ b/tests/sensors/test_external_task_sensor.py @@ -840,11 +840,9 @@ def test_external_task_group_when_there_is_no_TIs(self): ) @pytest.mark.parametrize( - "soft_fail, expected_exception, kwargs, expected_message", + "kwargs, expected_message", ( ( - False, - AirflowException, { "external_task_ids": [TEST_TASK_ID, TEST_TASK_ID_ALTERNATE], "failed_states": [State.FAILED], @@ -853,8 +851,6 @@ def test_external_task_group_when_there_is_no_TIs(self): f" in DAG {TEST_DAG_ID} failed.", ), ( - False, - AirflowException, { "external_task_group_id": [TEST_TASK_ID, TEST_TASK_ID_ALTERNATE], "failed_states": [State.FAILED], @@ -863,34 +859,21 @@ def test_external_task_group_when_there_is_no_TIs(self): f" in DAG '{TEST_DAG_ID}' failed.", ), ( - False, - AirflowException, {"failed_states": [State.FAILED]}, f"The external DAG {TEST_DAG_ID} failed.", ), + ), + ) + @pytest.mark.parametrize( + "soft_fail, expected_exception", + ( ( - True, - AirflowSkipException, - { - "external_task_ids": [TEST_TASK_ID, TEST_TASK_ID_ALTERNATE], - "failed_states": [State.FAILED], - }, - "Skipping due to soft_fail is set to True.", - ), - ( - True, - AirflowSkipException, - { - "external_task_group_id": [TEST_TASK_ID, TEST_TASK_ID_ALTERNATE], - "failed_states": [State.FAILED], - }, - "Skipping due to soft_fail is set to True.", + False, + AirflowException, ), ( True, AirflowSkipException, - {"failed_states": [State.FAILED]}, - "Skipping due to soft_fail is set to True.", ), ), ) From 73c4af3ca961a06a85838b1fbfa6c27aa023948c Mon Sep 17 00:00:00 2001 From: utkarsh sharma Date: Fri, 29 Sep 2023 12:11:26 +0530 Subject: [PATCH 3/3] Fix testcase --- tests/sensors/test_external_task_sensor.py | 35 ++++++---------------- 1 file changed, 9 insertions(+), 26 deletions(-) diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py index 539ce25dfd2cd..376aa27bb16f0 100644 --- a/tests/sensors/test_external_task_sensor.py +++ b/tests/sensors/test_external_task_sensor.py @@ -897,58 +897,40 @@ def test_fail_poke( op.execute(context={}) @pytest.mark.parametrize( - "soft_fail, expected_exception, response_get_current, response_exists, kwargs, expected_message", + "response_get_current, response_exists, kwargs, expected_message", ( - (False, AirflowException, None, None, {}, f"The external DAG {TEST_DAG_ID} does not exist."), + (None, None, {}, f"The external DAG {TEST_DAG_ID} does not exist."), ( - False, - AirflowException, DAG(dag_id="test"), False, {}, f"The external DAG {TEST_DAG_ID} was deleted.", ), ( - False, - AirflowException, DAG(dag_id="test"), True, {"external_task_ids": [TEST_TASK_ID, TEST_TASK_ID_ALTERNATE]}, f"The external task {TEST_TASK_ID} in DAG {TEST_DAG_ID} does not exist.", ), ( - False, - AirflowException, DAG(dag_id="test"), True, {"external_task_group_id": [TEST_TASK_ID, TEST_TASK_ID_ALTERNATE]}, f"The external task group '{re.escape(str([TEST_TASK_ID, TEST_TASK_ID_ALTERNATE]))}'" f" in DAG '{TEST_DAG_ID}' does not exist.", ), - (True, AirflowSkipException, None, None, {}, "Skipping due to soft_fail is set to True."), + ), + ) + @pytest.mark.parametrize( + "soft_fail, expected_exception", + ( ( - True, - AirflowSkipException, - DAG(dag_id="test"), False, - {}, - "Skipping due to soft_fail is set to True.", - ), - ( - True, - AirflowSkipException, - DAG(dag_id="test"), - True, - {"external_task_ids": [TEST_TASK_ID, TEST_TASK_ID_ALTERNATE]}, - "Skipping due to soft_fail is set to True.", + AirflowException, ), ( True, AirflowSkipException, - DAG(dag_id="test"), - True, - {"external_task_group_id": [TEST_TASK_ID, TEST_TASK_ID_ALTERNATE]}, - "Skipping due to soft_fail is set to True.", ), ), ) @@ -985,6 +967,7 @@ def test_fail__check_for_existence( check_existence=True, **kwargs, ) + expected_message = "Skipping due to soft_fail is set to True." if soft_fail else expected_message with pytest.raises(expected_exception, match=expected_message): op.execute(context={})