Apache Airflow version
2.2.1
Operating System
Ubuntu 16.04
Versions of Apache Airflow Providers
No response
Deployment
Other
Deployment details
No response
What happened
When I mark a defferd task to Failed-State then clear it or clear a defferd task which in Defferd-State, this task is marked success immediately.
I check the task instance table, this task's next_method is not null when I clear this task.This cause task's execute function is replaced by next_method function.
What you expected to happen
When I clear a defferd task, I want to re-judge it's trigger condition, re-execute the execute function.
How to reproduce
Sensor
class ExternalTaskFromOtherAirflowSensor(BaseSensorOperator):
def execute(self, context):
trigger = ExternalTaskFromOtherAirflowTrigger(args)
self.defer(trigger=trigger, method_name="execute_complete")
def execute_complete(self, context, event=None):
return
Trigger
class ExternalTaskFromOtherAirflowTrigger(BaseTrigger):
def _get_task_instance_state(self):
sql_query = "SELECT state FROM {}.task_instance WHERE dag_id = '{}' and run_id like '%{}%';".format(
self.old_airflow_db_name,
self.old_external_dag_id,
self.old_external_task_id,
self.old_external_execution_date_str
)
async def run(self):
while self._get_task_instance_state() == False:
await asyncio.sleep(5)
yield TriggerEvent('success')
- Clear this task when it is in defferd State.
- Mark this task as failed and clear this task.
Both operations above will cause this task is marked as success instead of re-execute the execute function.
Anything else
Similar issue:
#18146
#19120
I try to add this code in models/taskinstance.py clear_task_instances function

Are you willing to submit PR?
Code of Conduct
Apache Airflow version
2.2.1
Operating System
Ubuntu 16.04
Versions of Apache Airflow Providers
No response
Deployment
Other
Deployment details
No response
What happened
When I mark a defferd task to Failed-State then clear it or clear a defferd task which in Defferd-State, this task is marked success immediately.
I check the task instance table, this task's next_method is not null when I clear this task.This cause task's execute function is replaced by next_method function.
What you expected to happen
When I clear a defferd task, I want to re-judge it's trigger condition, re-execute the execute function.
How to reproduce
Sensor
Trigger
Both operations above will cause this task is marked as success instead of re-execute the execute function.
Anything else
Similar issue:
#18146
#19120
I try to add this code in models/taskinstance.py clear_task_instances function

Are you willing to submit PR?
Code of Conduct