From a83c5fd693c688bdf396488db09e7ae3ff0cb18d Mon Sep 17 00:00:00 2001 From: Karen Braganza Date: Thu, 17 Jul 2025 17:22:06 -0400 Subject: [PATCH 1/8] Add ToTriggerSupervisor message types --- .../src/airflow/jobs/triggerer_job_runner.py | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py index 19241a8c54847..fea7b484d277d 100644 --- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py +++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py @@ -48,6 +48,8 @@ CommsDecoder, ConnectionResult, DagRunStateResult, + DeleteVariable, + DeleteXCom, DRCount, ErrorResponse, GetConnection, @@ -58,6 +60,8 @@ GetTICount, GetVariable, GetXCom, + PutVariable, + SetXCom, TaskStatesResult, TICount, UpdateHITLDetail, @@ -252,8 +256,12 @@ def from_api_response(cls, response: HITLDetailResponse) -> HITLDetailResponseRe ToTriggerSupervisor = Annotated[ messages.TriggerStateChanges | GetConnection + | DeleteVariable | GetVariable + | PutVariable + | DeleteXCom | GetXCom + | SetXCom | GetTICount | GetTaskStates | GetDagRunState @@ -419,6 +427,14 @@ def _handle_request(self, msg: ToTriggerSupervisor, log: FilteringBoundLogger, r dump_opts = {"exclude_unset": True, "by_alias": True} else: resp = conn + elif isinstance(msg, DeleteVariable): + var = self.client.variables.delete(msg.key) + if isinstance(var, VariableResponse): + var_result = VariableResult.from_variable_response(var) + resp = var_result + dump_opts = {"exclude_unset": True} + else: + resp = var elif isinstance(msg, GetVariable): var = self.client.variables.get(msg.key) if isinstance(var, VariableResponse): @@ -427,6 +443,22 @@ def _handle_request(self, msg: ToTriggerSupervisor, log: FilteringBoundLogger, r dump_opts = {"exclude_unset": True} else: resp = var + elif isinstance(msg, PutVariable): + var = self.client.variables.set(msg.key, msg.value, msg.description) + if isinstance(var, VariableResponse): + var_result = VariableResult.from_variable_response(var) + resp = var_result + dump_opts = {"exclude_unset": True} + else: + resp = var + elif isinstance(msg, DeleteXCom): + xcom = self.client.xcoms.delete(msg.dag_id, msg.run_id, msg.task_id, msg.key, msg.map_index) + if isinstance(xcom, XComResponse): + xcom_result = XComResult.from_xcom_response(xcom) + resp = xcom_result + dump_opts = {"exclude_unset": True} + else: + resp = xcom elif isinstance(msg, GetXCom): xcom = self.client.xcoms.get(msg.dag_id, msg.run_id, msg.task_id, msg.key, msg.map_index) if isinstance(xcom, XComResponse): @@ -435,6 +467,16 @@ def _handle_request(self, msg: ToTriggerSupervisor, log: FilteringBoundLogger, r dump_opts = {"exclude_unset": True} else: resp = xcom + elif isinstance(msg, SetXCom): + xcom = self.client.xcoms.set( + msg.dag_id, msg.run_id, msg.task_id, msg.key, msg.value, msg.map_index, msg.mapped_length + ) + if isinstance(xcom, XComResponse): + xcom_result = XComResult.from_xcom_response(xcom) + resp = xcom_result + dump_opts = {"exclude_unset": True} + else: + resp = xcom elif isinstance(msg, GetDRCount): dr_count = self.client.dag_runs.get_count( dag_id=msg.dag_id, From 74753fc1925e2a34ec2b6c64713335b6de044ecb Mon Sep 17 00:00:00 2001 From: Karen Braganza Date: Fri, 18 Jul 2025 18:10:22 -0400 Subject: [PATCH 2/8] Add OKResponse as ToTriggerRunner message type --- airflow-core/src/airflow/jobs/triggerer_job_runner.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py index fea7b484d277d..1f1b1988d7c03 100644 --- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py +++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py @@ -60,6 +60,7 @@ GetTICount, GetVariable, GetXCom, + OKResponse, PutVariable, SetXCom, TaskStatesResult, @@ -244,7 +245,8 @@ def from_api_response(cls, response: HITLDetailResponse) -> HITLDetailResponseRe | TICount | TaskStatesResult | HITLDetailResponseResult - | ErrorResponse, + | ErrorResponse + | OKResponse, Field(discriminator="type"), ] """ From 754d31a0f5dda79ac747bc3dd360f744315a6188 Mon Sep 17 00:00:00 2001 From: Karen Braganza Date: Mon, 28 Jul 2025 16:38:03 -0400 Subject: [PATCH 3/8] Test Variable and XCom delete and set from trigger --- .../tests/unit/jobs/test_triggerer_job.py | 78 +++++++++++++++++-- 1 file changed, 72 insertions(+), 6 deletions(-) diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py b/airflow-core/tests/unit/jobs/test_triggerer_job.py index 384bf6a76fc60..4457c11eb054f 100644 --- a/airflow-core/tests/unit/jobs/test_triggerer_job.py +++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py @@ -648,9 +648,15 @@ async def run(self, **args) -> AsyncIterator[TriggerEvent]: conn = await sync_to_async(BaseHook.get_connection)("test_connection") self.log.info("Loaded conn %s", conn.conn_id) - variable = await sync_to_async(Variable.get)("test_variable") + variable = await sync_to_async(Variable.get)("test_get_variable") self.log.info("Loaded variable %s", variable) + await sync_to_async(Variable.set)(key="test_set_variable", value="set_value") + self.log.info("Set variable with key test_set_variable") + + await sync_to_async(Variable.delete)("test_delete_variable") + self.log.info("Deleted variable with key test_delete_variable") + xcom = await sync_to_async(XCom.get_one)( key="test_xcom", dag_id=self.dag_id, @@ -660,6 +666,25 @@ async def run(self, **args) -> AsyncIterator[TriggerEvent]: ) self.log.info("Loaded XCom %s", xcom) + await sync_to_async(XCom.set)( + key="test_set_xcom", + dag_id=self.dag_id, + run_id=self.run_id, + task_id=self.task_id, + map_index=self.map_index, + value="set_xcom", + ) + self.log.info("Set xcom with key test_set_xcom") + + await sync_to_async(XCom.delete)( + key="test_delete_xcom", + dag_id=self.dag_id, + run_id=self.run_id, + task_id=self.task_id, + map_index=self.map_index, + ) + self.log.info("Delete xcom with key test_delete_xcom") + yield TriggerEvent({"connection": attrs.asdict(conn), "variable": variable, "xcom": xcom}) def serialize(self) -> tuple[str, dict[str, Any]]: @@ -687,8 +712,8 @@ def handle_events(self): @pytest.mark.asyncio @pytest.mark.execution_timeout(20) -async def test_trigger_can_access_variables_connections_and_xcoms(session, dag_maker): - """Checks that the trigger will successfully access Variables, Connections and XComs.""" +async def test_trigger_can_call_variables_connections_and_xcoms_methods(session, dag_maker): + """Checks that the trigger will successfully call Variables, Connections and XComs methods.""" # Create the test DAG and task with dag_maker(dag_id="trigger_accessing_variable_connection_and_xcom", session=session): EmptyOperator(task_id="dummy1") @@ -704,7 +729,7 @@ async def test_trigger_can_access_variables_connections_and_xcoms(session, dag_m kwargs={"dag_id": dr.dag_id, "run_id": dr.run_id, "task_id": task_instance.task_id, "map_index": -1}, ) session.add(trigger_orm) - session.commit() + session.flush() task_instance.trigger_id = trigger_orm.id # Create the appropriate Connection, Variable and XCom @@ -718,7 +743,9 @@ async def test_trigger_can_access_variables_connections_and_xcoms(session, dag_m port=443, host="example.com", ) - variable = Variable(key="test_variable", val="some_variable_value") + get_variable = Variable(key="test_get_variable", val="some_variable_value") + delete_variable = Variable(key="test_delete_variable", val="delete_value") + XComModel.set( key="test_xcom", value="some_xcom_value", @@ -728,8 +755,20 @@ async def test_trigger_can_access_variables_connections_and_xcoms(session, dag_m map_index=-1, session=session, ) + + XComModel.set( + key="test_delete_xcom", + value="some_xcom_value", + task_id=task_instance.task_id, + dag_id=dr.dag_id, + run_id=dr.run_id, + map_index=-1, + session=session, + ) + session.add(connection) - session.add(variable) + session.add(get_variable) + session.add(delete_variable) job = Job() session.add(job) @@ -758,6 +797,33 @@ async def test_trigger_can_access_variables_connections_and_xcoms(session, dag_m "xcom": '"some_xcom_value"', } } + variable_set_val = await sync_to_async(Variable.get)("test_set_variable") + assert variable_set_val == "set_value" + + variable_delete_val = await sync_to_async(Variable.get)(key="test_delete_variable", default_var=None) + assert variable_delete_val is None + + xcom_set_query = await sync_to_async(XComModel.get_many)( + key="test_set_xcom", + dag_ids=dr.dag_id, + run_id=dr.run_id, + task_ids=task_instance.task_id, + map_indexes=-1, + session=session, + ) + xcom_set_model = xcom_set_query.first() + assert xcom_set_model.value == "set_xcom" + + xcom_delete_query = await sync_to_async(XComModel.get_many)( + key="test_delete_xcom", + dag_ids=dr.dag_id, + run_id=dr.run_id, + task_ids=task_instance.task_id, + map_indexes=-1, + session=session, + ) + xcom_delete_model = xcom_delete_query.first() + assert xcom_delete_model is None class CustomTriggerDagRun(BaseTrigger): From ee4b701393ba7c10b972508efd43b513536940c6 Mon Sep 17 00:00:00 2001 From: Karen Braganza Date: Tue, 29 Jul 2025 15:54:57 -0400 Subject: [PATCH 4/8] Remove incorrect response type checks --- .../src/airflow/jobs/triggerer_job_runner.py | 32 +++---------------- 1 file changed, 4 insertions(+), 28 deletions(-) diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py index a645b404dc2e2..05b6064ebbe5e 100644 --- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py +++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py @@ -430,13 +430,7 @@ def _handle_request(self, msg: ToTriggerSupervisor, log: FilteringBoundLogger, r else: resp = conn elif isinstance(msg, DeleteVariable): - var = self.client.variables.delete(msg.key) - if isinstance(var, VariableResponse): - var_result = VariableResult.from_variable_response(var) - resp = var_result - dump_opts = {"exclude_unset": True} - else: - resp = var + resp = self.client.variables.delete(msg.key) elif isinstance(msg, GetVariable): var = self.client.variables.get(msg.key) if isinstance(var, VariableResponse): @@ -446,21 +440,9 @@ def _handle_request(self, msg: ToTriggerSupervisor, log: FilteringBoundLogger, r else: resp = var elif isinstance(msg, PutVariable): - var = self.client.variables.set(msg.key, msg.value, msg.description) - if isinstance(var, VariableResponse): - var_result = VariableResult.from_variable_response(var) - resp = var_result - dump_opts = {"exclude_unset": True} - else: - resp = var + resp = self.client.variables.set(msg.key, msg.value, msg.description) elif isinstance(msg, DeleteXCom): - xcom = self.client.xcoms.delete(msg.dag_id, msg.run_id, msg.task_id, msg.key, msg.map_index) - if isinstance(xcom, XComResponse): - xcom_result = XComResult.from_xcom_response(xcom) - resp = xcom_result - dump_opts = {"exclude_unset": True} - else: - resp = xcom + resp = self.client.xcoms.delete(msg.dag_id, msg.run_id, msg.task_id, msg.key, msg.map_index) elif isinstance(msg, GetXCom): xcom = self.client.xcoms.get(msg.dag_id, msg.run_id, msg.task_id, msg.key, msg.map_index) if isinstance(xcom, XComResponse): @@ -470,15 +452,9 @@ def _handle_request(self, msg: ToTriggerSupervisor, log: FilteringBoundLogger, r else: resp = xcom elif isinstance(msg, SetXCom): - xcom = self.client.xcoms.set( + resp = self.client.xcoms.set( msg.dag_id, msg.run_id, msg.task_id, msg.key, msg.value, msg.map_index, msg.mapped_length ) - if isinstance(xcom, XComResponse): - xcom_result = XComResult.from_xcom_response(xcom) - resp = xcom_result - dump_opts = {"exclude_unset": True} - else: - resp = xcom elif isinstance(msg, GetDRCount): dr_count = self.client.dag_runs.get_count( dag_id=msg.dag_id, From 3c13f9c3ec338737d39c8f67950ad0d17e9bd2ef Mon Sep 17 00:00:00 2001 From: Karen Braganza Date: Thu, 31 Jul 2025 14:39:59 -0400 Subject: [PATCH 5/8] Remove resp variable assignment for OkResponse --- airflow-core/src/airflow/jobs/triggerer_job_runner.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py index 05b6064ebbe5e..69f104b070f01 100644 --- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py +++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py @@ -430,7 +430,7 @@ def _handle_request(self, msg: ToTriggerSupervisor, log: FilteringBoundLogger, r else: resp = conn elif isinstance(msg, DeleteVariable): - resp = self.client.variables.delete(msg.key) + self.client.variables.delete(msg.key) elif isinstance(msg, GetVariable): var = self.client.variables.get(msg.key) if isinstance(var, VariableResponse): @@ -440,9 +440,9 @@ def _handle_request(self, msg: ToTriggerSupervisor, log: FilteringBoundLogger, r else: resp = var elif isinstance(msg, PutVariable): - resp = self.client.variables.set(msg.key, msg.value, msg.description) + self.client.variables.set(msg.key, msg.value, msg.description) elif isinstance(msg, DeleteXCom): - resp = self.client.xcoms.delete(msg.dag_id, msg.run_id, msg.task_id, msg.key, msg.map_index) + self.client.xcoms.delete(msg.dag_id, msg.run_id, msg.task_id, msg.key, msg.map_index) elif isinstance(msg, GetXCom): xcom = self.client.xcoms.get(msg.dag_id, msg.run_id, msg.task_id, msg.key, msg.map_index) if isinstance(xcom, XComResponse): @@ -452,7 +452,7 @@ def _handle_request(self, msg: ToTriggerSupervisor, log: FilteringBoundLogger, r else: resp = xcom elif isinstance(msg, SetXCom): - resp = self.client.xcoms.set( + self.client.xcoms.set( msg.dag_id, msg.run_id, msg.task_id, msg.key, msg.value, msg.map_index, msg.mapped_length ) elif isinstance(msg, GetDRCount): From f9c664a413c9b5ae3a4c6ac8e2f8aea2bc31abd9 Mon Sep 17 00:00:00 2001 From: Karen Braganza Date: Thu, 31 Jul 2025 16:17:20 -0400 Subject: [PATCH 6/8] Modify unit test --- .../tests/unit/jobs/test_triggerer_job.py | 106 +++++++++--------- 1 file changed, 54 insertions(+), 52 deletions(-) diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py b/airflow-core/tests/unit/jobs/test_triggerer_job.py index 4457c11eb054f..5b9918ecb7f81 100644 --- a/airflow-core/tests/unit/jobs/test_triggerer_job.py +++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py @@ -648,44 +648,64 @@ async def run(self, **args) -> AsyncIterator[TriggerEvent]: conn = await sync_to_async(BaseHook.get_connection)("test_connection") self.log.info("Loaded conn %s", conn.conn_id) - variable = await sync_to_async(Variable.get)("test_get_variable") - self.log.info("Loaded variable %s", variable) + get_variable_value = await sync_to_async(Variable.get)("test_get_variable") + self.log.info("Loaded variable %s", get_variable_value) - await sync_to_async(Variable.set)(key="test_set_variable", value="set_value") - self.log.info("Set variable with key test_set_variable") - - await sync_to_async(Variable.delete)("test_delete_variable") - self.log.info("Deleted variable with key test_delete_variable") - - xcom = await sync_to_async(XCom.get_one)( - key="test_xcom", + get_xcom_value = await sync_to_async(XCom.get_one)( + key="test_get_xcom", dag_id=self.dag_id, run_id=self.run_id, task_id=self.task_id, map_index=self.map_index, ) - self.log.info("Loaded XCom %s", xcom) + self.log.info("Loaded XCom %s", get_xcom_value) + set_variable_key = "test_set_variable" + set_variable_value = "set_value" + await sync_to_async(Variable.set)(key=set_variable_key, value=set_variable_value) + self.log.info("Set variable with key %s and value %s" % (set_variable_key, set_variable_value)) + + set_xcom_key = "test_set_xcom" + set_xcom_value = "set_xcom" await sync_to_async(XCom.set)( - key="test_set_xcom", + key=set_xcom_key, dag_id=self.dag_id, run_id=self.run_id, task_id=self.task_id, map_index=self.map_index, - value="set_xcom", + value=set_xcom_value, ) - self.log.info("Set xcom with key test_set_xcom") + self.log.info("Set xcom with key %s and value %s" % (set_xcom_key, set_xcom_value)) + + delete_variable_key = "test_delete_variable" + await sync_to_async(Variable.delete)(delete_variable_key) + self.log.info("Deleted variable with key %s" % delete_variable_key) + delete_xcom_key = "test_delete_xcom" await sync_to_async(XCom.delete)( - key="test_delete_xcom", + key=delete_xcom_key, dag_id=self.dag_id, run_id=self.run_id, task_id=self.task_id, map_index=self.map_index, ) - self.log.info("Delete xcom with key test_delete_xcom") + self.log.info("Delete xcom with key %s" % delete_xcom_key) - yield TriggerEvent({"connection": attrs.asdict(conn), "variable": variable, "xcom": xcom}) + yield TriggerEvent( + { + "connection": attrs.asdict(conn), + "variable": { + "get_variable": get_variable_value, + "set_variable": set_variable_value, + "delete_variable": delete_variable_key, + }, + "xcom": { + "get_xcom": get_xcom_value, + "set_xcom": set_xcom_value, + "delete_xcom": delete_xcom_key, + }, + } + ) def serialize(self) -> tuple[str, dict[str, Any]]: return ( @@ -746,8 +766,12 @@ async def test_trigger_can_call_variables_connections_and_xcoms_methods(session, get_variable = Variable(key="test_get_variable", val="some_variable_value") delete_variable = Variable(key="test_delete_variable", val="delete_value") + session.add(connection) + session.add(get_variable) + session.add(delete_variable) + XComModel.set( - key="test_xcom", + key="test_get_xcom", value="some_xcom_value", task_id=task_instance.task_id, dag_id=dr.dag_id, @@ -766,10 +790,6 @@ async def test_trigger_can_call_variables_connections_and_xcoms_methods(session, session=session, ) - session.add(connection) - session.add(get_variable) - session.add(delete_variable) - job = Job() session.add(job) session.commit() @@ -780,7 +800,7 @@ async def test_trigger_can_call_variables_connections_and_xcoms_methods(session, task_instance.refresh_from_db() assert task_instance.state == TaskInstanceState.SCHEDULED assert task_instance.next_method != "__fail__" - assert task_instance.next_kwargs == { + expected_event = { "event": { "connection": { "conn_id": "test_connection", @@ -793,37 +813,19 @@ async def test_trigger_can_call_variables_connections_and_xcoms_methods(session, "port": 443, "extra": '{"key": "value"}', }, - "variable": "some_variable_value", - "xcom": '"some_xcom_value"', + "variable": { + "get_variable": "some_variable_value", + "set_variable": "set_value", + "delete_variable": "test_delete_variable", + }, + "xcom": { + "get_xcom": '"some_xcom_value"', + "set_xcom": "set_xcom", + "delete_xcom": "test_delete_xcom", + }, } } - variable_set_val = await sync_to_async(Variable.get)("test_set_variable") - assert variable_set_val == "set_value" - - variable_delete_val = await sync_to_async(Variable.get)(key="test_delete_variable", default_var=None) - assert variable_delete_val is None - - xcom_set_query = await sync_to_async(XComModel.get_many)( - key="test_set_xcom", - dag_ids=dr.dag_id, - run_id=dr.run_id, - task_ids=task_instance.task_id, - map_indexes=-1, - session=session, - ) - xcom_set_model = xcom_set_query.first() - assert xcom_set_model.value == "set_xcom" - - xcom_delete_query = await sync_to_async(XComModel.get_many)( - key="test_delete_xcom", - dag_ids=dr.dag_id, - run_id=dr.run_id, - task_ids=task_instance.task_id, - map_indexes=-1, - session=session, - ) - xcom_delete_model = xcom_delete_query.first() - assert xcom_delete_model is None + assert task_instance.next_kwargs == expected_event class CustomTriggerDagRun(BaseTrigger): From e578061447ed62403a25cbc94906ff7492ace497 Mon Sep 17 00:00:00 2001 From: Karen Braganza Date: Thu, 31 Jul 2025 17:21:43 -0400 Subject: [PATCH 7/8] Correct log formatting --- airflow-core/tests/unit/jobs/test_triggerer_job.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py b/airflow-core/tests/unit/jobs/test_triggerer_job.py index 5b9918ecb7f81..ef065cf2c8d16 100644 --- a/airflow-core/tests/unit/jobs/test_triggerer_job.py +++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py @@ -663,7 +663,7 @@ async def run(self, **args) -> AsyncIterator[TriggerEvent]: set_variable_key = "test_set_variable" set_variable_value = "set_value" await sync_to_async(Variable.set)(key=set_variable_key, value=set_variable_value) - self.log.info("Set variable with key %s and value %s" % (set_variable_key, set_variable_value)) + self.log.info("Set variable with key %s and value %s", set_variable_key, set_variable_value) set_xcom_key = "test_set_xcom" set_xcom_value = "set_xcom" @@ -675,11 +675,11 @@ async def run(self, **args) -> AsyncIterator[TriggerEvent]: map_index=self.map_index, value=set_xcom_value, ) - self.log.info("Set xcom with key %s and value %s" % (set_xcom_key, set_xcom_value)) + self.log.info("Set xcom with key %s and value %s", set_xcom_key, set_xcom_value) delete_variable_key = "test_delete_variable" await sync_to_async(Variable.delete)(delete_variable_key) - self.log.info("Deleted variable with key %s" % delete_variable_key) + self.log.info("Deleted variable with key %s", delete_variable_key) delete_xcom_key = "test_delete_xcom" await sync_to_async(XCom.delete)( @@ -689,7 +689,7 @@ async def run(self, **args) -> AsyncIterator[TriggerEvent]: task_id=self.task_id, map_index=self.map_index, ) - self.log.info("Delete xcom with key %s" % delete_xcom_key) + self.log.info("Delete xcom with key %s", delete_xcom_key) yield TriggerEvent( { From d2318debf32f098f13d9d461344059b9ef523855 Mon Sep 17 00:00:00 2001 From: Karen Braganza Date: Mon, 4 Aug 2025 09:25:52 -0400 Subject: [PATCH 8/8] Add resp assignment for variable deletion --- airflow-core/src/airflow/jobs/triggerer_job_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py index 69f104b070f01..5e6d6d613b47f 100644 --- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py +++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py @@ -430,7 +430,7 @@ def _handle_request(self, msg: ToTriggerSupervisor, log: FilteringBoundLogger, r else: resp = conn elif isinstance(msg, DeleteVariable): - self.client.variables.delete(msg.key) + resp = self.client.variables.delete(msg.key) elif isinstance(msg, GetVariable): var = self.client.variables.get(msg.key) if isinstance(var, VariableResponse):