Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ install_requires =
termcolor>=1.1.0
# typing-extensions can be removed under two scenarios: dropping support for python 3.7
# or bumping the minimum version of airflow for providers to 2.2.* which would allow the use of airflow.typing_compat
typing-extensions>=3.7.4
# Kubernetes tests also rely on typing-extensions < 4.0.0 - fixing the tests should allow removing the upper bound
typing-extensions>=3.7.4,<4.0.0
unicodecsv>=0.14.1
# Werkzeug is known to cause breaking changes and it is very closely tied with FlaskAppBuilder and other
# Flask dependencies and the limit to 1.* line should be reviewed when we upgrade Flask and remove
Expand Down
7 changes: 3 additions & 4 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,10 +251,9 @@ def write_version(filename: str = os.path.join(*[my_dir, "airflow", "git_version
dask = [
# Dask support is limited, we need Dask team to upgrade support for dask if we were to continue
# Supporting it in the future
# TODO: upgrade libraries used or maybe deprecate and drop DASK support
'cloudpickle>=1.4.1, <1.5.0',
'dask>=2.9.0, <2021.6.1', # dask 2021.6.1 does not work with `distributed`
'distributed>=2.11.1, <2.20',
'cloudpickle>=1.4.1',
'dask>=2.9.0',
'distributed>=2.11.1',
]
databricks = [
'requests>=2.26.0, <3',
Expand Down
18 changes: 14 additions & 4 deletions tests/executors/test_dask_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,39 @@
from unittest import mock

import pytest
from distributed import LocalCluster

from airflow.exceptions import AirflowException
from airflow.executors.dask_executor import DaskExecutor
from airflow.jobs.backfill_job import BackfillJob
from airflow.models import DagBag
from airflow.utils import timezone
from tests.test_utils.config import conf_vars

try:
from distributed import LocalCluster

# utility functions imported from the dask testing suite to instantiate a test
# cluster for tls tests
from distributed import tests # noqa
from distributed.utils_test import cluster as dask_testing_cluster, get_cert, tls_security

from airflow.executors.dask_executor import DaskExecutor

skip_tls_tests = False
except ImportError:
skip_tls_tests = True
# In case the tests are skipped because of lacking test harness, get_cert should be
# overridden to avoid get_cert failing during test discovery as get_cert is used
# in conf_vars decorator
get_cert = lambda x: x

DEFAULT_DATE = timezone.datetime(2017, 1, 1)
SUCCESS_COMMAND = ['airflow', 'tasks', 'run', '--help']
FAIL_COMMAND = ['airflow', 'tasks', 'run', 'false']

# For now we are temporarily removing Dask support until we get the Dask team to help us make the
# tests pass again
skip_dask_tests = True

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@potiuk, out of curiosity: is this strictly maintained by the Dask team, or is this something I can take a look at? I was also looking at options for connecting to a local Dask cluster along with distributed.

@potiuk potiuk Mar 5, 2022

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not. I started discussion on that here https://lists.apache.org/thread/6stgcpjt5jb3xfw92oo1j486j33c8v7m

This is the second time I have started a similar discussion - the previous time was in Jan 2020 https://lists.apache.org/thread/875fpgb7vfpmtxrmt19jmo8d3p6mgqnh and then the Dask team chimed in and helped in fixing the tests.

But more than 1 year later we have a similar problem.

I also asked on Dask's Discourse whether they can help again: https://dask.discourse.group/t/potential-removal-of-dask-executor-support-in-airflow/433



@pytest.mark.skipif(skip_dask_tests, reason="The tests are skipped because it needs testing from Dask team")

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
@pytest.mark.skipif(skip_dask_tests, reason="The tests are skipped because it needs testing from Dask team")
@pytest.mark.skip(reason="The tests are skipped because it needs testing from Dask team")

skipif is used to dynamically skip tests depending on a dynamic value, e.g. the active database backend.

@potiuk potiuk Mar 6, 2022

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to give an easy way for Dask committers to be able to "play" with it. It's easier to flip a single variable to do it, rather than manually remove @pytest.mark.skip from all test cases.

This is really a "temporary" state I wanted to make easy for them to work on (same as last time).

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I imagine the Dask developers will do it this way:

  1. setup Breeze
  2. get Dask service to run tests on
  3. set skip_dask_tests to False
  4. fix the tests
  5. remove skips

By having a single flag that enables all the tests, it is just easier not to forget to remove some of the skips.

class TestBaseDask(unittest.TestCase):
def assert_tasks_on_executor(self, executor, timeout_executor=120):

Expand Down Expand Up @@ -75,6 +83,7 @@ def assert_tasks_on_executor(self, executor, timeout_executor=120):
assert fail_future.exception() is not None


@pytest.mark.skipif(skip_dask_tests, reason="The tests are skipped because it needs testing from Dask team")
class TestDaskExecutor(TestBaseDask):
def setUp(self):
self.dagbag = DagBag(include_examples=True)
Expand Down Expand Up @@ -148,6 +157,7 @@ def test_gauge_executor_metrics(self, mock_stats_gauge, mock_trigger_tasks, mock
mock_stats_gauge.assert_has_calls(calls)


@pytest.mark.skipif(skip_dask_tests, reason="The tests are skipped because it needs testing from Dask team")
class TestDaskExecutorQueue(unittest.TestCase):
def test_dask_queues_no_resources(self):
self.cluster = LocalCluster()
Expand Down