diff --git a/UPDATING.md b/UPDATING.md index ead3aed8e2910..c5097aeb34521 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -1442,6 +1442,40 @@ Now the `dag_id` will not appear repeated in the payload, and the response forma } ``` +### Change in DagBag signature + +Passing `store_serialized_dags` argument to DagBag.__init__ and accessing `DagBag.store_serialized_dags` property +are deprecated and will be removed in future versions. + + +**Previous signature**: + +```python +DagBag( + dag_folder=None, + include_examples=conf.getboolean('core', 'LOAD_EXAMPLES'), + safe_mode=conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE'), + store_serialized_dags=False +): +``` + +**current**: +```python +DagBag( + dag_folder=None, + include_examples=conf.getboolean('core', 'LOAD_EXAMPLES'), + safe_mode=conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE'), + read_dags_from_db=False +): +``` + +If you were using positional arguments, it requires no change but if you were using keyword +arguments, please change `store_serialized_dags` to `read_dags_from_db`. + +Similarly, if you were using `DagBag().store_serialized_dags` property, change it to +`DagBag().read_dags_from_db`. + + ## Airflow 1.10.11 ### Use NULL as default value for dag.description diff --git a/airflow/api/common/experimental/__init__.py b/airflow/api/common/experimental/__init__.py index 92b1190588382..0906649b50697 100644 --- a/airflow/api/common/experimental/__init__.py +++ b/airflow/api/common/experimental/__init__.py @@ -32,7 +32,7 @@ def check_and_get_dag(dag_id: str, task_id: Optional[str] = None) -> DagModel: dagbag = DagBag( dag_folder=dag_model.fileloc, - store_serialized_dags=conf.getboolean('core', 'store_serialized_dags') + read_dags_from_db=conf.getboolean('core', 'store_serialized_dags') ) dag = dagbag.get_dag(dag_id) # prefetch dag if it is stored serialized if dag_id not in dagbag.dags: diff --git a/airflow/api/common/experimental/trigger_dag.py b/airflow/api/common/experimental/trigger_dag.py index 05dadc7fc2e15..cdfb307bdd2c9 100644 --- a/airflow/api/common/experimental/trigger_dag.py +++ b/airflow/api/common/experimental/trigger_dag.py @@ -120,7 +120,7 @@ def read_store_serialized_dags(): return conf.getboolean('core', 'store_serialized_dags') dagbag = DagBag( dag_folder=dag_model.fileloc, - store_serialized_dags=read_store_serialized_dags() + read_dags_from_db=read_store_serialized_dags() ) dag_run = DagRun() triggers = _trigger_dag( diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py index c624b2bd1ccd0..41d99e2106fca 100644 --- a/airflow/models/dagbag.py +++ b/airflow/models/dagbag.py @@ -23,6 +23,7 @@ import os import sys import textwrap +import warnings import zipfile from datetime import datetime, timedelta from typing import Dict, List, NamedTuple, Optional @@ -69,26 +70,35 @@ class DagBag(BaseDagBag, LoggingMixin): :param include_examples: whether to include the examples that ship with airflow or not :type include_examples: bool - :param store_serialized_dags: Read DAGs from DB if store_serialized_dags is ``True``. + :param read_dags_from_db: Read DAGs from DB if store_serialized_dags is ``True``. If ``False`` DAGs are read from python files. This property is not used when determining whether or not to write Serialized DAGs, that is done by checking the config ``store_serialized_dags``. - :type store_serialized_dags: bool + :type read_dags_from_db: bool """ DAGBAG_IMPORT_TIMEOUT = conf.getint('core', 'DAGBAG_IMPORT_TIMEOUT') SCHEDULER_ZOMBIE_TASK_THRESHOLD = conf.getint('scheduler', 'scheduler_zombie_task_threshold') def __init__( - self, - dag_folder: Optional[str] = None, - include_examples: bool = conf.getboolean('core', 'LOAD_EXAMPLES'), - safe_mode: bool = conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE'), - store_serialized_dags: bool = False, + self, + dag_folder: Optional[str] = None, + include_examples: bool = conf.getboolean('core', 'LOAD_EXAMPLES'), + safe_mode: bool = conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE'), + read_dags_from_db: bool = False, + store_serialized_dags: Optional[bool] = None, ): # Avoid circular import from airflow.models.dag import DAG super().__init__() + + if store_serialized_dags: + warnings.warn( + "The store_serialized_dags parameter has been deprecated. " + "You should pass the read_dags_from_db parameter.", + DeprecationWarning, stacklevel=2) + read_dags_from_db = store_serialized_dags + dag_folder = dag_folder or settings.DAGS_FOLDER self.dag_folder = dag_folder self.dags: Dict[str, DAG] = {} @@ -96,7 +106,7 @@ def __init__( self.file_last_changed: Dict[str, datetime] = {} self.import_errors: Dict[str, str] = {} self.has_logged = False - self.store_serialized_dags = store_serialized_dags + self.read_dags_from_db = read_dags_from_db self.collect_dags( dag_folder=dag_folder, @@ -109,6 +119,15 @@ def size(self) -> int: """ return len(self.dags) + @property + def store_serialized_dags(self) -> bool: + """Whether or not to read dags from DB""" + warnings.warn( + "The store_serialized_dags property has been deprecated. " + "Use read_dags_from_db instead.", DeprecationWarning, stacklevel=2 + ) + return self.read_dags_from_db + @property def dag_ids(self) -> List[str]: return list(self.dags.keys()) @@ -123,8 +142,8 @@ def get_dag(self, dag_id): # Avoid circular import from airflow.models.dag import DagModel - # Only read DAGs from DB if this dagbag is store_serialized_dags. - if self.store_serialized_dags: + # Only read DAGs from DB if this dagbag is read_dags_from_db. + if self.read_dags_from_db: # Import here so that serialized dag is only imported when serialization is enabled from airflow.models.serialized_dag import SerializedDagModel if dag_id not in self.dags: @@ -363,7 +382,7 @@ def collect_dags( **Note**: The patterns in .airflowignore are treated as un-anchored regexes, not shell-like glob patterns. """ - if self.store_serialized_dags: + if self.read_dags_from_db: return self.log.info("Filling up the DagBag from %s", dag_folder) @@ -439,7 +458,7 @@ def sync_to_db(self): self.log.debug("Calling the DAG.bulk_sync_to_db method") DAG.bulk_sync_to_db(self.dags.values()) # Write Serialized DAGs to DB if DAG Serialization is turned on - # Even though self.store_serialized_dags is False + # Even though self.read_dags_from_db is False if settings.STORE_SERIALIZED_DAGS: self.log.debug("Calling the SerializedDagModel.bulk_sync_to_db method") SerializedDagModel.bulk_sync_to_db(self.dags.values()) diff --git a/airflow/www/extensions/init_dagbag.py b/airflow/www/extensions/init_dagbag.py index 89003a78fe4d7..aa00cbc828447 100644 --- a/airflow/www/extensions/init_dagbag.py +++ b/airflow/www/extensions/init_dagbag.py @@ -29,4 +29,4 @@ def init_dagbag(app): if os.environ.get('SKIP_DAGS_PARSING') == 'True': app.dag_bag = DagBag(os.devnull, include_examples=False) else: - app.dag_bag = DagBag(DAGS_FOLDER, store_serialized_dags=STORE_SERIALIZED_DAGS) + app.dag_bag = DagBag(DAGS_FOLDER, read_dags_from_db=STORE_SERIALIZED_DAGS) diff --git a/tests/api_connexion/endpoints/test_dag_endpoint.py b/tests/api_connexion/endpoints/test_dag_endpoint.py index 1ba360fd6df4b..601c66728ddac 100644 --- a/tests/api_connexion/endpoints/test_dag_endpoint.py +++ b/tests/api_connexion/endpoints/test_dag_endpoint.py @@ -128,7 +128,7 @@ def test_should_response_200(self): def test_should_response_200_serialized(self): # Create empty app with empty dagbag to check if DAG is read from db app_serialized = app.create_app(testing=True) # type:ignore - dag_bag = DagBag(os.devnull, include_examples=False, store_serialized_dags=True) + dag_bag = DagBag(os.devnull, include_examples=False, read_dags_from_db=True) app_serialized.dag_bag = dag_bag # type:ignore client = app_serialized.test_client() diff --git a/tests/api_connexion/endpoints/test_task_endpoint.py b/tests/api_connexion/endpoints/test_task_endpoint.py index 92d08efc0622e..829a1f3bb5ef8 100644 --- a/tests/api_connexion/endpoints/test_task_endpoint.py +++ b/tests/api_connexion/endpoints/test_task_endpoint.py @@ -95,7 +95,7 @@ def test_should_response_200(self): def test_should_response_200_serialized(self): # Create empty app with empty dagbag to check if DAG is read from db app_serialized = app.create_app(testing=True) # type:ignore - dag_bag = DagBag(os.devnull, include_examples=False, store_serialized_dags=True) + dag_bag = DagBag(os.devnull, include_examples=False, read_dags_from_db=True) app_serialized.dag_bag = dag_bag # type:ignore client = app_serialized.test_client() diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index d9eae24687895..d8a86d00d24fe 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -128,9 +128,9 @@ def create_test_dag(self, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE + timed @patch("airflow.models.dagbag.settings.STORE_SERIALIZED_DAGS", True) def setUpClass(cls): # Ensure the DAGs we are looking at from the DB are up-to-date - non_serialized_dagbag = DagBag(store_serialized_dags=False, include_examples=False) + non_serialized_dagbag = DagBag(read_dags_from_db=False, include_examples=False) non_serialized_dagbag.sync_to_db() - cls.dagbag = DagBag(store_serialized_dags=True) + cls.dagbag = DagBag(read_dags_from_db=True) def test_dag_file_processor_sla_miss_callback(self): """ @@ -1373,9 +1373,9 @@ def setUp(self): @patch("airflow.models.dagbag.settings.STORE_SERIALIZED_DAGS", True) def setUpClass(cls): # Ensure the DAGs we are looking at from the DB are up-to-date - non_serialized_dagbag = DagBag(store_serialized_dags=False, include_examples=False) + non_serialized_dagbag = DagBag(read_dags_from_db=False, include_examples=False) non_serialized_dagbag.sync_to_db() - cls.dagbag = DagBag(store_serialized_dags=True) + cls.dagbag = DagBag(read_dags_from_db=True) def test_is_alive(self): job = SchedulerJob(None, heartrate=10, state=State.RUNNING) diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py index 886119b4e4262..866f15535c4bb 100644 --- a/tests/models/test_dagbag.py +++ b/tests/models/test_dagbag.py @@ -642,7 +642,7 @@ def test_deactivate_unknown_dags(self): def test_serialized_dags_are_written_to_db_on_sync(self): """ Test that when dagbag.sync_to_db is called the DAGs are Serialized and written to DB - even when dagbag.store_serialized_dags is False + even when dagbag.read_dags_from_db is False """ with create_session() as session: serialized_dags_count = session.query(func.count(SerializedDagModel.dag_id)).scalar() @@ -653,7 +653,7 @@ def test_serialized_dags_are_written_to_db_on_sync(self): include_examples=False) dagbag.sync_to_db() - self.assertFalse(dagbag.store_serialized_dags) + self.assertFalse(dagbag.read_dags_from_db) new_serialized_dags_count = session.query(func.count(SerializedDagModel.dag_id)).scalar() self.assertEqual(new_serialized_dags_count, 1)