From 8594b969f6b4331716c882bcb1ddc5dec9b50726 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Thu, 2 Jun 2022 09:36:08 -0700 Subject: [PATCH] Don't crash scheduler if exec config has old k8s objects From time to time k8s library objects change their attrs. If executor config is stored with old version, and unpickled with new version, we can get attribute errors that can crash the scheduler (see https://github.com/apache/airflow/issues/23727). Here we update handling so that we fail the task but don't crash the scheduler. --- airflow/exceptions.py | 4 +++ airflow/executors/kubernetes_executor.py | 12 +++++-- airflow/kubernetes/pod_generator.py | 7 ++-- airflow/models/taskinstance.py | 16 ++++++++- tests/executors/test_kubernetes_executor.py | 39 +++++++++++++++++++++ tests/kubernetes/test_pod_generator.py | 30 +++++++++++++++- tests/models/test_taskinstance.py | 21 ++++++++++- 7 files changed, 122 insertions(+), 7 deletions(-) diff --git a/airflow/exceptions.py b/airflow/exceptions.py index 2f1e53e182a10..1f30ef525f82d 100644 --- a/airflow/exceptions.py +++ b/airflow/exceptions.py @@ -306,3 +306,7 @@ def __repr__(self) -> str: class TaskDeferralError(AirflowException): """Raised when a task failed during deferral for some reason.""" + + +class PodReconciliationError(AirflowException): + """Raised when an error is encountered while trying to merge pod configs.""" diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index c76cf58f418d4..e510da2b314d6 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -35,7 +35,7 @@ from kubernetes.client.rest import ApiException from urllib3.exceptions import ReadTimeoutError -from airflow.exceptions import AirflowException +from airflow.exceptions import AirflowException, PodReconciliationError from airflow.executors.base_executor import NOT_STARTED_MESSAGE, BaseExecutor, CommandType from airflow.kubernetes import pod_generator from airflow.kubernetes.kube_client import get_kube_client @@ -300,8 +300,9 @@ def run_next(self, next_job: KubernetesJobType) -> None: and store relevant info in the current_jobs map so we can track the job's status """ - self.log.info('Kubernetes job is %s', str(next_job).replace("\n", " ")) key, command, kube_executor_config, pod_template_file = next_job + self.log.info('Kubernetes job is %s', key) + dag_id, task_id, run_id, try_number, map_index = key if command[0:3] != ["airflow", "tasks", "run"]: @@ -617,6 +618,13 @@ def sync(self) -> None: task = self.task_queue.get_nowait() try: self.kube_scheduler.run_next(task) + except PodReconciliationError as e: + self.log.error( + "Pod reconciliation failed, likely due to kubernetes library upgrade. " + "Try clearing the task to re-run.", + exc_info=True, + ) + self.fail(task[0], e) except ApiException as e: # These codes indicate something is wrong with pod definition; otherwise we assume pod diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py index 52b45801ccabc..8a86919a65b75 100644 --- a/airflow/kubernetes/pod_generator.py +++ b/airflow/kubernetes/pod_generator.py @@ -34,7 +34,7 @@ from kubernetes.client import models as k8s from kubernetes.client.api_client import ApiClient -from airflow.exceptions import AirflowConfigException +from airflow.exceptions import AirflowConfigException, PodReconciliationError from airflow.kubernetes.pod_generator_deprecated import PodDefaults, PodGenerator as PodGeneratorDeprecated from airflow.utils import yaml from airflow.version import version as airflow_version @@ -389,7 +389,10 @@ def construct_pod( # Pod from the pod_template_File -> Pod from executor_config arg -> Pod from the K8s executor pod_list = [base_worker_pod, pod_override_object, dynamic_pod] - return reduce(PodGenerator.reconcile_pods, pod_list) + try: + return reduce(PodGenerator.reconcile_pods, pod_list) + except Exception as e: + raise PodReconciliationError from e @staticmethod def serialize_pod(pod: k8s.V1Pod) -> dict: diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 0f5d49b819762..7685a53764a12 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -400,6 +400,20 @@ def key(self) -> "TaskInstanceKey": return self +def _executor_config_comparator(x, y): + """ + The TaskInstance.executor_config attribute is a pickled object that may contain + kubernetes objects. If the installed library version has changed since the + object was originally pickled, due to the underlying ``__eq__`` method on these + objects (which converts them to JSON), we may encounter attribute errors. In this + case we should replace the stored object. + """ + try: + return x == y + except AttributeError: + return False + + class TaskInstance(Base, LoggingMixin): """ Task instances store the state of a task instance. This table is the @@ -442,7 +456,7 @@ class TaskInstance(Base, LoggingMixin): queued_dttm = Column(UtcDateTime) queued_by_job_id = Column(Integer) pid = Column(Integer) - executor_config = Column(PickleType(pickler=dill)) + executor_config = Column(PickleType(pickler=dill, comparator=_executor_config_comparator)) external_executor_id = Column(String(ID_LEN, **COLLATION_ARGS)) diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py index 954f4f06001db..8ffeb5624b8cc 100644 --- a/tests/executors/test_kubernetes_executor.py +++ b/tests/executors/test_kubernetes_executor.py @@ -29,6 +29,7 @@ from urllib3 import HTTPResponse from airflow import AirflowException +from airflow.exceptions import PodReconciliationError from airflow.models.taskinstance import TaskInstanceKey from airflow.operators.bash import BashOperator from airflow.utils import timezone @@ -272,6 +273,44 @@ def test_run_next_exception_requeue( assert kubernetes_executor.task_queue.empty() assert kubernetes_executor.event_buffer[task_instance_key][0] == State.FAILED + @pytest.mark.skipif( + AirflowKubernetesScheduler is None, reason='kubernetes python package is not installed' + ) + @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher') + @mock.patch('airflow.executors.kubernetes_executor.get_kube_client') + def test_run_next_pod_reconciliation_error(self, mock_get_kube_client, mock_kubernetes_job_watcher): + """ + When construct_pod raises PodReconciliationError, we should fail the task. + """ + import sys + + path = sys.path[0] + '/tests/kubernetes/pod_generator_base_with_secrets.yaml' + + mock_kube_client = mock.patch('kubernetes.client.CoreV1Api', autospec=True) + fail_msg = 'test message' + mock_kube_client.create_namespaced_pod = mock.MagicMock(side_effect=PodReconciliationError(fail_msg)) + mock_get_kube_client.return_value = mock_kube_client + mock_api_client = mock.MagicMock() + mock_api_client.sanitize_for_serialization.return_value = {} + mock_kube_client.api_client = mock_api_client + config = {('kubernetes', 'pod_template_file'): path} + with conf_vars(config): + kubernetes_executor = self.kubernetes_executor + kubernetes_executor.start() + # Execute a task while the Api Throws errors + try_number = 1 + task_instance_key = TaskInstanceKey('dag', 'task', 'run_id', try_number) + kubernetes_executor.execute_async( + key=task_instance_key, + queue=None, + command=['airflow', 'tasks', 'run', 'true', 'some_parameter'], + ) + kubernetes_executor.sync() + + assert kubernetes_executor.task_queue.empty() + assert kubernetes_executor.event_buffer[task_instance_key][0] == State.FAILED + assert kubernetes_executor.event_buffer[task_instance_key][1].args[0] == fail_msg + @mock.patch('airflow.executors.kubernetes_executor.KubeConfig') @mock.patch('airflow.executors.kubernetes_executor.KubernetesExecutor.sync') @mock.patch('airflow.executors.base_executor.BaseExecutor.trigger_tasks') diff --git a/tests/kubernetes/test_pod_generator.py b/tests/kubernetes/test_pod_generator.py index d220872187cb1..df5efb0d06c9f 100644 --- a/tests/kubernetes/test_pod_generator.py +++ b/tests/kubernetes/test_pod_generator.py @@ -19,6 +19,7 @@ import sys import uuid from unittest import mock +from unittest.mock import MagicMock import pytest from dateutil import parser @@ -26,7 +27,7 @@ from parameterized import parameterized from airflow import __version__ -from airflow.exceptions import AirflowConfigException +from airflow.exceptions import AirflowConfigException, PodReconciliationError from airflow.kubernetes.pod_generator import ( PodDefaults, PodGenerator, @@ -520,6 +521,33 @@ def test_construct_pod_empty_executor_config(self, mock_uuid): worker_config_result = self.k8s_client.sanitize_for_serialization(worker_config) assert worker_config_result == sanitized_result + @mock.patch('uuid.uuid4') + def test_construct_pod_attribute_error(self, mock_uuid): + """ + After upgrading k8s library we might get attribute error. + In this case it should raise PodReconciliationError + """ + path = sys.path[0] + '/tests/kubernetes/pod_generator_base_with_secrets.yaml' + worker_config = PodGenerator.deserialize_model_file(path) + mock_uuid.return_value = self.static_uuid + executor_config = MagicMock() + executor_config.side_effect = AttributeError('error') + + with pytest.raises(PodReconciliationError): + PodGenerator.construct_pod( + dag_id='dag_id', + task_id='task_id', + pod_id='pod_id', + kube_image='test-image', + try_number=3, + date=self.execution_date, + args=['command'], + pod_override_object=executor_config, + base_worker_pod=worker_config, + namespace='namespace', + scheduler_job_id='uuid', + ) + @mock.patch('uuid.uuid4') def test_ensure_max_label_length(self, mock_uuid): mock_uuid.return_value = self.static_uuid diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index a1d180fa1ee56..1b17ccc773c05 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -57,7 +57,7 @@ ) from airflow.models.serialized_dag import SerializedDagModel from airflow.models.taskfail import TaskFail -from airflow.models.taskinstance import TaskInstance +from airflow.models.taskinstance import TaskInstance, _executor_config_comparator from airflow.models.taskmap import TaskMap from airflow.models.xcom import XCOM_RETURN_KEY from airflow.operators.bash import BashOperator @@ -2859,3 +2859,22 @@ def get_extra_env(): echo_task = dag.get_task("echo") assert "get_extra_env" in echo_task.upstream_task_ids + + +def test_executor_config_comparator(): + """ + When comparison raises AttributeError, return False. + This can happen when executor config contains kubernetes objects pickled + under older kubernetes library version. + """ + + class MockAttrError: + def __eq__(self, other): + raise AttributeError('hello') + + a = MockAttrError() + with pytest.raises(AttributeError): + # just verify for ourselves that this throws + assert a == a + assert _executor_config_comparator(a, a) is False + assert _executor_config_comparator('a', 'a') is True