From b64ac5df5adead0aedb21abbfbdf90e9b9b84db0 Mon Sep 17 00:00:00 2001 From: anmolxlight Date: Tue, 16 Jun 2026 15:54:13 +0000 Subject: [PATCH 1/2] feat: store hash of dag_version.version_data to avoid loading/comparing large manifests Persist a version_data_hash (md5 of canonical JSON) on DagVersion and compare/prefetch that instead of the full version_data blob. Changes: - Add version_data_hash column (String(32), nullable) to DagVersion model - Add compute_version_data_hash() static method on DagVersion - In _prefetch_dag_write_metadata, use load_only() to skip loading the potentially-large version_data JSON column - In write_dag fast path, compare version_data_hash instead of full dicts - Update in-place refresh and no-TI-update paths to set version_data_hash - Alembic migration 0123 (rev: 9e8d7c6b5a4f) for the new column Closes: #68567 --- ..._0_add_version_data_hash_to_dag_version.py | 55 +++++++++++++++++++ .../src/airflow/models/dag_version.py | 20 ++++++- .../src/airflow/models/serialized_dag.py | 22 +++++++- .../provider_dependencies.json.sha256sum | 2 +- uv.lock | 16 +++++- 5 files changed, 107 insertions(+), 8 deletions(-) create mode 100644 airflow-core/src/airflow/migrations/versions/0123_3_3_0_add_version_data_hash_to_dag_version.py diff --git a/airflow-core/src/airflow/migrations/versions/0123_3_3_0_add_version_data_hash_to_dag_version.py b/airflow-core/src/airflow/migrations/versions/0123_3_3_0_add_version_data_hash_to_dag_version.py new file mode 100644 index 0000000000000..8f41a2380a01a --- /dev/null +++ b/airflow-core/src/airflow/migrations/versions/0123_3_3_0_add_version_data_hash_to_dag_version.py @@ -0,0 +1,55 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Add version_data_hash column to dag_version table. + +Persisting a hash of version_data avoids loading and comparing the full +JSON blob on every DAG parse, keeping DB-side parsing cheap and +memory-flat as version_data grows (large manifests from S3/custom bundles). + +Revision ID: 9e8d7c6b5a4f +Revises: 9ff64e1c35d3 +Create Date: 2026-06-16 12:00:00.000000 +""" + +from __future__ import annotations + +import sqlalchemy as sa +from alembic import op + +revision = "9e8d7c6b5a4f" +down_revision = "9ff64e1c35d3" +branch_labels = None +depends_on = None +airflow_version = "3.3.0" + + +def upgrade(): + """Add version_data_hash column to dag_version.""" + with op.batch_alter_table("dag_version", schema=None) as batch_op: + batch_op.add_column(sa.Column("version_data_hash", sa.String(32), nullable=True)) + + +def downgrade(): + """Remove version_data_hash column from dag_version.""" + from airflow.migrations.utils import disable_sqlite_fkeys + + with disable_sqlite_fkeys(op): + with op.batch_alter_table("dag_version", schema=None) as batch_op: + batch_op.drop_column("version_data_hash") diff --git a/airflow-core/src/airflow/models/dag_version.py b/airflow-core/src/airflow/models/dag_version.py index e6564a6da0668..556d9e8ee5304 100644 --- a/airflow-core/src/airflow/models/dag_version.py +++ b/airflow-core/src/airflow/models/dag_version.py @@ -24,12 +24,14 @@ import sqlalchemy as sa import uuid6 -from sqlalchemy import ForeignKey, Integer, UniqueConstraint, select +from sqlalchemy import ForeignKey, Integer, String, UniqueConstraint, select from sqlalchemy.orm import Mapped, joinedload, mapped_column, relationship from airflow._shared.timezones import timezone from airflow.dag_processing.bundles.manager import DagBundlesManager from airflow.models.base import Base, StringID +from airflow.settings import json +from airflow.utils.hashlib_wrapper import md5 from airflow.utils.session import NEW_SESSION, provide_session from airflow.utils.sqlalchemy import UtcDateTime, with_row_locks @@ -54,6 +56,7 @@ class DagVersion(Base): bundle_name: Mapped[str | None] = mapped_column(StringID(), nullable=True) bundle_version: Mapped[str | None] = mapped_column(StringID(), nullable=True) version_data: Mapped[dict | None] = mapped_column(sa.JSON(), nullable=True) + version_data_hash: Mapped[str | None] = mapped_column(String(32), nullable=True) bundle = relationship( "DagBundleModel", primaryjoin="foreign(DagVersion.bundle_name) == DagBundleModel.name", @@ -104,6 +107,20 @@ def bundle_url(self) -> str | None: except ValueError: return None + @staticmethod + def compute_version_data_hash(version_data: dict | None) -> str | None: + """ + Compute a deterministic hash of *version_data*. + + Uses md5 of the JSON sorted by key so two equivalent dicts always + produce the same hash. Returns ``None`` when *version_data* is + ``None`` (the common case for built-in bundles). + """ + if version_data is None: + return None + data_json = json.dumps(version_data, sort_keys=True).encode("utf-8") + return md5(data_json).hexdigest() + @classmethod @provide_session def write_dag( @@ -141,6 +158,7 @@ def write_dag( bundle_name=bundle_name, bundle_version=bundle_version, version_data=version_data, + version_data_hash=cls.compute_version_data_hash(version_data), ) log.debug("Writing DagVersion %s to the DB", dag_version) session.add(dag_version) diff --git a/airflow-core/src/airflow/models/serialized_dag.py b/airflow-core/src/airflow/models/serialized_dag.py index ce0333653114e..9a2ec60bf9247 100644 --- a/airflow-core/src/airflow/models/serialized_dag.py +++ b/airflow-core/src/airflow/models/serialized_dag.py @@ -29,7 +29,7 @@ import uuid6 from sqlalchemy import JSON, ForeignKey, LargeBinary, String, Uuid, exists, select, tuple_, update from sqlalchemy.dialects.postgresql import JSONB -from sqlalchemy.orm import Mapped, backref, foreign, mapped_column, relationship +from sqlalchemy.orm import Mapped, backref, foreign, load_only, mapped_column, relationship from sqlalchemy.sql.expression import func, literal from airflow._shared.timezones import timezone @@ -580,7 +580,19 @@ def _prefetch_dag_write_metadata( .subquery() ) dag_versions = session.scalars( - select(DagVersion).join(dv_subq, DagVersion.id == dv_subq.c.id).where(dv_subq.c.rn == 1) + select(DagVersion) + .options( + load_only( + DagVersion.id, + DagVersion.dag_id, + DagVersion.bundle_name, + DagVersion.bundle_version, + DagVersion.version_data_hash, + DagVersion.version_number, + ) + ) + .join(dv_subq, DagVersion.id == dv_subq.c.id) + .where(dv_subq.c.rn == 1) ).all() dv_by_dag_id: dict[str, DagVersion] = {dv.dag_id: dv for dv in dag_versions} @@ -694,12 +706,15 @@ def write_dag( # But if the bundle advanced, refresh the latest version's pointer in place — tasks resolve # their code from ``ti.dag_version.bundle_version`` at run time, so a stale # pointer makes runs execute an outdated commit. + incoming_version_data_hash = DagVersion.compute_version_data_hash(version_data) bundle_metadata_changed = ( - dag_version.bundle_version != bundle_version or dag_version.version_data != version_data + dag_version.bundle_version != bundle_version + or dag_version.version_data_hash != incoming_version_data_hash ) if bundle_metadata_changed: dag_version.bundle_version = bundle_version dag_version.version_data = version_data + dag_version.version_data_hash = incoming_version_data_hash session.merge(dag_version) DagCode.update_source_code(dag_id=dag.dag_id, fileloc=dag.fileloc, session=session) if name_updated or bundle_metadata_changed: @@ -759,6 +774,7 @@ def write_dag( dag_version.bundle_name = bundle_name dag_version.bundle_version = bundle_version dag_version.version_data = version_data + dag_version.version_data_hash = DagVersion.compute_version_data_hash(version_data) session.merge(dag_version) # Update the latest DagCode DagCode.update_source_code(dag_id=dag.dag_id, fileloc=dag.fileloc, session=session) diff --git a/generated/provider_dependencies.json.sha256sum b/generated/provider_dependencies.json.sha256sum index d173961afb128..6bc72fc0c608f 100644 --- a/generated/provider_dependencies.json.sha256sum +++ b/generated/provider_dependencies.json.sha256sum @@ -1 +1 @@ -b17f09d421b67d9d3925516c27c0fc4b4fb9f4fa4e4c495ebf3c643b3d12e59c +e14a01deac3579ec86383046f5e8fee680fae21d446e6d81cbf10395a9837cad \ No newline at end of file diff --git a/uv.lock b/uv.lock index a164ae529245e..eca04e9cb3c27 100644 --- a/uv.lock +++ b/uv.lock @@ -5959,20 +5959,26 @@ dependencies = [ { name = "apache-airflow" }, { name = "apache-airflow-providers-common-compat" }, { name = "apache-airflow-providers-http" }, + { name = "sqlglot" }, ] [package.optional-dependencies] common-compat = [ { name = "apache-airflow-providers-common-compat" }, ] +common-sql = [ + { name = "apache-airflow-providers-common-sql" }, +] [package.dev-dependencies] dev = [ { name = "apache-airflow" }, { name = "apache-airflow-devel-common" }, { name = "apache-airflow-providers-common-compat" }, + { name = "apache-airflow-providers-common-sql" }, { name = "apache-airflow-providers-http" }, { name = "apache-airflow-task-sdk" }, + { name = "sqlglot" }, { name = "uuid6" }, ] docs = [ @@ -5984,17 +5990,21 @@ requires-dist = [ { name = "apache-airflow", editable = "." }, { name = "apache-airflow-providers-common-compat", editable = "providers/common/compat" }, { name = "apache-airflow-providers-common-compat", marker = "extra == 'common-compat'", editable = "providers/common/compat" }, + { name = "apache-airflow-providers-common-sql", marker = "extra == 'common-sql'", editable = "providers/common/sql" }, { name = "apache-airflow-providers-http", editable = "providers/http" }, + { name = "sqlglot", specifier = ">=30.0.0" }, ] -provides-extras = ["common-compat"] +provides-extras = ["common-compat", "common-sql"] [package.metadata.requires-dev] dev = [ { name = "apache-airflow", editable = "." }, { name = "apache-airflow-devel-common", editable = "devel-common" }, { name = "apache-airflow-providers-common-compat", editable = "providers/common/compat" }, + { name = "apache-airflow-providers-common-sql", editable = "providers/common/sql" }, { name = "apache-airflow-providers-http", editable = "providers/http" }, { name = "apache-airflow-task-sdk", editable = "task-sdk" }, + { name = "sqlglot", specifier = ">=30.0.0" }, { name = "uuid6", specifier = ">=2024.7.10" }, ] docs = [{ name = "apache-airflow-devel-common", extras = ["docs"], editable = "devel-common" }] @@ -21579,8 +21589,8 @@ name = "secretstorage" version = "3.5.0" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "cryptography", marker = "python_full_version >= '3.14' or platform_machine != 'arm64' or sys_platform != 'darwin'" }, - { name = "jeepney", marker = "python_full_version >= '3.14' or platform_machine != 'arm64' or sys_platform != 'darwin'" }, + { name = "cryptography", marker = "(python_full_version >= '3.14' and sys_platform == 'darwin') or (python_full_version < '3.15' and sys_platform == 'emscripten') or (python_full_version < '3.15' and sys_platform == 'win32') or (platform_machine != 'arm64' and sys_platform == 'darwin') or (sys_platform != 'darwin' and sys_platform != 'emscripten' and sys_platform != 'win32')" }, + { name = "jeepney", marker = "(python_full_version >= '3.14' and sys_platform == 'darwin') or (python_full_version < '3.15' and sys_platform == 'emscripten') or (python_full_version < '3.15' and sys_platform == 'win32') or (platform_machine != 'arm64' and sys_platform == 'darwin') or (sys_platform != 'darwin' and sys_platform != 'emscripten' and sys_platform != 'win32')" }, ] sdist = { url = "https://files.pythonhosted.org/packages/1c/03/e834bcd866f2f8a49a85eaff47340affa3bfa391ee9912a952a1faa68c7b/secretstorage-3.5.0.tar.gz", hash = "sha256:f04b8e4689cbce351744d5537bf6b1329c6fc68f91fa666f60a380edddcd11be", size = 19884, upload-time = "2025-11-23T19:02:53.191Z" } wheels = [ From e5f8968183bda1f68b51fae49f0d72721a24c430 Mon Sep 17 00:00:00 2001 From: Anmol Mishra Date: Fri, 19 Jun 2026 03:55:21 +0000 Subject: [PATCH 2/2] fix: update _REVISION_HEADS_MAP and add trailing newline to provider_dependencies.sha256sum - Point 3.3.0 head to new migration revision 9e8d7c6b5a4f - Fix end-of-file-fixer on generated/provider_dependencies.json.sha256sum --- airflow-core/src/airflow/utils/db.py | 2 +- generated/provider_dependencies.json.sha256sum | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow-core/src/airflow/utils/db.py b/airflow-core/src/airflow/utils/db.py index 2155ca50d33b3..d72871f5f767d 100644 --- a/airflow-core/src/airflow/utils/db.py +++ b/airflow-core/src/airflow/utils/db.py @@ -116,7 +116,7 @@ class MappedClassProtocol(Protocol): "3.1.0": "cc92b33c6709", "3.1.8": "509b94a1042d", "3.2.0": "1d6611b6ab7c", - "3.3.0": "9ff64e1c35d3", + "3.3.0": "9e8d7c6b5a4f", } # Prefix used to identify tables holding data moved during migration. diff --git a/generated/provider_dependencies.json.sha256sum b/generated/provider_dependencies.json.sha256sum index 6bc72fc0c608f..ee204e3203e0f 100644 --- a/generated/provider_dependencies.json.sha256sum +++ b/generated/provider_dependencies.json.sha256sum @@ -1 +1 @@ -e14a01deac3579ec86383046f5e8fee680fae21d446e6d81cbf10395a9837cad \ No newline at end of file +e14a01deac3579ec86383046f5e8fee680fae21d446e6d81cbf10395a9837cad