Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -1442,6 +1442,40 @@ Now the `dag_id` will not appear repeated in the payload, and the response forma
}
```

### Change in DagBag signature

Passing `store_serialized_dags` argument to DagBag.__init__ and accessing `DagBag.store_serialized_dags` property
are deprecated and will be removed in future versions.


**Previous signature**:

```python
DagBag(
dag_folder=None,
include_examples=conf.getboolean('core', 'LOAD_EXAMPLES'),
safe_mode=conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE'),
store_serialized_dags=False
):
```

**current**:
```python
DagBag(
dag_folder=None,
include_examples=conf.getboolean('core', 'LOAD_EXAMPLES'),
safe_mode=conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE'),
read_dags_from_db=False
):
```

If you were using positional arguments, it requires no change but if you were using keyword
arguments, please change `store_serialized_dags` to `read_dags_from_db`.

Similarly, if you were using `DagBag().store_serialized_dags` property, change it to
`DagBag().read_dags_from_db`.


## Airflow 1.10.11

### Use NULL as default value for dag.description
Expand Down
2 changes: 1 addition & 1 deletion airflow/api/common/experimental/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def check_and_get_dag(dag_id: str, task_id: Optional[str] = None) -> DagModel:

dagbag = DagBag(
dag_folder=dag_model.fileloc,
store_serialized_dags=conf.getboolean('core', 'store_serialized_dags')
read_dags_from_db=conf.getboolean('core', 'store_serialized_dags')
)
dag = dagbag.get_dag(dag_id) # prefetch dag if it is stored serialized
if dag_id not in dagbag.dags:
Expand Down
2 changes: 1 addition & 1 deletion airflow/api/common/experimental/trigger_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def read_store_serialized_dags():
return conf.getboolean('core', 'store_serialized_dags')
dagbag = DagBag(
dag_folder=dag_model.fileloc,
store_serialized_dags=read_store_serialized_dags()
read_dags_from_db=read_store_serialized_dags()
)
dag_run = DagRun()
triggers = _trigger_dag(
Expand Down
43 changes: 31 additions & 12 deletions airflow/models/dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import os
import sys
import textwrap
import warnings
import zipfile
from datetime import datetime, timedelta
from typing import Dict, List, NamedTuple, Optional
Expand Down Expand Up @@ -69,34 +70,43 @@ class DagBag(BaseDagBag, LoggingMixin):
:param include_examples: whether to include the examples that ship
with airflow or not
:type include_examples: bool
:param store_serialized_dags: Read DAGs from DB if store_serialized_dags is ``True``.
:param read_dags_from_db: Read DAGs from DB if store_serialized_dags is ``True``.
If ``False`` DAGs are read from python files. This property is not used when
determining whether or not to write Serialized DAGs, that is done by checking
the config ``store_serialized_dags``.
:type store_serialized_dags: bool
:type read_dags_from_db: bool
"""

DAGBAG_IMPORT_TIMEOUT = conf.getint('core', 'DAGBAG_IMPORT_TIMEOUT')
SCHEDULER_ZOMBIE_TASK_THRESHOLD = conf.getint('scheduler', 'scheduler_zombie_task_threshold')

def __init__(
self,
dag_folder: Optional[str] = None,
include_examples: bool = conf.getboolean('core', 'LOAD_EXAMPLES'),
safe_mode: bool = conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE'),
store_serialized_dags: bool = False,
self,
dag_folder: Optional[str] = None,
include_examples: bool = conf.getboolean('core', 'LOAD_EXAMPLES'),
safe_mode: bool = conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE'),
read_dags_from_db: bool = False,
store_serialized_dags: Optional[bool] = None,
):
# Avoid circular import
from airflow.models.dag import DAG
super().__init__()

if store_serialized_dags:
warnings.warn(
"The store_serialized_dags parameter has been deprecated. "
"You should pass the read_dags_from_db parameter.",
DeprecationWarning, stacklevel=2)
read_dags_from_db = store_serialized_dags

dag_folder = dag_folder or settings.DAGS_FOLDER
self.dag_folder = dag_folder
self.dags: Dict[str, DAG] = {}
# the file's last modified timestamp when we last read it
self.file_last_changed: Dict[str, datetime] = {}
self.import_errors: Dict[str, str] = {}
self.has_logged = False
self.store_serialized_dags = store_serialized_dags
self.read_dags_from_db = read_dags_from_db

self.collect_dags(
dag_folder=dag_folder,
Expand All @@ -109,6 +119,15 @@ def size(self) -> int:
"""
return len(self.dags)

@property
def store_serialized_dags(self) -> bool:
"""Whether or not to read dags from DB"""
warnings.warn(
"The store_serialized_dags property has been deprecated. "
"Use read_dags_from_db instead.", DeprecationWarning, stacklevel=2
)
return self.read_dags_from_db

@property
def dag_ids(self) -> List[str]:
return list(self.dags.keys())
Expand All @@ -123,8 +142,8 @@ def get_dag(self, dag_id):
# Avoid circular import
from airflow.models.dag import DagModel

# Only read DAGs from DB if this dagbag is store_serialized_dags.
if self.store_serialized_dags:
# Only read DAGs from DB if this dagbag is read_dags_from_db.
if self.read_dags_from_db:
# Import here so that serialized dag is only imported when serialization is enabled
from airflow.models.serialized_dag import SerializedDagModel
if dag_id not in self.dags:
Expand Down Expand Up @@ -363,7 +382,7 @@ def collect_dags(
**Note**: The patterns in .airflowignore are treated as
un-anchored regexes, not shell-like glob patterns.
"""
if self.store_serialized_dags:
if self.read_dags_from_db:
return

self.log.info("Filling up the DagBag from %s", dag_folder)
Expand Down Expand Up @@ -439,7 +458,7 @@ def sync_to_db(self):
self.log.debug("Calling the DAG.bulk_sync_to_db method")
DAG.bulk_sync_to_db(self.dags.values())
# Write Serialized DAGs to DB if DAG Serialization is turned on
# Even though self.store_serialized_dags is False
# Even though self.read_dags_from_db is False
if settings.STORE_SERIALIZED_DAGS:
self.log.debug("Calling the SerializedDagModel.bulk_sync_to_db method")
SerializedDagModel.bulk_sync_to_db(self.dags.values())
2 changes: 1 addition & 1 deletion airflow/www/extensions/init_dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,4 @@ def init_dagbag(app):
if os.environ.get('SKIP_DAGS_PARSING') == 'True':
app.dag_bag = DagBag(os.devnull, include_examples=False)
else:
app.dag_bag = DagBag(DAGS_FOLDER, store_serialized_dags=STORE_SERIALIZED_DAGS)
app.dag_bag = DagBag(DAGS_FOLDER, read_dags_from_db=STORE_SERIALIZED_DAGS)
2 changes: 1 addition & 1 deletion tests/api_connexion/endpoints/test_dag_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def test_should_response_200(self):
def test_should_response_200_serialized(self):
# Create empty app with empty dagbag to check if DAG is read from db
app_serialized = app.create_app(testing=True) # type:ignore
dag_bag = DagBag(os.devnull, include_examples=False, store_serialized_dags=True)
dag_bag = DagBag(os.devnull, include_examples=False, read_dags_from_db=True)
app_serialized.dag_bag = dag_bag # type:ignore
client = app_serialized.test_client()

Expand Down
2 changes: 1 addition & 1 deletion tests/api_connexion/endpoints/test_task_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def test_should_response_200(self):
def test_should_response_200_serialized(self):
# Create empty app with empty dagbag to check if DAG is read from db
app_serialized = app.create_app(testing=True) # type:ignore
dag_bag = DagBag(os.devnull, include_examples=False, store_serialized_dags=True)
dag_bag = DagBag(os.devnull, include_examples=False, read_dags_from_db=True)
app_serialized.dag_bag = dag_bag # type:ignore
client = app_serialized.test_client()

Expand Down
8 changes: 4 additions & 4 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,9 @@ def create_test_dag(self, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE + timed
@patch("airflow.models.dagbag.settings.STORE_SERIALIZED_DAGS", True)
def setUpClass(cls):
# Ensure the DAGs we are looking at from the DB are up-to-date
non_serialized_dagbag = DagBag(store_serialized_dags=False, include_examples=False)
non_serialized_dagbag = DagBag(read_dags_from_db=False, include_examples=False)
non_serialized_dagbag.sync_to_db()
cls.dagbag = DagBag(store_serialized_dags=True)
cls.dagbag = DagBag(read_dags_from_db=True)

def test_dag_file_processor_sla_miss_callback(self):
"""
Expand Down Expand Up @@ -1373,9 +1373,9 @@ def setUp(self):
@patch("airflow.models.dagbag.settings.STORE_SERIALIZED_DAGS", True)
def setUpClass(cls):
# Ensure the DAGs we are looking at from the DB are up-to-date
non_serialized_dagbag = DagBag(store_serialized_dags=False, include_examples=False)
non_serialized_dagbag = DagBag(read_dags_from_db=False, include_examples=False)
non_serialized_dagbag.sync_to_db()
cls.dagbag = DagBag(store_serialized_dags=True)
cls.dagbag = DagBag(read_dags_from_db=True)

def test_is_alive(self):
job = SchedulerJob(None, heartrate=10, state=State.RUNNING)
Expand Down
4 changes: 2 additions & 2 deletions tests/models/test_dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ def test_deactivate_unknown_dags(self):
def test_serialized_dags_are_written_to_db_on_sync(self):
"""
Test that when dagbag.sync_to_db is called the DAGs are Serialized and written to DB
even when dagbag.store_serialized_dags is False
even when dagbag.read_dags_from_db is False
"""
with create_session() as session:
serialized_dags_count = session.query(func.count(SerializedDagModel.dag_id)).scalar()
Expand All @@ -653,7 +653,7 @@ def test_serialized_dags_are_written_to_db_on_sync(self):
include_examples=False)
dagbag.sync_to_db()

self.assertFalse(dagbag.store_serialized_dags)
self.assertFalse(dagbag.read_dags_from_db)

new_serialized_dags_count = session.query(func.count(SerializedDagModel.dag_id)).scalar()
self.assertEqual(new_serialized_dags_count, 1)