From 289e7682ea0d265bf9bee8ce9e43151a7b744f74 Mon Sep 17 00:00:00 2001 From: Ash Berlin-Taylor Date: Fri, 12 Aug 2022 14:22:17 +0100 Subject: [PATCH 1/2] Renamed up/downstream references to producing/consuming We changed the UI to use these new terms already, but the code wasn't changed. These names are clearer to me (and others) so I have made the code reflect them too. --- airflow/api_connexion/endpoints/dataset_endpoint.py | 6 ++---- airflow/api_connexion/openapi/v1.yaml | 4 ++-- airflow/api_connexion/schemas/dataset_schema.py | 4 ++-- airflow/models/dataset.py | 4 ++-- airflow/models/taskinstance.py | 2 +- airflow/www/static/js/datasets/Details.tsx | 12 ++++++------ airflow/www/static/js/datasets/List.tsx | 4 ++-- airflow/www/static/js/types/api-generated.ts | 4 ++-- .../api_connexion/endpoints/test_dataset_endpoint.py | 12 ++++++------ tests/api_connexion/schemas/test_dataset_schema.py | 12 ++++++------ tests/models/test_dag.py | 4 ++-- 11 files changed, 33 insertions(+), 35 deletions(-) diff --git a/airflow/api_connexion/endpoints/dataset_endpoint.py b/airflow/api_connexion/endpoints/dataset_endpoint.py index b6d4ac0ca5a4a..dbfafec6e37f1 100644 --- a/airflow/api_connexion/endpoints/dataset_endpoint.py +++ b/airflow/api_connexion/endpoints/dataset_endpoint.py @@ -42,7 +42,7 @@ def get_dataset(id: int, session: Session = NEW_SESSION) -> APIResponse: """Get a Dataset""" dataset = ( session.query(Dataset) - .options(joinedload(Dataset.downstream_dag_references), joinedload(Dataset.upstream_task_references)) + .options(joinedload(Dataset.consuming_dags), joinedload(Dataset.producing_tasks)) .get(id) ) if not dataset: @@ -73,9 +73,7 @@ def get_datasets( query = query.filter(Dataset.uri.ilike(f"%{uri_pattern}%")) query = apply_sorting(query, order_by, {}, allowed_attrs) datasets = ( - query.options( - subqueryload(Dataset.downstream_dag_references), subqueryload(Dataset.upstream_task_references) - ) + query.options(subqueryload(Dataset.consuming_dags), subqueryload(Dataset.producing_tasks)) .offset(offset) .limit(limit) .all() diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml index 74baeaa122b77..1056d63a74bec 100644 --- a/airflow/api_connexion/openapi/v1.yaml +++ b/airflow/api_connexion/openapi/v1.yaml @@ -3519,11 +3519,11 @@ components: type: string description: The dataset update time nullable: false - downstream_dag_references: + consuming_dags: type: array items: $ref: '#/components/schemas/DatasetDagRef' - upstream_task_references: + producing_tasks: type: array items: $ref: '#/components/schemas/DatasetTaskRef' diff --git a/airflow/api_connexion/schemas/dataset_schema.py b/airflow/api_connexion/schemas/dataset_schema.py index 7ff3e638e12d8..ffb8b88e4b94a 100644 --- a/airflow/api_connexion/schemas/dataset_schema.py +++ b/airflow/api_connexion/schemas/dataset_schema.py @@ -64,8 +64,8 @@ class Meta: extra = JsonObjectField() created_at = auto_field() updated_at = auto_field() - upstream_task_references = fields.List(fields.Nested(DatasetTaskRefSchema)) - downstream_dag_references = fields.List(fields.Nested(DatasetDagRefSchema)) + producing_tasks = fields.List(fields.Nested(DatasetTaskRefSchema)) + consuming_dags = fields.List(fields.Nested(DatasetDagRefSchema)) class DatasetCollection(NamedTuple): diff --git a/airflow/models/dataset.py b/airflow/models/dataset.py index c1dfd866c5ff0..bec94b5064922 100644 --- a/airflow/models/dataset.py +++ b/airflow/models/dataset.py @@ -52,8 +52,8 @@ class Dataset(Base): created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False) updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False) - downstream_dag_references = relationship("DatasetDagRef", back_populates="dataset") - upstream_task_references = relationship("DatasetTaskRef", back_populates="dataset") + consuming_dags = relationship("DatasetDagRef", back_populates="dataset") + producing_tasks = relationship("DatasetTaskRef", back_populates="dataset") __tablename__ = "dataset" __table_args__ = ( diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index ebdf07a3820e1..69ac247749804 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -1538,7 +1538,7 @@ def _create_dataset_dag_run_queue_records(self, *, session: Session) -> None: if not dataset: self.log.warning("Dataset %s not found", obj) continue - downstream_dag_ids = [x.dag_id for x in dataset.downstream_dag_references] + downstream_dag_ids = [x.dag_id for x in dataset.consuming_dags] self.log.debug("downstream dag ids %s", downstream_dag_ids) session.add( DatasetEvent( diff --git a/airflow/www/static/js/datasets/Details.tsx b/airflow/www/static/js/datasets/Details.tsx index c412c2b12e9a5..d871fe317fc50 100644 --- a/airflow/www/static/js/datasets/Details.tsx +++ b/airflow/www/static/js/datasets/Details.tsx @@ -43,8 +43,8 @@ const gridUrl = getMetaValue('grid_url'); const Details = ({ dataset: { uri, - upstreamTaskReferences, - downstreamDagReferences, + producingTasks, + consumingDags, }, }: { dataset: API.Dataset }) => ( @@ -54,13 +54,13 @@ const Details = ({ {uri} - {upstreamTaskReferences && !!upstreamTaskReferences.length && ( + {producingTasks && !!producingTasks.length && ( Producing Tasks - {upstreamTaskReferences.map(({ dagId, taskId }) => ( + {producingTasks.map(({ dagId, taskId }) => ( )} - {downstreamDagReferences && !!downstreamDagReferences.length && ( + {consumingDags && !!consumingDags.length && ( Consuming DAGs - {downstreamDagReferences.map(({ dagId }) => ( + {consumingDags.map(({ dagId }) => ( { }, { Header: UpstreamHeader, - accessor: 'upstreamTaskReferences', + accessor: 'producingTasks', Cell: ({ cell: { value } }: any) => value.length, disableSortBy: true, }, { Header: DownstreamHeader, - accessor: 'downstreamDagReferences', + accessor: 'consumingDags', Cell: ({ cell: { value } }: any) => value.length, disableSortBy: true, }, diff --git a/airflow/www/static/js/types/api-generated.ts b/airflow/www/static/js/types/api-generated.ts index 0b016f430ffdf..8df89497f7149 100644 --- a/airflow/www/static/js/types/api-generated.ts +++ b/airflow/www/static/js/types/api-generated.ts @@ -1477,8 +1477,8 @@ export interface components { created_at?: string; /** @description The dataset update time */ updated_at?: string; - downstream_dag_references?: components["schemas"]["DatasetDagRef"][]; - upstream_task_references?: components["schemas"]["DatasetTaskRef"][]; + consuming_dags?: components["schemas"]["DatasetDagRef"][]; + producing_tasks?: components["schemas"]["DatasetTaskRef"][]; }; /** * @description A datasets reference to an upstream task. diff --git a/tests/api_connexion/endpoints/test_dataset_endpoint.py b/tests/api_connexion/endpoints/test_dataset_endpoint.py index 6e715927857ce..3343b8d208621 100644 --- a/tests/api_connexion/endpoints/test_dataset_endpoint.py +++ b/tests/api_connexion/endpoints/test_dataset_endpoint.py @@ -89,8 +89,8 @@ def test_should_respond_200(self, session): "extra": {'foo': 'bar'}, "created_at": self.default_time, "updated_at": self.default_time, - "downstream_dag_references": [], - "upstream_task_references": [], + "consuming_dags": [], + "producing_tasks": [], } def test_should_respond_404(self): @@ -138,8 +138,8 @@ def test_should_respond_200(self, session): "extra": {'foo': 'bar'}, "created_at": self.default_time, "updated_at": self.default_time, - "downstream_dag_references": [], - "upstream_task_references": [], + "consuming_dags": [], + "producing_tasks": [], }, { "id": 2, @@ -147,8 +147,8 @@ def test_should_respond_200(self, session): "extra": {'foo': 'bar'}, "created_at": self.default_time, "updated_at": self.default_time, - "downstream_dag_references": [], - "upstream_task_references": [], + "consuming_dags": [], + "producing_tasks": [], }, ], "total_entries": 2, diff --git a/tests/api_connexion/schemas/test_dataset_schema.py b/tests/api_connexion/schemas/test_dataset_schema.py index 9703e2f0c355d..6f36718300f93 100644 --- a/tests/api_connexion/schemas/test_dataset_schema.py +++ b/tests/api_connexion/schemas/test_dataset_schema.py @@ -67,14 +67,14 @@ def test_serialize(self, dag_maker, session): "extra": {'foo': 'bar'}, "created_at": self.timestamp, "updated_at": self.timestamp, - "downstream_dag_references": [ + "consuming_dags": [ { "dag_id": "test_dataset_downstream_schema", "created_at": self.timestamp, "updated_at": self.timestamp, } ], - "upstream_task_references": [ + "producing_tasks": [ { "task_id": "task1", "dag_id": "test_dataset_upstream_schema", @@ -110,8 +110,8 @@ def test_serialize(self, session): "extra": {'foo': 'bar'}, "created_at": self.timestamp, "updated_at": self.timestamp, - "downstream_dag_references": [], - "upstream_task_references": [], + "consuming_dags": [], + "producing_tasks": [], }, { "id": 2, @@ -119,8 +119,8 @@ def test_serialize(self, session): "extra": {'foo': 'bar'}, "created_at": self.timestamp, "updated_at": self.timestamp, - "downstream_dag_references": [], - "upstream_task_references": [], + "consuming_dags": [], + "producing_tasks": [], }, ], "total_entries": 2, diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index 083b680f17731..b9d812728c248 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -841,8 +841,8 @@ def test_bulk_write_to_db_datasets(self): d2 = stored_datasets[d2.uri] d3 = stored_datasets[d3.uri] assert stored_datasets[uri1].extra == {"should": "be used"} - assert [x.dag_id for x in d1.downstream_dag_references] == [dag_id1] - assert [(x.task_id, x.dag_id) for x in d1.upstream_task_references] == [(task_id, dag_id2)] + assert [x.dag_id for x in d1.consuming_dags] == [dag_id1] + assert [(x.task_id, x.dag_id) for x in d1.producing_tasks] == [(task_id, dag_id2)] assert set( session.query(DatasetTaskRef.task_id, DatasetTaskRef.dag_id, DatasetTaskRef.dataset_id) .filter(DatasetTaskRef.dag_id.in_((dag_id1, dag_id2))) From 66ac8f3b886642d6adc321d89819a6c6c93691af Mon Sep 17 00:00:00 2001 From: Ash Berlin-Taylor Date: Fri, 12 Aug 2022 14:52:26 +0100 Subject: [PATCH 2/2] fixup! Renamed up/downstream references to producing/consuming --- airflow/models/taskinstance.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 69ac247749804..765a50169fcdc 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -1538,8 +1538,8 @@ def _create_dataset_dag_run_queue_records(self, *, session: Session) -> None: if not dataset: self.log.warning("Dataset %s not found", obj) continue - downstream_dag_ids = [x.dag_id for x in dataset.consuming_dags] - self.log.debug("downstream dag ids %s", downstream_dag_ids) + consuming_dag_ids = [x.dag_id for x in dataset.consuming_dags] + self.log.debug("consuming dag ids %s", consuming_dag_ids) session.add( DatasetEvent( dataset_id=dataset.id, @@ -1549,7 +1549,7 @@ def _create_dataset_dag_run_queue_records(self, *, session: Session) -> None: source_map_index=self.map_index, ) ) - for dag_id in downstream_dag_ids: + for dag_id in consuming_dag_ids: session.merge(DatasetDagRunQueue(dataset_id=dataset.id, target_dag_id=dag_id)) def _execute_task_with_callbacks(self, context, test_mode=False):