Merged
8 changes: 4 additions & 4 deletions dev/breeze/tests/test_selective_checks.py
@@ -1621,7 +1621,7 @@ def test_expected_output_push(
"providers/google/tests/unit/google/file.py",
),
{
-"selected-providers-list-as-string": "amazon apache.beam apache.cassandra "
+"selected-providers-list-as-string": "amazon apache.beam apache.cassandra apache.kafka "
"cncf.kubernetes common.compat common.sql "
"facebook google hashicorp microsoft.azure microsoft.mssql mysql "
"openlineage oracle postgres presto salesforce samba sftp ssh trino",
@@ -1635,14 +1635,14 @@ def test_expected_output_push(
"test-groups": "['core', 'providers']",
"docs-build": "true",
"docs-list-as-string": "apache-airflow helm-chart amazon apache.beam apache.cassandra "
-"cncf.kubernetes common.compat common.sql facebook google hashicorp microsoft.azure "
+"apache.kafka cncf.kubernetes common.compat common.sql facebook google hashicorp microsoft.azure "
"microsoft.mssql mysql openlineage oracle postgres "
"presto salesforce samba sftp ssh trino",
"skip-pre-commits": "identity,mypy-airflow,mypy-dev,mypy-docs,mypy-providers,mypy-task-sdk,ts-compile-format-lint-ui",
"run-kubernetes-tests": "true",
"upgrade-to-newer-dependencies": "false",
"core-test-types-list-as-string": "Always CLI",
-"providers-test-types-list-as-string": "Providers[amazon] Providers[apache.beam,apache.cassandra,cncf.kubernetes,common.compat,common.sql,facebook,"
+"providers-test-types-list-as-string": "Providers[amazon] Providers[apache.beam,apache.cassandra,apache.kafka,cncf.kubernetes,common.compat,common.sql,facebook,"
"hashicorp,microsoft.azure,microsoft.mssql,mysql,openlineage,oracle,postgres,presto,"
"salesforce,samba,sftp,ssh,trino] Providers[google]",
"needs-mypy": "true",
@@ -1890,7 +1890,7 @@ def test_upgrade_to_newer_dependencies(
pytest.param(
("providers/google/docs/some_file.rst",),
{
-"docs-list-as-string": "amazon apache.beam apache.cassandra "
+"docs-list-as-string": "amazon apache.beam apache.cassandra apache.kafka "
"cncf.kubernetes common.compat common.sql facebook google hashicorp "
"microsoft.azure microsoft.mssql mysql openlineage oracle "
"postgres presto salesforce samba sftp ssh trino",
4 changes: 3 additions & 1 deletion generated/provider_dependencies.json
@@ -222,7 +222,9 @@
],
"devel-deps": [],
"plugins": [],
-"cross-providers-deps": [],
+"cross-providers-deps": [
+  "google"
+],
"excluded-python-versions": [],
"state": "ready"
},
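The new `cross-providers-deps` entry appears to be what drives the expectation changes in `test_selective_checks.py`: once `apache.kafka` declares a dependency on `google`, a change under `providers/google/` also selects `apache.kafka`. The following is a minimal sketch of that propagation, assuming a simplified dependency map. The function `affected_providers` and the sample data are hypothetical and do not reproduce breeze's actual selection logic.

```python
def affected_providers(changed, cross_deps):
    """Return the changed provider plus every provider that depends on it.

    `cross_deps` maps a provider id to the list of providers it declares
    under "cross-providers-deps" (hypothetical, simplified structure).
    """
    affected = {changed}
    for provider, deps in cross_deps.items():
        if changed in deps:
            affected.add(provider)
    return sorted(affected)


# Illustrative excerpt only, not the full generated/provider_dependencies.json.
cross_deps = {
    "apache.kafka": ["google"],  # the entry added by this change
    "apache.beam": ["google"],
    "sftp": ["ssh"],
}

print(affected_providers("google", cross_deps))
```

With the `apache.kafka` entry removed from the map, the same call no longer returns `apache.kafka`, which mirrors the "before" side of the test expectations.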
19 changes: 19 additions & 0 deletions providers/apache/kafka/README.rst
@@ -58,5 +58,24 @@ PIP package Version required
``confluent-kafka`` ``>=2.3.0``
=================== ==================

Cross provider package dependencies
-----------------------------------

Those are dependencies that might be needed in order to use all the features of the package.
You need to install the specified provider packages in order to use them.

You can install such cross-provider dependencies when installing from PyPI. For example:

.. code-block:: bash

pip install apache-airflow-providers-apache-kafka[google]


==================================================================================================== ==========
Dependent package Extra
==================================================================================================== ==========
`apache-airflow-providers-google <https://airflow.apache.org/docs/apache-airflow-providers-google>`_ ``google``
==================================================================================================== ==========
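Because the Google provider is only an optional extra, code that relies on it may want to check whether it is actually installed. A minimal sketch using the standard library's `importlib.metadata` follows; the helper name `installed_version` is hypothetical and is not part of either provider.

```python
from importlib import metadata


def installed_version(distribution):
    """Return the installed version of a distribution, or None if absent."""
    try:
        return metadata.version(distribution)
    except metadata.PackageNotFoundError:
        return None


version = installed_version("apache-airflow-providers-google")
if version is None:
    print("Google provider not installed; the 'google' extra is missing.")
else:
    print(f"Google provider {version} is installed.")
```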

The changelog for the provider package can be found in the
`changelog <https://airflow.apache.org/docs/apache-airflow-providers-apache-kafka/1.7.0/changelog.html>`_.
7 changes: 7 additions & 0 deletions providers/apache/kafka/pyproject.toml
@@ -62,6 +62,13 @@ dependencies = [
"confluent-kafka>=2.3.0",
]

# The optional dependencies should be modified in place in the generated file
# Any change in the dependencies is preserved when the file is regenerated
[project.optional-dependencies]
"google" = [
"apache-airflow-providers-google"
]

[project.urls]
"Documentation" = "https://airflow.apache.org/docs/apache-airflow-providers-apache-kafka/1.7.0"
"Changelog" = "https://airflow.apache.org/docs/apache-airflow-providers-apache-kafka/1.7.0/changelog.html"
@@ -90,4 +90,5 @@ def get_provider_info():
}
],
"dependencies": ["apache-airflow>=2.9.0", "asgiref>=2.3.0", "confluent-kafka>=2.3.0"],
"optional-dependencies": {"google": ["apache-airflow-providers-google"]},
}
@@ -22,6 +22,7 @@
from confluent_kafka.admin import AdminClient

from airflow.hooks.base import BaseHook
from airflow.providers.google.cloud.hooks.managed_kafka import ManagedKafkaHook

Contributor

Why do we import Google hook in kafka provider?
This doesn't feel right.

Contributor

KafkaBaseHook is used in many places, so this import forces users to install the Google provider just to use the Kafka provider.
Am I missing something?

Member

Ah, indeed. Sorry, I missed that one. Good that you noticed it.

@potiuk potiuk Mar 9, 2025

Member

@MaksYermak -> can you please fix that ?

Contributor Author

@eladkal @potiuk I am ready to prepare a fix ASAP, but I have a question. If we can't use a Google hook inside the Kafka provider, what is a better way to use code from the Google provider? Could I use a conditional import here as a solution?

The reason for using the Google hook is that, to establish a connection to a Kafka cluster managed by Google, we need to pass a function that generates a token for Confluent from Google credentials. The function that obtains those Google credentials exists only inside the Google provider.
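One possible shape for the conditional import asked about here is sketched below. It is only an illustration under stated assumptions, not the fix that was eventually made: the helper name `add_google_oauth_if_needed` and its `import_module` parameter are hypothetical (the parameter exists so the sketch can be exercised without Airflow installed), while `ManagedKafkaHook`, `get_confluent_token`, and the `oauth_cb` key are taken from the diff in this PR.

```python
import importlib


def add_google_oauth_if_needed(config, import_module=importlib.import_module):
    """Return a config with ``oauth_cb`` set when the brokers look Google-managed."""
    servers = config.get("bootstrap.servers") or ""
    if "cloud.goog" not in servers or "managedkafka" not in servers:
        # Not a Google-managed cluster: the Google provider is never imported.
        return config

    try:
        # Deferred import: evaluated only for Google-managed clusters.
        module = import_module("airflow.providers.google.cloud.hooks.managed_kafka")
    except ImportError as err:
        raise RuntimeError(
            "Connecting to Google-managed Kafka requires the Google provider; "
            "install apache-airflow-providers-apache-kafka[google]."
        ) from err

    hook = module.ManagedKafkaHook()
    # Copy the config instead of mutating the caller's dictionary.
    return {**config, "oauth_cb": hook.get_confluent_token}
```

With this arrangement, users who never connect to a Google-managed cluster can import the Kafka hook without the Google provider installed, and users who do get an explicit message naming the missing extra.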



class KafkaBaseHook(BaseHook):
@@ -63,6 +64,16 @@ def get_conn(self) -> Any:
if not (config.get("bootstrap.servers", None)):
raise ValueError("config['bootstrap.servers'] must be provided.")

bootstrap_servers = config.get("bootstrap.servers")
if (
bootstrap_servers
and bootstrap_servers.find("cloud.goog") != -1
and bootstrap_servers.find("managedkafka") != -1
):
self.log.info("Adding token generation for Google Auth to the confluent configuration.")
hook = ManagedKafkaHook()
token = hook.get_confluent_token
config.update({"oauth_cb": token})
return self._get_client(config)

def test_connection(self) -> tuple[bool, str]:
60 changes: 60 additions & 0 deletions providers/google/docs/operators/cloud/managed_kafka.rst
@@ -117,6 +117,66 @@ To update topic you can use
:start-after: [START how_to_cloud_managed_kafka_update_topic_operator]
:end-before: [END how_to_cloud_managed_kafka_update_topic_operator]

Interacting with Apache Kafka Consumer Groups
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

To delete consumer group you can use
:class:`~airflow.providers.google.cloud.operators.managed_kafka.ManagedKafkaDeleteConsumerGroupOperator`.

.. exampleinclude:: /../../providers/google/tests/system/google/cloud/managed_kafka/example_managed_kafka_consumer_group.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_managed_kafka_delete_consumer_group_operator]
:end-before: [END how_to_cloud_managed_kafka_delete_consumer_group_operator]

To get consumer group you can use
:class:`~airflow.providers.google.cloud.operators.managed_kafka.ManagedKafkaGetConsumerGroupOperator`.

.. exampleinclude:: /../../providers/google/tests/system/google/cloud/managed_kafka/example_managed_kafka_consumer_group.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_managed_kafka_get_consumer_group_operator]
:end-before: [END how_to_cloud_managed_kafka_get_consumer_group_operator]

To get a list of consumer groups you can use
:class:`~airflow.providers.google.cloud.operators.managed_kafka.ManagedKafkaListConsumerGroupsOperator`.

.. exampleinclude:: /../../providers/google/tests/system/google/cloud/managed_kafka/example_managed_kafka_consumer_group.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_managed_kafka_list_consumer_group_operator]
:end-before: [END how_to_cloud_managed_kafka_list_consumer_group_operator]

To update consumer group you can use
:class:`~airflow.providers.google.cloud.operators.managed_kafka.ManagedKafkaUpdateConsumerGroupOperator`.

.. exampleinclude:: /../../providers/google/tests/system/google/cloud/managed_kafka/example_managed_kafka_consumer_group.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_managed_kafka_update_consumer_group_operator]
:end-before: [END how_to_cloud_managed_kafka_update_consumer_group_operator]

Using Apache Kafka provider with Google Cloud Managed Service for Apache Kafka
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

To produce data to topic you can use
:class:`~airflow.providers.apache.kafka.operators.produce.ProduceToTopicOperator`.

.. exampleinclude:: /../../providers/google/tests/system/google/cloud/managed_kafka/example_managed_kafka_consumer_group.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_managed_kafka_produce_to_topic_operator]
:end-before: [END how_to_cloud_managed_kafka_produce_to_topic_operator]

To consume data from topic you can use
:class:`~airflow.providers.apache.kafka.operators.consume.ConsumeFromTopicOperator`.

.. exampleinclude:: /../../providers/google/tests/system/google/cloud/managed_kafka/example_managed_kafka_consumer_group.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_managed_kafka_consume_from_topic_operator]
:end-before: [END how_to_cloud_managed_kafka_consume_from_topic_operator]

Reference
^^^^^^^^^

1 change: 1 addition & 0 deletions providers/google/provider.yaml
@@ -1231,6 +1231,7 @@ extra-links:
- airflow.providers.google.cloud.links.managed_kafka.ApacheKafkaClusterLink
- airflow.providers.google.cloud.links.managed_kafka.ApacheKafkaClusterListLink
- airflow.providers.google.cloud.links.managed_kafka.ApacheKafkaTopicLink
- airflow.providers.google.cloud.links.managed_kafka.ApacheKafkaConsumerGroupLink


secrets-backends: