Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow/api/common/experimental/trigger_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def _trigger_dag(
f"[{min_dag_start_date.isoformat()}] from DAG's default_args"
)

run_id = run_id or DagRun.generate_run_id(DagRunType.MANUAL, execution_date)
run_id = run_id or DagRun.generate_run_id(DagRunType.MANUAL, execution_date, dag_timezone=dag.timezone)
dag_run = DagRun.find(dag_id=dag_id, run_id=run_id)

if dag_run:
Expand Down
3 changes: 2 additions & 1 deletion airflow/api_connexion/schemas/dag_run_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ def autogenerate(self, data, **kwargs):
if "dag_run_id" not in data:
try:
data["dag_run_id"] = DagRun.generate_run_id(
DagRunType.MANUAL, timezone.parse(data["logical_date"])
DagRunType.MANUAL, timezone.parse(data["logical_date"]),
dag_timezone=data['timezone']
)
except (ParserError, TypeError) as err:
raise BadRequest("Incorrect datetime argument", detail=str(err))
Expand Down
9 changes: 9 additions & 0 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@ hostname_callable = socket.getfqdn
# can be utc (default), system, or any IANA timezone string (e.g. Europe/Amsterdam)
default_timezone = utc

# Whether to localize the DAG ``run_id``.
# If it is False, the ``run_id`` is generated based on UTC time.
# If it is True, the ``run_id`` is generated based on the ``default_timezone``.
# For example, if you manually run a DAG at 2021-09-08 03:01:02 (UTC time):
# if ``localize_dag_run_id`` is False, the DAG ``run_id`` is ``manual__2021-09-08T03:01:02.022226+00:00``;
# if ``localize_dag_run_id`` is True and the ``default_timezone`` is Asia/Shanghai (+08:00),
# the DAG ``run_id`` is ``manual__2021-09-08T11:01:02.022226+08:00``.
localize_dag_run_id = False

# The executor class that airflow should use. Choices include
# ``SequentialExecutor``, ``LocalExecutor``, ``CeleryExecutor``, ``DaskExecutor``,
# ``KubernetesExecutor``, ``CeleryKubernetesExecutor`` or the
Expand Down
1 change: 1 addition & 0 deletions airflow/jobs/backfill_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ def _get_dag_run(self, dagrun_info: DagRunInfo, dag: DAG, session: Session = Non

# explicitly mark as backfill and running
run.state = State.RUNNING
run.run_id = run.generate_run_id(DagRunType.BACKFILL_JOB, run_date, dag_timezone=dag.timezone)
run.run_type = DagRunType.BACKFILL_JOB
run.verify_integrity(session=session)
return run
Expand Down
4 changes: 2 additions & 2 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -2310,8 +2310,8 @@ def create_dagrun(
elif run_type and execution_date is not None: # Generate run_id from run_type and execution_date.
if not isinstance(run_type, DagRunType):
raise ValueError(f"`run_type` expected to be a DagRunType is {type(run_type)}")
run_id = DagRun.generate_run_id(run_type, execution_date)
else:
run_id = DagRun.generate_run_id(run_type, execution_date, dag_timezone=self.timezone)
elif not run_id:
raise AirflowException(
"Creating DagRun needs either `run_id` or both `run_type` and `execution_date`"
)
Expand Down
14 changes: 11 additions & 3 deletions airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# specific language governing permissions and limitations
# under the License.
import warnings
from datetime import datetime
from datetime import datetime, tzinfo
from typing import TYPE_CHECKING, Any, Iterable, List, NamedTuple, Optional, Tuple, Union

from sqlalchemy import (
Expand All @@ -32,6 +32,7 @@
or_,
text,
)
import pendulum
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import joinedload, relationship, synonym
Expand Down Expand Up @@ -335,9 +336,16 @@ def find(
return qry.order_by(DR.execution_date).all()

@staticmethod
def generate_run_id(
    run_type: DagRunType, execution_date: datetime, dag_timezone: tzinfo = settings.TIMEZONE
) -> str:
    """
    Generate a run ID from the run type and execution date.

    The result has the form ``{run_type}__{timestamp}``, where ``timestamp`` is the
    ISO 8601 rendering of the execution date.

    :param run_type: run type placed in front of the ``__`` separator
    :param execution_date: the date to render into the run ID
    :param dag_timezone: timezone the execution date is converted to before rendering;
        only used when ``[core] localize_dag_run_id`` is true
    :return: the generated run ID
    """
    # Kept as a function-scope import, as in the original block.
    from airflow.configuration import conf

    # A missing option is treated as "do not localize" (fallback=False).
    if conf.getboolean("core", "localize_dag_run_id", fallback=False):
        # Same instant, expressed with the wall-clock time and UTC offset of ``dag_timezone``.
        local_time = pendulum.instance(execution_date).astimezone(tz=dag_timezone)
        return f"{run_type}__{local_time.isoformat()}"

    # Not localized: render the date exactly as given, with whatever offset it carries.
    return f"{run_type}__{execution_date.isoformat()}"

@provide_session
def get_task_instances(
Expand Down
2 changes: 1 addition & 1 deletion airflow/operators/trigger_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ def execute(self, context: Dict):
if self.trigger_run_id:
run_id = self.trigger_run_id
else:
run_id = DagRun.generate_run_id(DagRunType.MANUAL, execution_date)
run_id = DagRun.generate_run_id(DagRunType.MANUAL, execution_date, dag_timezone=self.dag.timezone)

try:
dag_run = trigger_dag(
Expand Down
83 changes: 83 additions & 0 deletions tests/models/test_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
# under the License.

import datetime
import time
import unittest
from unittest import mock
from unittest.mock import call

import pytz
from parameterized import parameterized

from airflow import models, settings
Expand Down Expand Up @@ -207,6 +209,87 @@ def test_dagrun_success_conditions(self):
dr.update_state()
assert State.SUCCESS == dr.state

def test_dagrun_run_id(self):
    """
    Check ``DagRun.generate_run_id`` with ``[core] localize_dag_run_id`` off and on.

    With the option off, the run ID must embed ``execution_date.isoformat()`` unchanged.
    With the option on, the run ID must embed the execution date converted to the
    timezone passed as ``dag_timezone``, including around DST transitions.
    """
    import pendulum

    from airflow.configuration import conf

    run_type = DagRunType.MANUAL

    # The option lives in the global config object; remember its value so it can be
    # restored and does not leak into other tests.
    original_setting = conf.getboolean("core", "localize_dag_run_id", fallback=False)
    try:
        # Part 1:
        # With localization disabled, ``dag_timezone`` must not affect the run_id.
        conf.set("core", "localize_dag_run_id", "False")
        dag_timezone = pendulum.tz.timezone("Asia/Seoul")
        execution_date_summer = datetime.datetime(2021, 8, 8, 8, 8, 8, 123456)
        run_id_1 = DagRun.generate_run_id(run_type, execution_date_summer, dag_timezone=dag_timezone)
        assert run_id_1 == f'manual__{execution_date_summer.isoformat()}'
        execution_date_winter = datetime.datetime(2021, 2, 8, 8, 8, 8, 123456)
        run_id_2 = DagRun.generate_run_id(run_type, execution_date_winter, dag_timezone=dag_timezone)
        assert run_id_2 == f'manual__{execution_date_winter.isoformat()}'

        # Part 2:
        # Localization enabled, target timezone without DST (the UTC offset is the same
        # all year). Asia/Shanghai is the source timezone, Asia/Seoul the target.
        conf.set("core", "localize_dag_run_id", "True")

        # Create the execution dates in Asia/Shanghai.
        # NOTE(review): pytz documents ``localize()`` rather than ``tzinfo=`` for building
        # aware datetimes; the expected values below assume 08:08:08 is read as
        # Asia/Shanghai wall-clock time at +08:00 -- confirm.
        execution_date_summer = datetime.datetime(
            2021, 8, 8, 8, 8, 8, 123456, tzinfo=pytz.timezone("Asia/Shanghai")
        )
        execution_date_winter = datetime.datetime(
            2021, 2, 8, 8, 8, 8, 123456, tzinfo=pytz.timezone("Asia/Shanghai")
        )

        # Pass Asia/Seoul as the DAG timezone and check the run_id uses Seoul time.
        dag_timezone = pendulum.tz.timezone("Asia/Seoul")
        run_id_3 = DagRun.generate_run_id(run_type, execution_date_summer, dag_timezone=dag_timezone)
        assert run_id_3 == "manual__2021-08-08T09:08:08.123456+09:00"
        run_id_4 = DagRun.generate_run_id(run_type, execution_date_winter, dag_timezone=dag_timezone)
        assert run_id_4 == "manual__2021-02-08T09:08:08.123456+09:00"

        # Part 3:
        # Target timezone whose standard and DST offsets differ.
        # Europe/Madrid: UTC offset is +01:00, UTC DST offset is +02:00.
        dag_timezone = pendulum.tz.timezone("Europe/Madrid")
        run_id_7 = DagRun.generate_run_id(run_type, execution_date_summer, dag_timezone=dag_timezone)
        assert run_id_7 == "manual__2021-08-08T02:08:08.123456+02:00"
        run_id_8 = DagRun.generate_run_id(run_type, execution_date_winter, dag_timezone=dag_timezone)
        assert run_id_8 == "manual__2021-02-08T01:08:08.123456+01:00"

        # Part 4:
        # Transition from non-DST to DST, where one hour is skipped.
        # Europe/Athens: UTC offset is +02:00, UTC DST offset is +03:00.
        # The clocks move from 2021-03-28 03:00 to 2021-03-28 04:00.
        dag_timezone = pendulum.tz.timezone("Europe/Athens")

        # Before the change from non-DST to DST.
        execution_date = pendulum.datetime(
            2021, 3, 28, 2, 8, 0, 123456, tz="Europe/Athens", dst_rule=pendulum.PRE_TRANSITION
        )
        run_id_9 = DagRun.generate_run_id(run_type, execution_date, dag_timezone=dag_timezone)
        assert run_id_9 == "manual__2021-03-28T02:08:00.123456+02:00"

        # After the change to DST: 03:08 falls in the skipped hour, so 04:08+03:00 is expected.
        execution_date = pendulum.datetime(
            2021, 3, 28, 3, 8, 0, 123456, tz="Europe/Athens", dst_rule=pendulum.POST_TRANSITION
        )
        run_id_10 = DagRun.generate_run_id(run_type, execution_date, dag_timezone=dag_timezone)
        assert run_id_10 == "manual__2021-03-28T04:08:00.123456+03:00"

        # Part 5:
        # Transition from DST to non-DST, where one hour is repeated.
        # Europe/Athens: the clocks move from 2021-10-31 04:00 back to 2021-10-31 03:00,
        # so 03:08 occurs twice: first at +03:00, then at +02:00.
        execution_date = pendulum.datetime(
            2021, 10, 31, 3, 8, 0, 123456, tz="Europe/Athens", dst_rule=pendulum.PRE_TRANSITION
        )
        run_id_11 = DagRun.generate_run_id(run_type, execution_date, dag_timezone=dag_timezone)
        assert run_id_11 == "manual__2021-10-31T03:08:00.123456+03:00"
        execution_date = pendulum.datetime(
            2021, 10, 31, 3, 8, 0, 123456, tz="Europe/Athens", dst_rule=pendulum.POST_TRANSITION
        )
        run_id_12 = DagRun.generate_run_id(run_type, execution_date, dag_timezone=dag_timezone)
        assert run_id_12 == "manual__2021-10-31T03:08:00.123456+02:00"
    finally:
        # Put the option back so later tests see the value they started with.
        conf.set("core", "localize_dag_run_id", str(original_setting))

def test_dagrun_deadlock(self):
session = settings.Session()
dag = DAG('text_dagrun_deadlock', start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})
Expand Down