Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions airflow/jobs/triggerer_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ class TriggerRunner(threading.Thread, LoggingMixin):
to_create: Deque[Tuple[int, BaseTrigger]]

# Inbound queue of deleted triggers
to_delete: Deque[int]
to_cancel: Deque[int]

# Outbound queue of events
events: Deque[Tuple[int, TriggerEvent]]
Expand All @@ -221,7 +221,7 @@ def __init__(self):
self.triggers = {}
self.trigger_cache = {}
self.to_create = deque()
self.to_delete = deque()
self.to_cancel = deque()
self.events = deque()
self.failed_triggers = deque()

Expand All @@ -241,7 +241,7 @@ async def arun(self):
while not self.stop:
# Run core logic
await self.create_triggers()
await self.delete_triggers()
await self.cancel_triggers()
await self.cleanup_finished_triggers()
# Sleep for a bit
await asyncio.sleep(1)
Expand Down Expand Up @@ -269,13 +269,13 @@ async def create_triggers(self):
self.log.warning("Trigger %s had insertion attempted twice", trigger_id)
await asyncio.sleep(0)

async def delete_triggers(self):
async def cancel_triggers(self):
"""
Drain the to_delete queue and ensure all triggers that are not in the
Drain the to_cancel queue and ensure all triggers that are not in the
DB are cancelled, so the cleanup job deletes them.
"""
while self.to_delete:
trigger_id = self.to_delete.popleft()
while self.to_cancel:
trigger_id = self.to_cancel.popleft()
if trigger_id in self.triggers:
# We only delete if it did not exit already
self.triggers[trigger_id]["task"].cancel()
Expand Down Expand Up @@ -383,7 +383,7 @@ def update_triggers(self, requested_trigger_ids: Set[int]):
current_trigger_ids = set(self.triggers.keys())
# Work out the two difference sets
new_trigger_ids = requested_trigger_ids.difference(current_trigger_ids)
old_trigger_ids = current_trigger_ids.difference(requested_trigger_ids)
cancel_trigger_ids = current_trigger_ids.difference(requested_trigger_ids)
# Bulk-fetch new trigger records
new_triggers = Trigger.bulk_fetch(new_trigger_ids)
# Add in new triggers
Expand All @@ -400,9 +400,9 @@ def update_triggers(self, requested_trigger_ids: Set[int]):
self.failed_triggers.append(new_id)
continue
self.to_create.append((new_id, trigger_class(**new_triggers[new_id].kwargs)))
# Remove old triggers
for old_id in old_trigger_ids:
self.to_delete.append(old_id)
# Enqueue orphaned triggers for cancellation
for old_id in cancel_trigger_ids:
self.to_cancel.append(old_id)

def get_trigger_by_classpath(self, classpath: str) -> Type[BaseTrigger]:
"""
Expand Down