SqlToS3Operator not able to write data with partition_cols provided. #30382

Description

@amolsr

Apache Airflow Provider(s)

amazon

Versions of Apache Airflow Providers

I am using the standard operator version which comes with apache/airflow:2.5.2.

Apache Airflow version

2.5.2

Operating System

Ubuntu 22.04.2 LTS

Deployment

Official Apache Airflow Helm Chart

Deployment details

I use a simple Docker Compose setup and run the same configuration on my local machine.

What happened

I am using SqlToS3Operator in my DAG and need to store the data partitioned by a column. The operator writes the data to a temporary file, but when partition_cols is passed the target has to be a directory. I get the error below.

[2023-03-31, 03:47:57 UTC] {sql_to_s3.py:175} INFO - Writing data to temp file
[2023-03-31, 03:47:57 UTC] {taskinstance.py:1775} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/amazon/aws/transfers/sql_to_s3.py", line 176, in execute
    getattr(data_df, file_options.function)(tmp_file.name, **self.pd_kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/pandas/util/_decorators.py", line 207, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/pandas/core/frame.py", line 2685, in to_parquet
    **kwargs,
  File "/home/airflow/.local/lib/python3.7/site-packages/pandas/io/parquet.py", line 423, in to_parquet
    **kwargs,
  File "/home/airflow/.local/lib/python3.7/site-packages/pandas/io/parquet.py", line 190, in write
    **kwargs,
  File "/home/airflow/.local/lib/python3.7/site-packages/pyarrow/parquet/__init__.py", line 3244, in write_to_dataset
    max_rows_per_group=row_group_size)
  File "/home/airflow/.local/lib/python3.7/site-packages/pyarrow/dataset.py", line 989, in write_dataset
    min_rows_per_group, max_rows_per_group, create_dir
  File "pyarrow/_dataset.pyx", line 2775, in pyarrow._dataset._filesystemdataset_write
  File "pyarrow/error.pxi", line 113, in pyarrow.lib.check_status
NotADirectoryError: [Errno 20] Cannot create directory '/tmp/tmp3z4dpv_p.parquet/application_createdAt=2020-06-05 11:47:44.000000000'. Detail: [errno 20] Not a directory
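
The last line of the traceback suggests the root cause: the partitioned writer tries to create a subdirectory underneath a path that is already a regular file. The sketch below reproduces only that filesystem behaviour with the standard library, without pandas or pyarrow. The helper name `can_hold_partitions` and the shortened partition value are hypothetical, and the `NotADirectoryError` behaviour is what I would expect on POSIX systems such as the Ubuntu host above.

```python
import os
import tempfile


def can_hold_partitions(path: str) -> bool:
    """Return True if a partition subdirectory can be created under `path`."""
    try:
        # Mimics what a partitioned writer does: create "<path>/<col>=<value>/".
        os.makedirs(os.path.join(path, "application_createdAt=2020-06-05"))
    except NotADirectoryError:
        # `path` exists but is a regular file, as in the traceback above.
        return False
    return True


# A named temporary file, similar to the temp file the operator writes to.
with tempfile.NamedTemporaryFile(suffix=".parquet") as tmp_file:
    file_result = can_hold_partitions(tmp_file.name)

# A temporary directory can hold the partition subdirectories.
with tempfile.TemporaryDirectory() as tmp_dir:
    dir_result = can_hold_partitions(tmp_dir)

print(file_result, dir_result)
```

If this reading is right, any fix needs to give the writer a directory rather than a file whenever partition_cols is set.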

What you think should happen instead

The operator should support partition_cols as well.

How to reproduce

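I am using the code snippet below.

    from airflow.models import Variable
    from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator

    # `sql` is the query string, defined elsewhere in the DAG file.
    sql_to_s3_task = SqlToS3Operator(
        task_id="sql_to_s3_task",
        sql_conn_id="mysql_con",
        query=sql,
        s3_bucket=Variable.get("AWS_S3_BUCKET"),
        aws_conn_id="aws_con",
        file_format="parquet",
        s3_key="Fact_applications",
        # partition_cols is forwarded to DataFrame.to_parquet and triggers the error.
        pd_kwargs={
            "partition_cols": ["application_createdAt"],
        },
        replace=True,
    )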

This can be used to reproduce the issue.

Anything else

I believe the logic that writes to the temporary file should be updated to handle this case.
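
One possible direction, offered as a rough sketch and not as the operator's actual implementation: write the partitioned dataset into a temporary directory, then upload each generated file under the configured `s3_key` used as a prefix. The example below covers only the key-mapping step with the standard library. The helper name `iter_partition_uploads` and the file name `part-0.parquet` are hypothetical, and the empty file stands in for whatever the Parquet writer would produce.

```python
import os
import tempfile
from typing import Iterator, Tuple


def iter_partition_uploads(local_dir: str, key_prefix: str) -> Iterator[Tuple[str, str]]:
    """Yield (local_path, s3_key) for every file below `local_dir`."""
    prefix = key_prefix.rstrip("/")
    for root, _dirs, files in os.walk(local_dir):
        for name in sorted(files):
            local_path = os.path.join(root, name)
            # Keep the partition layout (col=value/...) in the object key.
            rel_path = os.path.relpath(local_path, local_dir)
            yield local_path, prefix + "/" + rel_path.replace(os.sep, "/")


with tempfile.TemporaryDirectory() as tmp_dir:
    # Stand-in for the output of a partitioned Parquet write.
    part_dir = os.path.join(tmp_dir, "application_createdAt=2020-06-05")
    os.makedirs(part_dir)
    with open(os.path.join(part_dir, "part-0.parquet"), "wb") as fh:
        fh.write(b"")

    planned_keys = [key for _path, key in iter_partition_uploads(tmp_dir, "Fact_applications")]

print(planned_keys)
```

In the operator, each pair could then be passed to something like `S3Hook.load_file(filename=..., key=..., bucket_name=..., replace=...)`. How `replace=True` should behave for a whole prefix is an open design question that this sketch does not address.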

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
