From 5cf6a94b5a527570a93c77d75dcb510c517b84df Mon Sep 17 00:00:00 2001 From: Jake Roach <116606359+jroachgolf84@users.noreply.github.com> Date: Mon, 15 Jun 2026 14:27:04 -0400 Subject: [PATCH 1/4] feature/issue-68575: Add bundle_name to Dag --- airflow-core/src/airflow/dag_processing/dagbag.py | 4 ++++ airflow-core/tests/unit/dag_processing/test_dagbag.py | 10 ++++++++++ generated/provider_dependencies.json.sha256sum | 2 +- task-sdk/src/airflow/sdk/definitions/dag.py | 2 ++ task-sdk/tests/task_sdk/definitions/test_dag.py | 9 +++++++++ 5 files changed, 26 insertions(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/dag_processing/dagbag.py b/airflow-core/src/airflow/dag_processing/dagbag.py index 34684810977c7..15095e458d368 100644 --- a/airflow-core/src/airflow/dag_processing/dagbag.py +++ b/airflow-core/src/airflow/dag_processing/dagbag.py @@ -337,6 +337,10 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True): try: if dag.fileloc is None: dag.fileloc = filepath + + # Add the bundle_name to the DAG + dag.bundle_name = self.bundle_name + # Validate before adding to bag (matches original _process_modules behavior) dag.validate() _validate_executor_fields(dag, self.bundle_name) diff --git a/airflow-core/tests/unit/dag_processing/test_dagbag.py b/airflow-core/tests/unit/dag_processing/test_dagbag.py index 99abd92a59f73..bd330fbc176dc 100644 --- a/airflow-core/tests/unit/dag_processing/test_dagbag.py +++ b/airflow-core/tests/unit/dag_processing/test_dagbag.py @@ -360,6 +360,16 @@ def test_dagbag_with_bundle_name(self, tmp_path): dagbag2 = DagBag(dag_folder=os.fspath(tmp_path)) assert dagbag2.bundle_name is None + def test_dag_with_bundle_name(self, tmp_path): + """Test that bundle_name is attached to each Dag in the DagBag.""" + dagbag = DagBag(dag_folder=os.fspath(tmp_path), bundle_name="test_bundle") + for dag in dagbag.dags.values(): + assert dag.bundle_name == "test_bundle" + + dagbag2 = DagBag(dag_folder=os.fspath(tmp_path)) + for dag in dagbag2.dags.values(): + assert dag.bundle_name is None + def test_get_existing_dag(self, tmp_path, standard_example_dags_folder): """ Test that we're able to parse some example DAGs and retrieve them diff --git a/generated/provider_dependencies.json.sha256sum b/generated/provider_dependencies.json.sha256sum index d173961afb128..413fc17032478 100644 --- a/generated/provider_dependencies.json.sha256sum +++ b/generated/provider_dependencies.json.sha256sum @@ -1 +1 @@ -b17f09d421b67d9d3925516c27c0fc4b4fb9f4fa4e4c495ebf3c643b3d12e59c +8609061b1d7c65722ca143c6e54bf569c2b3bb2bfeac9ecc85c97a114a5d83ac diff --git a/task-sdk/src/airflow/sdk/definitions/dag.py b/task-sdk/src/airflow/sdk/definitions/dag.py index fd47de467fea4..ad69f02dc9c31 100644 --- a/task-sdk/src/airflow/sdk/definitions/dag.py +++ b/task-sdk/src/airflow/sdk/definitions/dag.py @@ -542,6 +542,8 @@ def __rich_repr__(self): fileloc: str = attrs.field(init=False, factory=_default_fileloc) relative_fileloc: str | None = attrs.field(init=False, default=None) + bundle_name: str | None = attrs.field(init=False, default=None) + partial: bool = attrs.field(init=False, default=False) edge_info: dict[str, dict[str, EdgeInfoType]] = attrs.field(init=False, factory=dict) diff --git a/task-sdk/tests/task_sdk/definitions/test_dag.py b/task-sdk/tests/task_sdk/definitions/test_dag.py index c42eb8dfc80a1..7f3c6f2bf3b2c 100644 --- a/task-sdk/tests/task_sdk/definitions/test_dag.py +++ b/task-sdk/tests/task_sdk/definitions/test_dag.py @@ -703,6 +703,15 @@ def noop_pipeline(): ... assert dag.dag_id == "noop_pipeline" assert dag.fileloc == __file__ + def test_bundle_name_defaults_to_none(self): + dag = DAG("test_dag", schedule=None) + assert dag.bundle_name is None + + def test_bundle_name_can_be_set(self): + dag = DAG("test_dag", schedule=None) + dag.bundle_name = "my_bundle" + assert dag.bundle_name == "my_bundle" + def test_set_dag_id(self): """Test that checks you can set dag_id from decorator.""" From f6575a938765ea7c71e969a05587663bdc5e872b Mon Sep 17 00:00:00 2001 From: Jake Roach <116606359+jroachgolf84@users.noreply.github.com> Date: Mon, 15 Jun 2026 14:57:41 -0400 Subject: [PATCH 2/4] feature/issue-68575: Updating serialization --- airflow-core/src/airflow/serialization/schema.json | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow-core/src/airflow/serialization/schema.json b/airflow-core/src/airflow/serialization/schema.json index b9efe88448039..872c3a1331ee3 100644 --- a/airflow-core/src/airflow/serialization/schema.json +++ b/airflow-core/src/airflow/serialization/schema.json @@ -185,6 +185,7 @@ "fail_fast": { "type": "boolean", "default": false }, "fileloc": { "type" : "string"}, "relative_fileloc": { "type" : "string"}, + "bundle_name": { "anyOf": [{ "type": "null" }, { "type": "string" }] }, "_processor_dags_folder": { "anyOf": [ { "type": "null" }, From 2bf846f6bf73102fca59d2febb7512528df904e6 Mon Sep 17 00:00:00 2001 From: Jake Roach <116606359+jroachgolf84@users.noreply.github.com> Date: Tue, 16 Jun 2026 16:39:59 -0400 Subject: [PATCH 3/4] feature/issue-68575: Adding bundle_name to get_serialized_fields --- airflow-core/src/airflow/serialization/definitions/dag.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow-core/src/airflow/serialization/definitions/dag.py b/airflow-core/src/airflow/serialization/definitions/dag.py index 77f440987f58b..866c500615b8e 100644 --- a/airflow-core/src/airflow/serialization/definitions/dag.py +++ b/airflow-core/src/airflow/serialization/definitions/dag.py @@ -180,6 +180,7 @@ def get_serialized_fields(cls) -> frozenset[str]: "task_group", "timetable", "timezone", + "bundle_name", } ) From c66176d813af9d88bb0ce0e9afcc217b2baedb12 Mon Sep 17 00:00:00 2001 From: Jake Roach <116606359+jroachgolf84@users.noreply.github.com> Date: Tue, 16 Jun 2026 18:24:58 -0400 Subject: [PATCH 4/4] feature/issue-68575: Fixing CI failures --- .../src/airflow/models/serialized_dag.py | 1 + .../airflow/serialization/definitions/dag.py | 1 + .../tests/unit/models/test_serialized_dag.py | 24 +++++++++++++++++++ .../serialization/test_dag_serialization.py | 16 +++++++++++++ uv.lock | 16 ++++++++++--- 5 files changed, 55 insertions(+), 3 deletions(-) diff --git a/airflow-core/src/airflow/models/serialized_dag.py b/airflow-core/src/airflow/models/serialized_dag.py index ce0333653114e..fd773e8ed4fe3 100644 --- a/airflow-core/src/airflow/models/serialized_dag.py +++ b/airflow-core/src/airflow/models/serialized_dag.py @@ -376,6 +376,7 @@ def hash(cls, dag_data): # bundle_path and relative fileloc more correctly determines the # dag file location. data_["dag"].pop("fileloc", None) + data_["dag"].pop("bundle_name", None) data_json = json.dumps(data_, sort_keys=True).encode("utf-8") return md5(data_json).hexdigest() diff --git a/airflow-core/src/airflow/serialization/definitions/dag.py b/airflow-core/src/airflow/serialization/definitions/dag.py index 866c500615b8e..9cf250c63b9af 100644 --- a/airflow-core/src/airflow/serialization/definitions/dag.py +++ b/airflow-core/src/airflow/serialization/definitions/dag.py @@ -115,6 +115,7 @@ class SerializedDAG: rerun_with_latest_version: bool | None = None doc_md: str | None = None edge_info: dict[str, dict[str, EdgeInfoType]] = attrs.field(factory=dict) + bundle_name: str | None = None end_date: datetime.datetime | None = None fail_fast: bool = False has_on_failure_callback: bool = False diff --git a/airflow-core/tests/unit/models/test_serialized_dag.py b/airflow-core/tests/unit/models/test_serialized_dag.py index a1ad63de5d688..03a4159eb2b70 100644 --- a/airflow-core/tests/unit/models/test_serialized_dag.py +++ b/airflow-core/tests/unit/models/test_serialized_dag.py @@ -719,6 +719,30 @@ def test_hash_method_removes_fileloc_and_remains_consistent(self): assert "fileloc" in test_data["dag"] assert test_data["dag"]["fileloc"] == "/different/path/to/dag.py" + def test_hash_method_removes_bundle_name_and_remains_consistent(self): + """Test that the hash method removes bundle_name before hashing.""" + test_data = { + "__version": 1, + "dag": { + "bundle_name": "bundle_a", + "dag_id": "test_dag", + "tasks": { + "task1": {"task_id": "task1"}, + }, + }, + } + + hash_with_bundle_name = SDM.hash(test_data) + + test_data["dag"]["bundle_name"] = "bundle_b" + + hash_with_different_bundle_name = SDM.hash(test_data) + + assert hash_with_bundle_name == hash_with_different_bundle_name + + # Verify original data is not mutated by hash() + assert test_data["dag"]["bundle_name"] == "bundle_b" + def test_hash_method_consistent_with_dict_ordering_in_template_fields(self, dag_maker): from airflow.sdk.bases.operator import BaseOperator diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py b/airflow-core/tests/unit/serialization/test_dag_serialization.py index 5bc5b0ae84768..4699415a04d72 100644 --- a/airflow-core/tests/unit/serialization/test_dag_serialization.py +++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py @@ -2454,6 +2454,22 @@ def test_dag_rerun_with_latest_version_roundtrip(self, value, expected): deserialized_dag = DagSerialization.from_dict(serialized_dag) assert deserialized_dag.rerun_with_latest_version is expected + @pytest.mark.parametrize( + ("bundle_name", "expected"), + [ + ("my_bundle", "my_bundle"), + (None, None), + ], + ) + def test_dag_bundle_name_roundtrip(self, bundle_name, expected): + """Test that bundle_name survives serialization roundtrip.""" + dag = DAG(dag_id="test_dag_bundle_name_roundtrip", schedule=None) + BaseOperator(task_id="simple_task", dag=dag, start_date=datetime(2019, 8, 1)) + dag.bundle_name = bundle_name + serialized_dag = DagSerialization.to_dict(dag) + deserialized_dag = DagSerialization.from_dict(serialized_dag) + assert deserialized_dag.bundle_name == expected + @pytest.mark.parametrize( ("object_to_serialized", "expected_output"), [ diff --git a/uv.lock b/uv.lock index d506f861eb9b6..ac0ae9245eac6 100644 --- a/uv.lock +++ b/uv.lock @@ -5959,20 +5959,26 @@ dependencies = [ { name = "apache-airflow" }, { name = "apache-airflow-providers-common-compat" }, { name = "apache-airflow-providers-http" }, + { name = "sqlglot" }, ] [package.optional-dependencies] common-compat = [ { name = "apache-airflow-providers-common-compat" }, ] +common-sql = [ + { name = "apache-airflow-providers-common-sql" }, +] [package.dev-dependencies] dev = [ { name = "apache-airflow" }, { name = "apache-airflow-devel-common" }, { name = "apache-airflow-providers-common-compat" }, + { name = "apache-airflow-providers-common-sql" }, { name = "apache-airflow-providers-http" }, { name = "apache-airflow-task-sdk" }, + { name = "sqlglot" }, { name = "uuid6" }, ] docs = [ @@ -5984,17 +5990,21 @@ requires-dist = [ { name = "apache-airflow", editable = "." }, { name = "apache-airflow-providers-common-compat", editable = "providers/common/compat" }, { name = "apache-airflow-providers-common-compat", marker = "extra == 'common-compat'", editable = "providers/common/compat" }, + { name = "apache-airflow-providers-common-sql", marker = "extra == 'common-sql'", editable = "providers/common/sql" }, { name = "apache-airflow-providers-http", editable = "providers/http" }, + { name = "sqlglot", specifier = ">=30.0.0" }, ] -provides-extras = ["common-compat"] +provides-extras = ["common-compat", "common-sql"] [package.metadata.requires-dev] dev = [ { name = "apache-airflow", editable = "." }, { name = "apache-airflow-devel-common", editable = "devel-common" }, { name = "apache-airflow-providers-common-compat", editable = "providers/common/compat" }, + { name = "apache-airflow-providers-common-sql", editable = "providers/common/sql" }, { name = "apache-airflow-providers-http", editable = "providers/http" }, { name = "apache-airflow-task-sdk", editable = "task-sdk" }, + { name = "sqlglot", specifier = ">=30.0.0" }, { name = "uuid6", specifier = ">=2024.7.10" }, ] docs = [{ name = "apache-airflow-devel-common", extras = ["docs"], editable = "devel-common" }] @@ -21579,8 +21589,8 @@ name = "secretstorage" version = "3.5.0" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "cryptography", marker = "python_full_version >= '3.14' or platform_machine != 'arm64' or sys_platform != 'darwin'" }, - { name = "jeepney", marker = "python_full_version >= '3.14' or platform_machine != 'arm64' or sys_platform != 'darwin'" }, + { name = "cryptography", marker = "(python_full_version >= '3.14' and sys_platform == 'darwin') or (python_full_version < '3.15' and sys_platform == 'emscripten') or (python_full_version < '3.15' and sys_platform == 'win32') or (platform_machine != 'arm64' and sys_platform == 'darwin') or (sys_platform != 'darwin' and sys_platform != 'emscripten' and sys_platform != 'win32')" }, + { name = "jeepney", marker = "(python_full_version >= '3.14' and sys_platform == 'darwin') or (python_full_version < '3.15' and sys_platform == 'emscripten') or (python_full_version < '3.15' and sys_platform == 'win32') or (platform_machine != 'arm64' and sys_platform == 'darwin') or (sys_platform != 'darwin' and sys_platform != 'emscripten' and sys_platform != 'win32')" }, ] sdist = { url = "https://files.pythonhosted.org/packages/1c/03/e834bcd866f2f8a49a85eaff47340affa3bfa391ee9912a952a1faa68c7b/secretstorage-3.5.0.tar.gz", hash = "sha256:f04b8e4689cbce351744d5537bf6b1329c6fc68f91fa666f60a380edddcd11be", size = 19884, upload-time = "2025-11-23T19:02:53.191Z" } wheels = [