Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions airflow/serialization/serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -1238,7 +1238,7 @@ def serialize_dag(cls, dag: DAG) -> dict:
for dep in SerializedBaseOperator.detect_dependencies(task)
}
dag_deps.update(DependencyDetector.detect_dag_dependencies(dag))
serialized_dag["dag_dependencies"] = [x.__dict__ for x in dag_deps]
serialized_dag["dag_dependencies"] = [x.__dict__ for x in sorted(dag_deps)]
serialized_dag["_task_group"] = TaskGroupSerialization.serialize_task_group(dag.task_group)

# Edge info in the JSON exactly matches our internal structure
Expand Down Expand Up @@ -1444,7 +1444,7 @@ def set_ref(task: Operator) -> Operator:
return group


@dataclass(frozen=True)
@dataclass(frozen=True, order=True)
class DagDependency:
"""Dataclass for representing dependencies between DAGs.
These are calculated during serialization and attached to serialized DAGs.
Expand Down
46 changes: 45 additions & 1 deletion tests/models/test_serialized_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,17 @@

from unittest import mock

import pendulum
import pytest

from airflow import DAG, example_dags as example_dags_module
from airflow import DAG, Dataset, example_dags as example_dags_module
from airflow.models import DagBag
from airflow.models.dagcode import DagCode
from airflow.models.serialized_dag import SerializedDagModel as SDM
from airflow.operators.bash import BashOperator
from airflow.serialization.serialized_objects import SerializedDAG
from airflow.settings import json
from airflow.utils.hashlib_wrapper import md5
from airflow.utils.session import create_session
from tests.test_utils import db
from tests.test_utils.asserts import assert_queries_count
Expand Down Expand Up @@ -196,3 +200,43 @@ def test_get_dag_dependencies_default_to_empty(self, dag_dependencies_fields):

expected_dependencies = {dag_id: [] for dag_id in example_dags}
assert SDM.get_dag_dependencies() == expected_dependencies

def test_order_of_deps_is_consistent(self):
    """
    Serializing an unchanged DAG must always list 'dag_dependencies' in the same order.

    The 'dag_dependencies' node of the serialized dag used to be produced by turning a set
    into a list. That made the order, and thus the hash value, unreliable, which could
    produce excessive dag parsing.
    """
    reference_hash = None
    for _ in range(10):
        scheduled_on = [
            Dataset("1"),
            Dataset("2"),
            Dataset("3"),
            Dataset("4"),
            Dataset("5"),
        ]
        with DAG(
            dag_id="example",
            start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
            schedule=scheduled_on,
        ) as dag:
            BashOperator(
                task_id="any",
                outlets=[Dataset("0*"), Dataset("6*")],
                bash_command="sleep 5",
            )

        serialized_deps = SerializedDAG.serialize_dag(dag)["dag_dependencies"]
        dependency_ids = [dep["dependency_id"] for dep in serialized_deps]
        # "0*" and "6*" both land at the end because their "source" differs from the others,
        # and "source" is the first field in the DagDependency class
        assert dependency_ids == ["1", "2", "3", "4", "5", "0*", "6*"]

        # for good measure, also check that the hash of the whole serialized dag is stable
        payload = json.dumps(SerializedDAG.to_dict(dag), sort_keys=True).encode("utf-8")
        current_hash = md5(payload).hexdigest()

        # the first pass establishes the baseline hash
        if reference_hash is None:
            reference_hash = current_hash

        # the structure never changes between passes, so neither may the hash
        assert current_hash == reference_hash