Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 23 additions & 7 deletions airflow/www/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import json
import textwrap
import time
from typing import TYPE_CHECKING, Any, Sequence
from typing import TYPE_CHECKING, Any, Callable, Sequence
from urllib.parse import urlencode

from flask import request, url_for
Expand All @@ -36,6 +36,7 @@
from pendulum.datetime import DateTime
from pygments import highlight, lexers
from pygments.formatters import HtmlFormatter
from pygments.lexer import Lexer
from sqlalchemy import delete, func, types
from sqlalchemy.ext.associationproxy import AssociationProxy

Expand Down Expand Up @@ -546,20 +547,35 @@ def pygment_html_render(s, lexer=lexers.TextLexer):
return highlight(s, lexer(), HtmlFormatter(linenos=True))


def render(obj, lexer):
def render(obj: Any, lexer: Lexer, handler: Callable[[Any], str] | None = None):
"""Render a given Python object with a given Pygments lexer."""
out = ""
if isinstance(obj, str):
out = Markup(pygment_html_render(obj, lexer))
return Markup(pygment_html_render(obj, lexer))

elif isinstance(obj, (tuple, list)):
out = ""
for i, text_to_render in enumerate(obj):
if lexer is lexers.PythonLexer:
text_to_render = repr(text_to_render)
out += Markup("<div>List item #{}</div>").format(i)
out += Markup("<div>" + pygment_html_render(text_to_render, lexer) + "</div>")
return out

elif isinstance(obj, dict):
out = ""
for k, v in obj.items():
if lexer is lexers.PythonLexer:
v = repr(v)
out += Markup('<div>Dict item "{}"</div>').format(k)
out += Markup("<div>" + pygment_html_render(v, lexer) + "</div>")
return out
return out

elif handler is not None and obj is not None:
return Markup(pygment_html_render(handler(obj), lexer))

else:
# Return empty string otherwise
return ""


def json_render(obj, lexer):
Expand Down Expand Up @@ -600,8 +616,8 @@ def get_attr_renderer():
"mysql": lambda x: render(x, lexers.MySqlLexer),
"postgresql": lambda x: render(x, lexers.PostgresLexer),
"powershell": lambda x: render(x, lexers.PowerShellLexer),
"py": lambda x: render(get_python_source(x), lexers.PythonLexer),
"python_callable": lambda x: render(get_python_source(x), lexers.PythonLexer),
"py": lambda x: render(x, lexers.PythonLexer, get_python_source),
"python_callable": lambda x: render(x, lexers.PythonLexer, get_python_source),
"rst": lambda x: render(x, lexers.RstLexer),
"sql": lambda x: render(x, lexers.SqlLexer),
"tsql": lambda x: render(x, lexers.TransactSqlLexer),
Expand Down
6 changes: 1 addition & 5 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -1475,11 +1475,7 @@ def rendered_templates(self, session):
content = getattr(task, template_field)
renderer = task.template_fields_renderers.get(template_field, template_field)
if renderer in renderers:
if isinstance(content, (dict, list)):
json_content = json.dumps(content, sort_keys=True, indent=4)
html_dict[template_field] = renderers[renderer](json_content)
else:
html_dict[template_field] = renderers[renderer](content)
html_dict[template_field] = renderers[renderer](content)
else:
html_dict[template_field] = Markup("<pre><code>{}</pre></code>").format(pformat(content))

Expand Down
78 changes: 76 additions & 2 deletions tests/www/views/test_views_rendered.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
from markupsafe import escape

from airflow.models import DAG, RenderedTaskInstanceFields, Variable
from airflow.models.baseoperator import BaseOperator
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.serialization.serialized_objects import SerializedDAG
from airflow.utils import timezone
from airflow.utils.session import create_session
Expand Down Expand Up @@ -64,6 +66,39 @@ def task2(dag):
)


@pytest.fixture()
def task3(dag):
class TestOperator(BaseOperator):
template_fields = ("sql",)

def __init__(self, *, sql, **kwargs):
super().__init__(**kwargs)
self.sql = sql

def execute(self, context):
pass

return TestOperator(
task_id="task3",
sql=["SELECT 1;", "SELECT 2;"],
dag=dag,
)


@pytest.fixture()
def task4(dag):
def func(*op_args):
pass

return PythonOperator(
task_id="task4",
python_callable=func,
op_args=["{{ task_instance_key_str }}_args"],
op_kwargs={"0": "{{ task_instance_key_str }}_kwargs"},
dag=dag,
)


@pytest.fixture()
def task_secret(dag):
return BashOperator(
Expand All @@ -85,15 +120,15 @@ def init_blank_db():


@pytest.fixture(autouse=True)
def reset_db(dag, task1, task2, task_secret):
def reset_db(dag, task1, task2, task3, task4, task_secret):
yield
clear_db_dags()
clear_db_runs()
clear_rendered_ti_fields()


@pytest.fixture()
def create_dag_run(dag, task1, task2, task_secret):
def create_dag_run(dag, task1, task2, task3, task4, task_secret):
def _create_dag_run(*, execution_date, session):
dag_run = dag.create_dagrun(
state=DagRunState.RUNNING,
Expand All @@ -108,6 +143,10 @@ def _create_dag_run(*, execution_date, session):
ti2.state = TaskInstanceState.SCHEDULED
ti3 = dag_run.get_task_instance(task_secret.task_id, session=session)
ti3.state = TaskInstanceState.QUEUED
ti4 = dag_run.get_task_instance(task3.task_id, session=session)
ti4.state = TaskInstanceState.SUCCESS
ti5 = dag_run.get_task_instance(task4.task_id, session=session)
ti5.state = TaskInstanceState.SUCCESS
session.flush()
return dag_run

Expand Down Expand Up @@ -290,3 +329,38 @@ def test_rendered_task_detail_env_secret(patch_app, admin_client, request, env,
if request.node.callspec.id.endswith("-tpld-var"):
Variable.delete("plain_var")
Variable.delete("secret_var")


@pytest.mark.usefixtures("patch_app")
def test_rendered_template_view_for_list_template_field_args(admin_client, create_dag_run, task3):
"""
Test that the Rendered View can show a list of syntax-highlighted SQL statements
"""
assert task3.sql == ["SELECT 1;", "SELECT 2;"]

with create_session() as session:
create_dag_run(execution_date=DEFAULT_DATE, session=session)

url = f"rendered-templates?task_id=task3&dag_id=testdag&execution_date={quote_plus(str(DEFAULT_DATE))}"

resp = admin_client.get(url, follow_redirects=True)
check_content_in_response("List item #0", resp)
check_content_in_response("List item #1", resp)


@pytest.mark.usefixtures("patch_app")
def test_rendered_template_view_for_op_args(admin_client, create_dag_run, task4):
"""
Test that the Rendered View can show rendered values in op_args and op_kwargs
"""
assert task4.op_args == ["{{ task_instance_key_str }}_args"]
assert list(task4.op_kwargs.values()) == ["{{ task_instance_key_str }}_kwargs"]

with create_session() as session:
create_dag_run(execution_date=DEFAULT_DATE, session=session)

url = f"rendered-templates?task_id=task4&dag_id=testdag&execution_date={quote_plus(str(DEFAULT_DATE))}"

resp = admin_client.get(url, follow_redirects=True)
check_content_in_response("testdag__task4__20200301_args", resp)
check_content_in_response("testdag__task4__20200301_kwargs", resp)