diff --git a/airflow/providers/amazon/aws/example_dags/example_glacier_to_gcs.py b/airflow/providers/amazon/aws/example_dags/example_glacier_to_gcs.py index 593688dcbb03b..58c4d21e1c75d 100644 --- a/airflow/providers/amazon/aws/example_dags/example_glacier_to_gcs.py +++ b/airflow/providers/amazon/aws/example_dags/example_glacier_to_gcs.py @@ -20,7 +20,10 @@ from datetime import datetime from airflow import DAG -from airflow.providers.amazon.aws.operators.glacier import GlacierCreateJobOperator +from airflow.providers.amazon.aws.operators.glacier import ( + GlacierCreateJobOperator, + GlacierUploadArchiveOperator, +) from airflow.providers.amazon.aws.sensors.glacier import GlacierJobOperationSensor from airflow.providers.amazon.aws.transfers.glacier_to_gcs import GlacierToGCSOperator @@ -46,6 +49,12 @@ ) # [END howto_sensor_glacier_job_operation] + # [START howto_operator_glacier_upload_archive] + upload_archive_to_glacier = GlacierUploadArchiveOperator( + vault_name=VAULT_NAME, body=b'Test Data', task_id="upload_data_to_glacier" + ) + # [END howto_operator_glacier_upload_archive] + # [START howto_transfer_glacier_to_gcs] transfer_archive_to_gcs = GlacierToGCSOperator( task_id="transfer_archive_to_gcs", @@ -60,4 +69,4 @@ ) # [END howto_transfer_glacier_to_gcs] - create_glacier_job >> wait_for_operation_complete >> transfer_archive_to_gcs + create_glacier_job >> wait_for_operation_complete >> upload_archive_to_glacier >> transfer_archive_to_gcs diff --git a/airflow/providers/amazon/aws/operators/glacier.py b/airflow/providers/amazon/aws/operators/glacier.py index 6d96a5de7fd97..4e7c8b5e17421 100644 --- a/airflow/providers/amazon/aws/operators/glacier.py +++ b/airflow/providers/amazon/aws/operators/glacier.py @@ -54,3 +54,53 @@ def __init__( def execute(self, context: Context): hook = GlacierHook(aws_conn_id=self.aws_conn_id) return hook.retrieve_inventory(vault_name=self.vault_name) + + +class GlacierUploadArchiveOperator(BaseOperator): + """ + This operator add an 
archive to an Amazon S3 Glacier vault + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:GlacierUploadArchiveOperator` + + :param vault_name: The name of the vault + :param body: A bytes or seekable file-like object. The data to upload. + :param checksum: The SHA256 tree hash of the data being uploaded. + This parameter is automatically populated if it is not provided + :param archive_description: The description of the archive you are uploading + :param account_id: (Optional) AWS account ID of the account that owns the vault. + Defaults to the credentials used to sign the request + :param aws_conn_id: The reference to the AWS connection details + """ + + template_fields: Sequence[str] = ("vault_name",) + + def __init__( + self, + *, + vault_name: str, + body: object, + checksum: str | None = None, + archive_description: str | None = None, + account_id: str | None = None, + aws_conn_id="aws_default", + **kwargs, + ): + super().__init__(**kwargs) + self.aws_conn_id = aws_conn_id + self.account_id = account_id + self.vault_name = vault_name + self.body = body + self.checksum = checksum + self.archive_description = archive_description + + def execute(self, context: Context): + hook = GlacierHook(aws_conn_id=self.aws_conn_id) + return hook.get_conn().upload_archive( + accountId=self.account_id, + vaultName=self.vault_name, + archiveDescription=self.archive_description, + body=self.body, + checksum=self.checksum, + ) diff --git a/docs/apache-airflow-providers-amazon/operators/glacier.rst b/docs/apache-airflow-providers-amazon/operators/glacier.rst index df9b0fcf8b77c..c1c40525a56f6 100644 --- a/docs/apache-airflow-providers-amazon/operators/glacier.rst +++ b/docs/apache-airflow-providers-amazon/operators/glacier.rst @@ -46,6 +46,20 @@ This Operator returns a dictionary of information related to the initiated job s :start-after: [START howto_operator_glacier_create_job] :end-before: [END 
howto_operator_glacier_create_job] +.. _howto/operator:GlacierUploadArchiveOperator: + +Upload an archive to Amazon Glacier +=================================== + +To add an archive to an Amazon S3 Glacier vault +use :class:`~airflow.providers.amazon.aws.operators.glacier.GlacierUploadArchiveOperator` + +.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_glacier_to_gcs.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_glacier_upload_archive] + :end-before: [END howto_operator_glacier_upload_archive] + Sensors ------- diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index 181a1079f4ee4..2e67be35b9fb2 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -1253,6 +1253,7 @@ securityManager seealso Seedlist seedlist +seekable segmentGranularity Sendgrid sendgrid diff --git a/tests/providers/amazon/aws/operators/test_glacier.py b/tests/providers/amazon/aws/operators/test_glacier.py index a1b0628301118..51600eca0b5f9 100644 --- a/tests/providers/amazon/aws/operators/test_glacier.py +++ b/tests/providers/amazon/aws/operators/test_glacier.py @@ -19,7 +19,10 @@ from unittest import TestCase, mock -from airflow.providers.amazon.aws.operators.glacier import GlacierCreateJobOperator +from airflow.providers.amazon.aws.operators.glacier import ( + GlacierCreateJobOperator, + GlacierUploadArchiveOperator, +) AWS_CONN_ID = "aws_default" BUCKET_NAME = "airflow_bucket" @@ -38,3 +41,15 @@ def test_execute(self, hook_mock): op.execute(mock.MagicMock()) hook_mock.assert_called_once_with(aws_conn_id=AWS_CONN_ID) hook_mock.return_value.retrieve_inventory.assert_called_once_with(vault_name=VAULT_NAME) + + +class TestGlacierUploadArchiveOperator(TestCase): + @mock.patch("airflow.providers.amazon.aws.operators.glacier.GlacierHook.get_conn") + def test_execute(self, hook_mock): + op = GlacierUploadArchiveOperator( + aws_conn_id=AWS_CONN_ID, vault_name=VAULT_NAME, body=b'Test Data', 
task_id=TASK_ID + ) + op.execute(mock.MagicMock()) + hook_mock.return_value.upload_archive.assert_called_once_with( + accountId=None, vaultName=VAULT_NAME, archiveDescription=None, body=b'Test Data', checksum=None + )