Apache Airflow Provider(s)
google
Versions of Apache Airflow Providers
apache-airflow-providers-google == 10.21.0
Apache Airflow version
2.9.3
Operating System
linux/arm64
Deployment
Official Apache Airflow Helm Chart
Deployment details
No response
What happened
My DAG kicks off a Kubernetes job using a GKEStartJobOperator. The operator has the following parameters set:
- deferrable=True
- wait_until_job_complete=True
- job_poll_interval=60
When the task created from the GKEStartJobOperator is executed, the deferred task polls the job every 10 seconds instead of every 60 and logs the message: The job 'name-of-my-job' is incomplete. Sleeping for 10 sec.
What you think should happen instead
In the above example, the job should poll every 60 seconds, not 10 seconds.
This happens because the job_poll_interval set on GKEStartJobOperator never reaches AsyncKubernetesHook.wait_until_job_complete: GKEJobTrigger.run calls it without a poll_interval argument, so poll_interval falls back to its default of 10.
The faulty code is in airflow/airflow/providers/google/cloud/triggers/kubernetes_engine.py in the method GKEJobTrigger.run on line 320.
This:
job: V1Job = await self.hook.wait_until_job_complete(name=self.job_name, namespace=self.job_namespace)
Should be this:
job: V1Job = await self.hook.wait_until_job_complete(name=self.job_name, namespace=self.job_namespace, poll_interval=self.poll_interval)
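The effect of the missing argument can be illustrated without Airflow. The sketch below is only a simulation: FakeAsyncHook and FakeJobTrigger are hypothetical stand-ins, not the provider's classes, and the default of 10 is taken from the behaviour described above. It shows that the configured interval only reaches the hook when the trigger forwards it explicitly.

```python
import asyncio

# Default interval as described in this report (an assumption about the real hook).
DEFAULT_POLL_INTERVAL = 10


class FakeAsyncHook:
    """Hypothetical stand-in for AsyncKubernetesHook; records the interval it receives."""

    def __init__(self):
        self.seen_intervals = []

    async def wait_until_job_complete(self, name, namespace, poll_interval=DEFAULT_POLL_INTERVAL):
        # The real hook would sleep and poll the job; here we only record the value.
        self.seen_intervals.append(poll_interval)
        await asyncio.sleep(0)
        return {"name": name, "namespace": namespace}


class FakeJobTrigger:
    """Hypothetical stand-in for GKEJobTrigger with a switch for the proposed fix."""

    def __init__(self, hook, job_name, job_namespace, poll_interval, forward_interval):
        self.hook = hook
        self.job_name = job_name
        self.job_namespace = job_namespace
        self.poll_interval = poll_interval
        self.forward_interval = forward_interval

    async def run(self):
        if self.forward_interval:
            # Proposed behaviour: pass the trigger's interval through to the hook.
            return await self.hook.wait_until_job_complete(
                name=self.job_name,
                namespace=self.job_namespace,
                poll_interval=self.poll_interval,
            )
        # Reported behaviour: the interval is dropped, so the hook default applies.
        return await self.hook.wait_until_job_complete(
            name=self.job_name, namespace=self.job_namespace
        )


def observed_interval(forward_interval, poll_interval=60):
    """Run the fake trigger once and return the interval the hook actually saw."""
    hook = FakeAsyncHook()
    trigger = FakeJobTrigger(hook, "name-of-my-job", "default", poll_interval, forward_interval)
    asyncio.run(trigger.run())
    return hook.seen_intervals[0]
```

Calling observed_interval(False) models the current call and observed_interval(True) models the call with poll_interval forwarded.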
How to reproduce
- In GCP, create a GKE cluster with a default node pool.
- Create a GCP service account with the roles/container.developer role.
- Create a connection in Airflow that uses that service account.
- Create a DAG that uses a GKEStartJobOperator.
- Configure the GKE job to run for 60 seconds.
- On the GKEStartJobOperator, set deferrable=True, wait_until_job_complete=True, and job_poll_interval=20.
- Execute the DAG. Verify that the Kubernetes job task is deferred at some point.
- View the logs for the Kubernetes job task. Verify that the task polled the job every 10 seconds, not every 20 seconds.
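The last step can be checked mechanically rather than by eye. The helper below is a minimal sketch that assumes the task log contains lines in the form quoted earlier ("... is incomplete. Sleeping for 10 sec."); the function names are hypothetical, and the pattern may need adjusting if the real log wording differs.

```python
import re

# Matches the interval in messages like:
#   The job 'name-of-my-job' is incomplete. Sleeping for 10 sec.
# Accepts integer or decimal values, since the interval may be logged as a float.
SLEEP_PATTERN = re.compile(r"is incomplete\. Sleeping for (\d+(?:\.\d+)?) sec")


def polled_intervals(log_text):
    """Return every sleep interval (in seconds) reported in the task log."""
    return [float(match.group(1)) for match in SLEEP_PATTERN.finditer(log_text)]


def uses_configured_interval(log_text, expected):
    """True only if the log has at least one poll message and all of them use `expected`."""
    intervals = polled_intervals(log_text)
    return bool(intervals) and all(interval == expected for interval in intervals)
```

With the bug present, uses_configured_interval(log_text, 20) is expected to return False for the reproduction above, because every message reports 10 seconds.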
Anything else
The docstring for GKEStartJobOperator names the poll interval parameter poll_interval, but __init__ takes job_poll_interval. The docstring should be changed to say job_poll_interval.
Are you willing to submit PR?
Code of Conduct