Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions airflow/providers/cncf/kubernetes/operators/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import secrets
import string
import warnings
from collections.abc import Container
from contextlib import AbstractContextManager
from typing import TYPE_CHECKING, Any, Sequence

Expand Down Expand Up @@ -292,7 +293,7 @@ def __init__(
termination_grace_period: int | None = None,
configmaps: list[str] | None = None,
skip_exit_code: int | None = None,
skip_on_exit_code: int | None = None,
skip_on_exit_code: int | Container[int] | None = None,
base_container_name: str | None = None,
deferrable: bool = False,
poll_interval: float = 2,
Expand Down Expand Up @@ -366,9 +367,15 @@ def __init__(
warnings.warn(
"skip_exit_code is deprecated. Please use skip_on_exit_code", DeprecationWarning, stacklevel=2
)
self.skip_on_exit_code: int | None = skip_exit_code
else:
self.skip_on_exit_code = skip_on_exit_code
skip_on_exit_code = skip_exit_code

self.skip_on_exit_code = (
skip_on_exit_code
if isinstance(skip_on_exit_code, Container)
else [skip_on_exit_code]
if skip_on_exit_code
else []
)
self.base_container_name = base_container_name or self.BASE_CONTAINER_NAME
self.deferrable = deferrable
self.poll_interval = poll_interval
Expand Down Expand Up @@ -696,7 +703,7 @@ def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod):
and base_container_status.last_state.terminated
else None
)
if exit_code == self.skip_on_exit_code:
if exit_code in self.skip_on_exit_code:
raise AirflowSkipException(
f"Pod {pod and pod.metadata.name} returned exit code "
f"{self.skip_on_exit_code}. Skipping."
Expand Down
17 changes: 12 additions & 5 deletions airflow/providers/docker/operators/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import pickle
import tarfile
import warnings
from collections.abc import Container
from io import BytesIO, StringIO
from tempfile import TemporaryDirectory
from typing import TYPE_CHECKING, Iterable, Sequence
Expand Down Expand Up @@ -215,7 +216,7 @@ def __init__(
log_opts_max_file: str | None = None,
ipc_mode: str | None = None,
skip_exit_code: int | None = None,
skip_on_exit_code: int | None = None,
skip_on_exit_code: int | Container[int] | None = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
Expand Down Expand Up @@ -281,9 +282,15 @@ def __init__(
warnings.warn(
"skip_exit_code is deprecated. Please use skip_on_exit_code", DeprecationWarning, stacklevel=2
)
self.skip_on_exit_code: int | None = skip_exit_code
else:
self.skip_on_exit_code = skip_on_exit_code
skip_on_exit_code = skip_exit_code

self.skip_on_exit_code = (
skip_on_exit_code
if isinstance(skip_on_exit_code, Container)
else [skip_on_exit_code]
if skip_on_exit_code
else []
)

@cached_property
def hook(self) -> DockerHook:
Expand Down Expand Up @@ -384,7 +391,7 @@ def _run_image_with_mounts(self, target_mounts, add_tmp_variable: bool) -> list[
self.log.info("%s", log_chunk)

result = self.cli.wait(self.container["Id"])
if result["StatusCode"] == self.skip_on_exit_code:
if result["StatusCode"] in self.skip_on_exit_code:
raise AirflowSkipException(
f"Docker container returned exit code {self.skip_on_exit_code}. Skipping."
)
Expand Down
10 changes: 9 additions & 1 deletion tests/providers/cncf/kubernetes/operators/test_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -1092,6 +1092,11 @@ def test_task_id_as_name_dag_id_is_ignored(self):
({"skip_on_exit_code": 100}, 100, AirflowSkipException),
({"skip_on_exit_code": 100}, 101, AirflowException),
({"skip_on_exit_code": None}, 100, AirflowException),
({"skip_on_exit_code": [100]}, 100, AirflowSkipException),
({"skip_on_exit_code": (100, 101)}, 100, AirflowSkipException),
({"skip_on_exit_code": 100}, 101, AirflowException),
({"skip_on_exit_code": [100, 102]}, 101, AirflowException),
({"skip_on_exit_code": None}, 0, AirflowException),

@eladkal eladkal Apr 20, 2023

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KPO throws AirflowException for the 0 case:

],
)
@patch(f"{POD_MANAGER_CLASS}.await_pod_completion")
Expand All @@ -1111,8 +1116,11 @@ def test_task_skip_when_pod_exit_with_certain_code(
sidecar_container.last_state.terminated.exit_code = 0
remote_pod.return_value.status.container_statuses = [base_container, sidecar_container]

with pytest.raises(expected_exc):
if expected_exc is None:
self.run_pod(k)
else:
with pytest.raises(expected_exc):
self.run_pod(k)


class TestSuppress:
Expand Down
10 changes: 9 additions & 1 deletion tests/providers/docker/operators/test_docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,11 @@ def test_execute_unicode_logs(self):
({"skip_on_exit_code": 100}, 100, AirflowSkipException),
({"skip_on_exit_code": 100}, 101, AirflowException),
({"skip_on_exit_code": None}, 100, AirflowException),
({"skip_on_exit_code": [100]}, 100, AirflowSkipException),
({"skip_on_exit_code": (100, 101)}, 100, AirflowSkipException),
({"skip_on_exit_code": 100}, 101, AirflowException),
({"skip_on_exit_code": [100, 102]}, 101, AirflowException),
({"skip_on_exit_code": None}, 0, None),
],
)
def test_skip(self, extra_kwargs, actual_exit_code, expected_exc):
Expand All @@ -530,8 +535,11 @@ def test_skip(self, extra_kwargs, actual_exit_code, expected_exc):
kwargs.update(**extra_kwargs)
operator = DockerOperator(**kwargs)

with pytest.raises(expected_exc):
if expected_exc is None:
operator.execute({})
else:
with pytest.raises(expected_exc):
operator.execute({})

def test_execute_container_fails(self):
failed_msg = {"StatusCode": 1}
Expand Down