From 98aa505102ed604e240dcfc486a13f4e14f107d9 Mon Sep 17 00:00:00 2001 From: Maciej Obuchowski Date: Tue, 27 Aug 2024 11:00:49 +0200 Subject: [PATCH] fix: cast list to flattened string in openlineage InfoJsonEncodable Signed-off-by: Maciej Obuchowski --- airflow/providers/openlineage/utils/utils.py | 2 +- .../openlineage/plugins/test_adapter.py | 99 +++++++++---------- .../openlineage/plugins/test_utils.py | 4 +- .../providers/openlineage/utils/test_utils.py | 2 +- 4 files changed, 52 insertions(+), 55 deletions(-) diff --git a/airflow/providers/openlineage/utils/utils.py b/airflow/providers/openlineage/utils/utils.py index 316666f2c009f..ec58c6e2d743e 100644 --- a/airflow/providers/openlineage/utils/utils.py +++ b/airflow/providers/openlineage/utils/utils.py @@ -199,7 +199,7 @@ def _cast_basic_types(value): return value.isoformat() if isinstance(value, datetime.timedelta): return f"{value.total_seconds()} seconds" - if isinstance(value, (set, tuple)): + if isinstance(value, (set, list, tuple)): return str(list(value)) return value diff --git a/tests/providers/openlineage/plugins/test_adapter.py b/tests/providers/openlineage/plugins/test_adapter.py index 19bba5fffb9ce..18f457c5be1f5 100644 --- a/tests/providers/openlineage/plugins/test_adapter.py +++ b/tests/providers/openlineage/plugins/test_adapter.py @@ -528,7 +528,7 @@ def test_emit_failed_event_with_additional_information(mock_stats_incr, mock_sta mock_stats_timer.assert_called_with("ol.emit.attempts") -@mock.patch("airflow.providers.openlineage.conf.debug_mode", return_value=True) +@mock.patch("airflow.providers.openlineage.conf.debug_mode", return_value=False) @mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid") @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer") @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr") @@ -578,7 +578,7 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat "description": "dag desc", "owner": "airflow", "start_date": "2024-06-01T00:00:00+00:00", - "tags": [], + "tags": "[]", "fileloc": pathlib.Path(__file__).resolve().as_posix(), } if hasattr(dag, "schedule_interval"): # Airflow 2 compat. @@ -587,56 +587,53 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat expected_dag_info["timetable_summary"] = "1 day, 0:00:00" assert len(client.emit.mock_calls) == 1 - assert ( - call( - RunEvent( - eventType=RunState.START, - eventTime=event_time.isoformat(), - run=Run( - runId=random_uuid, - facets={ - "nominalTime": nominal_time_run.NominalTimeRunFacet( - nominalStartTime=event_time.isoformat(), - nominalEndTime=event_time.isoformat(), - ), - "airflowDagRun": AirflowDagRunFacet( - dag=expected_dag_info, - dagRun={ - "conf": {}, - "dag_id": "dag_id", - "data_interval_start": event_time.isoformat(), - "data_interval_end": event_time.isoformat(), - "external_trigger": None, - "run_id": run_id, - "run_type": None, - "start_date": event_time.isoformat(), - }, - ), - "debug": AirflowDebugRunFacet(packages=ANY), - }, - ), - job=Job( - namespace=namespace(), - name="dag_id", - facets={ - "documentation": documentation_job.DocumentationJobFacet(description="dag desc"), - "ownership": ownership_job.OwnershipJobFacet( - owners=[ - ownership_job.Owner(name="airflow", type=None), - ] - ), - **job_facets, - "jobType": job_type_job.JobTypeJobFacet( - processingType="BATCH", integration="AIRFLOW", jobType="DAG" - ), - }, - ), - producer=_PRODUCER, - inputs=[], - outputs=[], - ) + client.emit.assert_called_once_with( + RunEvent( + eventType=RunState.START, + eventTime=event_time.isoformat(), + run=Run( + runId=random_uuid, + facets={ + "nominalTime": nominal_time_run.NominalTimeRunFacet( + nominalStartTime=event_time.isoformat(), + nominalEndTime=event_time.isoformat(), + ), + "airflowDagRun": AirflowDagRunFacet( + dag=expected_dag_info, + dagRun={ + "conf": {}, + "dag_id": "dag_id", + "data_interval_start": event_time.isoformat(), + "data_interval_end": event_time.isoformat(), + "external_trigger": None, + "run_id": run_id, + "run_type": None, + "start_date": event_time.isoformat(), + }, + ), + # "debug": AirflowDebugRunFacet(packages=ANY), + }, + ), + job=Job( + namespace=namespace(), + name="dag_id", + facets={ + "documentation": documentation_job.DocumentationJobFacet(description="dag desc"), + "ownership": ownership_job.OwnershipJobFacet( + owners=[ + ownership_job.Owner(name="airflow", type=None), + ] + ), + **job_facets, + "jobType": job_type_job.JobTypeJobFacet( + processingType="BATCH", integration="AIRFLOW", jobType="DAG" + ), + }, + ), + producer=_PRODUCER, + inputs=[], + outputs=[], ) - in client.emit.mock_calls ) mock_stats_incr.assert_not_called() diff --git a/tests/providers/openlineage/plugins/test_utils.py b/tests/providers/openlineage/plugins/test_utils.py index 0be62a5b0ca7f..382f806f23345 100644 --- a/tests/providers/openlineage/plugins/test_utils.py +++ b/tests/providers/openlineage/plugins/test_utils.py @@ -164,7 +164,7 @@ class Test: } -def test_info_json_encodable_list_does_not_flatten(): +def test_info_json_encodable_list_does_flatten(): class TestInfo(InfoJsonEncodable): includes = ["alist"] @@ -174,7 +174,7 @@ class Test: obj = Test(["a", "b", "c"]) - assert json.loads(json.dumps(TestInfo(obj))) == {"alist": ["a", "b", "c"]} + assert json.loads(json.dumps(TestInfo(obj))) == {"alist": "['a', 'b', 'c']"} def test_info_json_encodable_list_does_include_nonexisting(): diff --git a/tests/providers/openlineage/utils/test_utils.py b/tests/providers/openlineage/utils/test_utils.py index d340d39e17ebc..d97a447e99949 100644 --- a/tests/providers/openlineage/utils/test_utils.py +++ b/tests/providers/openlineage/utils/test_utils.py @@ -142,7 +142,7 @@ def test_get_airflow_dag_run_facet(): "owner": "airflow", "timetable": {}, "start_date": "2024-06-01T00:00:00+00:00", - "tags": ["test"], + "tags": "['test']", } if hasattr(dag, "schedule_interval"): # Airflow 2 compat. expected_dag_info["schedule_interval"] = "@once"