diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 182bde36b71a5..e329c66d0cf3e 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -2146,11 +2146,6 @@ components: type: string type: array title: Hooks - executors: - items: - type: string - type: array - title: Executors macros: items: type: string @@ -2208,7 +2203,6 @@ components: required: - name - hooks - - executors - macros - flask_blueprints - fastapi_apps diff --git a/airflow/api_fastapi/core_api/serializers/plugins.py b/airflow/api_fastapi/core_api/serializers/plugins.py index ee6812bb954c9..68bc8ea443c25 100644 --- a/airflow/api_fastapi/core_api/serializers/plugins.py +++ b/airflow/api_fastapi/core_api/serializers/plugins.py @@ -65,7 +65,6 @@ class PluginResponse(BaseModel): name: str hooks: list[str] - executors: list[str] macros: list[str] flask_blueprints: list[str] fastapi_apps: list[FastAPIAppResponse] diff --git a/airflow/executors/executor_loader.py b/airflow/executors/executor_loader.py index 4a940793df27f..f74153f95fc94 100644 --- a/airflow/executors/executor_loader.py +++ b/airflow/executors/executor_loader.py @@ -21,7 +21,6 @@ import functools import logging import os -from contextlib import suppress from typing import TYPE_CHECKING from airflow.api_internal.internal_api_call import InternalApiConfig @@ -284,17 +283,6 @@ def _import_and_validate(path: str) -> type[BaseExecutor]: cls.validate_database_executor_compatibility(executor) return executor - if executor_name.connector_source == ConnectorSource.PLUGIN: - with suppress(ImportError, AttributeError): - # Load plugins here for executors as at that time the plugins might not have been - # initialized yet - from airflow import plugins_manager - - plugins_manager.integrate_executor_plugins() - return ( - _import_and_validate(f"airflow.executors.{executor_name.module_path}"), - ConnectorSource.PLUGIN, - ) return _import_and_validate(executor_name.module_path), executor_name.connector_source @classmethod diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py index 2ec1388d16361..fc7adc5993f64 100644 --- a/airflow/plugins_manager.py +++ b/airflow/plugins_manager.py @@ -64,7 +64,6 @@ # Plugin components to integrate as modules registered_hooks: list[BaseHook] | None = None macros_modules: list[Any] | None = None -executors_modules: list[Any] | None = None # Plugin components to integrate directly admin_views: list[Any] | None = None @@ -88,7 +87,6 @@ """ PLUGINS_ATTRIBUTES_TO_DUMP = { "hooks", - "executors", "macros", "admin_views", "flask_blueprints", @@ -154,7 +152,6 @@ class AirflowPlugin: name: str | None = None source: AirflowPluginSource | None = None hooks: list[Any] = [] - executors: list[Any] = [] macros: list[Any] = [] admin_views: list[Any] = [] flask_blueprints: list[Any] = [] @@ -533,33 +530,6 @@ def initialize_hook_lineage_readers_plugins(): hook_lineage_reader_classes.extend(plugin.hook_lineage_readers) -def integrate_executor_plugins() -> None: - """Integrate executor plugins to the context.""" - global plugins - global executors_modules - - if executors_modules is not None: - return - - ensure_plugins_loaded() - - if plugins is None: - raise AirflowPluginException("Can't load plugins.") - - log.debug("Integrate executor plugins") - - executors_modules = [] - for plugin in plugins: - if plugin.name is None: - raise AirflowPluginException("Invalid plugin name") - plugin_name: str = plugin.name - - executors_module = make_module("airflow.executors." + plugin_name, plugin.executors) - if executors_module: - executors_modules.append(executors_module) - sys.modules[executors_module.__name__] = executors_module - - def integrate_macros_plugins() -> None: """Integrates macro plugins.""" global plugins @@ -615,7 +585,6 @@ def get_plugin_info(attrs_to_dump: Iterable[str] | None = None) -> list[dict[str :param attrs_to_dump: A list of plugin attributes to dump """ ensure_plugins_loaded() - integrate_executor_plugins() integrate_macros_plugins() initialize_web_ui_plugins() initialize_fastapi_plugins() @@ -629,7 +598,7 @@ def get_plugin_info(attrs_to_dump: Iterable[str] | None = None) -> list[dict[str for attr in attrs_to_dump: if attr in ("global_operator_extra_links", "operator_extra_links"): info[attr] = [f"<{qualname(d.__class__)} object>" for d in getattr(plugin, attr)] - elif attr in ("macros", "timetables", "hooks", "executors", "priority_weight_strategies"): + elif attr in ("macros", "timetables", "hooks", "priority_weight_strategies"): info[attr] = [qualname(d) for d in getattr(plugin, attr)] elif attr == "listeners": # listeners may be modules or class instances diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index f5ca444b535b2..4658779b0ffbb 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -1333,13 +1333,6 @@ export const $PluginResponse = { type: "array", title: "Hooks", }, - executors: { - items: { - type: "string", - }, - type: "array", - title: "Executors", - }, macros: { items: { type: "string", @@ -1419,7 +1412,6 @@ export const $PluginResponse = { required: [ "name", "hooks", - "executors", "macros", "flask_blueprints", "fastapi_apps", diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 1e22e3937f6b2..e9523b195f1b0 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -311,7 +311,6 @@ export type PluginCollectionResponse = { export type PluginResponse = { name: string; hooks: Array; - executors: Array; macros: Array; flask_blueprints: Array; fastapi_apps: Array; diff --git a/airflow/www/views.py b/airflow/www/views.py index c5bbdd389b093..c153cc80597f5 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -4286,7 +4286,6 @@ class PluginView(AirflowBaseView): def list(self): """List loaded plugins.""" plugins_manager.ensure_plugins_loaded() - plugins_manager.integrate_executor_plugins() plugins_manager.initialize_extra_operators_links_plugins() plugins_manager.initialize_web_ui_plugins() plugins_manager.initialize_fastapi_plugins() diff --git a/newsfragments/43289.significant.rst b/newsfragments/43289.significant.rst new file mode 100644 index 0000000000000..15063202640fb --- /dev/null +++ b/newsfragments/43289.significant.rst @@ -0,0 +1,4 @@ +Support for adding executors via Airflow Plugins is removed + +Executors should no longer be registered or imported via Airflow's plugin mechanism -- these types of classes +are just treated as plain Python classes by Airflow, so there is no need to register them with Airflow. diff --git a/tests/api_connexion/endpoints/test_plugin_endpoint.py b/tests/api_connexion/endpoints/test_plugin_endpoint.py index 924de84dc0d48..487ba53a30080 100644 --- a/tests/api_connexion/endpoints/test_plugin_endpoint.py +++ b/tests/api_connexion/endpoints/test_plugin_endpoint.py @@ -145,7 +145,6 @@ def test_get_plugins_return_200(self): { "appbuilder_menu_items": [appbuilder_menu_items], "appbuilder_views": [{"view": qualname(MockView)}], - "executors": [], "flask_blueprints": [ f"<{qualname(bp.__class__)}: name={bp.name!r} import_name={bp.import_name!r}>" ], diff --git a/tests/api_connexion/schemas/test_plugin_schema.py b/tests/api_connexion/schemas/test_plugin_schema.py index 0c7141e3493f8..951933e9ffc44 100644 --- a/tests/api_connexion/schemas/test_plugin_schema.py +++ b/tests/api_connexion/schemas/test_plugin_schema.py @@ -86,7 +86,6 @@ def test_serialize(self): assert deserialized_plugin == { "appbuilder_menu_items": [appbuilder_menu_items], "appbuilder_views": [{"view": self.mock_plugin.appbuilder_views[0]["view"]}], - "executors": [], "flask_blueprints": [str(bp)], "fastapi_apps": [ {"app": app, "name": "App name", "url_prefix": "/some_prefix"}, @@ -113,7 +112,6 @@ def test_serialize(self): { "appbuilder_menu_items": [appbuilder_menu_items], "appbuilder_views": [{"view": self.mock_plugin.appbuilder_views[0]["view"]}], - "executors": [], "flask_blueprints": [str(bp)], "fastapi_apps": [ {"app": app, "name": "App name", "url_prefix": "/some_prefix"}, @@ -131,7 +129,6 @@ def test_serialize(self): { "appbuilder_menu_items": [appbuilder_menu_items], "appbuilder_views": [{"view": self.mock_plugin.appbuilder_views[0]["view"]}], - "executors": [], "flask_blueprints": [str(bp)], "fastapi_apps": [ {"app": app, "name": "App name", "url_prefix": "/some_prefix"}, diff --git a/tests/cli/commands/test_plugins_command.py b/tests/cli/commands/test_plugins_command.py index d07641ec841d1..c9807520e4ed3 100644 --- a/tests/cli/commands/test_plugins_command.py +++ b/tests/cli/commands/test_plugins_command.py @@ -69,7 +69,6 @@ def test_should_display_one_plugin(self): "admin_views": [], "macros": ["tests.plugins.test_plugin.plugin_macro"], "menu_links": [], - "executors": ["tests.plugins.test_plugin.PluginExecutor"], "flask_blueprints": [ "" ], diff --git a/tests/executors/test_executor_loader.py b/tests/executors/test_executor_loader.py index 40a336bc580c3..68bc02a6300e4 100644 --- a/tests/executors/test_executor_loader.py +++ b/tests/executors/test_executor_loader.py @@ -22,7 +22,6 @@ import pytest -from airflow import plugins_manager from airflow.exceptions import AirflowConfigException from airflow.executors import executor_loader from airflow.executors.executor_loader import ConnectorSource, ExecutorLoader, ExecutorName @@ -34,9 +33,6 @@ pytestmark = pytest.mark.skip_if_database_isolation_mode -# Plugin Manager creates new modules, which is difficult to mock, so we use test isolation by a unique name. -TEST_PLUGIN_NAME = "unique_plugin_name_to_avoid_collision_i_love_kitties" - class FakeExecutor: is_single_threaded = False @@ -46,11 +42,6 @@ class FakeSingleThreadedExecutor: is_single_threaded = True -class FakePlugin(plugins_manager.AirflowPlugin): - name = TEST_PLUGIN_NAME - executors = [FakeExecutor] - - class TestExecutorLoader: def setup_method(self) -> None: from airflow.executors import executor_loader @@ -89,17 +80,6 @@ def test_should_support_executor_from_core(self, executor_name): assert executor.name == ExecutorName(ExecutorLoader.executors[executor_name], alias=executor_name) assert executor.name.connector_source == ConnectorSource.CORE - @mock.patch("airflow.plugins_manager.plugins", [FakePlugin()]) - @mock.patch("airflow.plugins_manager.executors_modules", None) - def test_should_support_plugins(self): - with conf_vars({("core", "executor"): f"{TEST_PLUGIN_NAME}.FakeExecutor"}): - executor = ExecutorLoader.get_default_executor() - assert executor is not None - assert "FakeExecutor" == executor.__class__.__name__ - assert executor.name is not None - assert executor.name == ExecutorName(f"{TEST_PLUGIN_NAME}.FakeExecutor") - assert executor.name.connector_source == ConnectorSource.PLUGIN - def test_should_support_custom_path(self): with conf_vars({("core", "executor"): "tests.executors.test_executor_loader.FakeExecutor"}): executor = ExecutorLoader.get_default_executor() @@ -124,7 +104,7 @@ def test_should_support_custom_path(self): ), # Core executors and custom module path executor and plugin ( - f"CeleryExecutor, LocalExecutor, tests.executors.test_executor_loader.FakeExecutor, {TEST_PLUGIN_NAME}.FakeExecutor", + "CeleryExecutor, LocalExecutor, tests.executors.test_executor_loader.FakeExecutor", [ ExecutorName( "airflow.providers.celery.executors.celery_executor.CeleryExecutor", @@ -138,17 +118,12 @@ def test_should_support_custom_path(self): "tests.executors.test_executor_loader.FakeExecutor", None, ), - ExecutorName( - f"{TEST_PLUGIN_NAME}.FakeExecutor", - None, - ), ], ), # Core executors and custom module path executor and plugin with aliases ( ( - "CeleryExecutor, LocalExecutor, fake_exec:tests.executors.test_executor_loader.FakeExecutor, " - f"plugin_exec:{TEST_PLUGIN_NAME}.FakeExecutor" + "CeleryExecutor, LocalExecutor, fake_exec:tests.executors.test_executor_loader.FakeExecutor" ), [ ExecutorName( @@ -163,10 +138,6 @@ def test_should_support_custom_path(self): "tests.executors.test_executor_loader.FakeExecutor", "fake_exec", ), - ExecutorName( - f"{TEST_PLUGIN_NAME}.FakeExecutor", - "plugin_exec", - ), ], ), ], @@ -194,8 +165,6 @@ def test_init_executors(self): "CeleryExecutor, my.module.path, my.module.path", "CeleryExecutor, my_alias:my.module.path, my.module.path", "CeleryExecutor, my_alias:my.module.path, other_alias:my.module.path", - f"CeleryExecutor, {TEST_PLUGIN_NAME}.FakeExecutor, {TEST_PLUGIN_NAME}.FakeExecutor", - f"my_alias:{TEST_PLUGIN_NAME}.FakeExecutor, other_alias:{TEST_PLUGIN_NAME}.FakeExecutor", ], ) def test_get_hybrid_executors_from_config_duplicates_should_fail(self, executor_config): @@ -239,21 +208,6 @@ def test_should_support_import_executor_from_core(self, executor_config, expecte assert expected_value == executor.__name__ assert import_source == ConnectorSource.CORE - @mock.patch("airflow.plugins_manager.plugins", [FakePlugin()]) - @mock.patch("airflow.plugins_manager.executors_modules", None) - @pytest.mark.parametrize( - ("executor_config"), - [ - (f"{TEST_PLUGIN_NAME}.FakeExecutor"), - (f"my_cool_alias:{TEST_PLUGIN_NAME}.FakeExecutor, CeleryExecutor"), - ], - ) - def test_should_support_import_plugins(self, executor_config): - with conf_vars({("core", "executor"): executor_config}): - executor, import_source = ExecutorLoader.import_default_executor_cls() - assert "FakeExecutor" == executor.__name__ - assert import_source == ConnectorSource.PLUGIN - @pytest.mark.parametrize( "executor_config", [ diff --git a/tests/plugins/test_plugin.py b/tests/plugins/test_plugin.py index 01b18c48a63fa..98f64e75456f5 100644 --- a/tests/plugins/test_plugin.py +++ b/tests/plugins/test_plugin.py @@ -21,8 +21,6 @@ from flask import Blueprint from flask_appbuilder import BaseView as AppBuilderBaseView, expose -from airflow.executors.base_executor import BaseExecutor - # Importing base classes that we need to derive from airflow.hooks.base import BaseHook @@ -49,11 +47,6 @@ class PluginHook(BaseHook): pass -# Will show up under airflow.executors.test_plugin.PluginExecutor -class PluginExecutor(BaseExecutor): - pass - - # Will show up under airflow.macros.test_plugin.plugin_macro def plugin_macro(): pass @@ -123,7 +116,6 @@ def get_weight(self, ti): class AirflowTestPlugin(AirflowPlugin): name = "test_plugin" hooks = [PluginHook] - executors = [PluginExecutor] macros = [plugin_macro] flask_blueprints = [bp] fastapi_apps = [app_with_metadata] diff --git a/tests_common/test_utils/mock_plugins.py b/tests_common/test_utils/mock_plugins.py index 3e50f1b413ebc..875a9abbd3a0f 100644 --- a/tests_common/test_utils/mock_plugins.py +++ b/tests_common/test_utils/mock_plugins.py @@ -25,7 +25,6 @@ "plugins", "registered_hooks", "macros_modules", - "executors_modules", "admin_views", "flask_blueprints", "fastapi_apps", @@ -44,7 +43,6 @@ "plugins", "registered_hooks", "macros_modules", - "executors_modules", "admin_views", "flask_blueprints", "menu_links",