Skip to content

Fix celery executor bug trying to call len on map#14883

Merged
ashb merged 5 commits into
apache:masterfrom
RNHTTR:celery-executor-map-bug
Apr 6, 2021
Merged

Fix celery executor bug trying to call len on map#14883
ashb merged 5 commits into
apache:masterfrom
RNHTTR:celery-executor-map-bug

Conversation

@RNHTTR

@RNHTTR RNHTTR commented Mar 18, 2021

Copy link
Copy Markdown
Contributor

When the celery executor tries to adopt task instances, and there are indeed task instances to adopt, bulk_state_fetcher.get_many is called, passing a map object. If the celery result_backend is not an instance of BaseKeyValueStoreBackend or DatabaseBackend, the method _get_many_using_multiprocessing will be called. This method attempts to get the len of its parameter, but you can't take the length of a map object. So, it needs to be converted to a list (or perhaps another iterable?) first. Since there is a debug statement that first takes the len before _get_many_using_multiprocessing is called, it makes sense to convert the map object to a list immediately prior even if it wasn't handled in the else block.

I wasn't able to actually reproduce the issue with Airflow, but I was able to reproduce it with the test_try_adopt_task_instances test by setting the celery result backend to rpc:// and the broker to redis.

closes: #14163

@boring-cyborg boring-cyborg Bot added the area:Scheduler including HA (high availability) scheduler label Mar 18, 2021
@RNHTTR

RNHTTR commented Mar 22, 2021

Copy link
Copy Markdown
Contributor Author

@ashb @kaxil Any chance I can get a review for this?

@ashb ashb added this to the Airflow 2.0.2 milestone Mar 22, 2021
Comment thread airflow/executors/celery_executor.py Outdated
Comment on lines 552 to 555

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you please add a test -- similar to the one you mentioned in the PR description

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kaxil Done. One of these days I'll have a PR that includes a test ;)

@RNHTTR RNHTTR Mar 26, 2021

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kaxil Looks like the failed build is due to a bug that was corrected yesterday #14986 actually i just re-rebased and pushed again.

@github-actions

Copy link
Copy Markdown
Contributor

The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.

Comment thread tests/executors/test_celery_executor.py Outdated

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need to set logging to debug for this test to cover the new code?

@RNHTTR RNHTTR Mar 27, 2021

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call.

I refactored the tests for the possible iterations of get_many to set the log mode to DEBUG and updated the debug statement to account for being a map object vs being a list object. If it's a map object and the backend is not a base backend, the length of async_results will always be 0 because it comes in as an iterator and _tasks_list_to_task_ids converts it to a set.

Comment thread airflow/executors/celery_executor.py Outdated
Comment on lines 555 to 558

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if isinstance(async_results, map):
self.log.debug("Fetched state for %d task(s)", len(result))
else:
self.log.debug("Fetched %d state(s) for %d task(s)", len(result), len(async_results))
self.log.debug("Fetched %d state(s) for %d task(s)", len(result), len(async_results))

I think this should be enough, as you have already changed the type on L552.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only if it's not a keyvalue or database celery backend. If it's a keyvalue or database backend, _tasks_list_to_task_ids is called in _get_many_from_kv_backend and _get_many_from_db_backend, respectively. In these cases, async_results will be length 0 if converted to a list before calling _get_many_from_kv_backend (which I think would be kinda redundant).

Comment thread airflow/executors/celery_executor.py Outdated

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
async_results = list(async_results) if isinstance(async_results, map) else async_results
if isinstance(async_results, map):
async_results = list(async_results)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The other possible fix is to change the one call that passes a map (L478) to be list(map(...))

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's probably the most intuitive fix, but in the case of the recommended backends (i.e. BaseKeyValueStoreBackend or DatabaseBackend, we'd be converting it to a list and then immediately creating a set based off that list instead of just creating the set based on the map object. Do you think that could have a potential performance hit if a lot of tasks are running?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ashb thoughts?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@RNHTTR I think most of the cases are not large enough for this to make much of a difference - the only map case is in code that runs once every 30s, and is not performance critical, so for ease of understanding, lets change the one usage instead of complicating this case.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. PR updated. Any resources you recommend on diving into some of airflow's trickier topics? I can't seem to find much on task adoption for example. We run managed airflow, so a lot of airflow's challenging aspects are abstracted away.

@kaxil kaxil Apr 2, 2021

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Task Adoption is when running a scheduler, if it dies another scheduler is able to "adopt" tasks started by the first Scheduler/executor -- this currently works for Celery, Kubernetes (and CeleryKubernete) executor

Code:

Scheduler:

def adopt_or_reset_orphaned_tasks(self, session: Session = None):

Celery:

def try_adopt_task_instances(self, tis: List[TaskInstance]) -> List[TaskInstance]:
# See which of the TIs are still alive (or have finished even!)
#
# Since Celery doesn't store "SENT" state for queued commands (if we create an AsyncResult with a made
# up id it just returns PENDING state for it), we have to store Celery's task_id against the TI row to
# look at in future.
#
# This process is not perfect -- we could have sent the task to celery, and crashed before we were
# able to record the AsyncResult.task_id in the TaskInstance table, in which case we won't adopt the
# task (it'll either run and update the TI state, or the scheduler will clear and re-queue it. Either
# way it won't get executed more than once)
#
# (If we swapped it around, and generated a task_id for Celery, stored that in TI and enqueued that
# there is also still a race condition where we could generate and store the task_id, but die before
# we managed to enqueue the command. Since neither way is perfect we always have to deal with this
# process not being perfect.)
celery_tasks = {}
not_adopted_tis = []
for ti in tis:
if ti.external_executor_id is not None:
celery_tasks[ti.external_executor_id] = (AsyncResult(ti.external_executor_id), ti)
else:
not_adopted_tis.append(ti)
if not celery_tasks:
# Nothing to adopt
return tis
states_by_celery_task_id = self.bulk_state_fetcher.get_many(
map(operator.itemgetter(0), celery_tasks.values())
)
adopted = []
cached_celery_backend = next(iter(celery_tasks.values()))[0].backend
for celery_task_id, (state, info) in states_by_celery_task_id.items():
result, ti = celery_tasks[celery_task_id]
result.backend = cached_celery_backend
# Set the correct elements of the state dicts, then update this
# like we just queried it.
self.adopted_task_timeouts[ti.key] = ti.queued_dttm + self.task_adoption_timeout
self.tasks[ti.key] = result
self.running.add(ti.key)
self.update_task_state(ti.key, state, info)
adopted.append(f"{ti} in state {state}")
if adopted:
task_instance_str = '\n\t'.join(adopted)
self.log.info(
"Adopted the following %d tasks from a dead executor\n\t%s", len(adopted), task_instance_str
)
return not_adopted_tis

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Kaxil that's helpful. Do you think it'd be a good idea to put together a GLOSSARY.rst or similar to capture some of these more nuanced concepts?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh yeah probably

@ashb

ashb commented Mar 29, 2021

Copy link
Copy Markdown
Member

@RNHTTR I've just taken another detailed look -- and I think there may be a simpler fix.

@ashb ashb added the priority:medium Bug that should be fixed before next release but would not block a release label Mar 31, 2021
@RNHTTR

RNHTTR commented Apr 4, 2021

Copy link
Copy Markdown
Contributor Author

@ashb @kaxil bump

@kaxil kaxil requested a review from ashb April 4, 2021 15:06
@ashb ashb merged commit 4ee4429 into apache:master Apr 6, 2021
ashb pushed a commit that referenced this pull request Apr 6, 2021
Co-authored-by: RNHTTR <ryan@wiftapp.com>
(cherry picked from commit 4ee4429)
ashb pushed a commit that referenced this pull request Apr 15, 2021
Co-authored-by: RNHTTR <ryan@wiftapp.com>
(cherry picked from commit 4ee4429)
@jayitapanda17

Copy link
Copy Markdown

@ashb @hbrls @RNHTTR I am getting similar issue when trying to start airflow webserver with version 2.0.2. File "/usr/local/bin/airflow", line 8, in
sys.exit(main())
File "/usr/local/lib/python3.6/site-packages/airflow/main.py", line 40, in main
args.func(args)
File "/usr/local/lib/python3.6/site-packages/airflow/cli/cli_parser.py", line 48, in command
return func(*args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/airflow/utils/cli.py", line 89, in wrapper
return f(*args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/airflow/cli/commands/webserver_command.py", line 360, in webserver
app = cached_app(None)
File "/usr/local/lib/python3.6/site-packages/airflow/www/app.py", line 145, in cached_app
app = create_app(config=config, testing=testing)
File "/usr/local/lib/python3.6/site-packages/airflow/www/app.py", line 75, in create_app
flask_app.config.from_pyfile(settings.WEBSERVER_CONFIG, silent=True)
File "/usr/local/lib/python3.6/site-packages/flask/config.py", line 132, in from_pyfile
exec(compile(config_file.read(), filename, "exec"), d.dict)
File "/root/airflow//webserver_config.py", line 21, in
from flask_appbuilder.security.manager import AUTH_DB
File "/usr/local/lib/python3.6/site-packages/flask_appbuilder/security/manager.py", line 13, in
from flask_openid import OpenID
File "/usr/local/lib/python3.6/site-packages/flask_openid.py", line 26, in
from openid.store.filestore import FileOpenIDStore
File "/usr/local/lib/python3.6/site-packages/openid/init.py", line 52, in
if len(version_info) != 3:
TypeError: object of type 'map' has no len(). let me know how to resolve this.thanks

@ashb

ashb commented Nov 17, 2022

Copy link
Copy Markdown
Member

@jayitapanda17 2.0.2 is two years old now, and the stack trace is in the openid library by the looks of things, so we can't really help unless your can upgrade. Sorry!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:Scheduler including HA (high availability) scheduler priority:medium Bug that should be fixed before next release but would not block a release

Projects

None yet

Development

Successfully merging this pull request may close these issues.

TypeError: object of type 'map' has no len(): When celery executor multi-processes to get Task Instances

5 participants