-
Notifications
You must be signed in to change notification settings - Fork 17.3k
Add dag re-parsing request endpoint #39138
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
44 commits
Select commit
Hold shift + click to select a range
5e1b315
Add dag re-parsing request endpoint
utkarsharma2 c7aede7
Fix migration files
utkarsharma2 d1b2e9e
Fix test_file_paths_in_queue_sorted_by_priority test
utkarsharma2 7a0ce40
Fix database cleanup issue
utkarsharma2 c0a7be8
Change http method to post
utkarsharma2 24e9f0f
Fix static checks
utkarsharma2 90703da
Remove created object from response
utkarsharma2 7ffedc1
Merge branch 'main' into DagReparsing
utkarsharma2 49771cf
Fix testcases
utkarsharma2 120672f
Merge branch 'main' into DagReparsing
utkarsharma2 8a4b8e3
Fix testcases
utkarsharma2 043d139
Merge branch 'main' into DagReparsing
utkarsharma2 f0840da
Change http response status for duplicate request
utkarsharma2 f927201
Fix tests
utkarsharma2 5a372e7
Changed the airflow version in migration file
utkarsharma2 e93c8f2
Merge branch 'main' into DagReparsing
utkarsharma2 0f128cc
Update airflow/api_connexion/openapi/v1.yaml
utkarsharma2 15a38ad
Update airflow/api_connexion/openapi/v1.yaml
utkarsharma2 f450105
Update airflow/models/dagbag.py
utkarsharma2 801b4cd
Update airflow/dag_processing/manager.py
utkarsharma2 aa09cd5
Better exception handling
utkarsharma2 f7b5807
Addressed PR comments
utkarsharma2 1279ff5
Merge branch 'main' into DagReparsing
utkarsharma2 24eeb97
Merge branch 'main' into DagReparsing
utkarsharma2 7fa0282
Fix tests
utkarsharma2 689c616
Update airflow/api_connexion/openapi/v1.yaml
utkarsharma2 0e8f5d5
Update airflow/api_connexion/endpoints/dag_parsing.py
utkarsharma2 0241212
Update airflow/api_connexion/openapi/v1.yaml
utkarsharma2 e080295
Update airflow/models/dagbag.py
utkarsharma2 f024421
Apply the suggestion from PR comments
utkarsharma2 5177618
Merge branch 'main' into DagReparsing
utkarsharma2 a670b09
Merge branch 'main' into DagReparsing
utkarsharma2 e0d7770
Remove __eq__ magic method
utkarsharma2 9bd67fb
Remove get_requests() method
utkarsharma2 14ca397
Optimize database request
uranusjr 7b89cfc
Update airflow/dag_processing/manager.py
utkarsharma2 1ab6a81
Fix test
utkarsharma2 5e4fb51
Change return status code
utkarsharma2 6240d87
Fix test
utkarsharma2 015e8d3
Merge branch 'main' into DagReparsing
utkarsharma2 5105e22
Merge branch 'main' into DagReparsing
utkarsharma2 0754769
Merge branch 'main' into DagReparsing
utkarsharma2 f00c263
Fix static checks
utkarsharma2 da7ffe4
Merge branch 'main' into DagReparsing
utkarsharma2 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,69 @@ | ||
| # Licensed to the Apache Software Foundation (ASF) under one | ||
| # or more contributor license agreements. See the NOTICE file | ||
| # distributed with this work for additional information | ||
| # regarding copyright ownership. The ASF licenses this file | ||
| # to you under the Apache License, Version 2.0 (the | ||
| # "License"); you may not use this file except in compliance | ||
| # with the License. You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, | ||
| # software distributed under the License is distributed on an | ||
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| # KIND, either express or implied. See the License for the | ||
| # specific language governing permissions and limitations | ||
| # under the License. | ||
| from __future__ import annotations | ||
|
|
||
| from http import HTTPStatus | ||
| from typing import TYPE_CHECKING, Sequence | ||
|
|
||
| from flask import Response, current_app | ||
| from itsdangerous import BadSignature, URLSafeSerializer | ||
| from sqlalchemy import exc, select | ||
|
|
||
| from airflow.api_connexion import security | ||
| from airflow.api_connexion.exceptions import NotFound, PermissionDenied | ||
| from airflow.auth.managers.models.resource_details import DagDetails | ||
| from airflow.models.dag import DagModel | ||
| from airflow.models.dagbag import DagPriorityParsingRequest | ||
| from airflow.utils.session import NEW_SESSION, provide_session | ||
| from airflow.www.extensions.init_auth_manager import get_auth_manager | ||
|
|
||
| if TYPE_CHECKING: | ||
| from sqlalchemy.orm import Session | ||
|
|
||
| from airflow.auth.managers.models.batch_apis import IsAuthorizedDagRequest | ||
|
|
||
|
|
||
| @security.requires_access_dag("PUT") | ||
| @provide_session | ||
| def reparse_dag_file(*, file_token: str, session: Session = NEW_SESSION) -> Response: | ||
| """Request re-parsing a DAG file.""" | ||
| secret_key = current_app.config["SECRET_KEY"] | ||
| auth_s = URLSafeSerializer(secret_key) | ||
| try: | ||
| path = auth_s.loads(file_token) | ||
| except BadSignature: | ||
| raise NotFound("File not found") | ||
|
|
||
| requests: Sequence[IsAuthorizedDagRequest] = [ | ||
| {"method": "PUT", "details": DagDetails(id=dag_id)} | ||
| for dag_id in session.scalars(select(DagModel.dag_id).where(DagModel.fileloc == path)) | ||
| ] | ||
| if not requests: | ||
| raise NotFound("File not found") | ||
|
|
||
| # Check if user has read access to all the DAGs defined in the file | ||
| if not get_auth_manager().batch_is_authorized_dag(requests): | ||
| raise PermissionDenied() | ||
|
|
||
| parsing_request = DagPriorityParsingRequest(fileloc=path) | ||
| session.add(parsing_request) | ||
| try: | ||
| session.commit() | ||
| except exc.IntegrityError: | ||
| session.rollback() | ||
| return Response("Duplicate request", HTTPStatus.CREATED) | ||
| return Response(status=HTTPStatus.CREATED) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
52 changes: 52 additions & 0 deletions
52
airflow/migrations/versions/0144_2_10_0_added_dagpriorityparsingrequest_table.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,52 @@ | ||
| # | ||
| # Licensed to the Apache Software Foundation (ASF) under one | ||
| # or more contributor license agreements. See the NOTICE file | ||
| # distributed with this work for additional information | ||
| # regarding copyright ownership. The ASF licenses this file | ||
| # to you under the Apache License, Version 2.0 (the | ||
| # "License"); you may not use this file except in compliance | ||
| # with the License. You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, | ||
| # software distributed under the License is distributed on an | ||
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| # KIND, either express or implied. See the License for the | ||
| # specific language governing permissions and limitations | ||
| # under the License. | ||
|
|
||
| """Added DagPriorityParsingRequest table. | ||
|
|
||
| Revision ID: c4602ba06b4b | ||
| Revises: 677fdbb7fc54 | ||
| Create Date: 2024-04-17 17:12:05.473889 | ||
|
|
||
| """ | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import sqlalchemy as sa | ||
| from alembic import op | ||
|
|
||
| # revision identifiers, used by Alembic. | ||
| revision = "c4602ba06b4b" | ||
| down_revision = "677fdbb7fc54" | ||
| branch_labels = None | ||
| depends_on = None | ||
| airflow_version = "2.10.0" | ||
|
|
||
|
|
||
| def upgrade(): | ||
| """Apply Added DagPriorityParsingRequest table.""" | ||
| op.create_table( | ||
| "dag_priority_parsing_request", | ||
| sa.Column("id", sa.String(length=32), nullable=False), | ||
| sa.Column("fileloc", sa.String(length=2000), nullable=False), | ||
| sa.PrimaryKeyConstraint("id", name=op.f("dag_priority_parsing_request_pkey")), | ||
| ) | ||
|
|
||
|
|
||
| def downgrade(): | ||
| """Unapply Added DagPriorityParsingRequest table.""" | ||
| op.drop_table("dag_priority_parsing_request") |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1 +1 @@ | ||
| 468c1db106e059c4a97f07b9f8be7edfa487099113e4611c74f61f17c0ea0d82 | ||
| 6ae5e112d66c30d36fbc27a608355ffd66853e34d7538223f69a71e2eba54b59 |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.