Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions airflow/api_internal/internal_api_call.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ class InternalApiConfig:
_use_internal_api = False
_internal_api_endpoint = ""

@staticmethod
def force_database_direct_access():
"""Current component will not use Internal API.

All methods decorated with internal_api_call will always be executed locally.
This mode is needed for "trusted" components like Scheduler, Webserver or Internal Api server.
"""
InternalApiConfig._initialized = True
InternalApiConfig._use_internal_api = False

@staticmethod
def get_use_internal_api():
if not InternalApiConfig._initialized:
Expand Down
3 changes: 3 additions & 0 deletions airflow/cli/commands/internal_api_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from sqlalchemy.engine.url import make_url

from airflow import settings
from airflow.api_internal.internal_api_call import InternalApiConfig
from airflow.cli.commands.webserver_command import GunicornMonitor
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException
Expand Down Expand Up @@ -225,6 +226,8 @@ def create_app(config=None, testing=False):
if "SQLALCHEMY_ENGINE_OPTIONS" not in flask_app.config:
flask_app.config["SQLALCHEMY_ENGINE_OPTIONS"] = settings.prepare_engine_args()

InternalApiConfig.force_database_direct_access()

csrf = CSRFProtect()
csrf.init_app(flask_app)

Expand Down
3 changes: 3 additions & 0 deletions airflow/cli/commands/scheduler_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from daemon.pidfile import TimeoutPIDLockFile

from airflow import settings
from airflow.api_internal.internal_api_call import InternalApiConfig
from airflow.configuration import conf
from airflow.executors.executor_loader import ExecutorLoader
from airflow.jobs.scheduler_job import SchedulerJob
Expand All @@ -34,6 +35,8 @@


def _run_scheduler_job(args):
InternalApiConfig.force_database_direct_access()

job = SchedulerJob(
subdir=process_subdir(args.subdir),
num_runs=args.num_runs,
Expand Down
3 changes: 3 additions & 0 deletions airflow/www/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from sqlalchemy.engine.url import make_url

from airflow import settings
from airflow.api_internal.internal_api_call import InternalApiConfig
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException, RemovedInAirflow3Warning
from airflow.logging_config import configure_logging
Expand Down Expand Up @@ -117,6 +118,8 @@ def create_app(config=None, testing=False):
flask_app.json_provider_class = AirflowJsonProvider
flask_app.json = AirflowJsonProvider(flask_app)

InternalApiConfig.force_database_direct_access()

csrf.init_app(flask_app)

init_wsgi_middleware(flask_app)
Expand Down
10 changes: 10 additions & 0 deletions tests/api_internal/test_internal_api_call.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,16 @@ def test_get_use_internal_api_enabled(self):
assert InternalApiConfig.get_use_internal_api() is True
assert InternalApiConfig.get_internal_api_endpoint() == "http://localhost:8888/internal_api/v1/rpcapi"

@conf_vars(
{
("core", "database_access_isolation"): "true",
("core", "internal_api_url"): "http://localhost:8888",
}
)
def test_force_database_direct_access(self):
InternalApiConfig.force_database_direct_access()
assert InternalApiConfig.get_use_internal_api() is False


class TestInternalApiCall:
@staticmethod
Expand Down