diff --git a/airflow-core/src/airflow/serialization/helpers.py b/airflow-core/src/airflow/serialization/helpers.py index 5f564df6188ed..f315ff12f6a0b 100644 --- a/airflow-core/src/airflow/serialization/helpers.py +++ b/airflow-core/src/airflow/serialization/helpers.py @@ -23,12 +23,16 @@ from airflow._shared.secrets_masker import redact from airflow.configuration import conf from airflow.settings import json +from airflow.utils.module_loading import qualname def serialize_template_field(template_field: Any, name: str) -> str | dict | list | int | float: """ Return a serializable representation of the templated field. + If ``templated_field`` is provided via a callable, compute MD5 hash of source + and return following serialized value: `` Any: try: serialized = template_field.serialize() except AttributeError: - serialized = str(template_field) + if callable(template_field): + full_qualified_name = qualname(template_field, True) + serialized = f"" + else: + serialized = str(template_field) if len(serialized) > max_length: rendered = redact(serialized, name) return ( diff --git a/airflow-core/src/airflow/utils/module_loading.py b/airflow-core/src/airflow/utils/module_loading.py index e0ec74bcb1f03..50ae287c36dc3 100644 --- a/airflow-core/src/airflow/utils/module_loading.py +++ b/airflow-core/src/airflow/utils/module_loading.py @@ -46,10 +46,13 @@ def import_string(dotted_path: str): raise ImportError(f'Module "{module_path}" does not define a "{class_name}" attribute/class') -def qualname(o: object | Callable) -> str: - """Convert an attribute/class/function to a string importable by ``import_string``.""" - if callable(o) and hasattr(o, "__module__") and hasattr(o, "__name__"): - return f"{o.__module__}.{o.__name__}" +def qualname(o: object | Callable, use_qualname: bool = False) -> str: + """Convert an attribute/class/callable to a string importable by ``import_string``.""" + if callable(o) and hasattr(o, "__module__"): + if use_qualname and hasattr(o, "__qualname__"): + return f"{o.__module__}.{o.__qualname__}" + if hasattr(o, "__name__"): + return f"{o.__module__}.{o.__name__}" cls = o diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py b/airflow-core/tests/unit/serialization/test_dag_serialization.py index 337d5cd725460..24c0bb46df6e4 100644 --- a/airflow-core/tests/unit/serialization/test_dag_serialization.py +++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py @@ -1593,6 +1593,31 @@ def test_task_resources(self): assert deserialized_task.resources == task.resources assert isinstance(deserialized_task.resources, Resources) + def test_template_field_via_callable_serialization(self): + """ + Test operator template fields serialization when provided as a callable. + """ + + def fn_template_field_callable(context, jinja_env): + pass + + def fn_returns_callable(): + def get_arg(context, jinja_env): + pass + + return get_arg + + task = MockOperator(task_id="task1", arg1=fn_template_field_callable, arg2=fn_returns_callable()) + serialized_task = SerializedBaseOperator.serialize_operator(task) + assert ( + serialized_task.get("arg1") + == ".fn_template_field_callable>" + ) + assert ( + serialized_task.get("arg2") + == ".fn_returns_callable..get_arg>" + ) + def test_task_group_serialization(self): """ Test TaskGroup serialization/deserialization.