Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions airflow/api_connexion/endpoints/dataset_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def get_dataset(id: int, session: Session = NEW_SESSION) -> APIResponse:
"""Get a Dataset"""
dataset = (
session.query(Dataset)
.options(joinedload(Dataset.downstream_dag_references), joinedload(Dataset.upstream_task_references))
.options(joinedload(Dataset.consuming_dags), joinedload(Dataset.producing_tasks))
.get(id)
)
if not dataset:
Expand Down Expand Up @@ -73,9 +73,7 @@ def get_datasets(
query = query.filter(Dataset.uri.ilike(f"%{uri_pattern}%"))
query = apply_sorting(query, order_by, {}, allowed_attrs)
datasets = (
query.options(
subqueryload(Dataset.downstream_dag_references), subqueryload(Dataset.upstream_task_references)
)
query.options(subqueryload(Dataset.consuming_dags), subqueryload(Dataset.producing_tasks))
.offset(offset)
.limit(limit)
.all()
Expand Down
4 changes: 2 additions & 2 deletions airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3519,11 +3519,11 @@ components:
type: string
description: The dataset update time
nullable: false
downstream_dag_references:
consuming_dags:
type: array
items:
$ref: '#/components/schemas/DatasetDagRef'
upstream_task_references:
producing_tasks:
type: array
items:
$ref: '#/components/schemas/DatasetTaskRef'
Expand Down
4 changes: 2 additions & 2 deletions airflow/api_connexion/schemas/dataset_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ class Meta:
extra = JsonObjectField()
created_at = auto_field()
updated_at = auto_field()
upstream_task_references = fields.List(fields.Nested(DatasetTaskRefSchema))
downstream_dag_references = fields.List(fields.Nested(DatasetDagRefSchema))
producing_tasks = fields.List(fields.Nested(DatasetTaskRefSchema))
consuming_dags = fields.List(fields.Nested(DatasetDagRefSchema))


class DatasetCollection(NamedTuple):
Expand Down
4 changes: 2 additions & 2 deletions airflow/models/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ class Dataset(Base):
created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False)

downstream_dag_references = relationship("DatasetDagRef", back_populates="dataset")
upstream_task_references = relationship("DatasetTaskRef", back_populates="dataset")
consuming_dags = relationship("DatasetDagRef", back_populates="dataset")
producing_tasks = relationship("DatasetTaskRef", back_populates="dataset")

__tablename__ = "dataset"
__table_args__ = (
Expand Down
6 changes: 3 additions & 3 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -1538,8 +1538,8 @@ def _create_dataset_dag_run_queue_records(self, *, session: Session) -> None:
if not dataset:
self.log.warning("Dataset %s not found", obj)
continue
downstream_dag_ids = [x.dag_id for x in dataset.downstream_dag_references]
self.log.debug("downstream dag ids %s", downstream_dag_ids)
consuming_dag_ids = [x.dag_id for x in dataset.consuming_dags]
self.log.debug("consuming dag ids %s", consuming_dag_ids)
session.add(
DatasetEvent(
dataset_id=dataset.id,
Expand All @@ -1549,7 +1549,7 @@ def _create_dataset_dag_run_queue_records(self, *, session: Session) -> None:
source_map_index=self.map_index,
)
)
for dag_id in downstream_dag_ids:
for dag_id in consuming_dag_ids:
session.merge(DatasetDagRunQueue(dataset_id=dataset.id, target_dag_id=dag_id))

def _execute_task_with_callbacks(self, context, test_mode=False):
Expand Down
12 changes: 6 additions & 6 deletions airflow/www/static/js/datasets/Details.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ const gridUrl = getMetaValue('grid_url');
const Details = ({
dataset: {
uri,
upstreamTaskReferences,
downstreamDagReferences,
producingTasks,
consumingDags,
},
}: { dataset: API.Dataset }) => (
<Box>
Expand All @@ -54,13 +54,13 @@ const Details = ({
{uri}
<ClipboardButton value={uri} iconOnly ml={2} />
</Heading>
{upstreamTaskReferences && !!upstreamTaskReferences.length && (
{producingTasks && !!producingTasks.length && (
<Box mb={2}>
<Flex alignItems="center">
<Heading size="md" fontWeight="normal">Producing Tasks</Heading>
<InfoTooltip label="Tasks that will update this dataset." size={14} />
</Flex>
{upstreamTaskReferences.map(({ dagId, taskId }) => (
{producingTasks.map(({ dagId, taskId }) => (
<Link
key={`${dagId}.${taskId}`}
color="blue.600"
Expand All @@ -72,13 +72,13 @@ const Details = ({
))}
</Box>
)}
{downstreamDagReferences && !!downstreamDagReferences.length && (
{consumingDags && !!consumingDags.length && (
<Box>
<Flex alignItems="center">
<Heading size="md" fontWeight="normal">Consuming DAGs</Heading>
<InfoTooltip label="DAGs that depend on this dataset updating to trigger a run." size={14} />
</Flex>
{downstreamDagReferences.map(({ dagId }) => (
{consumingDags.map(({ dagId }) => (
<Link
key={dagId}
color="blue.600"
Expand Down
4 changes: 2 additions & 2 deletions airflow/www/static/js/datasets/List.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,13 @@ const DatasetsList = ({ onSelect }: Props) => {
},
{
Header: UpstreamHeader,
accessor: 'upstreamTaskReferences',
accessor: 'producingTasks',
Cell: ({ cell: { value } }: any) => value.length,
disableSortBy: true,
},
{
Header: DownstreamHeader,
accessor: 'downstreamDagReferences',
accessor: 'consumingDags',
Cell: ({ cell: { value } }: any) => value.length,
disableSortBy: true,
},
Expand Down
4 changes: 2 additions & 2 deletions airflow/www/static/js/types/api-generated.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1477,8 +1477,8 @@ export interface components {
created_at?: string;
/** @description The dataset update time */
updated_at?: string;
downstream_dag_references?: components["schemas"]["DatasetDagRef"][];
upstream_task_references?: components["schemas"]["DatasetTaskRef"][];
consuming_dags?: components["schemas"]["DatasetDagRef"][];
producing_tasks?: components["schemas"]["DatasetTaskRef"][];
};
/**
* @description A datasets reference to an upstream task.
Expand Down
12 changes: 6 additions & 6 deletions tests/api_connexion/endpoints/test_dataset_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ def test_should_respond_200(self, session):
"extra": {'foo': 'bar'},
"created_at": self.default_time,
"updated_at": self.default_time,
"downstream_dag_references": [],
"upstream_task_references": [],
"consuming_dags": [],
"producing_tasks": [],
}

def test_should_respond_404(self):
Expand Down Expand Up @@ -138,17 +138,17 @@ def test_should_respond_200(self, session):
"extra": {'foo': 'bar'},
"created_at": self.default_time,
"updated_at": self.default_time,
"downstream_dag_references": [],
"upstream_task_references": [],
"consuming_dags": [],
"producing_tasks": [],
},
{
"id": 2,
"uri": "s3://bucket/key/2",
"extra": {'foo': 'bar'},
"created_at": self.default_time,
"updated_at": self.default_time,
"downstream_dag_references": [],
"upstream_task_references": [],
"consuming_dags": [],
"producing_tasks": [],
},
],
"total_entries": 2,
Expand Down
12 changes: 6 additions & 6 deletions tests/api_connexion/schemas/test_dataset_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,14 @@ def test_serialize(self, dag_maker, session):
"extra": {'foo': 'bar'},
"created_at": self.timestamp,
"updated_at": self.timestamp,
"downstream_dag_references": [
"consuming_dags": [
{
"dag_id": "test_dataset_downstream_schema",
"created_at": self.timestamp,
"updated_at": self.timestamp,
}
],
"upstream_task_references": [
"producing_tasks": [
{
"task_id": "task1",
"dag_id": "test_dataset_upstream_schema",
Expand Down Expand Up @@ -110,17 +110,17 @@ def test_serialize(self, session):
"extra": {'foo': 'bar'},
"created_at": self.timestamp,
"updated_at": self.timestamp,
"downstream_dag_references": [],
"upstream_task_references": [],
"consuming_dags": [],
"producing_tasks": [],
},
{
"id": 2,
"uri": "s3://bucket/key/2",
"extra": {'foo': 'bar'},
"created_at": self.timestamp,
"updated_at": self.timestamp,
"downstream_dag_references": [],
"upstream_task_references": [],
"consuming_dags": [],
"producing_tasks": [],
},
],
"total_entries": 2,
Expand Down
4 changes: 2 additions & 2 deletions tests/models/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -841,8 +841,8 @@ def test_bulk_write_to_db_datasets(self):
d2 = stored_datasets[d2.uri]
d3 = stored_datasets[d3.uri]
assert stored_datasets[uri1].extra == {"should": "be used"}
assert [x.dag_id for x in d1.downstream_dag_references] == [dag_id1]
assert [(x.task_id, x.dag_id) for x in d1.upstream_task_references] == [(task_id, dag_id2)]
assert [x.dag_id for x in d1.consuming_dags] == [dag_id1]
assert [(x.task_id, x.dag_id) for x in d1.producing_tasks] == [(task_id, dag_id2)]
assert set(
session.query(DatasetTaskRef.task_id, DatasetTaskRef.dag_id, DatasetTaskRef.dataset_id)
.filter(DatasetTaskRef.dag_id.in_((dag_id1, dag_id2)))
Expand Down