From 048bf11356bcb04cf0ec766e7e677c11a42c0797 Mon Sep 17 00:00:00 2001 From: Tanel Kiis Date: Fri, 10 Jun 2022 14:02:18 +0300 Subject: [PATCH 1/2] Move callback_sink from executor to scheduler --- airflow/executors/base_executor.py | 14 ----- .../executors/celery_kubernetes_executor.py | 12 ----- .../executors/local_kubernetes_executor.py | 12 ----- airflow/jobs/scheduler_job.py | 33 ++++++++---- .../test_celery_kubernetes_executor.py | 12 ----- .../test_local_kubernetes_executor.py | 12 ----- tests/jobs/test_scheduler_job.py | 53 ++++++++++++------- 7 files changed, 57 insertions(+), 91 deletions(-) diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index 7fcbd0642e14d..626539a4379bc 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -19,8 +19,6 @@ from collections import OrderedDict from typing import Any, Counter, Dict, List, Optional, Set, Tuple, Union -from airflow.callbacks.base_callback_sink import BaseCallbackSink -from airflow.callbacks.callback_requests import CallbackRequest from airflow.configuration import conf from airflow.models.taskinstance import TaskInstance, TaskInstanceKey from airflow.stats import Stats @@ -63,7 +61,6 @@ class BaseExecutor(LoggingMixin): """ job_id: Union[None, int, str] = None - callback_sink: Optional[BaseCallbackSink] = None def __init__(self, parallelism: int = PARALLELISM): super().__init__() @@ -350,14 +347,3 @@ def debug_dump(self): len(self.event_buffer), "\n\t".join(map(repr, self.event_buffer.items())), ) - - def send_callback(self, request: CallbackRequest) -> None: - """Sends callback for execution. - - Provides a default implementation which sends the callback to the `callback_sink` object. - - :param request: Callback request to be executed. - """ - if not self.callback_sink: - raise ValueError("Callback sink is not ready.") - self.callback_sink.send(request) diff --git a/airflow/executors/celery_kubernetes_executor.py b/airflow/executors/celery_kubernetes_executor.py index b1edc32235727..5b934160abe1e 100644 --- a/airflow/executors/celery_kubernetes_executor.py +++ b/airflow/executors/celery_kubernetes_executor.py @@ -17,8 +17,6 @@ # under the License. from typing import Dict, List, Optional, Set, Union -from airflow.callbacks.base_callback_sink import BaseCallbackSink -from airflow.callbacks.callback_requests import CallbackRequest from airflow.configuration import conf from airflow.executors.base_executor import CommandType, EventBufferValueType, QueuedTaskInstanceType from airflow.executors.celery_executor import CeleryExecutor @@ -37,7 +35,6 @@ class CeleryKubernetesExecutor(LoggingMixin): """ supports_ad_hoc_ti_run: bool = True - callback_sink: Optional[BaseCallbackSink] = None KUBERNETES_QUEUE = conf.get('celery_kubernetes_executor', 'kubernetes_queue') @@ -207,12 +204,3 @@ def debug_dump(self) -> None: self.celery_executor.debug_dump() self.log.info("Dumping KubernetesExecutor state") self.kubernetes_executor.debug_dump() - - def send_callback(self, request: CallbackRequest) -> None: - """Sends callback for execution. - - :param request: Callback request to be executed. - """ - if not self.callback_sink: - raise ValueError("Callback sink is not ready.") - self.callback_sink.send(request) diff --git a/airflow/executors/local_kubernetes_executor.py b/airflow/executors/local_kubernetes_executor.py index 9944cfe1ef1fa..cb1ddf7c9d220 100644 --- a/airflow/executors/local_kubernetes_executor.py +++ b/airflow/executors/local_kubernetes_executor.py @@ -17,8 +17,6 @@ # under the License. from typing import Dict, List, Optional, Set, Union -from airflow.callbacks.base_callback_sink import BaseCallbackSink -from airflow.callbacks.callback_requests import CallbackRequest from airflow.configuration import conf from airflow.executors.base_executor import CommandType, EventBufferValueType, QueuedTaskInstanceType from airflow.executors.kubernetes_executor import KubernetesExecutor @@ -37,7 +35,6 @@ class LocalKubernetesExecutor(LoggingMixin): """ supports_ad_hoc_ti_run: bool = True - callback_sink: Optional[BaseCallbackSink] = None KUBERNETES_QUEUE = conf.get('local_kubernetes_executor', 'kubernetes_queue') @@ -206,12 +203,3 @@ def debug_dump(self) -> None: self.local_executor.debug_dump() self.log.info("Dumping KubernetesExecutor state") self.kubernetes_executor.debug_dump() - - def send_callback(self, request: CallbackRequest) -> None: - """Sends callback for execution. - - :param request: Callback request to be executed. - """ - if not self.callback_sink: - raise ValueError("Callback sink is not ready.") - self.callback_sink.send(request) diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 22ba5decb4111..97c40b4cb0183 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -34,7 +34,13 @@ from sqlalchemy.orm.session import Session, make_transient from airflow import models, settings -from airflow.callbacks.callback_requests import DagCallbackRequest, SlaCallbackRequest, TaskCallbackRequest +from airflow.callbacks.base_callback_sink import BaseCallbackSink +from airflow.callbacks.callback_requests import ( + CallbackRequest, + DagCallbackRequest, + SlaCallbackRequest, + TaskCallbackRequest, +) from airflow.callbacks.database_callback_sink import DatabaseCallbackSink from airflow.callbacks.pipe_callback_sink import PipeCallbackSink from airflow.configuration import conf @@ -165,6 +171,8 @@ def __init__( DeprecationWarning, ) + self._callback_sink: Optional[BaseCallbackSink] = None + def register_signals(self) -> None: """Register signals that stop child processes""" signal.signal(signal.SIGINT, self._exit_gracefully) @@ -687,7 +695,7 @@ def _process_executor_events(self, session: Session = None) -> int: simple_task_instance=SimpleTaskInstance.from_ti(ti), msg=msg % (ti, state, ti.state, info), ) - self.executor.send_callback(request) + self._send_callback(request) else: ti.handle_failure(error=msg % (ti, state, ti.state, info), session=session) @@ -721,12 +729,10 @@ def _execute(self) -> None: self.executor.job_id = self.id if self.processor_agent: self.log.debug("Using PipeCallbackSink as callback sink.") - self.executor.callback_sink = PipeCallbackSink( - get_sink_pipe=self.processor_agent.get_callbacks_pipe - ) + self._callback_sink = PipeCallbackSink(get_sink_pipe=self.processor_agent.get_callbacks_pipe) else: self.log.debug("Using DatabaseCallbackSink as callback sink.") - self.executor.callback_sink = DatabaseCallbackSink() + self._callback_sink = DatabaseCallbackSink() self.executor.start() @@ -1203,7 +1209,7 @@ def _verify_integrity_if_dag_changed(self, dag_run: DagRun, session=None): def _send_dag_callbacks_to_processor(self, dag: DAG, callback: Optional[DagCallbackRequest] = None): self._send_sla_callbacks_to_processor(dag) if callback: - self.executor.send_callback(callback) + self._send_callback(callback) else: self.log.debug("callback is empty") @@ -1217,7 +1223,7 @@ def _send_sla_callbacks_to_processor(self, dag: DAG): return request = SlaCallbackRequest(full_filepath=dag.fileloc, dag_id=dag.dag_id) - self.executor.send_callback(request) + self._send_callback(request) @provide_session def _emit_pool_metrics(self, session: Session = None) -> None: @@ -1378,5 +1384,14 @@ def _find_zombies(self, session): msg=f"Detected {ti} as zombie", ) self.log.error("Detected zombie job: %s", request) - self.executor.send_callback(request) + self._send_callback(request) Stats.incr('zombies_killed') + + def _send_callback(self, request: CallbackRequest) -> None: + """Sends callback for execution. + + :param request: Callback request to be executed. + """ + if not self._callback_sink: + raise ValueError("Callback sink is not ready.") + self._callback_sink.send(request) diff --git a/tests/executors/test_celery_kubernetes_executor.py b/tests/executors/test_celery_kubernetes_executor.py index 5681476274cc9..84ca14c5f08d7 100644 --- a/tests/executors/test_celery_kubernetes_executor.py +++ b/tests/executors/test_celery_kubernetes_executor.py @@ -19,7 +19,6 @@ from parameterized import parameterized -from airflow.callbacks.callback_requests import CallbackRequest from airflow.configuration import conf from airflow.executors.celery_executor import CeleryExecutor from airflow.executors.celery_kubernetes_executor import CeleryKubernetesExecutor @@ -224,14 +223,3 @@ def test_kubernetes_executor_knows_its_queue(self): assert k8s_executor_mock.kubernetes_queue == conf.get( 'celery_kubernetes_executor', 'kubernetes_queue' ) - - def test_send_callback(self): - cel_exec = CeleryExecutor() - k8s_exec = KubernetesExecutor() - cel_k8s_exec = CeleryKubernetesExecutor(cel_exec, k8s_exec) - cel_k8s_exec.callback_sink = mock.MagicMock() - - callback = CallbackRequest(full_filepath="fake") - cel_k8s_exec.send_callback(callback) - - cel_k8s_exec.callback_sink.send.assert_called_once_with(callback) diff --git a/tests/executors/test_local_kubernetes_executor.py b/tests/executors/test_local_kubernetes_executor.py index 48d09ad99e5ed..274175f12750a 100644 --- a/tests/executors/test_local_kubernetes_executor.py +++ b/tests/executors/test_local_kubernetes_executor.py @@ -17,7 +17,6 @@ # under the License. from unittest import mock -from airflow.callbacks.callback_requests import CallbackRequest from airflow.configuration import conf from airflow.executors.local_executor import LocalExecutor from airflow.executors.local_kubernetes_executor import LocalKubernetesExecutor @@ -68,14 +67,3 @@ def test_kubernetes_executor_knows_its_queue(self): LocalKubernetesExecutor(local_executor_mock, k8s_executor_mock) assert k8s_executor_mock.kubernetes_queue == conf.get('local_kubernetes_executor', 'kubernetes_queue') - - def test_send_callback(self): - local_executor_mock = mock.MagicMock() - k8s_executor_mock = mock.MagicMock() - local_k8s_exec = LocalKubernetesExecutor(local_executor_mock, k8s_executor_mock) - local_k8s_exec.callback_sink = mock.MagicMock() - - callback = CallbackRequest(full_filepath="fake") - local_k8s_exec.send_callback(callback) - - local_k8s_exec.callback_sink.send.assert_called_once_with(callback) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 5837623bdacf2..f8358af6307c3 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -221,6 +221,7 @@ def test_process_executor_events(self, mock_stats_incr, mock_task_callback, dag_ mock_task_callback.return_value = task_callback self.scheduler_job = SchedulerJob(executor=executor) self.scheduler_job.processor_agent = mock.MagicMock() + self.scheduler_job._callback_sink = mock.MagicMock() ti1.state = State.QUEUED session.merge(ti1) session.commit() @@ -230,7 +231,7 @@ def test_process_executor_events(self, mock_stats_incr, mock_task_callback, dag_ self.scheduler_job._process_executor_events(session=session) ti1.refresh_from_db(session=session) assert ti1.state == State.FAILED - self.scheduler_job.executor.callback_sink.send.assert_not_called() + self.scheduler_job._callback_sink.send.assert_not_called() self.scheduler_job.processor_agent.reset_mock() # ti in success state @@ -242,7 +243,7 @@ def test_process_executor_events(self, mock_stats_incr, mock_task_callback, dag_ self.scheduler_job._process_executor_events(session=session) ti1.refresh_from_db(session=session) assert ti1.state == State.SUCCESS - self.scheduler_job.executor.callback_sink.send.assert_not_called() + self.scheduler_job._callback_sink.send.assert_not_called() mock_stats_incr.assert_has_calls( [ mock.call('scheduler.tasks.killed_externally'), @@ -279,6 +280,7 @@ def test_process_executor_events_with_no_callback(self, mock_stats_incr, mock_ta mock_task_callback.return_value = task_callback self.scheduler_job = SchedulerJob(executor=executor) self.scheduler_job.processor_agent = mock.MagicMock() + self.scheduler_job._callback_sink = mock.MagicMock() ti1.state = State.QUEUED session.merge(ti1) session.commit() @@ -288,7 +290,7 @@ def test_process_executor_events_with_no_callback(self, mock_stats_incr, mock_ta self.scheduler_job._process_executor_events(session=session) ti1.refresh_from_db(session=session) assert ti1.state == State.UP_FOR_RETRY - self.scheduler_job.executor.callback_sink.send.assert_not_called() + self.scheduler_job._callback_sink.send.assert_not_called() # ti in success state ti1.state = State.SUCCESS @@ -299,7 +301,7 @@ def test_process_executor_events_with_no_callback(self, mock_stats_incr, mock_ta self.scheduler_job._process_executor_events(session=session) ti1.refresh_from_db(session=session) assert ti1.state == State.SUCCESS - self.scheduler_job.executor.callback_sink.send.assert_not_called() + self.scheduler_job._callback_sink.send.assert_not_called() mock_stats_incr.assert_has_calls( [ mock.call('scheduler.tasks.killed_externally'), @@ -326,6 +328,7 @@ def test_process_executor_events_with_callback(self, mock_stats_incr, mock_task_ mock_task_callback.return_value = task_callback self.scheduler_job = SchedulerJob(executor=executor) self.scheduler_job.processor_agent = mock.MagicMock() + self.scheduler_job._callback_sink = mock.MagicMock() session = settings.Session() ti1.state = State.QUEUED @@ -347,8 +350,8 @@ def test_process_executor_events_with_callback(self, mock_stats_incr, mock_task_ 'finished (failed) although the task says its queued. (Info: None) ' 'Was the task killed externally?', ) - self.scheduler_job.executor.callback_sink.send.assert_called_once_with(task_callback) - self.scheduler_job.executor.callback_sink.reset_mock() + self.scheduler_job._callback_sink.send.assert_called_once_with(task_callback) + self.scheduler_job._callback_sink.reset_mock() mock_stats_incr.assert_called_once_with('scheduler.tasks.killed_externally') @mock.patch('airflow.jobs.scheduler_job.TaskCallbackRequest') @@ -435,7 +438,7 @@ def test_setup_callback_sink_not_standalone_dag_processor(self): self.scheduler_job._execute() - assert isinstance(self.scheduler_job.executor.callback_sink, PipeCallbackSink) + assert isinstance(self.scheduler_job._callback_sink, PipeCallbackSink) @conf_vars({('scheduler', 'standalone_dag_processor'): 'True'}) def test_setup_callback_sink_standalone_dag_processor(self): @@ -443,7 +446,7 @@ def test_setup_callback_sink_standalone_dag_processor(self): self.scheduler_job._execute() - assert isinstance(self.scheduler_job.executor.callback_sink, DatabaseCallbackSink) + assert isinstance(self.scheduler_job._callback_sink, DatabaseCallbackSink) def test_find_executable_task_instances_backfill(self, dag_maker): dag_id = 'SchedulerJobTest.test_find_executable_task_instances_backfill' @@ -1552,6 +1555,7 @@ def test_dagrun_timeout_verify_max_active_runs(self, dag_maker): self.scheduler_job = SchedulerJob(subdir=os.devnull) self.scheduler_job.dagbag = dag_maker.dagbag self.scheduler_job.executor = MockExecutor() + self.scheduler_job._callback_sink = mock.MagicMock() session = settings.Session() orm_dag = session.query(DagModel).get(dag.dag_id) @@ -1597,7 +1601,7 @@ def test_dagrun_timeout_verify_max_active_runs(self, dag_maker): ) # Verify dag failure callback request is sent to file processor - self.scheduler_job.executor.callback_sink.send.assert_called_once_with(expected_callback) + self.scheduler_job._callback_sink.send.assert_called_once_with(expected_callback) session.rollback() session.close() @@ -1619,6 +1623,7 @@ def test_dagrun_timeout_fails_run(self, dag_maker): self.scheduler_job = SchedulerJob(subdir=os.devnull) self.scheduler_job.dagbag = dag_maker.dagbag self.scheduler_job.executor = MockExecutor() + self.scheduler_job._callback_sink = mock.MagicMock() # Mock that processor_agent is started self.scheduler_job.processor_agent = mock.Mock() @@ -1638,7 +1643,7 @@ def test_dagrun_timeout_fails_run(self, dag_maker): ) # Verify dag failure callback request is sent to file processor - self.scheduler_job.executor.callback_sink.send.assert_called_once_with(expected_callback) + self.scheduler_job._callback_sink.send.assert_called_once_with(expected_callback) session.rollback() session.close() @@ -1661,6 +1666,7 @@ def test_dagrun_timeout_fails_run_and_update_next_dagrun(self, dag_maker): self.scheduler_job = SchedulerJob(subdir=os.devnull) self.scheduler_job.dagbag = dag_maker.dagbag self.scheduler_job.executor = MockExecutor() + self.scheduler_job._callback_sink = mock.Mock() # Mock that processor_agent is started self.scheduler_job.processor_agent = mock.Mock() @@ -1695,6 +1701,7 @@ def test_dagrun_callbacks_are_called(self, state, expected_callback_msg, dag_mak self.scheduler_job.executor = MockExecutor() self.scheduler_job.dagbag = dag_maker.dagbag self.scheduler_job.processor_agent = mock.Mock() + self.scheduler_job._callback_sink = mock.MagicMock() session = settings.Session() dr = dag_maker.create_dagrun() @@ -1714,7 +1721,7 @@ def test_dagrun_callbacks_are_called(self, state, expected_callback_msg, dag_mak ) # Verify dag failure callback request is sent to file processor - self.scheduler_job.executor.callback_sink.send.assert_called_once_with(expected_callback) + self.scheduler_job._callback_sink.send.assert_called_once_with(expected_callback) session.rollback() session.close() @@ -2885,9 +2892,10 @@ def test_send_sla_callbacks_to_processor_sla_disabled(self, dag_maker): with patch.object(settings, "CHECK_SLAS", False): self.scheduler_job = SchedulerJob(subdir=os.devnull) self.scheduler_job.executor = MockExecutor() + self.scheduler_job._callback_sink = mock.MagicMock() self.scheduler_job._send_sla_callbacks_to_processor(dag) - self.scheduler_job.executor.callback_sink.send.assert_not_called() + self.scheduler_job._callback_sink.send.assert_not_called() def test_send_sla_callbacks_to_processor_sla_no_task_slas(self, dag_maker): """Test SLA Callbacks are not sent when no task SLAs are defined""" @@ -2898,9 +2906,10 @@ def test_send_sla_callbacks_to_processor_sla_no_task_slas(self, dag_maker): with patch.object(settings, "CHECK_SLAS", True): self.scheduler_job = SchedulerJob(subdir=os.devnull) self.scheduler_job.executor = MockExecutor() + self.scheduler_job._callback_sink = mock.MagicMock() self.scheduler_job._send_sla_callbacks_to_processor(dag) - self.scheduler_job.executor.callback_sink.send.assert_not_called() + self.scheduler_job._callback_sink.send.assert_not_called() def test_send_sla_callbacks_to_processor_sla_with_task_slas(self, dag_maker): """Test SLA Callbacks are sent to the DAG Processor when SLAs are defined on tasks""" @@ -2911,11 +2920,12 @@ def test_send_sla_callbacks_to_processor_sla_with_task_slas(self, dag_maker): with patch.object(settings, "CHECK_SLAS", True): self.scheduler_job = SchedulerJob(subdir=os.devnull) self.scheduler_job.executor = MockExecutor() + self.scheduler_job._callback_sink = mock.MagicMock() self.scheduler_job._send_sla_callbacks_to_processor(dag) expected_callback = SlaCallbackRequest(full_filepath=dag.fileloc, dag_id=dag.dag_id) - self.scheduler_job.executor.callback_sink.send.assert_called_once_with(expected_callback) + self.scheduler_job._callback_sink.send.assert_called_once_with(expected_callback) def test_create_dag_runs(self, dag_maker): """ @@ -3173,6 +3183,7 @@ def test_do_schedule_max_active_runs_dag_timed_out(self, dag_maker): self.scheduler_job = SchedulerJob(subdir=os.devnull) self.scheduler_job.executor = MockExecutor() self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent) + self.scheduler_job._callback_sink = mock.MagicMock() my_dag = session.query(DagModel).get(dag.dag_id) self.scheduler_job._create_dag_runs([my_dag], session) @@ -3782,10 +3793,11 @@ def test_find_zombies_nothing(self): executor = MockExecutor(do_update=False) self.scheduler_job = SchedulerJob(executor=executor) self.scheduler_job.processor_agent = mock.MagicMock() + self.scheduler_job._callback_sink = mock.MagicMock() self.scheduler_job._find_zombies(session=session) - self.scheduler_job.executor.callback_sink.send.assert_not_called() + self.scheduler_job._callback_sink.send.assert_not_called() def test_find_zombies(self): dagbag = DagBag(TEST_DAG_FOLDER, read_dags_from_db=False) @@ -3816,11 +3828,12 @@ def test_find_zombies(self): self.scheduler_job = SchedulerJob(subdir=os.devnull) self.scheduler_job.executor = MockExecutor() self.scheduler_job.processor_agent = mock.MagicMock() + self.scheduler_job._callback_sink = mock.MagicMock() self.scheduler_job._find_zombies(session=session) - self.scheduler_job.executor.callback_sink.send.assert_called_once() - requests = self.scheduler_job.executor.callback_sink.send.call_args[0] + self.scheduler_job._callback_sink.send.assert_called_once() + requests = self.scheduler_job._callback_sink.send.call_args[0] assert 1 == len(requests) assert requests[0].full_filepath == dag.fileloc assert requests[0].msg == f"Detected {ti} as zombie" @@ -3878,13 +3891,13 @@ def test_find_zombies_handle_failure_callbacks_are_correctly_passed_to_dag_proce ] self.scheduler_job = SchedulerJob(subdir=os.devnull) - self.scheduler_job.executor = MockExecutor() + self.scheduler_job._callback_sink = mock.MagicMock() self.scheduler_job.processor_agent = mock.MagicMock() self.scheduler_job._find_zombies(session=session) - self.scheduler_job.executor.callback_sink.send.assert_called_once() - callback_requests = self.scheduler_job.executor.callback_sink.send.call_args[0] + self.scheduler_job._callback_sink.send.assert_called_once() + callback_requests = self.scheduler_job._callback_sink.send.call_args[0] assert {zombie.simple_task_instance.key for zombie in expected_failure_callback_requests} == { result.simple_task_instance.key for result in callback_requests } From e34d6821b1493ec3d60c3609bf3d4665d0e5678e Mon Sep 17 00:00:00 2001 From: Tanel Kiis Date: Fri, 10 Jun 2022 14:04:46 +0300 Subject: [PATCH 2/2] Move callback_sink from executor to scheduler --- tests/jobs/test_scheduler_job.py | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index f8358af6307c3..79e5fe7a789a6 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -1476,7 +1476,6 @@ def test_queued_dagruns_stops_creating_when_max_active_is_reached(self, dag_make session = settings.Session() self.scheduler_job = SchedulerJob(subdir=os.devnull) - self.scheduler_job.executor = MockExecutor() self.scheduler_job.processor_agent = mock.MagicMock() self.scheduler_job.dagbag = dag_maker.dagbag @@ -1554,7 +1553,6 @@ def test_dagrun_timeout_verify_max_active_runs(self, dag_maker): self.scheduler_job = SchedulerJob(subdir=os.devnull) self.scheduler_job.dagbag = dag_maker.dagbag - self.scheduler_job.executor = MockExecutor() self.scheduler_job._callback_sink = mock.MagicMock() session = settings.Session() @@ -1622,7 +1620,6 @@ def test_dagrun_timeout_fails_run(self, dag_maker): self.scheduler_job = SchedulerJob(subdir=os.devnull) self.scheduler_job.dagbag = dag_maker.dagbag - self.scheduler_job.executor = MockExecutor() self.scheduler_job._callback_sink = mock.MagicMock() # Mock that processor_agent is started @@ -1665,7 +1662,6 @@ def test_dagrun_timeout_fails_run_and_update_next_dagrun(self, dag_maker): dag_maker.dag_model.next_dagrun == dr.execution_date self.scheduler_job = SchedulerJob(subdir=os.devnull) self.scheduler_job.dagbag = dag_maker.dagbag - self.scheduler_job.executor = MockExecutor() self.scheduler_job._callback_sink = mock.Mock() # Mock that processor_agent is started @@ -1698,7 +1694,6 @@ def test_dagrun_callbacks_are_called(self, state, expected_callback_msg, dag_mak EmptyOperator(task_id='dummy') self.scheduler_job = SchedulerJob(subdir=os.devnull) - self.scheduler_job.executor = MockExecutor() self.scheduler_job.dagbag = dag_maker.dagbag self.scheduler_job.processor_agent = mock.Mock() self.scheduler_job._callback_sink = mock.MagicMock() @@ -2891,7 +2886,6 @@ def test_send_sla_callbacks_to_processor_sla_disabled(self, dag_maker): with patch.object(settings, "CHECK_SLAS", False): self.scheduler_job = SchedulerJob(subdir=os.devnull) - self.scheduler_job.executor = MockExecutor() self.scheduler_job._callback_sink = mock.MagicMock() self.scheduler_job._send_sla_callbacks_to_processor(dag) @@ -2905,7 +2899,6 @@ def test_send_sla_callbacks_to_processor_sla_no_task_slas(self, dag_maker): with patch.object(settings, "CHECK_SLAS", True): self.scheduler_job = SchedulerJob(subdir=os.devnull) - self.scheduler_job.executor = MockExecutor() self.scheduler_job._callback_sink = mock.MagicMock() self.scheduler_job._send_sla_callbacks_to_processor(dag) @@ -2919,7 +2912,6 @@ def test_send_sla_callbacks_to_processor_sla_with_task_slas(self, dag_maker): with patch.object(settings, "CHECK_SLAS", True): self.scheduler_job = SchedulerJob(subdir=os.devnull) - self.scheduler_job.executor = MockExecutor() self.scheduler_job._callback_sink = mock.MagicMock() self.scheduler_job._send_sla_callbacks_to_processor(dag) @@ -3181,7 +3173,6 @@ def test_do_schedule_max_active_runs_dag_timed_out(self, dag_maker): ) self.scheduler_job = SchedulerJob(subdir=os.devnull) - self.scheduler_job.executor = MockExecutor() self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent) self.scheduler_job._callback_sink = mock.MagicMock() @@ -3826,7 +3817,6 @@ def test_find_zombies(self): session.flush() self.scheduler_job = SchedulerJob(subdir=os.devnull) - self.scheduler_job.executor = MockExecutor() self.scheduler_job.processor_agent = mock.MagicMock() self.scheduler_job._callback_sink = mock.MagicMock() @@ -4077,7 +4067,6 @@ def test_catchup_works_correctly(self, dag_maker): EmptyOperator(task_id='dummy') self.scheduler_job = SchedulerJob(subdir=os.devnull) - self.scheduler_job.executor = MockExecutor() self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent) self.scheduler_job._create_dag_runs([dag_maker.dag_model], session)