[AIRFLOW-7003] Lazy load all plugins #7644
Travis is green.
ashb
left a comment
I think we need to call integrate_dag_plugins() from somewhere in the task execution path too.
| plugins = None # type: Optional[List[AirflowPlugin]] | ||
| norm_pattern = re.compile(r'[/|.]') |
I don't think this is used anymore.
| hooks_modules = [] | ||
| executors_modules = [] | ||
| macros_modules = [] | ||
| def load_plugins(): |
How about this as a name?
| def load_plugins(): | |
| def ensure_plugins_loaded(): |
It makes it clearer from the call-site that it will only load them once, not every time this fn is called
I called this method here: both SchedulerJob and LocalTaskJob use this method.
| hooks_modules = [] | ||
| executors_modules = [] | ||
| macros_modules = [] | ||
| def endure_plugins_loaded(): |
| def endure_plugins_loaded(): | |
| def ensure_plugins_loaded(): |
| def integrate_dag_plugins() -> None: | ||
| """Integrates operator, sensor, hook, macro plugins.""" | ||
| endure_plugins_loaded() |
| endure_plugins_loaded() | |
| ensure_plugins_loaded() |
| """Integrate operators plugins to the context""" | ||
| def integrate_executor_plugins() -> None: | ||
| """Integrate executor plugins to the context.""" | ||
| endure_plugins_loaded() |
| endure_plugins_loaded() | |
| ensure_plugins_loaded() |
| def load_plugins_from_plugin_directory(): | ||
| """ | ||
| Load and register Airflow Plugin from plugin directory |
| Load and register Airflow Plugin from plugin directory | |
| Load and register Airflow Plugins from plugins directory |
| """ | ||
| from airflow.plugins_manager import operator_extra_links | ||
| from airflow import plugins_manager | ||
| plugins_manager.endure_plugins_loaded() |
| plugins_manager.endure_plugins_loaded() | |
| plugins_manager.ensure_plugins_loaded() |
| """ | ||
| from airflow.plugins_manager import registered_operator_link_classes | ||
| from airflow import plugins_manager | ||
| plugins_manager.endure_plugins_loaded() |
| plugins_manager.endure_plugins_loaded() | |
| plugins_manager.ensure_plugins_loaded() |
| from airflow import plugins_manager | ||
| for v in flask_appbuilder_views: | ||
| plugins_manager.endure_plugins_loaded() |
| plugins_manager.endure_plugins_loaded() | |
| plugins_manager.ensure_plugins_loaded() |
What about |
Codecov Report
@@ Coverage Diff @@
## master #7644 +/- ##
==========================================
- Coverage 86.83% 86.82% -0.02%
==========================================
Files 897 897
Lines 42805 42876 +71
==========================================
+ Hits 37170 37226 +56
- Misses 5635 5650 +15
| """Deserializes an operator from a JSON object. | ||
| """ | ||
| from airflow.plugins_manager import operator_extra_links | ||
| from airflow import plugins_manager |
Not related to your change, but I'm just curious: do you know the reason for the lazy import of plugins_manager here as well as in _deserialize_operator_extra_links?
It's a security measure:
Operator links can be dynamic (think of a pre-signed S3 URL that is only valid for 15 minutes), so we need to support inflating to "custom" classes. But we don't want to have to trust the serialized blob, so we only inflate classes that are pre-registered.
This guards against a class of bugs called "Object Injection Attacks": if we trusted the input and de-serialized whatever class was named there, we might end up opening a reverse shell etc. See https://blog.nelhage.com/2011/03/exploiting-pickle/ for an example. This defense is not perfect, as the plugins are "under user control", but it is mostly looking forward to when we will have an API that accepts a serialized DAG blob to run.
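As a rough sketch of the "only inflate pre-registered classes" idea: the names below (`BaseLink`, `S3PresignedLink`, `REGISTERED_LINK_CLASSES`, `deserialize_link`, and the blob layout) are all hypothetical and are not Airflow's actual serialization code. The point is only that the class name in the blob is looked up in an allow-list rather than imported dynamically.

```python
from typing import Any, Dict, Type


class BaseLink:
    """Hypothetical base class for operator links."""

    def __init__(self, **kwargs: Any) -> None:
        self.kwargs = kwargs


class S3PresignedLink(BaseLink):
    """Hypothetical custom link class that a plugin would register."""


# Hypothetical allow-list, filled in when plugins are loaded.
REGISTERED_LINK_CLASSES: Dict[str, Type[BaseLink]] = {
    "my_plugin.S3PresignedLink": S3PresignedLink,
}


def deserialize_link(blob: Dict[str, Any]) -> BaseLink:
    """Inflate a link only if its class path is in the allow-list.

    The class path from the untrusted blob is never imported or evaluated;
    it is used purely as a dictionary key.
    """
    class_path = blob["class"]
    link_class = REGISTERED_LINK_CLASSES.get(class_path)
    if link_class is None:
        raise ValueError(f"Link class {class_path!r} is not registered")
    return link_class(**blob.get("args", {}))


if __name__ == "__main__":
    ok = deserialize_link({"class": "my_plugin.S3PresignedLink", "args": {"bucket": "b"}})
    print(type(ok).__name__)
    try:
        deserialize_link({"class": "os.system", "args": {}})
    except ValueError as err:
        print(err)
```

An unregistered name such as `os.system` is rejected before anything is instantiated, which is the property that matters once serialized blobs arrive from a less trusted source.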
Issue link: AIRFLOW-7003
Make sure to mark the boxes below before creating PR: [x]
[AIRFLOW-NNNN]. AIRFLOW-NNNN = JIRA ID*
* For document-only changes commit message can start with [AIRFLOW-XXXX].
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.
Read the Pull Request Guidelines for more information.