diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py index b91e75ac39895..d597b6f9e5583 100644 --- a/airflow/providers/cncf/kubernetes/operators/pod.py +++ b/airflow/providers/cncf/kubernetes/operators/pod.py @@ -25,6 +25,7 @@ import secrets import string import warnings +from collections.abc import Container from contextlib import AbstractContextManager from typing import TYPE_CHECKING, Any, Sequence @@ -292,7 +293,7 @@ def __init__( termination_grace_period: int | None = None, configmaps: list[str] | None = None, skip_exit_code: int | None = None, - skip_on_exit_code: int | None = None, + skip_on_exit_code: int | Container[int] | None = None, base_container_name: str | None = None, deferrable: bool = False, poll_interval: float = 2, @@ -366,9 +367,15 @@ def __init__( warnings.warn( "skip_exit_code is deprecated. Please use skip_on_exit_code", DeprecationWarning, stacklevel=2 ) - self.skip_on_exit_code: int | None = skip_exit_code - else: - self.skip_on_exit_code = skip_on_exit_code + skip_on_exit_code = skip_exit_code + + self.skip_on_exit_code = ( + skip_on_exit_code + if isinstance(skip_on_exit_code, Container) + else [skip_on_exit_code] + if skip_on_exit_code + else [] + ) self.base_container_name = base_container_name or self.BASE_CONTAINER_NAME self.deferrable = deferrable self.poll_interval = poll_interval @@ -696,7 +703,7 @@ def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod): and base_container_status.last_state.terminated else None ) - if exit_code == self.skip_on_exit_code: + if exit_code in self.skip_on_exit_code: raise AirflowSkipException( f"Pod {pod and pod.metadata.name} returned exit code " f"{self.skip_on_exit_code}. Skipping." diff --git a/airflow/providers/docker/operators/docker.py b/airflow/providers/docker/operators/docker.py index 5af1ba7542ae6..cc2567baabb08 100644 --- a/airflow/providers/docker/operators/docker.py +++ b/airflow/providers/docker/operators/docker.py @@ -22,6 +22,7 @@ import pickle import tarfile import warnings +from collections.abc import Container from io import BytesIO, StringIO from tempfile import TemporaryDirectory from typing import TYPE_CHECKING, Iterable, Sequence @@ -215,7 +216,7 @@ def __init__( log_opts_max_file: str | None = None, ipc_mode: str | None = None, skip_exit_code: int | None = None, - skip_on_exit_code: int | None = None, + skip_on_exit_code: int | Container[int] | None = None, **kwargs, ) -> None: super().__init__(**kwargs) @@ -281,9 +282,15 @@ def __init__( warnings.warn( "skip_exit_code is deprecated. Please use skip_on_exit_code", DeprecationWarning, stacklevel=2 ) - self.skip_on_exit_code: int | None = skip_exit_code - else: - self.skip_on_exit_code = skip_on_exit_code + skip_on_exit_code = skip_exit_code + + self.skip_on_exit_code = ( + skip_on_exit_code + if isinstance(skip_on_exit_code, Container) + else [skip_on_exit_code] + if skip_on_exit_code + else [] + ) @cached_property def hook(self) -> DockerHook: @@ -384,7 +391,7 @@ def _run_image_with_mounts(self, target_mounts, add_tmp_variable: bool) -> list[ self.log.info("%s", log_chunk) result = self.cli.wait(self.container["Id"]) - if result["StatusCode"] == self.skip_on_exit_code: + if result["StatusCode"] in self.skip_on_exit_code: raise AirflowSkipException( f"Docker container returned exit code {self.skip_on_exit_code}. Skipping." ) diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py b/tests/providers/cncf/kubernetes/operators/test_pod.py index 83c5c8bdcf83f..b9d21b5151e26 100644 --- a/tests/providers/cncf/kubernetes/operators/test_pod.py +++ b/tests/providers/cncf/kubernetes/operators/test_pod.py @@ -1092,6 +1092,11 @@ def test_task_id_as_name_dag_id_is_ignored(self): ({"skip_on_exit_code": 100}, 100, AirflowSkipException), ({"skip_on_exit_code": 100}, 101, AirflowException), ({"skip_on_exit_code": None}, 100, AirflowException), + ({"skip_on_exit_code": [100]}, 100, AirflowSkipException), + ({"skip_on_exit_code": (100, 101)}, 100, AirflowSkipException), + ({"skip_on_exit_code": 100}, 101, AirflowException), + ({"skip_on_exit_code": [100, 102]}, 101, AirflowException), + ({"skip_on_exit_code": None}, 0, AirflowException), ], ) @patch(f"{POD_MANAGER_CLASS}.await_pod_completion") @@ -1111,8 +1116,11 @@ def test_task_skip_when_pod_exit_with_certain_code( sidecar_container.last_state.terminated.exit_code = 0 remote_pod.return_value.status.container_statuses = [base_container, sidecar_container] - with pytest.raises(expected_exc): + if expected_exc is None: self.run_pod(k) + else: + with pytest.raises(expected_exc): + self.run_pod(k) class TestSuppress: diff --git a/tests/providers/docker/operators/test_docker.py b/tests/providers/docker/operators/test_docker.py index 6f2005589346f..f1e358d30ae45 100644 --- a/tests/providers/docker/operators/test_docker.py +++ b/tests/providers/docker/operators/test_docker.py @@ -519,6 +519,11 @@ def test_execute_unicode_logs(self): ({"skip_on_exit_code": 100}, 100, AirflowSkipException), ({"skip_on_exit_code": 100}, 101, AirflowException), ({"skip_on_exit_code": None}, 100, AirflowException), + ({"skip_on_exit_code": [100]}, 100, AirflowSkipException), + ({"skip_on_exit_code": (100, 101)}, 100, AirflowSkipException), + ({"skip_on_exit_code": 100}, 101, AirflowException), + ({"skip_on_exit_code": [100, 102]}, 101, AirflowException), + ({"skip_on_exit_code": None}, 0, None), ], ) def test_skip(self, extra_kwargs, actual_exit_code, expected_exc): @@ -530,8 +535,11 @@ def test_skip(self, extra_kwargs, actual_exit_code, expected_exc): kwargs.update(**extra_kwargs) operator = DockerOperator(**kwargs) - with pytest.raises(expected_exc): + if expected_exc is None: operator.execute({}) + else: + with pytest.raises(expected_exc): + operator.execute({}) def test_execute_container_fails(self): failed_msg = {"StatusCode": 1}