Skip to content

Fix topological sort#56963

Merged
kaxil merged 1 commit into
apache:mainfrom
astronomer:fix-topological-sort-odering
Oct 21, 2025
Merged

Fix topological sort#56963
kaxil merged 1 commit into
apache:mainfrom
astronomer:fix-topological-sort-odering

Conversation

@pierrejeambrun

@pierrejeambrun pierrejeambrun commented Oct 21, 2025

Copy link
Copy Markdown
Member

Fixes: #55899
Closes #56321

Similarly to https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/definitions/taskgroup.py#L561, we need an exit condition if the taskgroup is found in usorted graph.

Adjusted test, which indeed were not in the correct topological order.

Testing dag code:

from __future__ import annotations

import datetime

import pendulum

from airflow.sdk import dag, task, task_group


@task
def get_nums() -> list[int]:
    return [1, 2, 4]


@task
def times_2(n: int) -> int:
    return n * 2


@task_group(group_id="process_number")
def process_number(n: int):
    value = times_2(n)
    return value


@task
def log_success() -> None:
    print("Processed successful!")


@dag(
    schedule=None,
    catchup=False,
    start_date=pendulum.datetime(2025, 4, 1, tz="Europe/Copenhagen"),
    dagrun_timeout=datetime.timedelta(minutes=30),
    dag_id="55899_bug",
)
def test():
    nums = get_nums()
    processed = process_number.expand(n=nums)
    processed >> log_success()


test()

Before

Screenshot 2025-10-21 at 17 57 20

After

Screenshot 2025-10-21 at 17 56 57

@dheerajturaga

Copy link
Copy Markdown
Member

I tried to fix this in #56321
however seems like your implementation is simpler. let me know if I need to close mine

@pierrejeambrun

pierrejeambrun commented Oct 21, 2025

Copy link
Copy Markdown
Member Author

Yes, I Think this implementation is simpler and should be favored, also it is more in lined with the sdk implementation

@pierrejeambrun

Copy link
Copy Markdown
Member Author

Cc: @kaxil

@dheerajturaga

Copy link
Copy Markdown
Member

Yes, I Think this implementation is simpler and should be favored, also it is more in lined with the sdk implementation

Agreed! Thanks for fixing this!

@kaxil kaxil merged commit c3f53b1 into apache:main Oct 21, 2025
114 checks passed
@kaxil kaxil deleted the fix-topological-sort-odering branch October 21, 2025 17:27
github-actions Bot pushed a commit that referenced this pull request Oct 21, 2025
Fixes: #55899
Closes #56321

Similarly to https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/definitions/taskgroup.py#L561, we need an exit condition if the taskgroup is found in usorted graph.

Adjusted test, which indeed were not in the correct topological order.

Testing dag code:
```python
from __future__ import annotations

import datetime

import pendulum

from airflow.sdk import dag, task, task_group

@task
def get_nums() -> list[int]:
    return [1, 2, 4]

@task
def times_2(n: int) -> int:
    return n * 2

@task_group(group_id="process_number")
def process_number(n: int):
    value = times_2(n)
    return value

@task
def log_success() -> None:
    print("Processed successful!")

@dag(
    schedule=None,
    catchup=False,
    start_date=pendulum.datetime(2025, 4, 1, tz="Europe/Copenhagen"),
    dagrun_timeout=datetime.timedelta(minutes=30),
    dag_id="55899_bug",
)
def test():
    nums = get_nums()
    processed = process_number.expand(n=nums)
    processed >> log_success()

test()
```

### Before
<img width="1917" height="1016" alt="Screenshot 2025-10-21 at 17 57 20" src="https://github.com/user-attachments/assets/d5220b87-a23b-40f7-8ecf-cb1b39d72f53" />

### After
<img width="1923" height="937" alt="Screenshot 2025-10-21 at 17 56 57" src="https://github.com/user-attachments/assets/37f75b19-2e80-4765-b9cb-e425f9054b78" />
(cherry picked from commit c3f53b1)

Co-authored-by: Pierre Jeambrun <pierrejbrun@gmail.com>
@github-actions

Copy link
Copy Markdown
Contributor

Backport successfully created: v3-1-test

Status Branch Result
v3-1-test PR Link

kaxil pushed a commit that referenced this pull request Oct 21, 2025
Fixes: #55899
Closes #56321

Similarly to https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/definitions/taskgroup.py#L561, we need an exit condition if the taskgroup is found in usorted graph.

Adjusted test, which indeed were not in the correct topological order.

Testing dag code:
```python
from __future__ import annotations

import datetime

import pendulum

from airflow.sdk import dag, task, task_group

@task
def get_nums() -> list[int]:
    return [1, 2, 4]

@task
def times_2(n: int) -> int:
    return n * 2

@task_group(group_id="process_number")
def process_number(n: int):
    value = times_2(n)
    return value

@task
def log_success() -> None:
    print("Processed successful!")

@dag(
    schedule=None,
    catchup=False,
    start_date=pendulum.datetime(2025, 4, 1, tz="Europe/Copenhagen"),
    dagrun_timeout=datetime.timedelta(minutes=30),
    dag_id="55899_bug",
)
def test():
    nums = get_nums()
    processed = process_number.expand(n=nums)
    processed >> log_success()

test()
```

### Before
<img width="1917" height="1016" alt="Screenshot 2025-10-21 at 17 57 20" src="https://github.com/user-attachments/assets/d5220b87-a23b-40f7-8ecf-cb1b39d72f53" />

### After
<img width="1923" height="937" alt="Screenshot 2025-10-21 at 17 56 57" src="https://github.com/user-attachments/assets/37f75b19-2e80-4765-b9cb-e425f9054b78" />

(cherry picked from commit c3f53b1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Order tasks topologically in Grid view

3 participants