diff --git a/airflow/providers/google/cloud/links/dataproc.py b/airflow/providers/google/cloud/links/dataproc.py index 917a1a7731cc7..2c54f96c39fa5 100644 --- a/airflow/providers/google/cloud/links/dataproc.py +++ b/airflow/providers/google/cloud/links/dataproc.py @@ -76,10 +76,14 @@ class DataprocLink(BaseOperatorLink): This link is deprecated. """ - warnings.warn( - "This DataprocLink is deprecated.", - AirflowProviderDeprecationWarning, - ) + def __init__(self, *args, **kwargs): + raise Exception() + warnings.warn( + "DataprocLink is deprecated. Please use Dataproc*Link classes", + AirflowProviderDeprecationWarning, + ) + super().__init__(*args, **kwargs) + name = "Dataproc resource" key = "conf" @@ -125,7 +129,13 @@ class DataprocListLink(BaseOperatorLink): This link is deprecated. """ - warnings.warn("This DataprocListLink is deprecated.", AirflowProviderDeprecationWarning) + def __init__(self, *args, **kwargs): + warnings.warn( + "DataprocListLink is deprecated. Please use Dataproc*ListLink classes", + AirflowProviderDeprecationWarning, + ) + super().__init__(*args, **kwargs) + name = "Dataproc resources" key = "list_conf" diff --git a/airflow/providers/google/cloud/operators/dataproc.py b/airflow/providers/google/cloud/operators/dataproc.py index 09e76ae2069fe..d842a4595862f 100644 --- a/airflow/providers/google/cloud/operators/dataproc.py +++ b/airflow/providers/google/cloud/operators/dataproc.py @@ -43,13 +43,10 @@ from airflow.providers.google.cloud.hooks.gcs import GCSHook from airflow.providers.google.cloud.links.dataproc import ( DATAPROC_BATCH_LINK, - DATAPROC_CLUSTER_LINK_DEPRECATED, - DATAPROC_JOB_LINK_DEPRECATED, DataprocBatchesListLink, DataprocBatchLink, DataprocClusterLink, DataprocJobLink, - DataprocLink, DataprocWorkflowLink, DataprocWorkflowTemplateLink, ) @@ -742,7 +739,7 @@ class DataprocScaleClusterOperator(GoogleCloudBaseOperator): template_fields: Sequence[str] = ("cluster_name", "project_id", "region", "impersonation_chain") - operator_extra_links = (DataprocLink(),) + operator_extra_links = (DataprocClusterLink(),) def __init__( self, @@ -821,12 +818,15 @@ def execute(self, context: Context) -> None: update_mask = ["config.worker_config.num_instances", "config.secondary_worker_config.num_instances"] hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain) + # Hook always has a project_id as fallback so we can ignore assignment + project_id: str = self.project_id if self.project_id else hook.project_id # type: ignore[assignment] # Save data required to display extra link no matter what the cluster status will be - DataprocLink.persist( + DataprocClusterLink.persist( context=context, - task_instance=self, - url=DATAPROC_CLUSTER_LINK_DEPRECATED, - resource=self.cluster_name, + operator=self, + cluster_id=self.cluster_name, + project_id=project_id, + region=self.region, ) operation = hook.update_cluster( project_id=self.project_id, @@ -1000,7 +1000,7 @@ class DataprocJobBaseOperator(GoogleCloudBaseOperator): job_type = "" - operator_extra_links = (DataprocLink(),) + operator_extra_links = (DataprocJobLink(),) def __init__( self, @@ -1034,7 +1034,8 @@ def __init__( self.job_error_states = job_error_states if job_error_states is not None else {"ERROR"} self.impersonation_chain = impersonation_chain self.hook = DataprocHook(gcp_conn_id=gcp_conn_id, impersonation_chain=impersonation_chain) - self.project_id = self.hook.project_id if project_id is None else project_id + # Hook project id is used as fallback so we can ignore assignment + self.project_id: str = project_id if project_id else self.hook.project_id # type: ignore[assignment] self.job_template: DataProcJobBuilder | None = None self.job: dict | None = None self.dataproc_job_id = None @@ -1081,8 +1082,8 @@ def execute(self, context: Context): job_id = job_object.reference.job_id self.log.info("Job %s submitted successfully.", job_id) # Save data required for extra links no matter what the job status will be - DataprocLink.persist( - context=context, task_instance=self, url=DATAPROC_JOB_LINK_DEPRECATED, resource=job_id + DataprocJobLink.persist( + context=context, operator=self, job_id=job_id, project_id=self.project_id, region=self.region ) if self.deferrable: @@ -1184,7 +1185,7 @@ class DataprocSubmitPigJobOperator(DataprocJobBaseOperator): ui_color = "#0273d4" job_type = "pig_job" - operator_extra_links = (DataprocLink(),) + operator_extra_links = (DataprocJobLink(),) def __init__( self, diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml index 29fd2c8073a76..730f0b2a5e861 100644 --- a/airflow/providers/google/provider.yaml +++ b/airflow/providers/google/provider.yaml @@ -1089,8 +1089,6 @@ extra-links: - airflow.providers.google.cloud.links.datacatalog.DataCatalogEntryGroupLink - airflow.providers.google.cloud.links.datacatalog.DataCatalogEntryLink - airflow.providers.google.cloud.links.datacatalog.DataCatalogTagTemplateLink - - airflow.providers.google.cloud.links.dataproc.DataprocLink - - airflow.providers.google.cloud.links.dataproc.DataprocListLink - airflow.providers.google.cloud.links.dataproc.DataprocClusterLink - airflow.providers.google.cloud.links.dataproc.DataprocJobLink - airflow.providers.google.cloud.links.dataproc.DataprocWorkflowLink diff --git a/tests/providers/google/cloud/operators/test_dataproc.py b/tests/providers/google/cloud/operators/test_dataproc.py index 6ddaec8be8928..8eafa6a00be42 100644 --- a/tests/providers/google/cloud/operators/test_dataproc.py +++ b/tests/providers/google/cloud/operators/test_dataproc.py @@ -33,8 +33,6 @@ ) from airflow.models import DAG, DagBag from airflow.providers.google.cloud.links.dataproc import ( - DATAPROC_CLUSTER_LINK_DEPRECATED, - DATAPROC_JOB_LINK_DEPRECATED, DataprocClusterLink, DataprocJobLink, DataprocWorkflowLink, @@ -49,7 +47,6 @@ DataprocGetBatchOperator, DataprocInstantiateInlineWorkflowTemplateOperator, DataprocInstantiateWorkflowTemplateOperator, - DataprocLink, DataprocListBatchesOperator, DataprocScaleClusterOperator, DataprocSubmitHadoopJobOperator, @@ -242,10 +239,9 @@ f"project={GCP_PROJECT}" ) DATAPROC_JOB_CONF_EXPECTED = { - "resource": TEST_JOB_ID, + "job_id": TEST_JOB_ID, "region": GCP_REGION, "project_id": GCP_PROJECT, - "url": DATAPROC_JOB_LINK_DEPRECATED, } DATAPROC_JOB_EXPECTED = { "job_id": TEST_JOB_ID, @@ -253,10 +249,9 @@ "project_id": GCP_PROJECT, } DATAPROC_CLUSTER_CONF_EXPECTED = { - "resource": CLUSTER_NAME, + "cluster_id": CLUSTER_NAME, "region": GCP_REGION, "project_id": GCP_PROJECT, - "url": DATAPROC_CLUSTER_LINK_DEPRECATED, } DATAPROC_CLUSTER_EXPECTED = { "cluster_id": CLUSTER_NAME, @@ -781,7 +776,9 @@ class TestDataprocClusterScaleOperator(DataprocClusterTestBase): def setup_class(cls): super().setup_class() cls.extra_links_expected_calls_base = [ - call.ti.xcom_push(execution_date=None, key="conf", value=DATAPROC_CLUSTER_CONF_EXPECTED) + call.ti.xcom_push( + execution_date=None, key="dataproc_cluster", value=DATAPROC_CLUSTER_CONF_EXPECTED + ) ] def test_deprecation_warning(self): @@ -827,7 +824,7 @@ def test_execute(self, mock_hook): self.extra_links_manager_mock.assert_has_calls(expected_calls, any_order=False) self.mock_ti.xcom_push.assert_called_once_with( - key="conf", + key="dataproc_cluster", value=DATAPROC_CLUSTER_CONF_EXPECTED, execution_date=None, ) @@ -855,17 +852,17 @@ def test_scale_cluster_operator_extra_links(dag_maker, create_task_instance_of_o # Assert operator links for serialized DAG assert serialized_dag["dag"]["tasks"][0]["_operator_extra_links"] == [ - {"airflow.providers.google.cloud.links.dataproc.DataprocLink": {}} + {"airflow.providers.google.cloud.links.dataproc.DataprocClusterLink": {}} ] # Assert operator link types are preserved during deserialization - assert isinstance(deserialized_task.operator_extra_links[0], DataprocLink) + assert isinstance(deserialized_task.operator_extra_links[0], DataprocClusterLink) # Assert operator link is empty when no XCom push occurred - assert ti.task.get_extra_links(ti, DataprocLink.name) == "" + assert ti.task.get_extra_links(ti, DataprocClusterLink.name) == "" # Assert operator link is empty for deserialized task when no XCom push occurred - assert deserialized_task.get_extra_links(ti, DataprocLink.name) == "" + assert deserialized_task.get_extra_links(ti, DataprocClusterLink.name) == "" ti.xcom_push( key="conf", @@ -873,10 +870,10 @@ def test_scale_cluster_operator_extra_links(dag_maker, create_task_instance_of_o ) # Assert operator links are preserved in deserialized tasks after execution - assert deserialized_task.get_extra_links(ti, DataprocLink.name) == DATAPROC_CLUSTER_LINK_EXPECTED + assert deserialized_task.get_extra_links(ti, DataprocClusterLink.name) == DATAPROC_CLUSTER_LINK_EXPECTED # Assert operator links after execution - assert ti.task.get_extra_links(ti, DataprocLink.name) == DATAPROC_CLUSTER_LINK_EXPECTED + assert ti.task.get_extra_links(ti, DataprocClusterLink.name) == DATAPROC_CLUSTER_LINK_EXPECTED class TestDataprocClusterDeleteOperator: @@ -1817,7 +1814,7 @@ class TestDataProcSparkOperator(DataprocJobTestBase): @classmethod def setup_class(cls): cls.extra_links_expected_calls = [ - call.ti.xcom_push(execution_date=None, key="conf", value=DATAPROC_JOB_CONF_EXPECTED), + call.ti.xcom_push(execution_date=None, key="dataproc_job", value=DATAPROC_JOB_CONF_EXPECTED), call.hook().wait_for_job(job_id=TEST_JOB_ID, region=GCP_REGION, project_id=GCP_PROJECT), ] @@ -1864,7 +1861,7 @@ def test_execute(self, mock_hook, mock_uuid): op.execute(context=self.mock_context) self.mock_ti.xcom_push.assert_called_once_with( - key="conf", value=DATAPROC_JOB_CONF_EXPECTED, execution_date=None + key="dataproc_job", value=DATAPROC_JOB_CONF_EXPECTED, execution_date=None ) # Test whether xcom push occurs before polling for job @@ -1893,25 +1890,25 @@ def test_submit_spark_job_operator_extra_links(mock_hook, dag_maker, create_task # Assert operator links for serialized DAG assert serialized_dag["dag"]["tasks"][0]["_operator_extra_links"] == [ - {"airflow.providers.google.cloud.links.dataproc.DataprocLink": {}} + {"airflow.providers.google.cloud.links.dataproc.DataprocJobLink": {}} ] # Assert operator link types are preserved during deserialization - assert isinstance(deserialized_task.operator_extra_links[0], DataprocLink) + assert isinstance(deserialized_task.operator_extra_links[0], DataprocJobLink) # Assert operator link is empty when no XCom push occurred - assert ti.task.get_extra_links(ti, DataprocLink.name) == "" + assert ti.task.get_extra_links(ti, DataprocJobLink.name) == "" # Assert operator link is empty for deserialized task when no XCom push occurred - assert deserialized_task.get_extra_links(ti, DataprocLink.name) == "" + assert deserialized_task.get_extra_links(ti, DataprocJobLink.name) == "" ti.xcom_push(key="conf", value=DATAPROC_JOB_CONF_EXPECTED) # Assert operator links after task execution - assert ti.task.get_extra_links(ti, DataprocLink.name) == DATAPROC_JOB_LINK_EXPECTED + assert ti.task.get_extra_links(ti, DataprocJobLink.name) == DATAPROC_JOB_LINK_EXPECTED # Assert operator links are preserved in deserialized tasks - link = deserialized_task.get_extra_links(ti, DataprocLink.name) + link = deserialized_task.get_extra_links(ti, DataprocJobLink.name) assert link == DATAPROC_JOB_LINK_EXPECTED