diff --git a/providers/standard/src/airflow/providers/standard/sensors/weekday.py b/providers/standard/src/airflow/providers/standard/sensors/weekday.py index 29d3442ddb718..842bdd3f09418 100644 --- a/providers/standard/src/airflow/providers/standard/sensors/weekday.py +++ b/providers/standard/src/airflow/providers/standard/sensors/weekday.py @@ -103,7 +103,21 @@ def poke(self, context: Context) -> bool: self.week_day, WeekDay(timezone.utcnow().isoweekday()).name, ) + if self.use_task_logical_date: - return context["logical_date"].isoweekday() in self._week_day_num + logical_date = context.get("logical_date") + dag_run = context.get("dag_run") + + if not (logical_date or (dag_run and dag_run.run_after)): + raise ValueError( + "Either `logical_date` or `run_after` should be provided in the task context when " + "`use_task_logical_date` is True" + ) + + determined_weekday_num = ( + logical_date.isoweekday() if logical_date else dag_run.run_after.isoweekday() # type: ignore[union-attr] + ) + + return determined_weekday_num in self._week_day_num else: return timezone.utcnow().isoweekday() in self._week_day_num diff --git a/providers/standard/tests/unit/standard/sensors/test_weekday.py b/providers/standard/tests/unit/standard/sensors/test_weekday.py index 28795899b7341..e1bcee1865032 100644 --- a/providers/standard/tests/unit/standard/sensors/test_weekday.py +++ b/providers/standard/tests/unit/standard/sensors/test_weekday.py @@ -25,6 +25,8 @@ from airflow.models import DagBag from airflow.models.dag import DAG from airflow.providers.standard.sensors.weekday import DayOfWeekSensor +from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS +from airflow.utils import timezone from airflow.utils.timezone import datetime from airflow.utils.weekday import WeekDay @@ -128,3 +130,23 @@ def test_weekday_sensor_timeout_with_set(self): ) with pytest.raises(AirflowSensorTimeout): op.run(start_date=WEEKDAY_DATE, end_date=WEEKDAY_DATE, ignore_ti_state=True) + + @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Skip on Airflow < 3.0") + def test_weekday_sensor_should_use_run_after_when_logical_date_is_not_provided(self, dag_maker): + with dag_maker( + "test_weekday_sensor", + schedule=None, + ) as dag: + op = DayOfWeekSensor( + task_id="weekday_sensor_check_true", + week_day={"Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"}, + use_task_logical_date=True, + dag=dag, + ) + dr = dag_maker.create_dagrun( + run_id="manual_run", + start_date=DEFAULT_DATE, + logical_date=None, + **{"run_after": timezone.utcnow()}, + ) + assert op.poke(context={"logical_date": None, "dag_run": dr}) is True