From dbde7e6365546c277033aa132636203f66459edd Mon Sep 17 00:00:00 2001 From: Taehoon Kim Date: Tue, 9 Jun 2026 18:32:32 +0900 Subject: [PATCH] Fix StackdriverRemoteLogIO supervisor crash on transport send failure --- .../cloud/log/stackdriver_task_handler.py | 5 ++- .../log/test_stackdriver_task_handler.py | 32 +++++++++++++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py b/providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py index dd184c230ebf0..59fc14a444245 100644 --- a/providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py +++ b/providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py @@ -183,7 +183,10 @@ def proc( if map_index := event.get("map_index"): labels["map_index"] = str(map_index) - _transport.send(record, str(msg.get("event", "")), resource=self.resource, labels=labels) + try: + _transport.send(record, str(msg.get("event", "")), resource=self.resource, labels=labels) + except Exception: + _logger.warning("Failed to send log to Cloud Logging", exc_info=True) return event return (proc,) diff --git a/providers/google/tests/unit/google/cloud/log/test_stackdriver_task_handler.py b/providers/google/tests/unit/google/cloud/log/test_stackdriver_task_handler.py index da0068ae67b97..d61afa8a0a90b 100644 --- a/providers/google/tests/unit/google/cloud/log/test_stackdriver_task_handler.py +++ b/providers/google/tests/unit/google/cloud/log/test_stackdriver_task_handler.py @@ -325,6 +325,38 @@ def test_processors_skips_non_task_logger(self, mock_client, mock_get_creds_and_ assert result is event mock_transport_type.return_value.send.assert_not_called() + @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="airflow.sdk.log only exists in Airflow 3+") + @mock.patch("airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id") + @mock.patch("airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client") + def test_processors_survives_transport_send_failure( + self, mock_client, mock_get_creds_and_project_id, caplog + ): + """Test that transport.send() failure does not crash the logging processor.""" + mock_get_creds_and_project_id.return_value = ("creds", "project_id") + + mock_transport_type = mock.MagicMock() + mock_transport_type.return_value.send.side_effect = RuntimeError("IAM permission denied") + with mock.patch("airflow.sdk.log.relative_path_from_logger", return_value="dag/task/1.log"): + io = StackdriverRemoteLogIO( + base_log_folder=self.local_log_location, + gcp_log_name="airflow", + transport_type=mock_transport_type, + ) + proc = io.processors[0] + + event = { + "event": "log message test", + "logger_name": "airflow.task", + "timestamp": "2026-06-30T00:00:00+09:00", + } + with caplog.at_level(logging.WARNING): + result = proc(mock.MagicMock(), "info", event) + + # Ensure that it still returns the event + assert result is event + # Ensure that the error is logged + assert "Failed to send log to Cloud Logging" in caplog.text + @pytest.mark.usefixtures("clean_stackdriver_handlers") @mock.patch("airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id")