Apache Airflow Provider(s)
microsoft-azure
Versions of Apache Airflow Providers
apache-airflow-providers-microsoft-azure==9.0.1
azure-identity==1.15.0
azure-storage-blob==12.19.1
azure-storage-file-datalake==12.14.0
Apache Airflow version
Airflow v2.8.3 with Python 3.10.14
Operating System
Debian GNU/Linux 11 (bullseye)
Deployment
Official Apache Airflow Helm Chart
Deployment details
k8s v1.27.3 (AKS with Microsoft Entra Workload ID enabled)
Airflow helm chart v1.13.1
values.yml
# Airflow Worker Config
workers:
  serviceAccount:
    annotations:
      azure.workload.identity/client-id: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
  podAnnotations:
    azure.workload.identity/skip-containers: worker-log-groomer;worker-kerberos;git-sync;git-sync-init;wait-for-airflow-migrations;volume-permissions
  labels:
    azure.workload.identity/use: "true"
Credentials injected by workload identity as environment variables
(airflow)env | grep AZURE
AZURE_AUTHORITY_HOST=https://login.microsoftonline.com/
AZURE_CLIENT_ID=xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
AZURE_FEDERATED_TOKEN_FILE=/var/run/secrets/azure/tokens/azure-identity-token
AZURE_TENANT_ID=xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
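To rule out a problem with the injection itself, the same check can be run from inside a task. This is a minimal sketch using only the standard library; the helper name `missing_workload_identity_vars` is hypothetical, and the variable list is simply the four names shown in the output above.

```python
import os
from typing import Mapping

# Variables the workload identity webhook injected in the output above.
WORKLOAD_IDENTITY_VARS = (
    "AZURE_AUTHORITY_HOST",
    "AZURE_CLIENT_ID",
    "AZURE_FEDERATED_TOKEN_FILE",
    "AZURE_TENANT_ID",
)


def missing_workload_identity_vars(env: Mapping[str, str] = os.environ) -> list[str]:
    """Return the expected variable names that are unset or empty (hypothetical helper)."""
    return [name for name in WORKLOAD_IDENTITY_VARS if not env.get(name)]


if __name__ == "__main__":
    missing = missing_workload_identity_vars()
    print("missing:", missing or "none")
```

An empty result only confirms that the variables are present in the worker container; it does not validate the token file or the federated credential.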
Microsoft Azure Data Lake Storage Gen2 Connection environment variable
(airflow)env | grep ADLS
AIRFLOW_CONN_ADLS_DEFAULT=adls://<storage_name>
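The connection URI carries only the storage account name as its host, with no login or password. The sketch below shows how that host maps to the `account_url` that appears in the task log. It is an illustration with the standard library, not the provider's actual parsing code; `adls_account_url` and the account name `mystorageaccount` are made up for the example.

```python
from urllib.parse import urlsplit


def adls_account_url(conn_uri: str) -> str:
    """Build the DFS endpoint URL from an adls:// connection URI (illustrative helper)."""
    host = urlsplit(conn_uri).hostname
    if not host:
        raise ValueError(f"no storage account name in {conn_uri!r}")
    return f"https://{host}.dfs.core.windows.net"


# "mystorageaccount" is a placeholder, standing in for <storage_name>.
print(adls_account_url("adls://mystorageaccount"))
```

Because the URI has no secret in it, the hook has to fall back to an identity-based credential, which is the code path that fails.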
What happened
[2024-05-09, 10:35:29 UTC] {operators.py:47} INFO - Getting list of file systems
[2024-05-09, 10:35:29 UTC] {base.py:83} INFO - Using connection ID 'adls_default' for task execution.
[2024-05-09, 10:35:29 UTC] {data_lake.py:368} INFO - account_url: https://<storage_name>.dfs.core.windows.net
[2024-05-09, 10:35:29 UTC] {taskinstance.py:2731} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 444, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 414, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
  File "/opt/airflow/plugins/adls/operators.py", line 48, in execute
    return hook.list_file_system()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/microsoft/azure/hooks/data_lake.py", line 519, in list_file_system
    file_system = self.service_client.list_file_systems(
  File "/usr/local/lib/python3.10/functools.py", line 981, in __get__
    val = self.func(instance)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/microsoft/azure/hooks/data_lake.py", line 333, in service_client
    return self.get_conn()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/microsoft/azure/hooks/data_lake.py", line 370, in get_conn
    return DataLakeServiceClient(
  File "/home/airflow/.local/lib/python3.10/site-packages/azure/storage/filedatalake/_data_lake_service_client.py", line 96, in __init__
    self._blob_service_client = BlobServiceClient(blob_account_url, credential, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/azure/storage/blob/_blob_service_client.py", line 139, in __init__
    super(BlobServiceClient, self).__init__(parsed_url, service='blob', credential=credential, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/azure/storage/blob/_shared/base_client.py", line 110, in __init__
    self._config, self._pipeline = self._create_pipeline(self.credential, sdk_moniker=self._sdk_moniker, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/azure/storage/blob/_shared/base_client.py", line 234, in _create_pipeline
    raise TypeError(f"Unsupported credential: {type(credential)}")
TypeError: Unsupported credential: <class 'airflow.providers.microsoft.azure.utils.AzureIdentityCredentialAdapter'>
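My reading of the traceback is that the storage SDK rejects the object because it is not a token-style credential. The sketch below mimics that kind of check with stand-in classes only; it does not import Airflow or the Azure SDK. It assumes the SDK accepts objects exposing `get_token()` and that `AzureIdentityCredentialAdapter` is a session-style adapter without that method, which I have not verified against the provider source. `TokenStyleCredential`, `SessionStyleAdapter` and `check_credential` are hypothetical names, and the real check also accepts other credential types (keys, SAS tokens) that are left out here.

```python
class TokenStyleCredential:
    """Stand-in for an azure-identity credential: exposes get_token()."""

    def get_token(self, *scopes: str) -> str:
        return "fake-token"


class SessionStyleAdapter:
    """Stand-in for a session-style adapter: signed_session() but no get_token()."""

    def signed_session(self, session=None):
        return session


def check_credential(credential: object) -> str:
    """Simplified mimic of the check that raised the TypeError above (assumption)."""
    if credential is None:
        return "anonymous"
    if hasattr(credential, "get_token"):
        return "bearer-token"
    raise TypeError(f"Unsupported credential: {type(credential)}")


print(check_credential(TokenStyleCredential()))
try:
    check_credential(SessionStyleAdapter())
except TypeError as exc:
    print(exc)
```

If that assumption holds, the hook would need to hand `DataLakeServiceClient` a token-style credential rather than the adapter.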
What you think should happen instead
Authentication through DefaultAzureCredential should work with workload identity, and list_file_system should return the file systems instead of raising a TypeError.
How to reproduce
I created a custom operator that lists the file systems of an Azure Data Lake Storage Gen2 account by calling the "list_file_system" method of the AzureDataLakeStorageV2Hook class.
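from typing import Sequence

from airflow.models import BaseOperator
from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeStorageV2Hook
from airflow.utils.context import Context


class ADLSListFileSystemOperator(BaseOperator):
    """List the file systems of an ADLS Gen2 account."""

    template_fields: Sequence[str] = ()
    ui_color = "#901dd2"

    def __init__(
        self,
        *,
        prefix: str | None = None,
        include_metadata: bool = False,
        adls_conn_id: str = "adls_default",
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.prefix = prefix
        self.include_metadata = include_metadata
        self.adls_conn_id = adls_conn_id

    def execute(self, context: Context) -> list:
        hook = AzureDataLakeStorageV2Hook(adls_conn_id=self.adls_conn_id)
        self.log.info("Getting list of file systems")
        # prefix and include_metadata are stored but not passed to the hook here.
        return hook.list_file_system()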
Anything else
No response
Are you willing to submit PR?
Code of Conduct