
Dag File Processing Slowness when using Dag Params #32434

Description

@dlstadther-pl

Apache Airflow version

2.6.2

What happened

After migrating from Airflow 2.2.3 to 2.6.2, we saw a large (~5-10x) increase in DAG File Processing time for our dags. While we have some anti-patterns with dag generation (dynamic dag generation and usage of 5 Airflow Variables), we have isolated the increase in processing duration to the existence of Dag Params (see "How to Reproduce", below).

We're experiencing this issue in our most complex dag file. This dag file creates 1 "main" dag, which runs a TriggerDagRunOperator for each of the "client-specific" dags that it generates dynamically. Each client-specific dag is assigned 5 Dag Params (which describe certain characteristics of the client) and about 400 tasks.

Dag files which used to take 0.58s now take 2.88s; files which took 3s now take 30s; files which took 95s now take 985s.
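As a quick arithmetic check on the "~5-10x" figure, the slowdown factors can be computed directly from the three before/after pairs quoted above. This is only a sketch; the variable and function names are illustrative.

```python
# (runtime on 2.2.3, runtime on 2.6.2) in seconds, as quoted in the report
reported = [(0.58, 2.88), (3.0, 30.0), (95.0, 985.0)]


def slowdown(before: float, after: float) -> float:
    """Return how many times slower `after` is than `before`."""
    return after / before


factors = [round(slowdown(before, after), 1) for before, after in reported]
print(factors)  # [5.0, 10.0, 10.4]
```

The smallest file slows down by about 5x, and the two larger files by about 10x.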

What you think should happen instead

I believe DAG Processing is inefficient at serializing dag params during the serialization of tasks.

(However, I have been unable to pinpoint a commit which caused a significant change to the serialization or DagBag.sync_to_db() code.)
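To illustrate why this hypothesis would produce the kind of scaling described in this report, here is a toy model in plain Python. It is not Airflow's serializer and does not call any Airflow code. It simply assumes, for the sake of illustration, that every serialized task embeds its own copy of the DAG-level params, and shows how the serialized payload then grows with tasks x params. The function names and the JSON layout are hypothetical.

```python
import json


def make_params(qty: int) -> dict:
    """Build `qty` dummy DAG params (hypothetical names and values)."""
    return {f"param_{i}": f"value_{i}" for i in range(qty)}


def toy_serialized_size(n_tasks: int, dag_params: dict) -> int:
    """Length of a toy JSON payload in which every task carries a copy
    of the DAG-level params. ASSUMPTION: this layout is illustrative
    only and is not Airflow's actual serialization format."""
    tasks = [
        {"task_id": f"task_{i}", "params": dict(dag_params)}
        for i in range(n_tasks)
    ]
    return len(json.dumps({"params": dag_params, "tasks": tasks}))


n_tasks = 400  # roughly the task count per client-specific dag
sizes = {qty: toy_serialized_size(n_tasks, make_params(qty)) for qty in range(6)}
for qty, size in sizes.items():
    print(f"{qty} params -> {size} characters")
```

In this model each extra param is paid for once per task rather than once per dag, so with about 400 tasks a single added param costs about 400 times what it would if params were stored only at the dag level. Whether Airflow 2.6.2 actually behaves this way is exactly what the report is asking about.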

How to reproduce

I have reproduced the situation we experience locally with a representative (but dumb) dag example which shows that dag file processing runtimes increase as the quantity of Dag Params increases.

I realize this dag may be a bit complex, so I've also included a visual representation of how the dags relate to each other and many of the Dag File Processing times for 2.2.3 and 2.6.2 when using various quantities of Dag Params in the client-specific dag definitions.

Code

import datetime
import random
import time
from typing import Any, Dict, List

from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.state import State
from airflow.utils.task_group import TaskGroup


# in practice, this CLIENTS_METADATA comes from an Airflow Variable which is set by a different DAG which runs multiple times per day
# e.g. CLIENTS_METADATA: List[Dict[str, Any]] = Variable.get('CLIENTS_METADATA', deserialize_json=True, default_var=list())
# for the sake of an example, we'll create fake client metadata and allow the client qty to be configurable, but deterministic
client_qty = 150
CLIENTS_METADATA: List[Dict[str, Any]] = list()
for i in range(client_qty):
    # if even, set certain values
    if i % 2 == 0:
        client_group = "4"
        shared_resource_group = "us-01"
        client_size = "normal"
    else:
        client_group = "3"
        shared_resource_group = "us-02"
        client_size = "large"
    CLIENTS_METADATA.append(
        {
            "clientId": f"client{i}",
            "clientGroup": client_group,
            "sharedResourceGroup": shared_resource_group,
            "isDataIsolated": False,
            "resourceTags": {"size": client_size},
        }
    )

DE_STACK_NAME = "de-prod-us-01"


ARGS = {
    'owner': 'airflow',
    'start_date': datetime.datetime(2000, 1, 1, 0, 0, 0),
}


def task(**kwargs):
    # would normally receive dag params and use them in the job,
    # but this is a dumb example dag that doesn't actually do anything
    rand_int = random.randint(1, 10)
    print(f"Sleeping: {rand_int}s")
    time.sleep(rand_int)
    return rand_int


def build_abstract_dag(client: dict) -> DAG:
    """One copy of the graph per client"""
    client_id = client.get('clientId')
    client_group = client.get('clientGroup')
    resource_tags = client.get('resourceTags')
    resource_group = client.get('sharedResourceGroup')

    dag_id = 'client-specific-dag-{}'.format(client_id)

    dag = DAG(
        dag_id=dag_id,
        default_args=ARGS,
        schedule_interval=None,
        is_paused_upon_creation=False,
        catchup=False,
        # CHANGE THE QTY OF PARAMS HERE TO SEE THE IMPACT TO DAG PROCESSING RUNTIME
        params={
            'client_id': client_id,
            'client_group': client_group,
            'resource_tags': resource_tags,
            'resource_group': resource_group,
            'airflow_stack': DE_STACK_NAME,
        },
    )

    # mimic multiple sets of jobs
    qty_task_groups = 100
    for i in range(qty_task_groups):
        with TaskGroup(f"task_group_{i}", dag=dag):
            # mimic our generic job structure
            #   data generation job (t1)
            #   distribution jobs (t2 and t3)
            #   visualization or validation job (t4)
            t1 = PythonOperator(dag=dag, task_id="task_1", python_callable=task, retries=0)
            t2 = PythonOperator(dag=dag, task_id="task_2", python_callable=task, retries=0)
            t3 = PythonOperator(dag=dag, task_id="task_3", python_callable=task, retries=0)
            t4 = PythonOperator(dag=dag, task_id="task_4", python_callable=task, retries=0)

            [t3, t2] << t1
            t4 << t1

    return dag


# after all clients complete, go back and requeue anything which didn't succeed
# addresses network-related transient errors which aren't covered by default retry config
def retry_all_main_graph_tasks(retry_meta: list, **context):
    print(f"Retrying: {retry_meta}")


dag_meta = DAG(
    dag_id='a-trigger-dag',
    default_args=ARGS,
    dagrun_timeout=datetime.timedelta(hours=8),
    is_paused_upon_creation=True,
    catchup=False,
    schedule_interval='00 05 * * *',  # daily, at 0500 UTC
)

trigger_dag_tasks = list()
# List containing meta around the retry tasks and triggered dag per client
# Each list record is a dict {'trigger_task_id': <str>, 'dag': <Dag>}
retry_main_graph_tasks_meta = list()

# Create 1 Abstract DAG per client
for client_data in CLIENTS_METADATA:
    abstract_dag = build_abstract_dag(
        client=client_data,
    )
    trigger_dag_id = abstract_dag.dag_id
    globals()[trigger_dag_id] = abstract_dag

    trigger_operator = TriggerDagRunOperator(
        dag=dag_meta,
        task_id='trigger_{}'.format(trigger_dag_id),
        retries=1,
        trigger_dag_id=trigger_dag_id,
        wait_for_completion=True,
        # "success" or "failed" states means the client DAG was triggered successfully.
        # trigger_{trigger_dag_id} task failures indicate the DAG was never triggered.
        allowed_states=[State.SUCCESS, State.FAILED],
        # https://airflow.apache.org/docs/apache-airflow/stable/_modules/airflow/operators/trigger_dagrun.html
        # we cannot set failed_states to None or empty list, as it will set failed_states = ['failed']
        # in the BaseOperator initialization. The code snippet:
        # `self.failed_states = failed_states or [State.FAILED]`
        failed_states=['not-a-state'],
        execution_date='{{ data_interval_start }}',
        execution_timeout=datetime.timedelta(hours=2),
        pool='main_graph_trigger',
    )
    trigger_dag_tasks.append(trigger_operator)
    retry_main_graph_tasks_meta.append(
        dict(
            trigger_task_id=trigger_operator.task_id,
            dag=abstract_dag,
        )
    )

retry_main_graph_tasks = PythonOperator(
    dag=dag_meta,
    task_id='retry_all_main_graph_tasks',
    retries=1,
    trigger_rule='all_done',
    op_kwargs={
        'retry_meta': retry_main_graph_tasks_meta,
    },
    pool='batch',
    python_callable=retry_all_main_graph_tasks,
)
retry_main_graph_tasks << trigger_dag_tasks

Creates:
[Image: airflow-sample-dag]

Runtimes

  • client_qty = 1

Dag Param Qty | Runtime (2.2.3) | Runtime (2.6.2)
0             | 0.3s            | 1s
1             | -               | 2s
2             | -               | 3s
3             | -               | 4s
4             | -               | 5s
5             | 0.4s            | 6s

  • client_qty = 10

Dag Param Qty | Runtime (2.2.3) | Runtime (2.6.2)
0             | 0.3s            | 4s
1             | -               | 20s
2             | -               | 25s
3             | -               | 30s
4             | -               | 35s
5             | 3s              | 40s

  • client_qty = 150

Dag Param Qty | Runtime (2.2.3) | Runtime (2.6.2)
0             | 25s             | 50s
5             | 50s             | 900s (15m)
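The per-param cost on 2.6.2 can be read off the runtimes above by taking differences between consecutive Dag Param quantities. The sketch below does that for client_qty = 1 and client_qty = 10, assuming that the rows listing a single value (param qty 1 through 4) are 2.6.2 measurements. The names are illustrative.

```python
# Runtime (2.6.2) in seconds, keyed by client_qty and then by Dag Param qty.
# ASSUMPTION: rows with a single value are 2.6.2 measurements.
runtimes_262 = {
    1: {0: 1, 1: 2, 2: 3, 3: 4, 4: 5, 5: 6},
    10: {0: 4, 1: 20, 2: 25, 3: 30, 4: 35, 5: 40},
}


def per_param_increments(by_qty: dict) -> list:
    """Runtime added by each additional Dag Param, in seconds."""
    qtys = sorted(by_qty)
    return [by_qty[b] - by_qty[a] for a, b in zip(qtys, qtys[1:])]


increments = {
    clients: per_param_increments(runtimes)
    for clients, runtimes in runtimes_262.items()
}
print(increments)  # {1: [1, 1, 1, 1, 1], 10: [16, 5, 5, 5, 5]}
```

With 1 client each added param costs about 1s; with 10 clients the first param costs 16s and each further param about 5s. The growth is roughly linear in the number of params, with a larger one-time jump when going from 0 to 1 param at 10 clients.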

Operating System

Debian 11 (bullseye)

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

  • kubernetes 1.24
  • official helm chart 1.9.0
  • standalone dag processor
  • celery executor
  • postgres database (with pgbouncer)

Anything else

I've also recreated the same issue (with the sample code provided above) using the Airflow Docker Compose setup.

Our issue differs from #30593 and #30884, as we are already on 2.6.x and use the default value (5s) for job_heartbeat_sec.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
