Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
/docs/apache-airflow/concepts/dynamic-task-mapping.rst @uranusjr

# Async Operators & Triggerer
/airflow-core/src/airflow/cli/commands/local_commands/triggerer_command.py @dstandish @hussein-awala
/airflow-core/src/airflow/cli/commands/triggerer_command.py @dstandish @hussein-awala
/airflow-core/src/airflow/jobs/triggerer_job.py @dstandish @hussein-awala
/airflow-core/src/airflow/jobs/triggerer_job_runner.py @dstandish @hussein-awala
/docs/apache-airflow/authoring-and-scheduling/deferring.rst @dstandish @hussein-awala
Expand Down
8 changes: 4 additions & 4 deletions .github/boring-cyborg.yml
Original file line number Diff line number Diff line change
Expand Up @@ -352,9 +352,9 @@ labelPRBasedOnFilePath:
- providers/**/log/*

area:Plugins:
- airflow-core/src/airflow/cli/commands/local_commands/plugins_command.py
- airflow-core/src/airflow/cli/commands/plugins_command.py
- airflow-core/src/airflow/plugins_manager.py
- airflow-core/tests/unit/cli/commands/local_commands/test_plugins_command.py
- airflow-core/tests/unit/cli/commands/test_plugins_command.py
- airflow-core/tests/unit/plugins/**/*
- airflow-core/docs/administration-and-deployment/plugins.rst

Expand All @@ -381,11 +381,11 @@ labelPRBasedOnFilePath:
- airflow-core/docs/security/secrets/**/*

area:Triggerer:
- airflow-core/src/airflow/cli/commands/local_commands/triggerer_command.py
- airflow-core/src/airflow/cli/commands/triggerer_command.py
- airflow-core/src/airflow/jobs/triggerer_job_runner.py
- airflow-core/src/airflow/models/trigger.py
- providers/standard/src/airflow/providers/standard/triggers/**/*
- airflow-core/tests/unit/cli/commands/local_commands/test_triggerer_command.py
- airflow-core/tests/unit/cli/commands/test_triggerer_command.py
- airflow-core/tests/unit/jobs/test_triggerer_job.py
- airflow-core/tests/unit/models/test_trigger.py
- providers/standard/tests/unit/standard/triggers/**/*
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/api_fastapi/gunicorn_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def post_worker_init(_):
"""
Set process title.

This is used by airflow.cli.commands.local_commands.api_server_command to track the status of the worker.
This is used by airflow.cli.commands.api_server_command to track the status of the worker.
"""
old_title = setproctitle.getproctitle()
setproctitle.setproctitle(settings.GUNICORN_WORKER_READY_PREFIX + old_title)
178 changes: 85 additions & 93 deletions airflow-core/src/airflow/cli/cli_config.py

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import logging
from typing import Any

from airflow.cli.commands.local_commands.daemon_utils import run_command_with_daemon_option
from airflow.cli.commands.daemon_utils import run_command_with_daemon_option
from airflow.dag_processing.manager import DagFileProcessorManager, reload_configuration_for_dag_processing
from airflow.jobs.dag_processor_job_runner import DagProcessorJobRunner
from airflow.jobs.job import Job, run_job
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from __future__ import annotations

from airflow import settings
from airflow.cli.commands.local_commands.daemon_utils import run_command_with_daemon_option
from airflow.cli.commands.daemon_utils import run_command_with_daemon_option
from airflow.security import kerberos as krb
from airflow.security.kerberos import KerberosMode
from airflow.utils import cli as cli_utils
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from multiprocessing import Process

from airflow import settings
from airflow.cli.commands.local_commands.daemon_utils import run_command_with_daemon_option
from airflow.cli.commands.daemon_utils import run_command_with_daemon_option
from airflow.configuration import conf
from airflow.executors.executor_loader import ExecutorLoader
from airflow.jobs.job import Job, run_job
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from multiprocessing import Process

from airflow import settings
from airflow.cli.commands.local_commands.daemon_utils import run_command_with_daemon_option
from airflow.cli.commands.daemon_utils import run_command_with_daemon_option
from airflow.configuration import conf
from airflow.jobs.job import Job, run_job
from airflow.jobs.triggerer_job_runner import TriggererJobRunner
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ class TestGetJobs(TestJobEndpoint):
@pytest.mark.parametrize(
"testcase, query_params, expected_status_code, expected_total_entries",
[
# original testcases refactor from tests/cli/commands/remote_commands/test_jobs_command.py
# original testcases refactor from tests/cli/commands/test_jobs_command.py
(TESTCASE_ONE_SCHEDULER, {}, 200, 1),
(TESTCASE_ONE_SCHEDULER_WITH_HOSTNAME, {"hostname": "HOSTNAME"}, 200, 1),
(TESTCASE_HA_SCHEDULERS, {"limit": 100}, 200, 3),
Expand Down
16 changes: 0 additions & 16 deletions airflow-core/tests/unit/cli/commands/local_commands/__init__.py

This file was deleted.

16 changes: 0 additions & 16 deletions airflow-core/tests/unit/cli/commands/remote_commands/__init__.py

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import pytest
from rich.console import Console

from airflow.cli.commands.local_commands import api_server_command
from airflow.cli.commands import api_server_command
from airflow.exceptions import AirflowConfigException

from unit.cli.commands._common_cli_classes import _CommonCLIGunicornTestClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import pytest

from airflow.cli import cli_parser
from airflow.cli.commands.remote_commands import asset_command
from airflow.cli.commands import asset_command

from tests_common.test_utils.db import clear_db_dags, clear_db_runs, parse_and_sync_to_db

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import pendulum
import pytest

import airflow.cli.commands.remote_commands.backfill_command
import airflow.cli.commands.backfill_command
from airflow.cli import cli_parser
from airflow.models.backfill import ReprocessBehavior
from airflow.utils import timezone
Expand Down Expand Up @@ -62,7 +62,7 @@ def setup_method(self):
clear_db_dags()
clear_db_backfills()

@mock.patch("airflow.cli.commands.remote_commands.backfill_command._create_backfill")
@mock.patch("airflow.cli.commands.backfill_command._create_backfill")
@pytest.mark.parametrize(
"repro, expected_repro",
[
Expand Down Expand Up @@ -90,7 +90,7 @@ def test_backfill(self, mock_create, repro, expected_repro):
repro,
]
)
airflow.cli.commands.remote_commands.backfill_command.create_backfill(self.parser.parse_args(args))
airflow.cli.commands.backfill_command.create_backfill(self.parser.parse_args(args))

mock_create.assert_called_once_with(
dag_id="example_bash_operator",
Expand All @@ -102,7 +102,7 @@ def test_backfill(self, mock_create, repro, expected_repro):
reprocess_behavior=expected_repro,
)

@mock.patch("airflow.cli.commands.remote_commands.backfill_command._do_dry_run")
@mock.patch("airflow.cli.commands.backfill_command._do_dry_run")
@pytest.mark.parametrize(
"reverse",
[False, True],
Expand All @@ -123,7 +123,7 @@ def test_backfill_dry_run(self, mock_dry_run, reverse):
]
if reverse:
args.append("--run-backwards")
airflow.cli.commands.remote_commands.backfill_command.create_backfill(self.parser.parse_args(args))
airflow.cli.commands.backfill_command.create_backfill(self.parser.parse_args(args))

mock_dry_run.assert_called_once_with(
dag_id="example_bash_operator",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import pytest

from airflow.cli import cli_parser
from airflow.cli.commands.remote_commands import config_command
from airflow.cli.commands.remote_commands.config_command import ConfigChange, ConfigParameter
from airflow.cli.commands import config_command
from airflow.cli.commands.config_command import ConfigChange, ConfigParameter

from tests_common.test_utils.config import conf_vars

Expand All @@ -38,8 +38,8 @@ class TestCliConfigList:
def setup_class(cls):
cls.parser = cli_parser.get_parser()

@mock.patch("airflow.cli.commands.remote_commands.config_command.StringIO")
@mock.patch("airflow.cli.commands.remote_commands.config_command.conf")
@mock.patch("airflow.cli.commands.config_command.StringIO")
@mock.patch("airflow.cli.commands.config_command.conf")
def test_cli_show_config_should_write_data(self, mock_conf, mock_stringio):
config_command.show_config(self.parser.parse_args(["config", "list", "--color", "off"]))
mock_conf.write.assert_called_once_with(
Expand All @@ -54,8 +54,8 @@ def test_cli_show_config_should_write_data(self, mock_conf, mock_stringio):
only_defaults=False,
)

@mock.patch("airflow.cli.commands.remote_commands.config_command.StringIO")
@mock.patch("airflow.cli.commands.remote_commands.config_command.conf")
@mock.patch("airflow.cli.commands.config_command.StringIO")
@mock.patch("airflow.cli.commands.config_command.conf")
def test_cli_show_config_should_write_data_specific_section(self, mock_conf, mock_stringio):
config_command.show_config(
self.parser.parse_args(["config", "list", "--section", "core", "--color", "off"])
Expand Down Expand Up @@ -222,7 +222,7 @@ def test_should_display_value(self):

assert temp_stdout.getvalue().strip() == "test_value"

@mock.patch("airflow.cli.commands.remote_commands.config_command.conf")
@mock.patch("airflow.cli.commands.config_command.conf")
def test_should_not_raise_exception_when_section_for_config_with_value_defined_elsewhere_is_missing(
self, mock_conf
):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import pytest

from airflow.cli import cli_config, cli_parser
from airflow.cli.commands.remote_commands import connection_command
from airflow.cli.commands import connection_command
from airflow.exceptions import AirflowException
from airflow.models import Connection
from airflow.utils.db import merge_conn
Expand Down Expand Up @@ -986,7 +986,7 @@ def test_cli_connections_test_disabled_by_default(self):


class TestCliCreateDefaultConnection:
@mock.patch("airflow.cli.commands.remote_commands.connection_command.db_create_default_connections")
@mock.patch("airflow.cli.commands.connection_command.db_create_default_connections")
def test_cli_create_default_connections(self, mock_db_create_default_connections):
create_default_connection_fnc = dict(
(db_command.name, db_command.func) for db_command in cli_config.CONNECTIONS_COMMANDS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

from airflow import settings
from airflow.cli import cli_parser
from airflow.cli.commands.remote_commands import dag_command
from airflow.cli.commands import dag_command
from airflow.decorators import task
from airflow.exceptions import AirflowException
from airflow.models import DagBag, DagModel, DagRun
Expand Down Expand Up @@ -95,7 +95,7 @@ def test_show_dag_dependencies_print(self):
assert "digraph" in out
assert "graph [rankdir=LR]" in out

@mock.patch("airflow.cli.commands.remote_commands.dag_command.render_dag_dependencies")
@mock.patch("airflow.cli.commands.dag_command.render_dag_dependencies")
def test_show_dag_dependencies_save(self, mock_render_dag_dependencies):
with contextlib.redirect_stdout(StringIO()) as temp_stdout:
dag_command.dag_dependencies_show(
Expand All @@ -115,7 +115,7 @@ def test_show_dag_print(self):
assert "graph [label=example_bash_operator labelloc=t rankdir=LR]" in out
assert "runme_2 -> run_after_loop" in out

@mock.patch("airflow.cli.commands.remote_commands.dag_command.render_dag")
@mock.patch("airflow.cli.commands.dag_command.render_dag")
def test_show_dag_save(self, mock_render_dag):
with contextlib.redirect_stdout(StringIO()) as temp_stdout:
dag_command.dag_show(
Expand All @@ -127,8 +127,8 @@ def test_show_dag_save(self, mock_render_dag):
)
assert "File awesome.png saved" in out

@mock.patch("airflow.cli.commands.remote_commands.dag_command.subprocess.Popen")
@mock.patch("airflow.cli.commands.remote_commands.dag_command.render_dag")
@mock.patch("airflow.cli.commands.dag_command.subprocess.Popen")
@mock.patch("airflow.cli.commands.dag_command.render_dag")
def test_show_dag_imgcat(self, mock_render_dag, mock_popen):
mock_render_dag.return_value.pipe.return_value = b"DOT_DATA"
mock_proc = mock.MagicMock()
Expand Down Expand Up @@ -381,7 +381,7 @@ def test_pause(self):
dag_command.dag_unpause(args)
assert not DagModel.get_dagmodel("example_bash_operator").is_paused

@mock.patch("airflow.cli.commands.remote_commands.dag_command.ask_yesno")
@mock.patch("airflow.cli.commands.dag_command.ask_yesno")
def test_pause_regex(self, mock_yesno):
args = self.parser.parse_args(["dags", "pause", "^example_.*$", "--treat-dag-id-as-regex"])
dag_command.dag_pause(args)
Expand All @@ -396,15 +396,15 @@ def test_pause_regex(self, mock_yesno):
assert not DagModel.get_dagmodel("example_kubernetes_executor").is_paused
assert not DagModel.get_dagmodel("example_xcom_args").is_paused

@mock.patch("airflow.cli.commands.remote_commands.dag_command.ask_yesno")
@mock.patch("airflow.cli.commands.dag_command.ask_yesno")
def test_pause_regex_operation_cancelled(self, ask_yesno, capsys):
args = self.parser.parse_args(["dags", "pause", "example_bash_operator", "--treat-dag-id-as-regex"])
ask_yesno.return_value = False
dag_command.dag_pause(args)
stdout = capsys.readouterr().out
assert "Operation cancelled by user" in stdout

@mock.patch("airflow.cli.commands.remote_commands.dag_command.ask_yesno")
@mock.patch("airflow.cli.commands.dag_command.ask_yesno")
def test_pause_regex_yes(self, mock_yesno):
args = self.parser.parse_args(["dags", "pause", ".*", "--treat-dag-id-as-regex", "--yes"])
dag_command.dag_pause(args)
Expand Down Expand Up @@ -564,7 +564,7 @@ def test_dag_state(self):
is None
)

@mock.patch("airflow.cli.commands.remote_commands.dag_command._parse_and_get_dag")
@mock.patch("airflow.cli.commands.dag_command._parse_and_get_dag")
def test_dag_test(self, mock_parse_and_get_dag):
cli_args = self.parser.parse_args(["dags", "test", "example_bash_operator", DEFAULT_DATE.isoformat()])
dag_command.dag_test(cli_args)
Expand All @@ -583,7 +583,7 @@ def test_dag_test(self, mock_parse_and_get_dag):
]
)

@mock.patch("airflow.cli.commands.remote_commands.dag_command._parse_and_get_dag")
@mock.patch("airflow.cli.commands.dag_command._parse_and_get_dag")
def test_dag_test_fail_raise_error(self, mock_parse_and_get_dag):
logical_date_str = DEFAULT_DATE.isoformat()
mock_parse_and_get_dag.return_value.test.return_value = DagRun(
Expand All @@ -593,7 +593,7 @@ def test_dag_test_fail_raise_error(self, mock_parse_and_get_dag):
with pytest.raises(SystemExit, match=r"DagRun failed"):
dag_command.dag_test(cli_args)

@mock.patch("airflow.cli.commands.remote_commands.dag_command._parse_and_get_dag")
@mock.patch("airflow.cli.commands.dag_command._parse_and_get_dag")
@mock.patch("airflow.utils.timezone.utcnow")
def test_dag_test_no_logical_date(self, mock_utcnow, mock_parse_and_get_dag):
now = pendulum.now()
Expand All @@ -618,7 +618,7 @@ def test_dag_test_no_logical_date(self, mock_utcnow, mock_parse_and_get_dag):
]
)

@mock.patch("airflow.cli.commands.remote_commands.dag_command._parse_and_get_dag")
@mock.patch("airflow.cli.commands.dag_command._parse_and_get_dag")
def test_dag_test_conf(self, mock_parse_and_get_dag):
cli_args = self.parser.parse_args(
[
Expand Down Expand Up @@ -646,10 +646,8 @@ def test_dag_test_conf(self, mock_parse_and_get_dag):
]
)

@mock.patch(
"airflow.cli.commands.remote_commands.dag_command.render_dag", return_value=MagicMock(source="SOURCE")
)
@mock.patch("airflow.cli.commands.remote_commands.dag_command._parse_and_get_dag")
@mock.patch("airflow.cli.commands.dag_command.render_dag", return_value=MagicMock(source="SOURCE"))
@mock.patch("airflow.cli.commands.dag_command._parse_and_get_dag")
def test_dag_test_show_dag(self, mock_parse_and_get_dag, mock_render_dag):
mock_parse_and_get_dag.return_value.test.return_value.run_id = (
"__test_dag_test_show_dag_fake_dag_run_run_id__"
Expand Down
Loading