From 53633e56275716f9bdca78aa5536521b48bfa339 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Fri, 7 Nov 2025 19:08:31 +0800 Subject: [PATCH 1/3] fix(asset-alias): fix asset extra missing --- .../src/airflow/models/taskinstance.py | 26 ++++++++++++------- .../serialization/serialized_objects.py | 1 + .../serialization/test_serialized_objects.py | 3 ++- .../airflow/sdk/definitions/asset/__init__.py | 1 + .../src/airflow/sdk/execution_time/context.py | 1 + .../task_sdk/execution_time/test_context.py | 24 ++++++++++------- 6 files changed, 36 insertions(+), 20 deletions(-) diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index d396d9a0f3b40..15e9d8d371849 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -1402,7 +1402,7 @@ def register_asset_changes_in_db( session=session, ) - def _asset_event_extras_from_aliases() -> dict[tuple[AssetUniqueKey, str], set[str]]: + def _asset_event_extras_from_aliases() -> dict[tuple[AssetUniqueKey, str, str], set[str]]: d = defaultdict(set) for event in outlet_events: try: @@ -1412,31 +1412,37 @@ def _asset_event_extras_from_aliases() -> dict[tuple[AssetUniqueKey, str], set[s if alias_name not in outlet_alias_names: continue asset_key = AssetUniqueKey(**event["dest_asset_key"]) - extra_json = json.dumps(event["extra"], sort_keys=True) - d[asset_key, extra_json].add(alias_name) + asset_extra_json = json.dumps(event["dest_asset_extra"], sort_keys=True) + asset_event_extra_json = json.dumps(event["extra"], sort_keys=True) + d[asset_key, asset_extra_json, asset_event_extra_json].add(alias_name) return d outlet_alias_names = {o.name for o in task_outlets if o.type == AssetAlias.__name__ and o.name} if outlet_alias_names and (event_extras_from_aliases := _asset_event_extras_from_aliases()): - for (asset_key, extra_json), event_aliase_names in event_extras_from_aliases.items(): - extra = json.loads(extra_json) + for ( + asset_key, + asset_extra_json, + asset_event_extras_json, + ), event_aliase_names in event_extras_from_aliases.items(): + asset_event_extra = json.loads(asset_event_extras_json) + asset = Asset(name=asset_key.name, uri=asset_key.uri, extra=json.loads(asset_extra_json)) ti.log.debug("register event for asset %s with aliases %s", asset_key, event_aliase_names) event = asset_manager.register_asset_change( task_instance=ti, - asset=asset_key, + asset=asset, source_alias_names=event_aliase_names, - extra=extra, + extra=asset_event_extra, session=session, ) if event is None: ti.log.info("Dynamically creating AssetModel %s", asset_key) - session.add(AssetModel(name=asset_key.name, uri=asset_key.uri)) + session.add(AssetModel.from_public(asset)) session.flush() # So event can set up its asset fk. asset_manager.register_asset_change( task_instance=ti, - asset=asset_key, + asset=asset, source_alias_names=event_aliase_names, - extra=extra, + extra=asset_event_extra, session=session, ) diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py index 0a009834e89f8..5307eedad8b6d 100644 --- a/airflow-core/src/airflow/serialization/serialized_objects.py +++ b/airflow-core/src/airflow/serialization/serialized_objects.py @@ -399,6 +399,7 @@ def decode_outlet_event_accessor(var: dict[str, Any]) -> OutletEventAccessor: dest_asset_key=AssetUniqueKey( name=e["dest_asset_key"]["name"], uri=e["dest_asset_key"]["uri"] ), + dest_asset_extra=e["dest_asset_extra"], extra=e["extra"], ) for e in asset_alias_events diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py b/airflow-core/tests/unit/serialization/test_serialized_objects.py index 93d33fd4fc582..eb4f57e537b71 100644 --- a/airflow-core/tests/unit/serialization/test_serialized_objects.py +++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py @@ -409,7 +409,8 @@ def __len__(self) -> int: AssetAliasEvent( source_alias_name="test_alias", dest_asset_key=AssetUniqueKey(name="test_name", uri="test://asset-uri"), - extra={}, + dest_asset_extra={"extra": "from asset itsefl"}, + extra={"extra": "from event"}, ) ], ), diff --git a/task-sdk/src/airflow/sdk/definitions/asset/__init__.py b/task-sdk/src/airflow/sdk/definitions/asset/__init__.py index 85fcca8e9a17c..485584bd8ddb1 100644 --- a/task-sdk/src/airflow/sdk/definitions/asset/__init__.py +++ b/task-sdk/src/airflow/sdk/definitions/asset/__init__.py @@ -689,4 +689,5 @@ class AssetAliasEvent(attrs.AttrsInstance): source_alias_name: str dest_asset_key: AssetUniqueKey + dest_asset_extra: dict[str, JsonValue] extra: dict[str, JsonValue] diff --git a/task-sdk/src/airflow/sdk/execution_time/context.py b/task-sdk/src/airflow/sdk/execution_time/context.py index f5f4aed589e8d..4cc148030e4c9 100644 --- a/task-sdk/src/airflow/sdk/execution_time/context.py +++ b/task-sdk/src/airflow/sdk/execution_time/context.py @@ -499,6 +499,7 @@ def add(self, asset: Asset | AssetRef, extra: dict[str, JsonValue] | None = None event = AssetAliasEvent( source_alias_name=asset_alias_name, dest_asset_key=asset_key, + dest_asset_extra=asset.extra if isinstance(asset, Asset) else {}, extra=extra or {}, ) self.asset_alias_events.append(event) diff --git a/task-sdk/tests/task_sdk/execution_time/test_context.py b/task-sdk/tests/task_sdk/execution_time/test_context.py index e9780c6e2a293..8372de7efa9a8 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_context.py +++ b/task-sdk/tests/task_sdk/execution_time/test_context.py @@ -363,6 +363,7 @@ class TestOutletEventAccessor: AssetAliasEvent( source_alias_name="test_alias", dest_asset_key=AssetUniqueKey(name="name", uri="uri"), + dest_asset_extra={}, extra={}, ) ], @@ -377,12 +378,13 @@ def test_add(self, add_arg, key, asset_alias_events, mock_supervisor_comms): assert outlet_event_accessor.asset_alias_events == asset_alias_events @pytest.mark.parametrize( - "add_arg", + "add_args", [ - Asset("name", "uri"), - Asset.ref(name="name"), - Asset.ref(uri="uri"), + (Asset(name="name", uri="uri", extra={"extra": "from asset itself"}), {"extra": "from event"}), + (Asset.ref(name="name"), {"extra": "from event"}), + (Asset.ref(uri="uri"), {"extra": "from event"}), ], + ids=["asset", "asset name ref", "asset uri ref"], ) @pytest.mark.parametrize( ("key", "asset_alias_events"), @@ -394,17 +396,21 @@ def test_add(self, add_arg, key, asset_alias_events, mock_supervisor_comms): AssetAliasEvent( source_alias_name="test_alias", dest_asset_key=AssetUniqueKey(name="name", uri="uri"), - extra={}, + dest_asset_extra={"extra": "from asset itself"}, + extra={"extra": "from event"}, ) ], ), ), + ids=["inactive asset", "active asset"], ) - def test_add_with_db(self, add_arg, key, asset_alias_events, mock_supervisor_comms): - mock_supervisor_comms.send.return_value = AssetResponse(name="name", uri="uri", group="") + def test_add_with_db(self, add_args, key, asset_alias_events, mock_supervisor_comms): + mock_supervisor_comms.send.return_value = AssetResponse( + name="name", uri="uri", group="", extra={"extra": "from asset itself"} + ) - outlet_event_accessor = OutletEventAccessor(key=key, extra={"not": ""}) - outlet_event_accessor.add(add_arg, extra={}) + outlet_event_accessor = OutletEventAccessor(key=key) + outlet_event_accessor.add(*add_args) assert outlet_event_accessor.asset_alias_events == asset_alias_events From 1c114673a228b45112be69c6e9038453b43c9db6 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Thu, 13 Nov 2025 18:42:37 +0800 Subject: [PATCH 2/3] fix(asset): handle asset ref cases --- .../src/airflow/sdk/execution_time/context.py | 17 +++++++------- .../task_sdk/execution_time/test_context.py | 22 +++++++++++-------- 2 files changed, 22 insertions(+), 17 deletions(-) diff --git a/task-sdk/src/airflow/sdk/execution_time/context.py b/task-sdk/src/airflow/sdk/execution_time/context.py index 4cc148030e4c9..cb774ee2b4e75 100644 --- a/task-sdk/src/airflow/sdk/execution_time/context.py +++ b/task-sdk/src/airflow/sdk/execution_time/context.py @@ -428,9 +428,9 @@ def __hash__(self): class _AssetRefResolutionMixin: - _asset_ref_cache: dict[AssetRef, AssetUniqueKey] = {} + _asset_ref_cache: dict[AssetRef, tuple[AssetUniqueKey, dict[str, JsonValue]]] = {} - def _resolve_asset_ref(self, ref: AssetRef) -> AssetUniqueKey: + def _resolve_asset_ref(self, ref: AssetRef) -> tuple[AssetUniqueKey, dict[str, JsonValue]]: with contextlib.suppress(KeyError): return self._asset_ref_cache[ref] @@ -445,8 +445,8 @@ def _resolve_asset_ref(self, ref: AssetRef) -> AssetUniqueKey: raise TypeError(f"Unimplemented asset ref: {type(ref)}") unique_key = AssetUniqueKey.from_asset(asset) for ref in refs_to_cache: - self._asset_ref_cache[ref] = unique_key - return unique_key + self._asset_ref_cache[ref] = (unique_key, asset.extra) + return (unique_key, asset.extra) # TODO: This is temporary to avoid code duplication between here & airflow/models/taskinstance.py @staticmethod @@ -491,15 +491,16 @@ def add(self, asset: Asset | AssetRef, extra: dict[str, JsonValue] | None = None return if isinstance(asset, AssetRef): - asset_key = self._resolve_asset_ref(asset) + asset_key, asset_extra = self._resolve_asset_ref(asset) else: asset_key = AssetUniqueKey.from_asset(asset) + asset_extra = asset.extra asset_alias_name = self.key.name event = AssetAliasEvent( source_alias_name=asset_alias_name, dest_asset_key=asset_key, - dest_asset_extra=asset.extra if isinstance(asset, Asset) else {}, + dest_asset_extra=asset_extra, extra=extra or {}, ) self.asset_alias_events.append(event) @@ -560,7 +561,7 @@ def __getitem__(self, key: Asset | AssetAlias | AssetRef) -> OutletEventAccessor elif isinstance(key, AssetAlias): hashable_key = AssetAliasUniqueKey.from_asset_alias(key) elif isinstance(key, AssetRef): - hashable_key = self._resolve_asset_ref(key) + hashable_key, _ = self._resolve_asset_ref(key) else: raise TypeError(f"Key should be either an asset or an asset alias, not {type(key)}") @@ -769,7 +770,7 @@ def __getitem__(self, key: Asset | AssetAlias | AssetRef) -> Sequence[AssetEvent if isinstance(key, Asset): hashable_key = AssetUniqueKey.from_asset(key) elif isinstance(key, AssetRef): - hashable_key = self._resolve_asset_ref(key) + hashable_key, _ = self._resolve_asset_ref(key) elif isinstance(key, AssetAlias): hashable_key = AssetAliasUniqueKey.from_asset_alias(key) else: diff --git a/task-sdk/tests/task_sdk/execution_time/test_context.py b/task-sdk/tests/task_sdk/execution_time/test_context.py index 8372de7efa9a8..36f1a3eaca860 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_context.py +++ b/task-sdk/tests/task_sdk/execution_time/test_context.py @@ -346,12 +346,13 @@ def test_nested_context(self): class TestOutletEventAccessor: @pytest.mark.parametrize( - "add_arg", + "add_args", [ - Asset("name", "uri"), - Asset.ref(name="name"), - Asset.ref(uri="uri"), + (Asset("name", "uri", extra={"extra": "from asset itself"}), {"extra": "from event"}), + (Asset.ref(name="name"), {"extra": "from event"}), + (Asset.ref(uri="uri"), {"extra": "from event"}), ], + ids=["asset", "asset name ref", "asset uri ref"], ) @pytest.mark.parametrize( ("key", "asset_alias_events"), @@ -363,18 +364,21 @@ class TestOutletEventAccessor: AssetAliasEvent( source_alias_name="test_alias", dest_asset_key=AssetUniqueKey(name="name", uri="uri"), - dest_asset_extra={}, - extra={}, + dest_asset_extra={"extra": "from asset itself"}, + extra={"extra": "from event"}, ) ], ), ), + ids=["inactive asset", "active asset"], ) - def test_add(self, add_arg, key, asset_alias_events, mock_supervisor_comms): - mock_supervisor_comms.send.return_value = AssetResponse(name="name", uri="uri", group="") + def test_add(self, add_args, key, asset_alias_events, mock_supervisor_comms): + mock_supervisor_comms.send.return_value = AssetResponse( + name="name", uri="uri", group="", extra={"extra": "from asset itself"} + ) outlet_event_accessor = OutletEventAccessor(key=key, extra={}) - outlet_event_accessor.add(add_arg) + outlet_event_accessor.add(*add_args) assert outlet_event_accessor.asset_alias_events == asset_alias_events @pytest.mark.parametrize( From 6824b86c7185dd62a5f6db6b678534ff60d0aca1 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Fri, 14 Nov 2025 17:37:11 +0800 Subject: [PATCH 3/3] fix(asset): add backward compatibility --- airflow-core/src/airflow/models/taskinstance.py | 3 ++- airflow-core/src/airflow/serialization/serialized_objects.py | 3 ++- .../tests/unit/serialization/test_serialized_objects.py | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index 15e9d8d371849..6d11d1472af66 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -1412,7 +1412,8 @@ def _asset_event_extras_from_aliases() -> dict[tuple[AssetUniqueKey, str, str], if alias_name not in outlet_alias_names: continue asset_key = AssetUniqueKey(**event["dest_asset_key"]) - asset_extra_json = json.dumps(event["dest_asset_extra"], sort_keys=True) + # fallback for backward compatibility + asset_extra_json = json.dumps(event.get("dest_asset_extra", {}), sort_keys=True) asset_event_extra_json = json.dumps(event["extra"], sort_keys=True) d[asset_key, asset_extra_json, asset_event_extra_json].add(alias_name) return d diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py index 5307eedad8b6d..8c1446fb919db 100644 --- a/airflow-core/src/airflow/serialization/serialized_objects.py +++ b/airflow-core/src/airflow/serialization/serialized_objects.py @@ -399,7 +399,8 @@ def decode_outlet_event_accessor(var: dict[str, Any]) -> OutletEventAccessor: dest_asset_key=AssetUniqueKey( name=e["dest_asset_key"]["name"], uri=e["dest_asset_key"]["uri"] ), - dest_asset_extra=e["dest_asset_extra"], + # fallback for backward compatibility + dest_asset_extra=e.get("dest_asset_extra", {}), extra=e["extra"], ) for e in asset_alias_events diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py b/airflow-core/tests/unit/serialization/test_serialized_objects.py index eb4f57e537b71..327c505497a8a 100644 --- a/airflow-core/tests/unit/serialization/test_serialized_objects.py +++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py @@ -409,7 +409,7 @@ def __len__(self) -> int: AssetAliasEvent( source_alias_name="test_alias", dest_asset_key=AssetUniqueKey(name="test_name", uri="test://asset-uri"), - dest_asset_extra={"extra": "from asset itsefl"}, + dest_asset_extra={"extra": "from asset itself"}, extra={"extra": "from event"}, ) ],