Updated timeout parameter in peek_stdout#46854
Conversation
jscheffl
left a comment
There was a problem hiding this comment.
Sounds promising and I can confirm this change fixes the problem in our env (pathing locally). But before merge would like to have another pair of eyes of experts.
There was a problem hiding this comment.
Copilot reviewed 1 out of 1 changed files in this pull request and generated no comments.
Comments suppressed due to low confidence (2)
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py:868
- Replacing the explicit call to resp.update(timeout=1) with a timeout parameter inside peek_stdout may change the timing behavior of output polling. Please verify that this new implementation correctly handles chunked output for large XCom values as intended.
while resp.peek_stdout(timeout=1):
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py:871
- The stderr loop continues using peek_stderr() without a timeout, which is inconsistent with the updated stdout loop. Consider evaluating whether a similar timeout parameter is needed for stderr to avoid potential blocking or timing issues.
while resp.peek_stderr():
| resp.update(timeout=1) | ||
| while resp.peek_stdout(): |
There was a problem hiding this comment.
(I'll preface this by saying I don't have hands on experience here)
Are you sure this is the right approach? Is there anything to peek if we don't update? I wonder if the real fix is not to return after reading the first chunk?
This is probably worth adding a test to cover this, maybe an integration test. If nothing else, that'll make me more confident this is the right fix.
|
I gave this a spin with the following DAG, and it would sometimes still retrieve just part of the xcom file. from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
def consume_xcom(**context):
ti = context['ti']
data = ti.xcom_pull(task_ids='produce')
if data is None:
raise ValueError("No data was received from XCom.")
if len(data) != (105 * 1024):
raise ValueError(f"Data has unexpected length: {len(data)}")
print(f"Successfully retrieved xcom value with length: {len(data)}")
with DAG(dag_id='xcom_large_dag', schedule=None):
produce = KubernetesPodOperator(
task_id="produce",
name="produce",
namespace="default",
image="python:3.12-slim",
cmds=["python", "-c"],
arguments=[
"import json; data = 'A' * (105 * 1024); "
"json.dump(data, open('/airflow/xcom/return.json', 'w'))"
],
get_logs=True,
do_xcom_push=True,
)
consume = PythonOperator(
task_id='consume',
python_callable=consume_xcom,
provide_context=True
)
produce >> consume |
|
This PR is super-seeded by #47568 |
Updated timeout parameter in peek_stdout call to support large XCom values (~100kb) which can arrive in chunks.
Fixes: #39267
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rstor{issue_number}.significant.rst, in newsfragments.