Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions airflow/api/common/experimental/trigger_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,12 @@ def _trigger_dag(
)

run_id = run_id or DagRun.generate_run_id(DagRunType.MANUAL, execution_date)
dag_run = DagRun.find(dag_id=dag_id, run_id=run_id)
dag_run = DagRun.find_duplicate(dag_id=dag_id, execution_date=execution_date, run_id=run_id)

if dag_run:
raise DagRunAlreadyExists(f"Run id {run_id} already exists for dag id {dag_id}")
raise DagRunAlreadyExists(
f"A Dag Run already exists for dag id {dag_id} at {execution_date} with run id {run_id}"
)

run_conf = None
if conf:
Expand Down
65 changes: 48 additions & 17 deletions airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,12 +265,13 @@ def next_dagruns_to_examine(
query.limit(max_number), of=cls, session=session, **skip_locked(session=session)
)

@staticmethod
@classmethod
@provide_session
def find(

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@uranusjr, while we're updating this file, should we also convert this function into a classmethod?

cls,
dag_id: Optional[Union[str, List[str]]] = None,
run_id: Optional[str] = None,
execution_date: Optional[datetime] = None,
execution_date: Optional[Union[datetime, List[datetime]]] = None,

@gulshngill gulshngill Oct 20, 2021

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not related to this PR, but I noticed that this annotation did not match the type in the docstring.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, why not

state: Optional[DagRunState] = None,
external_trigger: Optional[bool] = None,
no_backfills: bool = False,
Expand Down Expand Up @@ -304,35 +305,65 @@ def find(
:param execution_end_date: dag run that was executed until this date
:type execution_end_date: datetime.datetime
"""
DR = DagRun

qry = session.query(DR)
qry = session.query(cls)
dag_ids = [dag_id] if isinstance(dag_id, str) else dag_id
if dag_ids:
qry = qry.filter(DR.dag_id.in_(dag_ids))
qry = qry.filter(cls.dag_id.in_(dag_ids))
if run_id:
qry = qry.filter(DR.run_id == run_id)
qry = qry.filter(cls.run_id == run_id)
if execution_date:
if isinstance(execution_date, list):
qry = qry.filter(DR.execution_date.in_(execution_date))
qry = qry.filter(cls.execution_date.in_(execution_date))
else:
qry = qry.filter(DR.execution_date == execution_date)
qry = qry.filter(cls.execution_date == execution_date)
if execution_start_date and execution_end_date:
qry = qry.filter(DR.execution_date.between(execution_start_date, execution_end_date))
qry = qry.filter(cls.execution_date.between(execution_start_date, execution_end_date))
elif execution_start_date:
qry = qry.filter(DR.execution_date >= execution_start_date)
qry = qry.filter(cls.execution_date >= execution_start_date)
elif execution_end_date:
qry = qry.filter(DR.execution_date <= execution_end_date)
qry = qry.filter(cls.execution_date <= execution_end_date)
if state:
qry = qry.filter(DR.state == state)
qry = qry.filter(cls.state == state)
if external_trigger is not None:
qry = qry.filter(DR.external_trigger == external_trigger)
qry = qry.filter(cls.external_trigger == external_trigger)
if run_type:
qry = qry.filter(DR.run_type == run_type)
qry = qry.filter(cls.run_type == run_type)
if no_backfills:
qry = qry.filter(DR.run_type != DagRunType.BACKFILL_JOB)
qry = qry.filter(cls.run_type != DagRunType.BACKFILL_JOB)

return qry.order_by(cls.execution_date).all()

@classmethod
@provide_session
def find_duplicate(
    cls,
    dag_id: str,
    run_id: Optional[str],
    execution_date: Optional[datetime],
    session: Session = None,
) -> Optional['DagRun']:
    """
    Return an existing run for the DAG with a specific run_id or execution_date.

    *None* is returned if no such DAG run is found.

    The lookup matches on ``run_id`` *or* ``execution_date``, so two different
    runs of the same DAG can satisfy it at once (one by run id, another by
    execution date). In that case the matching run with the earliest
    execution date is returned instead of raising ``MultipleResultsFound``.

    :param dag_id: the dag_id to find duplicates for
    :type dag_id: str
    :param run_id: defines the run id for this dag run
    :type run_id: Optional[str]
    :param execution_date: the execution date
    :type execution_date: Optional[datetime.datetime]
    :param session: database session
    :type session: sqlalchemy.orm.session.Session
    :return: a matching DagRun, or None when nothing matches
    :rtype: Optional[DagRun]
    """
    return (
        session.query(cls)
        .filter(
            cls.dag_id == dag_id,
            or_(cls.run_id == run_id, cls.execution_date == execution_date),
        )
        # The OR above may select more than one row; one_or_none() would raise
        # in that situation. Order the result so the returned row is stable.
        .order_by(cls.execution_date)
        .first()
    )

@staticmethod
def generate_run_id(run_type: DagRunType, execution_date: datetime) -> str:
Expand Down
6 changes: 3 additions & 3 deletions tests/api/common/experimental/test_trigger_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def test_trigger_dag_dag_run_exist(self, dag_bag_mock, dag_run_mock):
dag = DAG(dag_id)
dag_bag_mock.dags = [dag_id]
dag_bag_mock.get_dag.return_value = dag
dag_run_mock.find.return_value = DagRun()
dag_run_mock.find_duplicate.return_value = DagRun()
with pytest.raises(AirflowException):
_trigger_dag(dag_id, dag_bag_mock)

Expand All @@ -60,7 +60,7 @@ def test_trigger_dag_include_subdags(self, dag_bag_mock, dag_run_mock, dag_mock)
dag_id = "trigger_dag"
dag_bag_mock.dags = [dag_id]
dag_bag_mock.get_dag.return_value = dag_mock
dag_run_mock.find.return_value = None
dag_run_mock.find_duplicate.return_value = None
dag1 = mock.MagicMock(subdags=[])
dag2 = mock.MagicMock(subdags=[])
dag_mock.subdags = [dag1, dag2]
Expand All @@ -76,7 +76,7 @@ def test_trigger_dag_include_nested_subdags(self, dag_bag_mock, dag_run_mock, da
dag_id = "trigger_dag"
dag_bag_mock.dags = [dag_id]
dag_bag_mock.get_dag.return_value = dag_mock
dag_run_mock.find.return_value = None
dag_run_mock.find_duplicate.return_value = None
dag1 = mock.MagicMock(subdags=[])
dag2 = mock.MagicMock(subdags=[dag1])
dag_mock.subdags = [dag1, dag2]
Expand Down
23 changes: 23 additions & 0 deletions tests/models/test_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,29 @@ def test_dagrun_find(self):
assert 0 == len(models.DagRun.find(dag_id=dag_id2, external_trigger=True))
assert 1 == len(models.DagRun.find(dag_id=dag_id2, external_trigger=False))

def test_dagrun_find_duplicate(self):
    """Check that ``DagRun.find_duplicate`` matches on run_id or on execution_date."""
    db = settings.Session()
    moment = timezone.utcnow()
    dag_id = "test_dagrun_find_duplicate"

    # Persist a single manual run whose run_id is the same string as its dag_id.
    db.add(
        models.DagRun(
            dag_id=dag_id,
            run_id=dag_id,
            run_type=DagRunType.MANUAL,
            execution_date=moment,
            start_date=moment,
            state=State.RUNNING,
            external_trigger=True,
        )
    )
    db.commit()

    def lookup(run_id, execution_date):
        # Thin wrapper so each case below only spells out what varies.
        return models.DagRun.find_duplicate(dag_id=dag_id, run_id=run_id, execution_date=execution_date)

    # Both keys match, only run_id matches, only execution_date matches.
    assert lookup(dag_id, moment) is not None
    assert lookup(dag_id, None) is not None
    assert lookup(None, moment) is not None
    # Neither key matches the stored run.
    assert lookup(None, None) is None

def test_dagrun_success_when_all_skipped(self):
"""
Tests that a DAG run succeeds when all tasks are skipped
Expand Down