diff --git a/airflow/providers/google/cloud/hooks/bigquery.py b/airflow/providers/google/cloud/hooks/bigquery.py index c835ead045643..a3d19c4b9cc6c 100644 --- a/airflow/providers/google/cloud/hooks/bigquery.py +++ b/airflow/providers/google/cloud/hooks/bigquery.py @@ -57,6 +57,7 @@ from airflow.exceptions import AirflowException from airflow.providers.common.sql.hooks.sql import DbApiHook +from airflow.providers.google.cloud.utils.bigquery import bq_cast from airflow.providers.google.common.consts import CLIENT_INFO from airflow.providers.google.common.hooks.base_google import GoogleBaseAsyncHook, GoogleBaseHook, get_field from airflow.utils.helpers import convert_camel_to_snake @@ -2738,7 +2739,7 @@ def next(self) -> list | None: rows = query_results["rows"] for dict_row in rows: - typed_row = [_bq_cast(vs["v"], col_types[idx]) for idx, vs in enumerate(dict_row["f"])] + typed_row = [bq_cast(vs["v"], col_types[idx]) for idx, vs in enumerate(dict_row["f"])] self.buffer.append(typed_row) if not self.page_token: @@ -2843,25 +2844,6 @@ def _escape(s: str) -> str: return e -def _bq_cast(string_field: str, bq_type: str) -> None | int | float | bool | str: - """ - Helper method that casts a BigQuery row to the appropriate data types. - This is useful because BigQuery returns all fields as strings. 
- """ - if string_field is None: - return None - elif bq_type == "INTEGER": - return int(string_field) - elif bq_type in ("FLOAT", "TIMESTAMP"): - return float(string_field) - elif bq_type == "BOOLEAN": - if string_field not in ["true", "false"]: - raise ValueError(f"{string_field} must have value 'true' or 'false'") - return string_field == "true" - else: - return string_field - - def split_tablename( table_input: str, default_project_id: str, var_name: str | None = None ) -> tuple[str, str, str]: @@ -3068,7 +3050,7 @@ def get_records(self, query_results: dict[str, Any]) -> list[Any]: fields = query_results["schema"]["fields"] col_types = [field["type"] for field in fields] for dict_row in rows: - typed_row = [_bq_cast(vs["v"], col_types[idx]) for idx, vs in enumerate(dict_row["f"])] + typed_row = [bq_cast(vs["v"], col_types[idx]) for idx, vs in enumerate(dict_row["f"])] buffer.append(typed_row) return buffer diff --git a/airflow/providers/google/cloud/utils/bigquery.py b/airflow/providers/google/cloud/utils/bigquery.py new file mode 100644 index 0000000000000..03753c2423691 --- /dev/null +++ b/airflow/providers/google/cloud/utils/bigquery.py @@ -0,0 +1,36 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
from __future__ import annotations


def bq_cast(string_field: str | None, bq_type: str) -> None | int | float | bool | str:
    """
    Cast a single BigQuery field value to the Python type matching its column type.

    This is useful because BigQuery returns all fields as strings, so the column
    type taken from the result schema is needed to recover a native value.

    Both the legacy type names and their standard SQL aliases are recognised
    (``INTEGER``/``INT64``, ``FLOAT``/``FLOAT64``, ``BOOLEAN``/``BOOL``). The
    comparison is exact and case-sensitive.

    :param string_field: Raw field value, or ``None``.
    :param bq_type: BigQuery column type name, e.g. ``"INTEGER"``.
    :return: ``None`` when ``string_field`` is ``None``; an ``int`` for integer
        columns; a ``float`` for float and ``TIMESTAMP`` columns; a ``bool`` for
        boolean columns; for any other type the value is returned unchanged.
    :raises ValueError: If a boolean column holds anything other than ``"true"``
        or ``"false"``, or if ``int()`` / ``float()`` cannot parse the value.
    """
    # None is passed through untouched, whatever the column type is.
    if string_field is None:
        return None

    if bq_type in ("INTEGER", "INT64"):
        return int(string_field)

    # TIMESTAMP values are converted with float(), exactly like FLOAT columns.
    if bq_type in ("FLOAT", "FLOAT64", "TIMESTAMP"):
        return float(string_field)

    if bq_type in ("BOOLEAN", "BOOL"):
        # Only the two lowercase literals are accepted; anything else is rejected
        # rather than being silently coerced to False.
        if string_field not in ("true", "false"):
            raise ValueError(f"{string_field} must have value 'true' or 'false'")
        return string_field == "true"

    # Every other column type is handed back as received.
    return string_field