From 2088465a538692ff715a321fdcb1792d2d4d995c Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Sat, 10 Jun 2023 12:10:24 +0200 Subject: [PATCH 1/3] Add parquet_row_group_size to BaseSQLToGCSOperator operator Signed-off-by: Hussein Awala --- .../google/cloud/transfers/sql_to_gcs.py | 36 ++++++++++++++++--- 1 file changed, 31 insertions(+), 5 deletions(-) diff --git a/airflow/providers/google/cloud/transfers/sql_to_gcs.py b/airflow/providers/google/cloud/transfers/sql_to_gcs.py index 3bd8af683f7fe..456037beee79c 100644 --- a/airflow/providers/google/cloud/transfers/sql_to_gcs.py +++ b/airflow/providers/google/cloud/transfers/sql_to_gcs.py @@ -23,7 +23,7 @@ import json import os from tempfile import NamedTemporaryFile -from typing import TYPE_CHECKING, Sequence +from typing import TYPE_CHECKING, Any, Sequence import pyarrow as pa import pyarrow.parquet as pq @@ -82,6 +82,7 @@ class BaseSQLToGCSOperator(BaseOperator): :param write_on_empty: Optional parameter to specify whether to write a file if the export does not return any rows. Default is False so we will not write a file if the export returns no rows. + :param parquet_row_group_size: The size of parquet row groups when using parquet format. 
""" template_fields: Sequence[str] = ( @@ -119,6 +120,7 @@ def __init__( exclude_columns: set | None = None, partition_columns: list | None = None, write_on_empty: bool = False, + parquet_row_group_size: int = 1, **kwargs, ) -> None: super().__init__(**kwargs) @@ -143,6 +145,7 @@ def __init__( self.exclude_columns = exclude_columns self.partition_columns = partition_columns self.write_on_empty = write_on_empty + self.parquet_row_group_size = parquet_row_group_size def execute(self, context: Context): if self.partition_columns: @@ -212,6 +215,15 @@ def convert_types(self, schema, col_type_dict, row) -> list: for name, value in zip(schema, row) ] + @staticmethod + def _write_rows_to_parquet(parquet_writer: pq.ParquetWriter, rows): + rows_pydic: dict[str, list[Any]] = {col: [] for col in parquet_writer.schema.names} + for row in rows: + for ind, col in enumerate(parquet_writer.schema.names): + rows_pydic[col].append(row[ind]) + tbl = pa.Table.from_pydict(rows_pydic, parquet_writer.schema) + parquet_writer.write_table(tbl) + def _write_local_data_files(self, cursor): """ Takes a cursor, and writes results to a local file. 
@@ -233,6 +245,7 @@ def _write_local_data_files(self, cursor): if self.export_format == "parquet": parquet_schema = self._convert_parquet_schema(cursor) parquet_writer = self._configure_parquet_file(tmp_file_handle, parquet_schema) + rows_buffer = [] prev_partition_values = None curr_partition_values = None @@ -253,6 +266,10 @@ def _write_local_data_files(self, cursor): file_no += 1 if self.export_format == "parquet": + # Write out the remaining rows in the buffer + if rows_buffer: + self._write_rows_to_parquet(parquet_writer, rows_buffer) + rows_buffer = [] parquet_writer.close() file_to_upload["partition_values"] = prev_partition_values @@ -279,9 +296,10 @@ def _write_local_data_files(self, cursor): row = self.convert_types(schema, col_type_dict, row) if self.null_marker is not None: row = [value if value is not None else self.null_marker for value in row] - row_pydic = {col: [value] for col, value in zip(schema, row)} - tbl = pa.Table.from_pydict(row_pydic, parquet_schema) - parquet_writer.write_table(tbl) + rows_buffer.append(row) + if len(rows_buffer) >= self.parquet_row_group_size: + self._write_rows_to_parquet(parquet_writer, rows_buffer) + rows_buffer = [] else: row = self.convert_types(schema, col_type_dict, row) row_dict = dict(zip(schema, row)) @@ -301,6 +319,10 @@ def _write_local_data_files(self, cursor): file_no += 1 if self.export_format == "parquet": + # Write out the remaining rows in the buffer + if rows_buffer: + self._write_rows_to_parquet(parquet_writer, rows_buffer) + rows_buffer = [] parquet_writer.close() file_to_upload["partition_values"] = curr_partition_values @@ -312,6 +334,10 @@ def _write_local_data_files(self, cursor): parquet_writer = self._configure_parquet_file(tmp_file_handle, parquet_schema) if self.export_format == "parquet": + # Write out the remaining rows in the buffer + if rows_buffer: + self._write_rows_to_parquet(parquet_writer, rows_buffer) + rows_buffer = [] parquet_writer.close() # Last file may have 0 rows, don't yield 
if empty # However, if it is the first file and self.write_on_empty is True, then yield to write an empty file @@ -349,7 +375,7 @@ def _configure_csv_file(self, file_handle, schema): csv_writer.writerow(schema) return csv_writer - def _configure_parquet_file(self, file_handle, parquet_schema): + def _configure_parquet_file(self, file_handle, parquet_schema) -> pq.ParquetWriter: parquet_writer = pq.ParquetWriter(file_handle.name, parquet_schema) return parquet_writer From d47ed9b34df0435960dfd90e4d05544d044d6b51 Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Sat, 10 Jun 2023 12:33:03 +0200 Subject: [PATCH 2/3] add a unit test Signed-off-by: Hussein Awala --- .../google/cloud/transfers/sql_to_gcs.py | 3 +- .../google/cloud/transfers/test_sql_to_gcs.py | 36 +++++++++++++++++++ 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/airflow/providers/google/cloud/transfers/sql_to_gcs.py b/airflow/providers/google/cloud/transfers/sql_to_gcs.py index 456037beee79c..c73275f290aee 100644 --- a/airflow/providers/google/cloud/transfers/sql_to_gcs.py +++ b/airflow/providers/google/cloud/transfers/sql_to_gcs.py @@ -82,7 +82,8 @@ class BaseSQLToGCSOperator(BaseOperator): :param write_on_empty: Optional parameter to specify whether to write a file if the export does not return any rows. Default is False so we will not write a file if the export returns no rows. - :param parquet_row_group_size: The size of parquet row groups when using parquet format. + :param parquet_row_group_size: The approximate number of rows in each row group + when using parquet format. 
""" template_fields: Sequence[str] = ( diff --git a/tests/providers/google/cloud/transfers/test_sql_to_gcs.py b/tests/providers/google/cloud/transfers/test_sql_to_gcs.py index 28243633053d9..dd0e5a42d639e 100644 --- a/tests/providers/google/cloud/transfers/test_sql_to_gcs.py +++ b/tests/providers/google/cloud/transfers/test_sql_to_gcs.py @@ -449,6 +449,42 @@ def test__write_local_data_files_parquet(self): df = pd.read_parquet(file.name) assert df.equals(OUTPUT_DF) + def test__write_local_data_files_parquet_with_row_size(self): + import math + + import pyarrow.parquet as pq + + op = DummySQLToGCSOperator( + sql=SQL, + bucket=BUCKET, + filename=FILENAME, + task_id=TASK_ID, + schema_filename=SCHEMA_FILE, + export_format="parquet", + gzip=False, + schema=SCHEMA, + gcp_conn_id="google_cloud_default", + parquet_row_group_size=8, + ) + input_data = INPUT_DATA * 10 + output_df = pd.DataFrame([["convert_type_return_value"] * 3] * 30, columns=COLUMNS) + + cursor = MagicMock() + cursor.__iter__.return_value = input_data + cursor.description = CURSOR_DESCRIPTION + + files = op._write_local_data_files(cursor) + file = next(files)["file_handle"] + file.flush() + df = pd.read_parquet(file.name) + assert df.equals(output_df) + parquet_file = pq.ParquetFile(file.name) + assert parquet_file.num_row_groups == math.ceil((len(INPUT_DATA) * 10) / op.parquet_row_group_size) + tolerance = 1 + for i in range(parquet_file.num_row_groups): + row_group_size = parquet_file.metadata.row_group(i).num_rows + assert row_group_size == op.parquet_row_group_size or (tolerance := tolerance - 1) >= 0 + def test__write_local_data_files_json_with_exclude_columns(self): op = DummySQLToGCSOperator( sql=SQL, From e0a0454ede8c9e083e2323858bc6966060dfcd2b Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Sat, 10 Jun 2023 12:37:15 +0200 Subject: [PATCH 3/3] Improve docstring Signed-off-by: Hussein Awala --- airflow/providers/google/cloud/transfers/sql_to_gcs.py | 4 +++- 1 file changed, 3 insertions(+), 1 
deletion(-) diff --git a/airflow/providers/google/cloud/transfers/sql_to_gcs.py b/airflow/providers/google/cloud/transfers/sql_to_gcs.py index c73275f290aee..ce283ff866171 100644 --- a/airflow/providers/google/cloud/transfers/sql_to_gcs.py +++ b/airflow/providers/google/cloud/transfers/sql_to_gcs.py @@ -83,7 +83,9 @@ class BaseSQLToGCSOperator(BaseOperator): export does not return any rows. Default is False so we will not write a file if the export returns no rows. :param parquet_row_group_size: The approximate number of rows in each row group - when using parquet format. + when using parquet format (the last row group of a file may hold fewer rows). + Larger row groups can reduce the file size and improve read performance, but + require more memory while the operator runs. (default: 1) """ template_fields: Sequence[str] = (