diff --git a/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py b/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py index e7f4278fad3dd..ad08149fbd8bb 100644 --- a/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py +++ b/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py @@ -17,6 +17,7 @@ # under the License. from __future__ import annotations +import logging import warnings from contextlib import suppress from functools import cached_property @@ -121,6 +122,8 @@ RESOURCE_ASSET_ALIAS, ) +log = logging.getLogger(__name__) + _MAP_DAG_ACCESS_ENTITY_TO_FAB_RESOURCE_TYPE: dict[DagAccessEntity, tuple[str, ...]] = { DagAccessEntity.AUDIT_LOG: (RESOURCE_AUDIT_LOG,), @@ -240,7 +243,10 @@ async def cleanup_session_middleware(request, call_next): from airflow import settings if settings.Session: - settings.Session.remove() + try: + settings.Session.remove() + except Exception: + log.warning("Failed to remove session during cleanup", exc_info=True) app.mount("/", WSGIMiddleware(flask_app)) diff --git a/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py b/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py index 0e3bee5ed82cd..0b46e242f48e9 100644 --- a/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py +++ b/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py @@ -1110,3 +1110,84 @@ def test_session_cleanup_middleware_on_fastapi_route(self, mock_create_app): # Verify Session.remove() was called mock_session.remove.assert_called() + + +class TestFabAuthManagerSessionCleanupErrorHandling: + """Test that cleanup_session_middleware handles Session.remove() failures gracefully. + + When the underlying database connection is dead (e.g., MySQL 'Server has gone away', + PostgreSQL connection timeout), Session.remove() internally attempts a ROLLBACK which + raises an OperationalError. The middleware should catch and log this error instead of + propagating it as a 500 Internal Server Error to the client. + + See: https://github.com/apache/airflow/issues/XXXXX + """ + + @mock.patch("airflow.providers.fab.auth_manager.fab_auth_manager.create_app") + def test_session_remove_db_error_does_not_propagate(self, mock_create_app): + """When Session.remove() raises OperationalError, request should still succeed. + + Simulates MySQL 'Server has gone away' or similar DB errors during session + cleanup. The middleware should catch the exception and log a warning. + """ + from unittest.mock import patch + + from fastapi.testclient import TestClient + from sqlalchemy.exc import OperationalError + + mock_flask_app = MagicMock() + mock_create_app.return_value = mock_flask_app + + auth_manager = FabAuthManager() + fastapi_app = auth_manager.get_fastapi_app() + + client = TestClient(fastapi_app, raise_server_exceptions=False) + + with ( + patch("airflow.settings.Session") as mock_session, + patch("airflow.providers.fab.auth_manager.fab_auth_manager.log") as mock_log, + ): + # Simulate MySQL 'Server has gone away' error on Session.remove() + mock_session.remove.side_effect = OperationalError( + "ROLLBACK", {}, Exception("(2006, 'Server has gone away')") + ) + + response = client.get("/users/list/") + + # The request should NOT get a 500 from the middleware error + # (it may get other status codes from the mock Flask app, but not + # an unhandled exception from cleanup_session_middleware) + mock_session.remove.assert_called() + mock_log.warning.assert_called() + assert response is not None + + @mock.patch("airflow.providers.fab.auth_manager.fab_auth_manager.create_app") + def test_session_remove_generic_error_does_not_propagate(self, mock_create_app): + """Any exception from Session.remove() should be caught during cleanup. + + This covers edge cases beyond OperationalError, such as AttributeError + when the session registry is in an unexpected state. + """ + from unittest.mock import patch + + from fastapi.testclient import TestClient + + mock_flask_app = MagicMock() + mock_create_app.return_value = mock_flask_app + + auth_manager = FabAuthManager() + fastapi_app = auth_manager.get_fastapi_app() + + client = TestClient(fastapi_app, raise_server_exceptions=False) + + with ( + patch("airflow.settings.Session") as mock_session, + patch("airflow.providers.fab.auth_manager.fab_auth_manager.log") as mock_log, + ): + mock_session.remove.side_effect = RuntimeError("unexpected session error") + + # Should not raise - the middleware catches all exceptions from Session.remove() + response = client.get("/users/list/") + mock_session.remove.assert_called() + mock_log.warning.assert_called() + assert response is not None