From ac4eb41bb94725750ba328ef8439de5d20be92ea Mon Sep 17 00:00:00 2001
From: Daniel Standish <15932138+dstandish@users.noreply.github.com>
Date: Sun, 8 May 2022 13:47:07 -0700
Subject: [PATCH 01/10] For command DB clean, by not relying on the ORM models,
we will be able to use the command even when the metadatabase is not yet
upgraded to the version of Airflow you have installed.
Additionally we archive all rows before deletion.
---
airflow/utils/db_cleanup.py | 230 ++++++++++++++++++++++--------------
1 file changed, 140 insertions(+), 90 deletions(-)
diff --git a/airflow/utils/db_cleanup.py b/airflow/utils/db_cleanup.py
index b02d08503f3a3..5c3db6ef6b472 100644
--- a/airflow/utils/db_cleanup.py
+++ b/airflow/utils/db_cleanup.py
@@ -23,36 +23,20 @@
import logging
from contextlib import AbstractContextManager
from dataclasses import dataclass
-from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
+from typing import Any, Dict, List, Optional
from pendulum import DateTime
-from sqlalchemy import and_, false, func
+from sqlalchemy import and_, column, false, func, table, text
from sqlalchemy.exc import OperationalError, ProgrammingError
+from sqlalchemy.orm import Query, Session, aliased
from airflow.cli.simple_table import AirflowConsole
-from airflow.jobs.base_job import BaseJob
-from airflow.models import (
- Base,
- DagModel,
- DagRun,
- DbCallbackRequest,
- ImportError as models_ImportError,
- Log,
- RenderedTaskInstanceFields,
- SensorInstance,
- SlaMiss,
- TaskFail,
- TaskInstance,
- TaskReschedule,
- XCom,
-)
+from airflow.models import Base
from airflow.utils import timezone
+from airflow.utils.db import reflect_tables
from airflow.utils.session import NEW_SESSION, provide_session
-if TYPE_CHECKING:
- from sqlalchemy.orm import Query, Session
- from sqlalchemy.orm.attributes import InstrumentedAttribute
- from sqlalchemy.sql.schema import Column
+logger = logging.getLogger(__file__)
@dataclass
@@ -60,8 +44,9 @@ class _TableConfig:
"""
Config class for performing cleanup on a table
- :param orm_model: the table
- :param recency_column: date column to filter by
+ :param table_name: the table
+ :param extra_columns: any columns besides recency_column_name that we'll need in queries
+ :param recency_column_name: date column to filter by
:param keep_last: whether the last record should be kept even if it's older than clean_before_timestamp
:param keep_last_filters: the "keep last" functionality will preserve the most recent record
in the table. to ignore certain records even if they are the latest in the table, you can
@@ -71,20 +56,27 @@ class _TableConfig:
If False then the exception will go uncaught.
"""
- orm_model: Base
- recency_column: Union["Column", "InstrumentedAttribute"]
+ table_name: str
+ recency_column_name: str
+ extra_columns: List[str] = None
keep_last: bool = False
keep_last_filters: Optional[Any] = None
keep_last_group_by: Optional[Any] = None
warn_if_missing: bool = False
+ def __post_init__(self):
+ self.recency_column = column(self.recency_column_name)
+ self.orm_model: Base = table(
+ self.table_name, *[column(x) for x in self.extra_columns or []], self.recency_column
+ )
+
def __lt__(self, other):
- return self.orm_model.__tablename__ < other.orm_model.__tablename__
+ return self.table_name < other.table_name
@property
def readable_config(self):
return dict(
- table=self.orm_model.__tablename__,
+ table=self.orm_model.name,
recency_column=str(self.recency_column),
keep_last=self.keep_last,
keep_last_filters=[str(x) for x in self.keep_last_filters] if self.keep_last_filters else None,
@@ -94,56 +86,46 @@ def readable_config(self):
config_list: List[_TableConfig] = [
- _TableConfig(orm_model=BaseJob, recency_column=BaseJob.latest_heartbeat),
- _TableConfig(orm_model=DagModel, recency_column=DagModel.last_parsed_time),
+ _TableConfig(table_name='job', recency_column_name='latest_heartbeat'),
+ _TableConfig(table_name='dag', recency_column_name='last_parsed_time'),
_TableConfig(
- orm_model=DagRun,
- recency_column=DagRun.start_date,
+ table_name='dag_run',
+ recency_column_name='start_date',
+ extra_columns=['dag_id', 'external_trigger'],
keep_last=True,
- keep_last_filters=[DagRun.external_trigger == false()],
- keep_last_group_by=DagRun.dag_id,
+ keep_last_filters=[column('external_trigger') == false()],
+ keep_last_group_by=['dag_id'],
),
- _TableConfig(orm_model=models_ImportError, recency_column=models_ImportError.timestamp),
- _TableConfig(orm_model=Log, recency_column=Log.dttm),
+ _TableConfig(table_name='import_error', recency_column_name='timestamp'),
+ _TableConfig(table_name='log', recency_column_name='dttm'),
+ # _TableConfig(table_name='rendered_task_instance_fields', recency_column_name='execution_date'),
_TableConfig(
- orm_model=RenderedTaskInstanceFields, recency_column=RenderedTaskInstanceFields.execution_date
- ),
- _TableConfig(
- orm_model=SensorInstance, recency_column=SensorInstance.updated_at
+ table_name='sensor_instance', recency_column_name='updated_at'
), # TODO: add FK to task instance / dag so we can remove here
- _TableConfig(orm_model=SlaMiss, recency_column=SlaMiss.timestamp),
- _TableConfig(orm_model=TaskFail, recency_column=TaskFail.start_date),
- _TableConfig(orm_model=TaskInstance, recency_column=TaskInstance.start_date),
- _TableConfig(orm_model=TaskReschedule, recency_column=TaskReschedule.start_date),
- _TableConfig(orm_model=XCom, recency_column=XCom.timestamp),
- _TableConfig(orm_model=DbCallbackRequest, recency_column=XCom.timestamp),
+ _TableConfig(table_name='sla_miss', recency_column_name='timestamp'),
+ _TableConfig(table_name='task_fail', recency_column_name='start_date'),
+ _TableConfig(table_name='task_instance', recency_column_name='start_date'),
+ _TableConfig(table_name='task_reschedule', recency_column_name='start_date'),
+ _TableConfig(table_name='xcom', recency_column_name='timestamp'),
+ _TableConfig(table_name='callback_request', recency_column_name='created_at'),
+ _TableConfig(table_name='celery_taskmeta', recency_column_name='date_done', warn_if_missing=True),
+ _TableConfig(table_name='celery_tasksetmeta', recency_column_name='date_done', warn_if_missing=True),
]
-try:
- from celery.backends.database.models import Task, TaskSet
-
- config_list.extend(
- [
- _TableConfig(orm_model=Task, recency_column=Task.date_done, warn_if_missing=True),
- _TableConfig(orm_model=TaskSet, recency_column=TaskSet.date_done, warn_if_missing=True),
- ]
- )
-except ImportError:
- pass
-config_dict: Dict[str, _TableConfig] = {x.orm_model.__tablename__: x for x in sorted(config_list)}
+config_dict: Dict[str, _TableConfig] = {x.orm_model.name: x for x in sorted(config_list)}
-def _print_entities(*, query: "Query", print_rows=False):
+def _check_for_rows(*, query: "Query", print_rows=False):
num_entities = query.count()
print(f"Found {num_entities} rows meeting deletion criteria.")
- if not print_rows:
- return
- max_rows_to_print = 100
- if num_entities > 0:
- print(f"Printing first {max_rows_to_print} rows.")
- logger.debug("print entities query: %s", query)
- for entry in query.limit(max_rows_to_print):
- print(entry.__dict__)
+ if print_rows:
+ max_rows_to_print = 100
+ if num_entities > 0:
+ print(f"Printing first {max_rows_to_print} rows.")
+ logger.debug("print entities query: %s", query)
+ for entry in query.limit(max_rows_to_print):
+ print(entry.__dict__)
+ return num_entities
def _do_delete(*, query, session):
@@ -154,21 +136,39 @@ def _do_delete(*, query, session):
print("Finished Performing Delete")
-def _subquery_keep_last(*, recency_column, keep_last_filters, keep_last_group_by, session):
- subquery = session.query(func.max(recency_column))
+def _subquery_keep_last(*, recency_column, keep_last_filters, group_by_columns, max_date_colname, session):
+ subquery = session.query(*group_by_columns, func.max(recency_column).label(max_date_colname))
if keep_last_filters is not None:
for entry in keep_last_filters:
subquery = subquery.filter(entry)
- if keep_last_group_by is not None:
- subquery = subquery.group_by(keep_last_group_by)
+ if group_by_columns is not None:
+ subquery = subquery.group_by(*group_by_columns)
+
+ return subquery.subquery(name='latest')
+
+
+from sqlalchemy.ext.compiler import compiles
+from sqlalchemy.sql.expression import ClauseElement, Executable, tuple_
+
+
+class CreateTableAs(Executable, ClauseElement):
+ """Custom sqlalchemy clause element for CTAS operations."""
+
+ def __init__(self, name, query):
+ self.name = name
+ self.query = query
- # We nest this subquery to work around a MySQL "table specified twice" issue
- # See https://github.com/teamclairvoyant/airflow-maintenance-dags/issues/41
- # and https://github.com/teamclairvoyant/airflow-maintenance-dags/pull/57/files.
- subquery = subquery.from_self()
- return subquery
+
+@compiles(CreateTableAs)
+def _create_table_as(element, compiler, **kw):
+ return f"CREATE TABLE {element.name} AS {compiler.process(element.query)}"
+
+
+@compiles(CreateTableAs, 'mssql')
+def _create_table_as(element, compiler, **kw):
+ return f"WITH cte AS ( {compiler.process(element.query)} ) SELECT * INTO {element.name} FROM cte"
def _build_query(
@@ -182,23 +182,33 @@ def _build_query(
session,
**kwargs,
):
- query = session.query(orm_model)
- conditions = [recency_column < clean_before_timestamp]
+ base_table_alias = 'base'
+ base_table = aliased(orm_model, name=base_table_alias)
+ query = session.query(base_table).with_entities(text(f"{base_table_alias}.*"))
+ base_table_recency_col = base_table.c[recency_column.name]
+ conditions = [base_table_recency_col < clean_before_timestamp]
if keep_last:
+ max_date_col_name = 'max_date_per_group'
+ group_by_columns = [column(x) for x in keep_last_group_by]
subquery = _subquery_keep_last(
recency_column=recency_column,
keep_last_filters=keep_last_filters,
- keep_last_group_by=keep_last_group_by,
+ group_by_columns=group_by_columns,
+ max_date_colname=max_date_col_name,
session=session,
)
- conditions.append(recency_column.notin_(subquery))
+ query = query.select_from(base_table).outerjoin(
+ subquery,
+ and_(
+ *[base_table.c[x] == subquery.c[x] for x in keep_last_group_by],
+ base_table_recency_col == column(max_date_col_name),
+ ),
+ )
+ conditions.append(column(max_date_col_name).is_(None))
query = query.filter(and_(*conditions))
return query
-logger = logging.getLogger(__file__)
-
-
def _cleanup_table(
*,
orm_model,
@@ -214,7 +224,7 @@ def _cleanup_table(
):
print()
if dry_run:
- print(f"Performing dry run for table {orm_model.__tablename__!r}")
+ print(f"Performing dry run for table {orm_model.name}")
query = _build_query(
orm_model=orm_model,
recency_column=recency_column,
@@ -224,12 +234,47 @@ def _cleanup_table(
clean_before_timestamp=clean_before_timestamp,
session=session,
)
+ logger.debug("old rows query:\n%s", query.selectable.compile())
+ print(f"Checking table {orm_model.name}")
+ num_rows = _check_for_rows(query=query, print_rows=False)
+
+ if num_rows and not dry_run:
+ import re
+ from datetime import datetime
+
+ # create a new table and copy the rows there
+ timestamp_str = re.sub(r'[^\d]', '', datetime.utcnow().isoformat())[:14]
+ target_table_name = f'_airflow_deleted__{orm_model.name}__{timestamp_str}'
+ print(f"Moving data to table {target_table_name}")
+ stmt = CreateTableAs(target_table_name, query.selectable)
+ logger.debug("ctas query:\n%s", stmt.compile())
+ session.execute(stmt)
+ session.commit()
- _print_entities(query=query, print_rows=False)
+ # delete the rows from the old table
+ metadata = reflect_tables([orm_model.name, target_table_name], session)
+ source_table = metadata.tables[orm_model.name]
+ target_table = metadata.tables[target_table_name]
+ logger.debug("rows moved; purging from %s", source_table.name)
+ bind = session.get_bind()
+ dialect_name = bind.dialect.name
+ if dialect_name == 'sqlite':
+ pk_cols = source_table.primary_key.columns
+ delete = source_table.delete().where(
+ tuple_(*pk_cols).in_(
+ session.query(
+ *[target_table.c[x.name] for x in source_table.primary_key.columns]
+ ).subquery()
+ )
+ )
+ else:
+ delete = source_table.delete().where(
+ and_(col == target_table.c[col.name] for col in source_table.primary_key.columns)
+ )
+ logger.debug("delete statement:\n%s", delete.compile())
+ session.execute(delete)
- if not dry_run:
- _do_delete(query=query, session=session)
- session.commit()
+ session.commit()
def _confirm_delete(*, date: DateTime, tables: List[str]):
@@ -255,6 +300,7 @@ class _warn_if_missing(AbstractContextManager):
def __init__(self, table, suppress):
self.table = table
self.suppress = suppress
+ self.excinst = None
def __enter__(self):
return self
@@ -262,7 +308,8 @@ def __enter__(self):
def __exit__(self, exctype, excinst, exctb):
caught_error = exctype is not None and issubclass(exctype, (OperationalError, ProgrammingError))
if caught_error:
- logger.warning("Table %r not found. Skipping.", self.table)
+ logger.warning("Table %s not found. Skipping.", self.table)
+ self.excinst = excinst
return caught_error
@@ -307,7 +354,7 @@ def run_cleanup(
if not dry_run and confirm:
_confirm_delete(date=clean_before_timestamp, tables=list(effective_config_dict.keys()))
for table_name, table_config in effective_config_dict.items():
- with _warn_if_missing(table_name, table_config.warn_if_missing):
+ with _warn_if_missing(table_name, table_config.warn_if_missing) as suppress_ctx:
_cleanup_table(
clean_before_timestamp=clean_before_timestamp,
dry_run=dry_run,
@@ -315,3 +362,6 @@ def run_cleanup(
**table_config.__dict__,
session=session,
)
+ session.commit()
+ if suppress_ctx.excinst is not None:
+ session.rollback()
From 80fe0fe2b4d61f3c0151837a8085c9e24b30e473 Mon Sep 17 00:00:00 2001
From: Daniel Standish <15932138+dstandish@users.noreply.github.com>
Date: Mon, 23 May 2022 10:46:09 -0700
Subject: [PATCH 02/10] add tests
---
airflow/cli/cli_parser.py | 6 ++++++
airflow/cli/commands/db_command.py | 1 +
airflow/utils/db_cleanup.py | 7 +++++++
docs/apache-airflow/usage-cli.rst | 2 ++
tests/cli/commands/test_db_command.py | 28 ++++++++++++++++++++++++++-
tests/utils/test_db_cleanup.py | 21 ++++++++++++++++++++
6 files changed, 64 insertions(+), 1 deletion(-)
diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py
index 60c77c7a37feb..0742417ae922a 100644
--- a/airflow/cli/cli_parser.py
+++ b/airflow/cli/cli_parser.py
@@ -434,6 +434,11 @@ def string_lower_type(val):
help="Perform a dry run",
action="store_true",
)
+ARG_DB_SKIP_ARCHIVE = Arg(
+ ("--skip-archive",),
+ help="Don't preserve purged records in an archive table.",
+ action="store_true",
+)
# pool
@@ -1452,6 +1457,7 @@ class GroupCommand(NamedTuple):
ARG_DB_CLEANUP_TIMESTAMP,
ARG_VERBOSE,
ARG_YES,
+ ARG_DB_SKIP_ARCHIVE,
),
),
)
diff --git a/airflow/cli/commands/db_command.py b/airflow/cli/commands/db_command.py
index c9201ad59ba80..5f6a84c8a420c 100644
--- a/airflow/cli/commands/db_command.py
+++ b/airflow/cli/commands/db_command.py
@@ -198,4 +198,5 @@ def cleanup_tables(args):
clean_before_timestamp=args.clean_before_timestamp,
verbose=args.verbose,
confirm=not args.yes,
+ skip_archive=args.skip_archive,
)
diff --git a/airflow/utils/db_cleanup.py b/airflow/utils/db_cleanup.py
index 5c3db6ef6b472..52f1314c033f3 100644
--- a/airflow/utils/db_cleanup.py
+++ b/airflow/utils/db_cleanup.py
@@ -219,6 +219,7 @@ def _cleanup_table(
clean_before_timestamp,
dry_run=True,
verbose=False,
+ skip_archive=False,
session=None,
**kwargs,
):
@@ -273,6 +274,9 @@ def _cleanup_table(
)
logger.debug("delete statement:\n%s", delete.compile())
session.execute(delete)
+ session.commit()
+ if skip_archive:
+ target_table.drop()
session.commit()
@@ -321,6 +325,7 @@ def run_cleanup(
dry_run: bool = False,
verbose: bool = False,
confirm: bool = True,
+ skip_archive: bool = False,
session: 'Session' = NEW_SESSION,
):
"""
@@ -339,6 +344,7 @@ def run_cleanup(
:param dry_run: If true, print rows meeting deletion criteria
:param verbose: If true, may provide more detailed output.
:param confirm: Require user input to confirm before processing deletions.
+ :param skip_archive: Set to True if you don't want the purged rows preserved in an archive table.
:param session: Session representing connection to the metadata database.
"""
clean_before_timestamp = timezone.coerce_datetime(clean_before_timestamp)
@@ -360,6 +366,7 @@ def run_cleanup(
dry_run=dry_run,
verbose=verbose,
**table_config.__dict__,
+ skip_archive=skip_archive,
session=session,
)
session.commit()
diff --git a/docs/apache-airflow/usage-cli.rst b/docs/apache-airflow/usage-cli.rst
index 0e7b1b5455171..c14efacb1d2ff 100644
--- a/docs/apache-airflow/usage-cli.rst
+++ b/docs/apache-airflow/usage-cli.rst
@@ -215,6 +215,8 @@ You can optionally provide a list of tables to perform deletes on. If no list of
You can use the ``--dry-run`` option to print the row counts in the primary tables to be cleaned.
+By default, ``db clean`` will archive purged rows in tables of the form ``_airflow_deleted__<table>__<timestamp>``. If you don't want the data preserved in this way, you may supply argument ``--skip-archive``.
+
Beware cascading deletes
^^^^^^^^^^^^^^^^^^^^^^^^
diff --git a/tests/cli/commands/test_db_command.py b/tests/cli/commands/test_db_command.py
index 125e5d7c3e28d..63d963202fdb1 100644
--- a/tests/cli/commands/test_db_command.py
+++ b/tests/cli/commands/test_db_command.py
@@ -318,7 +318,7 @@ def test_date_timezone_supplied(self, run_cleanup_mock, timezone):
@patch('airflow.cli.commands.db_command.run_cleanup')
def test_confirm(self, run_cleanup_mock, confirm_arg, expected):
"""
- When tz included in the string then default timezone should not be used.
+ When ``-y`` provided, ``confirm`` should be false.
"""
args = self.parser.parse_args(
[
@@ -339,6 +339,32 @@ def test_confirm(self, run_cleanup_mock, confirm_arg, expected):
confirm=expected,
)
+ @pytest.mark.parametrize('extra_arg, expected', [(['--skip-archive'], True), ([], False)])
+ @patch('airflow.cli.commands.db_command.run_cleanup')
+ def test_skip_archive(self, run_cleanup_mock, extra_arg, expected):
+ """
+ When ``--skip-archive`` provided, ``skip_archive`` should be True (False otherwise).
+ """
+ args = self.parser.parse_args(
+ [
+ 'db',
+ 'clean',
+ '--clean-before-timestamp',
+ '2021-01-01',
+ *extra_arg,
+ ]
+ )
+ db_command.cleanup_tables(args)
+
+ run_cleanup_mock.assert_called_once_with(
+ table_names=None,
+ dry_run=False,
+ clean_before_timestamp=pendulum.parse('2021-01-01 00:00:00Z'),
+ verbose=False,
+ confirm=True,
+ skip_archive=expected,
+ )
+
@pytest.mark.parametrize('dry_run_arg, expected', [(['--dry-run'], True), ([], False)])
@patch('airflow.cli.commands.db_command.run_cleanup')
def test_dry_run(self, run_cleanup_mock, dry_run_arg, expected):
diff --git a/tests/utils/test_db_cleanup.py b/tests/utils/test_db_cleanup.py
index 8d227df6e5ee9..972e419a89673 100644
--- a/tests/utils/test_db_cleanup.py
+++ b/tests/utils/test_db_cleanup.py
@@ -68,6 +68,27 @@ def test_run_cleanup_confirm(self, confirm_delete_mock, kwargs, called):
else:
confirm_delete_mock.assert_not_called()
+ @pytest.mark.parametrize(
+ 'kwargs, should_skip',
+ [
+ param(dict(skip_archive=True), True, id='true'),
+ param(dict(), False, id='not supplied'),
+ param(dict(skip_archive=False), False, id='false'),
+ ],
+ )
+ @patch('airflow.utils.db_cleanup._cleanup_table')
+ def test_run_cleanup_skip_archive(self, cleanup_table_mock, kwargs, should_skip):
+ """test that skip_archive is passed through to _cleanup_table"""
+ run_cleanup(
+ clean_before_timestamp=None,
+ table_names=['log'],
+ dry_run=None,
+ verbose=None,
+ confirm=False,
+ **kwargs,
+ )
+ assert cleanup_table_mock.call_args[1]['skip_archive'] is should_skip
+
@pytest.mark.parametrize(
'table_names',
[
From 57d633a6f19fff58299e0e657dc1aa3401aaef16 Mon Sep 17 00:00:00 2001
From: Daniel Standish <15932138+dstandish@users.noreply.github.com>
Date: Fri, 27 May 2022 09:21:41 -0700
Subject: [PATCH 03/10] move delete logic back to _do_delete
---
airflow/utils/db.py | 4 +-
airflow/utils/db_cleanup.py | 84 +++++++++++++--------------
tests/cli/commands/test_db_command.py | 6 ++
tests/test_utils/db.py | 10 +++-
tests/utils/test_db_cleanup.py | 23 +++++---
5 files changed, 74 insertions(+), 53 deletions(-)
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index d7c5b77c9c8b4..362c7d08e1260 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -890,12 +890,14 @@ def reflect_tables(tables: List[Union[Base, str]], session):
metadata = sqlalchemy.schema.MetaData(session.bind)
- for tbl in tables:
+ for tbl in tables or []:
try:
table_name = tbl if isinstance(tbl, str) else tbl.__tablename__
metadata.reflect(only=[table_name], extend_existing=True, resolve_fks=False)
except exc.InvalidRequestError:
continue
+ if not tables:
+ metadata.reflect(resolve_fks=False)
return metadata
diff --git a/airflow/utils/db_cleanup.py b/airflow/utils/db_cleanup.py
index 52f1314c033f3..d5c56e42e667c 100644
--- a/airflow/utils/db_cleanup.py
+++ b/airflow/utils/db_cleanup.py
@@ -128,10 +128,44 @@ def _check_for_rows(*, query: "Query", print_rows=False):
return num_entities
-def _do_delete(*, query, session):
+def _do_delete(*, query, orm_model, skip_archive, session):
+ import re
+ from datetime import datetime
+
print("Performing Delete...")
# using bulk delete
- query.delete(synchronize_session=False)
+ # create a new table and copy the rows there
+ timestamp_str = re.sub(r'[^\d]', '', datetime.utcnow().isoformat())[:14]
+ target_table_name = f'_airflow_deleted__{orm_model.name}__{timestamp_str}'
+ print(f"Moving data to table {target_table_name}")
+ stmt = CreateTableAs(target_table_name, query.selectable)
+ logger.debug("ctas query:\n%s", stmt.compile())
+ session.execute(stmt)
+ session.commit()
+
+ # delete the rows from the old table
+ metadata = reflect_tables([orm_model.name, target_table_name], session)
+ source_table = metadata.tables[orm_model.name]
+ target_table = metadata.tables[target_table_name]
+ logger.debug("rows moved; purging from %s", source_table.name)
+ bind = session.get_bind()
+ dialect_name = bind.dialect.name
+ if dialect_name == 'sqlite':
+ pk_cols = source_table.primary_key.columns
+ delete = source_table.delete().where(
+ tuple_(*pk_cols).in_(
+ session.query(*[target_table.c[x.name] for x in source_table.primary_key.columns]).subquery()
+ )
+ )
+ else:
+ delete = source_table.delete().where(
+ and_(col == target_table.c[col.name] for col in source_table.primary_key.columns)
+ )
+ logger.debug("delete statement:\n%s", delete.compile())
+ session.execute(delete)
+ session.commit()
+ if skip_archive:
+ target_table.drop()
session.commit()
print("Finished Performing Delete")
@@ -162,12 +196,12 @@ def __init__(self, name, query):
@compiles(CreateTableAs)
-def _create_table_as(element, compiler, **kw):
+def _compile_create_table_as(element, compiler, **kw):
return f"CREATE TABLE {element.name} AS {compiler.process(element.query)}"
@compiles(CreateTableAs, 'mssql')
-def _create_table_as(element, compiler, **kw):
+def _compile_create_table_as(element, compiler, **kw):
return f"WITH cte AS ( {compiler.process(element.query)} ) SELECT * INTO {element.name} FROM cte"
@@ -240,43 +274,7 @@ def _cleanup_table(
num_rows = _check_for_rows(query=query, print_rows=False)
if num_rows and not dry_run:
- import re
- from datetime import datetime
-
- # create a new table and copy the rows there
- timestamp_str = re.sub(r'[^\d]', '', datetime.utcnow().isoformat())[:14]
- target_table_name = f'_airflow_deleted__{orm_model.name}__{timestamp_str}'
- print(f"Moving data to table {target_table_name}")
- stmt = CreateTableAs(target_table_name, query.selectable)
- logger.debug("ctas query:\n%s", stmt.compile())
- session.execute(stmt)
- session.commit()
-
- # delete the rows from the old table
- metadata = reflect_tables([orm_model.name, target_table_name], session)
- source_table = metadata.tables[orm_model.name]
- target_table = metadata.tables[target_table_name]
- logger.debug("rows moved; purging from %s", source_table.name)
- bind = session.get_bind()
- dialect_name = bind.dialect.name
- if dialect_name == 'sqlite':
- pk_cols = source_table.primary_key.columns
- delete = source_table.delete().where(
- tuple_(*pk_cols).in_(
- session.query(
- *[target_table.c[x.name] for x in source_table.primary_key.columns]
- ).subquery()
- )
- )
- else:
- delete = source_table.delete().where(
- and_(col == target_table.c[col.name] for col in source_table.primary_key.columns)
- )
- logger.debug("delete statement:\n%s", delete.compile())
- session.execute(delete)
- session.commit()
- if skip_archive:
- target_table.drop()
+ _do_delete(query=query, orm_model=orm_model, skip_archive=skip_archive, session=session)
session.commit()
@@ -311,8 +309,8 @@ def __enter__(self):
def __exit__(self, exctype, excinst, exctb):
caught_error = exctype is not None and issubclass(exctype, (OperationalError, ProgrammingError))
- if caught_error:
- logger.warning("Table %s not found. Skipping.", self.table)
+ if caught_error and self.table in getattr(excinst, 'message', ''):
+ logger.warning("Table %s not found. Skipping. %s", self.table, excinst)
self.excinst = excinst
return caught_error
diff --git a/tests/cli/commands/test_db_command.py b/tests/cli/commands/test_db_command.py
index 63d963202fdb1..5124ba963ae5b 100644
--- a/tests/cli/commands/test_db_command.py
+++ b/tests/cli/commands/test_db_command.py
@@ -293,6 +293,7 @@ def test_date_timezone_omitted(self, run_cleanup_mock, timezone):
clean_before_timestamp=pendulum.parse(timestamp, tz=timezone),
verbose=False,
confirm=False,
+ skip_archive=False,
)
@pytest.mark.parametrize('timezone', ['UTC', 'Europe/Berlin', 'America/Los_Angeles'])
@@ -312,6 +313,7 @@ def test_date_timezone_supplied(self, run_cleanup_mock, timezone):
clean_before_timestamp=pendulum.parse(timestamp),
verbose=False,
confirm=False,
+ skip_archive=False,
)
@pytest.mark.parametrize('confirm_arg, expected', [(['-y'], False), ([], True)])
@@ -337,6 +339,7 @@ def test_confirm(self, run_cleanup_mock, confirm_arg, expected):
clean_before_timestamp=pendulum.parse('2021-01-01 00:00:00Z'),
verbose=False,
confirm=expected,
+ skip_archive=False,
)
@pytest.mark.parametrize('extra_arg, expected', [(['--skip-archive'], True), ([], False)])
@@ -388,6 +391,7 @@ def test_dry_run(self, run_cleanup_mock, dry_run_arg, expected):
clean_before_timestamp=pendulum.parse('2021-01-01 00:00:00Z'),
verbose=False,
confirm=True,
+ skip_archive=False,
)
@pytest.mark.parametrize(
@@ -415,6 +419,7 @@ def test_tables(self, run_cleanup_mock, extra_args, expected):
clean_before_timestamp=pendulum.parse('2021-01-01 00:00:00Z'),
verbose=False,
confirm=True,
+ skip_archive=False,
)
@pytest.mark.parametrize('extra_args, expected', [(['--verbose'], True), ([], False)])
@@ -440,4 +445,5 @@ def test_verbose(self, run_cleanup_mock, extra_args, expected):
clean_before_timestamp=pendulum.parse('2021-01-01 00:00:00Z'),
verbose=expected,
confirm=True,
+ skip_archive=False,
)
diff --git a/tests/test_utils/db.py b/tests/test_utils/db.py
index b7502fc52bfba..ae4a1d6598679 100644
--- a/tests/test_utils/db.py
+++ b/tests/test_utils/db.py
@@ -38,7 +38,7 @@
from airflow.models.dagcode import DagCode
from airflow.models.serialized_dag import SerializedDagModel
from airflow.security.permissions import RESOURCE_DAG_PREFIX
-from airflow.utils.db import add_default_pool_if_not_exists, create_default_connections
+from airflow.utils.db import add_default_pool_if_not_exists, create_default_connections, reflect_tables
from airflow.utils.session import create_session
from airflow.www.fab_security.sqla.models import Permission, Resource, assoc_permission_role
@@ -57,6 +57,14 @@ def clear_db_dags():
session.query(DagModel).delete()
+def drop_tables_with_prefix(prefix):
+ with create_session() as session:
+ metadata = reflect_tables(None, session)
+ for table_name, table in metadata.tables.items():
+ if table_name.startswith(prefix):
+ table.drop()
+
+
def clear_db_serialized_dags():
with create_session() as session:
session.query(SerializedDagModel).delete()
diff --git a/tests/utils/test_db_cleanup.py b/tests/utils/test_db_cleanup.py
index 972e419a89673..e335cdb251afb 100644
--- a/tests/utils/test_db_cleanup.py
+++ b/tests/utils/test_db_cleanup.py
@@ -30,7 +30,7 @@
from airflow.operators.python import PythonOperator
from airflow.utils.db_cleanup import _build_query, _cleanup_table, config_dict, run_cleanup
from airflow.utils.session import create_session
-from tests.test_utils.db import clear_db_dags, clear_db_runs
+from tests.test_utils.db import clear_db_dags, clear_db_runs, drop_tables_with_prefix
@pytest.fixture(autouse=True)
@@ -44,6 +44,10 @@ def clean_database():
class TestDBCleanup:
+ @pytest.fixture(autouse=True)
+ def clear_airflow_tables(self):
+ drop_tables_with_prefix('_airflow_')
+
@pytest.mark.parametrize(
'kwargs, called',
[
@@ -116,12 +120,14 @@ def test_run_cleanup_tables(self, clean_table_mock, table_names):
[None, True, False],
)
@patch('airflow.utils.db_cleanup._build_query', MagicMock())
- @patch('airflow.utils.db_cleanup._print_entities', MagicMock())
- @patch('airflow.utils.db_cleanup._do_delete')
@patch('airflow.utils.db_cleanup._confirm_delete', MagicMock())
- def test_run_cleanup_dry_run(self, do_delete, dry_run):
+ @patch('airflow.utils.db_cleanup._check_for_rows')
+ @patch('airflow.utils.db_cleanup._do_delete')
+ def test_run_cleanup_dry_run(self, do_delete, check_rows_mock, dry_run):
"""Delete should only be called when not dry_run"""
+ check_rows_mock.return_value = 10
base_kwargs = dict(
+ table_names=['log'],
clean_before_timestamp=None,
dry_run=dry_run,
verbose=None,
@@ -156,7 +162,7 @@ def test__build_query(self, table_name, date_add_kwargs, expected_to_delete, ext
dag run is kept.
"""
- base_date = pendulum.DateTime(2022, 1, 1, tzinfo=pendulum.timezone('America/Los_Angeles'))
+ base_date = pendulum.DateTime(2022, 1, 1, tzinfo=pendulum.timezone('UTC'))
create_tis(
base_date=base_date,
num_tis=10,
@@ -196,7 +202,7 @@ def test__cleanup_table(self, table_name, date_add_kwargs, expected_to_delete, e
associated dag runs should remain.
"""
- base_date = pendulum.DateTime(2022, 1, 1, tzinfo=pendulum.timezone('America/Los_Angeles'))
+ base_date = pendulum.DateTime(2022, 1, 1, tzinfo=pendulum.timezone('UTC'))
num_tis = 10
create_tis(
base_date=base_date,
@@ -210,13 +216,14 @@ def test__cleanup_table(self, table_name, date_add_kwargs, expected_to_delete, e
clean_before_timestamp=clean_before_date,
dry_run=False,
session=session,
+ table_names=['dag_run', 'task_instance'],
)
model = config_dict[table_name].orm_model
expected_remaining = num_tis - expected_to_delete
assert len(session.query(model).all()) == expected_remaining
- if model == TaskInstance:
+ if model.name == 'task_instance':
assert len(session.query(DagRun).all()) == num_tis
- elif model == DagRun:
+ elif model.name == 'dag_run':
assert len(session.query(TaskInstance).all()) == expected_remaining
else:
raise Exception("unexpected")
From a10d0951262d23cc45225653b98ad39e2de33d96 Mon Sep 17 00:00:00 2001
From: Daniel Standish <15932138+dstandish@users.noreply.github.com>
Date: Tue, 7 Jun 2022 15:29:23 -0700
Subject: [PATCH 04/10] remove RTIF
---
airflow/utils/db_cleanup.py | 6 ++----
1 file changed, 2 insertions(+), 4 deletions(-)
diff --git a/airflow/utils/db_cleanup.py b/airflow/utils/db_cleanup.py
index d5c56e42e667c..0a0eb2bee974c 100644
--- a/airflow/utils/db_cleanup.py
+++ b/airflow/utils/db_cleanup.py
@@ -98,10 +98,8 @@ def readable_config(self):
),
_TableConfig(table_name='import_error', recency_column_name='timestamp'),
_TableConfig(table_name='log', recency_column_name='dttm'),
- # _TableConfig(table_name='rendered_task_instance_fields', recency_column_name='execution_date'),
- _TableConfig(
- table_name='sensor_instance', recency_column_name='updated_at'
- ), # TODO: add FK to task instance / dag so we can remove here
+ _TableConfig(table_name='rendered_task_instance_fields', recency_column_name='execution_date'),
+ _TableConfig(table_name='sensor_instance', recency_column_name='updated_at'),
_TableConfig(table_name='sla_miss', recency_column_name='timestamp'),
_TableConfig(table_name='task_fail', recency_column_name='start_date'),
_TableConfig(table_name='task_instance', recency_column_name='start_date'),
From ee97c01f73200ca89aa6486c9716c18d8d347c65 Mon Sep 17 00:00:00 2001
From: Daniel Standish <15932138+dstandish@users.noreply.github.com>
Date: Tue, 7 Jun 2022 16:17:52 -0700
Subject: [PATCH 05/10] be more precise about logging for non-existence, so
that we can bubble up unexpected warnings
---
airflow/utils/db.py | 2 +-
airflow/utils/db_cleanup.py | 22 ++++++++++++++++------
2 files changed, 17 insertions(+), 7 deletions(-)
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 362c7d08e1260..defc187fe6418 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -879,7 +879,7 @@ def check_conn_id_duplicates(session: Session) -> Iterable[str]:
)
-def reflect_tables(tables: List[Union[Base, str]], session):
+def reflect_tables(tables: Optional[List[Union[Base, str]]], session):
"""
When running checks prior to upgrades, we use reflection to determine current state of the
database.
diff --git a/airflow/utils/db_cleanup.py b/airflow/utils/db_cleanup.py
index 0a0eb2bee974c..67ab6c382c1fd 100644
--- a/airflow/utils/db_cleanup.py
+++ b/airflow/utils/db_cleanup.py
@@ -296,10 +296,14 @@ def _print_config(*, configs: Dict[str, _TableConfig]):
AirflowConsole().print_as_table(data=data)
-class _warn_if_missing(AbstractContextManager):
- def __init__(self, table, suppress):
+class _suppress_with_logging(AbstractContextManager):
+ """
+ Suppresses errors but logs them.
+ Also stores the exception instance so it can be referred to after exiting context.
+ """
+
+ def __init__(self, table):
self.table = table
- self.suppress = suppress
self.excinst = None
def __enter__(self):
@@ -307,8 +311,9 @@ def __enter__(self):
def __exit__(self, exctype, excinst, exctb):
caught_error = exctype is not None and issubclass(exctype, (OperationalError, ProgrammingError))
- if caught_error and self.table in getattr(excinst, 'message', ''):
- logger.warning("Table %s not found. Skipping. %s", self.table, excinst)
+ if caught_error:
+ logger.warning("Encountered error when attempting to clean table '%s'. ", self.table)
+ logger.debug("Traceback for table '%s'", self.table, exc_info=True)
self.excinst = excinst
return caught_error
@@ -355,8 +360,12 @@ def run_cleanup(
_print_config(configs=effective_config_dict)
if not dry_run and confirm:
_confirm_delete(date=clean_before_timestamp, tables=list(effective_config_dict.keys()))
+ existing_tables = reflect_tables(tables=None, session=session).tables
for table_name, table_config in effective_config_dict.items():
- with _warn_if_missing(table_name, table_config.warn_if_missing) as suppress_ctx:
+ if table_name not in existing_tables:
+ logger.warning("Table %s not found. Skipping.", table_name)
+ continue
+ with _suppress_with_logging(table_name) as suppress_ctx:
_cleanup_table(
clean_before_timestamp=clean_before_timestamp,
dry_run=dry_run,
@@ -367,4 +376,5 @@ def run_cleanup(
)
session.commit()
if suppress_ctx.excinst is not None:
+ logger.debug('Rolling back transaction')
session.rollback()
From 30b315fd48216788a0861d854ab47564c1ad642a Mon Sep 17 00:00:00 2001
From: Daniel Standish <15932138+dstandish@users.noreply.github.com>
Date: Wed, 8 Jun 2022 08:50:07 -0700
Subject: [PATCH 06/10] use None instead of truthy
---
airflow/utils/db.py | 15 ++++++++-------
1 file changed, 8 insertions(+), 7 deletions(-)
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index defc187fe6418..b4ef33043b3d9 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -890,14 +890,15 @@ def reflect_tables(tables: Optional[List[Union[Base, str]]], session):
metadata = sqlalchemy.schema.MetaData(session.bind)
- for tbl in tables or []:
- try:
- table_name = tbl if isinstance(tbl, str) else tbl.__tablename__
- metadata.reflect(only=[table_name], extend_existing=True, resolve_fks=False)
- except exc.InvalidRequestError:
- continue
- if not tables:
+ if tables is None:
metadata.reflect(resolve_fks=False)
+ else:
+ for tbl in tables:
+ try:
+ table_name = tbl if isinstance(tbl, str) else tbl.__tablename__
+ metadata.reflect(only=[table_name], extend_existing=True, resolve_fks=False)
+ except exc.InvalidRequestError:
+ continue
return metadata
From 2c190a8366ec57c31e3abde1d8cec09c989ea8d1 Mon Sep 17 00:00:00 2001
From: Daniel Standish <15932138+dstandish@users.noreply.github.com>
Date: Wed, 8 Jun 2022 14:43:08 -0700
Subject: [PATCH 07/10] fix tests
---
airflow/utils/db_cleanup.py | 12 +++++-------
tests/cli/commands/test_db_command.py | 2 +-
2 files changed, 6 insertions(+), 8 deletions(-)
diff --git a/airflow/utils/db_cleanup.py b/airflow/utils/db_cleanup.py
index 67ab6c382c1fd..b0a1ec1fd48ba 100644
--- a/airflow/utils/db_cleanup.py
+++ b/airflow/utils/db_cleanup.py
@@ -28,7 +28,9 @@
from pendulum import DateTime
from sqlalchemy import and_, column, false, func, table, text
from sqlalchemy.exc import OperationalError, ProgrammingError
+from sqlalchemy.ext.compiler import compiles
from sqlalchemy.orm import Query, Session, aliased
+from sqlalchemy.sql.expression import ClauseElement, Executable, tuple_
from airflow.cli.simple_table import AirflowConsole
from airflow.models import Base
@@ -58,7 +60,7 @@ class _TableConfig:
table_name: str
recency_column_name: str
- extra_columns: List[str] = None
+ extra_columns: Optional[List[str]] = None
keep_last: bool = False
keep_last_filters: Optional[Any] = None
keep_last_group_by: Optional[Any] = None
@@ -181,10 +183,6 @@ def _subquery_keep_last(*, recency_column, keep_last_filters, group_by_columns,
return subquery.subquery(name='latest')
-from sqlalchemy.ext.compiler import compiles
-from sqlalchemy.sql.expression import ClauseElement, Executable, tuple_
-
-
class CreateTableAs(Executable, ClauseElement):
"""Custom sqlalchemy clause element for CTAS operations."""
@@ -194,12 +192,12 @@ def __init__(self, name, query):
@compiles(CreateTableAs)
-def _compile_create_table_as(element, compiler, **kw):
+def _compile_create_table_as__other(element, compiler, **kw):
return f"CREATE TABLE {element.name} AS {compiler.process(element.query)}"
@compiles(CreateTableAs, 'mssql')
-def _compile_create_table_as(element, compiler, **kw):
+def _compile_create_table_as__mssql(element, compiler, **kw):
return f"WITH cte AS ( {compiler.process(element.query)} ) SELECT * INTO {element.name} FROM cte"
diff --git a/tests/cli/commands/test_db_command.py b/tests/cli/commands/test_db_command.py
index 5124ba963ae5b..e6e93f6a1c946 100644
--- a/tests/cli/commands/test_db_command.py
+++ b/tests/cli/commands/test_db_command.py
@@ -344,7 +344,7 @@ def test_confirm(self, run_cleanup_mock, confirm_arg, expected):
@pytest.mark.parametrize('extra_arg, expected', [(['--skip-archive'], True), ([], False)])
@patch('airflow.cli.commands.db_command.run_cleanup')
- def test_confirm(self, run_cleanup_mock, extra_arg, expected):
+ def test_skip_archive(self, run_cleanup_mock, extra_arg, expected):
"""
When ``--skip-archive`` provided, ``skip_archive`` should be True (False otherwise).
"""
From db41188ee235833b6a2f358116190255b62ba410 Mon Sep 17 00:00:00 2001
From: Daniel Standish <15932138+dstandish@users.noreply.github.com>
Date: Thu, 9 Jun 2022 08:12:03 -0700
Subject: [PATCH 08/10] remove warn if missing param
---
airflow/utils/db_cleanup.py | 8 ++------
1 file changed, 2 insertions(+), 6 deletions(-)
diff --git a/airflow/utils/db_cleanup.py b/airflow/utils/db_cleanup.py
index b0a1ec1fd48ba..3459f1cdfc7a1 100644
--- a/airflow/utils/db_cleanup.py
+++ b/airflow/utils/db_cleanup.py
@@ -54,8 +54,6 @@ class _TableConfig:
in the table. to ignore certain records even if they are the latest in the table, you can
supply additional filters here (e.g. externally triggered dag runs)
:param keep_last_group_by: if keeping the last record, can keep the last record for each group
- :param warn_if_missing: If True, then we'll suppress "table missing" exception and log a warning.
- If False then the exception will go uncaught.
"""
table_name: str
@@ -64,7 +62,6 @@ class _TableConfig:
keep_last: bool = False
keep_last_filters: Optional[Any] = None
keep_last_group_by: Optional[Any] = None
- warn_if_missing: bool = False
def __post_init__(self):
self.recency_column = column(self.recency_column_name)
@@ -83,7 +80,6 @@ def readable_config(self):
keep_last=self.keep_last,
keep_last_filters=[str(x) for x in self.keep_last_filters] if self.keep_last_filters else None,
keep_last_group_by=str(self.keep_last_group_by),
- warn_if_missing=str(self.warn_if_missing),
)
@@ -108,8 +104,8 @@ def readable_config(self):
_TableConfig(table_name='task_reschedule', recency_column_name='start_date'),
_TableConfig(table_name='xcom', recency_column_name='timestamp'),
_TableConfig(table_name='callback_request', recency_column_name='created_at'),
- _TableConfig(table_name='celery_taskmeta', recency_column_name='date_done', warn_if_missing=True),
- _TableConfig(table_name='celery_tasksetmeta', recency_column_name='date_done', warn_if_missing=True),
+ _TableConfig(table_name='celery_taskmeta', recency_column_name='date_done'),
+ _TableConfig(table_name='celery_tasksetmeta', recency_column_name='date_done'),
]
config_dict: Dict[str, _TableConfig] = {x.orm_model.name: x for x in sorted(config_list)}
From 774608d5f08f204169601c45c6b193d54140079d Mon Sep 17 00:00:00 2001
From: Daniel Standish <15932138+dstandish@users.noreply.github.com>
Date: Thu, 9 Jun 2022 08:22:47 -0700
Subject: [PATCH 09/10] simpler context mgr
---
airflow/utils/db_cleanup.py | 33 ++++++++++++---------------------
1 file changed, 12 insertions(+), 21 deletions(-)
diff --git a/airflow/utils/db_cleanup.py b/airflow/utils/db_cleanup.py
index 3459f1cdfc7a1..f77ae52a60826 100644
--- a/airflow/utils/db_cleanup.py
+++ b/airflow/utils/db_cleanup.py
@@ -21,7 +21,7 @@
"""
import logging
-from contextlib import AbstractContextManager
+from contextlib import contextmanager
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
@@ -290,26 +290,20 @@ def _print_config(*, configs: Dict[str, _TableConfig]):
AirflowConsole().print_as_table(data=data)
-class _suppress_with_logging(AbstractContextManager):
+@contextmanager
+def _suppress_with_logging(table, session):
"""
Suppresses errors but logs them.
-    Also stores the exception instance so it can be referred to after exiting context.
+    Also rolls back the session's transaction if the session is still active.
"""
-
- def __init__(self, table):
- self.table = table
- self.excinst = None
-
- def __enter__(self):
- return self
-
- def __exit__(self, exctype, excinst, exctb):
- caught_error = exctype is not None and issubclass(exctype, (OperationalError, ProgrammingError))
- if caught_error:
- logger.warning("Encountered error when attempting to clean table '%s'. ", self.table)
- logger.debug("Traceback for table '%s'", self.table, exc_info=True)
- self.excinst = excinst
- return caught_error
+ try:
+ yield
+ except (OperationalError, ProgrammingError):
+ logger.warning("Encountered error when attempting to clean table '%s'. ", table)
+ logger.debug("Traceback for table '%s'", table, exc_info=True)
+ if session.is_active:
+ logger.debug('Rolling back transaction')
+ session.rollback()
@provide_session
@@ -359,7 +353,7 @@ def run_cleanup(
if table_name not in existing_tables:
logger.warning("Table %s not found. Skipping.", table_name)
continue
- with _suppress_with_logging(table_name) as suppress_ctx:
+ with _suppress_with_logging(table_name, session):
_cleanup_table(
clean_before_timestamp=clean_before_timestamp,
dry_run=dry_run,
@@ -369,6 +363,3 @@ def run_cleanup(
session=session,
)
session.commit()
- if suppress_ctx.excinst is not None:
- logger.debug('Rolling back transaction')
- session.rollback()
From 0760e6b02a81c9a7e7d5b7cdf5e02ed848bd398a Mon Sep 17 00:00:00 2001
From: Daniel Standish <15932138+dstandish@users.noreply.github.com>
Date: Tue, 14 Jun 2022 11:20:21 -0700
Subject: [PATCH 10/10] add newsfragment
---
newsfragments/23574.feature.rst | 1 +
1 file changed, 1 insertion(+)
create mode 100644 newsfragments/23574.feature.rst
diff --git a/newsfragments/23574.feature.rst b/newsfragments/23574.feature.rst
new file mode 100644
index 0000000000000..805b7b18bd1dc
--- /dev/null
+++ b/newsfragments/23574.feature.rst
@@ -0,0 +1 @@
+Command ``airflow db clean`` now archives data before purging.