Apache Airflow Provider(s)
amazon
Versions of Apache Airflow Providers
apache-airflow-providers-amazon==6.2.0
Apache Airflow version
2.5.0
Operating System
Red Hat Enterprise Linux Server 7.6 (Maipo)
Deployment
Virtualenv installation
Deployment details
Simple virtualenv deployment
What happened
bucket_key is a template_field in S3KeySensor, which means that it is expected to be rendered as a template.
The attribute supports both 'str' and 'list' types. The init function of the class contains a conditional that checks the type of the input and wraps a single string in a list, so that the attribute is always a list of strings. If a list of str is passed in through a Jinja template, self.bucket_key ends up as a doubly-nested list of strings rather than a list of strings.
This is because, when bucket_key is used as a template_field, its value at initialization can only be a string: the template string itself. Template fields are converted to their rendered values only later, when the task instance is created. The init function therefore wraps the template string in a list, and rendering then replaces that string with the list it evaluates to.
Example log from init function:
scheduler | DEBUG | type: <class 'list'> | val: ["{{ ti.xcom_pull(task_ids='t1') }}"]
Example log from poke function:
poke | DEBUG | type: <class 'list'> | val: [["s3://test_bucket/test_key1", "s3://test_bucket/test_key2"]]
This causes the poke function to throw an exception: each individual key must be a string so that its URL can be parsed, but a list is passed instead (since self.bucket_key is a nested list).
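The nesting can be illustrated without Airflow. The following is only a minimal sketch: `fake_render` and `EagerWrapSensor` are hypothetical stand-ins (not Airflow APIs) that mimic wrapping a string in a list at init time and then rendering each list element to a native object.

```python
def fake_render(value, context):
    """Hypothetical stand-in for native-object template rendering.

    Lists are rendered element by element; a string that matches a known
    template is replaced by the object it evaluates to.
    """
    if isinstance(value, list):
        return [fake_render(item, context) for item in value]
    return context.get(value, value)


class EagerWrapSensor:
    """Hypothetical sensor that wraps a single string in a list at init time."""

    def __init__(self, bucket_key):
        self.bucket_key = [bucket_key] if isinstance(bucket_key, str) else bucket_key


template = "{{ ti.xcom_pull(task_ids='get_list_of_str') }}"
# Assumed rendering result: the template evaluates to a list of keys.
context = {template: ["s3://test_bucket/test_key1", "s3://test_bucket/test_key2"]}

sensor = EagerWrapSensor(bucket_key=template)
before_render = sensor.bucket_key  # the template string, wrapped in a list

# Rendering happens after __init__, so the wrapped string becomes a list.
sensor.bucket_key = fake_render(sensor.bucket_key, context)
after_render = sensor.bucket_key  # a list nested inside a list
```

Here `before_render` has the same shape as the init log above and `after_render` has the same shape as the poke log.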
What you think should happen instead
Instead of putting the input value of bucket_key in a list, we should store the value as-is upon initialization of the class, and just conditionally check the type of the attribute within the poke function.
def __init__
    self.bucket_key = bucket_key
(which will store the input value correctly as a str or a list once the task instance is created and the template fields are rendered)
def poke
def poke(self, context: Context):
    # bucket_key has been rendered by now, so its type can be checked reliably
    if isinstance(self.bucket_key, str):
        return self._check_key(self.bucket_key)
    else:
        return all(self._check_key(key) for key in self.bucket_key)
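As a rough, Airflow-free sketch of the same idea: the class below stores bucket_key as-is and only normalizes it when poke runs. `DeferredSensor`, its `existing_keys` argument, and the stubbed `_check_key` are hypothetical and exist only to make the example self-contained; they are not the provider's implementation.

```python
class DeferredSensor:
    """Hypothetical sensor that defers the str/list check until poke time."""

    def __init__(self, bucket_key, existing_keys):
        # Store the value untouched; at this point it may still be a template.
        self.bucket_key = bucket_key
        # Stand-in for S3: the set of keys considered to exist.
        self._existing_keys = set(existing_keys)

    def _check_key(self, key):
        # Each key must be a plain string, as in the real URL parsing step.
        if not isinstance(key, str):
            raise TypeError(f"expected str key, got {type(key).__name__}")
        return key in self._existing_keys

    def poke(self):
        # Normalize here, after rendering would have taken place.
        if isinstance(self.bucket_key, str):
            keys = [self.bucket_key]
        else:
            keys = self.bucket_key
        return all(self._check_key(key) for key in keys)


keys_in_bucket = ["s3://test_bucket/test_key1", "s3://test_bucket/test_key2"]

single = DeferredSensor("s3://test_bucket/test_key1", keys_in_bucket)
multiple = DeferredSensor(list(keys_in_bucket), keys_in_bucket)
missing = DeferredSensor(["s3://test_bucket/absent"], keys_in_bucket)
```

With this shape, a rendered string and a rendered list both reach `_check_key` one string at a time.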
How to reproduce
- Use a template field as the bucket_key attribute in S3KeySensor
- Pass a list of strings as the rendered template input value for the bucket_key attribute in the S3KeySensor task. (e.g. as an XCOM or Variable pulled value)
Example:
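from airflow import DAG
from airflow.decorators import task
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

with DAG(
    ...
    render_template_as_native_obj=True,
) as dag:
    # Upstream task that pushes a list of S3 keys to XCom
    @task(task_id="get_list_of_str", do_xcom_push=True)
    def get_list_of_str():
        return ["s3://test_bucket/test_key1", "s3://test_bucket/test_key1"]
    t = get_list_of_str()
    # bucket_key is a template string that renders to a list of str
    op = S3KeySensor(task_id="s3_key_sensor", bucket_key="{{ ti.xcom_pull(task_ids='get_list_of_str') }}")
    t >> op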
Anything else
No response
Are you willing to submit PR?
Code of Conduct