diff --git a/airflow/providers/microsoft/azure/operators/asb.py b/airflow/providers/microsoft/azure/operators/asb.py index 2e7599c469128..3a6bc2800e0b7 100644 --- a/airflow/providers/microsoft/azure/operators/asb.py +++ b/airflow/providers/microsoft/azure/operators/asb.py @@ -15,12 +15,14 @@ # specific language governing permissions and limitations # under the License. import datetime -from typing import TYPE_CHECKING, List, Optional, Sequence, Union +from typing import TYPE_CHECKING, Any, List, Optional, Sequence, Union from airflow.models import BaseOperator from airflow.providers.microsoft.azure.hooks.asb import AdminClientHook, MessageHook if TYPE_CHECKING: + from azure.servicebus.management._models import AuthorizationRule + from airflow.utils.context import Context @@ -206,6 +208,125 @@ def execute(self, context: "Context") -> None: hook.delete_queue(self.queue_name) +class AzureServiceBusTopicCreateOperator(BaseOperator): + """ + Create an Azure Service Bus Topic under a Service Bus Namespace by using ServiceBusAdministrationClient + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:AzureServiceBusTopicCreateOperator` + + :param topic_name: Name of the topic. + :param default_message_time_to_live: ISO 8601 default message time span to live value. This is + the duration after which the message expires, starting from when the message is sent to Service + Bus. This is the default value used when TimeToLive is not set on a message itself. + Input value of either type ~datetime.timedelta or string in ISO 8601 duration format + like "PT300S" is accepted. + :param max_size_in_megabytes: The maximum size of the topic in megabytes, which is the size of + memory allocated for the topic. + :param requires_duplicate_detection: A value indicating if this topic requires duplicate + detection. + :param duplicate_detection_history_time_window: ISO 8601 time span structure that defines the + duration of the duplicate detection history. The default value is 10 minutes. + Input value of either type ~datetime.timedelta or string in ISO 8601 duration format + like "PT300S" is accepted. + :param enable_batched_operations: Value that indicates whether server-side batched operations + are enabled. + :param size_in_bytes: The size of the topic, in bytes. + :param filtering_messages_before_publishing: Filter messages before publishing. + :param authorization_rules: List of Authorization rules for resource. + :param support_ordering: A value that indicates whether the topic supports ordering. + :param auto_delete_on_idle: ISO 8601 time span idle interval after which the topic is + automatically deleted. The minimum duration is 5 minutes. + Input value of either type ~datetime.timedelta or string in ISO 8601 duration format + like "PT300S" is accepted. + :param enable_partitioning: A value that indicates whether the topic is to be partitioned + across multiple message brokers. + :param enable_express: A value that indicates whether Express Entities are enabled. An express + queue holds a message in memory temporarily before writing it to persistent storage. + :param user_metadata: Metadata associated with the topic. + :param max_message_size_in_kilobytes: The maximum size in kilobytes of message payload that + can be accepted by the queue. This feature is only available when using a Premium namespace + and Service Bus API version "2021-05" or higher. + The minimum allowed value is 1024 while the maximum allowed value is 102400. Default value is 1024. + """ + + template_fields: Sequence[str] = ("topic_name",) + ui_color = "#e4f0e8" + + def __init__( + self, + *, + topic_name: str, + azure_service_bus_conn_id: str = 'azure_service_bus_default', + default_message_time_to_live: Optional[Union[datetime.timedelta, str]] = None, + max_size_in_megabytes: Optional[int] = None, + requires_duplicate_detection: Optional[bool] = None, + duplicate_detection_history_time_window: Optional[Union[datetime.timedelta, str]] = None, + enable_batched_operations: Optional[bool] = None, + size_in_bytes: Optional[int] = None, + filtering_messages_before_publishing: Optional[bool] = None, + authorization_rules: Optional[List["AuthorizationRule"]] = None, + support_ordering: Optional[bool] = None, + auto_delete_on_idle: Optional[Union[datetime.timedelta, str]] = None, + enable_partitioning: Optional[bool] = None, + enable_express: Optional[bool] = None, + user_metadata: Optional[str] = None, + max_message_size_in_kilobytes: Optional[int] = None, + **kwargs: Any, + ) -> None: + super().__init__(**kwargs) + self.topic_name = topic_name + self.azure_service_bus_conn_id = azure_service_bus_conn_id + self.default_message_time_to_live = default_message_time_to_live + self.max_size_in_megabytes = max_size_in_megabytes + self.requires_duplicate_detection = requires_duplicate_detection + self.duplicate_detection_history_time_window = duplicate_detection_history_time_window + self.enable_batched_operations = enable_batched_operations + self.size_in_bytes = size_in_bytes + self.filtering_messages_before_publishing = filtering_messages_before_publishing + self.authorization_rules = authorization_rules + self.support_ordering = support_ordering + self.auto_delete_on_idle = auto_delete_on_idle + self.enable_partitioning = enable_partitioning + self.enable_express = enable_express + self.user_metadata = user_metadata + self.max_message_size_in_kilobytes = max_message_size_in_kilobytes + + def execute(self, context: "Context") -> str: + """Creates Topic in Service Bus namespace, by connecting to Service Bus Admin client""" + if self.topic_name is None: + raise TypeError("Topic name cannot be None.") + + # Create the hook + hook = AdminClientHook(azure_service_bus_conn_id=self.azure_service_bus_conn_id) + + with hook.get_conn() as service_mgmt_conn: + topic_properties = service_mgmt_conn.get_topic(self.topic_name) + if topic_properties and topic_properties.name == self.topic_name: + self.log.info("Topic name already exists") + return topic_properties.name + topic = service_mgmt_conn.create_topic( + topic_name=self.topic_name, + default_message_time_to_live=self.default_message_time_to_live, + max_size_in_megabytes=self.max_size_in_megabytes, + requires_duplicate_detection=self.requires_duplicate_detection, + duplicate_detection_history_time_window=self.duplicate_detection_history_time_window, + enable_batched_operations=self.enable_batched_operations, + size_in_bytes=self.size_in_bytes, + filtering_messages_before_publishing=self.filtering_messages_before_publishing, + authorization_rules=self.authorization_rules, + support_ordering=self.support_ordering, + auto_delete_on_idle=self.auto_delete_on_idle, + enable_partitioning=self.enable_partitioning, + enable_express=self.enable_express, + user_metadata=self.user_metadata, + max_message_size_in_kilobytes=self.max_message_size_in_kilobytes, + ) + self.log.info("Created Topic %s", topic.name) + return topic.name + + class AzureServiceBusSubscriptionCreateOperator(BaseOperator): """ Create an Azure Service Bus Topic Subscription under a Service Bus Namespace @@ -467,3 +588,45 @@ def execute(self, context: "Context") -> None: # delete subscription with name hook.delete_subscription(self.subscription_name, self.topic_name) + + +class AzureServiceBusTopicDeleteOperator(BaseOperator): + """ + Deletes the topic in the Azure Service Bus namespace + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:AzureServiceBusTopicDeleteOperator` + + :param topic_name: Name of the topic to be deleted. + :param azure_service_bus_conn_id: Reference to the + :ref:`Azure Service Bus connection `. + """ + + template_fields: Sequence[str] = ("topic_name",) + ui_color = "#e4f0e8" + + def __init__( + self, + *, + topic_name: str, + azure_service_bus_conn_id: str = 'azure_service_bus_default', + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.topic_name = topic_name + self.azure_service_bus_conn_id = azure_service_bus_conn_id + + def execute(self, context: "Context") -> None: + """Delete topic in Service Bus namespace, by connecting to Service Bus Admin client""" + if self.topic_name is None: + raise TypeError("Topic name cannot be None.") + hook = AdminClientHook(azure_service_bus_conn_id=self.azure_service_bus_conn_id) + + with hook.get_conn() as service_mgmt_conn: + topic_properties = service_mgmt_conn.get_topic(self.topic_name) + if topic_properties and topic_properties.name == self.topic_name: + service_mgmt_conn.delete_topic(self.topic_name) + self.log.info("Topic %s deleted.", self.topic_name) + else: + self.log.info("Topic %s does not exist.", self.topic_name) diff --git a/docs/apache-airflow-providers-microsoft-azure/operators/asb.rst b/docs/apache-airflow-providers-microsoft-azure/operators/asb.rst index 0614d1322461e..5ad6962418597 100644 --- a/docs/apache-airflow-providers-microsoft-azure/operators/asb.rst +++ b/docs/apache-airflow-providers-microsoft-azure/operators/asb.rst @@ -98,6 +98,43 @@ Below is an example of using this operator to execute an Azure Service Bus Delet :start-after: [START howto_operator_delete_service_bus_queue] :end-before: [END howto_operator_delete_service_bus_queue] +Azure Service Bus Topic Operators +----------------------------------------- +Azure Service Bus Topic based Operators helps to interact with topic in service bus namespace +and it helps to Create, Delete operation for topic. + +.. _howto/operator:AzureServiceBusTopicCreateOperator: + +Create Azure Service Bus Topic +====================================== + +To create Azure service bus topic with specific Parameter you can use +:class:`~airflow.providers.microsoft.azure.operators.asb.AzureServiceBusTopicCreateOperator`. + +Below is an example of using this operator to execute an Azure Service Bus Create Topic. + +.. exampleinclude:: /../../tests/system/providers/microsoft/azure/example_azure_service_bus.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_create_service_bus_topic] + :end-before: [END howto_operator_create_service_bus_topic] + +.. _howto/operator:AzureServiceBusTopicDeleteOperator: + +Delete Azure Service Bus Topic +====================================== + +To Delete the Azure service bus topic you can use +:class:`~airflow.providers.microsoft.azure.operators.asb.AzureServiceBusTopicDeleteOperator`. + +Below is an example of using this operator to execute an Azure Service Bus Delete topic. + +.. exampleinclude:: /../../tests/system/providers/microsoft/azure/example_azure_service_bus.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_delete_service_bus_topic] + :end-before: [END howto_operator_delete_service_bus_topic] + Azure Service Bus Subscription Operators ----------------------------------------- Azure Service Bus Subscription based Operators helps to interact topic Subscription in service bus namespace diff --git a/tests/providers/microsoft/azure/operators/test_asb.py b/tests/providers/microsoft/azure/operators/test_asb.py index 25a79cbf12f68..d2bdcc90ebfb8 100644 --- a/tests/providers/microsoft/azure/operators/test_asb.py +++ b/tests/providers/microsoft/azure/operators/test_asb.py @@ -28,6 +28,8 @@ AzureServiceBusSendMessageOperator, AzureServiceBusSubscriptionCreateOperator, AzureServiceBusSubscriptionDeleteOperator, + AzureServiceBusTopicCreateOperator, + AzureServiceBusTopicDeleteOperator, AzureServiceBusUpdateSubscriptionOperator, ) @@ -204,6 +206,51 @@ def test_receive_message_queue(self, mock_get_conn): mock_get_conn.assert_has_calls(expected_calls) +class TestABSTopicCreateOperator: + def test_init(self): + """ + Test init by creating AzureServiceBusTopicCreateOperator with task id and topic name, + by asserting the value + """ + asb_create_topic = AzureServiceBusTopicCreateOperator( + task_id="asb_create_topic", + topic_name=TOPIC_NAME, + ) + assert asb_create_topic.task_id == "asb_create_topic" + assert asb_create_topic.topic_name == TOPIC_NAME + + @mock.patch("airflow.providers.microsoft.azure.hooks.asb.AdminClientHook.get_conn") + @mock.patch('azure.servicebus.management.TopicProperties') + def test_create_topic(self, mock_topic_properties, mock_get_conn): + """ + Test AzureServiceBusTopicCreateOperator passed with the topic name + mocking the connection details, hook create_topic function + """ + asb_create_topic = AzureServiceBusTopicCreateOperator( + task_id="asb_create_topic", + topic_name=TOPIC_NAME, + ) + mock_topic_properties.name = TOPIC_NAME + mock_get_conn.return_value.__enter__.return_value.create_topic.return_value = mock_topic_properties + + with mock.patch.object(asb_create_topic.log, "info") as mock_log_info: + asb_create_topic.execute(None) + mock_log_info.assert_called_with("Created Topic %s", TOPIC_NAME) + + @mock.patch('airflow.providers.microsoft.azure.hooks.asb.AdminClientHook') + def test_create_subscription_exception(self, mock_sb_admin_client): + """ + Test `AzureServiceBusTopicCreateOperator` functionality to raise AirflowException, + by passing topic name as None and pytest raise Airflow Exception + """ + asb_create_topic_exception = AzureServiceBusTopicCreateOperator( + task_id="create_service_bus_subscription", + topic_name=None, + ) + with pytest.raises(TypeError): + asb_create_topic_exception.execute(None) + + class TestASBCreateSubscriptionOperator: def test_init(self): """ @@ -377,3 +424,60 @@ def test_receive_message_queue(self, mock_get_conn): .__exit__ ] mock_get_conn.assert_has_calls(expected_calls) + + +class TestASBTopicDeleteOperator: + def test_init(self): + """ + Test init by creating AzureServiceBusTopicDeleteOperator with task id, topic name and asserting + with values + """ + asb_delete_topic_operator = AzureServiceBusTopicDeleteOperator( + task_id="asb_delete_topic", + topic_name=TOPIC_NAME, + ) + assert asb_delete_topic_operator.task_id == "asb_delete_topic" + assert asb_delete_topic_operator.topic_name == TOPIC_NAME + + @mock.patch("airflow.providers.microsoft.azure.hooks.asb.AdminClientHook.get_conn") + @mock.patch('azure.servicebus.management.TopicProperties') + def test_delete_topic(self, mock_topic_properties, mock_get_conn): + """ + Test AzureServiceBusTopicDeleteOperator by mocking topic name, connection + """ + asb_delete_topic = AzureServiceBusTopicDeleteOperator( + task_id="asb_delete_topic", + topic_name=TOPIC_NAME, + ) + mock_topic_properties.name = TOPIC_NAME + mock_get_conn.return_value.__enter__.return_value.get_topic.return_value = mock_topic_properties + with mock.patch.object(asb_delete_topic.log, "info") as mock_log_info: + asb_delete_topic.execute(None) + mock_log_info.assert_called_with("Topic %s deleted.", TOPIC_NAME) + + @mock.patch("airflow.providers.microsoft.azure.hooks.asb.AdminClientHook.get_conn") + def test_delete_topic_not_exists(self, mock_get_conn): + """ + Test AzureServiceBusTopicDeleteOperator by mocking topic name, connection + """ + asb_delete_topic_not_exists = AzureServiceBusTopicDeleteOperator( + task_id="asb_delete_topic_not_exists", + topic_name=TOPIC_NAME, + ) + mock_get_conn.return_value.__enter__.return_value.get_topic.return_value = None + with mock.patch.object(asb_delete_topic_not_exists.log, "info") as mock_log_info: + asb_delete_topic_not_exists.execute(None) + mock_log_info.assert_called_with("Topic %s does not exist.", TOPIC_NAME) + + @mock.patch('airflow.providers.microsoft.azure.hooks.asb.AdminClientHook') + def test_delete_topic_exception(self, mock_sb_admin_client): + """ + Test `delete_topic` functionality to raise AirflowException, + by passing topic name as None and pytest raise Airflow Exception + """ + asb_delete_topic_exception = AzureServiceBusTopicDeleteOperator( + task_id="delete_service_bus_subscription", + topic_name=None, + ) + with pytest.raises(TypeError): + asb_delete_topic_exception.execute(None) diff --git a/tests/system/providers/microsoft/azure/example_azure_service_bus.py b/tests/system/providers/microsoft/azure/example_azure_service_bus.py index 6295d28b723d8..e4e736d8eb383 100644 --- a/tests/system/providers/microsoft/azure/example_azure_service_bus.py +++ b/tests/system/providers/microsoft/azure/example_azure_service_bus.py @@ -28,6 +28,8 @@ AzureServiceBusSendMessageOperator, AzureServiceBusSubscriptionCreateOperator, AzureServiceBusSubscriptionDeleteOperator, + AzureServiceBusTopicCreateOperator, + AzureServiceBusTopicDeleteOperator, AzureServiceBusUpdateSubscriptionOperator, ) @@ -94,6 +96,12 @@ ) # [END howto_operator_receive_message_service_bus_queue] + # [START howto_operator_create_service_bus_topic] + create_service_bus_topic = AzureServiceBusTopicCreateOperator( + task_id="create_service_bus_topic", topic_name=TOPIC_NAME + ) + # [END howto_operator_create_service_bus_topic] + # [START howto_operator_create_service_bus_subscription] create_service_bus_subscription = AzureServiceBusSubscriptionCreateOperator( task_id="create_service_bus_subscription", @@ -129,6 +137,13 @@ ) # [END howto_operator_delete_service_bus_subscription] + # [START howto_operator_delete_service_bus_topic] + delete_asb_topic = AzureServiceBusTopicDeleteOperator( + task_id="delete_asb_topic", + topic_name=TOPIC_NAME, + ) + # [END howto_operator_delete_service_bus_topic] + # [START howto_operator_delete_service_bus_queue] delete_service_bus_queue = AzureServiceBusDeleteQueueOperator( task_id="delete_service_bus_queue", queue_name=QUEUE_NAME, trigger_rule="all_done" @@ -137,6 +152,7 @@ chain( create_service_bus_queue, + create_service_bus_topic, create_service_bus_subscription, send_message_to_service_bus_queue, send_list_message_to_service_bus_queue, @@ -145,6 +161,7 @@ update_service_bus_subscription, receive_message_service_bus_subscription, delete_service_bus_subscription, + delete_asb_topic, delete_service_bus_queue, )