Fix grid view task ordering by correcting topological_sort implementation #56321
Closed
dheerajturaga wants to merge 2 commits into
8e50430 to
0befe88
Compare
Member
Author
@ashb, I'm not sure whether the CI check failures are related to my change...
Member
Did this change in 3.1? The test failures look unrelated. Try rebasing to see if that fixes it.
Member
Author
Looks like the task ordering changed between 3.0.6 and 3.1. Regarding the test failures: I already rebased, so I suspect CI is broken at the moment.
Member
#55169 (comment) bit us?
0befe88 to
0263a87
Compare
…tion
The SerializedTaskGroup.topological_sort() method had a critical bug in its
topological sorting algorithm. After checking if an upstream dependency's parent
task group was still in the unsorted graph, the code failed to verify whether
such a parent was found before proceeding. This caused the else clause to execute
even when nodes had unresolved parent task group dependencies, resulting in tasks
being sorted out of dependency order in the grid view.
The fix adds the missing logic:
1. Check if a parent task group dependency exists (if tg:) and break if found
2. Track progress with an acyclic flag to detect cycles or stuck states
3. Break the loop if no nodes are resolved in an iteration
Also added the missing hierarchical_alphabetical_sort() method to support the
alternative grid_view_sorting_order configuration option.
This ensures tasks are displayed in the correct dependency order in the grid view,
matching how they are executed.
…tion
The SerializedTaskGroup.topological_sort() method had critical bugs that caused
tasks to display in incorrect order in the grid view:
1. Missing logic to check if parent task group dependencies exist before adding
nodes to the sorted list, causing premature sorting of dependent tasks.
2. Failed to handle task groups differently from tasks when checking upstream
dependencies. Task groups use upstream_group_ids/upstream_task_ids attributes,
while tasks use upstream_list. The original implementation only checked
upstream_list, causing task groups to appear to have no dependencies.
The fix:
- Added missing hierarchical_alphabetical_sort() method to support the alternative
grid_view_sorting_order configuration option
- Fixed topological_sort() to properly detect upstream dependencies for both tasks
(via upstream_list) and task groups (via upstream_group_ids/upstream_task_ids)
- Added check after the parent task group search loop to break if a dependency
was found
- Added acyclic flag tracking and handling for cycle/stuck states
Updated unit tests to reflect correct topological ordering where tasks appear
after their dependencies rather than in arbitrary order.
This ensures tasks are displayed in correct dependency order in the grid view,
matching how they are executed.
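The behaviour described in this commit message can be illustrated with a minimal, standalone sketch. It is not the Airflow implementation: the function names `topological_sort` and `_is_blocked`, and the `upstream` and `parent` mappings, are hypothetical stand-ins chosen for illustration. It assumes each child of a group lists its upstream node ids, and that an upstream id may point at a task nested inside a sibling task group, in which case the enclosing group must be sorted first.

```python
from __future__ import annotations


def _is_blocked(upstream_id: str, unsorted: dict[str, set[str]], parent: dict[str, str]) -> bool:
    """Return True if the upstream node, or any group enclosing it, is still unsorted."""
    current: str | None = upstream_id
    while current is not None:
        if current in unsorted:
            return True
        # Walk outwards to the enclosing task group, if any (hypothetical mapping).
        current = parent.get(current)
    return False


def topological_sort(upstream: dict[str, set[str]], parent: dict[str, str]) -> list[str]:
    """Order the children of one group so that dependencies come first.

    upstream: child id -> ids of the nodes it depends on
    parent:   nested node id -> id of its enclosing task group
    """
    unsorted = dict(upstream)
    ordered: list[str] = []
    while unsorted:
        progressed = False  # plays the role of the "acyclic" flag
        for node_id, deps in list(unsorted.items()):
            if any(_is_blocked(dep, unsorted, parent) for dep in deps):
                continue  # a dependency (or its parent group) is not placed yet
            ordered.append(node_id)
            del unsorted[node_id]
            progressed = True
        if not progressed:
            # No node could be placed in a full pass: a cycle or a stuck state.
            raise ValueError("A cyclic dependency occurred")
    return ordered


if __name__ == "__main__":
    # Mirrors a DAG shaped like: get_nums >> process_number (group) >> log_success
    upstream = {
        "log_success": {"process_number.times_2"},
        "get_nums": set(),
        "process_number": {"get_nums"},
    }
    parent = {"process_number.times_2": "process_number"}
    print(topological_sort(upstream, parent))
```

The point of the parent walk is that `log_success` depends on a task inside `process_number`, which is not itself a child of the group being sorted. Without checking the enclosing group, such a node would look as if it had no pending dependencies and would be placed too early.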
0263a87 to
a99c496
Compare
Merged
Member
As mentioned in the other PR, I think #56963 should be favored: the implementation is simpler and more consistent with the SDK counterpart.
github-actions Bot
pushed a commit
that referenced
this pull request
Oct 21, 2025
Fixes: #55899
Closes #56321

Similarly to https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/definitions/taskgroup.py#L561, we need an exit condition if the task group is found in the unsorted graph. Adjusted the tests, which indeed were not in the correct topological order.

Testing dag code:

```python
from __future__ import annotations

import datetime

import pendulum

from airflow.sdk import dag, task, task_group


@task
def get_nums() -> list[int]:
    return [1, 2, 4]


@task
def times_2(n: int) -> int:
    return n * 2


@task_group(group_id="process_number")
def process_number(n: int):
    value = times_2(n)
    return value


@task
def log_success() -> None:
    print("Processed successful!")


@dag(
    schedule=None,
    catchup=False,
    start_date=pendulum.datetime(2025, 4, 1, tz="Europe/Copenhagen"),
    dagrun_timeout=datetime.timedelta(minutes=30),
    dag_id="55899_bug",
)
def test():
    nums = get_nums()
    processed = process_number.expand(n=nums)
    processed >> log_success()


test()
```

### Before

<img width="1917" height="1016" alt="Screenshot 2025-10-21 at 17 57 20" src="https://github.com/user-attachments/assets/d5220b87-a23b-40f7-8ecf-cb1b39d72f53" />

### After

<img width="1923" height="937" alt="Screenshot 2025-10-21 at 17 56 57" src="https://github.com/user-attachments/assets/37f75b19-2e80-4765-b9cb-e425f9054b78" />

(cherry picked from commit c3f53b1)

Co-authored-by: Pierre Jeambrun <pierrejbrun@gmail.com>
kaxil
pushed a commit
to astronomer/airflow
that referenced
this pull request
Oct 21, 2025
kaxil
pushed a commit
that referenced
this pull request
Oct 21, 2025
kosteev
pushed a commit
to GoogleCloudPlatform/composer-airflow
that referenced
this pull request
Mar 2, 2026
Fixes: apache/airflow#55899
Closes apache/airflow#56321
(cherry picked from commit c3f53b1d598a55df42ba588fbd1dd10fab2f2ae8)
GitOrigin-RevId: ccc33ffd109b64c6e41512d9cbaa38c53cabef7d
The SerializedTaskGroup.topological_sort() method had a critical bug in its
topological sorting algorithm. After checking if an upstream dependency's parent
task group was still in the unsorted graph, the code failed to verify whether
such a parent was found before proceeding. This caused the else clause to execute
even when nodes had unresolved parent task group dependencies, resulting in tasks
being sorted out of dependency order in the grid view.
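The fix adds the missing logic:
1. Check if a parent task group dependency exists (if tg:) and break if found
2. Track progress with an acyclic flag to detect cycles or stuck states
3. Break the loop if no nodes are resolved in an iteration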
Also added the missing hierarchical_alphabetical_sort() method to support the
alternative grid_view_sorting_order configuration option.
This ensures tasks are displayed in the correct dependency order in the grid view,
matching how they are executed.
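For the alternative ordering, a rough sketch of what a hierarchical alphabetical sort could look like is given here. It rests on an assumption that is not stated in this PR: that "hierarchical alphabetical" means task groups first in alphabetical order, followed by tasks in alphabetical order. The `Child` class and its `is_group` field are hypothetical stand-ins, not Airflow types.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class Child:
    """Hypothetical stand-in for a child of a task group (a task or a nested group)."""

    node_id: str
    is_group: bool


def hierarchical_alphabetical_sort(children: list[Child]) -> list[Child]:
    # Assumed ordering: groups sort before tasks (False < True on "not is_group"),
    # and each kind is ordered alphabetically by node_id.
    return sorted(children, key=lambda child: (not child.is_group, child.node_id))


if __name__ == "__main__":
    children = [
        Child("log_success", is_group=False),
        Child("process_number", is_group=True),
        Child("get_nums", is_group=False),
    ]
    print([child.node_id for child in hierarchical_alphabetical_sort(children)])
```

Unlike the topological ordering, this ordering ignores dependencies entirely, so it can place a downstream task above its upstream one.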