From 748817f694697df51b283a52fb88d35a39469ab0 Mon Sep 17 00:00:00 2001 From: romsharon98 Date: Mon, 13 May 2024 17:13:33 +0300 Subject: [PATCH 1/2] move kubernetes cli to provider package --- airflow/cli/cli_config.py | 4 +- airflow/cli/commands/kubernetes_command.py | 7 + .../providers/cncf/kubernetes/cli/__init__.py | 16 + .../cncf/kubernetes/cli/kubernetes_command.py | 164 +++++++++++ .../executors/kubernetes_executor.py | 4 +- .../providers/cncf/kubernetes/cli/__init__.py | 16 + .../kubernetes/cli/test_kubernetes_command.py | 273 ++++++++++++++++++ 7 files changed, 480 insertions(+), 4 deletions(-) create mode 100644 airflow/providers/cncf/kubernetes/cli/__init__.py create mode 100644 airflow/providers/cncf/kubernetes/cli/kubernetes_command.py create mode 100644 tests/providers/cncf/kubernetes/cli/__init__.py create mode 100644 tests/providers/cncf/kubernetes/cli/test_kubernetes_command.py diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py index 82b00f0ff2d55..d9bb2f5897050 100644 --- a/airflow/cli/cli_config.py +++ b/airflow/cli/cli_config.py @@ -1860,14 +1860,14 @@ class GroupCommand(NamedTuple): "(created by KubernetesExecutor/KubernetesPodOperator) " "in evicted/failed/succeeded/pending states" ), - func=lazy_load_command("airflow.cli.commands.kubernetes_command.cleanup_pods"), + func=lazy_load_command("airflow.providers.cncf.kubernetes.cli.kubernetes_command.cleanup_pods"), args=(ARG_NAMESPACE, ARG_MIN_PENDING_MINUTES, ARG_VERBOSE), ), ActionCommand( name="generate-dag-yaml", help="Generate YAML files for all tasks in DAG. Useful for debugging tasks without " "launching into a cluster", - func=lazy_load_command("airflow.cli.commands.kubernetes_command.generate_pod_yaml"), + func=lazy_load_command("airflow.providers.cncf.kubernetes.cli.kubernetes_command.generate_pod_yaml"), args=(ARG_DAG_ID, ARG_EXECUTION_DATE, ARG_SUBDIR, ARG_OUTPUT_PATH, ARG_VERBOSE), ), ) diff --git a/airflow/cli/commands/kubernetes_command.py b/airflow/cli/commands/kubernetes_command.py index e05d5d2326321..2a6fccf14d186 100644 --- a/airflow/cli/commands/kubernetes_command.py +++ b/airflow/cli/commands/kubernetes_command.py @@ -20,6 +20,7 @@ import os import sys +import warnings from datetime import datetime, timedelta from kubernetes import client @@ -36,6 +37,12 @@ from airflow.utils.cli import get_dag from airflow.utils.providers_configuration_loader import providers_configuration_loaded +warnings.warn( + "Use kubernetes command from providers package, Use cncf.kubernetes provider >= 8.2.1", + DeprecationWarning, + stacklevel=2, +) + @cli_utils.action_cli @providers_configuration_loaded diff --git a/airflow/providers/cncf/kubernetes/cli/__init__.py b/airflow/providers/cncf/kubernetes/cli/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/airflow/providers/cncf/kubernetes/cli/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/providers/cncf/kubernetes/cli/kubernetes_command.py b/airflow/providers/cncf/kubernetes/cli/kubernetes_command.py new file mode 100644 index 0000000000000..e05d5d2326321 --- /dev/null +++ b/airflow/providers/cncf/kubernetes/cli/kubernetes_command.py @@ -0,0 +1,164 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Kubernetes sub-commands.""" + +from __future__ import annotations + +import os +import sys +from datetime import datetime, timedelta + +from kubernetes import client +from kubernetes.client.api_client import ApiClient +from kubernetes.client.rest import ApiException + +from airflow.models import DagRun, TaskInstance +from airflow.providers.cncf.kubernetes import pod_generator +from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubeConfig +from airflow.providers.cncf.kubernetes.kube_client import get_kube_client +from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import create_pod_id +from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator +from airflow.utils import cli as cli_utils, yaml +from airflow.utils.cli import get_dag +from airflow.utils.providers_configuration_loader import providers_configuration_loaded + + +@cli_utils.action_cli +@providers_configuration_loaded +def generate_pod_yaml(args): + """Generate yaml files for each task in the DAG. Used for testing output of KubernetesExecutor.""" + execution_date = args.execution_date + dag = get_dag(subdir=args.subdir, dag_id=args.dag_id) + yaml_output_path = args.output_path + dr = DagRun(dag.dag_id, execution_date=execution_date) + kube_config = KubeConfig() + for task in dag.tasks: + ti = TaskInstance(task, None) + ti.dag_run = dr + pod = PodGenerator.construct_pod( + dag_id=args.dag_id, + task_id=ti.task_id, + pod_id=create_pod_id(args.dag_id, ti.task_id), + try_number=ti.try_number, + kube_image=kube_config.kube_image, + date=ti.execution_date, + args=ti.command_as_list(), + pod_override_object=PodGenerator.from_obj(ti.executor_config), + scheduler_job_id="worker-config", + namespace=kube_config.executor_namespace, + base_worker_pod=PodGenerator.deserialize_model_file(kube_config.pod_template_file), + with_mutation_hook=True, + ) + api_client = ApiClient() + date_string = pod_generator.datetime_to_label_safe_datestring(execution_date) + yaml_file_name = f"{args.dag_id}_{ti.task_id}_{date_string}.yml" + os.makedirs(os.path.dirname(yaml_output_path + "/airflow_yaml_output/"), exist_ok=True) + with open(yaml_output_path + "/airflow_yaml_output/" + yaml_file_name, "w") as output: + sanitized_pod = api_client.sanitize_for_serialization(pod) + output.write(yaml.dump(sanitized_pod)) + print(f"YAML output can be found at {yaml_output_path}/airflow_yaml_output/") + + +@cli_utils.action_cli +@providers_configuration_loaded +def cleanup_pods(args): + """Clean up k8s pods in evicted/failed/succeeded/pending states.""" + namespace = args.namespace + + min_pending_minutes = args.min_pending_minutes + # protect newly created pods from deletion + if min_pending_minutes < 5: + min_pending_minutes = 5 + + # https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/ + # All Containers in the Pod have terminated in success, and will not be restarted. + pod_succeeded = "succeeded" + + # The Pod has been accepted by the Kubernetes cluster, + # but one or more of the containers has not been set up and made ready to run. + pod_pending = "pending" + + # All Containers in the Pod have terminated, and at least one Container has terminated in failure. + # That is, the Container either exited with non-zero status or was terminated by the system. + pod_failed = "failed" + + # https://kubernetes.io/docs/tasks/administer-cluster/out-of-resource/ + pod_reason_evicted = "evicted" + # If pod is failed and restartPolicy is: + # * Always: Restart Container; Pod phase stays Running. + # * OnFailure: Restart Container; Pod phase stays Running. + # * Never: Pod phase becomes Failed. + pod_restart_policy_never = "never" + + print("Loading Kubernetes configuration") + kube_client = get_kube_client() + print(f"Listing pods in namespace {namespace}") + airflow_pod_labels = [ + "dag_id", + "task_id", + "try_number", + "airflow_version", + ] + list_kwargs = {"namespace": namespace, "limit": 500, "label_selector": ",".join(airflow_pod_labels)} + + while True: + pod_list = kube_client.list_namespaced_pod(**list_kwargs) + for pod in pod_list.items: + pod_name = pod.metadata.name + print(f"Inspecting pod {pod_name}") + pod_phase = pod.status.phase.lower() + pod_reason = pod.status.reason.lower() if pod.status.reason else "" + pod_restart_policy = pod.spec.restart_policy.lower() + current_time = datetime.now(pod.metadata.creation_timestamp.tzinfo) + + if ( + pod_phase == pod_succeeded + or (pod_phase == pod_failed and pod_restart_policy == pod_restart_policy_never) + or (pod_reason == pod_reason_evicted) + or ( + pod_phase == pod_pending + and current_time - pod.metadata.creation_timestamp + > timedelta(minutes=min_pending_minutes) + ) + ): + print( + f'Deleting pod "{pod_name}" phase "{pod_phase}" and reason "{pod_reason}", ' + f'restart policy "{pod_restart_policy}"' + ) + try: + _delete_pod(pod.metadata.name, namespace) + except ApiException as e: + print(f"Can't remove POD: {e}", file=sys.stderr) + else: + print(f"No action taken on pod {pod_name}") + continue_token = pod_list.metadata._continue + if not continue_token: + break + list_kwargs["_continue"] = continue_token + + +def _delete_pod(name, namespace): + """ + Delete a namespaced pod. + + Helper Function for cleanup_pods. + """ + kube_client = get_kube_client() + delete_options = client.V1DeleteOptions() + print(f'Deleting POD "{name}" from "{namespace}" namespace') + api_response = kube_client.delete_namespaced_pod(name=name, namespace=namespace, body=delete_options) + print(api_response) diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py index f62f021fd264d..e2c22cdc03075 100644 --- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py +++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py @@ -128,14 +128,14 @@ "(created by KubernetesExecutor/KubernetesPodOperator) " "in evicted/failed/succeeded/pending states" ), - func=lazy_load_command("airflow.cli.commands.kubernetes_command.cleanup_pods"), + func=lazy_load_command("airflow.providers.cncf.kubernetes.cli.kubernetes_command.cleanup_pods"), args=(ARG_NAMESPACE, ARG_MIN_PENDING_MINUTES, ARG_VERBOSE), ), ActionCommand( name="generate-dag-yaml", help="Generate YAML files for all tasks in DAG. Useful for debugging tasks without " "launching into a cluster", - func=lazy_load_command("airflow.cli.commands.kubernetes_command.generate_pod_yaml"), + func=lazy_load_command("airflow.providers.cncf.kubernetes.cli.kubernetes_command.generate_pod_yaml"), args=(ARG_DAG_ID, ARG_EXECUTION_DATE, ARG_SUBDIR, ARG_OUTPUT_PATH, ARG_VERBOSE), ), ) diff --git a/tests/providers/cncf/kubernetes/cli/__init__.py b/tests/providers/cncf/kubernetes/cli/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/tests/providers/cncf/kubernetes/cli/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/providers/cncf/kubernetes/cli/test_kubernetes_command.py b/tests/providers/cncf/kubernetes/cli/test_kubernetes_command.py new file mode 100644 index 0000000000000..790801a3bbcf9 --- /dev/null +++ b/tests/providers/cncf/kubernetes/cli/test_kubernetes_command.py @@ -0,0 +1,273 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import importlib +import os +from unittest import mock +from unittest.mock import MagicMock, call + +import kubernetes +import pytest +from dateutil.parser import parse + +from airflow.cli import cli_parser +from airflow.executors import executor_loader +from airflow.providers.cncf.kubernetes.cli import kubernetes_command +from tests.test_utils.config import conf_vars + +pytestmark = pytest.mark.db_test + + +class TestGenerateDagYamlCommand: + @classmethod + def setup_class(cls): + with conf_vars({("core", "executor"): "KubernetesExecutor"}): + importlib.reload(executor_loader) + importlib.reload(cli_parser) + cls.parser = cli_parser.get_parser() + + def test_generate_dag_yaml(self, tmp_path): + path = tmp_path / "miscellaneous_test_dag_run_after_loop_2020-11-03T00_00_00_plus_00_00.yml" + kubernetes_command.generate_pod_yaml( + self.parser.parse_args( + [ + "kubernetes", + "generate-dag-yaml", + "miscellaneous_test_dag", + "2020-11-03", + "--output-path", + os.fspath(path.parent), + ] + ) + ) + assert sum(1 for _ in path.parent.iterdir()) == 1 + output_path = path.parent / "airflow_yaml_output" + assert sum(1 for _ in output_path.iterdir()) == 6 + assert os.path.isfile(output_path / path.name) + assert (output_path / path.name).stat().st_size > 0 + + +class TestCleanUpPodsCommand: + label_selector = "dag_id,task_id,try_number,airflow_version" + + @classmethod + def setup_class(cls): + with conf_vars({("core", "executor"): "KubernetesExecutor"}): + importlib.reload(executor_loader) + importlib.reload(cli_parser) + cls.parser = cli_parser.get_parser() + + @mock.patch("kubernetes.client.CoreV1Api.delete_namespaced_pod") + @mock.patch("airflow.providers.cncf.kubernetes.kube_client.config.load_incluster_config") + def test_delete_pod(self, load_incluster_config, delete_namespaced_pod): + kubernetes_command._delete_pod("dummy", "awesome-namespace") + delete_namespaced_pod.assert_called_with(body=mock.ANY, name="dummy", namespace="awesome-namespace") + load_incluster_config.assert_called_once() + + @mock.patch("airflow.providers.cncf.kubernetes.cli.kubernetes_command._delete_pod") + @mock.patch("kubernetes.client.CoreV1Api.list_namespaced_pod") + @mock.patch("airflow.providers.cncf.kubernetes.kube_client.config.load_incluster_config") + def test_running_pods_are_not_cleaned(self, load_incluster_config, list_namespaced_pod, delete_pod): + pod1 = MagicMock() + pod1.metadata.name = "dummy" + pod1.metadata.creation_timestamp = parse("2021-12-20T08:01:07Z") + pod1.status.phase = "Running" + pod1.status.reason = None + pods = MagicMock() + pods.metadata._continue = None + pods.items = [pod1] + list_namespaced_pod.return_value = pods + kubernetes_command.cleanup_pods( + self.parser.parse_args(["kubernetes", "cleanup-pods", "--namespace", "awesome-namespace"]) + ) + list_namespaced_pod.assert_called_once_with( + namespace="awesome-namespace", limit=500, label_selector=self.label_selector + ) + delete_pod.assert_not_called() + load_incluster_config.assert_called_once() + + @mock.patch("airflow.providers.cncf.kubernetes.cli.kubernetes_command._delete_pod") + @mock.patch("kubernetes.client.CoreV1Api.list_namespaced_pod") + @mock.patch("airflow.providers.cncf.kubernetes.kube_client.config.load_incluster_config") + def test_cleanup_succeeded_pods(self, load_incluster_config, list_namespaced_pod, delete_pod): + pod1 = MagicMock() + pod1.metadata.name = "dummy" + pod1.metadata.creation_timestamp = parse("2021-12-20T08:01:07Z") + pod1.status.phase = "Succeeded" + pod1.status.reason = None + pods = MagicMock() + pods.metadata._continue = None + pods.items = [pod1] + list_namespaced_pod.return_value = pods + kubernetes_command.cleanup_pods( + self.parser.parse_args(["kubernetes", "cleanup-pods", "--namespace", "awesome-namespace"]) + ) + list_namespaced_pod.assert_called_once_with( + namespace="awesome-namespace", limit=500, label_selector=self.label_selector + ) + delete_pod.assert_called_with("dummy", "awesome-namespace") + load_incluster_config.assert_called_once() + + @mock.patch("airflow.providers.cncf.kubernetes.cli.kubernetes_command._delete_pod") + @mock.patch("kubernetes.client.CoreV1Api.list_namespaced_pod") + @mock.patch("kubernetes.config.load_incluster_config") + def test_no_cleanup_failed_pods_wo_restart_policy_never( + self, load_incluster_config, list_namespaced_pod, delete_pod + ): + pod1 = MagicMock() + pod1.metadata.name = "dummy2" + pod1.metadata.creation_timestamp = parse("2021-12-20T08:01:07Z") + pod1.status.phase = "Failed" + pod1.status.reason = None + pod1.spec.restart_policy = "Always" + pods = MagicMock() + pods.metadata._continue = None + pods.items = [pod1] + list_namespaced_pod.return_value = pods + kubernetes_command.cleanup_pods( + self.parser.parse_args(["kubernetes", "cleanup-pods", "--namespace", "awesome-namespace"]) + ) + list_namespaced_pod.assert_called_once_with( + namespace="awesome-namespace", limit=500, label_selector=self.label_selector + ) + delete_pod.assert_not_called() + load_incluster_config.assert_called_once() + + @mock.patch("airflow.providers.cncf.kubernetes.cli.kubernetes_command._delete_pod") + @mock.patch("kubernetes.client.CoreV1Api.list_namespaced_pod") + @mock.patch("kubernetes.config.load_incluster_config") + def test_cleanup_failed_pods_w_restart_policy_never( + self, load_incluster_config, list_namespaced_pod, delete_pod + ): + pod1 = MagicMock() + pod1.metadata.name = "dummy3" + pod1.metadata.creation_timestamp = parse("2021-12-20T08:01:07Z") + pod1.status.phase = "Failed" + pod1.status.reason = None + pod1.spec.restart_policy = "Never" + pods = MagicMock() + pods.metadata._continue = None + pods.items = [pod1] + list_namespaced_pod.return_value = pods + kubernetes_command.cleanup_pods( + self.parser.parse_args(["kubernetes", "cleanup-pods", "--namespace", "awesome-namespace"]) + ) + list_namespaced_pod.assert_called_once_with( + namespace="awesome-namespace", limit=500, label_selector=self.label_selector + ) + delete_pod.assert_called_with("dummy3", "awesome-namespace") + load_incluster_config.assert_called_once() + + @mock.patch("airflow.providers.cncf.kubernetes.cli.kubernetes_command._delete_pod") + @mock.patch("kubernetes.client.CoreV1Api.list_namespaced_pod") + @mock.patch("kubernetes.config.load_incluster_config") + def test_cleanup_evicted_pods(self, load_incluster_config, list_namespaced_pod, delete_pod): + pod1 = MagicMock() + pod1.metadata.name = "dummy4" + pod1.metadata.creation_timestamp = parse("2021-12-20T08:01:07Z") + pod1.status.phase = "Failed" + pod1.status.reason = "Evicted" + pod1.spec.restart_policy = "Never" + pods = MagicMock() + pods.metadata._continue = None + pods.items = [pod1] + list_namespaced_pod.return_value = pods + kubernetes_command.cleanup_pods( + self.parser.parse_args(["kubernetes", "cleanup-pods", "--namespace", "awesome-namespace"]) + ) + list_namespaced_pod.assert_called_once_with( + namespace="awesome-namespace", limit=500, label_selector=self.label_selector + ) + delete_pod.assert_called_with("dummy4", "awesome-namespace") + load_incluster_config.assert_called_once() + + @mock.patch("airflow.providers.cncf.kubernetes.cli.kubernetes_command._delete_pod") + @mock.patch("kubernetes.client.CoreV1Api.list_namespaced_pod") + @mock.patch("kubernetes.config.load_incluster_config") + def test_cleanup_pending_pods(self, load_incluster_config, list_namespaced_pod, delete_pod): + pod1 = MagicMock() + pod1.metadata.name = "dummy5" + pod1.metadata.creation_timestamp = parse("2021-12-20T08:01:07Z") + pod1.status.phase = "Pending" + pod1.status.reason = "Unschedulable" + pods = MagicMock() + pods.metadata._continue = None + pods.items = [pod1] + list_namespaced_pod.return_value = pods + kubernetes_command.cleanup_pods( + self.parser.parse_args(["kubernetes", "cleanup-pods", "--namespace", "awesome-namespace"]) + ) + list_namespaced_pod.assert_called_once_with( + namespace="awesome-namespace", limit=500, label_selector=self.label_selector + ) + delete_pod.assert_called_with("dummy5", "awesome-namespace") + load_incluster_config.assert_called_once() + + @mock.patch("airflow.providers.cncf.kubernetes.cli.kubernetes_command._delete_pod") + @mock.patch("kubernetes.client.CoreV1Api.list_namespaced_pod") + @mock.patch("kubernetes.config.load_incluster_config") + def test_cleanup_api_exception_continue(self, load_incluster_config, list_namespaced_pod, delete_pod): + delete_pod.side_effect = kubernetes.client.rest.ApiException(status=0) + pod1 = MagicMock() + pod1.metadata.name = "dummy" + pod1.metadata.creation_timestamp = parse("2021-12-20T08:01:07Z") + pod1.status.phase = "Succeeded" + pod1.status.reason = None + pods = MagicMock() + pods.metadata._continue = None + pods.items = [pod1] + list_namespaced_pod.return_value = pods + kubernetes_command.cleanup_pods( + self.parser.parse_args(["kubernetes", "cleanup-pods", "--namespace", "awesome-namespace"]) + ) + list_namespaced_pod.assert_called_once_with( + namespace="awesome-namespace", limit=500, label_selector=self.label_selector + ) + load_incluster_config.assert_called_once() + + @mock.patch("airflow.providers.cncf.kubernetes.cli.kubernetes_command._delete_pod") + @mock.patch("kubernetes.client.CoreV1Api.list_namespaced_pod") + @mock.patch("kubernetes.config.load_incluster_config") + def test_list_pod_with_continue_token(self, load_incluster_config, list_namespaced_pod, delete_pod): + pod1 = MagicMock() + pod1.metadata.name = "dummy" + pod1.metadata.creation_timestamp = parse("2021-12-20T08:01:07Z") + pod1.status.phase = "Succeeded" + pod1.status.reason = None + pods = MagicMock() + pods.metadata._continue = "dummy-token" + pods.items = [pod1] + next_pods = MagicMock() + next_pods.metadata._continue = None + next_pods.items = [pod1] + list_namespaced_pod.side_effect = [pods, next_pods] + kubernetes_command.cleanup_pods( + self.parser.parse_args(["kubernetes", "cleanup-pods", "--namespace", "awesome-namespace"]) + ) + calls = [ + call.first(namespace="awesome-namespace", limit=500, label_selector=self.label_selector), + call.second( + namespace="awesome-namespace", + limit=500, + label_selector=self.label_selector, + _continue="dummy-token", + ), + ] + list_namespaced_pod.assert_has_calls(calls) + delete_pod.assert_called_with("dummy", "awesome-namespace") + load_incluster_config.assert_called_once() From b03e9d59d7d9592c77013a35e151933cd6f7a536 Mon Sep 17 00:00:00 2001 From: romsharon98 Date: Mon, 13 May 2024 19:59:39 +0300 Subject: [PATCH 2/2] add deprecation ignore for cncf provider --- tests/deprecations_ignore.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/deprecations_ignore.yml b/tests/deprecations_ignore.yml index 1a8c8223f68a0..b3c704381d348 100644 --- a/tests/deprecations_ignore.yml +++ b/tests/deprecations_ignore.yml @@ -335,6 +335,7 @@ - tests/providers/common/sql/operators/test_sql.py::TestSqlBranch::test_invalid_query_result_with_dag_run - tests/providers/common/sql/operators/test_sql.py::TestSqlBranch::test_with_skip_in_branch_downstream_dependencies - tests/providers/common/sql/operators/test_sql.py::TestSqlBranch::test_with_skip_in_branch_downstream_dependencies2 +- tests/providers/cncf/kubernetes/cli/test_kubernetes_command.py::TestGenerateDagYamlCommand::test_generate_dag_yaml - tests/providers/databricks/hooks/test_databricks_sql.py::test_incorrect_column_names - tests/providers/databricks/hooks/test_databricks_sql.py::test_no_query - tests/providers/databricks/hooks/test_databricks_sql.py::test_query