From 40159906e18d5a27791a26301c173c59644a6221 Mon Sep 17 00:00:00 2001 From: goingforstudying-ctrl Date: Mon, 8 Jun 2026 14:12:53 -0400 Subject: [PATCH 1/2] fix(google): extract Cloud Logging labels from AF3 log path In Airflow 3 the supervisor process runs REMOTE_TASK_LOG handlers, but record.task_instance is never set in supervisor context (it is a task-subprocess concept). When task_instance is missing the proc() closure shipped log entries with empty labels, making Cloud Logging entries unsearchable by dag_id / task_id. Parse dag_id, task_id, and try_number from the structured AF3 log path (dag_id=<dag_id>/run_id=<run_id>/task_id=<task_id>/attempt=<try_number>.log) instead. This requires zero DB access and works regardless of whether the handler runs in a task subprocess or the supervisor. relates to #68240 --- .../cloud/log/stackdriver_task_handler.py | 41 ++++++++++++++++- .../log/test_stackdriver_task_handler.py | 46 +++++++++++++++++++ 2 files changed, 85 insertions(+), 2 deletions(-) diff --git a/providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py b/providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py index dd184c230ebf0..8ea852ed3f2b1 100644 --- a/providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py +++ b/providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py @@ -139,7 +139,8 @@ def proc( method_name: str, event: structlog.typing.EventDict, ): - if not logger or not relative_path_from_logger(logger): + relative = relative_path_from_logger(logger) if logger else None + if not logger or not relative: return event name = event.get("logger_name") or event.get("logger", "") @@ -182,7 +183,10 @@ def proc( labels[LABEL_TRY_NUMBER] = str(try_number) if map_index := event.get("map_index"): labels["map_index"] = str(map_index) - + # In AF3 supervisor context record.task_instance is not set. + # Parse labels from the structured log path as additional fallback. 
+ path_labels = _labels_from_path(str(relative)) + labels.update(path_labels) _transport.send(record, str(msg.get("event", "")), resource=self.resource, labels=labels) return event @@ -278,6 +282,39 @@ def _read_single_logs_page(self, log_filter: str, page_token: str | None = None) return "\n".join(messages), page.next_page_token +def _labels_from_path(relative_path: str) -> dict[str, str]: + """Parse AF3 log path into Stackdriver labels. + + AF3's log path template is:: + + dag_id=/run_id=/task_id=/attempt=.log + + All four label fields are extracted with zero DB access. When the path + does not match the expected format the function returns an empty dict + so callers can fall back to other label sources. + """ + # Strip the trailing file extension (.log) and split into segments + stem = relative_path.rsplit(".", 1)[0] if "." in relative_path else relative_path + segments = stem.split("/") + labels: dict[str, str] = {} + for segment in segments: + if "=" not in segment: + continue + key, _, value = segment.partition("=") + if key == "dag_id": + labels[LABEL_DAG_ID] = value + elif key == "task_id": + labels[LABEL_TASK_ID] = value + elif key == "attempt": + labels[LABEL_TRY_NUMBER] = value + elif key == "run_id": + # run_id is NOT a standard Stackdriver label yet, but it is used + # on the write side via the log path template. Store it so the + # read path can filter on it (Bug 2 will wire this up). 
+ pass + return labels + + def _task_instance_to_labels(ti) -> dict[str, str]: """Convert a task instance to Stackdriver labels.""" return { diff --git a/providers/google/tests/unit/google/cloud/log/test_stackdriver_task_handler.py b/providers/google/tests/unit/google/cloud/log/test_stackdriver_task_handler.py index da0068ae67b97..2656ac3903cc0 100644 --- a/providers/google/tests/unit/google/cloud/log/test_stackdriver_task_handler.py +++ b/providers/google/tests/unit/google/cloud/log/test_stackdriver_task_handler.py @@ -325,6 +325,52 @@ def test_processors_skips_non_task_logger(self, mock_client, mock_get_creds_and_ assert result is event mock_transport_type.return_value.send.assert_not_called() + @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="airflow.sdk.log only exists in Airflow 3+") + @mock.patch("airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id") + @mock.patch("airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client") + def test_processors_extracts_labels_from_path_in_supervisor_context( + self, mock_client, mock_get_creds_and_project_id + ): + """Labels are parsed from the AF3 log path when task_instance is not set. + + In AF3 supervisor context ``record.task_instance`` is None because it is + a task-subprocess concept. The handler should fall back to extracting + ``dag_id``, ``task_id``, and ``try_number`` from the structured log path. 
+ """ + mock_get_creds_and_project_id.return_value = ("creds", "project_id") + + mock_transport_type = mock.MagicMock() + af3_path = "dag_id=my_dag/run_id=scheduled__2026-06-08T00:00:00+00:00/task_id=print_date/attempt=3.log" + with mock.patch("airflow.sdk.log.relative_path_from_logger", return_value=af3_path): + io = StackdriverRemoteLogIO( + base_log_folder=self.local_log_location, + gcp_log_name="airflow", + transport_type=mock_transport_type, + ) + proc = io.processors[0] + + event = { + "event": "task log line", + "logger_name": "airflow.task", + "timestamp": "2026-06-08T10:30:00+00:00", + } + proc(mock.MagicMock(), "info", event) + + mock_transport = mock_transport_type.return_value + mock_transport.send.assert_called_once() + labels = mock_transport.send.call_args[1]["labels"] + assert labels["dag_id"] == "my_dag" + assert labels["task_id"] == "print_date" + assert labels["try_number"] == "3" + + def test_labels_from_path_returns_empty_on_unexpected_format(self): + """_labels_from_path returns an empty dict when the path doesn't match.""" + from airflow.providers.google.cloud.log.stackdriver_task_handler import _labels_from_path + + assert _labels_from_path("") == {} + assert _labels_from_path("no_equals_here.log") == {} + assert _labels_from_path("random/path/without/keys.log") == {} + @pytest.mark.usefixtures("clean_stackdriver_handlers") @mock.patch("airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id") From 432d9683971f5681f6df9d109e8324700667e554 Mon Sep 17 00:00:00 2001 From: goingforstudying-ctrl Date: Wed, 17 Jun 2026 07:40:09 -0400 Subject: [PATCH 2/2] fix(google): address review feedback on stackdriver label extraction - Rename _labels_from_path to _extract_labels_from_path - Use path labels as fallback instead of overriding event-derived labels - Actually store run_id instead of no-op pass branch --- .../cloud/log/stackdriver_task_handler.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 
deletions(-) diff --git a/providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py b/providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py index 8ea852ed3f2b1..4bcc6522f7a31 100644 --- a/providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py +++ b/providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py @@ -184,9 +184,12 @@ def proc( if map_index := event.get("map_index"): labels["map_index"] = str(map_index) # In AF3 supervisor context record.task_instance is not set. - # Parse labels from the structured log path as additional fallback. - path_labels = _labels_from_path(str(relative)) - labels.update(path_labels) + # Parse labels from the structured log path as fallback only for + # keys that are still missing after the event-derived labels. + path_labels = _extract_labels_from_path(str(relative)) + for key, value in path_labels.items(): + if key not in labels: + labels[key] = value _transport.send(record, str(msg.get("event", "")), resource=self.resource, labels=labels) return event @@ -282,14 +285,14 @@ def _read_single_logs_page(self, log_filter: str, page_token: str | None = None) return "\n".join(messages), page.next_page_token -def _labels_from_path(relative_path: str) -> dict[str, str]: +def _extract_labels_from_path(relative_path: str) -> dict[str, str]: """Parse AF3 log path into Stackdriver labels. AF3's log path template is:: dag_id=/run_id=/task_id=/attempt=.log - All four label fields are extracted with zero DB access. When the path + All label fields are extracted with zero DB access. When the path does not match the expected format the function returns an empty dict so callers can fall back to other label sources. 
""" @@ -308,10 +311,7 @@ def _labels_from_path(relative_path: str) -> dict[str, str]: elif key == "attempt": labels[LABEL_TRY_NUMBER] = value elif key == "run_id": - # run_id is NOT a standard Stackdriver label yet, but it is used - # on the write side via the log path template. Store it so the - # read path can filter on it (Bug 2 will wire this up). - pass + labels["run_id"] = value return labels