From 41919161cd1bd2d7f1d0634274cfad334c37ef07 Mon Sep 17 00:00:00 2001 From: Mateusz Henc Date: Sat, 17 Dec 2022 22:12:00 +0100 Subject: [PATCH 1/2] AIP-44 Mark Scheduler, Webserver, Internal API Server components not using Internal API at all. --- airflow/api_internal/internal_api_call.py | 10 ++++++++++ airflow/cli/commands/internal_api_command.py | 3 +++ airflow/cli/commands/scheduler_command.py | 3 +++ airflow/www/app.py | 3 +++ 4 files changed, 19 insertions(+) diff --git a/airflow/api_internal/internal_api_call.py b/airflow/api_internal/internal_api_call.py index 226b82944fd72..4179188baa1dd 100644 --- a/airflow/api_internal/internal_api_call.py +++ b/airflow/api_internal/internal_api_call.py @@ -39,6 +39,16 @@ class InternalApiConfig: _use_internal_api = False _internal_api_endpoint = "" + @staticmethod + def force_database_direct_access(): + """Current component will not use Internal API. + + All methods decorated with internal_api_call will always be executed locally. + This mode is needed for "trusted" components like Scheduler, Webserver or Internal Api server. + """ + InternalApiConfig._initialized = True + InternalApiConfig._use_internal_api = False + @staticmethod def get_use_internal_api(): if not InternalApiConfig._initialized: diff --git a/airflow/cli/commands/internal_api_command.py b/airflow/cli/commands/internal_api_command.py index adb7f73a51c9b..19ab7ab98501c 100644 --- a/airflow/cli/commands/internal_api_command.py +++ b/airflow/cli/commands/internal_api_command.py @@ -38,6 +38,7 @@ from sqlalchemy.engine.url import make_url from airflow import settings +from airflow.api_internal.internal_api_call import InternalApiConfig from airflow.cli.commands.webserver_command import GunicornMonitor from airflow.configuration import conf from airflow.exceptions import AirflowConfigException @@ -225,6 +226,8 @@ def create_app(config=None, testing=False): if "SQLALCHEMY_ENGINE_OPTIONS" not in flask_app.config: flask_app.config["SQLALCHEMY_ENGINE_OPTIONS"] = settings.prepare_engine_args() + InternalApiConfig.force_database_direct_access() + csrf = CSRFProtect() csrf.init_app(flask_app) diff --git a/airflow/cli/commands/scheduler_command.py b/airflow/cli/commands/scheduler_command.py index 3b06c8510b90e..3965b552b736b 100644 --- a/airflow/cli/commands/scheduler_command.py +++ b/airflow/cli/commands/scheduler_command.py @@ -25,6 +25,7 @@ from daemon.pidfile import TimeoutPIDLockFile from airflow import settings +from airflow.api_internal.internal_api_call import InternalApiConfig from airflow.configuration import conf from airflow.executors.executor_loader import ExecutorLoader from airflow.jobs.scheduler_job import SchedulerJob @@ -34,6 +35,8 @@ def _run_scheduler_job(args): + InternalApiConfig.force_database_direct_access() + job = SchedulerJob( subdir=process_subdir(args.subdir), num_runs=args.num_runs, diff --git a/airflow/www/app.py b/airflow/www/app.py index 19d2831dfdbcb..8d8bc9b069787 100644 --- a/airflow/www/app.py +++ b/airflow/www/app.py @@ -28,6 +28,7 @@ from sqlalchemy.engine.url import make_url from airflow import settings +from airflow.api_internal.internal_api_call import InternalApiConfig from airflow.configuration import conf from airflow.exceptions import AirflowConfigException, RemovedInAirflow3Warning from airflow.logging_config import configure_logging @@ -117,6 +118,8 @@ def create_app(config=None, testing=False): flask_app.json_provider_class = AirflowJsonProvider flask_app.json = AirflowJsonProvider(flask_app) + InternalApiConfig.force_database_direct_access() + csrf.init_app(flask_app) init_wsgi_middleware(flask_app) From c00d5f88466b212ee48d0c9c17e11b973a62c271 Mon Sep 17 00:00:00 2001 From: Mateusz Henc Date: Mon, 23 Jan 2023 21:33:53 +0100 Subject: [PATCH 2/2] Add tests --- tests/api_internal/test_internal_api_call.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tests/api_internal/test_internal_api_call.py b/tests/api_internal/test_internal_api_call.py index ed7bf725ec2eb..c96b2bde32a11 100644 --- a/tests/api_internal/test_internal_api_call.py +++ b/tests/api_internal/test_internal_api_call.py @@ -54,6 +54,16 @@ def test_get_use_internal_api_enabled(self): assert InternalApiConfig.get_use_internal_api() is True assert InternalApiConfig.get_internal_api_endpoint() == "http://localhost:8888/internal_api/v1/rpcapi" + @conf_vars( + { + ("core", "database_access_isolation"): "true", + ("core", "internal_api_url"): "http://localhost:8888", + } + ) + def test_force_database_direct_access(self): + InternalApiConfig.force_database_direct_access() + assert InternalApiConfig.get_use_internal_api() is False + class TestInternalApiCall: @staticmethod