diff --git a/airflow/api/common/experimental/trigger_dag.py b/airflow/api/common/experimental/trigger_dag.py index d5f2c02bdb772..407d4626f896c 100644 --- a/airflow/api/common/experimental/trigger_dag.py +++ b/airflow/api/common/experimental/trigger_dag.py @@ -66,7 +66,7 @@ def _trigger_dag( f"[{min_dag_start_date.isoformat()}] from DAG's default_args" ) - run_id = run_id or DagRun.generate_run_id(DagRunType.MANUAL, execution_date) + run_id = run_id or DagRun.generate_run_id(DagRunType.MANUAL, execution_date, dag_timezone=dag.timezone) dag_run = DagRun.find(dag_id=dag_id, run_id=run_id) if dag_run: diff --git a/airflow/api_connexion/schemas/dag_run_schema.py b/airflow/api_connexion/schemas/dag_run_schema.py index fa51f10ed48c0..6fb74b07b5c6f 100644 --- a/airflow/api_connexion/schemas/dag_run_schema.py +++ b/airflow/api_connexion/schemas/dag_run_schema.py @@ -92,7 +92,8 @@ def autogenerate(self, data, **kwargs): if "dag_run_id" not in data: try: data["dag_run_id"] = DagRun.generate_run_id( - DagRunType.MANUAL, timezone.parse(data["logical_date"]) + DagRunType.MANUAL, timezone.parse(data["logical_date"]), + dag_timezone=data['timezone'] ) except (ParserError, TypeError) as err: raise BadRequest("Incorrect datetime argument", detail=str(err)) diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index 41998eaa4d80f..f0d3faaf60a61 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -47,6 +47,15 @@ hostname_callable = socket.getfqdn # can be utc (default), system, or any IANA timezone string (e.g. Europe/Amsterdam) default_timezone = utc +# Whether localize the dag run_id. +# If it is false, the run_id will be generated base on UTC time. +# If it is true, the run_id will be generated base on the the `default_timezone` +# For example, if you run a DAG on 2021-09-08 03:01:02 (UTC time) manually, +# if the `localize_dag_run_id` is False, the dag `run_id` is `manual__2021-09-08T03:01:02.022226+08:00` +# if the `localize_dag_run_id` is True and the `default_timezone` is Asia/Shanghai (+8:00), +# the dag `run_id` is `manual__2021-09-08T11:01:02.022226+08:00` +localize_dag_run_id = False + # The executor class that airflow should use. Choices include # ``SequentialExecutor``, ``LocalExecutor``, ``CeleryExecutor``, ``DaskExecutor``, # ``KubernetesExecutor``, ``CeleryKubernetesExecutor`` or the diff --git a/airflow/jobs/backfill_job.py b/airflow/jobs/backfill_job.py index 2d06dc6347595..83ae50e9c738c 100644 --- a/airflow/jobs/backfill_job.py +++ b/airflow/jobs/backfill_job.py @@ -340,6 +340,7 @@ def _get_dag_run(self, dagrun_info: DagRunInfo, dag: DAG, session: Session = Non # explicitly mark as backfill and running run.state = State.RUNNING + run.run_id = run.generate_run_id(DagRunType.BACKFILL_JOB, run_date, dag_timezone=dag.timezone) run.run_type = DagRunType.BACKFILL_JOB run.verify_integrity(session=session) return run diff --git a/airflow/models/dag.py b/airflow/models/dag.py index bbb1fb7f72f94..c2373b43d2fea 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -2310,8 +2310,8 @@ def create_dagrun( elif run_type and execution_date is not None: # Generate run_id from run_type and execution_date. if not isinstance(run_type, DagRunType): raise ValueError(f"`run_type` expected to be a DagRunType is {type(run_type)}") - run_id = DagRun.generate_run_id(run_type, execution_date) - else: + run_id = DagRun.generate_run_id(run_type, execution_date, dag_timezone=self.timezone) + elif not run_id: raise AirflowException( "Creating DagRun needs either `run_id` or both `run_type` and `execution_date`" ) diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index 2b651c5f5873a..b338f9391f064 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -16,7 +16,7 @@ # specific language governing permissions and limitations # under the License. import warnings -from datetime import datetime +from datetime import datetime, tzinfo from typing import TYPE_CHECKING, Any, Iterable, List, NamedTuple, Optional, Tuple, Union from sqlalchemy import ( @@ -32,6 +32,7 @@ or_, text, ) +import pendulum from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.declarative import declared_attr from sqlalchemy.orm import joinedload, relationship, synonym @@ -335,9 +336,16 @@ def find( return qry.order_by(DR.execution_date).all() @staticmethod - def generate_run_id(run_type: DagRunType, execution_date: datetime) -> str: + def generate_run_id(run_type: DagRunType, execution_date: datetime, + dag_timezone: tzinfo = settings.TIMEZONE) -> str: """Generate Run ID based on Run Type and Execution Date""" - return f"{run_type}__{execution_date.isoformat()}" + from airflow.configuration import conf + localize_dag_run_id = conf.getboolean("core", "localize_dag_run_id", fallback=False) + if localize_dag_run_id: + local_time = pendulum.instance(execution_date).astimezone(tz=dag_timezone) + return f"{run_type}__{local_time.isoformat()}" + else: + return f"{run_type}__{execution_date.isoformat()}" @provide_session def get_task_instances( diff --git a/airflow/operators/trigger_dagrun.py b/airflow/operators/trigger_dagrun.py index 2a32a3366538c..3051ce7df5d41 100644 --- a/airflow/operators/trigger_dagrun.py +++ b/airflow/operators/trigger_dagrun.py @@ -129,7 +129,7 @@ def execute(self, context: Dict): if self.trigger_run_id: run_id = self.trigger_run_id else: - run_id = DagRun.generate_run_id(DagRunType.MANUAL, execution_date) + run_id = DagRun.generate_run_id(DagRunType.MANUAL, execution_date, dag_timezone=self.dag.timezone) try: dag_run = trigger_dag( diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py index c4ef287214077..0ea96e4bc0554 100644 --- a/tests/models/test_dagrun.py +++ b/tests/models/test_dagrun.py @@ -17,10 +17,12 @@ # under the License. import datetime +import time import unittest from unittest import mock from unittest.mock import call +import pytz from parameterized import parameterized from airflow import models, settings @@ -207,6 +209,87 @@ def test_dagrun_success_conditions(self): dr.update_state() assert State.SUCCESS == dr.state + def test_dagrun_run_id(self): + import pendulum + from airflow.configuration import conf + run_type = DagRunType.MANUAL + + # Part1: + # Test use the utc to generate `run_id`, + + # Set the localize_dag_run_id to False + conf.set("core", "localize_dag_run_id", "False") + dag_timezone = pendulum.tz.timezone("Asia/Seoul") + execution_date_summer = datetime.datetime(2021, 8, 8, 8, 8, 8, 123456) + run_id_1 = DagRun.generate_run_id(run_type, execution_date_summer, dag_timezone=dag_timezone) + assert run_id_1 == f'manual__{execution_date_summer.isoformat()}' + execution_date_winter = datetime.datetime(2021, 2, 8, 8, 8, 8, 123456) + run_id_2 = DagRun.generate_run_id(run_type, execution_date_winter, dag_timezone=dag_timezone) + assert run_id_2 == f'manual__{execution_date_winter.isoformat()}' + + + # Part2 : + # Test use the local time to generate `run_id` and the UTC offset and the UTC DST offset are the same + # Use Asia/Seoul and Asia/Shanghai as the examples + + # Set the localize_dag_run_id to True + conf.set("core", "localize_dag_run_id", "True") + + # create a datetime using Asia/Shanghai + execution_date_summer = datetime.datetime(2021, 8, 8, 8, 8, 8, 123456, tzinfo=pytz.timezone("Asia/Shanghai")) + execution_date_winter = datetime.datetime(2021, 2, 8, 8, 8, 8, 123456, tzinfo=pytz.timezone("Asia/Shanghai")) + + # Assume the server time zone is Asia/Seoul to check whether the run_id is generated using Seoul time + dag_timezone = pendulum.tz.timezone("Asia/Seoul") + run_id_3 = DagRun.generate_run_id(run_type, execution_date_summer, dag_timezone=dag_timezone) + assert run_id_3 == "manual__2021-08-08T09:08:08.123456+09:00" + run_id_4 = DagRun.generate_run_id(run_type, execution_date_winter, dag_timezone=dag_timezone) + assert run_id_4 == "manual__2021-02-08T09:08:08.123456+09:00" + + # Part3 : + # Test the timezone which the UTC DST and UTF DST offset are not the same + # The timezone Europe/Madrid, UTC offset is +01:00, UTC DST offset is +02:00 + dag_timezone = pendulum.tz.timezone("Europe/Madrid") + + # Set the localize_dag_run_id to True + conf.set("core", "localize_dag_run_id", "True") + run_id_7 = DagRun.generate_run_id(run_type, execution_date_summer, dag_timezone=dag_timezone) + assert run_id_7 == "manual__2021-08-08T02:08:08.123456+02:00" + run_id_8 = DagRun.generate_run_id(run_type, execution_date_winter, dag_timezone=dag_timezone) + assert run_id_8 == "manual__2021-02-08T01:08:08.123456+01:00" + + # Part3 : + # Test from non-dst to dst which means miss 1 hour + # The timezone Europe/Athens, UTC offset is +02:00, UTC DST offset is +03:00 + # They change it at 2021-03-28 03:00 -> 2021-03-28 04:00 + dag_timezone = pendulum.tz.timezone("Europe/Athens") + conf.set("core", "localize_dag_run_id", "True") + + # before changing from non-dst to dst + execution_date = pendulum.datetime(2021, 3, 28, 2, 8, 0, 123456, tz="Europe/Athens", + dst_rule=pendulum.PRE_TRANSITION) + run_id_9 = DagRun.generate_run_id(run_type, execution_date, dag_timezone=dag_timezone) + assert run_id_9 == "manual__2021-03-28T02:08:00.123456+02:00" + + # changed to dst + execution_date = pendulum.datetime(2021, 3, 28, 3, 8, 0, 123456, tz="Europe/Athens", + dst_rule=pendulum.POST_TRANSITION) + run_id_10 = DagRun.generate_run_id(run_type, execution_date, dag_timezone=dag_timezone) + assert run_id_10 == "manual__2021-03-28T04:08:00.123456+03:00" + + # Part3 : + # Test from DST to non-DST + # The timezone Europe/Athens, UTC offset is +02:00, UTC DST offset is +03:00 + # They change it at 2021-10-31 04:00 -> 2021-10-31 03:00, it means there are two 3:00 + execution_date = pendulum.datetime(2021, 10, 31, 3, 8, 0, 123456, tz="Europe/Athens", + dst_rule=pendulum.PRE_TRANSITION) + run_id_11 = DagRun.generate_run_id(run_type, execution_date, dag_timezone=dag_timezone) + assert run_id_11 == "manual__2021-10-31T03:08:00.123456+03:00" + execution_date = pendulum.datetime(2021, 10, 31, 3, 8, 0, 123456, tz="Europe/Athens", + dst_rule=pendulum.POST_TRANSITION) + run_id_12 = DagRun.generate_run_id(run_type, execution_date, dag_timezone=dag_timezone) + assert run_id_12 == "manual__2021-10-31T03:08:00.123456+02:00" + def test_dagrun_deadlock(self): session = settings.Session() dag = DAG('text_dagrun_deadlock', start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})