From 59a19cc71abb0ca6d10ef41cb124a893abb12bbf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Bregu=C5=82a?= Date: Sat, 7 Mar 2020 14:48:23 +0100 Subject: [PATCH 1/3] [AIRFLOW-7003] Lazy load all plguins --- airflow/__init__.py | 5 - airflow/models/dagbag.py | 2 + airflow/plugins_manager.py | 316 +++++++++++--------- airflow/serialization/serialized_objects.py | 14 +- airflow/www/app.py | 10 +- 5 files changed, 190 insertions(+), 157 deletions(-) diff --git a/airflow/__init__.py b/airflow/__init__.py index 3c26eb4fc6c55..5993084d5c8f0 100644 --- a/airflow/__init__.py +++ b/airflow/__init__.py @@ -40,13 +40,8 @@ settings.initialize() -from airflow.plugins_manager import integrate_plugins - login: Optional[Callable] = None -integrate_plugins() - - PY37 = sys.version_info >= (3, 7) diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py index a7f37184e2f6f..8f345f8147a66 100644 --- a/airflow/models/dagbag.py +++ b/airflow/models/dagbag.py @@ -33,6 +33,7 @@ from airflow.configuration import conf from airflow.dag.base_dag import BaseDagBag from airflow.exceptions import AirflowDagCycleException +from airflow.plugins_manager import integrate_dag_plugins from airflow.stats import Stats from airflow.utils import timezone from airflow.utils.file import correct_maybe_zipped @@ -191,6 +192,7 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True): """ from airflow.models.dag import DAG # Avoid circular import + integrate_dag_plugins() found_dags = [] # if the source file no longer exists in the DB or in the filesystem, diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py index 77f0ff37c5fde..d6247f27dc564 100644 --- a/airflow/plugins_manager.py +++ b/airflow/plugins_manager.py @@ -34,6 +34,32 @@ import_errors = {} +plugins = None # type: Optional[List[AirflowPlugin]] + +norm_pattern = re.compile(r'[/|.]') + +# Plugin components to integrate as modules +operators_modules = [] +sensors_modules = [] +hooks_modules = [] +executors_modules = [] +macros_modules = [] + +# Plugin components to integrate directly +admin_views: List[Any] = [] +flask_blueprints: List[Any] = [] +menu_links: List[Any] = [] +flask_appbuilder_views: List[Any] = [] +flask_appbuilder_menu_links: List[Any] = [] +global_operator_extra_links: List[Any] = [] +operator_extra_links: List[Any] = [] +registered_operator_link_classes: Dict[str, Type] = {} +"""Mapping of class names to class of OperatorLinks registered by plugins. + +Used by the DAG serialization code to only allow specific classes to be created +during deserialization +""" + class AirflowPluginException(Exception): """Exception when loading plugin.""" @@ -84,89 +110,82 @@ def on_load(cls, *args, **kwargs): """ -def load_entrypoint_plugins(entry_points, airflow_plugins): - """ - Load AirflowPlugin subclasses from the entrypoints - provided. The entry_point group should be 'airflow.plugins'. - - :param entry_points: A collection of entrypoints to search for plugins - :type entry_points: Generator[setuptools.EntryPoint, None, None] - :param airflow_plugins: A collection of existing airflow plugins to - ensure we don't load duplicates - :type airflow_plugins: list[type[airflow.plugins_manager.AirflowPlugin]] - :rtype: list[airflow.plugins_manager.AirflowPlugin] - """ - for entry_point in entry_points: - log.debug('Importing entry_point plugin %s', entry_point.name) - plugin_obj = entry_point.load() - if is_valid_plugin(plugin_obj, airflow_plugins): - if callable(getattr(plugin_obj, 'on_load', None)): - plugin_obj.on_load() - airflow_plugins.append(plugin_obj) - return airflow_plugins - - -def is_valid_plugin(plugin_obj, existing_plugins): +def is_valid_plugin(plugin_obj): """ Check whether a potential object is a subclass of the AirflowPlugin class. :param plugin_obj: potential subclass of AirflowPlugin - :param existing_plugins: Existing list of AirflowPlugin subclasses :return: Whether or not the obj is a valid subclass of AirflowPlugin """ + global plugins # pylint: disable=global-statement + if ( inspect.isclass(plugin_obj) and issubclass(plugin_obj, AirflowPlugin) and (plugin_obj is not AirflowPlugin) ): plugin_obj.validate() - return plugin_obj not in existing_plugins + return plugin_obj not in plugins return False -plugins = [] # type: List[AirflowPlugin] +def load_entrypoint_plugins(): + """ + Load and register plugins AirflowPlugin subclasses from the entrypoints. + The entry_point group should be 'airflow.plugins'. + """ + global plugins # pylint: disable=global-statement -norm_pattern = re.compile(r'[/|.]') + entry_points = pkg_resources.iter_entry_points('airflow.plugins') -if not settings.PLUGINS_FOLDER: - raise ValueError("Plugins folder is not set") - -# Crawl through the plugins folder to find AirflowPlugin derivatives -for root, dirs, files in os.walk(settings.PLUGINS_FOLDER, followlinks=True): - for f in files: - filepath = os.path.join(root, f) - try: - if not os.path.isfile(filepath): - continue - mod_name, file_ext = os.path.splitext( - os.path.split(filepath)[-1]) - if file_ext != '.py': - continue - - log.debug('Importing plugin module %s', filepath) - # normalize root path as namespace - namespace = '_'.join([re.sub(norm_pattern, '__', root), mod_name]) - - loader = importlib.machinery.SourceFileLoader(mod_name, filepath) - spec = importlib.util.spec_from_loader(mod_name, loader) - m = importlib.util.module_from_spec(spec) - sys.modules[spec.name] = m - loader.exec_module(m) - for obj in list(m.__dict__.values()): - if is_valid_plugin(obj, plugins): - plugins.append(obj) - except Exception as e: # pylint: disable=broad-except - log.exception(e) - path = filepath or str(f) - log.error('Failed to import plugin %s', path) - import_errors[path] = str(e) - -plugins = load_entrypoint_plugins( - pkg_resources.iter_entry_points('airflow.plugins'), - plugins -) + log.debug("Loading plugins from entrypoints") + + for entry_point in entry_points: + log.debug('Importing entry_point plugin %s', entry_point.name) + plugin_obj = entry_point.load() + if is_valid_plugin(plugin_obj): + if callable(getattr(plugin_obj, 'on_load', None)): + plugin_obj.on_load() + plugins.append(plugin_obj) + + +def load_plugins_from_plugin_directory(): + """ + Load and register Airflow Plugin from plugin directory + """ + global import_errors # pylint: disable=global-statement + global plugins # pylint: disable=global-statement + log.debug("Loading plugins from directory: %s", settings.PLUGINS_FOLDER) + + # Crawl through the plugins folder to find AirflowPlugin derivatives + for root, _, files in os.walk(settings.PLUGINS_FOLDER, followlinks=True): # noqa # pylint: disable=too-many-nested-blocks + for f in files: + filepath = os.path.join(root, f) + try: + if not os.path.isfile(filepath): + continue + mod_name, file_ext = os.path.splitext( + os.path.split(filepath)[-1]) + if file_ext != '.py': + continue + + log.debug('Importing plugin module %s', filepath) + + loader = importlib.machinery.SourceFileLoader(mod_name, filepath) + spec = importlib.util.spec_from_loader(mod_name, loader) + mod = importlib.util.module_from_spec(spec) + sys.modules[spec.name] = mod + loader.exec_module(mod) + for obj in list(mod.__dict__.values()): + if is_valid_plugin(obj): + plugins.append(obj) + except Exception as e: # pylint: disable=broad-except + log.exception(e) + path = filepath or str(f) + log.error('Failed to import plugin %s', path) + import_errors[path] = str(e) # pylint: disable=protected-access @@ -183,103 +202,116 @@ def make_module(name: str, objects: List[Any]): # pylint: enable=protected-access -# Plugin components to integrate as modules -operators_modules = [] -sensors_modules = [] -hooks_modules = [] -executors_modules = [] -macros_modules = [] +def load_plugins(): + """ + Load plugins from plugins directory and entrypoints. + Plugins are only loaded if they have not been previously loaded. + """ + global plugins # pylint: disable=global-statement + + if plugins is not None: + log.debug("Plugins are already loaded. Skipping.") + return + + if not settings.PLUGINS_FOLDER: + raise ValueError("Plugins folder is not set") + + log.debug("Loading plugins") + + plugins = [] + + load_plugins_from_plugin_directory() + load_entrypoint_plugins() + + initialize_plugins() + + +def initialize_plugins(): + """Creates modules for loaded extension from plugins""" + # pylint: disable=global-statement + global plugins + global operators_modules + global sensors_modules + global hooks_modules + global executors_modules + global macros_modules + + global admin_views + global flask_blueprints + global menu_links + global flask_appbuilder_views + global flask_appbuilder_menu_links + global global_operator_extra_links + global operator_extra_links + global registered_operator_link_classes + # pylint: enable=global-statement + + log.debug("Initialize plugin modules") + + for plugin in plugins: + plugin_name: str = plugin.name + operators_modules.append( + make_module('airflow.operators.' + plugin_name, plugin.operators + plugin.sensors)) + sensors_modules.append( + make_module('airflow.sensors.' + plugin_name, plugin.sensors) + ) + hooks_modules.append(make_module('airflow.hooks.' + plugin_name, plugin.hooks)) + executors_modules.append( + make_module('airflow.executors.' + plugin_name, plugin.executors)) + macros_modules.append(make_module('airflow.macros.' + plugin_name, plugin.macros)) + + admin_views.extend(plugin.admin_views) + menu_links.extend(plugin.menu_links) + flask_appbuilder_views.extend(plugin.appbuilder_views) + flask_appbuilder_menu_links.extend(plugin.appbuilder_menu_items) + flask_blueprints.extend([{ + 'name': plugin.name, + 'blueprint': bp + } for bp in plugin.flask_blueprints]) + global_operator_extra_links.extend(plugin.global_operator_extra_links) + operator_extra_links.extend(list(plugin.operator_extra_links)) + + registered_operator_link_classes.update({ + "{}.{}".format(link.__class__.__module__, + link.__class__.__name__): link.__class__ + for link in plugin.operator_extra_links + }) -# Plugin components to integrate directly -admin_views: List[Any] = [] -flask_blueprints: List[Any] = [] -menu_links: List[Any] = [] -flask_appbuilder_views: List[Any] = [] -flask_appbuilder_menu_links: List[Any] = [] -global_operator_extra_links: List[Any] = [] -operator_extra_links: List[Any] = [] -registered_operator_link_classes: Dict[str, Type] = {} -"""Mapping of class names to class of OperatorLinks registered by plugins. -Used by the DAG serialization code to only allow specific classes to be created -during deserialization -""" +def integrate_executor_plugins() -> None: + """Integrate executor plugins to the context.""" + load_plugins() + + log.debug("Integrate executor plugins") + + for executors_module in executors_modules: + sys.modules[executors_module.__name__] = executors_module + # noinspection PyProtectedMember + globals()[executors_module._name] = executors_module # pylint: disable=protected-access + + +def integrate_dag_plugins() -> None: + """Integrates operator, sensor, hook, macro plugins.""" + load_plugins() + + log.debug("Integrate DAG plugins.") -for p in plugins: - if not p.name: - raise AirflowPluginException("Plugin name is missing.") - plugin_name: str = p.name - operators_modules.append( - make_module('airflow.operators.' + plugin_name, p.operators + p.sensors)) - sensors_modules.append( - make_module('airflow.sensors.' + plugin_name, p.sensors) - ) - hooks_modules.append(make_module('airflow.hooks.' + plugin_name, p.hooks)) - executors_modules.append( - make_module('airflow.executors.' + plugin_name, p.executors)) - macros_modules.append(make_module('airflow.macros.' + plugin_name, p.macros)) - - admin_views.extend(p.admin_views) - menu_links.extend(p.menu_links) - flask_appbuilder_views.extend(p.appbuilder_views) - flask_appbuilder_menu_links.extend(p.appbuilder_menu_items) - flask_blueprints.extend([{ - 'name': p.name, - 'blueprint': bp - } for bp in p.flask_blueprints]) - global_operator_extra_links.extend(p.global_operator_extra_links) - operator_extra_links.extend(list(p.operator_extra_links)) - - registered_operator_link_classes.update({ - "{}.{}".format(link.__class__.__module__, - link.__class__.__name__): link.__class__ - for link in p.operator_extra_links - }) - - -def integrate_operator_plugins() -> None: - """Integrate operators plugins to the context""" for operators_module in operators_modules: sys.modules[operators_module.__name__] = operators_module # noinspection PyProtectedMember globals()[operators_module._name] = operators_module # pylint: disable=protected-access - -def integrate_sensor_plugins() -> None: - """Integrate sensor plugins to the context""" for sensors_module in sensors_modules: sys.modules[sensors_module.__name__] = sensors_module # noinspection PyProtectedMember globals()[sensors_module._name] = sensors_module # pylint: disable=protected-access - -def integrate_hook_plugins() -> None: - """Integrate hook plugins to the context""" for hooks_module in hooks_modules: sys.modules[hooks_module.__name__] = hooks_module # noinspection PyProtectedMember globals()[hooks_module._name] = hooks_module # pylint: disable=protected-access - -def integrate_executor_plugins() -> None: - """Integrate executor plugins to the context.""" - for executors_module in executors_modules: - sys.modules[executors_module.__name__] = executors_module - # noinspection PyProtectedMember - globals()[executors_module._name] = executors_module # pylint: disable=protected-access - - -def integrate_macro_plugins() -> None: - """Integrate macro plugins to the context""" for macros_module in macros_modules: sys.modules[macros_module.__name__] = macros_module # noinspection PyProtectedMember globals()[macros_module._name] = macros_module # pylint: disable=protected-access - - -def integrate_plugins() -> None: - """Integrates all types of plugins.""" - integrate_operator_plugins() - integrate_sensor_plugins() - integrate_hook_plugins() - integrate_macro_plugins() diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index b69dd2de5b021..9fb498bd26690 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -326,14 +326,15 @@ def serialize_operator(cls, op: BaseOperator) -> dict: def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator: """Deserializes an operator from a JSON object. """ - from airflow.plugins_manager import operator_extra_links + from airflow import plugins_manager + plugins_manager.load_plugins() op = SerializedBaseOperator(task_id=encoded_op['task_id']) # Extra Operator Links defined in Plugins op_extra_links_from_plugin = {} - for ope in operator_extra_links: + for ope in plugins_manager.operator_extra_links: for operator in ope.operators: if operator.__name__ == encoded_op["_task_type"] and \ operator.__module__ == encoded_op["_task_module"]: @@ -397,7 +398,8 @@ def _deserialize_operator_extra_links( :param encoded_op_links: Serialized Operator Link :return: De-Serialized Operator Link """ - from airflow.plugins_manager import registered_operator_link_classes + from airflow import plugins_manager + plugins_manager.load_plugins() op_predefined_extra_links = {} @@ -433,8 +435,10 @@ def _deserialize_operator_extra_links( _operator_link_class_path, data = list(_operator_links_source.items())[0] if _operator_link_class_path in BUILTIN_OPERATOR_EXTRA_LINKS: single_op_link_class = import_string(_operator_link_class_path) - elif _operator_link_class_path in registered_operator_link_classes: - single_op_link_class = registered_operator_link_classes[_operator_link_class_path] + elif _operator_link_class_path in plugins_manager.registered_operator_link_classes: + single_op_link_class = plugins_manager.registered_operator_link_classes[ + _operator_link_class_path + ] else: raise KeyError("Operator Link class %r not registered" % _operator_link_class_path) diff --git a/airflow/www/app.py b/airflow/www/app.py index 944cb130c270e..7e7496e117d05 100644 --- a/airflow/www/app.py +++ b/airflow/www/app.py @@ -173,16 +173,16 @@ def init_views(appbuilder): def integrate_plugins(): """Integrate plugins to the context""" - from airflow.plugins_manager import ( - flask_appbuilder_views, flask_appbuilder_menu_links - ) + from airflow import plugins_manager - for v in flask_appbuilder_views: + plugins_manager.load_plugins() + + for v in plugins_manager.flask_appbuilder_views: log.debug("Adding view %s", v["name"]) appbuilder.add_view(v["view"], v["name"], category=v["category"]) - for ml in sorted(flask_appbuilder_menu_links, key=lambda x: x["name"]): + for ml in sorted(plugins_manager.flask_appbuilder_menu_links, key=lambda x: x["name"]): log.debug("Adding menu link %s", ml["name"]) appbuilder.add_link(ml["name"], href=ml["href"], From 205a856667e797306b7c29f9410215bb24864706 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Bregu=C5=82a?= Date: Mon, 9 Mar 2020 16:28:18 +0100 Subject: [PATCH 2/3] fixup! [AIRFLOW-7003] Lazy load all plguins --- airflow/plugins_manager.py | 10 ++++------ airflow/serialization/serialized_objects.py | 4 ++-- airflow/www/app.py | 2 +- 3 files changed, 7 insertions(+), 9 deletions(-) diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py index d6247f27dc564..64afe81c5aeb4 100644 --- a/airflow/plugins_manager.py +++ b/airflow/plugins_manager.py @@ -21,7 +21,6 @@ import inspect import logging import os -import re import sys import types from typing import Any, Dict, List, Optional, Type @@ -36,8 +35,6 @@ plugins = None # type: Optional[List[AirflowPlugin]] -norm_pattern = re.compile(r'[/|.]') - # Plugin components to integrate as modules operators_modules = [] sensors_modules = [] @@ -202,9 +199,10 @@ def make_module(name: str, objects: List[Any]): # pylint: enable=protected-access -def load_plugins(): +def endure_plugins_loaded(): """ Load plugins from plugins directory and entrypoints. + Plugins are only loaded if they have not been previously loaded. """ global plugins # pylint: disable=global-statement @@ -280,7 +278,7 @@ def initialize_plugins(): def integrate_executor_plugins() -> None: """Integrate executor plugins to the context.""" - load_plugins() + endure_plugins_loaded() log.debug("Integrate executor plugins") @@ -292,7 +290,7 @@ def integrate_executor_plugins() -> None: def integrate_dag_plugins() -> None: """Integrates operator, sensor, hook, macro plugins.""" - load_plugins() + endure_plugins_loaded() log.debug("Integrate DAG plugins.") diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 9fb498bd26690..b41faa6e67384 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -327,7 +327,7 @@ def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator: """Deserializes an operator from a JSON object. """ from airflow import plugins_manager - plugins_manager.load_plugins() + plugins_manager.endure_plugins_loaded() op = SerializedBaseOperator(task_id=encoded_op['task_id']) @@ -399,7 +399,7 @@ def _deserialize_operator_extra_links( :return: De-Serialized Operator Link """ from airflow import plugins_manager - plugins_manager.load_plugins() + plugins_manager.endure_plugins_loaded() op_predefined_extra_links = {} diff --git a/airflow/www/app.py b/airflow/www/app.py index 7e7496e117d05..e5e4773006766 100644 --- a/airflow/www/app.py +++ b/airflow/www/app.py @@ -175,7 +175,7 @@ def integrate_plugins(): """Integrate plugins to the context""" from airflow import plugins_manager - plugins_manager.load_plugins() + plugins_manager.endure_plugins_loaded() for v in plugins_manager.flask_appbuilder_views: log.debug("Adding view %s", v["name"]) From f44a0d164044d69dadf6c6b7726016db29d92a1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Bregu=C5=82a?= Date: Mon, 9 Mar 2020 19:32:16 +0100 Subject: [PATCH 3/3] fixup! fixup! [AIRFLOW-7003] Lazy load all plguins --- airflow/plugins_manager.py | 8 ++++---- airflow/serialization/serialized_objects.py | 4 ++-- airflow/www/app.py | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py index 64afe81c5aeb4..1422d65b4a069 100644 --- a/airflow/plugins_manager.py +++ b/airflow/plugins_manager.py @@ -150,7 +150,7 @@ def load_entrypoint_plugins(): def load_plugins_from_plugin_directory(): """ - Load and register Airflow Plugin from plugin directory + Load and register Airflow Plugins from plugins directory. """ global import_errors # pylint: disable=global-statement global plugins # pylint: disable=global-statement @@ -199,7 +199,7 @@ def make_module(name: str, objects: List[Any]): # pylint: enable=protected-access -def endure_plugins_loaded(): +def ensure_plugins_loaded(): """ Load plugins from plugins directory and entrypoints. @@ -278,7 +278,7 @@ def initialize_plugins(): def integrate_executor_plugins() -> None: """Integrate executor plugins to the context.""" - endure_plugins_loaded() + ensure_plugins_loaded() log.debug("Integrate executor plugins") @@ -290,7 +290,7 @@ def integrate_executor_plugins() -> None: def integrate_dag_plugins() -> None: """Integrates operator, sensor, hook, macro plugins.""" - endure_plugins_loaded() + ensure_plugins_loaded() log.debug("Integrate DAG plugins.") diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index b41faa6e67384..d8b1eca522fa0 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -327,7 +327,7 @@ def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator: """Deserializes an operator from a JSON object. """ from airflow import plugins_manager - plugins_manager.endure_plugins_loaded() + plugins_manager.ensure_plugins_loaded() op = SerializedBaseOperator(task_id=encoded_op['task_id']) @@ -399,7 +399,7 @@ def _deserialize_operator_extra_links( :return: De-Serialized Operator Link """ from airflow import plugins_manager - plugins_manager.endure_plugins_loaded() + plugins_manager.ensure_plugins_loaded() op_predefined_extra_links = {} diff --git a/airflow/www/app.py b/airflow/www/app.py index e5e4773006766..400c143d584c8 100644 --- a/airflow/www/app.py +++ b/airflow/www/app.py @@ -175,7 +175,7 @@ def integrate_plugins(): """Integrate plugins to the context""" from airflow import plugins_manager - plugins_manager.endure_plugins_loaded() + plugins_manager.ensure_plugins_loaded() for v in plugins_manager.flask_appbuilder_views: log.debug("Adding view %s", v["name"])