Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
import pendulum
from dateutil.relativedelta import relativedelta
from pendulum.tz.timezone import Timezone
from sqlalchemy import Boolean, Column, ForeignKey, Index, Integer, String, Text, func, not_, or_
from sqlalchemy import Boolean, Column, ForeignKey, Index, Integer, String, Text, and_, case, func, not_, or_
from sqlalchemy.orm import backref, joinedload, relationship
from sqlalchemy.orm.query import Query
from sqlalchemy.orm.session import Session
Expand All @@ -71,6 +71,7 @@
from airflow.models.dagcode import DagCode
from airflow.models.dagpickle import DagPickle
from airflow.models.dagrun import DagRun
from airflow.models.dataset import DatasetDagRef, DatasetDagRunQueue as DDRQ
from airflow.models.operator import Operator
from airflow.models.param import DagParam, ParamsDict
from airflow.models.taskinstance import Context, TaskInstance, TaskInstanceKey, clear_task_instances
Expand Down Expand Up @@ -187,6 +188,32 @@ def get_last_dagrun(dag_id, session, include_externally_triggered=False):
return query.first()


def get_dataset_triggered_next_run_info(dag_ids: List[str], *, session: Session) -> Dict[str, str]:
    """
    Describe how close each dataset-triggered DAG in ``dag_ids`` is to its next run.

    :param dag_ids: IDs of the DAGs to report on. An empty collection yields an
        empty result without touching the database.
    :param session: SQLAlchemy session used to run the query.
    :return: Mapping of dag_id to a string such as ``"1 of 2 datasets updated"``.
        DAGs that have no ``DatasetDagRef`` rows do not appear in the mapping.
    """
    # Nothing to look up: skip the round trip (and the empty ``IN ()`` clause) entirely.
    if not dag_ids:
        return {}

    query = (
        session.query(
            DatasetDagRef.dag_id,
            func.count().label("total"),
            # The outer join leaves the DDRQ columns NULL for datasets that have no
            # queue record for this DAG, so only matched rows are counted as ready.
            func.sum(case((DDRQ.target_dag_id.is_not(None), 1), else_=0)).label("ready"),
        )
        .join(
            DDRQ,
            and_(
                DDRQ.dataset_id == DatasetDagRef.dataset_id,
                DDRQ.target_dag_id == DatasetDagRef.dag_id,
            ),
            isouter=True,
        )
        .group_by(DatasetDagRef.dag_id)
        .filter(DatasetDagRef.dag_id.in_(dag_ids))
    )
    return {row.dag_id: f"{row.ready} of {row.total} datasets updated" for row in query.all()}


@functools.total_ordering
class DAG(LoggingMixin):
"""
Expand Down Expand Up @@ -3057,6 +3084,12 @@ def calculate_dagrun_date_fields(
self.next_dagrun_create_after,
)

@provide_session
def get_dataset_triggered_next_run_info(self, *, session=NEW_SESSION) -> Optional[str]:
    """
    Return the "<ready> of <total> datasets updated" summary for this DAG.

    :param session: SQLAlchemy session passed through to the module-level lookup.
    :return: The summary string, or ``None`` when ``schedule_interval`` is not
        ``"Dataset"`` or when the lookup has no entry for this DAG.
    """
    if self.schedule_interval != "Dataset":
        return None
    # Use ``.get`` rather than indexing: the module-level lookup only returns entries
    # for DAGs it finds dataset reference rows for, and this method is declared to
    # return ``Optional[str]`` rather than raise ``KeyError``.
    return get_dataset_triggered_next_run_info([self.dag_id], session=session).get(self.dag_id)


# NOTE: Please keep the list of arguments in sync with DAG.__init__.
# Only exception: dag_id here should have a default value, but not in DAG.
Expand Down
7 changes: 6 additions & 1 deletion airflow/www/templates/airflow/dag.html
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,16 @@ <h4 class="pull-right" style="user-select: none;-moz-user-select: auto;">
{% if dag_model is defined and dag_model and dag_model.timetable_description %}
<span class="material-icons text-muted js-tooltip" aria-hidden="true" data-original-title="Schedule: {{ dag_model.timetable_description|string }}">info</span>
{% endif %}
{% if dag_model is defined and dag_model.next_dagrun is defined and dag_model.schedule_interval != 'dataset-triggered' %}
{% if dag_model is defined and dag_model.next_dagrun is defined and dag_model.schedule_interval != 'Dataset' %}
<p class="label label-default js-tooltip" style="margin-left: 5px" id="next-run" data-html="true" data-placement="bottom">
Next Run: <time datetime="{{ dag_model.next_dagrun }}">{{ dag_model.next_dagrun }}</time>
</p>
{% endif %}
{% if dag_model is defined and dag_model.schedule_interval is defined and dag_model.schedule_interval == 'Dataset' %}
<p class="label label-default" style="margin-left: 5px">
Next Run: {{ dag_model.get_dataset_triggered_next_run_info() }}
</p>
{% endif %}
</h4>
</div>
<div class="clearfix"></div>
Expand Down
5 changes: 4 additions & 1 deletion airflow/www/templates/airflow/dags.html
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ <h2>{{ page_title }}</h2>
</th>
<th style="width:180px;">{{ sortable_column("Next Run", "next_dagrun") }}
<span class="material-icons text-muted js-tooltip" aria-hidden="true"
title="Expected Date/Time of the next Dag Run.">info</span>
title="Expected date/time of the next DAG Run, or for dataset triggered DAGs, how many datasets have been updated since the last DAG Run.">info</span>
</th>
<th>Recent Tasks
<span class="material-icons text-muted js-tooltip" aria-hidden="true"
Expand Down Expand Up @@ -289,6 +289,9 @@ <h2>{{ page_title }}</h2>
</span>
</td>
<td class="text-nowrap">
{% if dag.dag_id in dataset_triggered_next_run_info %}
{{ dataset_triggered_next_run_info[dag.dag_id] }}
{% endif %}
{% if dag.next_dagrun is not none %}
<time datetime="{{ dag.next_dagrun }}">{{ dag.next_dagrun }}</time>
{% endif %}
Expand Down
13 changes: 12 additions & 1 deletion airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,9 @@
from airflow.jobs.base_job import BaseJob
from airflow.jobs.scheduler_job import SchedulerJob
from airflow.jobs.triggerer_job import TriggererJob
from airflow.models import DAG, Connection, DagModel, DagTag, Log, SlaMiss, TaskFail, XCom, errors
from airflow.models import Connection, DagModel, DagTag, Log, SlaMiss, TaskFail, XCom, errors
from airflow.models.abstractoperator import AbstractOperator
from airflow.models.dag import DAG, get_dataset_triggered_next_run_info
from airflow.models.dagcode import DagCode
from airflow.models.dagrun import DagRun, DagRunType
from airflow.models.operator import Operator
Expand Down Expand Up @@ -852,6 +853,14 @@ def index(self):
permissions.RESOURCE_DAG,
) in user_permissions

dataset_triggered_dag_ids = {dag.dag_id for dag in dags if dag.schedule_interval == "Dataset"}
Comment thread
dstandish marked this conversation as resolved.
Outdated
if dataset_triggered_dag_ids:
dataset_triggered_next_run_info = get_dataset_triggered_next_run_info(
dataset_triggered_dag_ids, session=session
)
else:
dataset_triggered_next_run_info = {}

for dag in dags:
dag_resource_name = permissions.RESOURCE_DAG_PREFIX + dag.dag_id
if all_dags_editable:
Expand Down Expand Up @@ -970,6 +979,7 @@ def _iter_parsed_moved_data_table_names():
sorting_key=arg_sorting_key,
sorting_direction=arg_sorting_direction,
auto_refresh_interval=conf.getint('webserver', 'auto_refresh_interval'),
dataset_triggered_next_run_info=dataset_triggered_next_run_info,
)

@expose('/datasets')
Expand Down Expand Up @@ -3341,6 +3351,7 @@ def landing_times(self, dag_id, session=None):
}
)
chart.buildcontent()

return self.render_template(
'airflow/chart.html',
dag=dag,
Expand Down
45 changes: 41 additions & 4 deletions tests/models/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from typing import List, Optional
from unittest import mock
from unittest.mock import patch
from uuid import uuid4

import jinja2
import pendulum
Expand All @@ -43,10 +44,10 @@
from airflow.configuration import conf
from airflow.decorators import task as task_decorator
from airflow.exceptions import AirflowException, DuplicateTaskIdFound, ParamValidationError
from airflow.models import DAG, DagModel, DagRun, DagTag, Dataset, TaskFail, TaskInstance as TI
from airflow.models import DAG, DagModel, DagRun, DagTag, TaskFail, TaskInstance as TI
from airflow.models.baseoperator import BaseOperator
from airflow.models.dag import dag as dag_decorator
from airflow.models.dataset import DatasetTaskRef
from airflow.models.dag import dag as dag_decorator, get_dataset_triggered_next_run_info
from airflow.models.dataset import Dataset, DatasetDagRunQueue, DatasetTaskRef
from airflow.models.param import DagParam, Param, ParamsDict
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
Expand All @@ -65,7 +66,7 @@
from airflow.utils.weight_rule import WeightRule
from tests.models import DEFAULT_DATE
from tests.test_utils.asserts import assert_queries_count
from tests.test_utils.db import clear_db_dags, clear_db_runs
from tests.test_utils.db import clear_db_dags, clear_db_datasets, clear_db_runs
from tests.test_utils.mapping import expand_mapped_task
from tests.test_utils.timetables import cron_timetable, delta_timetable

Expand Down Expand Up @@ -2582,3 +2583,39 @@ def test__time_restriction(dag_maker, dag_date, tasks_date, restrict):
EmptyOperator(task_id="do2", start_date=tasks_date[1][0], end_date=tasks_date[1][1])

assert dag._time_restriction == restrict


@pytest.fixture()
def reset_dataset():
    """Run ``clear_db_datasets()`` before the test body and again after it."""
    clear_db_datasets()  # setup: runs before the test that requested this fixture
    yield
    clear_db_datasets()  # teardown: runs once that test has finished


def test_get_dataset_triggered_next_run_info(session, reset_dataset):
    """Check the "<ready> of <total> datasets updated" summary for three dataset-scheduled DAGs."""
    unique_id = str(uuid4())

    # Three datasets; dag1/dag2/dag3 are scheduled on one, two and three of them respectively.
    dataset1 = Dataset(uri=f"s3://{unique_id}-1")
    dataset2 = Dataset(uri=f"s3://{unique_id}-2")
    dataset3 = Dataset(uri=f"s3://{unique_id}-3")
    dag1 = DAG(dag_id=f"datasets-{unique_id}-1", schedule_on=[dataset2])
    dag2 = DAG(dag_id=f"datasets-{unique_id}-2", schedule_on=[dataset1, dataset2])
    dag3 = DAG(dag_id=f"datasets-{unique_id}-3", schedule_on=[dataset1, dataset2, dataset3])
    DAG.bulk_write_to_db(dags=[dag1, dag2, dag3], session=session)
    session.commit()

    # Queue dataset1 for dag2 and dag3 only; dag1 does not depend on dataset1.
    queued = [
        DatasetDagRunQueue(dataset_id=dataset1.id, target_dag_id=dag2.dag_id),
        DatasetDagRunQueue(dataset_id=dataset1.id, target_dag_id=dag3.dag_id),
    ]
    session.bulk_save_objects(queued)
    session.commit()
    session.expunge_all()

    single = get_dataset_triggered_next_run_info([dag1.dag_id], session=session)
    assert single[dag1.dag_id] == "0 of 1 datasets updated"

    # Ask for dag2 and dag3 in one call so that the dag_id filtering is exercised too.
    pair = get_dataset_triggered_next_run_info([dag2.dag_id, dag3.dag_id], session=session)
    assert pair[dag2.dag_id] == "1 of 2 datasets updated"
    assert pair[dag3.dag_id] == "1 of 3 datasets updated"
2 changes: 1 addition & 1 deletion tests/www/views/test_views_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@


def test_index(admin_client):
with assert_queries_count(14):
with assert_queries_count(15):
resp = admin_client.get('/', follow_redirects=True)
check_content_in_response('DAGs', resp)

Expand Down