From 4de32b52062a57c861525fb07dcf24a2d065aa30 Mon Sep 17 00:00:00 2001 From: Pankaj Date: Wed, 15 Mar 2023 20:25:08 +0530 Subject: [PATCH 1/4] Expose missing poll_interval in Bigquery operator --- .../providers/google/cloud/operators/bigquery.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py index 08eb9b7a74ce0..7eba9d111e1ea 100644 --- a/airflow/providers/google/cloud/operators/bigquery.py +++ b/airflow/providers/google/cloud/operators/bigquery.py @@ -177,6 +177,7 @@ class BigQueryCheckOperator(_BigQueryDbHookMixin, SQLCheckOperator): account from the list granting this role to the originating account (templated). :param labels: a dictionary containing labels for the table, passed to BigQuery :param deferrable: Run operator in the deferrable mode + :param poll_interval: polling period in seconds to check for the status of job. Defaults to 4 seconds. """ template_fields: Sequence[str] = ( @@ -198,6 +199,7 @@ def __init__( impersonation_chain: str | Sequence[str] | None = None, labels: dict | None = None, deferrable: bool = False, + poll_interval: float = 4.0, **kwargs, ) -> None: super().__init__(sql=sql, **kwargs) @@ -208,6 +210,7 @@ def __init__( self.impersonation_chain = impersonation_chain self.labels = labels self.deferrable = deferrable + self.poll_interval = poll_interval def _submit_job( self, @@ -240,6 +243,7 @@ def execute(self, context: Context): conn_id=self.gcp_conn_id, job_id=job.job_id, project_id=hook.project_id, + poll_interval=self.poll_interval ), method_name="execute_complete", ) @@ -288,6 +292,7 @@ class BigQueryValueCheckOperator(_BigQueryDbHookMixin, SQLValueCheckOperator): account from the list granting this role to the originating account (templated). 
:param labels: a dictionary containing labels for the table, passed to BigQuery :param deferrable: Run operator in the deferrable mode + :param poll_interval: polling period in seconds to check for the status of job. Defaults to 4 seconds. """ template_fields: Sequence[str] = ( @@ -312,6 +317,7 @@ def __init__( impersonation_chain: str | Sequence[str] | None = None, labels: dict | None = None, deferrable: bool = False, + poll_interval: float = 4.0, **kwargs, ) -> None: super().__init__(sql=sql, pass_value=pass_value, tolerance=tolerance, **kwargs) @@ -321,6 +327,7 @@ def __init__( self.impersonation_chain = impersonation_chain self.labels = labels self.deferrable = deferrable + self.poll_interval = poll_interval def _submit_job( self, @@ -360,6 +367,7 @@ def execute(self, context: Context) -> None: # type: ignore[override] sql=self.sql, pass_value=self.pass_value, tolerance=self.tol, + poll_interval=self.poll_interval ), method_name="execute_complete", ) @@ -414,6 +422,7 @@ class BigQueryIntervalCheckOperator(_BigQueryDbHookMixin, SQLIntervalCheckOperat account from the list granting this role to the originating account (templated). :param labels: a dictionary containing labels for the table, passed to BigQuery :param deferrable: Run operator in the deferrable mode + :param poll_interval: polling period in seconds to check for the status of job. Defaults to 4 seconds. 
""" template_fields: Sequence[str] = ( @@ -439,6 +448,7 @@ def __init__( impersonation_chain: str | Sequence[str] | None = None, labels: dict | None = None, deferrable: bool = False, + poll_interval: float = 4.0, **kwargs, ) -> None: super().__init__( @@ -498,6 +508,7 @@ def execute(self, context: Context): days_back=self.days_back, ratio_formula=self.ratio_formula, ignore_zero=self.ignore_zero, + poll_interval=self.poll_interval ), method_name="execute_complete", ) @@ -794,6 +805,7 @@ class BigQueryGetDataOperator(GoogleCloudBaseOperator): Service Account Token Creator IAM role to the directly preceding identity, with first account from the list granting this role to the originating account (templated). :param deferrable: Run operator in the deferrable mode + :param poll_interval: polling period in seconds to check for the status of job. Defaults to 4 seconds. :param delegate_to: The account to impersonate using domain-wide delegation of authority, if any. For this to work, the service account making the request must have domain-wide delegation enabled. Deprecated. 
@@ -822,6 +834,7 @@ def __init__( impersonation_chain: str | Sequence[str] | None = None, deferrable: bool = False, delegate_to: str | None = None, + poll_interval: float = 4.0, **kwargs, ) -> None: super().__init__(**kwargs) @@ -840,6 +853,7 @@ def __init__( self.impersonation_chain = impersonation_chain self.project_id = project_id self.deferrable = deferrable + self.poll_interval = poll_interval def _submit_job( self, @@ -915,6 +929,7 @@ def execute(self, context: Context): dataset_id=self.dataset_id, table_id=self.table_id, project_id=hook.project_id, + poll_interval=self.poll_interval ), method_name="execute_complete", ) From d1321c893d327460c4e5bb08b336aa5f4d2008ea Mon Sep 17 00:00:00 2001 From: Pankaj Date: Wed, 15 Mar 2023 20:29:06 +0530 Subject: [PATCH 2/4] Expose missing poll_interval in Bigquery operator --- airflow/providers/google/cloud/operators/bigquery.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py index 7eba9d111e1ea..5adecd4ecb3c2 100644 --- a/airflow/providers/google/cloud/operators/bigquery.py +++ b/airflow/providers/google/cloud/operators/bigquery.py @@ -243,7 +243,7 @@ def execute(self, context: Context): conn_id=self.gcp_conn_id, job_id=job.job_id, project_id=hook.project_id, - poll_interval=self.poll_interval + poll_interval=self.poll_interval, ), method_name="execute_complete", ) @@ -367,7 +367,7 @@ def execute(self, context: Context) -> None: # type: ignore[override] sql=self.sql, pass_value=self.pass_value, tolerance=self.tol, - poll_interval=self.poll_interval + poll_interval=self.poll_interval, ), method_name="execute_complete", ) @@ -508,7 +508,7 @@ def execute(self, context: Context): days_back=self.days_back, ratio_formula=self.ratio_formula, ignore_zero=self.ignore_zero, - poll_interval=self.poll_interval + poll_interval=self.poll_interval, ), method_name="execute_complete", ) @@ -929,7 +929,7 @@ 
def execute(self, context: Context): dataset_id=self.dataset_id, table_id=self.table_id, project_id=hook.project_id, - poll_interval=self.poll_interval + poll_interval=self.poll_interval, ), method_name="execute_complete", ) From 8a67f8d424c9af37e6c9a72b3d075a3575f71e6f Mon Sep 17 00:00:00 2001 From: Pankaj Date: Thu, 16 Mar 2023 00:11:39 +0530 Subject: [PATCH 3/4] Fix test --- airflow/providers/google/cloud/operators/bigquery.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py index 5adecd4ecb3c2..73bb5180e20df 100644 --- a/airflow/providers/google/cloud/operators/bigquery.py +++ b/airflow/providers/google/cloud/operators/bigquery.py @@ -465,6 +465,7 @@ def __init__( self.impersonation_chain = impersonation_chain self.labels = labels self.deferrable = deferrable + self.poll_interval = poll_interval def _submit_job( self, From 20422ea19b9a328f30cd826abc857355c87ed1a6 Mon Sep 17 00:00:00 2001 From: Pankaj Date: Fri, 17 Mar 2023 00:36:16 +0530 Subject: [PATCH 4/4] Apply review suggestions --- .../providers/google/cloud/operators/bigquery.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py index 73bb5180e20df..9de5663420bf6 100644 --- a/airflow/providers/google/cloud/operators/bigquery.py +++ b/airflow/providers/google/cloud/operators/bigquery.py @@ -177,7 +177,8 @@ class BigQueryCheckOperator(_BigQueryDbHookMixin, SQLCheckOperator): account from the list granting this role to the originating account (templated). :param labels: a dictionary containing labels for the table, passed to BigQuery :param deferrable: Run operator in the deferrable mode - :param poll_interval: polling period in seconds to check for the status of job. Defaults to 4 seconds. 
+ :param poll_interval: (Deferrable mode only) polling period in seconds to check for the status of the job. + Defaults to 4 seconds. """ template_fields: Sequence[str] = ( @@ -292,7 +293,8 @@ class BigQueryValueCheckOperator(_BigQueryDbHookMixin, SQLValueCheckOperator): account from the list granting this role to the originating account (templated). :param labels: a dictionary containing labels for the table, passed to BigQuery :param deferrable: Run operator in the deferrable mode - :param poll_interval: polling period in seconds to check for the status of job. Defaults to 4 seconds. + :param poll_interval: (Deferrable mode only) polling period in seconds to check for the status of the job. + Defaults to 4 seconds. """ template_fields: Sequence[str] = ( @@ -422,7 +424,8 @@ class BigQueryIntervalCheckOperator(_BigQueryDbHookMixin, SQLIntervalCheckOperat account from the list granting this role to the originating account (templated). :param labels: a dictionary containing labels for the table, passed to BigQuery :param deferrable: Run operator in the deferrable mode - :param poll_interval: polling period in seconds to check for the status of job. Defaults to 4 seconds. + :param poll_interval: (Deferrable mode only) polling period in seconds to check for the status of the job. + Defaults to 4 seconds. """ template_fields: Sequence[str] = ( @@ -806,7 +809,8 @@ class BigQueryGetDataOperator(GoogleCloudBaseOperator): Service Account Token Creator IAM role to the directly preceding identity, with first account from the list granting this role to the originating account (templated). :param deferrable: Run operator in the deferrable mode - :param poll_interval: polling period in seconds to check for the status of job. Defaults to 4 seconds. + :param poll_interval: (Deferrable mode only) polling period in seconds to check for the status of the job. + Defaults to 4 seconds. :param delegate_to: The account to impersonate using domain-wide delegation of authority, if any. 
For this to work, the service account making the request must have domain-wide delegation enabled. Deprecated. @@ -2646,7 +2650,8 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator): :param result_retry: How to retry the `result` call that retrieves rows :param result_timeout: The number of seconds to wait for `result` method before using `result_retry` :param deferrable: Run operator in the deferrable mode - :param poll_interval: polling period in seconds to check for the status of job. Defaults to 4 seconds. + :param poll_interval: (Deferrable mode only) polling period in seconds to check for the status of the job. + Defaults to 4 seconds. """ template_fields: Sequence[str] = (