Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 24 additions & 15 deletions sdks/python/apache_beam/io/gcp/bigquery_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
# Protect against environments where bigquery library is not available.
# pylint: disable=wrong-import-order, wrong-import-position
try:
from apitools.base.py.exceptions import HttpError
from apitools.base.py.exceptions import HttpError, HttpForbiddenError
except ImportError:
pass

Expand Down Expand Up @@ -290,9 +290,10 @@ def get_query_location(self, project_id, query, use_legacy_sql):
"""
Get the location of tables referenced in a query.

This method returns the location of the first referenced table in the query
and depends on the BigQuery service to provide error handling for
queries that reference tables in multiple locations.
This method returns the location of the first referenced table in the
query that the user has permission to inspect, and depends on the
BigQuery service to provide error handling for queries that reference
tables in multiple locations.
"""
reference = bigquery.JobReference(
jobId=uuid.uuid4().hex, projectId=project_id)
Expand All @@ -318,17 +319,25 @@ def get_query_location(self, project_id, query, use_legacy_sql):

referenced_tables = response.statistics.query.referencedTables
if referenced_tables: # Guards against both non-empty and non-None
table = referenced_tables[0]
location = self.get_table_location(
table.projectId, table.datasetId, table.tableId)
_LOGGER.info(
"Using location %r from table %r referenced by query %s",
location,
table,
query)
return location

_LOGGER.debug("Query %s does not reference any tables.", query)
for table in referenced_tables:
try:
location = self.get_table_location(
table.projectId, table.datasetId, table.tableId)
except HttpForbiddenError:
# The user lacks permission to inspect this table (e.g. the
# underlying table of an authorized view); try the next one.
continue
_LOGGER.info(
"Using location %r from table %r referenced by query %s",
location,
table,
query)
return location

_LOGGER.debug(
"Query %s does not reference any tables or "
"you don't have permission to inspect them.",
query)
return None

@retry.with_exponential_backoff(
Expand Down
41 changes: 40 additions & 1 deletion sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import apache_beam as beam
from apache_beam.internal.gcp.json_value import to_json_value
from apache_beam.io.gcp.bigquery import TableRowJsonCoder
from apache_beam.io.gcp.bigquery_test import HttpError
from apache_beam.io.gcp.bigquery_tools import JSON_COMPLIANCE_ERROR
from apache_beam.io.gcp.bigquery_tools import AvroRowWriter
from apache_beam.io.gcp.bigquery_tools import BigQueryJobTypes
Expand All @@ -50,6 +49,15 @@
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.options.pipeline_options import PipelineOptions

# Protect against environments where bigquery library is not available.
# pylint: disable=wrong-import-order, wrong-import-position
try:
from apitools.base.py.exceptions import HttpError, HttpForbiddenError
except ImportError:
HttpError = None
HttpForbiddenError = None
# pylint: enable=wrong-import-order, wrong-import-position


@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class TestTableSchemaParser(unittest.TestCase):
Expand Down Expand Up @@ -267,6 +275,37 @@ def test_wait_for_job_retries_fail(self):
'The maximum number of retries has been reached',
str(context.exception))

def test_get_query_location(self):
    """get_query_location skips referenced tables the caller cannot inspect.

    The dry-run job reports two referenced tables. Fetching the location of
    the first one (the table backing an authorized view) raises
    HttpForbiddenError, so the wrapper must fall through to the second
    table and return its location instead of failing.
    """
    client = mock.Mock()
    query = """
        SELECT
            av.column1, table.column1
        FROM `dataset.authorized_view` as av
        JOIN `dataset.table` as table ON av.column2 = table.column2
    """
    job = mock.MagicMock(spec=bigquery.Job)
    job.statistics.query.referencedTables = [
        bigquery.TableReference(
            projectId="first_project_id",
            datasetId="first_dataset",
            tableId="table_used_by_authorized_view"),
        bigquery.TableReference(
            projectId="second_project_id",
            datasetId="second_dataset",
            tableId="table"),
    ]
    client.jobs.Insert.return_value = job

    wrapper = beam.io.gcp.bigquery_tools.BigQueryWrapper(client)
    # Status '403' matches the Forbidden semantics of HttpForbiddenError
    # (the previous value, '404', is Not Found / HttpNotFoundError and
    # contradicted the exception type being raised).
    wrapper.get_table_location = mock.Mock(
        side_effect=[
            HttpForbiddenError(response={'status': '403'}, url='', content=''),
            "US"
        ])
    location = wrapper.get_query_location(
        project_id="second_project_id", query=query, use_legacy_sql=False)
    self.assertEqual("US", location)


@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class TestBigQueryReader(unittest.TestCase):
Expand Down