diff --git a/airflow/jobs/local_task_job.py b/airflow/jobs/local_task_job.py index 5711342e04d97..b73c8992d8a73 100644 --- a/airflow/jobs/local_task_job.py +++ b/airflow/jobs/local_task_job.py @@ -195,7 +195,13 @@ def heartbeat_callback(self, session=None): recorded_pid = ti.pid same_process = recorded_pid == current_pid - if ti.run_as_user or self.task_runner.run_as_user: + if recorded_pid is not None and (ti.run_as_user or self.task_runner.run_as_user): + # when running as another user, compare the task runner pid to the parent of + # the recorded pid because user delegation becomes an extra process level. + # However, if recorded_pid is None, pass that through as it signals the task + # runner process has already completed and been cleared out. `psutil.Process` + # uses the current process if the parameter is None, which is not what is intended + # for comparison. recorded_pid = psutil.Process(ti.pid).ppid() same_process = recorded_pid == current_pid diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py index 362bba18287b2..74348f8f43645 100644 --- a/tests/jobs/test_local_task_job.py +++ b/tests/jobs/test_local_task_job.py @@ -154,6 +154,16 @@ def test_localtaskjob_heartbeat(self, dag_maker): with pytest.raises(AirflowException): job1.heartbeat_callback() + # Now, set the ti.pid to None and test that no error + # is raised. + ti.pid = None + session.merge(ti) + session.commit() + assert ti.pid != job1.task_runner.process.pid + assert not ti.run_as_user + assert not job1.task_runner.run_as_user + job1.heartbeat_callback() + @mock.patch('subprocess.check_call') @mock.patch('airflow.jobs.local_task_job.psutil') def test_localtaskjob_heartbeat_with_run_as_user(self, psutil_mock, _, dag_maker): @@ -196,6 +206,16 @@ def test_localtaskjob_heartbeat_with_run_as_user(self, psutil_mock, _, dag_maker with pytest.raises(AirflowException, match='PID of job runner does not match'): job1.heartbeat_callback() + # Here we set the ti.pid to None and test that no error is + # raised + ti.pid = None + session.merge(ti) + session.commit() + assert ti.run_as_user + assert job1.task_runner.run_as_user == ti.run_as_user + assert ti.pid != job1.task_runner.process.pid + job1.heartbeat_callback() + @conf_vars({('core', 'default_impersonation'): 'testuser'}) @mock.patch('subprocess.check_call') @mock.patch('airflow.jobs.local_task_job.psutil') @@ -239,6 +259,16 @@ def test_localtaskjob_heartbeat_with_default_impersonation(self, psutil_mock, _, with pytest.raises(AirflowException, match='PID of job runner does not match'): job1.heartbeat_callback() + # Now, set the ti.pid to None and test that no error + # is raised. + ti.pid = None + session.merge(ti) + session.commit() + assert job1.task_runner.run_as_user == 'testuser' + assert ti.run_as_user is None + assert ti.pid != job1.task_runner.process.pid + job1.heartbeat_callback() + def test_heartbeat_failed_fast(self): """ Test that task heartbeat will sleep when it fails fast