Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions airflow/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -563,3 +563,6 @@ def initialize():
#
# DASHBOARD_UIALERTS: List["UIAlert"]
DASHBOARD_UIALERTS = []

# Prefix used to identify tables holding data moved during migration.
AIRFLOW_MOVED_TABLE_PREFIX = "_airflow_moved"
164 changes: 104 additions & 60 deletions airflow/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import time
from typing import Iterable

from sqlalchemy import Table, exc, func
from sqlalchemy import Table, exc, func, inspect, or_, text

from airflow import settings
from airflow.configuration import conf
Expand Down Expand Up @@ -51,15 +51,15 @@
from airflow.models.serialized_dag import SerializedDagModel # noqa: F401

# TODO: remove create_session once we decide to break backward compatibility
from airflow.utils.session import ( # noqa: F401 # pylint: disable=unused-import
create_global_lock,
create_session,
provide_session,
)
from airflow.utils.session import create_global_lock, create_session, provide_session # noqa: F401

log = logging.getLogger(__name__)


def _format_airflow_moved_table_name(source_table, version):
return "__".join([settings.AIRFLOW_MOVED_TABLE_PREFIX, version.replace(".", "_"), source_table])


@provide_session
def merge_conn(conn, session=None):
"""Add new Connection."""
Expand Down Expand Up @@ -697,91 +697,135 @@ def check_conn_type_null(session=None) -> Iterable[str]:
)


def _format_dangling_error(source_table, target_table, invalid_count, reason):
noun = "row" if invalid_count == 1 else "rows"
return (
f"The {source_table} table has {invalid_count} {noun} {reason}, which "
f"is invalid. We could not move them out of the way because the "
f"{target_table} table already exists in your database. Please either "
f"drop the {target_table} table, or manually delete the invalid rows "
f"from the {source_table} table."
)


def _move_dangling_run_data_to_new_table(session, source_table, target_table):
where_clause = "where dag_id is null or run_id is null or execution_date is null"
session.execute(text(f"create table {target_table} as select * from {source_table} {where_clause}"))
session.execute(text(f"delete from {source_table} {where_clause}"))


def check_run_id_null(session) -> Iterable[str]:
import sqlalchemy.schema

metadata = sqlalchemy.schema.MetaData(session.bind)
try:
metadata.reflect(only=["dag_run"])
metadata.reflect(only=[DagRun.__tablename__])
except exc.InvalidRequestError:
# Table doesn't exist -- empty db
return

dag_run = metadata.tables["dag_run"]

for colname in ('run_id', 'dag_id', 'execution_date'):

col = dag_run.columns.get(colname)
if col is None:
continue

if not col.nullable:
continue

num = session.query(dag_run).filter(col.is_(None)).count()
if num > 0:
yield (
f'The {dag_run.name} table has {num} row{"s" if num != 1 else ""} with a NULL value in '
f'{col.name!r}. You must manually correct this problem (possibly by deleting the problem '
'rows).'
# We can't use the model here since it may differ from the db state due to
# this function is run prior to migration. Use the reflected table instead.
dagrun_table = metadata.tables[DagRun.__tablename__]

invalid_dagrun_filter = or_(
dagrun_table.c.dag_id.is_(None),
dagrun_table.c.run_id.is_(None),
dagrun_table.c.execution_date.is_(None),
)
invalid_dagrun_count = session.query(dagrun_table.c.id).filter(invalid_dagrun_filter).count()
if invalid_dagrun_count > 0:
dagrun_dangling_table_name = _format_airflow_moved_table_name(dagrun_table.name, "2.2")
if dagrun_dangling_table_name in inspect(session.get_bind()).get_table_names():
yield _format_dangling_error(
source_table=dagrun_table.name,
target_table=dagrun_dangling_table_name,
invalid_count=invalid_dagrun_count,
reason="with a NULL dag_id, run_id, or execution_date",
)
session.rollback()
return
_move_dangling_run_data_to_new_table(session, dagrun_table.name, dagrun_dangling_table_name)


def _move_dangling_task_data_to_new_table(session, source_table, target_table):
where_clause = f"""
where (task_id, dag_id, execution_date) IN (
select source.task_id, source.dag_id, source.execution_date
from {source_table} as source
left join dag_run as dr
on (source.dag_id = dr.dag_id and source.execution_date = dr.execution_date)
where dr.id is null
)
"""
session.execute(text(f"create table {target_table} as select * from {source_table} {where_clause}"))
session.execute(text(f"delete from {source_table} {where_clause}"))


def check_task_tables_without_matching_dagruns(session) -> Iterable[str]:
from itertools import chain

import sqlalchemy.schema
from sqlalchemy import and_, outerjoin

metadata = sqlalchemy.schema.MetaData(session.bind)
models_to_dagrun = [TaskInstance, TaskFail]
models_to_ti = []
for model in models_to_dagrun + models_to_ti + [DagRun]:
models_to_dagrun = [TaskInstance, TaskReschedule]
for model in models_to_dagrun + [DagRun]:
try:
metadata.reflect(only=[model.__tablename__])
except exc.InvalidRequestError:
# Table doesn't exist, but try the other ones incase the user is upgrading from an _old_ DB
# version
pass

# Key table doesn't exist -- likely empty DB.
if DagRun.__tablename__ not in metadata or TaskInstance.__tablename__ not in metadata:
# Key table doesn't exist -- likely empty DB
session.rollback()
return

for (model, target) in chain(
((m, metadata.tables[DagRun.__tablename__]) for m in models_to_dagrun),
((m, metadata.tables[TaskInstance.__tablename__]) for m in models_to_ti),
):
table = metadata.tables.get(model.__tablename__)
if table is None:
# We can't use the model here since it may differ from the db state due to
# this function is run prior to migration. Use the reflected table instead.
dagrun_table = metadata.tables[DagRun.__tablename__]

existing_table_names = set(inspect(session.get_bind()).get_table_names())
errored = False

for model in models_to_dagrun:
# We can't use the model here since it may differ from the db state due to
# this function is run prior to migration. Use the reflected table instead.
source_table = metadata.tables.get(model.__tablename__)
if source_table is None:
continue
if 'run_id' in table.columns:
# Migration already applied, don't check again

# Migration already applied, don't check again.
if "run_id" in source_table.columns:
continue

# We can't use the model here (as that would have the associationproxy, we instead need to use the
# _reflected_ table)
join_cond = and_(table.c.dag_id == target.c.dag_id, table.c.execution_date == target.c.execution_date)
if "task_id" in target.columns:
join_cond = and_(join_cond, table.c.task_id == target.c.task_id)

query = (
session.query(table.c.dag_id, table.c.task_id, table.c.execution_date)
.select_from(outerjoin(table, target, join_cond))
.filter(target.c.dag_id.is_(None))
) # type: ignore

num = query.count()

if num > 0:
yield (
f'The {table.name} table has {num} row{"s" if num != 1 else ""} without a '
f'corresponding {target.name} row. You must manually correct this problem '
'(possibly by deleting the problem rows).'
source_to_dag_run_join_cond = and_(
source_table.c.dag_id == dagrun_table.c.dag_id,
source_table.c.execution_date == dagrun_table.c.execution_date,
)
invalid_row_count = (
session.query(source_table.c.dag_id, source_table.c.task_id, source_table.c.execution_date)
.select_from(outerjoin(source_table, dagrun_table, source_to_dag_run_join_cond))
.filter(dagrun_table.c.dag_id.is_(None))
.count()
)
if invalid_row_count <= 0:
continue

dangling_table_name = _format_airflow_moved_table_name(source_table.name, "2.2")
if dangling_table_name in existing_table_names:
yield _format_dangling_error(
source_table=source_table.name,
target_table=dangling_table_name,
invalid_count=invalid_row_count,
reason=f"without a corresponding {dagrun_table.name} row",
)
session.rollback()
errored = True
continue
_move_dangling_task_data_to_new_table(session, source_table.name, dangling_table_name)

if errored:
session.rollback()
else:
session.commit()


@provide_session
Expand Down
8 changes: 8 additions & 0 deletions airflow/www/templates/airflow/dags.html
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@
{% for m in dashboard_alerts %}
{{ message(m.message, m.category) }}
{% endfor %}
{% for original_table_name, moved_table_name in migration_moved_data_alerts %}
{% call message(category='error', dismissable=false) %}
Airflow found incompatible data in the <code>{{ original_table_name }}</code> table in the
metadatabase, and has moved them to <code>{{ moved_table_name }}</code> during the database migration
to upgrade. Please inspect the moved data to decide whether you need to keep them, and manually drop
the <code>{{ moved_table_name }}</code> table to dismiss this warning.
{% endcall %}
{% endfor %}
{{ super() }}
{% if sqlite_warning | default(true) %}
{% call message(category='warning', dismissable=false) %}
Expand Down
13 changes: 12 additions & 1 deletion airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@
from pendulum.parsing.exceptions import ParserError
from pygments import highlight, lexers
from pygments.formatters import HtmlFormatter
from sqlalchemy import Date, and_, desc, func, union_all
from sqlalchemy import Date, and_, desc, func, inspect, union_all
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import joinedload
from wtforms import SelectField, validators
Expand Down Expand Up @@ -692,10 +692,21 @@ def index(self):
fm for fm in settings.DASHBOARD_UIALERTS if fm.should_show(current_app.appbuilder.sm)
]

def _iter_parsed_moved_data_table_names():
for table_name in inspect(session.get_bind()).get_table_names():
segments = table_name.split("__", 2)
if len(segments) < 3:
continue
if segments[0] != settings.AIRFLOW_MOVED_TABLE_PREFIX:
continue
# Second segment is a version marker that we don't need to show.
yield segments[2], table_name

return self.render_template(
'airflow/dags.html',
dags=dags,
dashboard_alerts=dashboard_alerts,
migration_moved_data_alerts=sorted(set(_iter_parsed_moved_data_table_names())),
current_page=current_page,
search_query=arg_search_query if arg_search_query else '',
page_title=page_title,
Expand Down
2 changes: 1 addition & 1 deletion tests/www/views/test_views_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@


def test_index(admin_client):
with assert_queries_count(48):
with assert_queries_count(49):
resp = admin_client.get('/', follow_redirects=True)
check_content_in_response('DAGs', resp)

Expand Down