Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
a6ddec7
Fix typo.
ANiteckiP Feb 17, 2020
162363a
Type checking: pass empty dict instead of None to operators in test s…
ANiteckiP Feb 17, 2020
00d5e24
Reformat some whitespace.
ANiteckiP Feb 17, 2020
8a33d0b
Fix PubSubHook.pull return value type annotation.
ANiteckiP Feb 18, 2020
bc5fb8b
PubSubHook.acknowledge: Implement acknowledging by passing list of Re…
ANiteckiP Feb 18, 2020
ecbbc02
Refactor PubSubPullSensor.
ANiteckiP Feb 18, 2020
154c3af
Implement PubSubPullOperator.
ANiteckiP Feb 18, 2020
3f2a979
Remove unused constant from PubSub tests.
ANiteckiP Feb 18, 2020
b507649
Fix Sensor contract: some do have a return value.
ANiteckiP Feb 19, 2020
5c7fc27
Reformat whitespace in PubSub tests.
ANiteckiP Feb 19, 2020
82dc79a
PubSub: Add tests for PubSubPullOperator.
ANiteckiP Feb 19, 2020
7b067cc
Fix PubSubPullSensor tests after refactoring.
ANiteckiP Feb 20, 2020
fbea243
Sort imports.
ANiteckiP Feb 20, 2020
95aff40
Fix pylint complaining about callback interface.
ANiteckiP Feb 20, 2020
fe3af1e
Update airflow/providers/google/cloud/operators/pubsub.py
ANiteckiP Feb 20, 2020
f22763c
Update airflow/providers/google/cloud/hooks/pubsub.py
ANiteckiP Feb 20, 2020
15581fd
Update airflow/providers/google/cloud/hooks/pubsub.py
ANiteckiP Feb 20, 2020
1e02875
Update airflow/providers/google/cloud/operators/pubsub.py
ANiteckiP Feb 20, 2020
0a7e2a6
Reorder PubSub messages_callback argument.
ANiteckiP Feb 20, 2020
aa28655
Update docstring.
ANiteckiP Feb 20, 2020
4d5451f
Reformat mutually exclusive argument handling logic in PubSubHook.
ANiteckiP Feb 21, 2020
b31fb6d
PubSub: Deprecate return_immediately argument.
ANiteckiP Feb 27, 2020
869f094
Implement example DAG and system test for PubSubPullOperator.
ANiteckiP Mar 3, 2020
5d7bff7
Fix docstring formatting.
ANiteckiP Mar 5, 2020
cc087c9
Apply suggestions from code review
ANiteckiP Mar 16, 2020
5c5a74a
PubSubPullOperator: Fix docs.
ANiteckiP Mar 16, 2020
74bb327
Fix test
ANiteckiP Mar 19, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 74 additions & 10 deletions airflow/providers/google/cloud/example_dags/example_pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@
from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.operators.pubsub import (
PubSubCreateSubscriptionOperator, PubSubCreateTopicOperator, PubSubDeleteSubscriptionOperator,
PubSubDeleteTopicOperator, PubSubPublishMessageOperator,
PubSubDeleteTopicOperator, PubSubPublishMessageOperator, PubSubPullOperator,
)
from airflow.providers.google.cloud.sensors.pubsub import PubSubPullSensor
from airflow.utils.dates import days_ago

GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "your-project-id")
TOPIC = "PubSubTestTopic"
TOPIC_FOR_SENSOR_DAG = "PubSubSensorTestTopic"
TOPIC_FOR_OPERATOR_DAG = "PubSubOperatorTestTopic"
MESSAGE = {"data": b"Tool", "attributes": {"name": "wrench", "mass": "1.3kg", "count": "3"}}

default_args = {"start_date": days_ago(1)}
Expand All @@ -45,23 +46,23 @@
# [END howto_operator_gcp_pubsub_pull_messages_result_cmd]

with models.DAG(
"example_gcp_pubsub",
"example_gcp_pubsub_sensor",
default_args=default_args,
schedule_interval=None, # Override to match your needs
) as example_dag:
) as example_sensor_dag:
# [START howto_operator_gcp_pubsub_create_topic]
create_topic = PubSubCreateTopicOperator(
task_id="create_topic", topic=TOPIC, project_id=GCP_PROJECT_ID
task_id="create_topic", topic=TOPIC_FOR_SENSOR_DAG, project_id=GCP_PROJECT_ID
)
# [END howto_operator_gcp_pubsub_create_topic]

# [START howto_operator_gcp_pubsub_create_subscription]
subscribe_task = PubSubCreateSubscriptionOperator(
task_id="subscribe_task", project_id=GCP_PROJECT_ID, topic=TOPIC
task_id="subscribe_task", project_id=GCP_PROJECT_ID, topic=TOPIC_FOR_SENSOR_DAG
)
# [END howto_operator_gcp_pubsub_create_subscription]

# [START howto_operator_gcp_pubsub_pull_message]
# [START howto_operator_gcp_pubsub_pull_message_with_sensor]
subscription = "{{ task_instance.xcom_pull('subscribe_task') }}"

pull_messages = PubSubPullSensor(
Expand All @@ -70,7 +71,7 @@
project_id=GCP_PROJECT_ID,
subscription=subscription,
)
# [END howto_operator_gcp_pubsub_pull_message]
# [END howto_operator_gcp_pubsub_pull_message_with_sensor]

# [START howto_operator_gcp_pubsub_pull_messages_result]
pull_messages_result = BashOperator(
Expand All @@ -82,7 +83,7 @@
publish_task = PubSubPublishMessageOperator(
task_id="publish_task",
project_id=GCP_PROJECT_ID,
topic=TOPIC,
topic=TOPIC_FOR_SENSOR_DAG,
messages=[MESSAGE, MESSAGE, MESSAGE],
)
# [END howto_operator_gcp_pubsub_publish]
Expand All @@ -97,9 +98,72 @@

# [START howto_operator_gcp_pubsub_delete_topic]
delete_topic = PubSubDeleteTopicOperator(
task_id="delete_topic", topic=TOPIC, project_id=GCP_PROJECT_ID
task_id="delete_topic", topic=TOPIC_FOR_SENSOR_DAG, project_id=GCP_PROJECT_ID
)
# [END howto_operator_gcp_pubsub_delete_topic]

create_topic >> subscribe_task >> publish_task
subscribe_task >> pull_messages >> pull_messages_result >> unsubscribe_task >> delete_topic


with models.DAG(
"example_gcp_pubsub_operator",
default_args=default_args,
schedule_interval=None, # Override to match your needs
) as example_operator_dag:
# [START howto_operator_gcp_pubsub_create_topic]
create_topic = PubSubCreateTopicOperator(
task_id="create_topic", topic=TOPIC_FOR_OPERATOR_DAG, project_id=GCP_PROJECT_ID
)
# [END howto_operator_gcp_pubsub_create_topic]

# [START howto_operator_gcp_pubsub_create_subscription]
subscribe_task = PubSubCreateSubscriptionOperator(
task_id="subscribe_task", project_id=GCP_PROJECT_ID, topic=TOPIC_FOR_OPERATOR_DAG
)
# [END howto_operator_gcp_pubsub_create_subscription]

# [START howto_operator_gcp_pubsub_pull_message_with_operator]
subscription = "{{ task_instance.xcom_pull('subscribe_task') }}"

pull_messages = PubSubPullOperator(
task_id="pull_messages",
ack_messages=True,
project_id=GCP_PROJECT_ID,
subscription=subscription,
)
# [END howto_operator_gcp_pubsub_pull_message_with_operator]

# [START howto_operator_gcp_pubsub_pull_messages_result]
pull_messages_result = BashOperator(
task_id="pull_messages_result", bash_command=echo_cmd
)
# [END howto_operator_gcp_pubsub_pull_messages_result]

# [START howto_operator_gcp_pubsub_publish]
publish_task = PubSubPublishMessageOperator(
task_id="publish_task",
project_id=GCP_PROJECT_ID,
topic=TOPIC_FOR_OPERATOR_DAG,
messages=[MESSAGE, MESSAGE, MESSAGE],
)
# [END howto_operator_gcp_pubsub_publish]

# [START howto_operator_gcp_pubsub_unsubscribe]
unsubscribe_task = PubSubDeleteSubscriptionOperator(
task_id="unsubscribe_task",
project_id=GCP_PROJECT_ID,
subscription="{{ task_instance.xcom_pull('subscribe_task') }}",
)
# [END howto_operator_gcp_pubsub_unsubscribe]

# [START howto_operator_gcp_pubsub_delete_topic]
delete_topic = PubSubDeleteTopicOperator(
task_id="delete_topic", topic=TOPIC_FOR_OPERATOR_DAG, project_id=GCP_PROJECT_ID
)
# [END howto_operator_gcp_pubsub_delete_topic]

(
create_topic >> subscribe_task >> publish_task
>> pull_messages >> pull_messages_result >> unsubscribe_task >> delete_topic
)
28 changes: 22 additions & 6 deletions airflow/providers/google/cloud/hooks/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from google.api_core.retry import Retry
from google.cloud.exceptions import NotFound
from google.cloud.pubsub_v1 import PublisherClient, SubscriberClient
from google.cloud.pubsub_v1.types import Duration, MessageStoragePolicy, PushConfig
from google.cloud.pubsub_v1.types import Duration, MessageStoragePolicy, PushConfig, ReceivedMessage
from googleapiclient.errors import HttpError

from airflow.providers.google.cloud.hooks.base import CloudBaseHook
Expand Down Expand Up @@ -460,7 +460,7 @@ def pull(
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[Sequence[Tuple[str, str]]] = None,
) -> List[Dict]:
) -> List[ReceivedMessage]:
"""
Pulls up to ``max_messages`` messages from Pub/Sub subscription.

Expand Down Expand Up @@ -496,7 +496,7 @@ def pull(
subscriber = self.subscriber_client
subscription_path = SubscriberClient.subscription_path(project_id, subscription) # noqa E501 # pylint: disable=no-member,line-too-long

self.log.info("Pulling mex %d messages from subscription (path) %s", max_messages, subscription_path)
self.log.info("Pulling max %d messages from subscription (path) %s", max_messages, subscription_path)
try:
# pylint: disable=no-member
response = subscriber.pull(
Expand All @@ -517,7 +517,8 @@ def pull(
def acknowledge(
self,
subscription: str,
ack_ids: List[str],
ack_ids: Optional[List[str]] = None,
messages: Optional[List[ReceivedMessage]] = None,
project_id: Optional[str] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
Expand All @@ -529,9 +530,12 @@ def acknowledge(
:param subscription: the Pub/Sub subscription name to delete; do not
include the 'projects/{project}/topics/' prefix.
:type subscription: str
:param ack_ids: List of ReceivedMessage ackIds from a previous pull
response
:param ack_ids: List of ReceivedMessage ackIds from a previous pull response.
Mutually exclusive with ``messages`` argument.
:type ack_ids: list
:param messages: List of ReceivedMessage objects to acknowledge.
Mutually exclusive with ``ack_ids`` argument.
:type messages: list
:param project_id: Optional, the GCP project name or ID in which to create the topic
If set to None or missing, the default project_id from the GCP connection is used.
:type project_id: str
Expand All @@ -545,8 +549,20 @@ def acknowledge(
:param metadata: (Optional) Additional metadata that is provided to the method.
:type metadata: Sequence[Tuple[str, str]]]
"""

if not project_id:
raise ValueError("Project ID should be set.")

if ack_ids is not None and messages is None:
pass
elif ack_ids is None and messages is not None:
ack_ids = [
message.ack_id
for message in messages
]
else:
raise ValueError("One and only one of 'ack_ids' and 'messages' arguments have to be provided")

subscriber = self.subscriber_client
subscription_path = SubscriberClient.subscription_path(project_id, subscription) # noqa E501 # pylint: disable=no-member,line-too-long

Expand Down
125 changes: 123 additions & 2 deletions airflow/providers/google/cloud/operators/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@
This module contains Google PubSub operators.
"""
import warnings
from typing import Dict, List, Optional, Sequence, Tuple, Union
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union

from google.api_core.retry import Retry
from google.cloud.pubsub_v1.types import Duration, MessageStoragePolicy, PushConfig
from google.cloud.pubsub_v1.types import Duration, MessageStoragePolicy, PushConfig, ReceivedMessage
from google.protobuf.json_format import MessageToDict

from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.pubsub import PubSubHook
Expand Down Expand Up @@ -666,3 +667,123 @@ def execute(self, context):
self.log.info("Publishing to topic %s", self.topic)
hook.publish(project_id=self.project_id, topic=self.topic, messages=self.messages)
self.log.info("Published to topic %s", self.topic)


class PubSubPullOperator(BaseOperator):
"""Pulls messages from a PubSub subscription and passes them through XCom.
If the queue is empty, returns empty list - never waits for messages.
If you do need to wait, please use :class:`airflow.providers.google.cloud.sensors.PubSubPullSensor`
instead.

.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:PubSubPullSensor`

This sensor operator will pull up to ``max_messages`` messages from the
specified PubSub subscription. When the subscription returns messages,
the poke method's criteria will be fulfilled and the messages will be
returned from the operator and passed through XCom for downstream tasks.

If ``ack_messages`` is set to True, messages will be immediately
acknowledged before being returned, otherwise, downstream tasks will be
responsible for acknowledging them.

``project`` and ``subscription`` are templated so you can use
variables in them.

:param project: the GCP project ID for the subscription (templated)
:type project: str
:param subscription: the Pub/Sub subscription name. Do not include the
full subscription path.
:type subscription: str
:param max_messages: The maximum number of messages to retrieve per
PubSub pull request
:type max_messages: int
:param ack_messages: If True, each message will be acknowledged
immediately rather than by any downstream tasks
:type ack_messages: bool
:param gcp_conn_id: The connection ID to use connecting to
Google Cloud Platform.
:type gcp_conn_id: str
:param delegate_to: The account to impersonate, if any.
For this to work, the service account making the request
must have domain-wide delegation enabled.
:type delegate_to: str
:param messages_callback: (Optional) Callback to process received messages.
It's return value will be saved to XCom.
If you are pulling large messages, you probably want to provide a custom callback.
If not provided, the default implementation will convert `ReceivedMessage` objects
into JSON-serializable dicts using `google.protobuf.json_format.MessageToDict` function.
:type messages_callback: Optional[Callable[[List[ReceivedMessage], Dict[str, Any]], Any]]
"""
template_fields = ['project_id', 'subscription']

@apply_defaults
def __init__(
self,
project_id: str,
subscription: str,
max_messages: int = 5,
ack_messages: bool = False,
messages_callback: Optional[Callable[[List[ReceivedMessage], Dict[str, Any]], Any]] = None,
gcp_conn_id: str = 'google_cloud_default',
delegate_to: Optional[str] = None,
*args,
**kwargs
) -> None:
super().__init__(*args, **kwargs)
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to
self.project_id = project_id
self.subscription = subscription
self.max_messages = max_messages
self.ack_messages = ack_messages
self.messages_callback = messages_callback

def execute(self, context):
hook = PubSubHook(
gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
)

pulled_messages = hook.pull(
project_id=self.project_id,
subscription=self.subscription,
max_messages=self.max_messages,
return_immediately=True,
)

handle_messages = self.messages_callback or self._default_message_callback

ret = handle_messages(pulled_messages, context)

if pulled_messages and self.ack_messages:
hook.acknowledge(
project_id=self.project_id,
subscription=self.subscription,
messages=pulled_messages,
)

return ret

def _default_message_callback(
self,
pulled_messages: List[ReceivedMessage],
context: Dict[str, Any], # pylint: disable=unused-argument
):
"""
This method can be overridden by subclasses or by `messages_callback` constructor argument.
This default implementation converts `ReceivedMessage` objects into JSON-serializable dicts.

:param pulled_messages: messages received from the topic.
:type pulled_messages: List[ReceivedMessage]
:param context: same as in `execute`
:return: value to be saved to XCom.
"""

messages_json = [
MessageToDict(m)
for m in pulled_messages
]

return messages_json
Loading