Use execution_date to check for existing DagRun for TriggerDagRunOperator#18968

Merged
kaxil merged 12 commits into
apache:mainfrom
gulshngill:change-trigger-dag
Nov 3, 2021

@gulshngill gulshngill commented Oct 14, 2021

Contributor

A small suggestion to change DagRun.find in trigger_dag to use execution_date as a parameter rather than run_id.

I think execution_date is the better parameter here: a lookup by run_id does not detect a scheduled run that already exists for the same execution_date, so creating a new run with that execution_date fails with the error below:

sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "dag_run_dag_id_execution_date_key"

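There is a constraint in dag_run called dag_run_dag_id_execution_date_key, which is defined here: https://github.com/apache/airflow/blob/c4f5233cd10ae03ee69fba861c8a6fa64e1f8a71/airflow/models/dagrun.py#L103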
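
The failure mode described above can be reproduced in miniature. The sketch below is not Airflow code: it uses Python's built-in sqlite3 module with a hypothetical, heavily simplified dag_run table that carries only the (dag_id, execution_date) unique constraint. The table layout, the helper name find_by_run_id, and the run_id strings are assumptions made for illustration.

```python
import sqlite3

# Hypothetical, simplified stand-in for Airflow's dag_run table.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE dag_run (
        dag_id TEXT NOT NULL,
        run_id TEXT NOT NULL,
        execution_date TEXT NOT NULL,
        UNIQUE (dag_id, execution_date)
    )
    """
)


def find_by_run_id(dag_id, run_id):
    """Look up an existing run by run_id only (the pre-PR style of check)."""
    return conn.execute(
        "SELECT run_id FROM dag_run WHERE dag_id = ? AND run_id = ?",
        (dag_id, run_id),
    ).fetchone()


execution_date = "2021-10-14T00:00:00+00:00"

# A scheduled run already exists for this execution_date.
conn.execute(
    "INSERT INTO dag_run VALUES (?, ?, ?)",
    ("example_dag", "scheduled__" + execution_date, execution_date),
)

# The triggered run uses a different run_id, so the run_id lookup finds nothing.
manual_run_id = "manual__" + execution_date
existing = find_by_run_id("example_dag", manual_run_id)

# The insert then violates the (dag_id, execution_date) unique constraint.
try:
    conn.execute(
        "INSERT INTO dag_run VALUES (?, ?, ?)",
        ("example_dag", manual_run_id, execution_date),
    )
    insert_failed = False
except sqlite3.IntegrityError:
    insert_failed = True
```

Here the lookup reports no existing run, yet the insert is rejected by the database, which is the same shape of failure as the IntegrityError quoted above.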

@boring-cyborg boring-cyborg Bot added the area:API Airflow's REST/HTTP API label Oct 14, 2021
boring-cyborg Bot commented Oct 14, 2021


Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
Here are some useful points:

  • Pay attention to the quality of your code (flake8, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature add useful documentation (in docstrings or in docs/ directory). Adding a new operator? Check this short guide. Consider adding an example DAG that shows how users should use it.
  • Consider using Breeze environment for testing locally, it’s a heavy docker but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: dev@airflow.apache.org
    Slack: https://s.apache.org/airflow-slack

@kaxil kaxil requested review from uranusjr and removed request for kaxil October 14, 2021 19:42
uranusjr previously approved these changes Oct 17, 2021
@github-actions github-actions Bot added the full tests needed We need to run full set of tests for this PR to merge label Oct 17, 2021
@github-actions

Contributor

The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.

@uranusjr

Member

As a side note, we are moving toward decoupling DAG runs from execution_date and eventually removing the unique constraint, and when that happens this code will become (semantically) incorrect. But until then, this is the correct thing to do, and it wouldn't break anything even after the constraint's removal, since it only adds a superfluous restriction on when DAG runs can be created, which is more appropriate than the current behaviour of throwing exceptions.

@uranusjr

Member

Actually, DagRun has a unique constraint on both execution_date and run_id. So while this fixes the duplicated execution_date issue, it'd introduce a new issue of duplicated run_id (which is possible with manual runs, where the user can customise run_id). So this should do a SQL WHERE run_id = ? OR execution_date = ? query instead. DagRun.find() does not support this, but I think the lookup logic is common enough to warrant a new method, say DagRun.find_duplicate(dag_id, run_id, execution_date) (where all three arguments are required). There is actually already at least one occurrence, maybe more (I didn't search very hard).

dagrun_instance = (
    session.query(DagRun)
    .filter(
        DagRun.dag_id == dag_id,
        or_(DagRun.run_id == run_id, DagRun.execution_date == logical_date),
    )
    .first()
)
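
The proposed lookup can be sketched without Airflow or SQLAlchemy. The example below is an assumption-laden illustration, not the method that was merged: it uses Python's built-in sqlite3 module, a hypothetical simplified dag_run table with both unique constraints, and a free function named find_duplicate that takes the connection explicitly. It only shows the WHERE run_id = ? OR execution_date = ? shape of the query.

```python
import sqlite3
from typing import Optional, Tuple

# Hypothetical, simplified stand-in for Airflow's dag_run table.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE dag_run (
        dag_id TEXT NOT NULL,
        run_id TEXT NOT NULL,
        execution_date TEXT NOT NULL,
        UNIQUE (dag_id, execution_date),
        UNIQUE (dag_id, run_id)
    )
    """
)
conn.executemany(
    "INSERT INTO dag_run VALUES (?, ?, ?)",
    [
        ("example_dag", "scheduled__2021-10-14", "2021-10-14T00:00:00"),
        ("example_dag", "manual__custom", "2021-10-15T00:00:00"),
    ],
)


def find_duplicate(
    conn: sqlite3.Connection, dag_id: str, run_id: str, execution_date: str
) -> Optional[Tuple[str, str]]:
    """Return a run of dag_id that clashes on run_id OR execution_date, if any."""
    return conn.execute(
        "SELECT run_id, execution_date FROM dag_run "
        "WHERE dag_id = ? AND (run_id = ? OR execution_date = ?) "
        "LIMIT 1",
        (dag_id, run_id, execution_date),
    ).fetchone()


# Clash on execution_date only (new run_id, date already taken).
by_date = find_duplicate(conn, "example_dag", "manual__new", "2021-10-14T00:00:00")

# Clash on run_id only (custom run_id reused, date is free).
by_run_id = find_duplicate(conn, "example_dag", "manual__custom", "2021-10-16T00:00:00")

# No clash: both run_id and execution_date are unused for this DAG.
no_clash = find_duplicate(conn, "example_dag", "manual__new", "2021-10-16T00:00:00")

# The check is scoped per DAG, so another dag_id never clashes.
other_dag = find_duplicate(conn, "other_dag", "manual__custom", "2021-10-14T00:00:00")
```

Requiring all three arguments, as suggested above, keeps the check aligned with both unique constraints, so a caller cannot accidentally test only one of them.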

Comment thread airflow/models/dagrun.py
         dag_id: Optional[Union[str, List[str]]] = None,
         run_id: Optional[str] = None,
-        execution_date: Optional[datetime] = None,
+        execution_date: Optional[Union[datetime, List[datetime]]] = None,

@gulshngill gulshngill Oct 20, 2021

Contributor Author

Not related to this PR but noticed that this did not match the type in the docstring

Member

Sure, why not
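
To illustrate what the widened annotation means for callers, here is a minimal sketch of a filter that accepts either a single datetime or a list of datetimes. It is plain Python rather than Airflow's query code, and the function name matches_execution_date is hypothetical; the branch on list versus single value is the part that corresponds to the Union type in the signature.

```python
from datetime import datetime
from typing import List, Optional, Union


def matches_execution_date(
    candidate: datetime,
    execution_date: Optional[Union[datetime, List[datetime]]] = None,
) -> bool:
    """Return True if candidate passes the execution_date filter."""
    if execution_date is None:
        # No filter supplied: everything matches.
        return True
    if isinstance(execution_date, list):
        # List form: behaves like an SQL "IN (...)" check.
        return candidate in execution_date
    # Single-value form: behaves like an SQL "=" check.
    return candidate == execution_date


first = datetime(2021, 10, 14)
second = datetime(2021, 10, 15)

single_match = matches_execution_date(first, first)
list_match = matches_execution_date(second, [first, second])
list_miss = matches_execution_date(datetime(2021, 10, 16), [first, second])
no_filter = matches_execution_date(first)
```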

@gulshngill

Contributor Author

Hi @uranusjr , thanks for taking the time to review this PR and for the suggestion.
I've made the changes as suggested and updated the unit tests as well. I'll continue with running the full matrix of tests.

Comment thread airflow/models/dagrun.py Outdated
Comment thread airflow/models/dagrun.py Outdated
@uranusjr uranusjr added this to the Airflow 2.2.2 milestone Oct 29, 2021
Comment thread airflow/models/dagrun.py
@@ -270,7 +270,7 @@ def next_dagruns_to_examine(
def find(

Contributor Author

@uranusjr , I'm thinking while we're updating this file, should we also convert this function into a classmethod?

@gulshngill gulshngill requested a review from uranusjr October 29, 2021 10:45
Comment thread tests/models/test_dagrun.py Outdated
@kaxil kaxil changed the title Suggestion to change DagRun.find in trigger_dag to use execution_date instead of run_id Use execution_date to check for existing DagRun for TriggerDagRunOperator Nov 2, 2021
@kaxil kaxil merged commit e54ee6e into apache:main Nov 3, 2021
boring-cyborg Bot commented Nov 3, 2021


Awesome work, congrats on your first merged pull request!

jedcunningham pushed a commit that referenced this pull request Nov 3, 2021
…DagRunOperator`` (#18968)

A small suggestion to change `DagRun.find` in `trigger_dag` to use `execution_date` as a parameter rather than `run_id`.

I feel it would be better to use this rather than `run_id` as a parameter since using `run_id` will miss out checking for a scheduled run that ran at the same `execution_date` and throw the error below when it tries to create a new run with the same `execution_date`:

```
sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "dag_run_dag_id_execution_date_key"
```

There is a constraint in `dag_run` called `dag_run_dag_id_execution_date_key` which can be found [here](https://github.com/apache/airflow/blob/c4f5233cd10ae03ee69fba861c8a6fa64e1f8a71/airflow/models/dagrun.py#L103).

(cherry picked from commit e54ee6e)
@jedcunningham jedcunningham added the type:bug-fix Changelog: Bug Fixes label Apr 7, 2022
Labels

area:API Airflow's REST/HTTP API full tests needed We need to run full set of tests for this PR to merge type:bug-fix Changelog: Bug Fixes

4 participants