diff --git a/airflow-core/src/airflow/migrations/versions/0123_3_3_0_add_version_data_hash_to_dag_version.py b/airflow-core/src/airflow/migrations/versions/0123_3_3_0_add_version_data_hash_to_dag_version.py new file mode 100644 index 0000000000000..8f41a2380a01a --- /dev/null +++ b/airflow-core/src/airflow/migrations/versions/0123_3_3_0_add_version_data_hash_to_dag_version.py @@ -0,0 +1,55 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Add version_data_hash column to dag_version table. + +Persisting a hash of version_data avoids loading and comparing the full +JSON blob on every DAG parse, keeping DB-side parsing cheap and +memory-flat as version_data grows (large manifests from S3/custom bundles). + +Revision ID: 9e8d7c6b5a4f +Revises: 9ff64e1c35d3 +Create Date: 2026-06-16 12:00:00.000000 +""" + +from __future__ import annotations + +import sqlalchemy as sa +from alembic import op + +revision = "9e8d7c6b5a4f" +down_revision = "9ff64e1c35d3" +branch_labels = None +depends_on = None +airflow_version = "3.3.0" + + +def upgrade(): + """Add version_data_hash column to dag_version.""" + with op.batch_alter_table("dag_version", schema=None) as batch_op: + batch_op.add_column(sa.Column("version_data_hash", sa.String(32), nullable=True)) + + +def downgrade(): + """Remove version_data_hash column from dag_version.""" + from airflow.migrations.utils import disable_sqlite_fkeys + + with disable_sqlite_fkeys(op): + with op.batch_alter_table("dag_version", schema=None) as batch_op: + batch_op.drop_column("version_data_hash") diff --git a/airflow-core/src/airflow/models/dag_version.py b/airflow-core/src/airflow/models/dag_version.py index e6564a6da0668..556d9e8ee5304 100644 --- a/airflow-core/src/airflow/models/dag_version.py +++ b/airflow-core/src/airflow/models/dag_version.py @@ -24,12 +24,14 @@ import sqlalchemy as sa import uuid6 -from sqlalchemy import ForeignKey, Integer, UniqueConstraint, select +from sqlalchemy import ForeignKey, Integer, String, UniqueConstraint, select from sqlalchemy.orm import Mapped, joinedload, mapped_column, relationship from airflow._shared.timezones import timezone from airflow.dag_processing.bundles.manager import DagBundlesManager from airflow.models.base import Base, StringID +from airflow.settings import json +from airflow.utils.hashlib_wrapper import md5 from airflow.utils.session import NEW_SESSION, provide_session from airflow.utils.sqlalchemy import UtcDateTime, with_row_locks @@ -54,6 +56,7 @@ class DagVersion(Base): bundle_name: Mapped[str | None] = mapped_column(StringID(), nullable=True) bundle_version: Mapped[str | None] = mapped_column(StringID(), nullable=True) version_data: Mapped[dict | None] = mapped_column(sa.JSON(), nullable=True) + version_data_hash: Mapped[str | None] = mapped_column(String(32), nullable=True) bundle = relationship( "DagBundleModel", primaryjoin="foreign(DagVersion.bundle_name) == DagBundleModel.name", @@ -104,6 +107,20 @@ def bundle_url(self) -> str | None: except ValueError: return None + @staticmethod + def compute_version_data_hash(version_data: dict | None) -> str | None: + """ + Compute a deterministic hash of *version_data*. + + Uses md5 of the JSON sorted by key so two equivalent dicts always + produce the same hash. Returns ``None`` when *version_data* is + ``None`` (the common case for built-in bundles). + """ + if version_data is None: + return None + data_json = json.dumps(version_data, sort_keys=True).encode("utf-8") + return md5(data_json).hexdigest() + @classmethod @provide_session def write_dag( @@ -141,6 +158,7 @@ def write_dag( bundle_name=bundle_name, bundle_version=bundle_version, version_data=version_data, + version_data_hash=cls.compute_version_data_hash(version_data), ) log.debug("Writing DagVersion %s to the DB", dag_version) session.add(dag_version) diff --git a/airflow-core/src/airflow/models/serialized_dag.py b/airflow-core/src/airflow/models/serialized_dag.py index ce0333653114e..9a2ec60bf9247 100644 --- a/airflow-core/src/airflow/models/serialized_dag.py +++ b/airflow-core/src/airflow/models/serialized_dag.py @@ -29,7 +29,7 @@ import uuid6 from sqlalchemy import JSON, ForeignKey, LargeBinary, String, Uuid, exists, select, tuple_, update from sqlalchemy.dialects.postgresql import JSONB -from sqlalchemy.orm import Mapped, backref, foreign, mapped_column, relationship +from sqlalchemy.orm import Mapped, backref, foreign, load_only, mapped_column, relationship from sqlalchemy.sql.expression import func, literal from airflow._shared.timezones import timezone @@ -580,7 +580,19 @@ def _prefetch_dag_write_metadata( .subquery() ) dag_versions = session.scalars( - select(DagVersion).join(dv_subq, DagVersion.id == dv_subq.c.id).where(dv_subq.c.rn == 1) + select(DagVersion) + .options( + load_only( + DagVersion.id, + DagVersion.dag_id, + DagVersion.bundle_name, + DagVersion.bundle_version, + DagVersion.version_data_hash, + DagVersion.version_number, + ) + ) + .join(dv_subq, DagVersion.id == dv_subq.c.id) + .where(dv_subq.c.rn == 1) ).all() dv_by_dag_id: dict[str, DagVersion] = {dv.dag_id: dv for dv in dag_versions} @@ -694,12 +706,15 @@ def write_dag( # But if the bundle advanced, refresh the latest version's pointer in place — tasks resolve # their code from ``ti.dag_version.bundle_version`` at run time, so a stale # pointer makes runs execute an outdated commit. + incoming_version_data_hash = DagVersion.compute_version_data_hash(version_data) bundle_metadata_changed = ( - dag_version.bundle_version != bundle_version or dag_version.version_data != version_data + dag_version.bundle_version != bundle_version + or dag_version.version_data_hash != incoming_version_data_hash ) if bundle_metadata_changed: dag_version.bundle_version = bundle_version dag_version.version_data = version_data + dag_version.version_data_hash = incoming_version_data_hash session.merge(dag_version) DagCode.update_source_code(dag_id=dag.dag_id, fileloc=dag.fileloc, session=session) if name_updated or bundle_metadata_changed: @@ -759,6 +774,7 @@ def write_dag( dag_version.bundle_name = bundle_name dag_version.bundle_version = bundle_version dag_version.version_data = version_data + dag_version.version_data_hash = DagVersion.compute_version_data_hash(version_data) session.merge(dag_version) # Update the latest DagCode DagCode.update_source_code(dag_id=dag.dag_id, fileloc=dag.fileloc, session=session) diff --git a/airflow-core/src/airflow/utils/db.py b/airflow-core/src/airflow/utils/db.py index 2155ca50d33b3..d72871f5f767d 100644 --- a/airflow-core/src/airflow/utils/db.py +++ b/airflow-core/src/airflow/utils/db.py @@ -116,7 +116,7 @@ class MappedClassProtocol(Protocol): "3.1.0": "cc92b33c6709", "3.1.8": "509b94a1042d", "3.2.0": "1d6611b6ab7c", - "3.3.0": "9ff64e1c35d3", + "3.3.0": "9e8d7c6b5a4f", } # Prefix used to identify tables holding data moved during migration.