@classmethod
def duration(cls, dagrun: DagRun) -> float | None:
    """
    Return the time elapsed between the DagRun's start and end, in seconds.

    :param dagrun: object exposing ``start_date`` and ``end_date`` attributes.
    :return: ``(end_date - start_date).total_seconds()``, or ``None`` when either
        attribute is missing or is not a ``datetime.datetime`` instance, or when
        the two values cannot be subtracted from one another.
    """
    start_date = getattr(dagrun, "start_date", None)
    end_date = getattr(dagrun, "end_date", None)
    # isinstance() alone rejects None, missing attributes, strings and mock placeholders.
    if not isinstance(start_date, datetime.datetime) or not isinstance(end_date, datetime.datetime):
        return None
    try:
        return (end_date - start_date).total_seconds()
    except TypeError:
        # Subtracting a naive datetime from an aware one (or vice versa) raises TypeError;
        # report "no duration" like every other unusable input instead of propagating.
        return None
@pytest.mark.parametrize(
    ("dag_run_attrs", "expected_duration"),
    [
        # Both dates missing.
        ({"start_date": None, "end_date": None}, None),
        # Only one of the two dates present.
        ({"start_date": datetime.datetime(2025, 1, 1), "end_date": None}, None),
        ({"start_date": None, "end_date": datetime.datetime(2025, 1, 1)}, None),
        # Dates given as ISO strings rather than datetime objects.
        (
            {
                "start_date": "2024-06-01T01:02:04+00:00",
                "end_date": "2024-06-01T01:02:14.034172+00:00",
            },
            None,
        ),
        # Two proper timezone-aware datetimes.
        (
            {
                "start_date": datetime.datetime(2025, 1, 1, 6, 1, 1, tzinfo=datetime.timezone.utc),
                "end_date": datetime.datetime(2025, 1, 1, 6, 1, 12, 3456, tzinfo=datetime.timezone.utc),
            },
            11.003456,
        ),
    ],
)
def test_dag_run_duration(dag_run_attrs, expected_duration):
    """Check the value DagRunInfo.duration returns for each start_date/end_date combination."""
    mocked_run = MagicMock()
    mocked_run.configure_mock(**dag_run_attrs)
    assert DagRunInfo.duration(mocked_run) == expected_duration