diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 0f319a1afb363..3a528e9c8a824 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -1238,7 +1238,7 @@ def serialize_dag(cls, dag: DAG) -> dict: for dep in SerializedBaseOperator.detect_dependencies(task) } dag_deps.update(DependencyDetector.detect_dag_dependencies(dag)) - serialized_dag["dag_dependencies"] = [x.__dict__ for x in dag_deps] + serialized_dag["dag_dependencies"] = [x.__dict__ for x in sorted(dag_deps)] serialized_dag["_task_group"] = TaskGroupSerialization.serialize_task_group(dag.task_group) # Edge info in the JSON exactly matches our internal structure @@ -1444,7 +1444,7 @@ def set_ref(task: Operator) -> Operator: return group -@dataclass(frozen=True) +@dataclass(frozen=True, order=True) class DagDependency: """Dataclass for representing dependencies between DAGs. These are calculated during serialization and attached to serialized DAGs. diff --git a/tests/models/test_serialized_dag.py b/tests/models/test_serialized_dag.py index b425cd8f658cd..61be12676f58b 100644 --- a/tests/models/test_serialized_dag.py +++ b/tests/models/test_serialized_dag.py @@ -20,13 +20,17 @@ from unittest import mock +import pendulum import pytest -from airflow import DAG, example_dags as example_dags_module +from airflow import DAG, Dataset, example_dags as example_dags_module from airflow.models import DagBag from airflow.models.dagcode import DagCode from airflow.models.serialized_dag import SerializedDagModel as SDM +from airflow.operators.bash import BashOperator from airflow.serialization.serialized_objects import SerializedDAG +from airflow.settings import json +from airflow.utils.hashlib_wrapper import md5 from airflow.utils.session import create_session from tests.test_utils import db from tests.test_utils.asserts import assert_queries_count @@ -196,3 +200,43 @@ def test_get_dag_dependencies_default_to_empty(self, dag_dependencies_fields): expected_dependencies = {dag_id: [] for dag_id in example_dags} assert SDM.get_dag_dependencies() == expected_dependencies + + def test_order_of_deps_is_consistent(self): + """ + Previously the 'dag_dependencies' node in serialized dag was converted to list from set. + This caused the order, and thus the hash value, to be unreliable, which could produce + excessive dag parsing. + """ + first_dag_hash = None + for r in range(10): + with DAG( + dag_id="example", + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + schedule=[ + Dataset("1"), + Dataset("2"), + Dataset("3"), + Dataset("4"), + Dataset("5"), + ], + ) as dag6: + BashOperator( + task_id="any", + outlets=[Dataset("0*"), Dataset("6*")], + bash_command="sleep 5", + ) + deps_order = [x["dependency_id"] for x in SerializedDAG.serialize_dag(dag6)["dag_dependencies"]] + # in below assert, 0 and 6 both come at end because "source" is different for them and source + # is the first field in DagDependency class + assert deps_order == ["1", "2", "3", "4", "5", "0*", "6*"] + + # for good measure, let's check that the dag hash is consistent + dag_json = json.dumps(SerializedDAG.to_dict(dag6), sort_keys=True).encode("utf-8") + this_dag_hash = md5(dag_json).hexdigest() + + # set first dag hash on first pass + if first_dag_hash is None: + first_dag_hash = this_dag_hash + + # dag hash should not change without change in structure (we're in a loop) + assert this_dag_hash == first_dag_hash