Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion airflow/providers/google/cloud/operators/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ class BigQueryCheckOperator(_BigQueryDbHookMixin, SQLCheckOperator):
account from the list granting this role to the originating account (templated).
:param labels: a dictionary containing labels for the table, passed to BigQuery
:param deferrable: Run operator in the deferrable mode
:param poll_interval: (Deferrable mode only) polling period in seconds to check for the status of job.
Defaults to 4 seconds.
"""

template_fields: Sequence[str] = (
Expand All @@ -198,6 +200,7 @@ def __init__(
impersonation_chain: str | Sequence[str] | None = None,
labels: dict | None = None,
deferrable: bool = False,
poll_interval: float = 4.0,
**kwargs,
) -> None:
super().__init__(sql=sql, **kwargs)
Expand All @@ -208,6 +211,7 @@ def __init__(
self.impersonation_chain = impersonation_chain
self.labels = labels
self.deferrable = deferrable
self.poll_interval = poll_interval

def _submit_job(
self,
Expand Down Expand Up @@ -240,6 +244,7 @@ def execute(self, context: Context):
conn_id=self.gcp_conn_id,
job_id=job.job_id,
project_id=hook.project_id,
poll_interval=self.poll_interval,
),
method_name="execute_complete",
)
Expand Down Expand Up @@ -288,6 +293,8 @@ class BigQueryValueCheckOperator(_BigQueryDbHookMixin, SQLValueCheckOperator):
account from the list granting this role to the originating account (templated).
:param labels: a dictionary containing labels for the table, passed to BigQuery
:param deferrable: Run operator in the deferrable mode
:param poll_interval: (Deferrable mode only) polling period in seconds to check for the status of job.
Defaults to 4 seconds.
"""

template_fields: Sequence[str] = (
Expand All @@ -312,6 +319,7 @@ def __init__(
impersonation_chain: str | Sequence[str] | None = None,
labels: dict | None = None,
deferrable: bool = False,
poll_interval: float = 4.0,
**kwargs,
) -> None:
super().__init__(sql=sql, pass_value=pass_value, tolerance=tolerance, **kwargs)
Expand All @@ -321,6 +329,7 @@ def __init__(
self.impersonation_chain = impersonation_chain
self.labels = labels
self.deferrable = deferrable
self.poll_interval = poll_interval

def _submit_job(
self,
Expand Down Expand Up @@ -360,6 +369,7 @@ def execute(self, context: Context) -> None: # type: ignore[override]
sql=self.sql,
pass_value=self.pass_value,
tolerance=self.tol,
poll_interval=self.poll_interval,
),
method_name="execute_complete",
)
Expand Down Expand Up @@ -414,6 +424,8 @@ class BigQueryIntervalCheckOperator(_BigQueryDbHookMixin, SQLIntervalCheckOperat
account from the list granting this role to the originating account (templated).
:param labels: a dictionary containing labels for the table, passed to BigQuery
:param deferrable: Run operator in the deferrable mode
:param poll_interval: (Deferrable mode only) polling period in seconds to check for the status of job.
Defaults to 4 seconds.
"""

template_fields: Sequence[str] = (
Expand All @@ -439,6 +451,7 @@ def __init__(
impersonation_chain: str | Sequence[str] | None = None,
labels: dict | None = None,
deferrable: bool = False,
poll_interval: float = 4.0,
**kwargs,
) -> None:
super().__init__(
Expand All @@ -455,6 +468,7 @@ def __init__(
self.impersonation_chain = impersonation_chain
self.labels = labels
self.deferrable = deferrable
self.poll_interval = poll_interval

def _submit_job(
self,
Expand Down Expand Up @@ -498,6 +512,7 @@ def execute(self, context: Context):
days_back=self.days_back,
ratio_formula=self.ratio_formula,
ignore_zero=self.ignore_zero,
poll_interval=self.poll_interval,
),
method_name="execute_complete",
)
Expand Down Expand Up @@ -794,6 +809,8 @@ class BigQueryGetDataOperator(GoogleCloudBaseOperator):
Service Account Token Creator IAM role to the directly preceding identity, with first
account from the list granting this role to the originating account (templated).
:param deferrable: Run operator in the deferrable mode
:param poll_interval: (Deferrable mode only) polling period in seconds to check for the status of job.
Defaults to 4 seconds.
:param delegate_to: The account to impersonate using domain-wide delegation of authority,
if any. For this to work, the service account making the request must have
domain-wide delegation enabled. Deprecated.
Expand Down Expand Up @@ -822,6 +839,7 @@ def __init__(
impersonation_chain: str | Sequence[str] | None = None,
deferrable: bool = False,
delegate_to: str | None = None,
poll_interval: float = 4.0,
**kwargs,
) -> None:
super().__init__(**kwargs)
Expand All @@ -840,6 +858,7 @@ def __init__(
self.impersonation_chain = impersonation_chain
self.project_id = project_id
self.deferrable = deferrable
self.poll_interval = poll_interval

def _submit_job(
self,
Expand Down Expand Up @@ -915,6 +934,7 @@ def execute(self, context: Context):
dataset_id=self.dataset_id,
table_id=self.table_id,
project_id=hook.project_id,
poll_interval=self.poll_interval,
),
method_name="execute_complete",
)
Expand Down Expand Up @@ -2630,7 +2650,8 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator):
:param result_retry: How to retry the `result` call that retrieves rows
:param result_timeout: The number of seconds to wait for `result` method before using `result_retry`
:param deferrable: Run operator in the deferrable mode
:param poll_interval: polling period in seconds to check for the status of job. Defaults to 4 seconds.
:param poll_interval: (Deferrable mode only) polling period in seconds to check for the status of job.
Defaults to 4 seconds.
"""

template_fields: Sequence[str] = (
Expand Down