diff --git a/providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py b/providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py index dd184c230ebf0..4bcc6522f7a31 100644 --- a/providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py +++ b/providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py @@ -139,7 +139,8 @@ def proc( method_name: str, event: structlog.typing.EventDict, ): - if not logger or not relative_path_from_logger(logger): + relative = relative_path_from_logger(logger) if logger else None + if not logger or not relative: return event name = event.get("logger_name") or event.get("logger", "") @@ -182,7 +183,13 @@ def proc( labels[LABEL_TRY_NUMBER] = str(try_number) if map_index := event.get("map_index"): labels["map_index"] = str(map_index) - + # In AF3 supervisor context record.task_instance is not set. + # Parse labels from the structured log path as fallback only for + # keys that are still missing after the event-derived labels. + path_labels = _extract_labels_from_path(str(relative)) + for key, value in path_labels.items(): + if key not in labels: + labels[key] = value _transport.send(record, str(msg.get("event", "")), resource=self.resource, labels=labels) return event @@ -278,6 +285,36 @@ def _read_single_logs_page(self, log_filter: str, page_token: str | None = None) return "\n".join(messages), page.next_page_token +def _extract_labels_from_path(relative_path: str) -> dict[str, str]: + """Parse AF3 log path into Stackdriver labels. + + AF3's log path template is:: + + dag_id=/run_id=/task_id=/attempt=.log + + All label fields are extracted with zero DB access. When the path + does not match the expected format the function returns an empty dict + so callers can fall back to other label sources. + """ + # Strip the trailing file extension (.log) and split into segments + stem = relative_path.rsplit(".", 1)[0] if "." in relative_path else relative_path + segments = stem.split("/") + labels: dict[str, str] = {} + for segment in segments: + if "=" not in segment: + continue + key, _, value = segment.partition("=") + if key == "dag_id": + labels[LABEL_DAG_ID] = value + elif key == "task_id": + labels[LABEL_TASK_ID] = value + elif key == "attempt": + labels[LABEL_TRY_NUMBER] = value + elif key == "run_id": + labels["run_id"] = value + return labels + + def _task_instance_to_labels(ti) -> dict[str, str]: """Convert a task instance to Stackdriver labels.""" return { diff --git a/providers/google/tests/unit/google/cloud/log/test_stackdriver_task_handler.py b/providers/google/tests/unit/google/cloud/log/test_stackdriver_task_handler.py index da0068ae67b97..2656ac3903cc0 100644 --- a/providers/google/tests/unit/google/cloud/log/test_stackdriver_task_handler.py +++ b/providers/google/tests/unit/google/cloud/log/test_stackdriver_task_handler.py @@ -325,6 +325,52 @@ def test_processors_skips_non_task_logger(self, mock_client, mock_get_creds_and_ assert result is event mock_transport_type.return_value.send.assert_not_called() + @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="airflow.sdk.log only exists in Airflow 3+") + @mock.patch("airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id") + @mock.patch("airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client") + def test_processors_extracts_labels_from_path_in_supervisor_context( + self, mock_client, mock_get_creds_and_project_id + ): + """Labels are parsed from the AF3 log path when task_instance is not set. + + In AF3 supervisor context ``record.task_instance`` is None because it is + a task-subprocess concept. The handler should fall back to extracting + ``dag_id``, ``task_id``, and ``try_number`` from the structured log path. + """ + mock_get_creds_and_project_id.return_value = ("creds", "project_id") + + mock_transport_type = mock.MagicMock() + af3_path = "dag_id=my_dag/run_id=scheduled__2026-06-08T00:00:00+00:00/task_id=print_date/attempt=3.log" + with mock.patch("airflow.sdk.log.relative_path_from_logger", return_value=af3_path): + io = StackdriverRemoteLogIO( + base_log_folder=self.local_log_location, + gcp_log_name="airflow", + transport_type=mock_transport_type, + ) + proc = io.processors[0] + + event = { + "event": "task log line", + "logger_name": "airflow.task", + "timestamp": "2026-06-08T10:30:00+00:00", + } + proc(mock.MagicMock(), "info", event) + + mock_transport = mock_transport_type.return_value + mock_transport.send.assert_called_once() + labels = mock_transport.send.call_args[1]["labels"] + assert labels["dag_id"] == "my_dag" + assert labels["task_id"] == "print_date" + assert labels["try_number"] == "3" + + def test_labels_from_path_returns_empty_on_unexpected_format(self): + """_labels_from_path returns an empty dict when the path doesn't match.""" + from airflow.providers.google.cloud.log.stackdriver_task_handler import _labels_from_path + + assert _labels_from_path("") == {} + assert _labels_from_path("no_equals_here.log") == {} + assert _labels_from_path("random/path/without/keys.log") == {} + @pytest.mark.usefixtures("clean_stackdriver_handlers") @mock.patch("airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id")