Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 0 additions & 24 deletions airflow/providers/amazon/aws/transfers/s3_to_redshift.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,30 +130,6 @@ def _build_copy_query(self, copy_destination: str, credentials_block: str, copy_
{copy_options};
"""

def _get_table_primary_key(self, postgres_hook):
sql = """
select kcu.column_name
from information_schema.table_constraints tco
join information_schema.key_column_usage kcu
on kcu.constraint_name = tco.constraint_name
and kcu.constraint_schema = tco.constraint_schema
and kcu.constraint_name = tco.constraint_name
where tco.constraint_type = 'PRIMARY KEY'
and kcu.table_schema = %s
and kcu.table_name = %s
"""

result = postgres_hook.get_records(sql, (self.schema, self.table))

if len(result) == 0:
raise AirflowException(
f"""
No primary key on {self.schema}.{self.table}.
Please provide keys on 'upsert_keys' parameter.
"""
)
return [row[0] for row in result]

def execute(self, context) -> None:
postgres_hook = PostgresHook(postgres_conn_id=self.redshift_conn_id)
conn = S3Hook.get_connection(conn_id=self.aws_conn_id)
Expand Down