From 339d3aba6179dce44c763051217e57a9007a758c Mon Sep 17 00:00:00 2001 From: wjddn279 Date: Sun, 26 Apr 2026 14:53:18 +0900 Subject: [PATCH] Fix test_celery_integration: unreachable code and stale attributes --- .../celery/test_celery_executor.py | 53 +++++++------------ 1 file changed, 18 insertions(+), 35 deletions(-) diff --git a/providers/celery/tests/integration/celery/test_celery_executor.py b/providers/celery/tests/integration/celery/test_celery_executor.py index aeb769f415324..91f42e51405f8 100644 --- a/providers/celery/tests/integration/celery/test_celery_executor.py +++ b/providers/celery/tests/integration/celery/test_celery_executor.py @@ -117,6 +117,17 @@ def _prepare_app(broker_url=None, execute=None): set_event_loop(None) +def setup_dagrun_with_success_and_fail_workloads(dag_maker): + date = timezone.utcnow() + start_date = date - timedelta(days=2) + + with dag_maker("test_celery_integration"): + BaseOperator(task_id="success", start_date=start_date) + BaseOperator(task_id="fail", start_date=start_date) + + return dag_maker.create_dagrun(logical_date=date) + + @pytest.mark.integration("celery") @pytest.mark.backend("postgres") class TestCeleryExecutor: @@ -128,17 +139,6 @@ def teardown_method(self) -> None: db.clear_db_runs() db.clear_db_jobs() - -def setup_dagrun_with_success_and_fail_workloads(dag_maker): - date = timezone.utcnow() - start_date = date - timedelta(days=2) - - with dag_maker("test_celery_integration"): - BaseOperator(task_id="success", start_date=start_date) - BaseOperator(task_id="fail", start_date=start_date) - - return dag_maker.create_dagrun(logical_date=date) - @pytest.mark.flaky(reruns=5, reruns_delay=3) @pytest.mark.parametrize("broker_url", _prepare_test_bodies()) @pytest.mark.parametrize( @@ -196,16 +196,11 @@ def fake_execute(input: str) -> None: # Use same parameter name as Airflow 3 ve # Force single-process sending so mock patches survive (ProcessPoolExecutor # would fork new processes where the patches are not active). executor._sync_parallelism = 1 - assert executor.tasks == {} + assert executor.workloads == {} executor.start() with start_worker(app=app, logfile=sys.stdout, loglevel="info"): - dagrun_date = timezone.utcnow() - dagrun_start = dagrun_date - timedelta(days=2) - with dag_maker("test_celery_integration"): - BaseOperator(task_id="success", start_date=dagrun_start) - BaseOperator(task_id="fail", start_date=dagrun_start) - dagrun = dag_maker.create_dagrun(logical_date=dagrun_date) + dagrun = setup_dagrun_with_success_and_fail_workloads(dag_maker) ti_fail, ti_success = sorted(dagrun.task_instances, key=lambda ti: ti.task_id) # Derive keys from the real task instances so they match what the executor tracks key_fail = TaskInstanceKey( @@ -229,23 +224,11 @@ def fake_execute(input: str) -> None: # Use same parameter name as Airflow 3 ve bundle_info=BundleInfo(name="test"), log_path="test.log", ) - keys = [ - TaskInstanceKey("id", "success", "abc", 0, -1), - TaskInstanceKey("id", "fail", "abc", 0, -1), - ] - dagrun = setup_dagrun_with_success_and_fail_workloads(dag_maker) - ti_success, ti_fail = dagrun.task_instances - for w in ( - workloads.ExecuteTask.make( - ti=ti_success, - ), - workloads.ExecuteTask.make(ti=ti_fail), - ): executor.queue_workload(w, session=None) executor.trigger_tasks(open_slots=10) for _ in range(20): - num_tasks = len(executor.tasks.keys()) + num_tasks = len(executor.workloads.keys()) if num_tasks == 2: break logger.info( @@ -253,7 +236,7 @@ def fake_execute(input: str) -> None: # Use same parameter name as Airflow 3 ve num_tasks, ) sleep(0.4) - assert sorted(executor.tasks.keys()) == sorted(keys) + assert sorted(executor.workloads.keys()) == sorted(keys) assert executor.event_buffer[key_success][0] == State.QUEUED assert executor.event_buffer[key_fail][0] == State.QUEUED @@ -262,8 +245,8 @@ def fake_execute(input: str) -> None: # Use same parameter name as Airflow 3 ve assert executor.event_buffer[key_success][0] == State.SUCCESS assert executor.event_buffer[key_fail][0] == State.FAILED - assert key_success not in executor.tasks - assert key_fail not in executor.tasks + assert key_success not in executor.workloads + assert key_fail not in executor.workloads assert executor.queued_tasks == {} @@ -284,7 +267,7 @@ def test_error_sending_workload(self): key = (task.dag.dag_id, task.task_id, ti.run_id, 0, -1) executor.queued_tasks[key] = workload - executor.task_publish_retries[key] = 1 + executor.workload_publish_retries[key] = 1 # Mock send_workload_to_executor to return an error result. # This simulates a failure when sending the workload to Celery.