Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
aa75b20
add endpoint for aborting a dag run #15888
bbenshalom Aug 26, 2021
ee84594
add endpoint for aborting a dag run #15888
bbenshalom Aug 26, 2021
ddd62a0
update api
bbenshalom Aug 26, 2021
c85dc99
change endpoint to accept any state
bbenshalom Aug 26, 2021
e2ef141
created tests for endpoint
bbenshalom Aug 27, 2021
0ad1bb3
merge session as well
bbenshalom Aug 27, 2021
e1308fa
Change type hint of dag run variable to optional
bbenshalom Aug 27, 2021
3c3b4a5
add import
bbenshalom Aug 27, 2021
7d77191
Merge branch 'main' into api-set-dag-run-state
bbenshalom Aug 27, 2021
0c72cb7
PR requested changes
bbenshalom Aug 28, 2021
ed25b62
Merge branch 'main' into api-set-dag-run-state
bbenshalom Aug 28, 2021
6cb54fc
apply PR suggestion
bbenshalom Aug 28, 2021
ea92dd2
fix tests and remove schema load
bbenshalom Aug 28, 2021
5dd182b
fix schema and run pre commit
bbenshalom Aug 28, 2021
b3cf3a6
Merge branch 'main' into api-set-dag-run-state
bbenshalom Aug 28, 2021
5a6b7a8
PR feedback
bbenshalom Aug 31, 2021
95d3472
Merge branch 'main' into api-set-dag-run-state
bbenshalom Aug 31, 2021
09f1d4f
More PR feedback
bbenshalom Sep 1, 2021
28db7f2
Merge branch 'main' into api-set-dag-run-state
bbenshalom Sep 1, 2021
9caf0a8
use DagRunState instead of State
bbenshalom Sep 1, 2021
65fa088
Apply suggestions from code review
bbenshalom Sep 1, 2021
fb11ba5
better tests
bbenshalom Sep 1, 2021
6302281
Merge branch 'main' into api-set-dag-run-state
bbenshalom Sep 2, 2021
12e43d6
change post action to patch
bbenshalom Sep 2, 2021
229dd1a
change summary and description of endpoint
bbenshalom Sep 2, 2021
003e772
fix for flake8
bbenshalom Sep 2, 2021
03c0012
Merge remote-tracking branch 'upstream/api-set-dag-run-state' into ap…
bbenshalom Sep 2, 2021
b4408b6
Merge branch 'main' into api-set-dag-run-state
bbenshalom Sep 2, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion airflow/api_connexion/endpoints/dag_run_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Optional

from flask import current_app, g, request
from marshmallow import ValidationError
from sqlalchemy import or_
Expand All @@ -27,11 +29,12 @@
dagrun_collection_schema,
dagrun_schema,
dagruns_batch_form_schema,
set_dagrun_state_form_schema,
)
from airflow.models import DagModel, DagRun
from airflow.security import permissions
from airflow.utils.session import provide_session
from airflow.utils.state import State
from airflow.utils.state import DagRunState, State
from airflow.utils.types import DagRunType


Expand Down Expand Up @@ -271,3 +274,29 @@ def post_dag_run(dag_id, session):
)

raise AlreadyExists(detail=f"DAGRun with DAG ID: '{dag_id}' and DAGRun ID: '{run_id}' already exists")


@security.requires_access(
[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG_RUN),
]
)
@provide_session
def update_dag_run_state(dag_id: str, dag_run_id: str, session) -> dict:
"""Set a state of a dag run."""
dag_run: Optional[DagRun] = (
session.query(DagRun).filter(DagRun.dag_id == dag_id, DagRun.run_id == dag_run_id).one_or_none()
)
if dag_run is None:
error_message = f'Dag Run id {dag_run_id} not found in dag {dag_id}'
raise NotFound(error_message)
try:
post_body = set_dagrun_state_form_schema.load(request.json)
except ValidationError as err:
raise BadRequest(detail=str(err))

state = post_body['state']
dag_run.set_state(state=DagRunState(state))
session.merge(dag_run)
return dagrun_schema.dump(dag_run)
39 changes: 39 additions & 0 deletions airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,35 @@ paths:
'404':
$ref: '#/components/responses/NotFound'

patch:
summary: Modify a DAG run
description: Modify a DAG run
x-openapi-router-controller: airflow.api_connexion.endpoints.dag_run_endpoint
operationId: update_dag_run_state
tags: [ UpdateDagRunState ]
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/UpdateDagRunState'

responses:
'200':
description: Success.
content:
application/json:
schema:
$ref: '#/components/schemas/DAGRun'
'400':
$ref: '#/components/responses/BadRequest'
'401':
$ref: '#/components/responses/Unauthenticated'
'403':
$ref: '#/components/responses/PermissionDenied'
'404':
$ref: '#/components/responses/NotFound'

/eventLogs:
get:
summary: List log entries
Expand Down Expand Up @@ -1996,6 +2025,16 @@ components:
required:
- dag_id

UpdateDagRunState:
type: object
properties:
state:
description: The state to set this DagRun
type: string
enum:
- success
- failed
Comment thread
bbenshalom marked this conversation as resolved.

DAGRunCollection:
type: object
description: Collection of DAG runs.
Expand Down
10 changes: 9 additions & 1 deletion airflow/api_connexion/schemas/dag_run_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import json
from typing import List, NamedTuple

from marshmallow import fields, post_dump, pre_load
from marshmallow import fields, post_dump, pre_load, validate
from marshmallow.schema import Schema
from marshmallow.validate import Range
from marshmallow_sqlalchemy import SQLAlchemySchema, auto_field
Expand All @@ -29,6 +29,7 @@
from airflow.api_connexion.schemas.enum_schemas import DagStateField
from airflow.models.dagrun import DagRun
from airflow.utils import timezone
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunType


Expand Down Expand Up @@ -104,6 +105,12 @@ def autofill(self, data, **kwargs):
return data


class SetDagRunStateFormSchema(Schema):
Comment thread
bbenshalom marked this conversation as resolved.
"""Schema for handling the request of setting state of DAG run"""

state = DagStateField(validate=validate.OneOf([DagRunState.SUCCESS.value, DagRunState.FAILED.value]))


class DAGRunCollection(NamedTuple):
"""List of DAGRuns with metadata"""

Expand Down Expand Up @@ -141,4 +148,5 @@ class Meta:

dagrun_schema = DAGRunSchema()
dagrun_collection_schema = DAGRunCollectionSchema()
set_dagrun_state_form_schema = SetDagRunStateFormSchema()
dagruns_batch_form_schema = DagRunsBatchFormSchema()
88 changes: 88 additions & 0 deletions tests/api_connexion/endpoints/test_dag_run_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
from unittest import mock

import pytest
from freezegun import freeze_time
from parameterized import parameterized

from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP
from airflow.models import DAG, DagModel, DagRun
from airflow.operators.dummy import DummyOperator
from airflow.security import permissions
from airflow.utils import timezone
from airflow.utils.session import create_session, provide_session
Expand Down Expand Up @@ -1154,3 +1156,89 @@ def test_should_raises_403_unauthorized(self):
environ_overrides={'REMOTE_USER': "test_view_dags"},
)
assert response.status_code == 403


class TestPostSetDagRunState(TestDagRunEndpoint):
@pytest.mark.parametrize("state", ["failed", "success"])
@freeze_time(TestDagRunEndpoint.default_time)
def test_should_respond_200(self, state, dag_maker):
dag_id = "TEST_DAG_ID"
dag_run_id = 'TEST_DAG_RUN_ID'
with dag_maker(dag_id):
DummyOperator(task_id='task_id')
dag_maker.create_dagrun(run_id=dag_run_id)

request_json = {"state": state}

response = self.client.patch(
f"api/v1/dags/{dag_id}/dagRuns/{dag_run_id}",
json=request_json,
environ_overrides={"REMOTE_USER": "test"},
)

assert response.status_code == 200
assert response.json == {
'conf': {},
'dag_id': dag_id,
'dag_run_id': dag_run_id,
'end_date': self.default_time,
'execution_date': dag_maker.start_date.isoformat(),
'external_trigger': False,
'logical_date': dag_maker.start_date.isoformat(),
'start_date': dag_maker.start_date.isoformat(),
'state': state,
}

@pytest.mark.parametrize('invalid_state', ["running", "queued"])
@freeze_time(TestDagRunEndpoint.default_time)
def test_should_response_400_for_non_existing_dag_run_state(self, invalid_state, dag_maker):
dag_id = "TEST_DAG_ID"
dag_run_id = 'TEST_DAG_RUN_ID'
with dag_maker(dag_id):
DummyOperator(task_id='task_id')
dag_maker.create_dagrun(run_id=dag_run_id)

request_json = {"state": invalid_state}

response = self.client.patch(
"api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID_1",
json=request_json,
environ_overrides={"REMOTE_USER": "test"},
)
assert response.status_code == 400
assert response.json == {
'detail': f"'{invalid_state}' is not one of ['success', 'failed'] - 'state'",
'status': 400,
'title': 'Bad Request',
'type': EXCEPTIONS_LINK_MAP[400],
}

def test_should_raises_401_unauthenticated(self, session):
response = self.client.patch(
"api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID_1",
json={
"state": 'success',
},
)

assert_401(response)

def test_should_raise_403_forbidden(self):
response = self.client.patch(
"api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID_1",
json={
"state": 'success',
},
environ_overrides={'REMOTE_USER': "test_no_permissions"},
)
assert response.status_code == 403

def test_should_respond_404(self):
response = self.client.patch(
"api/v1/dags/INVALID_DAG_ID/dagRuns/TEST_DAG_RUN_ID_1",
json={
"state": 'success',
},
environ_overrides={"REMOTE_USER": "test"},
)
assert response.status_code == 404